JDK21结构化并发:StructuredTaskScope让并发代码更可靠
JDK21结构化并发:StructuredTaskScope让并发代码更可靠
适读人群:有Java并发编程经验、关注JDK21新特性的后端工程师 | 阅读时长:约15分钟 | 文章类型:特性详解+实战
开篇故事
并发编程最让人头疼的不是写逻辑,而是处理失败。
我曾经写过这样的代码:
CompletableFuture<UserInfo> userFuture = CompletableFuture.supplyAsync(() -> getUser(userId));
CompletableFuture<OrderList> orderFuture = CompletableFuture.supplyAsync(() -> getOrders(userId));
CompletableFuture<AccountInfo> accountFuture = CompletableFuture.supplyAsync(() -> getAccount(userId));
// 等三个都完成
CompletableFuture.allOf(userFuture, orderFuture, accountFuture).join();
UserInfo user = userFuture.get();
OrderList orders = orderFuture.get();
AccountInfo account = accountFuture.get();这段代码的问题:如果getOrders失败抛异常了,getUser和getAccount还在跑,占用资源。我需要手动处理取消逻辑,代码很繁琐。
JDK21的结构化并发(Structured Concurrency)就是为了解决这个问题。
一、什么是结构化并发
核心思想
结构化并发借鉴了"结构化编程"的思路:就像if/while/for让代码流程有清晰的进入和退出点,结构化并发让并发任务有清晰的开始和结束边界。
核心约束:在一个作用域(Scope)里启动的子任务,必须在该作用域结束前全部完成(或被取消)。父任务不会在子任务完成之前退出。
两种内置策略
JDK21提供了两种StructuredTaskScope:
- ShutdownOnFailure:任一任务失败,取消所有其他任务(对应"全部成功才行"的场景)
- ShutdownOnSuccess:任一任务成功,取消所有其他任务(对应"有一个成功就行"的场景)
二、核心原理深挖
与CompletableFuture的对比
任务生命周期保证
结构化并发给了一个重要保证:子任务的生命周期不会超过父任务。
这意味着:
- 资源泄漏的风险降低(子任务不会在你不知道的时候继续运行)
- 错误处理集中(在scope.join()之后统一处理)
- 可观测性更好(线程dump能反映结构化的层次)
三、完整代码实现
代码一:两种策略的完整演示
package com.laozhang.jdk21;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.ShutdownOnFailure;
import java.util.concurrent.StructuredTaskScope.ShutdownOnSuccess;
/**
* JDK21结构化并发完整演示
* 注:StructuredTaskScope在JDK21仍是Preview特性,需要--enable-preview
* JDK25预计正式GA
*/
public class StructuredConcurrencyDemo {
// 模拟的数据类
record UserProfile(Long id, String name) {}
record OrderSummary(Long userId, int orderCount) {}
record AccountBalance(Long userId, long balance) {}
// 模拟IO调用
static UserProfile fetchUser(Long userId) throws InterruptedException {
Thread.sleep(100); // 模拟100ms延迟
return new UserProfile(userId, "张三");
}
static OrderSummary fetchOrders(Long userId) throws InterruptedException {
Thread.sleep(150);
// 模拟偶尔失败
if (userId % 2 == 0) throw new RuntimeException("订单服务暂时不可用");
return new OrderSummary(userId, 5);
}
static AccountBalance fetchAccount(Long userId) throws InterruptedException {
Thread.sleep(80);
return new AccountBalance(userId, 9900L);
}
// ===== 策略1:ShutdownOnFailure(全部成功才继续)=====
record DashboardData(UserProfile user, OrderSummary orders, AccountBalance account) {}
static DashboardData fetchDashboard(Long userId) throws Exception {
try (var scope = new ShutdownOnFailure()) {
// fork三个并发任务(每个任务是一个虚拟线程)
StructuredTaskScope.Subtask<UserProfile> userTask =
scope.fork(() -> fetchUser(userId));
StructuredTaskScope.Subtask<OrderSummary> orderTask =
scope.fork(() -> fetchOrders(userId));
StructuredTaskScope.Subtask<AccountBalance> accountTask =
scope.fork(() -> fetchAccount(userId));
// 等待所有任务完成,或任一失败时取消其他
scope.join() // 等待
.throwIfFailed(); // 如果有失败,抛出异常
// 到这里,所有任务都成功了
return new DashboardData(
userTask.get(),
orderTask.get(),
accountTask.get()
);
}
}
// ===== 策略2:ShutdownOnSuccess(最快的一个成功就够了)=====
// 比如:同时查主库和备库,谁先回来用谁
static String fetchFromFastestSource(String data) throws Exception {
try (var scope = new ShutdownOnSuccess<String>()) {
scope.fork(() -> {
Thread.sleep(200); // 主库慢一点
return "主库数据: " + data;
});
scope.fork(() -> {
Thread.sleep(50); // 备库快
return "备库数据: " + data;
});
scope.join(); // 等待最快的那个完成
return scope.result(); // 返回第一个成功的结果
}
}
public static void main(String[] args) {
System.out.println("=== ShutdownOnFailure:全部成功才继续 ===");
// 奇数ID(订单服务不会失败)
try {
long start = System.currentTimeMillis();
DashboardData data = fetchDashboard(1L);
long elapsed = System.currentTimeMillis() - start;
System.out.println("成功! 耗时: " + elapsed + "ms(并发执行,约150ms)");
System.out.println("用户: " + data.user().name());
System.out.println("订单: " + data.orders().orderCount() + "笔");
System.out.println("余额: " + data.account().balance() + "分");
} catch (Exception e) {
System.out.println("失败: " + e.getMessage());
}
System.out.println();
// 偶数ID(订单服务会失败)
try {
fetchDashboard(2L); // 会抛异常
} catch (Exception e) {
System.out.println("ID=2时失败(预期内): " + e.getMessage());
System.out.println("其他任务已被自动取消");
}
System.out.println("\n=== ShutdownOnSuccess:最快成功的就够了 ===");
try {
long start = System.currentTimeMillis();
String result = fetchFromFastestSource("用户信息");
long elapsed = System.currentTimeMillis() - start;
System.out.println("获取到: " + result + ",耗时: " + elapsed + "ms(约50ms)");
} catch (Exception e) {
System.out.println("失败: " + e.getMessage());
}
}
}代码二:与CompletableFuture对比和迁移指南
package com.laozhang.jdk21;
import java.util.concurrent.*;
/**
* CompletableFuture vs StructuredTaskScope:对比与迁移
*/
public class ConcurrencyMigrationGuide {
// ===== CompletableFuture写法(旧)=====
static String fetchDataWithCF(String userId) throws Exception {
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
CompletableFuture<String> profileFuture = CompletableFuture.supplyAsync(
() -> "用户" + userId, executor);
CompletableFuture<String> ordersFuture = CompletableFuture.supplyAsync(
() -> {
try { Thread.sleep(100); } catch (InterruptedException e) {}
return "订单" + userId;
}, executor);
try {
// 等两个都完成
CompletableFuture.allOf(profileFuture, ordersFuture).get(2, TimeUnit.SECONDS);
return profileFuture.get() + " + " + ordersFuture.get();
} catch (TimeoutException e) {
// 超时要手动取消(CompletableFuture的cancel不一定停止任务)
profileFuture.cancel(true);
ordersFuture.cancel(true);
throw new RuntimeException("超时");
}
// 问题:executor要手动关闭,错误传播要手动处理,代码复杂
}
// ===== StructuredTaskScope写法(新)=====
static String fetchDataWithScope(String userId) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
var profile = scope.fork(() -> "用户" + userId);
var orders = scope.fork(() -> {
Thread.sleep(100);
return "订单" + userId;
});
scope.join().throwIfFailed();
return profile.get() + " + " + orders.get();
}
// scope自动关闭,自动取消,自动传播错误
// 代码意图更清晰
}
// ===== 适合保留CompletableFuture的场景 =====
// 1. 事件驱动的响应式编程(thenApply/thenCompose链式)
// 2. 不需要等待完成,只是触发后不管的"fire and forget"
// 3. 已有成熟的响应式框架(Spring WebFlux等)
// ===== 适合用StructuredTaskScope的场景 =====
// 1. 聚合多个下游调用(等全部完成)
// 2. 竞速查询(取最快的结果)
// 3. 需要明确超时和取消语义的场景
public static void main(String[] args) throws Exception {
System.out.println("=== 对比演示 ===");
long start = System.currentTimeMillis();
String cfResult = fetchDataWithCF("001");
System.out.println("CF结果: " + cfResult + " (" + (System.currentTimeMillis() - start) + "ms)");
start = System.currentTimeMillis();
String scopeResult = fetchDataWithScope("001");
System.out.println("Scope结果: " + scopeResult + " (" + (System.currentTimeMillis() - start) + "ms)");
System.out.println("\n=== 使用建议 ===");
System.out.println("新代码优先用StructuredTaskScope(更安全、更清晰)");
System.out.println("事件驱动/响应式场景保留CompletableFuture");
System.out.println("注意:JDK21的StructuredTaskScope仍是Preview,JDK25预计GA");
}
}四、踩坑实录
坑1:忘记调用scope.join(),任务直接被关闭
报错现象:
java.lang.IllegalStateException: Owner did not join before closing根本原因:
StructuredTaskScope要求在关闭(try-with-resources自动关闭)之前必须调用join()。没有join()就关闭会抛异常。
具体解法:
try (var scope = new ShutdownOnFailure()) {
var task = scope.fork(...);
scope.join(); // 必须调用!
// ...
}坑2:JDK21需要--enable-preview才能使用StructuredTaskScope
报错现象:
error: StructuredTaskScope is a preview feature and is disabled by default根本原因:
StructuredTaskScope在JDK21是Preview特性,需要额外编译选项。
具体解法:
<!-- Maven编译配置 -->
<compilerArgs>
<arg>--enable-preview</arg>
</compilerArgs>
<release>21</release>
<!-- 运行时也需要 -->
<!-- java --enable-preview -jar app.jar -->或者等JDK25,预计正式GA。
坑3:在scope里启动的任务不能再启动自己的StructuredTaskScope
根本原因:
结构化并发要求严格的层次结构,一个scope里的子任务如果需要进一步并行,需要在子任务里创建新的scope,而不能共用父scope。
五、总结与延伸
结构化并发是JDK21对并发编程模型的一次重大改进,它和虚拟线程是天然的搭档。
核心价值:
- 安全:子任务生命周期由作用域保证,不会在后台"游荡"
- 简洁:错误处理、取消、等待逻辑都内置,不需要手动处理
- 可观测:结构化的线程关系在线程dump里清晰可见
当前状态(2024年):
- JDK19:首次引入Preview
- JDK21:第二个Preview
- JDK25(预计):正式GA
如果你现在在JDK21并且不怕用Preview特性,强烈建议在IO密集型的聚合查询场景试用。配合虚拟线程,能让并发代码既简洁又高效。
