第2317篇:AI应用的领域驱动设计——用DDD建模复杂AI业务场景
第2317篇:AI应用的领域驱动设计——用DDD建模复杂AI业务场景
适读人群:AI系统架构师、高级工程师 | 阅读时长:约19分钟 | 核心价值:掌握用DDD思想建模AI业务场景的方法论,解决AI应用中常见的贫血模型和领域逻辑散乱问题
我观察过很多团队写AI应用,尤其是稍微复杂一点的AI Agent系统,代码往往长成这样:一个巨大的Service类,里面有callLLM()、doRetrieval()、saveResult()、notifyUser()……业务规则散落在各处,数据对象只有getter/setter没有任何行为,类之间的依赖关系混乱。
这是典型的贫血模型。在AI应用里,这个问题特别突出,因为AI系统天然包含大量的"领域概念"——对话会话、知识库、Agent任务、评估结果——这些概念有丰富的业务规则,却被退化成了简单的数据容器。
DDD(领域驱动设计)提供了一套系统的方法来解决这个问题。
AI应用中的核心领域概念
在典型的企业AI应用中,有几个核心领域:
对话领域的建模
/**
* Conversation 聚合根
* 封装了对话的核心业务规则:
* - 消息历史的管理
* - 上下文长度限制
* - 对话状态的生命周期
*/
public class Conversation {
private final ConversationId id;
private final UserId userId;
private final List<Message> messages;
private ConversationStatus status;
private final ConversationConfig config;
private Instant createdAt;
private Instant lastActiveAt;
// 领域事件列表(待发布)
private final List<DomainEvent> domainEvents = new ArrayList<>();
private Conversation(ConversationId id, UserId userId, ConversationConfig config) {
this.id = id;
this.userId = userId;
this.messages = new ArrayList<>();
this.status = ConversationStatus.ACTIVE;
this.config = config;
this.createdAt = Instant.now();
this.lastActiveAt = Instant.now();
// 发布领域事件
domainEvents.add(new ConversationStartedEvent(id, userId));
}
public static Conversation start(UserId userId, ConversationConfig config) {
return new Conversation(ConversationId.generate(), userId, config);
}
/**
* 添加用户消息
* 业务规则:
* 1. 对话必须是激活状态
* 2. 消息内容不能为空
* 3. 触发上下文更新
*/
public void addUserMessage(String content) {
assertStatus(ConversationStatus.ACTIVE, "非激活对话不能添加消息");
if (content == null || content.isBlank()) {
throw new InvalidMessageException("消息内容不能为空");
}
Message message = Message.userMessage(content, this.id);
messages.add(message);
this.lastActiveAt = Instant.now();
domainEvents.add(new UserMessageAddedEvent(this.id, message));
}
/**
* 添加AI回复
* 业务规则:
* 1. AI消息必须在用户消息之后
* 2. 记录token使用量
*/
public void addAssistantMessage(String content, TokenUsage tokenUsage) {
assertLastMessageIsFrom(MessageRole.USER, "AI消息前必须有用户消息");
Message message = Message.assistantMessage(content, tokenUsage, this.id);
messages.add(message);
this.lastActiveAt = Instant.now();
domainEvents.add(new AssistantMessageAddedEvent(this.id, message, tokenUsage));
// 检查是否超出上下文长度限制
if (getTotalTokens() > config.maxContextTokens()) {
pruneOldMessages();
}
}
/**
* 获取用于LLM调用的上下文消息
* 业务规则:
* 1. 保留system消息
* 2. 在token限制内尽量保留最新的历史
* 3. 返回的是值对象,不暴露内部集合
*/
public ConversationContext buildContext() {
List<Message> contextMessages = new ArrayList<>();
int tokenBudget = config.maxContextTokens();
// 从最新消息往前遍历,直到填满token预算
for (int i = messages.size() - 1; i >= 0; i--) {
Message msg = messages.get(i);
if (tokenBudget - msg.estimatedTokens() < 0) break;
contextMessages.add(0, msg);
tokenBudget -= msg.estimatedTokens();
}
return ConversationContext.of(contextMessages, config.systemPrompt());
}
/**
* 归档对话
* 业务规则:只有ACTIVE状态的对话可以归档
*/
public void archive(String reason) {
assertStatus(ConversationStatus.ACTIVE, "只有激活的对话可以归档");
this.status = ConversationStatus.ARCHIVED;
domainEvents.add(new ConversationArchivedEvent(this.id, reason));
}
private void pruneOldMessages() {
// 保留最新的N条消息,删除旧消息
int keepCount = config.pruneKeepCount();
if (messages.size() > keepCount) {
List<Message> removed = messages.subList(0, messages.size() - keepCount);
messages.removeAll(removed);
domainEvents.add(new ConversationPrunedEvent(this.id, removed.size()));
}
}
private long getTotalTokens() {
return messages.stream().mapToLong(Message::estimatedTokens).sum();
}
public List<DomainEvent> pullDomainEvents() {
List<DomainEvent> events = new ArrayList<>(domainEvents);
domainEvents.clear();
return events;
}
}Agent任务领域的建模
/**
* AgentTask 聚合根
* 封装了Agent任务的生命周期和业务规则
*/
public class AgentTask {
private final AgentTaskId id;
private final String goal;
private final UserId requestedBy;
private TaskStatus status;
private final List<TaskStep> steps;
private TaskResult result;
private int retryCount;
private final TaskPolicy policy;
private final List<DomainEvent> domainEvents = new ArrayList<>();
public static AgentTask create(String goal, UserId userId, TaskPolicy policy) {
AgentTask task = new AgentTask(
AgentTaskId.generate(), goal, userId, policy
);
task.domainEvents.add(new AgentTaskCreatedEvent(task.id, goal, userId));
return task;
}
/**
* 开始执行
* 业务规则:只有PENDING状态才能开始执行
*/
public void startExecution() {
if (status != TaskStatus.PENDING) {
throw new InvalidTaskStateException("任务状态[%s]不允许开始执行".formatted(status));
}
this.status = TaskStatus.RUNNING;
domainEvents.add(new AgentTaskStartedEvent(this.id));
}
/**
* 记录步骤完成
* 业务规则:步骤只能在RUNNING状态时记录
*/
public void recordStepCompletion(String stepDescription, Object stepResult) {
assertStatus(TaskStatus.RUNNING);
TaskStep step = TaskStep.completed(
steps.size() + 1, stepDescription, stepResult
);
steps.add(step);
domainEvents.add(new TaskStepCompletedEvent(this.id, step));
}
/**
* 完成任务
* 业务规则:检查是否满足完成条件(有至少一个成功步骤)
*/
public void complete(Object finalResult) {
assertStatus(TaskStatus.RUNNING);
boolean hasSuccessfulStep = steps.stream()
.anyMatch(s -> s.status() == TaskStep.StepStatus.COMPLETED);
if (!hasSuccessfulStep) {
throw new TaskCompletionException("任务不能在没有成功步骤的情况下完成");
}
this.result = TaskResult.success(finalResult);
this.status = TaskStatus.COMPLETED;
domainEvents.add(new AgentTaskCompletedEvent(this.id, result));
}
/**
* 失败处理
* 业务规则:根据重试策略决定是重试还是最终失败
*/
public TaskFailureDecision fail(String reason) {
assertStatus(TaskStatus.RUNNING);
this.retryCount++;
if (retryCount <= policy.maxRetries() && policy.isRetryableError(reason)) {
// 可以重试
this.status = TaskStatus.PENDING;
domainEvents.add(new AgentTaskRetryScheduledEvent(this.id, retryCount, reason));
return TaskFailureDecision.RETRY;
} else {
// 最终失败
this.result = TaskResult.failure(reason);
this.status = TaskStatus.FAILED;
domainEvents.add(new AgentTaskFailedEvent(this.id, reason));
return TaskFailureDecision.GIVE_UP;
}
}
/**
* 业务查询方法:获取当前执行进度
*/
public TaskProgress getProgress() {
long completedSteps = steps.stream()
.filter(s -> s.status() == TaskStep.StepStatus.COMPLETED)
.count();
return new TaskProgress(completedSteps, steps.size(), status);
}
}知识库领域建模
/**
* KnowledgeBase 聚合根
* 管理文档的生命周期和分块策略
*/
public class KnowledgeBase {
private final KnowledgeBaseId id;
private String name;
private final List<KnowledgeDocument> documents;
private final ChunkingConfig chunkingConfig;
private KnowledgeBaseStatus status;
/**
* 添加文档
* 业务规则:
* 1. 文档名称不能重复
* 2. 文档大小不能超过限制
* 3. 自动应用分块策略
*/
public KnowledgeDocument addDocument(String name, String content, DocumentMetadata metadata) {
if (hasDocumentWithName(name)) {
throw new DuplicateDocumentException("知识库中已存在名为[%s]的文档".formatted(name));
}
if (content.length() > chunkingConfig.maxDocumentSize()) {
throw new DocumentTooLargeException("文档超过大小限制:%d字符".formatted(content.length()));
}
// 自动分块
List<DocumentChunk> chunks = chunkingConfig.chunkingStrategy().chunk(content);
KnowledgeDocument document = KnowledgeDocument.create(
name, content, chunks, metadata, this.id
);
documents.add(document);
return document;
}
/**
* 更新文档
* 业务规则:更新文档时自动重新分块,并标记向量需要重新生成
*/
public void updateDocument(DocumentId documentId, String newContent) {
KnowledgeDocument document = findDocumentOrThrow(documentId);
document.updateContent(newContent, chunkingConfig);
// 触发重新向量化的领域事件
// 这个事件会被DocumentVectorizationService处理
}
private boolean hasDocumentWithName(String name) {
return documents.stream().anyMatch(d -> d.name().equals(name));
}
}应用服务层:编排领域对象
应用服务负责编排,不包含业务规则:
/**
* ConversationApplicationService
* 职责:编排领域对象 + 基础设施调用
* 不包含:业务规则(业务规则在领域模型中)
*/
@Service
public class ConversationApplicationService {
private final ConversationRepository conversationRepo;
private final LLMPort llmPort; // 端口(六边形架构)
private final RAGPort ragPort;
private final ApplicationEventPublisher eventPublisher;
@Transactional
public ConversationResponse chat(ChatCommand command) {
// 加载聚合根
Conversation conversation = conversationRepo
.findById(command.conversationId())
.orElseGet(() -> Conversation.start(
command.userId(), ConversationConfig.defaultConfig()
));
// 调用领域方法(业务规则在这里)
conversation.addUserMessage(command.userMessage());
// 获取检索上下文(基础设施操作)
String retrievedContext = ragPort.retrieve(command.userMessage(), command.knowledgeBaseId());
// 构建LLM上下文(领域逻辑)
ConversationContext context = conversation.buildContext();
// 调用LLM(基础设施操作)
LLMResponse llmResponse = llmPort.chat(context, retrievedContext);
// 记录AI回复(业务规则在这里)
conversation.addAssistantMessage(llmResponse.content(), llmResponse.tokenUsage());
// 持久化
conversationRepo.save(conversation);
// 发布领域事件
conversation.pullDomainEvents().forEach(eventPublisher::publishEvent);
return ConversationResponse.of(llmResponse.content(), conversation.getId());
}
}DDD在AI应用中的特别收益
在AI应用中采用DDD,有几个特别明显的收益:
业务规则有了家:Conversation.addUserMessage()、AgentTask.fail()里面都有清晰的业务规则,不是散落在Service层的if-else。
领域事件驱动AI流水线:ConversationArchivedEvent触发总结生成,DocumentAddedEvent触发向量化,TaskCompletedEvent触发通知——这些事件链条在DDD里是自然的。
聚合根保护不变量:Conversation确保AI消息前必须有用户消息,AgentTask确保只有RUNNING状态才能记录步骤——这些约束在模型内部强制执行,不会被意外绕过。
可测试性大幅提升:领域模型没有Spring依赖,纯Java对象,单元测试极其简单。
