命令模式:线程池的任务队列与撤销操作的完整实现
命令模式:线程池的任务队列与撤销操作的完整实现
适读人群:中高级Java开发者 | 阅读时长:约22分钟 | 模式类型:行为型
开篇故事
做文档协同编辑系统时,遇到了一个绕不过去的需求:撤销(Undo)和重做(Redo)。
用户在文档里打了一段字、删了一个段落、修改了格式,按 Ctrl+Z 要能撤销,按 Ctrl+Y 要能重做。这个功能听起来不难,但实现起来如果没有正确的设计思路,会非常混乱。
一开始我用最朴素的方式——保存每次操作前后的文档快照,撤销时恢复到上一个快照。但文档可能很大,每次操作都保存完整快照,内存开销爆炸。
后来改用命令模式:把每次操作封装成一个命令对象,命令对象不仅知道"怎么执行",还知道"怎么撤销"。维护一个命令栈,执行时入栈,撤销时出栈并执行 undo(),重做时重新入栈执行 execute()。这样内存只需要保存命令对象(通常很小),不需要保存完整文档快照。
这就是命令模式的精髓:将请求(操作)封装成对象,使得可以用不同的请求对客户进行参数化,对请求排队或记录请求日志,以及支持可撤销的操作。
一、模式动机:将"做什么"与"怎么做"分离
命令模式的核心价值:
- 解耦调用者与执行者:发出命令的对象不需要知道命令具体如何执行。
- 命令可以被排队:
Runnable接口本身就是命令模式——线程池的任务队列存储的就是命令对象。 - 支持撤销/重做:命令对象封装了执行和撤销逻辑。
- 事务性操作:一组命令可以作为一个整体执行,失败时回滚(宏命令)。
二、模式结构
三、线程池任务队列与 Runnable 的命令模式分析
3.1 Runnable 就是命令接口
Java 中的 Runnable 接口是命令模式最简洁的体现:
// Runnable 是命令接口(只有execute,没有undo)
@FunctionalInterface
public interface Runnable {
void run(); // 即 execute()
}
// Callable 是带返回值的命令接口
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
// Future 是命令模式的命令执行结果
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning); // 取消命令
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException; // 获取命令结果
}线程池中,execute(Runnable) 就是"提交命令",BlockingQueue 就是"命令队列",工作线程就是"命令执行者(Receiver)":
// ThreadPoolExecutor 是 Invoker 角色
public void execute(Runnable command) { // Runnable 就是 Command
// 将命令放入任务队列(命令队列)
workQueue.offer(command);
// 或者直接分配给工作线程执行
}3.2 FutureTask 的完整命令实现
FutureTask 是一个完整的命令对象,它同时实现了 Runnable 和 Future:
public class FutureTask<V> implements RunnableFuture<V> {
private Callable<V> callable; // 真正的命令内容
private volatile int state; // 命令状态:NEW → COMPLETING → NORMAL/EXCEPTIONAL/CANCELLED
private Object outcome; // 命令执行结果或异常
@Override
public void run() {
// 执行命令
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result = c.call(); // 调用真正的业务逻辑
set(result); // 保存结果
}
} catch (Throwable ex) {
setException(ex);
}
}
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
// 取消命令(如果还没开始执行的话)
if (state != NEW || !UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)) {
return false;
}
// ...
return true;
}
@Override
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) awaitDone(false, 0L); // 等待命令执行完成
return report(s);
}
}四、生产级代码实现:支持撤销的文档编辑器
4.1 命令接口与基础命令
/**
* 可撤销命令接口
*/
public interface UndoableCommand {
/**
* 执行命令
*/
void execute();
/**
* 撤销命令(恢复到执行前的状态)
*/
void undo();
/**
* 命令描述(用于撤销历史记录显示)
*/
String getDescription();
/**
* 是否可以与前一个命令合并(避免每次按键都是一个独立命令)
*/
default boolean canMergeWith(UndoableCommand previous) { return false; }
/**
* 合并两个命令(返回合并后的命令)
*/
default UndoableCommand mergeWith(UndoableCommand previous) { return this; }
}
/**
* 文档模型
*/
public class Document {
private StringBuilder content;
private int cursorPosition;
public Document(String initialContent) {
this.content = new StringBuilder(initialContent);
this.cursorPosition = initialContent.length();
}
public String getContent() { return content.toString(); }
public int getCursorPosition() { return cursorPosition; }
public void insert(int position, String text) {
content.insert(position, text);
cursorPosition = position + text.length();
}
public String delete(int startPosition, int length) {
String deleted = content.substring(startPosition, startPosition + length);
content.delete(startPosition, startPosition + length);
cursorPosition = startPosition;
return deleted;
}
public void moveCursor(int position) {
cursorPosition = Math.max(0, Math.min(position, content.length()));
}
}
/**
* 插入文字命令
*/
public class InsertTextCommand implements UndoableCommand {
private final Document document;
private final int insertPosition;
private String text; // 可以被后续命令合并扩展
public InsertTextCommand(Document document, int position, String text) {
this.document = document;
this.insertPosition = position;
this.text = text;
}
@Override
public void execute() {
document.insert(insertPosition, text);
}
@Override
public void undo() {
document.delete(insertPosition, text.length());
}
@Override
public String getDescription() {
return "插入文字: " + (text.length() > 10 ? text.substring(0, 10) + "..." : text);
}
// 连续输入的字符可以合并成一个命令,避免每次Ctrl+Z只撤销一个字符
@Override
public boolean canMergeWith(UndoableCommand previous) {
if (!(previous instanceof InsertTextCommand prevInsert)) return false;
// 在上一个插入命令的末尾继续插入
return insertPosition == prevInsert.insertPosition + prevInsert.text.length();
}
@Override
public UndoableCommand mergeWith(UndoableCommand previous) {
InsertTextCommand prevInsert = (InsertTextCommand) previous;
return new InsertTextCommand(document, prevInsert.insertPosition,
prevInsert.text + this.text);
}
}
/**
* 删除文字命令
*/
public class DeleteTextCommand implements UndoableCommand {
private final Document document;
private final int deletePosition;
private final int length;
private String deletedText; // 执行时记录被删除的内容(用于撤销)
public DeleteTextCommand(Document document, int position, int length) {
this.document = document;
this.deletePosition = position;
this.length = length;
}
@Override
public void execute() {
// 执行时保存被删除的内容
deletedText = document.delete(deletePosition, length);
}
@Override
public void undo() {
// 将删除的内容插回原来的位置
document.insert(deletePosition, deletedText);
}
@Override
public String getDescription() {
return "删除文字: " + (deletedText != null && deletedText.length() > 10
? deletedText.substring(0, 10) + "..." : deletedText);
}
}
/**
* 宏命令:将多个命令组合成一个原子操作
*/
public class MacroCommand implements UndoableCommand {
private final List<UndoableCommand> commands;
private final String description;
public MacroCommand(String description, List<UndoableCommand> commands) {
this.commands = new ArrayList<>(commands);
this.description = description;
}
@Override
public void execute() {
// 按顺序执行所有命令
commands.forEach(UndoableCommand::execute);
}
@Override
public void undo() {
// 逆序撤销所有命令
List<UndoableCommand> reversed = new ArrayList<>(commands);
Collections.reverse(reversed);
reversed.forEach(UndoableCommand::undo);
}
@Override
public String getDescription() { return description; }
}
/**
* 命令调用者(Invoker):管理命令历史,支持撤销/重做
*/
@Slf4j
public class CommandHistory {
private final Deque<UndoableCommand> undoStack = new ArrayDeque<>(); // 撤销栈
private final Deque<UndoableCommand> redoStack = new ArrayDeque<>(); // 重做栈
private final int maxHistorySize;
public CommandHistory(int maxHistorySize) {
this.maxHistorySize = maxHistorySize;
}
/**
* 执行命令并加入撤销历史
*/
public void execute(UndoableCommand command) {
// 尝试与上一个命令合并(如:连续输入字符)
if (!undoStack.isEmpty()) {
UndoableCommand previous = undoStack.peek();
if (command.canMergeWith(previous)) {
undoStack.pop();
command = command.mergeWith(previous); // 合并命令
}
}
command.execute();
undoStack.push(command);
// 执行新命令后,清空重做栈
redoStack.clear();
// 限制历史记录大小
while (undoStack.size() > maxHistorySize) {
undoStack.removeLast();
}
log.debug("Executed: {}, undo stack size: {}", command.getDescription(), undoStack.size());
}
/**
* 撤销最近一次操作
*/
public boolean undo() {
if (undoStack.isEmpty()) {
log.debug("Nothing to undo");
return false;
}
UndoableCommand command = undoStack.pop();
command.undo();
redoStack.push(command); // 放入重做栈
log.debug("Undone: {}", command.getDescription());
return true;
}
/**
* 重做最近一次撤销的操作
*/
public boolean redo() {
if (redoStack.isEmpty()) {
log.debug("Nothing to redo");
return false;
}
UndoableCommand command = redoStack.pop();
command.execute();
undoStack.push(command); // 放回撤销栈
log.debug("Redone: {}", command.getDescription());
return true;
}
public boolean canUndo() { return !undoStack.isEmpty(); }
public boolean canRedo() { return !redoStack.isEmpty(); }
/**
* 获取撤销历史列表(用于显示"历史记录"面板)
*/
public List<String> getUndoHistory() {
return undoStack.stream()
.map(UndoableCommand::getDescription)
.collect(Collectors.toList());
}
}4.2 结合线程池的异步命令队列
/**
* 异步命令执行器
* 将命令放入队列异步执行,支持命令的取消
*/
@Service
@Slf4j
public class AsyncCommandExecutor {
private final ExecutorService executor;
private final ConcurrentHashMap<String, Future<?>> pendingCommands = new ConcurrentHashMap<>();
public AsyncCommandExecutor() {
this.executor = new ThreadPoolExecutor(
10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadFactoryBuilder().setNameFormat("cmd-executor-%d").build(),
new ThreadPoolExecutor.AbortPolicy()
);
}
/**
* 提交异步命令(命令模式:Runnable是Command)
*/
public <T> String submitCommand(String commandId, Callable<T> command,
Consumer<T> onSuccess, Consumer<Throwable> onFailure) {
if (StringUtils.isEmpty(commandId)) {
commandId = UUID.randomUUID().toString();
}
final String finalCommandId = commandId;
Future<?> future = executor.submit(() -> {
try {
T result = command.call();
onSuccess.accept(result);
log.info("Command {} completed successfully", finalCommandId);
} catch (Exception e) {
onFailure.accept(e);
log.error("Command {} failed: {}", finalCommandId, e.getMessage(), e);
} finally {
pendingCommands.remove(finalCommandId);
}
});
pendingCommands.put(commandId, future);
log.info("Command {} submitted to queue", commandId);
return commandId;
}
/**
* 取消命令(命令模式支持的关键特性)
*/
public boolean cancelCommand(String commandId) {
Future<?> future = pendingCommands.get(commandId);
if (future == null) {
log.warn("Command {} not found in pending commands", commandId);
return false;
}
boolean cancelled = future.cancel(true); // true表示允许中断正在执行的命令
if (cancelled) {
pendingCommands.remove(commandId);
log.info("Command {} cancelled", commandId);
}
return cancelled;
}
public boolean isCommandPending(String commandId) {
return pendingCommands.containsKey(commandId);
}
public int getPendingCommandCount() {
return pendingCommands.size();
}
}
// 使用示例:异步批量导出任务
@Service
public class ReportExportService {
@Autowired
private AsyncCommandExecutor commandExecutor;
public String startExport(ExportRequest request) {
String taskId = "EXPORT-" + System.currentTimeMillis();
commandExecutor.submitCommand(
taskId,
() -> {
// 命令的执行逻辑
return generateReport(request);
},
reportFile -> {
// 成功回调
notifyExportReady(request.getUserId(), taskId, reportFile);
},
error -> {
// 失败回调
notifyExportFailed(request.getUserId(), taskId, error.getMessage());
}
);
return taskId;
}
public boolean cancelExport(String taskId) {
return commandExecutor.cancelCommand(taskId);
}
}五、踩坑实录
坑一:命令对象持有过多引用导致内存泄漏
撤销栈里的命令对象会一直持有对 Document 等大对象的引用。如果用户一直操作,撤销栈无限增长,大量的 Document 引用使 GC 无法回收。
解决方案:设置 maxHistorySize 限制撤销栈大小;对于大型文档操作,命令对象只保存必要的差量信息(增量/删除的内容),而不是完整文档引用。
坑二:宏命令的部分执行失败
宏命令执行到一半发生异常,前几个子命令已经执行了,但后续的没有执行。这时候宏命令的状态是"半执行",撤销也很麻烦(已执行的部分要撤销,未执行的部分不能撤销)。
解决方案:宏命令必须支持事务语义——要么全部执行成功,要么全部不执行。在 execute() 中如果某个子命令失败,自动调用已执行子命令的 undo()。
坑三:异步命令的状态追踪
用线程池执行命令时,命令的执行状态(排队中/执行中/成功/失败)需要额外维护。Future.isDone() 只能告诉你命令是否"完成"(包括取消和异常),无法区分"成功"和"失败"。
需要在命令对象或执行器中额外维护一套状态机来追踪命令的精确状态。
六、总结
命令模式的精髓在于将"动作"对象化,这带来了极大的灵活性:
- Java 的
Runnable/Callable/FutureTask是命令模式的经典实现。 - 撤销/重做 是命令模式的杀手级特性,所有需要"历史回退"功能的系统都应该考虑命令模式。
- 宏命令 将多个命令组合成事务性操作,要么全执行,要么全撤销。
- 异步命令队列 是命令模式与线程池的自然结合,支持命令的排队、取消、超时控制。
