Part 1759: AI Context Propagation Between Microservices (Passing Conversation State Across Service Boundaries)
When building multi-turn conversation features, one question kept me thinking for a long time: where should the conversation history live?
The simplest approach is to keep it on the client and send the full history with every request. That works for a demo, but not in production: as the conversation grows, request bodies balloon, bandwidth and serialization costs add up, and the frontend becomes hard to manage.
Keeping it server-side raises a different problem: an AI system is rarely a monolith; it is a collaboration of services. A single "conversation" may span a dialogue-management service, a knowledge-base retrieval service, an AI inference service, a user-profile service... How do these services share one conversation context?
So this is not just a question of "where to store it" but of "how to pass it across microservice boundaries." Let's work through it systematically.
1. What AI Conversation Context Contains
Before discussing how to pass it around, let's pin down what AI conversation context actually contains.
Core context (must always travel):
- Conversation ID (conversationId): identifies one conversation
- Conversation history (messages): all prior messages
- System prompt version: keeps the whole conversation on one version of the prompt
Runtime context (shared across services):
- The model and parameter configuration currently in use
- Tools already invoked, and their results
- Knowledge-base documents already retrieved (to avoid redundant retrieval)
- User profile information (for personalization)
Metadata (for tracing):
- traceId: distributed-tracing ID
- sessionId: HTTP session ID
- requestId: current request ID
That is far too much to cram into HTTP headers, and it should not travel in full on every hop. We need a layered propagation strategy.
2. A Layered Propagation Strategy
The core idea:
- Client to gateway: send only the conversationId; the server side already holds everything else
- Service to service: pass a lightweight "context token" (contextToken) in a header; it is essentially a key pointing at the full context in Redis
- Redis as the context store: the full conversation history, tool-call results, and so on live in Redis, and services load them by conversationId
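The flow is easiest to see in miniature. Below is a plain-Java sketch of the idea, with a HashMap standing in for Redis; the class and method names are illustrative, not part of the real system. Only the conversationId ever crosses the service boundary, yet both services see the same history:

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the layered strategy: the header carries only a tiny
// token (here, just the conversationId); the full context lives in a store
// keyed by that ID. A HashMap stands in for Redis; all names are illustrative.
public class LayeredContextDemo {
    // "Redis": conversationId -> serialized context (here, a plain string)
    static final Map<String, String> store = new HashMap<>();

    // Service A appends a turn to the context and writes it back to the store.
    static void handleInServiceA(String conversationId, String userMessage) {
        String ctx = store.getOrDefault(conversationId, "");
        store.put(conversationId, ctx + "user:" + userMessage + ";");
    }

    // Service B receives only the ID (via a header) and loads the same context.
    static String handleInServiceB(String conversationId) {
        return store.get(conversationId);
    }

    public static void main(String[] args) {
        handleInServiceA("conv_1", "hello");
        handleInServiceA("conv_1", "what can you do?");
        // Both turns are visible to Service B without ever riding in a header.
        System.out.println(handleInServiceB("conv_1"));
    }
}
```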
3. Designing the Context Storage Model
@Data
@Builder
@JsonSerialize
public class ConversationContext {
private String conversationId;
private String userId;
private String tenantId;
// Conversation history (the core payload)
private List<ConversationMessage> messages;
// Prompt configuration in use
private String promptId;
private String promptVersion;
// Model config snapshot (keeps the whole conversation on one parameter set)
private ModelConfigSnapshot modelConfig;
// Tool-call state
private List<ToolCallRecord> toolCallHistory;
// Knowledge-base documents already loaded (cached to avoid re-retrieval)
private List<String> loadedDocumentIds;
// User preferences (fetched from the profile service, then cached)
private Map<String, String> userPreferences;
// Context metadata
private LocalDateTime createdAt;
private LocalDateTime lastUpdatedAt;
private int turnCount;
private long totalTokensUsed;
// Optimistic-lock version
private long version;
@Data
@Builder
public static class ConversationMessage {
private String role; // system/user/assistant/tool
private String content;
private String toolCallId; // set when role is "tool"
private List<ToolCall> toolCalls; // set when the assistant issues tool calls
private LocalDateTime timestamp;
private int tokenCount;
}
@Data
@Builder
public static class ModelConfigSnapshot {
private String model;
private double temperature;
private int maxTokens;
private double topP;
private String capturedAt;
}
@Data
@Builder
public static class ToolCallRecord {
private String toolCallId;
private String toolName;
private String arguments;
private String result;
private long durationMs;
private LocalDateTime calledAt;
}
}
The context store service:
@Service
@Slf4j
public class ConversationContextStore {
private final ReactiveRedisTemplate<String, String> redis;
private final ObjectMapper objectMapper;
private static final String KEY_PREFIX = "conv:ctx:";
private static final Duration TTL = Duration.ofHours(24);
/**
 * Create a new conversation context
 */
public Mono<ConversationContext> create(String userId, String tenantId,
String promptId, ModelConfigSnapshot modelConfig) {
String conversationId = generateConversationId();
ConversationContext ctx = ConversationContext.builder()
.conversationId(conversationId)
.userId(userId)
.tenantId(tenantId)
.promptId(promptId)
.modelConfig(modelConfig)
.messages(new ArrayList<>())
.toolCallHistory(new ArrayList<>())
.loadedDocumentIds(new ArrayList<>())
.userPreferences(new HashMap<>())
.createdAt(LocalDateTime.now())
.lastUpdatedAt(LocalDateTime.now())
.turnCount(0)
.totalTokensUsed(0)
.version(0)
.build();
return saveContext(ctx).thenReturn(ctx);
}
/**
 * Load a context
 */
public Mono<ConversationContext> load(String conversationId) {
return redis.opsForValue()
.get(KEY_PREFIX + conversationId)
.flatMap(json -> {
try {
return Mono.just(objectMapper.readValue(json, ConversationContext.class));
} catch (JsonProcessingException e) {
return Mono.error(new ContextDeserializationException(conversationId, e));
}
})
.switchIfEmpty(Mono.error(new ConversationNotFoundException(conversationId)));
}
/**
 * Append a message to the context (optimistic locking for concurrency safety)
 */
public Mono<ConversationContext> appendMessage(String conversationId,
ConversationContext.ConversationMessage message,
int tokensUsed) {
return load(conversationId)
.flatMap(ctx -> {
ctx.getMessages().add(message);
ctx.setTurnCount(ctx.getTurnCount() + 1);
ctx.setTotalTokensUsed(ctx.getTotalTokensUsed() + tokensUsed);
ctx.setLastUpdatedAt(LocalDateTime.now());
// Auto-compress once the context grows too large
if (shouldCompress(ctx)) {
ctx = compressContext(ctx);
}
long expectedVersion = ctx.getVersion();
ctx.setVersion(expectedVersion + 1);
ConversationContext finalCtx = ctx;
// Optimistic lock: a Lua script checks the version before saving
return saveWithOptimisticLock(finalCtx, expectedVersion, message, tokensUsed);
});
}
private Mono<ConversationContext> saveWithOptimisticLock(ConversationContext ctx,
long expectedVersion,
ConversationContext.ConversationMessage message,
int tokensUsed) {
String key = KEY_PREFIX + ctx.getConversationId();
String luaScript =
"local current = redis.call('get', KEYS[1])\n" +
"if current then\n" +
"  local parsed = cjson.decode(current)\n" +
"  if parsed.version ~= tonumber(ARGV[2]) then\n" +
"    return 0\n" +
"  end\n" +
"end\n" +
"redis.call('set', KEYS[1], ARGV[1], 'EX', ARGV[3])\n" +
"return 1";
return Mono.fromCallable(() -> objectMapper.writeValueAsString(ctx))
.flatMap(json -> redis.execute(
RedisScript.of(luaScript, Long.class),
Collections.singletonList(key),
json,
String.valueOf(expectedVersion),
String.valueOf(TTL.getSeconds())
).next())
.flatMap(result -> {
if (result == 0L) {
// Version conflict: reload fresh state and retry with the original
// message and its token count (retrying with 0 tokens would
// silently lose token accounting)
return appendMessage(ctx.getConversationId(), message, tokensUsed);
}
return Mono.just(ctx);
});
}
/**
 * Context compression: keep system messages + the most recent N turns
 * + important tool-call results
 */
private ConversationContext compressContext(ConversationContext ctx) {
List<ConversationContext.ConversationMessage> messages = ctx.getMessages();
// Keep system messages
List<ConversationContext.ConversationMessage> systemMessages = messages.stream()
.filter(m -> "system".equals(m.getRole()))
.collect(Collectors.toList());
// Keep the most recent 10 turns (20 messages)
int keepCount = 20;
List<ConversationContext.ConversationMessage> recentMessages =
messages.size() > keepCount ?
messages.subList(messages.size() - keepCount, messages.size()) :
new ArrayList<>(messages);
// Merge: system messages + a compression-summary placeholder + recent messages
List<ConversationContext.ConversationMessage> compressed = new ArrayList<>();
compressed.addAll(systemMessages);
if (messages.size() > keepCount + systemMessages.size()) {
// Insert a history-summary placeholder (a real project can have an LLM generate the summary)
compressed.add(ConversationContext.ConversationMessage.builder()
.role("system")
.content("[Conversation summary: earlier turns have been compressed. " +
"The user asked about ... product features and the assistant explained them]")
.timestamp(LocalDateTime.now())
.tokenCount(50)
.build());
}
compressed.addAll(recentMessages);
ctx.setMessages(compressed);
log.info("Context compressed: conversationId={}, original={} messages, compressed={} messages",
ctx.getConversationId(), messages.size(), compressed.size());
return ctx;
}
private boolean shouldCompress(ConversationContext ctx) {
// More than 50 messages, or more than 100,000 total tokens
return ctx.getMessages().size() > 50 || ctx.getTotalTokensUsed() > 100_000;
}
// Public so that other services (e.g. the retrieval service) can write back
// updates; note this is a plain SET with no version check, so callers that
// mutate shared state should prefer the optimistic-lock path above
public Mono<Void> saveContext(ConversationContext ctx) {
return Mono.fromCallable(() -> objectMapper.writeValueAsString(ctx))
.flatMap(json -> redis.opsForValue().set(
KEY_PREFIX + ctx.getConversationId(), json, TTL))
.then();
}
private String generateConversationId() {
return "conv_" + UUID.randomUUID().toString().replace("-", "");
}
}
4. Propagating Context in HTTP Headers
Services must not stuff the full context into headers when calling each other (headers are size-limited, typically 4-8KB). Instead we pass a lightweight "context token" containing the conversationId and some essential metadata:
@Data
@Builder
@NoArgsConstructor  // Jackson needs a no-arg constructor alongside @Builder
@AllArgsConstructor
public class AIContextToken {
// Shared, thread-safe mapper for token (de)serialization
private static final ObjectMapper objectMapper = new ObjectMapper();
private String conversationId;
private String userId;
private String tenantId;
private String traceId;
private String requestId;
private long contextVersion; // for version validation
public String toHeaderValue() {
// Plain Base64 encoding; the token carries no secrets, so no encryption
try {
String json = objectMapper.writeValueAsString(this);
return Base64.getUrlEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
public static AIContextToken fromHeaderValue(String headerValue) {
try {
byte[] decoded = Base64.getUrlDecoder().decode(headerValue);
String json = new String(decoded, StandardCharsets.UTF_8);
return objectMapper.readValue(json, AIContextToken.class);
} catch (Exception e) {
throw new InvalidContextTokenException(headerValue, e);
}
}
}
The gateway injects the context token:
@Component
public class AIContextInjectionFilter implements GlobalFilter, Ordered {
private static final String AI_CONTEXT_HEADER = "X-AI-Context-Token";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
String conversationId = exchange.getRequest().getHeaders()
.getFirst("X-Conversation-Id");
if (conversationId == null) {
return chain.filter(exchange);
}
// Build the context token
String userId = extractUserId(exchange);
String tenantId = extractTenantId(exchange);
String traceId = getCurrentTraceId();
String requestId = UUID.randomUUID().toString();
AIContextToken token = AIContextToken.builder()
.conversationId(conversationId)
.userId(userId)
.tenantId(tenantId)
.traceId(traceId)
.requestId(requestId)
.build();
// Inject into headers
ServerHttpRequest mutatedRequest = exchange.getRequest().mutate()
.header(AI_CONTEXT_HEADER, token.toHeaderValue())
.header("X-Request-Id", requestId)
.build();
return chain.filter(exchange.mutate().request(mutatedRequest).build());
}
@Override
public int getOrder() {
return -5;
}
}
Parsing the context token inside a service:
@Component
@Slf4j
public class AIContextExtractor {
private static final String AI_CONTEXT_HEADER = "X-AI-Context-Token";
/**
 * Extract the AI context token from an HTTP (Servlet) request
 */
public Optional<AIContextToken> extract(HttpServletRequest request) {
String headerValue = request.getHeader(AI_CONTEXT_HEADER);
if (headerValue == null) return Optional.empty();
try {
return Optional.of(AIContextToken.fromHeaderValue(headerValue));
} catch (Exception e) {
log.warn("Failed to parse AI context token: {}", e.getMessage());
return Optional.empty();
}
}
/**
 * Extract from a reactive request
 */
public Optional<AIContextToken> extract(ServerWebExchange exchange) {
String headerValue = exchange.getRequest().getHeaders()
.getFirst(AI_CONTEXT_HEADER);
if (headerValue == null) return Optional.empty();
try {
return Optional.of(AIContextToken.fromHeaderValue(headerValue));
} catch (Exception e) {
log.warn("Failed to parse AI context token: {}", e.getMessage());
return Optional.empty();
}
}
}
5. ThreadLocal vs. Reactor Context
Within a service there are two ways to carry request-scoped context: the traditional ThreadLocal (Servlet stack) and the Reactor Context (reactive WebFlux stack).
AI services are usually best built on WebFlux, so let's focus on the Reactor Context:
// Context keys. Reactor's Context accepts arbitrary Object keys (there is no
// dedicated Key type); plain String constants are the simplest choice.
public class AIContextKeys {
public static final String CONTEXT_TOKEN = "ai.context.token";
public static final String CONV_CONTEXT = "ai.conversation.context";
}
// Write into the Context at the request-handling entry point
@Component
public class AIContextWebFilter implements WebFilter {
private final AIContextExtractor extractor;
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
Optional<AIContextToken> tokenOpt = extractor.extract(exchange);
if (tokenOpt.isEmpty()) {
return chain.filter(exchange);
}
AIContextToken token = tokenOpt.get();
return chain.filter(exchange)
.contextWrite(ctx -> ctx.put(AIContextKeys.CONTEXT_TOKEN, token));
}
}
// Read the Context inside a service
@Service
public class RAGRetrievalService {
private final ConversationContextStore contextStore;
private final VectorStore vectorStore;
public Mono<List<Document>> retrieve(String query) {
// Read the AI context token from the Reactor Context
return Mono.deferContextual(contextView -> {
Optional<AIContextToken> tokenOpt = contextView.getOrEmpty(
AIContextKeys.CONTEXT_TOKEN);
if (tokenOpt.isEmpty()) {
// No conversation context; just retrieve
return doRetrieval(query, null);
}
AIContextToken token = tokenOpt.get();
// Load the conversation context
return contextStore.load(token.getConversationId())
.flatMap(convCtx -> {
// Skip documents that were already loaded (avoid redundant retrieval)
List<String> alreadyLoaded = convCtx.getLoadedDocumentIds();
return doRetrieval(query, alreadyLoaded)
.flatMap(docs -> {
// Record the documents loaded this round
List<String> newDocIds = docs.stream()
.map(Document::getId)
.collect(Collectors.toList());
convCtx.getLoadedDocumentIds().addAll(newDocIds);
return contextStore.saveContext(convCtx)
.thenReturn(docs);
});
})
.onErrorResume(ConversationNotFoundException.class, e -> {
// Conversation context not found; just retrieve
return doRetrieval(query, null);
});
});
}
private Mono<List<Document>> doRetrieval(String query, List<String> excludeIds) {
// The actual vector-search logic
return vectorStore.search(query, excludeIds);
}
}
6. Propagating Context Across Service Calls
When service A calls service B, the AI context token has to be copied from the Reactor Context into an HTTP header:
@Component
public class AIContextPropagatingWebClient {
private final WebClient.Builder webClientBuilder;
public WebClient buildClient(String serviceUrl) {
return webClientBuilder.baseUrl(serviceUrl)
.filter(this::propagateAIContext)
.build();
}
private Mono<ClientResponse> propagateAIContext(
ClientRequest request, ExchangeFunction next) {
return Mono.deferContextual(contextView -> {
// Read the AI context token from the Reactor Context
Optional<AIContextToken> tokenOpt = contextView.getOrEmpty(
AIContextKeys.CONTEXT_TOKEN);
ClientRequest.Builder mutatedRequest = ClientRequest.from(request);
tokenOpt.ifPresent(token -> {
mutatedRequest.header("X-AI-Context-Token", token.toHeaderValue());
});
// Propagate the traceId as well
contextView.getOrEmpty(TraceContext.class).ifPresent(trace -> {
mutatedRequest.header("X-Trace-Id", trace.getTraceId());
mutatedRequest.header("X-Span-Id", trace.getSpanId());
});
return next.exchange(mutatedRequest.build());
});
}
}
7. Handling Concurrent Access to Conversation Context
The same user may fire several requests at once (after refreshing the page, say), and concurrent writes to the same conversationId will race. Beyond the optimistic locking shown earlier, a business-level guard helps:
@Service
@Slf4j
public class ConversationConcurrencyGuard {
private final ReactiveRedisTemplate<String, String> redis;
private static final Duration LOCK_TIMEOUT = Duration.ofSeconds(30);
private static final String LOCK_PREFIX = "conv:lock:";
/**
 * Acquire a per-conversation lock so that only one request at a time
 * processes a given conversation
 */
public <T> Mono<T> withConversationLock(String conversationId, Mono<T> work) {
String lockKey = LOCK_PREFIX + conversationId;
String lockValue = UUID.randomUUID().toString();
return redis.opsForValue()
.setIfAbsent(lockKey, lockValue, LOCK_TIMEOUT)
.flatMap(acquired -> {
if (!acquired) {
return Mono.error(new ConversationBusyException(
"Another request is being processed for conversation: " +
conversationId));
}
return work
.doFinally(signal -> releaseLock(lockKey, lockValue).subscribe());
});
}
private Mono<Boolean> releaseLock(String lockKey, String lockValue) {
String luaScript =
"if redis.call('get', KEYS[1]) == ARGV[1] then\n" +
" return redis.call('del', KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
return redis.execute(
RedisScript.of(luaScript, Long.class),
Collections.singletonList(lockKey),
lockValue
).next().map(result -> result == 1L);
}
}
8. Context Serialization and Performance
As conversation history grows, serialization and deserialization become a bottleneck. A few optimizations:
Compressed storage:
@Service
@Slf4j
public class CompressedContextStore {
// Note: a byte[]-valued template; a String-valued one cannot hold GZIP output
private final RedisTemplate<String, byte[]> redis;
private final ObjectMapper objectMapper;
private static final Duration TTL = Duration.ofHours(24);
public void save(String key, ConversationContext ctx) {
try {
byte[] json = objectMapper.writeValueAsBytes(ctx);
// GZIP compression (for text content the ratio is usually above 50%)
ByteArrayOutputStream baos = new ByteArrayOutputStream();
try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
gzip.write(json);
}
byte[] compressed = baos.toByteArray();
redis.opsForValue().set(key, compressed, TTL);
log.debug("Context saved: original={}B, compressed={}B, ratio={}%",
json.length, compressed.length,
100 - (compressed.length * 100 / json.length));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Segmented loading: for very long conversations there is no need to load the full history on every turn; load it on demand:
// Load only the most recent n messages
public Mono<List<ConversationContext.ConversationMessage>> loadRecentMessages(
String conversationId, int n) {
return load(conversationId)
.map(ctx -> {
List<ConversationContext.ConversationMessage> messages = ctx.getMessages();
int start = Math.max(0, messages.size() - n);
// Copy: a subList view would pin the full list in memory
return new ArrayList<>(messages.subList(start, messages.size()));
});
}
9. Propagating User Profiles Across Conversations
Beyond the context of a single conversation, a user's long-term preferences (the user profile) also need to be shared across AI services:
@Service
public class UserProfileContextEnricher {
private final UserProfileService profileService;
private final ReactiveRedisTemplate<String, String> redis;
private final ObjectMapper objectMapper;
private static final Duration PROFILE_CACHE_TTL = Duration.ofMinutes(30);
/**
 * Inject user-profile information into the conversation context
 */
public Mono<ConversationContext> enrichWithUserProfile(ConversationContext ctx) {
String cacheKey = "user:profile:ai:" + ctx.getUserId();
return redis.opsForValue().get(cacheKey)
.flatMap(cachedProfile -> {
try {
Map<String, String> profile = objectMapper.readValue(
cachedProfile, new TypeReference<>() {});
ctx.setUserPreferences(profile);
return Mono.just(ctx);
} catch (Exception e) {
return fetchAndCacheProfile(ctx, cacheKey);
}
})
.switchIfEmpty(fetchAndCacheProfile(ctx, cacheKey));
}
private Mono<ConversationContext> fetchAndCacheProfile(
ConversationContext ctx, String cacheKey) {
return profileService.getUserAIProfile(ctx.getUserId())
.flatMap(profile -> {
ctx.setUserPreferences(profile.toMap());
// Cache the profile
return Mono.fromCallable(() -> objectMapper.writeValueAsString(profile.toMap()))
.flatMap(json -> redis.opsForValue().set(cacheKey, json, PROFILE_CACHE_TTL))
.thenReturn(ctx);
});
}
}
10. Pitfalls
Pitfall 1: the conversation context expires mid-conversation
A user pauses for a long while, comes back to continue, and finds the context already gone (the TTL fired). Starting a fresh conversation loses everything that came before, which makes for a terrible experience.
Fix: refresh the TTL on every access (sliding expiration) rather than using a fixed expiry. Additionally, persist important conversations to the database before the TTL fires, so users can recover them from history.
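Sliding expiration is easy to sketch in plain Java (a pair of maps stands in for Redis; the class name and the explicit `now` parameter are illustrative, the latter chosen to make the behavior testable). Every successful read pushes the expiry forward:

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

// Sliding expiration: each successful read pushes the entry's expiry forward,
// so an actively used conversation never times out, while an abandoned one
// disappears after one idle TTL. A HashMap stands in for Redis.
public class SlidingTtlCache<V> {
    private final Map<String, V> values = new HashMap<>();
    private final Map<String, Instant> expiries = new HashMap<>();
    private final Duration ttl;

    public SlidingTtlCache(Duration ttl) { this.ttl = ttl; }

    public void put(String key, V value, Instant now) {
        values.put(key, value);
        expiries.put(key, now.plus(ttl));
    }

    public V get(String key, Instant now) {
        Instant expiry = expiries.get(key);
        if (expiry == null || now.isAfter(expiry)) {
            values.remove(key);
            expiries.remove(key);
            return null; // expired: the real system would fall back to the database
        }
        expiries.put(key, now.plus(ttl)); // the "sliding" part: refresh on access
        return values.get(key);
    }
}
```

With actual Redis, the equivalent is a `GETEX key EX <ttl>` (Redis 6.2+), or a `GET` followed by `EXPIRE`, on every load.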
Pitfall 2: falling back to ThreadLocal where the Reactor Context cannot reach
In some places (inside a @Scheduled method, inside a Java SDK callback) the Reactor Context is simply unavailable and you must fall back to ThreadLocal. Wrap both behind one utility that prefers whichever is present:
public class AIContextHolder {
private static final ThreadLocal<AIContextToken> threadLocalContext = new ThreadLocal<>();
public static void setThreadLocalContext(AIContextToken token) {
threadLocalContext.set(token);
}
// Inside a reactive chain, pass the ContextView (e.g. from deferContextual):
// the Reactor Context wins, ThreadLocal is the fallback
public static Optional<AIContextToken> getContext(ContextView contextView) {
Optional<AIContextToken> fromReactor = contextView.getOrEmpty(AIContextKeys.CONTEXT_TOKEN);
if (fromReactor.isPresent()) {
return fromReactor;
}
return Optional.ofNullable(threadLocalContext.get());
}
// Outside any reactive chain (scheduled jobs, SDK callbacks): ThreadLocal only
public static Optional<AIContextToken> getContext() {
return Optional.ofNullable(threadLocalContext.get());
}
public static void clearThreadLocalContext() {
threadLocalContext.remove();
}
}
Pitfall 3: context version mismatch across services
Service A updates the context for a conversationId (version 5 becomes 6) while service B operates on version 5, and the optimistic lock rejects B's write. When conflicts are frequent, retries add noticeable latency.
Fix: cut unnecessary context writes. If service B only reads the context and does not modify it, it must not trigger a write. Write back to Redis only when the context actually changed (for example, a message was appended).
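The version-checked write-plus-retry loop can be shown in miniature with plain Java. An AtomicReference stands in for the Redis key, and a compare-and-set on the snapshot object plays the role of the Lua version check (the real script compares version numbers; the demo compares the whole snapshot, which encodes the version). All names are illustrative:

```java
import java.util.concurrent.atomic.AtomicReference;

// Optimistic concurrency in miniature: a write succeeds only if the stored
// state is still the state that was read. An AtomicReference stands in for
// the Redis key; the record stands in for the serialized context.
public class OptimisticStoreDemo {
    record Versioned(long version, String payload) {}

    private final AtomicReference<Versioned> cell =
        new AtomicReference<>(new Versioned(0, ""));

    public Versioned read() { return cell.get(); }

    // Version-checked write, like the Lua script in the store above.
    public boolean tryWrite(Versioned expected, String newPayload) {
        return cell.compareAndSet(expected,
            new Versioned(expected.version() + 1, newPayload));
    }

    // Append with retry: on conflict, re-read fresh state and try again.
    public void appendWithRetry(String msg) {
        while (true) {
            Versioned current = read();
            if (tryWrite(current, current.payload() + msg + ";")) return;
        }
    }
}
```

The read-only path never calls tryWrite at all, which is exactly the discipline that keeps conflict rates down.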
Pitfall 4: the Base64-encoded header exceeds size limits
If AIContextToken carries too many fields, its Base64 encoding can blow past the header size limit. Keep the token minimal: only the essential identifiers; everything else is loaded from Redis on demand.
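Base64 inflates payloads by roughly a third (4 output characters per 3 input bytes), which is easy to check in plain Java. The JSON string below is an illustrative token shape, not the exact wire format:

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Base64 expands n bytes to about ceil(n/3)*4 characters, i.e. roughly +33%,
// so a token must stay well under the header budget *before* encoding.
public class TokenSizeDemo {
    public static void main(String[] args) {
        // Illustrative token JSON; field names mirror AIContextToken.
        String json = "{\"conversationId\":\"conv_8f14e45fceea167a5a36dedd4bea2543\","
                + "\"userId\":\"u_12345\",\"tenantId\":\"t_001\","
                + "\"traceId\":\"4bf92f3577b34da6\",\"requestId\":\"req_0001\","
                + "\"contextVersion\":6}";
        byte[] raw = json.getBytes(StandardCharsets.UTF_8);
        String encoded = Base64.getUrlEncoder().withoutPadding().encodeToString(raw);
        System.out.printf("raw=%dB encoded=%dB (x%.2f)%n",
                raw.length, encoded.length(), (double) encoded.length() / raw.length);
        // Round trip: the receiving service decodes the exact same bytes.
        byte[] decoded = Base64.getUrlDecoder().decode(encoded);
        System.out.println(json.equals(new String(decoded, StandardCharsets.UTF_8)));
    }
}
```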
11. Wrap-Up
AI context propagation between microservices is fundamentally a distributed state management problem; only the content of the state (conversation history, tool-call records) is unusual.
A few core design principles:
Light headers, heavy storage: HTTP headers carry only the conversationId and essential metadata; the full context lives in Redis and is loaded on demand.
Version control for concurrency safety: optimistic locking prevents concurrent writes from clobbering each other's data.
Tiered TTLs: in-memory cache (seconds) → Redis (hours) → database (permanent); tier the storage by access frequency to balance performance against durability.
Transparent propagation: WebFilter and WebClient interceptors propagate context automatically, so business code never sees it. This transparency matters: if every method of every service had to pass context by hand, the maintenance cost would be enormous.
