第1924篇:分布式Session在多Agent系统中的管理——跨服务的上下文传递
第1924篇:分布式Session在多Agent系统中的管理——跨服务的上下文传递
多Agent系统是最近两年我接触最多的架构模式,也是踩坑最多的地方。
先说一个让我印象深刻的问题。
我们有一个法律咨询Agent系统,用户提问后,由一个"路由Agent"判断问题类型,然后分发给"合同法专家Agent"或者"劳动法专家Agent"。用户的第一个问题是"我跟公司签的劳动合同有问题",路由Agent发给了劳动法Agent,劳动法Agent回答了。然后用户紧接着问:"上面那份合同里关于竞业协议的部分怎么看?"
这时候路由Agent又来了,它不知道上一轮对话是什么,把这个问题发给了合同法Agent。合同法Agent也不知道上一轮的上下文,它不知道"上面那份合同"是什么,给出了一个牛头不对马嘴的回复。
这就是多Agent系统里的经典问题:跨服务的上下文断裂。
每个Agent都是独立的服务,有独立的内存,没有任何共享状态。如果不设计好Session管理,多轮对话就会变成一盘散沙,每个Agent都只能看到它自己收到的那条消息,完全不知道对话历史是什么。
问题的本质:上下文的归属
多Agent系统的Session管理比单服务复杂,根本原因是上下文的归属不清晰。
在单服务系统里,Session很简单:用户A的所有请求都打到同一个服务实例,Session存在那个实例的内存里(或者Redis里),每次请求带上SessionID就能取到上下文。
在多Agent系统里,问题变了:
问题来了:
- 用户和编排Agent之间有对话历史(多轮对话)
- 编排Agent和每个子Agent之间也有交互历史(子任务上下文)
- 子Agent之间可能需要共享某些信息(比如Agent1搜到的资料,Agent3写作时要用)
这三层上下文,归属和生命周期都不同。乱存到一个Session里,会乱成一锅粥。
多层Session模型
我设计了一个三层Session模型来解决这个问题:
对话会话(Conversation Session)
最外层,代表一个用户和AI系统的完整对话,可能跨越多轮、持续几天甚至几周:
@Data
@Builder
public class ConversationSession {
private String conversationId; // 全局唯一会话ID
private String userId;
private String systemId; // 属于哪个AI系统
// 对话历史(多轮消息记录)
private List<ConversationMessage> messages;
// 用户级别的上下文(跨任务持久)
private UserProfile userProfile;
private Map<String, Object> userContext; // 用户的偏好、历史选择等
// 活跃任务列表
private List<String> activeTaskIds;
private Instant createdAt;
private Instant lastActiveAt;
private ConversationStatus status; // ACTIVE, INACTIVE, ARCHIVED
// 会话的元信息
private String currentIntent; // 当前识别到的用户意图
private String currentLanguage; // 当前语言
}任务会话(Task Session)
当用户的某个意图触发了一次多Agent协作任务,就创建一个Task Session:
@Data
@Builder
public class TaskSession {
private String taskId;
private String conversationId; // 所属对话会话
private String taskType; // 任务类型:"legal_consultation", "code_review"等
// 任务目标
private String userIntent; // 用户的原始意图
private Map<String, Object> taskGoal;
// 跨Agent的共享状态(这是关键!)
private SharedTaskContext sharedContext;
// 任务执行状态
private TaskStatus status; // PENDING, RUNNING, COMPLETED, FAILED
private List<AgentExecutionRecord> executionHistory; // 各Agent的执行记录
// 任务结果
private Map<String, Object> taskResult;
private Instant createdAt;
private Instant completedAt;
private Duration timeout;
}
@Data
public class SharedTaskContext {
// 这个Map里的内容对所有参与本任务的Agent可见
private final ConcurrentHashMap<String, Object> sharedData = new ConcurrentHashMap<>();
// 锁机制,防止多Agent并发修改同一个Key
private final ConcurrentHashMap<String, String> keyLocks = new ConcurrentHashMap<>();
public void put(String key, Object value, String agentId) {
// 记录是哪个Agent写入的
sharedData.put(key, value);
sharedData.put(key + ".__writer__", agentId);
sharedData.put(key + ".__timestamp__", System.currentTimeMillis());
}
public Optional<Object> get(String key) {
return Optional.ofNullable(sharedData.get(key));
}
public boolean tryLock(String key, String agentId) {
return keyLocks.putIfAbsent(key, agentId) == null;
}
public void unlock(String key, String agentId) {
keyLocks.remove(key, agentId);
}
}Agent会话(Agent Session)
最内层,是单个Agent在处理单次子任务时的上下文:
@Data
@Builder
public class AgentSession {
private String agentSessionId;
private String taskId; // 所属任务会话
private String agentId; // 哪个Agent
private String agentType; // Agent类型
// Agent的本地记忆(只对本Agent可见)
private List<Message> localMessages; // 本Agent的消息历史
private Map<String, Object> localMemory; // 临时工作记忆
// 工具调用记录
private List<ToolCallRecord> toolCallHistory;
// 执行状态
private AgentStatus status;
private int retryCount;
private String lastError;
// 结果
private AgentOutput output;
private Instant startedAt;
private Instant completedAt;
}存储方案:Redis + 分层TTL
三层Session有不同的生命周期和读写频率,需要差异化的存储策略:
@Component
public class SessionStorageManager {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private ObjectMapper objectMapper;
// 对话Session:保留7天(用户可能间隔好几天再来继续聊)
private static final Duration CONVERSATION_TTL = Duration.ofDays(7);
// 任务Session:保留24小时
private static final Duration TASK_TTL = Duration.ofHours(24);
// Agent Session:保留1小时(完成后很快用不上了)
private static final Duration AGENT_TTL = Duration.ofHours(1);
// Key前缀
private static final String CONV_PREFIX = "ai:session:conv:";
private static final String TASK_PREFIX = "ai:session:task:";
private static final String AGENT_PREFIX = "ai:session:agent:";
private static final String CONV_TASKS_PREFIX = "ai:session:conv:tasks:"; // Set类型
// ===== 对话Session =====
public void saveConversationSession(ConversationSession session) {
String key = CONV_PREFIX + session.getConversationId();
try {
String json = objectMapper.writeValueAsString(session);
redisTemplate.opsForValue().set(key, json, CONVERSATION_TTL);
} catch (JsonProcessingException e) {
throw new SessionStorageException("保存对话Session失败", e);
}
}
public Optional<ConversationSession> getConversationSession(String conversationId) {
String key = CONV_PREFIX + conversationId;
String json = redisTemplate.opsForValue().get(key);
if (json == null) return Optional.empty();
try {
ConversationSession session = objectMapper.readValue(json, ConversationSession.class);
// 每次读取时刷新TTL
redisTemplate.expire(key, CONVERSATION_TTL);
return Optional.of(session);
} catch (JsonProcessingException e) {
log.error("反序列化对话Session失败: {}", e.getMessage());
return Optional.empty();
}
}
// ===== 任务Session =====
public void saveTaskSession(TaskSession task) {
String key = TASK_PREFIX + task.getTaskId();
try {
// 任务Session用Hash存储,便于部分更新
Map<String, String> fields = new HashMap<>();
fields.put("base", objectMapper.writeValueAsString(task));
fields.put("sharedContext",
objectMapper.writeValueAsString(task.getSharedContext()));
redisTemplate.opsForHash().putAll(key, fields);
redisTemplate.expire(key, TASK_TTL);
// 维护对话->任务的关联关系
String convTasksKey = CONV_TASKS_PREFIX + task.getConversationId();
redisTemplate.opsForSet().add(convTasksKey, task.getTaskId());
redisTemplate.expire(convTasksKey, CONVERSATION_TTL);
} catch (JsonProcessingException e) {
throw new SessionStorageException("保存任务Session失败", e);
}
}
public void updateSharedContext(String taskId, String key, Object value, String agentId) {
// 只更新SharedContext里的一个Key,不用整体序列化
String redisKey = TASK_PREFIX + taskId;
String contextField = "sharedContext";
// 用Lua脚本保证原子性
String luaScript = """
local current = redis.call('HGET', KEYS[1], KEYS[2])
local data = cjson.decode(current)
data['sharedData'][ARGV[1]] = ARGV[2]
data['sharedData'][ARGV[1] .. '.__writer__'] = ARGV[3]
data['sharedData'][ARGV[1] .. '.__timestamp__'] = tonumber(ARGV[4])
redis.call('HSET', KEYS[1], KEYS[2], cjson.encode(data))
return 1
""";
try {
redisTemplate.execute(
new DefaultRedisScript<>(luaScript, Long.class),
Arrays.asList(redisKey, contextField),
key,
objectMapper.writeValueAsString(value),
agentId,
String.valueOf(System.currentTimeMillis())
);
} catch (Exception e) {
log.error("更新SharedContext失败, taskId={}, key={}: {}", taskId, key, e.getMessage());
throw new SessionStorageException("更新共享上下文失败", e);
}
}
}上下文传递:SessionContext对象
每次Agent收到任务时,需要拿到完整的上下文。设计一个统一的SessionContext对象,在各Agent之间传递:
@Data
@Builder
public class SessionContext {
// 三层Session的引用
private ConversationSession conversation;
private TaskSession task;
private AgentSession agent;
// 便捷访问方法
/**
* 获取对话历史(最近N轮)
*/
public List<ConversationMessage> getRecentHistory(int maxTurns) {
List<ConversationMessage> messages = conversation.getMessages();
int startIndex = Math.max(0, messages.size() - maxTurns * 2);
return messages.subList(startIndex, messages.size());
}
/**
* 从共享上下文读取数据
*/
public <T> Optional<T> getSharedData(String key, Class<T> type) {
return task.getSharedContext().get(key)
.map(v -> type.cast(v));
}
/**
* 向共享上下文写入数据
*/
public void putSharedData(String key, Object value) {
task.getSharedContext().put(key, value, agent.getAgentId());
}
/**
* 获取用户的基本信息
*/
public UserProfile getUserProfile() {
return conversation.getUserProfile();
}
/**
* 获取当前任务目标
*/
public String getTaskGoal() {
return task.getUserIntent();
}
/**
* 构建给LLM的系统提示(包含必要上下文)
*/
public String buildSystemPrompt(String agentRole) {
StringBuilder sb = new StringBuilder();
sb.append("你是").append(agentRole).append("。\n\n");
// 添加任务上下文
sb.append("当前任务目标:").append(task.getUserIntent()).append("\n\n");
// 添加已有的共享信息
SharedTaskContext shared = task.getSharedContext();
if (!shared.getSharedData().isEmpty()) {
sb.append("其他Agent已经提供的信息:\n");
shared.getSharedData().entrySet().stream()
.filter(e -> !e.getKey().contains(".__")) // 过滤元信息
.forEach(e -> sb.append("- ").append(e.getKey())
.append(": ").append(e.getValue()).append("\n"));
sb.append("\n");
}
// 添加用户偏好
if (conversation.getUserProfile() != null) {
UserProfile profile = conversation.getUserProfile();
if (profile.getPreferredResponseStyle() != null) {
sb.append("用户偏好的回复风格:").append(profile.getPreferredResponseStyle()).append("\n");
}
}
return sb.toString();
}
}上下文截断:超长历史的处理
多轮对话进行几十轮后,历史消息会很长,超出模型上下文窗口。需要智能截断:
@Component
public class ConversationHistoryCompressor {
@Autowired
private SummarizationService summarizationService;
private static final int MAX_MESSAGES = 20; // 保留最近20条消息
private static final int SUMMARY_THRESHOLD = 30; // 超过30条触发摘要压缩
public List<Message> compressHistory(List<ConversationMessage> history,
int maxTokens) {
if (history.size() <= MAX_MESSAGES) {
return convertToMessages(history);
}
// 策略:保留最近N条原文 + 早期历史的摘要
int recentCount = MAX_MESSAGES;
List<ConversationMessage> recentMessages = history.subList(
history.size() - recentCount, history.size()
);
List<ConversationMessage> oldMessages = history.subList(
0, history.size() - recentCount
);
// 对早期历史生成摘要
String summary = generateSummary(oldMessages);
// 组合:摘要 + 最近消息
List<Message> compressed = new ArrayList<>();
compressed.add(Message.system("以下是之前对话的摘要:\n" + summary));
compressed.addAll(convertToMessages(recentMessages));
return compressed;
}
private String generateSummary(List<ConversationMessage> messages) {
// 如果已经有之前的摘要,增量更新
// 否则全量生成
String conversationText = messages.stream()
.map(m -> m.getRole() + ": " + m.getContent())
.collect(Collectors.joining("\n"));
return summarizationService.summarize(
conversationText,
"请用200字以内概括以上对话的主要内容、达成的共识和遗留的问题"
);
}
private List<Message> convertToMessages(List<ConversationMessage> history) {
return history.stream()
.map(m -> new Message(m.getRole(), m.getContent()))
.collect(Collectors.toList());
}
}跨Agent的上下文注入
编排Agent在调用子Agent时,需要把相关上下文注入到子Agent的请求里:
@Component
public class AgentOrchestrator {
@Autowired
private SessionStorageManager sessionStorage;
@Autowired
private Map<String, AgentExecutor> agentExecutors;
@Autowired
private ConversationHistoryCompressor historyCompressor;
public OrchestrationResult orchestrate(String conversationId,
String userMessage) throws Exception {
// 1. 加载或创建对话Session
ConversationSession conversation = sessionStorage
.getConversationSession(conversationId)
.orElseGet(() -> createNewConversation(conversationId));
// 2. 添加用户消息到历史
conversation.getMessages().add(
new ConversationMessage("user", userMessage, Instant.now())
);
// 3. 判断任务类型,创建任务Session
TaskPlan plan = planTask(conversation, userMessage);
TaskSession taskSession = createTaskSession(conversation, plan);
// 4. 按计划执行各Agent
for (AgentTask agentTask : plan.getAgentTasks()) {
SessionContext context = buildSessionContext(conversation, taskSession, agentTask);
AgentExecutor executor = agentExecutors.get(agentTask.getAgentType());
if (executor == null) {
throw new AgentNotFoundException("未找到Agent类型: " + agentTask.getAgentType());
}
// 执行Agent,传入完整上下文
AgentResult result = executor.execute(agentTask, context);
// 将结果写入共享上下文,供后续Agent使用
if (result.isSuccess()) {
context.putSharedData(agentTask.getOutputKey(), result.getOutput());
log.info("Agent {}执行完成,输出已写入共享上下文: {}",
agentTask.getAgentType(), agentTask.getOutputKey());
} else {
log.warn("Agent {}执行失败: {}", agentTask.getAgentType(), result.getError());
// 根据任务配置决定是否继续
if (agentTask.isRequired()) {
throw new AgentExecutionException("必需Agent执行失败: " + agentTask.getAgentType());
}
}
// 持久化更新后的任务Session
sessionStorage.saveTaskSession(taskSession);
}
// 5. 生成最终回复
String finalResponse = generateFinalResponse(taskSession, conversation);
// 6. 更新对话历史
conversation.getMessages().add(
new ConversationMessage("assistant", finalResponse, Instant.now())
);
sessionStorage.saveConversationSession(conversation);
return new OrchestrationResult(finalResponse, taskSession.getTaskId());
}
private SessionContext buildSessionContext(ConversationSession conversation,
TaskSession task,
AgentTask agentTask) {
// 为每个Agent创建独立的Agent Session
AgentSession agentSession = AgentSession.builder()
.agentSessionId(UUID.randomUUID().toString())
.taskId(task.getTaskId())
.agentId(agentTask.getAgentId())
.agentType(agentTask.getAgentType())
.localMessages(new ArrayList<>())
.localMemory(new HashMap<>())
.toolCallHistory(new ArrayList<>())
.status(AgentStatus.RUNNING)
.startedAt(Instant.now())
.build();
return SessionContext.builder()
.conversation(conversation)
.task(task)
.agent(agentSession)
.build();
}
}上下文安全:防止Agent越权访问
一个安全问题容易被忽视:Agent A不应该能随意读取Agent B的私有上下文,也不应该能覆盖其他Agent写入的共享数据(除非有明确的权限)。
@Component
public class SessionAccessController {
public void validateAccess(String agentId,
String dataKey,
AccessType accessType,
TaskSession task) {
SharedTaskContext context = task.getSharedContext();
if (accessType == AccessType.WRITE) {
// 检查这个Key是否已经被其他Agent锁定
Object writer = context.getSharedData().get(dataKey + ".__writer__");
if (writer != null && !writer.equals(agentId)) {
// 可以覆盖,但要记录日志
log.warn("Agent {}覆盖了Agent {}写入的数据: {}",
agentId, writer, dataKey);
}
// 某些关键Key只允许特定Agent写入
if (isProtectedKey(dataKey)) {
String allowedWriter = getKeyOwner(dataKey, task.getTaskType());
if (!agentId.equals(allowedWriter)) {
throw new UnauthorizedAccessException(
String.format("Agent %s 无权写入保护Key: %s", agentId, dataKey)
);
}
}
}
}
private boolean isProtectedKey(String key) {
// 某些Key是受保护的,比如"final_decision"只能由决策Agent写入
return key.startsWith("protected.") || key.equals("final_decision");
}
private String getKeyOwner(String key, String taskType) {
// 根据任务类型和Key返回允许写入的AgentID
// 实际可以从配置中读取
return "decision-agent";
}
}会话恢复:处理中断的任务
网络断开、Agent崩溃,任务中途失败了怎么办?需要支持任务恢复:
@Component
public class TaskRecoveryService {
@Autowired
private SessionStorageManager sessionStorage;
@Autowired
private AgentOrchestrator orchestrator;
/**
* 检查并恢复中断的任务
*/
public void recoverInterruptedTasks(String conversationId) {
ConversationSession conversation = sessionStorage
.getConversationSession(conversationId)
.orElseThrow(() -> new SessionNotFoundException(conversationId));
for (String taskId : conversation.getActiveTaskIds()) {
TaskSession task = sessionStorage.getTaskSession(taskId).orElse(null);
if (task == null) continue;
if (task.getStatus() == TaskStatus.RUNNING) {
long runningTime = Duration.between(task.getCreatedAt(), Instant.now()).toMinutes();
if (runningTime > 10) {
// 运行超过10分钟且未完成,认为是中断的任务
log.warn("发现中断任务: {}, 已运行{}分钟", taskId, runningTime);
recoverTask(task, conversation);
}
}
}
}
private void recoverTask(TaskSession task, ConversationSession conversation) {
// 找到未完成的AgentTask,从断点续跑
List<AgentTask> pendingTasks = task.getExecutionPlan().stream()
.filter(t -> !task.getCompletedAgentIds().contains(t.getAgentId()))
.collect(Collectors.toList());
if (pendingTasks.isEmpty()) {
// 所有Agent都完成了,但任务状态没更新,直接标记完成
task.setStatus(TaskStatus.COMPLETED);
sessionStorage.saveTaskSession(task);
return;
}
log.info("恢复任务{},还有{}个Agent未完成", task.getTaskId(), pendingTasks.size());
// 从上次中断的位置继续执行
orchestrator.resumeTask(task, pendingTasks, conversation);
}
}监控与调试
多Agent系统调试很痛苦,日志分散在各个Agent里。用结构化日志把Session信息注入每条日志:
@Aspect
@Component
public class SessionContextLoggingAspect {
@Around("@annotation(AgentOperation)")
public Object logWithSessionContext(ProceedingJoinPoint pjp) throws Throwable {
SessionContext context = extractContext(pjp.getArgs());
if (context != null) {
// 将Session信息放入MDC,所有日志都会带上这些字段
MDC.put("conversationId", context.getConversation().getConversationId());
MDC.put("taskId", context.getTask().getTaskId());
MDC.put("agentId", context.getAgent().getAgentId());
MDC.put("agentType", context.getAgent().getAgentType());
}
try {
return pjp.proceed();
} finally {
MDC.remove("conversationId");
MDC.remove("taskId");
MDC.remove("agentId");
MDC.remove("agentType");
}
}
}这样在日志里就能按conversationId把整个对话的所有Agent日志串在一起看,排查问题时很省力。
踩坑总结
做多Agent Session管理,我踩过这几个坑:
坑一:SharedContext没有冲突保护,两个并行Agent写了相同Key。解决方案:并行Agent要分配独立的Key,或者用读写锁协调写入顺序。
坑二:对话历史无限增长,最后大模型上下文溢出。解决方案:超过阈值后做摘要压缩,同时对每轮消息设置最大token数限制。
坑三:Agent Session忘记清理,Redis里积累了大量过期数据。解决方案:给每种Session设合理的TTL,任务完成后主动触发清理,不要完全依赖TTL自动过期。
坑四:任务中断后不知道从哪里恢复。解决方案:每个AgentTask完成后立即持久化状态,保证断点可查。
多Agent系统的Session管理,表面上是个"存哪里、怎么取"的技术问题,实际上是一个"上下文的生命周期管理"的设计问题。
理清楚三层Session的边界,做好持久化和恢复机制,多Agent系统才能真正稳定地跑起来。
