Java 结构化并发实战——Java 21 StructuredTaskScope 改变了什么
Java 结构化并发实战——Java 21 StructuredTaskScope 改变了什么
适读人群:在 Java 里做并发编程、想了解 Java 21 新并发模型的工程师 | 阅读时长:约14分钟 | 核心价值:结构化并发的核心思想,以及它如何让并发代码更安全
在写这篇文章之前,我回想了一下自己这些年写并发代码踩过的坑,发现大部分都可以归结为一类问题:任务泄漏。
什么叫任务泄漏?就是你启动了一些异步任务,但在某种情况下(异常、超时、提前返回),这些任务没有被正确取消,在后台静悄悄地继续跑,消耗资源,甚至产生副作用。
举个具体例子:
// 这段代码有任务泄漏的风险
CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> getUser(userId));
CompletableFuture<List<Order>> ordersFuture = CompletableFuture.supplyAsync(() -> getOrders(userId));
// 如果 userFuture 抛出异常,ordersFuture 还在继续执行
// 如果这个方法因为超时被调用方取消了,两个 future 也还在继续执行
User user = userFuture.get(); // 可能抛异常
List<Order> orders = ordersFuture.get();结构化并发(Structured Concurrency)就是为了解决这个问题:子任务的生命周期不能超过启动它们的父任务,就像 try-with-resources 里的资源,作用域结束时一定被关闭。
一、StructuredTaskScope 基础
StructuredTaskScope 是 Java 21 引入的(JEP 453,preview),用法类似 try-with-resources:
import java.util.concurrent.StructuredTaskScope;
public UserDetailVO getUserDetail(Long userId) throws InterruptedException, ExecutionException {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// 启动两个子任务
StructuredTaskScope.Subtask<User> userTask =
scope.fork(() -> userRepository.findById(userId).orElseThrow());
StructuredTaskScope.Subtask<List<Order>> ordersTask =
scope.fork(() -> orderRepository.findByUserId(userId));
// 等待所有子任务完成,或者任一失败
scope.join().throwIfFailed();
// 到这里,两个任务都成功完成了
return UserDetailVO.builder()
.user(userTask.get())
.orders(ordersTask.get())
.build();
} // scope 关闭:如果还有未完成的任务,会自动取消
}关键点:scope.join() 会等待所有子任务完成,scope 关闭时会取消所有还没完成的子任务。
这意味着:不管是正常完成、抛出异常、还是方法被取消,所有子任务都会有明确的结局,不会泄漏。
二、两种内置的 Scope 策略
ShutdownOnFailure:任一失败就停止所有任务
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var task1 = scope.fork(() -> fetchUserData(userId));
var task2 = scope.fork(() -> fetchOrderData(userId));
var task3 = scope.fork(() -> fetchPointData(userId));
scope.join().throwIfFailed(); // 任一任务失败,整个操作失败
// 三个都成功才到这里
return buildResult(task1.get(), task2.get(), task3.get());
}ShutdownOnSuccess:任一成功就停止(取最快的结果)
// 场景:向多个数据源查询,取第一个返回的结果
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<User>()) {
scope.fork(() -> primaryCache.getUser(userId));
scope.fork(() -> secondaryCache.getUser(userId));
scope.fork(() -> database.getUser(userId));
scope.join(); // 等待第一个成功
return scope.result(); // 取得第一个成功的结果
}
// 其他两个任务在 scope 关闭时自动取消三、超时控制
public UserDetailVO getUserDetail(Long userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = scope.fork(() -> userRepository.findById(userId).orElseThrow());
var ordersTask = scope.fork(() -> orderRepository.findByUserId(userId));
// 整体超时控制
scope.joinUntil(Instant.now().plusMillis(500)); // 500ms 超时
scope.throwIfFailed();
return UserDetailVO.builder()
.user(userTask.get())
.orders(ordersTask.get())
.build();
} catch (TimeoutException e) {
log.warn("获取用户详情超时: userId={}", userId);
throw new ServiceException("请求超时,请稍后重试");
}
}四、嵌套结构化并发
结构化并发可以嵌套,形成树状的任务结构:
public DashboardVO getDashboard(Long userId) throws Exception {
try (var outerScope = new StructuredTaskScope.ShutdownOnFailure()) {
// 任务1:获取用户概览(内部再并发)
var profileTask = outerScope.fork(() -> {
try (var innerScope = new StructuredTaskScope.ShutdownOnFailure()) {
var userTask = innerScope.fork(() -> getUser(userId));
var levelTask = innerScope.fork(() -> getUserLevel(userId));
innerScope.join().throwIfFailed();
return buildProfile(userTask.get(), levelTask.get());
}
});
// 任务2:获取订单统计(直接查询)
var statsTask = outerScope.fork(() -> getOrderStats(userId));
outerScope.join().throwIfFailed();
return DashboardVO.builder()
.profile(profileTask.get())
.stats(statsTask.get())
.build();
}
}这棵任务树有清晰的生命周期层级:外层 scope 关闭时,内层 scope(如果还在运行)也会被取消。
五、自定义 Scope 策略
可以继承 StructuredTaskScope 实现自定义策略:
// 策略:至少X个任务成功才算成功
public class ShutdownOnNSuccess<T> extends StructuredTaskScope<T> {
private final int requiredSuccesses;
private final List<T> results = new CopyOnWriteArrayList<>();
private volatile boolean done = false;
public ShutdownOnNSuccess(int n) {
this.requiredSuccesses = n;
}
@Override
protected void handleComplete(Subtask<? extends T> subtask) {
if (subtask.state() == Subtask.State.SUCCESS) {
results.add(subtask.get());
if (results.size() >= requiredSuccesses && !done) {
done = true;
shutdown(); // 达到目标数量,关闭 scope
}
}
}
public List<T> results() {
ensureOwnerAndJoined(); // 确保已经 join
return Collections.unmodifiableList(results);
}
}
// 使用:向3个节点发送请求,2个成功就认为成功(多数原则)
try (var scope = new ShutdownOnNSuccess<Boolean>(2)) {
scope.fork(() -> node1.replicate(data));
scope.fork(() -> node2.replicate(data));
scope.fork(() -> node3.replicate(data));
scope.join();
List<Boolean> successes = scope.results();
if (successes.size() < 2) {
throw new ReplicationException("复制失败,成功节点数不足");
}
}六、结构化并发 vs CompletableFuture
| 维度 | CompletableFuture | StructuredTaskScope |
|---|---|---|
| 任务泄漏 | 容易泄漏 | 自动防止(scope 关闭时取消) |
| 错误传播 | 需要手动处理 | 内置策略处理 |
| 超时控制 | 需要额外代码 | joinUntil() |
| 嵌套并发 | 代码复杂 | 自然的嵌套结构 |
| 调试 | 线程栈不直观 | 配合虚拟线程,栈更清晰 |
| 成熟度 | 非常成熟 | Java 21 preview |
七、结构化并发 + 虚拟线程
结构化并发和虚拟线程是天然的搭档:
// 用虚拟线程的 executor 启动子任务(默认就是虚拟线程)
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
// StructuredTaskScope 默认使用虚拟线程,可以启动大量任务
List<StructuredTaskScope.Subtask<ProductVO>> tasks = productIds.stream()
.map(id -> scope.fork(() -> productService.getProduct(id)))
.collect(Collectors.toList());
scope.join().throwIfFailed();
return tasks.stream()
.map(StructuredTaskScope.Subtask::get)
.collect(Collectors.toList());
}
// 即使 productIds 有1000个,也没问题,虚拟线程很轻量结构化并发目前还是 preview 特性,API 可能在后续版本有变化,但核心思想已经很清晰了。我在内部工具项目里试用过,在需要"并发多个任务,要么全成功要么全失败"这种场景里,比 CompletableFuture.allOf() 的写法干净很多。
最后一篇,我会把这几篇的核心内容串起来,讲我在两个真实项目里的响应式 vs 虚拟线程的选型经历。
