Java CompletableFuture 异步编排——从回调地狱到链式编排的完整进化
Java CompletableFuture 异步编排——从回调地狱到链式编排的完整进化
适读人群:有Future使用经验、想做异步优化的Java开发者 | 阅读时长:约18分钟 | 核心价值:掌握CompletableFuture的完整API体系,能在实际项目中设计出高效的异步编排方案
那段让我羞耻的代码
2020年刚入行不久,我接手了一个商品详情页的接口优化需求。原来的代码是这样的:
public ProductDetailVO getProductDetail(Long productId) {
// 串行调用,每个约50ms,总耗时250ms
ProductInfo product = productService.getById(productId);
PriceInfo price = priceService.getPrice(productId);
StockInfo stock = stockService.getStock(productId);
List<Review> reviews = reviewService.getReviews(productId, 5);
SellerInfo seller = sellerService.getSeller(product.getSellerId());
return buildVO(product, price, stock, reviews, seller);
}五个RPC调用串行执行,接口耗时250ms,P99更高。我当时想到的"优化方案"是用Future:
// 我的第一版"优化"...真的挺惨的
Future<ProductInfo> pFuture = executor.submit(() -> productService.getById(productId));
Future<PriceInfo> prFuture = executor.submit(() -> priceService.getPrice(productId));
Future<StockInfo> sFuture = executor.submit(() -> stockService.getStock(productId));
Future<List<Review>> rFuture = executor.submit(() -> reviewService.getReviews(productId, 5));
ProductInfo product = pFuture.get(); // 阻塞等待
Future<SellerInfo> seFuture = executor.submit(() -> sellerService.getSeller(product.getSellerId()));
PriceInfo price = prFuture.get();
StockInfo stock = sFuture.get();
List<Review> reviews = rFuture.get();
SellerInfo seller = seFuture.get();代码更丑了,而且有明显的问题:没有超时处理,任何一个调用hang住整个接口就挂了;错误处理全靠catch散落各处;sellerInfo依赖product,逻辑不清晰。
后来用CompletableFuture重写,代码质量和性能都有了质的飞跃。今天把这整套东西讲清楚。
CompletableFuture 的核心设计理念
CompletableFuture是Java 8引入的,它的设计理念来自函数式编程:
- 异步计算的结果可以被组合:就像数学里的函数组合 f(g(x))
- 链式调用:每个操作返回新的CompletableFuture,可以继续链式处理
- 声明式描述计算图:先描述"做什么",再触发执行
创建 CompletableFuture 的几种方式
// 1. 有返回值的异步任务(推荐指定线程池!)
ExecutorService pool = Executors.newFixedThreadPool(10);
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(
() -> fetchData(), pool
);
// 2. 无返回值的异步任务
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(
() -> sendEmail(), pool
);
// 3. 已完成的CompletableFuture(用于测试或直接返回结果)
CompletableFuture<String> cf3 = CompletableFuture.completedFuture("hello");
// 4. 手动完成
CompletableFuture<String> cf4 = new CompletableFuture<>();
// 在某个回调里
cf4.complete("result"); // 正常完成
cf4.completeExceptionally(new RuntimeException("error")); // 异常完成重要警告: supplyAsync和runAsync不指定线程池时,使用ForkJoinPool.commonPool()。在Web应用中,这个公共池是全局共享的,如果异步任务很多或执行时间长,会影响其他使用公共池的代码(包括并行Stream)。生产环境必须指定专用线程池。
完整 API 体系与链式编排
1. 变换结果:thenApply / thenApplyAsync
// thenApply:在当前线程或前一个任务的线程中执行(可能不开新线程)
CompletableFuture<Integer> cf = CompletableFuture
.supplyAsync(() -> "hello world")
.thenApply(String::length); // 把String变成Integer
// thenApplyAsync:保证在指定线程池里异步执行
CompletableFuture<Integer> cf2 = CompletableFuture
.supplyAsync(() -> "hello world", pool)
.thenApplyAsync(s -> {
// 这里在 pool 中执行
return s.length();
}, pool);thenApply vs thenApplyAsync 的区别:
thenApply:如果前一个任务已完成,当前线程直接执行;如果没完成,在前一个任务的线程执行thenApplyAsync:保证在线程池里异步执行,不阻塞当前线程
2. 消费结果:thenAccept / thenRun
CompletableFuture<Void> cf = CompletableFuture
.supplyAsync(() -> queryData())
.thenAccept(data -> saveToCache(data)); // 消费结果,无返回值
CompletableFuture<Void> cf2 = CompletableFuture
.supplyAsync(() -> processOrder())
.thenRun(() -> sendNotification()); // 不关心上游结果,只是在完成后做一件事3. 组合两个任务:thenCompose / thenCombine
// thenCompose:前一个任务的结果作为参数,创建下一个CompletableFuture(串行依赖)
CompletableFuture<SellerInfo> cf = CompletableFuture
.supplyAsync(() -> productService.getById(productId), pool)
.thenComposeAsync(product ->
CompletableFuture.supplyAsync(
() -> sellerService.getSeller(product.getSellerId()), pool
), pool
);
// thenCombine:两个独立任务都完成后,合并它们的结果(并行后合并)
CompletableFuture<PriceInfo> priceFuture = CompletableFuture
.supplyAsync(() -> priceService.getPrice(productId), pool);
CompletableFuture<StockInfo> stockFuture = CompletableFuture
.supplyAsync(() -> stockService.getStock(productId), pool);
CompletableFuture<String> combined = priceFuture.thenCombine(
stockFuture,
(price, stock) -> price.getAmount() + " | " + stock.getCount()
);4. 等待所有/任意完成:allOf / anyOf
// allOf:等待所有任务完成
CompletableFuture<Void> all = CompletableFuture.allOf(
future1, future2, future3
);
// allOf本身返回Void,需要手动获取各future的结果
all.thenRun(() -> {
try {
String r1 = future1.get();
String r2 = future2.get();
String r3 = future3.get();
// 三个结果都拿到了
} catch (Exception e) { /* handle */ }
});
// anyOf:任意一个完成即返回
CompletableFuture<Object> any = CompletableFuture.anyOf(
future1, future2, future3
);
any.thenAccept(result -> System.out.println("最快的结果: " + result));5. 异常处理
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
if (Math.random() > 0.5) throw new RuntimeException("随机失败");
return "success";
})
// exceptionally:异常时提供默认值
.exceptionally(ex -> {
System.out.println("异常: " + ex.getMessage());
return "default value";
})
// handle:无论成功失败都执行(类似 finally)
.handle((result, ex) -> {
if (ex != null) {
return "error: " + ex.getMessage();
}
return "ok: " + result;
})
// whenComplete:不改变结果,只做副作用(日志、监控等)
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("任务失败", ex);
} else {
metrics.record(result);
}
});完整实战代码:商品详情页异步编排
import java.util.concurrent.*;
/**
* 商品详情页异步编排完整实现
* 并行获取商品基本信息、价格、库存、评论
* seller信息依赖商品信息,串行获取
* 任意服务超时100ms则降级
*/
@Service
public class ProductDetailServiceV2 {
private final ExecutorService pool = new ThreadPoolExecutor(
20, 40, 60, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(200),
new NamedThreadFactory("product-detail"),
new ThreadPoolExecutor.CallerRunsPolicy()
);
public ProductDetailVO getProductDetail(Long productId) {
long startTime = System.currentTimeMillis();
// 1. 并行发起:价格、库存、评论(不依赖其他数据)
CompletableFuture<PriceInfo> priceFuture = CompletableFuture
.supplyAsync(() -> priceService.getPrice(productId), pool)
.orTimeout(100, TimeUnit.MILLISECONDS)
.exceptionally(ex -> PriceInfo.defaultPrice()); // 超时降级
CompletableFuture<StockInfo> stockFuture = CompletableFuture
.supplyAsync(() -> stockService.getStock(productId), pool)
.orTimeout(100, TimeUnit.MILLISECONDS)
.exceptionally(ex -> StockInfo.unknown());
CompletableFuture<List<Review>> reviewFuture = CompletableFuture
.supplyAsync(() -> reviewService.getReviews(productId, 5), pool)
.orTimeout(100, TimeUnit.MILLISECONDS)
.exceptionally(ex -> Collections.emptyList());
// 2. 获取商品基本信息(优先级最高,不降级)
CompletableFuture<ProductInfo> productFuture = CompletableFuture
.supplyAsync(() -> productService.getById(productId), pool)
.orTimeout(200, TimeUnit.MILLISECONDS);
// 3. seller信息依赖product,用thenCompose串行
CompletableFuture<SellerInfo> sellerFuture = productFuture
.thenComposeAsync(product ->
CompletableFuture.supplyAsync(
() -> sellerService.getSeller(product.getSellerId()), pool
).orTimeout(100, TimeUnit.MILLISECONDS)
.exceptionally(ex -> SellerInfo.anonymous()),
pool
);
// 4. 等待所有任务完成,组装结果
try {
CompletableFuture.allOf(
priceFuture, stockFuture, reviewFuture,
productFuture, sellerFuture
).get(300, TimeUnit.MILLISECONDS); // 总超时300ms
ProductDetailVO vo = new ProductDetailVO();
vo.setProduct(productFuture.join());
vo.setPrice(priceFuture.join());
vo.setStock(stockFuture.join());
vo.setReviews(reviewFuture.join());
vo.setSeller(sellerFuture.join());
long elapsed = System.currentTimeMillis() - startTime;
log.info("商品详情获取完成,耗时 {}ms", elapsed);
return vo;
} catch (TimeoutException e) {
log.warn("商品详情获取超时,productId={}", productId);
return buildPartialVO(productFuture, priceFuture, stockFuture,
reviewFuture, sellerFuture);
} catch (Exception e) {
log.error("商品详情获取异常", e);
throw new ServiceException("获取商品详情失败");
}
}
/**
* 超时时,返回已完成的部分数据,未完成的用默认值
*/
private ProductDetailVO buildPartialVO(
CompletableFuture<ProductInfo> productFuture,
CompletableFuture<PriceInfo> priceFuture,
CompletableFuture<StockInfo> stockFuture,
CompletableFuture<List<Review>> reviewFuture,
CompletableFuture<SellerInfo> sellerFuture) {
ProductDetailVO vo = new ProductDetailVO();
vo.setProduct(getOrDefault(productFuture, null));
vo.setPrice(getOrDefault(priceFuture, PriceInfo.defaultPrice()));
vo.setStock(getOrDefault(stockFuture, StockInfo.unknown()));
vo.setReviews(getOrDefault(reviewFuture, Collections.emptyList()));
vo.setSeller(getOrDefault(sellerFuture, SellerInfo.anonymous()));
return vo;
}
private <T> T getOrDefault(CompletableFuture<T> future, T defaultValue) {
if (future.isDone() && !future.isCompletedExceptionally()) {
return future.join();
}
return defaultValue;
}
}三个踩坑实录
坑一:用 get() 等待,把异步变成了同步
现象: 接口用了CompletableFuture,但耗时和串行代码一样,完全没有性能提升。
原因: 每提交一个任务就立刻get()等结果,任务根本没有并行执行的机会。
// 错误:每行都在等待,其实是串行
ProductInfo product = CompletableFuture.supplyAsync(() -> getProduct(id)).get();
PriceInfo price = CompletableFuture.supplyAsync(() -> getPrice(id)).get();
// 等于没有异步化,白写
// 正确:先全部提交,再统一等待
CompletableFuture<ProductInfo> pf = CompletableFuture.supplyAsync(() -> getProduct(id), pool);
CompletableFuture<PriceInfo> pr = CompletableFuture.supplyAsync(() -> getPrice(id), pool);
// 此时两个任务已在并行执行
CompletableFuture.allOf(pf, pr).get(); // 统一等待坑二:异常被吞掉,接口返回错误数据却没有报错日志
现象: 某个服务调用失败,但接口返回200,数据是默认值,没有任何异常日志。
原因: exceptionally里只返回了默认值,没有记录日志,导致异常被静默处理。
// 错误:异常被吞掉
.exceptionally(ex -> defaultValue); // 没有日志,排查困难
// 正确:记录日志后再返回默认值
.exceptionally(ex -> {
log.warn("调用XXX服务失败,使用默认值。原因: {}", ex.getMessage());
return defaultValue;
});坑三:thenApply 和 thenApplyAsync 混用导致线程池切换开销
现象: 某个异步链路中,CPU使用率正常,但延迟远超预期,profiling发现大量线程上下文切换。
原因: 链路中混用了thenApply(可能在调用方线程执行)和thenApplyAsync(在线程池执行),导致任务在不同线程间频繁切换,上下文切换开销超过了计算本身。
// 问题写法:线程频繁切换
.supplyAsync(() -> step1(), pool) // pool线程
.thenApply(r -> step2(r)) // 可能在调用方线程
.thenApplyAsync(r -> step3(r), pool) // 切回pool线程
.thenApply(r -> step4(r)) // 可能又切回调用方
// 清晰写法:统一在pool里执行
.supplyAsync(() -> step1(), pool)
.thenApplyAsync(r -> step2(r), pool)
.thenApplyAsync(r -> step3(r), pool)
.thenApplyAsync(r -> step4(r), pool)小结
CompletableFuture的核心价值在于把异步计算图用代码清晰表达出来:
thenApply/thenCompose:描述串行依赖thenCombine/allOf:描述并行合并exceptionally/handle:描述降级逻辑orTimeout:描述超时控制(Java 9+)
从250ms串行到80ms并行,我们的商品详情页接口用这套方案实现了3倍多的性能提升,并且代码的可读性反而更好了——因为异步关系被清晰地表达出来了,而不是藏在一堆callback里。
