CompletableFuture异步编排:thenCompose、thenCombine与异常处理
CompletableFuture异步编排:thenCompose、thenCombine与异常处理
适读人群:有Future使用经验、想写出优雅异步代码的Java工程师 | 阅读时长:约16分钟
开篇故事
2021年底,我们做一个电商首页的数据聚合接口,需要同时调用:用户信息服务(约50ms)、商品推荐服务(约120ms)、优惠券服务(约80ms)、库存服务(约60ms)。
第一版代码用了最传统的方式:串行调用,总耗时 = 50+120+80+60 = 310ms。
第二版用Future并行:
Future<User> userFuture = executor.submit(() -> userService.get(userId));
Future<List<Product>> productFuture = executor.submit(() -> productService.recommend(userId));
// ...
User user = userFuture.get(); // 阻塞等待并行后耗时约120ms(取最慢的那个),提升明显。但问题来了——Future.get()是阻塞的,提交任务的主线程在调用.get()时什么都不能干;而且四个Future的结果聚合、异常处理,代码写得极其难看。
第三版,我换成了CompletableFuture,代码量减少了40%,逻辑清晰了一倍,异常处理也终于像样了。
今天把CompletableFuture最常用也最容易搞混的几个API讲清楚:thenCompose、thenCombine,以及异常处理三剑客。
一、CompletableFuture的核心设计
1.1 为什么Future不够用
Future<T>的问题:
- 只能通过
get()阻塞等待,无法注册回调 - 多个Future的结果组合非常麻烦(嵌套调用)
- 异常处理(
try-catch get())割裂了异步链路 - 无法手动完成(
complete())
CompletableFuture<T>实现了Future<T>和CompletionStage<T>两个接口,CompletionStage定义了几十个链式操作方法,让异步流程的编排变得优雅。
1.2 thenApply、thenCompose、thenCombine的区别
这三个方法经常被混淆:
| 方法 | 场景 | 输入 | 输出 |
|---|---|---|---|
thenApply(fn) | 转换结果(同步函数) | T | U |
thenCompose(fn) | 链式异步(fn返回CF) | T | CompletableFuture<U> → U |
thenCombine(other, fn) | 合并两个CF的结果 | T, U | V |
thenCompose和thenApply的关键区别:
thenApply(x -> transform(x))→ 返回CompletableFuture<U>,U是transform的直接返回值thenCompose(x -> anotherAsync(x))→ anotherAsync返回CompletableFuture<U>,compose会"打平"嵌套,结果是CompletableFuture<U>而不是CompletableFuture<CompletableFuture<U>>
类比Java Stream:thenCompose相当于flatMap,thenApply相当于map。
二、异步编排核心机制解析
2.1 异常处理三种方式
方式1:exceptionally(仅处理异常,正常值透传)
cf.exceptionally(ex -> {
log.error("操作失败", ex);
return defaultValue; // 提供降级值
})方式2:handle(正常和异常都处理)
cf.handle((result, ex) -> {
if (ex != null) {
return defaultValue;
}
return transform(result);
})方式3:whenComplete(观察结果,不改变值)
cf.whenComplete((result, ex) -> {
if (ex != null) {
metrics.recordError();
} else {
metrics.recordSuccess();
}
// 不能修改结果,适合做日志/监控
})2.2 线程执行上下文:async后缀的意义
CompletableFuture的方法有三类版本:
thenApply(fn):在前一个任务的线程中执行fn(或调用者线程)thenApplyAsync(fn):在ForkJoinPool.commonPool()中执行fnthenApplyAsync(fn, executor):在指定线程池中执行fn
重要陷阱: 如果使用thenApply(无async),fn可能在任意线程执行,包括调用链创建者的线程。在Spring MVC这种场景下,如果fn运行在Tomcat请求线程,会阻塞HTTP请求处理。
生产中建议:明确指定executor,不要依赖commonPool(共享线程池,高负载下会互相竞争)。
三、完整代码实现
3.1 首页数据聚合:thenCombine并行调用
package com.laozhang.concurrent.future;
import java.util.List;
import java.util.concurrent.*;
/**
* CompletableFuture实战:电商首页数据聚合
*
* 同时获取:用户信息、商品推荐、优惠券、库存状态
* 任何一个失败时,降级处理(不影响整体响应)
*
* 测试环境:JDK 11
* 预期耗时:约120ms(取最慢的商品推荐服务)
*/
public class HomePageAggregator {
// 业务线程池(IO密集型,配置较大的线程数)
private static final ExecutorService IO_POOL = new ThreadPoolExecutor(
20, 50, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
r -> new Thread(r, "io-pool-" + System.currentTimeMillis()),
new ThreadPoolExecutor.CallerRunsPolicy()
);
// 模拟各下游服务
private User fetchUser(String userId) throws InterruptedException {
Thread.sleep(50);
return new User(userId, "张三");
}
private List<String> fetchRecommendations(String userId) throws InterruptedException {
Thread.sleep(120);
return List.of("iPhone 15", "MacBook Pro", "AirPods");
}
private List<String> fetchCoupons(String userId) throws InterruptedException {
Thread.sleep(80);
return List.of("满100减20", "9折券");
}
private boolean checkStock(String productId) throws InterruptedException {
Thread.sleep(60);
return true;
}
/**
* 聚合首页数据
*/
public HomePageData aggregate(String userId) {
long start = System.currentTimeMillis();
// 1. 并行发起用户信息和推荐列表的请求
CompletableFuture<User> userCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchUser(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, IO_POOL)
.exceptionally(ex -> {
System.out.println("[降级] 获取用户信息失败:" + ex.getMessage());
return new User(userId, "游客"); // 降级默认值
});
CompletableFuture<List<String>> productCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchRecommendations(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, IO_POOL)
.exceptionally(ex -> {
System.out.println("[降级] 获取推荐失败:" + ex.getMessage());
return List.of("热销商品1", "热销商品2"); // 降级热门推荐
});
CompletableFuture<List<String>> couponCF = CompletableFuture
.supplyAsync(() -> {
try { return fetchCoupons(userId); }
catch (InterruptedException e) { throw new RuntimeException(e); }
}, IO_POOL)
.exceptionally(ex -> {
System.out.println("[降级] 获取优惠券失败:" + ex.getMessage());
return List.of(); // 降级:空优惠券列表
});
// 2. 合并推荐和优惠券(两个独立的异步结果)
CompletableFuture<HomePageData> resultCF = userCF
.thenCombineAsync(productCF, (user, products) -> {
// 此时user和products都已完成
HomePageData data = new HomePageData();
data.setUser(user);
data.setProducts(products);
return data;
}, IO_POOL)
.thenCombineAsync(couponCF, (data, coupons) -> {
data.setCoupons(coupons);
return data;
}, IO_POOL);
// 3. 等待所有完成(设置超时,防止某个下游服务卡住)
try {
HomePageData result = resultCF.get(2, TimeUnit.SECONDS);
System.out.printf("首页数据聚合完成,耗时:%dms%n",
System.currentTimeMillis() - start);
return result;
} catch (TimeoutException e) {
System.out.println("聚合超时,返回部分数据");
return resultCF.getNow(new HomePageData()); // 超时取当前已完成的部分
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException("聚合失败", e);
}
}
// 数据类
static class User {
String id, name;
User(String id, String name) { this.id = id; this.name = name; }
@Override public String toString() { return "User{id=" + id + ",name=" + name + "}"; }
}
static class HomePageData {
User user;
List<String> products;
List<String> coupons;
void setUser(User u) { this.user = u; }
void setProducts(List<String> p) { this.products = p; }
void setCoupons(List<String> c) { this.coupons = c; }
@Override public String toString() {
return "HomePageData{user=" + user + ",products=" + products + ",coupons=" + coupons + "}";
}
}
public static void main(String[] args) throws InterruptedException {
HomePageAggregator aggregator = new HomePageAggregator();
HomePageData data = aggregator.aggregate("user123");
System.out.println("结果:" + data);
IO_POOL.shutdown();
}
}3.2 thenCompose:依赖链式异步调用
package com.laozhang.concurrent.future;
import java.util.concurrent.*;
/**
* thenCompose实战:依赖链式异步调用
*
* 场景:
* 1. 根据用户ID查询订单列表(异步)
* 2. 根据最新订单ID查询物流信息(异步,依赖步骤1的结果)
* 3. 根据物流信息查询预计到货时间(异步,依赖步骤2的结果)
*
* 使用thenCompose而不是thenApply的原因:
* 步骤2的输入依赖步骤1的输出,且步骤2本身返回CompletableFuture
* 如果用thenApply,会得到CompletableFuture<CompletableFuture<Logistics>>
* 用thenCompose,会打平嵌套,得到CompletableFuture<Logistics>
*/
public class OrderTrackingDemo {
private static final ExecutorService POOL = Executors.newFixedThreadPool(10);
// 模拟异步服务调用
private static CompletableFuture<String> getLatestOrderId(String userId) {
return CompletableFuture.supplyAsync(() -> {
sleep(50);
return "ORDER-" + userId + "-20240101";
}, POOL);
}
private static CompletableFuture<String> getLogisticsTrackingNumber(String orderId) {
return CompletableFuture.supplyAsync(() -> {
sleep(80);
if (orderId.contains("ERROR")) {
throw new RuntimeException("物流查询失败:" + orderId);
}
return "SF-1234567890";
}, POOL);
}
private static CompletableFuture<String> getEstimatedDelivery(String trackingNumber) {
return CompletableFuture.supplyAsync(() -> {
sleep(60);
return "预计2024-01-05 18:00前送达";
}, POOL);
}
/**
* 正确使用thenCompose链式调用
*/
public static void queryOrderTracking(String userId) {
long start = System.currentTimeMillis();
CompletableFuture<String> trackingInfo = getLatestOrderId(userId)
// thenCompose: 打平嵌套,orderId → CF<trackingNumber>
.thenCompose(orderId -> {
System.out.println("查到订单:" + orderId);
return getLogisticsTrackingNumber(orderId);
})
// 再次thenCompose: trackingNumber → CF<deliveryTime>
.thenCompose(trackingNumber -> {
System.out.println("查到运单号:" + trackingNumber);
return getEstimatedDelivery(trackingNumber);
})
// 处理整个链路的异常
.exceptionally(ex -> {
System.out.println("查询链路异常:" + ex.getMessage());
return "物流信息暂时无法获取,请稍后重试";
})
// 日志记录(不改变结果)
.whenComplete((result, ex) -> {
System.out.printf("查询耗时:%dms%n", System.currentTimeMillis() - start);
});
try {
String result = trackingInfo.get(5, TimeUnit.SECONDS);
System.out.println("最终结果:" + result);
} catch (Exception e) {
System.out.println("查询超时或失败:" + e.getMessage());
}
}
/**
* 错误示范:用thenApply处理返回CF的函数,会产生嵌套CF
*/
@SuppressWarnings("unused")
public static void wrongUsage(String userId) {
// 这里result的类型是 CompletableFuture<CompletableFuture<String>>
// 需要再次.get()才能拿到真正的值,非常awkward
CompletableFuture<CompletableFuture<String>> wrongResult =
getLatestOrderId(userId)
.thenApply(orderId -> getLogisticsTrackingNumber(orderId));
// 正确做法是thenCompose,不是thenApply!
}
private static void sleep(long ms) {
try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== 正常流程 ===");
queryOrderTracking("user123");
Thread.sleep(500);
System.out.println("\n=== 异常降级 ===");
queryOrderTracking("userERROR"); // 触发物流查询异常
POOL.shutdown();
}
}四、踩坑实录
坑1:thenApply中抛异常被吞掉
报错现象: thenApply里的代码抛了RuntimeException,但整个CompletableFuture链没有任何报错,调用get()也拿到了正常结果(其实是null)。
原因分析: 其实不是被吞掉了,而是被包装成了CompletionException存储在CF里。但如果后续链路有exceptionally处理了它,最终get()拿到的是exceptionally的返回值(可能是null或默认值),看起来像"没有报错"。
解法: 调用get()时会把ExecutionException抛出来。如果用join(),会抛CompletionException。不要用getNow(T valueIfAbsent)来"检查是否完成",这样会在CF未完成或失败时悄悄返回默认值。
// 这样能明确看到异常
try {
T result = cf.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause(); // 真正的异常
log.error("异步任务失败", cause);
}坑2:CompletableFuture.allOf()不能获取各个CF的结果
报错现象: CompletableFuture.allOf(cf1, cf2, cf3).join()等待完成,但不知道怎么拿到每个CF的结果。
原因分析: allOf()返回的是CompletableFuture<Void>,只表示"全部完成",不携带结果。需要等allOf完成后,分别从原来的CF取值。
CompletableFuture<String> cf1 = ...;
CompletableFuture<Integer> cf2 = ...;
CompletableFuture.allOf(cf1, cf2)
.thenApply(v -> {
// allOf完成后,cf1和cf2都已complete,join()不会阻塞
String r1 = cf1.join();
Integer r2 = cf2.join();
return r1 + " " + r2;
});坑3:在Spring WebMVC中使用CompletableFuture没有异步效果
报错现象: Controller返回CompletableFuture<ResponseEntity>,但Tomcat线程依然被占用,异步没有释放Tomcat线程的效果。
原因分析: Spring MVC支持返回CompletableFuture作为异步响应,但需要配置AsyncSupportedConfigurer。且如果CF里的任务运行在commonPool,而commonPool的线程也被Tomcat的DeferredResult机制需要,可能产生意外的线程依赖。
解法: 明确指定自定义线程池(thenApplyAsync(fn, myPool)),不用commonPool;Spring配置中开启异步支持并设置超时。
坑4:exceptionally只处理当前CF的异常,不处理之前已经处理过的异常
报错现象: 链路里有多个exceptionally,期望每个都能独立处理各自阶段的异常,但发现后面的exceptionally有时会拿到前面阶段的异常。
原因分析: exceptionally处理异常后,返回的是一个"正常完成"的新CF。如果exceptionally的回调本身抛出了异常,那个异常会传播到后面的exceptionally。但如果exceptionally成功返回了降级值,后面的exceptionally不会再触发(因为CF已经正常完成了)。
弄清楚链的结构:每个.thenXxx()或.exceptionally()都返回一个新的CF,而不是修改原来的CF。
五、总结与延伸
CompletableFuture最常用的操作模式:
| 场景 | 使用方式 |
|---|---|
| 单个异步任务 | supplyAsync(fn, executor) |
| 转换结果(同步) | thenApply(fn) |
| 链式异步(结果作为下一个异步的输入) | thenCompose(fn) |
| 并行两个CF,合并结果 | thenCombine(other, fn) |
| 等待所有完成 | allOf(cf1, cf2, ...) |
| 等待任意一个完成 | anyOf(cf1, cf2, ...) |
| 异常降级 | exceptionally(fn) |
| 正常+异常都处理 | handle(fn) |
| 只观察,不改变值 | whenComplete(fn) |
生产建议:
- 永远指定自定义executor,不用commonPool
- 给每个CF链加超时(
orTimeout(timeout, unit),JDK 9+) - 异常处理放在链的末尾,覆盖整条链路
- 避免在CF链里调用阻塞操作(如
get()),会造成死锁
