第1705篇:CompletableFuture在并发AI调用中的高级用法——扇出、聚合与超时
第1705篇:CompletableFuture在并发AI调用中的高级用法——扇出、聚合与超时
讲一件挺典型的踩坑经历。
我们有个"AI综合报告"功能,给定一家公司名,需要同时查询:公司背景、财务分析、行业对比、舆情评分四个维度,每个维度都要调用一次AI,最后整合成一份报告。
最初的实现很直接:四次AI调用串行执行,每次平均2秒,总共8秒+。用户等得不耐烦,投诉率挺高的。
后来我把四个调用改成并发,用CompletableFuture协调,整体时间压到了2.5秒左右。但改的过程中也踩了不少坑,今天把这些经验完整写出来。
一、为什么AI场景需要并发调用
AI API调用的特点是:延迟高、CPU消耗低。一次gpt-4o的调用,首token延迟在0.5-3秒,生成完整响应要2-10秒,但这段时间你的Java线程几乎是在等待IO。
这正是并发的用武之地。如果多个AI调用之间没有依赖关系,完全可以同时发出去,总耗时取决于最慢的那个,而不是所有调用的累加。
串行:调用1(2s) → 调用2(3s) → 调用3(1.5s) → 调用4(2.5s) = 9秒
并发:max(2s, 3s, 1.5s, 2.5s) = 3秒CompletableFuture是Java 8+处理这类场景的标准工具,但它有不少细节和坑,用好了威力很大,用不好会导致线程泄漏、超时失控等问题。
二、基础回顾:CompletableFuture的核心方法
先快速过一遍关键方法,不细讲基础,重点在高级用法:
// 创建
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> callAI());
// 转换结果(不阻塞)
CompletableFuture<Integer> length = future.thenApply(String::length);
// 异步转换(结果本身也是异步的)
CompletableFuture<String> chained = future.thenCompose(text -> callAI(text));
// 两个future都完成后合并
CompletableFuture<String> combined = future1.thenCombine(future2, (a, b) -> a + b);
// 等待所有完成(不关心结果)
CompletableFuture.allOf(f1, f2, f3).join();
// 等待任意一个完成
CompletableFuture.anyOf(f1, f2, f3).join();
// 异常处理
future.exceptionally(ex -> "出错了: " + ex.getMessage());
future.handle((result, ex) -> ex != null ? "默认值" : result);这些是基础,下面进入实战。
三、扇出(Fan-Out)模式:并行多路AI调用
扇出是最常用的模式:把一个任务分解成多个独立的并行子任务。
@Service
public class CompanyAnalysisService {
private final ChatClient chatClient;
private final ExecutorService aiExecutor;
public CompanyAnalysisService(ChatClient chatClient) {
this.chatClient = chatClient;
// 专用线程池,不要用commonPool(会影响其他任务)
this.aiExecutor = Executors.newFixedThreadPool(
10, // 根据AI API的并发限制调整
r -> {
Thread t = new Thread(r, "ai-worker");
t.setDaemon(true);
return t;
}
);
}
// 综合分析报告Record
public record CompanyReport(
String companyName,
String background,
String financialAnalysis,
String industryComparison,
SentimentScore sentimentScore,
long totalDurationMs
) {
public record SentimentScore(int score, String reasoning) {}
}
public CompanyReport generateReport(String companyName) {
long start = System.currentTimeMillis();
// 四个并发AI调用
CompletableFuture<String> backgroundFuture = CompletableFuture.supplyAsync(
() -> analyzeBackground(companyName), aiExecutor
);
CompletableFuture<String> financialFuture = CompletableFuture.supplyAsync(
() -> analyzeFinancial(companyName), aiExecutor
);
CompletableFuture<String> industryFuture = CompletableFuture.supplyAsync(
() -> compareIndustry(companyName), aiExecutor
);
CompletableFuture<CompanyReport.SentimentScore> sentimentFuture =
CompletableFuture.supplyAsync(
() -> analyzeSentiment(companyName), aiExecutor
);
// 等待所有完成并聚合
// 注意:这里要用join()而不是get(),join()抛出的是unchecked exception
try {
CompletableFuture.allOf(
backgroundFuture, financialFuture, industryFuture, sentimentFuture
).join();
long duration = System.currentTimeMillis() - start;
return new CompanyReport(
companyName,
backgroundFuture.join(),
financialFuture.join(),
industryFuture.join(),
sentimentFuture.join(),
duration
);
} catch (CompletionException e) {
// 任何一个子任务抛了异常,allOf.join()会重新抛出
throw new AIAnalysisException("公司分析失败: " + e.getCause().getMessage(), e);
}
}
private String analyzeBackground(String company) {
return chatClient.prompt()
.user("简要介绍" + company + "的公司背景、创立历程和主要业务,200字以内")
.call().content();
}
private String analyzeFinancial(String company) {
return chatClient.prompt()
.user("分析" + company + "最近的财务状况和增长趋势,200字以内")
.call().content();
}
private String compareIndustry(String company) {
return chatClient.prompt()
.user("将" + company + "与同行业主要竞争对手对比,指出其竞争优劣势,200字以内")
.call().content();
}
private CompanyReport.SentimentScore analyzeSentiment(String company) {
String response = chatClient.prompt()
.user("评估市场和媒体对" + company + "的整体评价情绪,给出1-10分并说明理由," +
"返回JSON格式:{\"score\": 7, \"reasoning\": \"理由\"}")
.call().content();
// 解析JSON...(省略)
return new CompanyReport.SentimentScore(7, "示例");
}
@PreDestroy
public void shutdown() {
aiExecutor.shutdown();
}
}关键点:这里我专门创建了一个 aiExecutor 线程池,而不是用 ForkJoinPool.commonPool()。原因是:
- AI调用是IO密集型的长任务,会长期占用线程
- commonPool的线程数默认等于CPU核数-1,很容易被AI调用占满,影响其他任务
- 专用线程池可以根据AI API的并发限制来配置线程数
四、超时控制:不能让一个慢调用拖垮全局
AI API有时候会很慢,特别是在高峰期。如果一个调用30秒没有返回,而你用 .join() 等着,用户体验直接崩。
Java 9开始,CompletableFuture 有了 orTimeout 和 completeOnTimeout 方法:
@Service
public class TimeoutAwareAIService {
private final ChatClient chatClient;
private final ExecutorService aiExecutor;
// 带超时的单次调用
public String callWithTimeout(String prompt, long timeoutMs) {
return CompletableFuture.supplyAsync(
() -> chatClient.prompt().user(prompt).call().content(),
aiExecutor
)
.orTimeout(timeoutMs, TimeUnit.MILLISECONDS) // 超时抛出TimeoutException
.exceptionally(ex -> {
if (ex instanceof TimeoutException) {
log.warn("AI调用超时({}ms)", timeoutMs);
return "请求超时,请稍后重试";
}
throw new CompletionException(ex);
})
.join();
}
// 超时时使用默认值(不抛异常)
public String callWithFallback(String prompt, String fallback, long timeoutMs) {
return CompletableFuture.supplyAsync(
() -> chatClient.prompt().user(prompt).call().content(),
aiExecutor
)
.completeOnTimeout(fallback, timeoutMs, TimeUnit.MILLISECONDS)
// completeOnTimeout:超时后用fallback值完成,不抛异常
.join();
}
// 多个调用,每个都有独立超时,聚合时取成功的结果
public Map<String, String> callMultipleWithTimeout(
Map<String, String> prompts, long perCallTimeoutMs) {
Map<String, CompletableFuture<String>> futures = prompts.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> CompletableFuture.supplyAsync(
() -> chatClient.prompt()
.user(entry.getValue())
.call().content(),
aiExecutor
)
.completeOnTimeout(
"该维度分析超时,请单独重试",
perCallTimeoutMs,
TimeUnit.MILLISECONDS
)
));
// 等待所有完成(每个都有自己的超时兜底)
CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).join();
return futures.entrySet().stream()
.collect(Collectors.toMap(
Map.Entry::getKey,
entry -> entry.getValue().join()
));
}
}orTimeout vs completeOnTimeout 的区别:
orTimeout:超时抛出TimeoutException,任务以异常结束completeOnTimeout:超时用指定的默认值完成,任务正常结束
在AI场景里,我更常用 completeOnTimeout,因为某一个维度超时了,我还是希望把其他维度的结果返回给用户,而不是整个请求失败。
五、扇入聚合(Fan-In):多种合并策略
扇入是扇出的对立面:多个并发任务完成后,如何合并结果?有几种不同的策略:
public class FanInStrategies {
private final ExecutorService executor;
// 策略1:等所有完成,全部成功才返回
public <T> List<T> waitAll(List<CompletableFuture<T>> futures) {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
return futures.stream()
.map(CompletableFuture::join)
.toList();
}
// 策略2:等所有完成,部分失败也返回(失败的用Optional.empty()表示)
public <T> List<Optional<T>> waitAllTolerant(List<CompletableFuture<T>> futures) {
List<CompletableFuture<Optional<T>>> tolerantFutures = futures.stream()
.map(f -> f.handle((result, ex) ->
ex != null ? Optional.<T>empty() : Optional.of(result)))
.toList();
CompletableFuture.allOf(tolerantFutures.toArray(new CompletableFuture[0])).join();
return tolerantFutures.stream()
.map(CompletableFuture::join)
.toList();
}
// 策略3:取最快完成的N个结果
public <T> List<T> waitFastest(List<CompletableFuture<T>> futures, int n) {
// 用BlockingQueue收集完成的结果
BlockingQueue<T> results = new LinkedBlockingQueue<>();
futures.forEach(f -> f.thenAccept(results::offer));
List<T> collected = new ArrayList<>();
while (collected.size() < n) {
try {
T result = results.poll(30, TimeUnit.SECONDS);
if (result != null) {
collected.add(result);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
return collected;
}
// 策略4:任意一个成功就返回(其他取消,用于竞争调用/降级策略)
public <T> CompletableFuture<T> firstSuccessful(List<CompletableFuture<T>> futures) {
CompletableFuture<T> result = new CompletableFuture<>();
futures.forEach(f -> f.whenComplete((val, ex) -> {
if (ex == null) {
result.complete(val);
}
}));
// 如果全部失败
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
result.completeExceptionally(new Exception("所有AI调用都失败了"));
return null;
});
return result;
}
}六、竞争调用(Hedging):提高可靠性的高级技巧
Hedging(对冲)是一个很有用的高级技巧:同时或者错开很短时间向多个模型或者多个API端点发出相同请求,取最先成功返回的结果。
在AI场景里,当一个模型可能因为负载高而响应慢时,可以同时调用备用模型:
@Service
public class HedgedAIService {
private final ChatClient primaryClient; // gpt-4o
private final ChatClient secondaryClient; // claude-3-5-sonnet(备用)
private final ScheduledExecutorService scheduler;
private final ExecutorService aiExecutor;
// 对冲请求:先发给主模型,100ms后如果没返回就同时发给备用模型
public String hedgedCall(String prompt) {
CompletableFuture<String> primaryFuture = CompletableFuture.supplyAsync(
() -> primaryClient.prompt().user(prompt).call().content(),
aiExecutor
);
// 延迟100ms后启动备用请求
CompletableFuture<String> hedgeFuture = new CompletableFuture<>();
scheduler.schedule(() -> {
if (!primaryFuture.isDone()) {
// 主请求还没完成,启动备用
log.debug("启动对冲请求");
CompletableFuture.supplyAsync(
() -> secondaryClient.prompt().user(prompt).call().content(),
aiExecutor
).whenComplete((result, ex) -> {
if (ex == null) hedgeFuture.complete(result);
else hedgeFuture.completeExceptionally(ex);
});
}
}, 100, TimeUnit.MILLISECONDS);
// 取主请求和备用请求中最先成功的
return CompletableFuture.anyOf(primaryFuture, hedgeFuture)
.thenApply(o -> (String) o)
.orTimeout(15, TimeUnit.SECONDS)
.exceptionally(ex -> "服务暂时不可用")
.join();
}
}这个模式在P99延迟优化上效果很明显。主请求95%的情况下足够快,只有当它慢的时候备用请求才能抢先完成。额外的成本是偶尔会多发一次API调用。
七、有依赖的并发:DAG执行模式
前面的扇出都是完全独立的并行任务。实际业务里,任务之间往往有依赖关系,形成一个有向无环图(DAG):
任务A
├── 任务B(依赖A)
│ └── 任务D(依赖B和C)
└── 任务C(依赖A)CompletableFuture可以很自然地表达这种依赖:
@Service
public class DAGAIProcessor {
private final ChatClient chatClient;
private final ExecutorService executor;
// 多步骤报告生成:有依赖关系的并发任务
public String generateComplexReport(String topic) {
// 第一层:独立任务,可以并发
CompletableFuture<String> rawDataFuture = CompletableFuture.supplyAsync(
() -> gatherRawData(topic), executor
);
CompletableFuture<String> expertOpinionFuture = CompletableFuture.supplyAsync(
() -> gatherExpertOpinion(topic), executor
);
// 第二层:依赖第一层结果
// B依赖rawData
CompletableFuture<String> analysisFromDataFuture = rawDataFuture
.thenApplyAsync(rawData -> analyzeData(rawData), executor);
// C依赖rawData和expertOpinion(两个都需要)
CompletableFuture<String> synthesizedViewFuture = rawDataFuture
.thenCombineAsync(
expertOpinionFuture,
(rawData, expertOpinion) -> synthesize(rawData, expertOpinion),
executor
);
// 第三层:依赖第二层的两个结果
CompletableFuture<String> finalReportFuture = analysisFromDataFuture
.thenCombineAsync(
synthesizedViewFuture,
(analysis, synthesis) -> compileReport(analysis, synthesis),
executor
);
// 只需要等最终结果
return finalReportFuture
.orTimeout(60, TimeUnit.SECONDS)
.join();
}
private String gatherRawData(String topic) {
return chatClient.prompt()
.user("收集关于" + topic + "的原始数据和事实信息")
.call().content();
}
private String gatherExpertOpinion(String topic) {
return chatClient.prompt()
.user("收集领域专家对" + topic + "的主流观点")
.call().content();
}
private String analyzeData(String rawData) {
return chatClient.prompt()
.user("对以下数据进行深度分析:\n" + rawData)
.call().content();
}
private String synthesize(String rawData, String expertOpinion) {
return chatClient.prompt()
.user("综合以下数据和专家观点:\n数据:" + rawData + "\n专家观点:" + expertOpinion)
.call().content();
}
private String compileReport(String analysis, String synthesis) {
return chatClient.prompt()
.user("基于以下分析编写完整报告:\n分析:" + analysis + "\n综合:" + synthesis)
.call().content();
}
}八、限流:控制并发AI调用的速率
AI API有速率限制(RPM: requests per minute, TPM: tokens per minute)。如果并发调用太多,会触发429错误。需要一个信号量来控制并发度:
@Service
public class RateLimitedAIService {
private final ChatClient chatClient;
private final Semaphore concurrencySemaphore;
private final ExecutorService executor;
public RateLimitedAIService(ChatClient chatClient) {
this.chatClient = chatClient;
// 控制最大并发数为5
this.concurrencySemaphore = new Semaphore(5);
this.executor = Executors.newCachedThreadPool(
r -> new Thread(r, "ai-rate-limited")
);
}
public CompletableFuture<String> callAsync(String prompt) {
return CompletableFuture.supplyAsync(() -> {
try {
// 获取许可,如果超出并发限制则等待
boolean acquired = concurrencySemaphore.tryAcquire(5, TimeUnit.SECONDS);
if (!acquired) {
throw new AIRateLimitException("获取并发许可超时,系统繁忙");
}
try {
return chatClient.prompt().user(prompt).call().content();
} finally {
concurrencySemaphore.release();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new AIServiceException("被中断", e);
}
}, executor);
}
// 批量调用,自动控制速率
public List<String> callBatch(List<String> prompts) {
List<CompletableFuture<String>> futures = prompts.stream()
.map(this::callAsync)
.toList();
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.orTimeout(120, TimeUnit.SECONDS)
.join();
return futures.stream()
.map(f -> f.exceptionally(ex -> "调用失败: " + ex.getMessage()).join())
.toList();
}
}九、指数退避重试
AI API的瞬时错误(503、429)很常见,加个重试逻辑是必要的:
public class RetryableAICall {
private final ChatClient chatClient;
private final ExecutorService executor;
private final ScheduledExecutorService scheduler;
// 带指数退避的重试
public CompletableFuture<String> callWithRetry(
String prompt, int maxRetries) {
return callWithRetryInternal(prompt, maxRetries, 0, 1000);
}
private CompletableFuture<String> callWithRetryInternal(
String prompt, int maxRetries, int attempt, long delayMs) {
return CompletableFuture.supplyAsync(
() -> chatClient.prompt().user(prompt).call().content(),
executor
).handle((result, ex) -> {
if (ex == null) {
return CompletableFuture.completedFuture(result);
}
if (attempt >= maxRetries) {
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(
new AIServiceException("重试" + maxRetries + "次后仍然失败", ex));
return failed;
}
// 判断是否值得重试
if (!isRetryable(ex)) {
CompletableFuture<String> failed = new CompletableFuture<>();
failed.completeExceptionally(ex);
return failed;
}
log.warn("AI调用失败,{}ms后第{}次重试", delayMs, attempt + 1);
// 延迟后重试(指数退避)
CompletableFuture<String> retryFuture = new CompletableFuture<>();
long nextDelay = Math.min(delayMs * 2, 30000); // 最大30秒
scheduler.schedule(() -> {
callWithRetryInternal(prompt, maxRetries, attempt + 1, nextDelay)
.whenComplete((retryResult, retryEx) -> {
if (retryEx != null) retryFuture.completeExceptionally(retryEx);
else retryFuture.complete(retryResult);
});
}, delayMs, TimeUnit.MILLISECONDS);
return retryFuture;
}).thenCompose(Function.identity()); // 展开嵌套的CompletableFuture
}
private boolean isRetryable(Throwable ex) {
// 429 Too Many Requests 和 503 Service Unavailable 值得重试
// 400 Bad Request(参数错误)不值得重试
return ex.getMessage() != null && (
ex.getMessage().contains("429") ||
ex.getMessage().contains("503") ||
ex.getMessage().contains("timeout")
);
}
}十、整体流程架构
小结
CompletableFuture在AI并发调用里的核心用法:
- 扇出:
supplyAsync+allOf并行执行独立AI调用 - 超时控制:
orTimeout抛异常,completeOnTimeout用默认值兜底 - 容错聚合:
handle把异常转为Optional,部分失败不影响整体 - 对冲请求:延迟后启动备用请求,取最快成功的
- DAG依赖:
thenCombineAsync表达有依赖的并发任务图 - 限流:Semaphore控制最大并发数,避免触发API限制
- 重试:指数退避+ScheduledExecutor实现异步重试
最重要的一条经验:一定要用专用线程池,不要用commonPool。AI调用是长时间IO,会堵住线程池,影响系统其他部分。
