Part 1820: End-to-End Latency Optimization for Real-Time AI Applications: A Full-Pipeline Analysis from Request to Response
Last week a reader asked me: their AI Q&A endpoint averages over 3 seconds per response and the user experience is poor. How should they optimize it?
I asked a few questions back: How long does the LLM call itself take? Do you have distributed tracing in place? Where does the network transfer time go? He couldn't answer any of them.
That exchange exposed a common blind spot: many people optimize latency by guessing, not by measuring.
Before you touch anything, you must trace the full request path and find out where the time actually goes. Otherwise you may optimize a step that accounts for 5% of the time and see no improvement in overall P99 latency. That is the first principle of end-to-end latency optimization: measure before you optimize.
In this article I start from distributed tracing and walk through the methodology and concrete techniques of end-to-end latency optimization.
The components of latency
A typical real-time AI request breaks down roughly into the following stages:
A. Network transfer from client to server
B. Gateway processing and authentication
C. Cache lookup
D. Feature/context fetching
E. Prompt construction
F. LLM call (queueing plus token generation)
G. Post-processing and response transfer back to the client
In most scenarios the LLM call (step F) accounts for 70-90% of total latency. That doesn't make the other steps unimportant: once your LLM responses are already fast (for example, with streaming output), the latency of the remaining steps starts to stand out.
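A quick worked example (illustrative numbers, not from any benchmark): suppose total latency is 3.0s and the LLM call takes 2.4s (80%). Halving a 150ms feature-fetch step saves 75ms, about 2.5% of the total; shaving just 25% off the LLM call saves 600ms, or 20% of the total. The profile, not intuition, should decide where you spend effort.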
Implementing full-pipeline tracing
Get tracing in place first and let the data speak:
@Configuration
public class ObservabilityConfig {
@Bean
public Tracer tracer(OpenTelemetry openTelemetry) {
return openTelemetry.getTracer("ai-application", "1.0.0");
}
@Bean
public OpenTelemetry openTelemetry() {
// Configure the Jaeger exporter
JaegerGrpcSpanExporter jaegerExporter = JaegerGrpcSpanExporter.builder()
.setEndpoint("http://jaeger:14250")
.build();
return OpenTelemetrySdk.builder()
.setTracerProvider(
SdkTracerProvider.builder()
.addSpanProcessor(BatchSpanProcessor.builder(jaegerExporter).build())
.setSampler(Sampler.traceIdRatioBased(0.1)) // sample 10% of requests
.build()
)
.buildAndRegisterGlobal();
}
}
@Service
@RequiredArgsConstructor
@Slf4j
public class TracedAIService {
private final Tracer tracer;
private final LLMService llmService;
private final FeatureStore featureStore;
private final CacheService cacheService;
private final MeterRegistry meterRegistry;
public AIResponse processRequest(AIRequest request) {
// Create the root span
Span rootSpan = tracer.spanBuilder("ai.request.process")
.setSpanKind(SpanKind.SERVER)
.startSpan();
try (Scope scope = rootSpan.makeCurrent()) {
rootSpan.setAttribute("request.id", request.getRequestId());
rootSpan.setAttribute("user.id", request.getUserId());
rootSpan.setAttribute("request.type", request.getType());
long totalStart = System.currentTimeMillis();
// Step 1: cache lookup
long cacheStart = System.currentTimeMillis();
Optional<AIResponse> cachedResponse = checkCache(request);
recordStepLatency("cache_check", System.currentTimeMillis() - cacheStart);
if (cachedResponse.isPresent()) {
rootSpan.setAttribute("cache.hit", true);
return cachedResponse.get();
}
rootSpan.setAttribute("cache.hit", false);
// Step 2: feature fetch
long featureStart = System.currentTimeMillis();
RequestContext context = fetchFeatures(request);
recordStepLatency("feature_fetch", System.currentTimeMillis() - featureStart);
// Step 3: LLM call
long llmStart = System.currentTimeMillis();
String llmResult = callLLM(request, context);
long llmLatency = System.currentTimeMillis() - llmStart;
recordStepLatency("llm_call", llmLatency);
rootSpan.setAttribute("llm.latency_ms", llmLatency);
// Step 4: post-process the result
long postStart = System.currentTimeMillis();
AIResponse response = postProcess(request, llmResult);
recordStepLatency("post_process", System.currentTimeMillis() - postStart);
// Step 5: async cache write
long cacheWriteStart = System.currentTimeMillis();
asyncWriteCache(request, response);
recordStepLatency("cache_write_async", System.currentTimeMillis() - cacheWriteStart);
long totalLatency = System.currentTimeMillis() - totalStart;
rootSpan.setAttribute("total.latency_ms", totalLatency);
// Record SLA violations
if (totalLatency > 3000) {
log.warn("SLA violation: request {} took {}ms",
request.getRequestId(), totalLatency);
meterRegistry.counter("ai.sla.violation").increment();
}
return response;
} catch (Exception e) {
rootSpan.recordException(e);
rootSpan.setStatus(StatusCode.ERROR, e.getMessage());
throw e;
} finally {
rootSpan.end();
}
}
private void recordStepLatency(String step, long latencyMs) {
meterRegistry.timer("ai.step.latency", "step", step)
.record(latencyMs, TimeUnit.MILLISECONDS);
log.debug("Step [{}] took {}ms", step, latencyMs);
}
private Optional<AIResponse> checkCache(AIRequest request) {
Span span = tracer.spanBuilder("cache.check").startSpan();
try (Scope s = span.makeCurrent()) {
return cacheService.get(request.getCacheKey());
} finally {
span.end();
}
}
private RequestContext fetchFeatures(AIRequest request) {
Span span = tracer.spanBuilder("feature.fetch").startSpan();
try (Scope s = span.makeCurrent()) {
return featureStore.getContext(request.getUserId());
} finally {
span.end();
}
}
private String callLLM(AIRequest request, RequestContext context) {
Span span = tracer.spanBuilder("llm.call")
.setAttribute("llm.model", llmService.getModelName())
.startSpan();
try (Scope s = span.makeCurrent()) {
return llmService.generate(request.getPrompt(), context);
} finally {
span.end();
}
}
private AIResponse postProcess(AIRequest request, String llmResult) {
Span span = tracer.spanBuilder("post.process").startSpan();
try (Scope s = span.makeCurrent()) {
return AIResponse.builder()
.requestId(request.getRequestId())
.content(llmResult)
.build();
} finally {
span.end();
}
}
private void asyncWriteCache(AIRequest request, AIResponse response) {
// Write the cache asynchronously so it doesn't block the main flow
CompletableFuture.runAsync(() ->
cacheService.set(request.getCacheKey(), response, Duration.ofMinutes(10)));
}
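    // Hypothetical sketches (not shown in the original post): minimal Lombok
    // versions of the AIRequest/AIResponse DTOs matching the accessors used
    // above. In a real project these would be top-level classes.
    @lombok.Value
    static class AIRequest {
        String requestId;
        String userId;
        String type;
        String cacheKey;
        String prompt;
    }
    @lombok.Value
    @lombok.Builder
    static class AIResponse {
        String requestId;
        String content;
    }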
}
Optimization technique 1: semantic caching
The LLM call is the slowest step, so the highest-value optimization is reducing the number of LLM calls. Semantic caching is far more effective than traditional exact-string caching:
@Service
@Slf4j
@RequiredArgsConstructor
public class SemanticCacheService {
private final EmbeddingModel embeddingModel;
private final EmbeddingStore<TextSegment> vectorStore;
private final RedisTemplate<String, CacheEntry> redisTemplate;
private final MeterRegistry meterRegistry;
private static final double SIMILARITY_THRESHOLD = 0.95; // minimum similarity for a cache hit
private static final Duration CACHE_TTL = Duration.ofHours(1);
/**
 * Semantic cache lookup.
 * If a semantically similar request was answered before, return its cached result directly.
 */
public Optional<String> semanticGet(String prompt) {
try {
// Embed the incoming prompt
float[] queryEmbedding = embeddingModel.embed(prompt).content().vector();
// Search the cache's vector store for a similar prior request
List<EmbeddingMatch<TextSegment>> matches = vectorStore.search(
EmbeddingSearchRequest.builder()
.queryEmbedding(Embedding.from(queryEmbedding))
.maxResults(1)
.minScore(SIMILARITY_THRESHOLD)
.build()
).matches();
if (!matches.isEmpty()) {
String cacheKey = matches.get(0).embedded().metadata().getString("cache_key");
CacheEntry entry = redisTemplate.opsForValue().get(cacheKey);
if (entry != null && !entry.isExpired()) {
log.debug("Semantic cache hit, similarity: {:.3f}", matches.get(0).score());
meterRegistry.counter("semantic_cache.hit").increment();
return Optional.of(entry.getResponse());
}
}
meterRegistry.counter("semantic_cache.miss").increment();
return Optional.empty();
} catch (Exception e) {
log.error("Semantic cache lookup failed", e);
return Optional.empty(); // a cache failure must not break the main flow
}
}
/**
 * Store a new request-response pair in the semantic cache.
 */
public void semanticPut(String prompt, String response) {
try {
String cacheKey = "semantic:" + Hashing.sha256()
.hashString(prompt, StandardCharsets.UTF_8)
.toString().substring(0, 16);
// Embed the prompt and add it to the vector store
float[] embedding = embeddingModel.embed(prompt).content().vector();
TextSegment segment = TextSegment.from(prompt,
Metadata.from("cache_key", cacheKey));
vectorStore.add(Embedding.from(embedding), segment);
// Store the response in Redis
CacheEntry entry = new CacheEntry(response, System.currentTimeMillis());
redisTemplate.opsForValue().set(cacheKey, entry, CACHE_TTL);
} catch (Exception e) {
log.error("Semantic cache write failed", e);
}
}
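    // CacheEntry is referenced above but not shown in the original post; a
    // minimal version could look like this (assuming the entry enforces the
    // same TTL locally that Redis enforces remotely):
    @lombok.Value
    static class CacheEntry {
        String response;
        long createdAtMillis;
        boolean isExpired() {
            return System.currentTimeMillis() - createdAtMillis > CACHE_TTL.toMillis();
        }
    }

    // Illustrative wiring sketch (not from the original post): how a caller
    // might combine semanticGet/semanticPut around an arbitrary generator.
    public String getOrGenerate(String prompt, java.util.function.UnaryOperator<String> generate) {
        return semanticGet(prompt).orElseGet(() -> {
            String response = generate.apply(prompt);
            semanticPut(prompt, response); // populate the cache for future similar prompts
            return response;
        });
    }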
}
Optimization technique 2: request parallelization
Many AI requests involve several independent subtasks; executing them serially wastes time:
@Service
@Slf4j
public class ParallelExecutionService {
// Virtual threads: a good fit for I/O-bound AI calls
private final ExecutorService virtualThreadPool =
Executors.newVirtualThreadPerTaskExecutor();
/**
 * Run several independent AI tasks in parallel.
 */
public MultiAnalysisResult analyzeInParallel(String text) {
long startTime = System.currentTimeMillis();
// These three analyses are fully independent and can run in parallel
CompletableFuture<String> sentimentFuture = CompletableFuture.supplyAsync(
() -> analyzeSentiment(text), virtualThreadPool);
CompletableFuture<List<String>> keywordsFuture = CompletableFuture.supplyAsync(
() -> extractKeywords(text), virtualThreadPool);
CompletableFuture<String> categoryFuture = CompletableFuture.supplyAsync(
() -> classifyCategory(text), virtualThreadPool);
// Wait for all of them to finish (with a timeout)
try {
CompletableFuture.allOf(sentimentFuture, keywordsFuture, categoryFuture)
.get(10, TimeUnit.SECONDS);
long elapsed = System.currentTimeMillis() - startTime;
log.info("Parallel analysis completed in {}ms", elapsed);
return MultiAnalysisResult.builder()
.sentiment(sentimentFuture.get())
.keywords(keywordsFuture.get())
.category(categoryFuture.get())
.analysisTimeMs(elapsed)
.build();
} catch (TimeoutException e) {
log.warn("Parallel analysis timeout, returning partial results");
// On timeout, return whatever has already completed
return MultiAnalysisResult.builder()
.sentiment(getFutureOrDefault(sentimentFuture, "UNKNOWN"))
.keywords(getFutureOrDefault(keywordsFuture, Collections.emptyList()))
.category(getFutureOrDefault(categoryFuture, "UNKNOWN"))
.partial(true)
.build();
} catch (Exception e) {
throw new RuntimeException("Parallel analysis failed", e);
}
}
private <T> T getFutureOrDefault(CompletableFuture<T> future, T defaultValue) {
if (future.isDone() && !future.isCompletedExceptionally()) {
try { return future.get(); }
catch (Exception e) { return defaultValue; }
}
future.cancel(true);
return defaultValue;
}
private String analyzeSentiment(String text) {
// Call the LLM for sentiment analysis (stubbed here)
return "POSITIVE";
}
private List<String> extractKeywords(String text) {
return List.of("关键词1", "关键词2");
}
private String classifyCategory(String text) {
return "科技";
}
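    // MultiAnalysisResult is referenced above but not shown in the original
    // post; a minimal Lombok sketch matching the builder calls used above:
    @lombok.Value
    @lombok.Builder
    static class MultiAnalysisResult {
        String sentiment;
        List<String> keywords;
        String category;
        long analysisTimeMs;
        boolean partial;
    }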
}
Optimization technique 3: streaming responses to cut perceived latency
Even if the LLM's time-to-first-token stays the same, streaming dramatically improves the user's perceived experience:
@RestController
@Slf4j
@RequiredArgsConstructor
public class StreamingAIController {
private final StreamingLLMService streamingLLMService;
/**
 * SSE streaming response.
 * The user sees content as it is generated instead of waiting for all tokens.
 */
@GetMapping(value = "/api/ai/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamChat(@RequestParam String question) {
log.info("Streaming chat request: {}", question.substring(0, Math.min(50, question.length())));
return streamingLLMService.generateStreaming(question)
.map(token -> ServerSentEvent.builder(token)
.event("token")
.build())
.concatWith(
// Emit a completion event
Flux.just(ServerSentEvent.<String>builder()
.event("done")
.data("[DONE]")
.build())
)
.doOnError(error -> log.error("Streaming error", error))
.onErrorReturn(ServerSentEvent.<String>builder()
.event("error")
.data("服务暂时不可用,请稍后重试")
.build());
}
}
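For completeness, here is a minimal client-side sketch (not from the original post; host and question are assumed) that consumes the SSE stream with Spring WebFlux's WebClient and prints tokens as they arrive:
WebClient client = WebClient.create("http://localhost:8080"); // assumed host
client.get()
        .uri(uriBuilder -> uriBuilder.path("/api/ai/stream")
                .queryParam("question", "What is end-to-end latency?")
                .build())
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(String.class)     // each element is one SSE data payload (a token)
        .doOnNext(System.out::print)  // render tokens incrementally
        .blockLast();                 // demo only; don't block in production code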
@Service
@RequiredArgsConstructor
public class StreamingLLMService {
private final StreamingChatLanguageModel streamingModel;
public Flux<String> generateStreaming(String prompt) {
return Flux.create(sink -> {
streamingModel.generate(prompt, new StreamingResponseHandler<AiMessage>() {
@Override
public void onNext(String token) {
sink.next(token);
}
@Override
public void onComplete(Response<AiMessage> response) {
sink.complete();
}
@Override
public void onError(Throwable error) {
sink.error(error);
}
});
});
}
}
Optimization technique 4: prompt optimization to reduce token count
LLM latency correlates strongly with token count; trimming wasted tokens gives a measurable speedup:
@Component
@Slf4j
public class PromptOptimizer {
/**
 * Prompt compression: strip redundancy while preserving meaning.
 * Rule of thumb: sensible prompt compression removes 20-40% of the tokens.
 */
public String optimizePrompt(String originalPrompt) {
String optimized = originalPrompt
        // Drop blank lines and trim leading/trailing whitespace per line
        .lines()
        .map(String::trim)
        .filter(line -> !line.isEmpty())
        .collect(Collectors.joining("\n"))
        // Collapse runs of spaces and tabs
        .replaceAll("[ \t]+", " ");
int originalTokens = estimateTokens(originalPrompt);
int optimizedTokens = estimateTokens(optimized);
if (originalTokens != optimizedTokens) {
log.debug("Prompt optimized: {} -> {} estimated tokens",
originalTokens, optimizedTokens);
}
return optimized;
}
/**
 * Dynamic system prompt: pick the leanest system prompt for the task type
 * instead of sending one long generic system prompt with every request.
 */
public String buildSystemPrompt(String taskType) {
return switch (taskType) {
case "SUMMARIZE" -> "你是摘要专家。用一段话总结,不超过100字。";
case "TRANSLATE" -> "你是翻译专家。只输出译文,不要解释。";
case "CLASSIFY" -> "分类以下文本。只输出分类结果,格式:类别名称。";
case "QA" -> "根据提供的信息回答问题。如无相关信息,说明无法回答。";
default -> "你是AI助手,简洁回答用户问题。";
};
}
/**
 * Rough token estimate (English: ~1 token per 4 characters; Chinese: ~1 token per 1.5 characters).
 */
private int estimateTokens(String text) {
int chineseChars = (int) text.chars()
.filter(c -> c >= 0x4E00 && c <= 0x9FFF)
.count();
int otherChars = text.length() - chineseChars;
return (int)(chineseChars / 1.5 + otherChars / 4.0);
}
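    // Worked example (illustrative numbers): a prompt with 300 Chinese
    // characters and 400 other characters estimates to
    // 300 / 1.5 + 400 / 4 = 200 + 100 = 300 tokens.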
}
Optimization technique 5: request batching
Individual LLM calls are expensive; batching significantly improves throughput and lowers unit cost:
@Service
@Slf4j
@RequiredArgsConstructor
public class BatchLLMService {
private final ChatLanguageModel batchModel;
// Batching queue
private final BlockingQueue<BatchTask> taskQueue = new LinkedBlockingQueue<>();
private final Map<String, CompletableFuture<String>> pendingResults = new ConcurrentHashMap<>();
@PostConstruct
public void startBatchProcessor() {
    Thread.ofVirtual().name("llm-batch-processor").start(this::batchProcessLoop);
}
/**
 * Submit a single task; returns a future that completes when its batch is processed.
 */
public CompletableFuture<String> submit(String prompt) {
String taskId = UUID.randomUUID().toString();
CompletableFuture<String> future = new CompletableFuture<>();
pendingResults.put(taskId, future);
taskQueue.offer(new BatchTask(taskId, prompt));
// Apply a timeout so abandoned entries don't leak from pendingResults
future.orTimeout(30, TimeUnit.SECONDS)
.exceptionally(e -> {
pendingResults.remove(taskId);
return null;
});
return future;
}
/**
 * Batch loop: wait up to 100ms for the first task, then drain up to 19 more
 * that are already queued and process them as a single batch.
 */
private void batchProcessLoop() {
while (!Thread.currentThread().isInterrupted()) {
try {
List<BatchTask> batch = new ArrayList<>();
// Wait up to 100ms for the first task to arrive
BatchTask first = taskQueue.poll(100, TimeUnit.MILLISECONDS);
if (first == null) continue;
batch.add(first);
taskQueue.drainTo(batch, 19); // grab up to 19 more that are already waiting
processBatch(batch);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (Exception e) {
log.error("Batch processing error", e);
}
}
}
private void processBatch(List<BatchTask> batch) {
log.debug("Processing batch of {} tasks", batch.size());
// Build the batch prompt: merge multiple tasks into a single request
String batchPrompt = buildBatchPrompt(batch);
try {
String response = batchModel.generate(batchPrompt);
Map<String, String> results = parseBatchResponse(response, batch);
// Distribute results to the waiting futures
batch.forEach(task -> {
CompletableFuture<String> future = pendingResults.remove(task.getTaskId());
if (future != null) {
String result = results.getOrDefault(task.getTaskId(), "processing failed");
future.complete(result);
}
});
} catch (Exception e) {
// If the batch call fails, fail every task in it
batch.forEach(task -> {
CompletableFuture<String> future = pendingResults.remove(task.getTaskId());
if (future != null) {
future.completeExceptionally(e);
}
});
}
}
private String buildBatchPrompt(List<BatchTask> batch) {
StringBuilder sb = new StringBuilder();
sb.append("请依次处理以下").append(batch.size()).append("个任务,每个任务用===分隔:\n\n");
for (int i = 0; i < batch.size(); i++) {
sb.append("任务").append(i + 1).append(":\n");
sb.append(batch.get(i).getPrompt()).append("\n===\n");
}
sb.append("请按任务编号顺序输出结果,格式:\n");
sb.append("结果1:[任务1的回答]\n结果2:[任务2的回答]\n...");
return sb.toString();
}
private Map<String, String> parseBatchResponse(String response, List<BatchTask> batch) {
Map<String, String> results = new HashMap<>();
for (int i = 0; i < batch.size(); i++) {
String marker = "结果" + (i + 1) + ":";
int start = response.indexOf(marker);
if (start >= 0) {
start += marker.length();
int end = response.indexOf("结果" + (i + 2) + ":", start);
String result = end > 0
? response.substring(start, end).trim()
: response.substring(start).trim();
results.put(batch.get(i).getTaskId(), result);
}
}
return results;
}
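    // BatchTask is referenced above but not shown in the original post; a
    // minimal Lombok version matching the getters used above:
    @lombok.Value
    static class BatchTask {
        String taskId;
        String prompt;
    }

    // Illustrative usage (not from the original post): callers submit
    // individually and the loop above merges whatever is queued into one call.
    //   CompletableFuture<String> answer = batchLLMService.submit("Summarize: ...");
    //   answer.thenAccept(result -> log.info("Batched result: {}", result));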
}
Quantifying the optimization gains
Put the before/after numbers side by side in a table; it is the most direct way to report the impact:
@Component
@Slf4j
@RequiredArgsConstructor
public class LatencyBenchmark {
private final TracedAIService tracedAIService;
/**
 * Run a latency benchmark on a schedule.
 * Using a standardized set of test cases keeps results comparable across optimizations.
 */
@Scheduled(cron = "0 0 3 * * *") // every day at 3:00 AM
public void runBenchmark() {
List<BenchmarkCase> testCases = loadStandardTestCases();
Map<String, Long> latencies = new HashMap<>();
for (BenchmarkCase tc : testCases) {
long start = System.currentTimeMillis();
// Run the standardized test case
try {
tracedAIService.processRequest(tc.getRequest());
latencies.put(tc.getCaseId(), System.currentTimeMillis() - start);
} catch (Exception e) {
latencies.put(tc.getCaseId(), -1L); // mark as failed
}
}
// Compute P50/P90/P99
List<Long> sortedLatencies = latencies.values().stream()
.filter(v -> v > 0)
.sorted()
.collect(Collectors.toList());
if (!sortedLatencies.isEmpty()) {
int size = sortedLatencies.size();
log.info("Benchmark results: P50={}ms, P90={}ms, P99={}ms",
sortedLatencies.get(size / 2),
sortedLatencies.get((int)(size * 0.9)),
sortedLatencies.get((int)(size * 0.99))
);
}
}
}
Our actual optimization journey
Let me walk through the optimization of a real project; it is a more useful reference than the techniques in isolation.
Before optimization: P50 = 2.8s, P99 = 7.2s, with the LLM call taking 92% of the time.
Round 1 (semantic caching): the cache hit rate for frequent, similar requests reached 38%, and those requests dropped below 50ms. Overall P50 fell to 1.8s, P99 to 6.1s.
Round 2 (parallelization): the previously serial "auth → feature fetch → prompt construction" chain became parallel, saving roughly 400ms. P50 = 1.4s, P99 = 5.2s.
Round 3 (streaming): time to first token dropped below 300ms and perceived responsiveness improved dramatically. Actual total latency didn't change, but user satisfaction rose from 3.2/5 to 4.1/5: same latency numbers, completely different experience.
Round 4 (prompt compression): average token count went from 1200 to 780, and LLM call latency dropped 35%. P50 = 1.1s, P99 = 3.8s.
Round 5 (batching): background tasks that don't need real-time responses moved to batch processing, cutting LLM API cost by 60%.
Five rounds in total: P50 went from 2.8s to 1.1s, and P99 from 7.2s to 3.8s. Every round started with measurement, and every round had numbers to show for it. That is what disciplined optimization looks like.
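Condensed into the kind of before/after comparison table recommended above (all numbers from the rounds just described; rounds 3 and 5 improved experience and cost rather than latency):
Round                    P50     P99     Notes
Baseline                 2.8s    7.2s    LLM call = 92% of total time
1 Semantic caching       1.8s    6.1s    38% hit rate; hits served < 50ms
2 Parallelization        1.4s    5.2s    ~400ms saved on the serial chain
3 Streaming              1.4s    5.2s    first token < 300ms; satisfaction 3.2 -> 4.1
4 Prompt compression     1.1s    3.8s    1200 -> 780 avg tokens; LLM latency -35%
5 Batching               1.1s    3.8s    background tasks only; API cost -60%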
The single most important takeaway: never optimize without data. Tracing is the prerequisite for all optimization work.
