Java 21 Structured Concurrency:替代CompletableFuture的场景分析
Java 21 Structured Concurrency:替代CompletableFuture的场景分析
适读人群:Java中高级工程师 | 阅读时长:约16分钟 | 技术栈:Java 21、StructuredTaskScope、CompletableFuture
开篇故事
说到CompletableFuture,我有个一直没忘的噩梦般的经历。
那是2021年,我加入一个项目,接手一个"并行聚合服务"。代码大概是这样的:
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(...)
.thenApply(...)
.thenCompose(...)
.exceptionally(...);
CompletableFuture<List<Order>> orderFuture = ...
CompletableFuture.allOf(userFuture, orderFuture, productFuture, logFuture, configFuture)
.thenApply(ignored -> {
// 在这里收集结果,但如果任何一个future失败了...
// 错误处理逻辑散落在各处
})
.exceptionally(e -> {
// 这个catch能捕获什么?不确定
return null;
});某天生产上出了个Bug,一个下游服务超时了,但聚合接口没有报错,也没有返回超时的服务数据,就静默地返回了一个残缺的结果。排查了两小时,发现是某个.exceptionally()把错误吞了。
CompletableFuture的问题不是功能不够,而是在复杂场景下很容易写出"看起来对,实则不对"的错误处理代码。取消传播、错误传播、生命周期管理,都需要手动精心处理。
Java 21的结构化并发(Structured Concurrency)就是为了解决这个问题而来的。
一、核心问题:非结构化并发的三大痛点
痛点一:错误处理不可靠
// 这段代码的问题是什么?
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> fetchUser());
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> fetchOrder());
CompletableFuture.allOf(f1, f2).join();
String user = f1.get(); // 如果f1失败,这里抛异常
String order = f2.get(); // 如果f2失败,这里抛异常
// 问题:f1失败时,f2依然在运行(浪费资源)
// f2失败时,f1依然在运行(资源泄漏)
// 没有任何机制保证"一个失败,其他自动取消"痛点二:生命周期泄漏
// 这段代码有资源泄漏
public Response handleRequest() {
CompletableFuture<User> userTask = executor.submit(this::fetchUser);
CompletableFuture<List<Order>> orderTask = executor.submit(this::fetchOrders);
// 如果这里抛出异常,或者当前线程被中断
// userTask和orderTask依然在后台运行!
// 没有人管理它们的生命周期
return combine(userTask.get(), orderTask.get());
}痛点三:调试困难
CompletableFuture的执行是非结构化的——任务在不同线程池的不同线程上运行,调用链在日志里完全不连续,出了问题很难追踪。
二、原理深度解析
2.1 结构化并发的核心思想
结构化并发的核心思想很简单:并发任务的生命周期,应该在创建它们的代码块内完成。就像结构化编程保证了代码的单一入口和出口,结构化并发保证了并发任务的明确边界。
关键保证:
- 子任务的生命周期严格限制在scope的
try块内 - scope关闭时,所有子任务要么完成,要么被取消
- 父任务的取消/中断自动传播到所有子任务
2.2 两种内置策略
三、完整代码实现
3.1 基础用法对比
// === CompletableFuture 写法 ===
public UserDashboard getDashboardOld(Long userId) {
CompletableFuture<UserInfo> userFuture = CompletableFuture
.supplyAsync(() -> userClient.getUser(userId), executor);
CompletableFuture<List<Order>> orderFuture = CompletableFuture
.supplyAsync(() -> orderClient.getOrders(userId), executor);
CompletableFuture<UserStats> statsFuture = CompletableFuture
.supplyAsync(() -> statsClient.getStats(userId), executor);
return CompletableFuture.allOf(userFuture, orderFuture, statsFuture)
.thenApply(ignored -> {
try {
return new UserDashboard(
userFuture.get(),
orderFuture.get(),
statsFuture.get()
);
} catch (Exception e) {
throw new RuntimeException(e);
}
})
.exceptionally(e -> UserDashboard.error())
.join();
}
// === Structured Concurrency 写法 ===
public UserDashboard getDashboardNew(Long userId) throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 提交子任务
StructuredTaskScope.Subtask<UserInfo> userTask =
scope.fork(() -> userClient.getUser(userId));
StructuredTaskScope.Subtask<List<Order>> orderTask =
scope.fork(() -> orderClient.getOrders(userId));
StructuredTaskScope.Subtask<UserStats> statsTask =
scope.fork(() -> statsClient.getStats(userId));
// 等待所有任务完成(任一失败则取消其余)
scope.join().throwIfFailed();
// 此时所有任务都成功了,安全获取结果
return new UserDashboard(
userTask.get(),
orderTask.get(),
statsTask.get()
);
}
// scope关闭时,自动确保所有子任务完成
}代码对比一目了然:Structured Concurrency版本更线性、更直观,错误处理更明确。
3.2 竞速模式(第一个成功就行)
/**
* 多数据源查询:任意一个返回结果即可
* 典型场景:查询多个缓存,第一个命中就返回
*/
public String getFromAnywhere(String key) throws InterruptedException {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<String>()) {
// 并行查询多个数据源
scope.fork(() -> localCache.get(key)); // 本地缓存,最快
scope.fork(() -> redisCache.get(key)); // Redis缓存
scope.fork(() -> database.get(key)); // 数据库,最慢
// 等待第一个成功的结果
scope.join();
return scope.result(); // 返回第一个成功的结果
}
// 一旦有一个成功,其他任务自动取消
}3.3 自定义Scope策略
/**
* 自定义策略:超过半数子任务成功才认为整体成功
* 用于投票、分布式共识等场景
*/
public class MajorityScope<T> extends StructuredTaskScope<T> {
private final int threshold;
private final List<T> results = new CopyOnWriteArrayList<>();
private volatile boolean done = false;
public MajorityScope(int totalTasks) {
this.threshold = totalTasks / 2 + 1;
}
@Override
protected void handleComplete(StructuredTaskScope.Subtask<? extends T> subtask) {
if (subtask.state() == StructuredTaskScope.Subtask.State.SUCCESS) {
results.add(subtask.get());
if (results.size() >= threshold && !done) {
done = true;
shutdown(); // 达到阈值,关闭scope
}
}
}
public List<T> results() {
return Collections.unmodifiableList(results);
}
}
// 使用自定义Scope
public ConsensusResult getConsensus(String key) throws InterruptedException {
try (var scope = new MajorityScope<String>(5)) {
for (Node node : clusterNodes) {
scope.fork(() -> node.get(key));
}
scope.join();
return new ConsensusResult(scope.results());
}
}3.4 与ScopedValue结合(替代ThreadLocal)
// Java 21新特性:ScopedValue 是虚拟线程友好的ThreadLocal替代品
// 配合结构化并发,实现请求上下文的安全传递
public static final ScopedValue<RequestContext> REQUEST_CONTEXT = ScopedValue.newInstance();
public Response handleRequest(HttpRequest request) throws InterruptedException {
RequestContext ctx = new RequestContext(request.getUserId(), request.getTenantId());
// ScopedValue在整个scope内都可见,包括fork出的子任务
return ScopedValue.where(REQUEST_CONTEXT, ctx).call(() -> {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() -> {
// 子任务可以访问REQUEST_CONTEXT
Long userId = REQUEST_CONTEXT.get().getUserId();
return userService.getUser(userId);
});
var auditTask = scope.fork(() -> {
RequestContext context = REQUEST_CONTEXT.get();
return auditService.log(context); // 审计日志自动带上请求上下文
});
scope.join().throwIfFailed();
return new Response(userTask.get());
}
});
}3.5 超时控制
/**
* 带超时的结构化并发
*/
public UserDashboard getDashboardWithTimeout(Long userId)
throws InterruptedException, TimeoutException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() -> userClient.getUser(userId));
var orderTask = scope.fork(() -> orderClient.getOrders(userId));
// 设置整体超时
scope.joinUntil(Instant.now().plusSeconds(3));
scope.throwIfFailed();
return new UserDashboard(userTask.get(), orderTask.get());
} catch (TimeoutException e) {
log.warn("获取用户仪表板超时, userId={}", userId);
throw e;
}
// scope关闭时,超时未完成的任务自动取消
}四、工程实践:什么时候用,什么时候不用
4.1 适合结构化并发的场景
4.2 CompletableFuture的不可替代场景
结构化并发目前(Java 21)还是预览特性,API可能变化。而且它有一个本质限制:子任务必须在父scope的try块内完成。这意味着你无法把Future"返回出去"供其他地方使用。
CompletableFuture的场景:
- 需要把Future传递给调用方
- 需要复杂的链式转换(thenApply、thenCompose)
- 需要精细控制任务调度(指定executor)
- 响应式风格的异步组合
五、踩坑实录
坑一:忘记scope是预览特性,需要开启
# Java 21编译需要--enable-preview
javac --enable-preview --release 21 MyClass.java
# 运行也需要
java --enable-preview MyClass
# Maven配置
<configuration>
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
<release>21</release>
</configuration>坑二:子任务的异常没有正确传播
// 错误:joinIfFailed()没有调用,子任务失败了也不知道
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> riskyOperation());
scope.join(); // 仅等待完成
// 没有调用throwIfFailed()!
return task.get(); // 如果task失败,这里抛的是异常,但错误信息不完整
}
// 正确
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task = scope.fork(() -> riskyOperation());
scope.join().throwIfFailed(); // 链式调用,有失败就抛出
return task.get();
}坑三:在scope外调用subtask.get()
StructuredTaskScope.Subtask<String> leakedTask;
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
leakedTask = scope.fork(() -> fetchData());
scope.join().throwIfFailed();
}
// scope已关闭
// 这行代码可以工作,但这是不好的实践
// 因为已经破坏了结构化并发的约束
String result = leakedTask.get(); // 不推荐坑四:和Spring AOP代理的兼容性
结构化并发的子任务在独立的虚拟线程中运行,Spring的事务传播、安全上下文等通过ThreadLocal传递的信息,默认不会传递到子任务。
这和我们之前说的ScopedValue有关,同时也要注意Spring的@Transactional不能跨虚拟线程边界。
六、总结与个人判断
结构化并发是Java并发编程的一次真正进步,它把"正确的并发"变得更容易写,把"错误的并发"变得更难写。
我对它的评价:思想很好,API还在演进,现在可以在新项目中尝鲜,但生产环境要谨慎。
预览特性意味着Java 22、23可能还会改API,你现在写的代码可能需要改动。我建议等到Java 25(LTS版本预期支持结构化并发稳定特性)再在生产系统大规模使用。
但作为学习和理解并发思想的工具,现在就值得深入研究。它背后的思想——并发任务的生命周期应该有明确边界——这个思想本身就很有价值,即使你不用这个API,把这种思维用到CompletableFuture的设计中,也会让你的代码质量提升一个档次。
