Project Reactor 核心——Mono、Flux、操作符,Java 工程师的理解路径
Project Reactor 核心——Mono、Flux、操作符,Java 工程师的理解路径
适读人群:有 Java 基础、想搞懂响应式编程底层逻辑的工程师 | 阅读时长:约16分钟 | 核心价值:用 Java 工程师熟悉的视角,把 Reactor 最核心的概念讲清楚
我学 Project Reactor 的过程挺曲折的。第一遍看文档,感觉每个词都认识,连在一起完全不知道在说什么。什么"冷信号"、"热信号"、"背压"、"调度器"……整个人处于一种"我好像懂了但其实完全没懂"的状态。
后来我换了个思路:不从响应式编程的理论出发,而是从"我已经知道的 Java 知识"出发,往 Reactor 上靠。这样反而快了很多。
这篇文章就是按这个路径写的,专门给有 Java 经验但响应式基础薄弱的工程师。
一、从 Optional 和 CompletableFuture 理解 Mono
如果你用过 Optional,你就已经理解了 Mono 的一半。
Optional<T> 是对"可能有值、可能没有"的包装。你不直接操作值,而是操作这个包装器:
Optional<User> optUser = Optional.ofNullable(findUser(id));
Optional<String> name = optUser.map(User::getName); // 如果有值就转换
name.orElse("unknown"); // 如果没值用默认值Mono<T> 也是一种包装,包装的是"未来某个时刻会有0个或1个值"。操作方式几乎一样:
Mono<User> monoUser = userRepository.findById(id); // 还没执行,只是描述
Mono<String> name = monoUser.map(User::getName); // 还没执行,继续描述
name.block(); // 现在才真正执行,等待结果(生产环境别这么用)两者的区别:Optional 是同步的,值已经在那了;Mono 是异步的,值还在路上。
如果你用过 CompletableFuture,那你对 Mono 的异步本质应该更有感觉:
// CompletableFuture 风格
CompletableFuture<User> future = CompletableFuture.supplyAsync(() -> findUser(id));
CompletableFuture<String> nameFuture = future.thenApply(User::getName);
String name = nameFuture.get(); // 阻塞等待
// Mono 风格(本质上在做同样的事,但 API 更丰富)
Mono<User> monoUser = Mono.fromCallable(() -> findUser(id));
Mono<String> nameMono = monoUser.map(User::getName);
String name = nameMono.block(); // 同样是阻塞等待,但 Mono 有更多操作符Mono 可以简单理解为"功能更强的 CompletableFuture",但背后的实现思想不一样——Mono 是基于响应式流规范的,有背压支持,这个后面再说。
二、Flux:就是 Stream,但是异步的
理解 Flux 就更简单了:它就是 Java Stream,但是值是异步到达的。
// Java Stream:同步,所有数据在内存里
Stream<User> users = userList.stream();
Stream<String> names = users.map(User::getName);
names.collect(Collectors.toList()); // 终止操作,执行整个管道
// Flux:异步,数据可能来自网络、数据库、消息队列
Flux<User> usersFlux = userRepository.findAll(); // 数据库查询,异步的
Flux<String> namesFlux = usersFlux.map(User::getName);
namesFlux.collectList().block(); // 收集成 List,等待完成操作符的对应关系基本是一对一的:
| Java Stream | Reactor 对应 | 说明 |
|---|---|---|
map() | map() | 同步转换 |
flatMap() | flatMap() | 异步转换,展开内层 |
filter() | filter() | 过滤 |
reduce() | reduce() | 归约 |
collect() | collectList() / collect() | 聚合 |
limit() | take() | 取前N个 |
skip() | skip() | 跳过前N个 |
当然 Flux 有很多 Stream 没有的操作符,比如 buffer()、window()、groupBy(),这些是为流式处理专门设计的。
三、最重要的一个概念:冷信号 vs 热信号
这个概念我当时看了好几遍才真正理解,给你举个生活类比。
冷信号(Cold Publisher):像 Netflix 的点播电影。你订阅的时候,从第一帧开始放,每个订阅者都是从头开始的独立播放。
Mono<User> coldMono = Mono.fromCallable(() -> {
System.out.println("正在查数据库..."); // 每次订阅都会执行一次
return userRepository.findById(1L);
});
// 第一次订阅
coldMono.subscribe(u -> System.out.println("订阅者1: " + u.getName()));
// 第二次订阅
coldMono.subscribe(u -> System.out.println("订阅者2: " + u.getName()));
// "正在查数据库..." 会打印两次,查了两次数据库热信号(Hot Publisher):像电视直播。已经在播了,你进来只能看到你进来之后的内容,不会从头播。
// 把冷信号转成热信号
Flux<Long> hotFlux = Flux.interval(Duration.ofSeconds(1))
.publish() // 转成 ConnectableFlux(热信号)
.autoConnect(); // 有第一个订阅者时开始发射
// 1秒后加入的订阅者,就错过了前1秒的数据大部分情况下你用的都是冷信号——数据库查询、HTTP 请求这些都是冷的,每次订阅就重新查一次。
热信号常见于:WebSocket 推送、Kafka 消费、事件总线这些场景,数据是"实时发生的",不是按需查的。
四、map vs flatMap,这里是最容易搞错的地方
很多人刚开始写 Reactor,会在 map 和 flatMap 之间纠结,甚至用错。
简单的区分方法:
map:你的转换函数返回普通值,用mapflatMap:你的转换函数返回另一个 Mono/Flux,用flatMap
// 场景1:把 User 转成 UserVO(同步转换),用 map
Mono<UserVO> vo = userMono.map(user -> UserVO.from(user));
// 场景2:根据 User 去查订单(异步查询,返回 Mono),用 flatMap
Mono<List<Order>> orders = userMono.flatMap(user ->
orderRepository.findByUserId(user.getId()) // 返回 Mono<List<Order>>
);
// 如果你用 map,得到的是 Mono<Mono<List<Order>>>,双重嵌套,类型不对flatMap 的"flat"就是"展平"的意思:把 Mono<Mono<T>> 展平成 Mono<T>。这和 Java Stream 的 flatMap 是完全一样的含义。
还有一个 flatMapMany,用在 Mono 里但结果是多个值的场景:
// 根据一个 userId 查多个订单
Flux<Order> orders = userMono.flatMapMany(user ->
orderRepository.findByUserIdFlux(user.getId()) // 返回 Flux<Order>
);五、操作符:我最常用的那些
不打算把所有操作符列一遍,那没意义,文档比我写得全。这里只说我日常用得最多的几个。
zip 和 zipWith:并行等待多个异步操作
// 并发查用户信息和用户的订单列表,两个请求同时发出,都完成后合并
Mono<UserDetailVO> detail = Mono.zip(
userRepository.findById(userId),
orderRepository.findLatestByUserId(userId)
).map(tuple -> {
User user = tuple.getT1();
Order latestOrder = tuple.getT2();
return UserDetailVO.builder()
.user(user)
.latestOrder(latestOrder)
.build();
});
// 注意:两个查询是并发执行的,不是顺序的,这比用 flatMap 链式调用快我在一个保险核心系统项目里用过这个,把三个下游服务并发调用从串行 380ms 优化到了并行 127ms,效果很明显。
defaultIfEmpty 和 switchIfEmpty:处理空值
// 查不到用户时返回默认值
Mono<User> user = userRepository.findById(id)
.defaultIfEmpty(User.anonymous()); // 简单默认值
// 查不到时执行另一个异步操作(比如查缓存兜底)
Mono<User> user2 = userRepository.findById(id)
.switchIfEmpty(cacheService.getUser(id)); // 返回 Mono<User>timeout:防止下游慢查询拖垮整个链
Mono<Order> order = orderService.getOrder(orderId)
.timeout(Duration.ofMillis(500)) // 超过500ms就超时
.onErrorReturn(TimeoutException.class, Order.empty());
// 不要让一个慢接口拖垮整个请求,这个比在 MVC 里设超时更优雅retry 和 retryWhen:自动重试
// 简单重试3次
Mono<User> user = userRepository.findById(id)
.retry(3);
// 带退避的重试(更常用)
Mono<User> userWithBackoff = userRepository.findById(id)
.retryWhen(Retry.backoff(3, Duration.ofMillis(100))
.maxBackoff(Duration.ofSeconds(2)));
// 第1次重试等100ms,第2次等200ms,第3次最多等2000ms六、调度器:控制代码在哪个线程上跑
这个是我觉得 Reactor 里最需要理解清楚的概念之一。
Reactor 提供了几种内置调度器:
// Schedulers.parallel():CPU密集型任务,线程数=CPU核数
// 注意:这里不能有任何阻塞调用,否则会把整个调度器卡死
Flux.range(1, 1000)
.parallel()
.runOn(Schedulers.parallel())
.map(i -> heavyCompute(i))
.sequential()
.subscribe();
// Schedulers.boundedElastic():IO密集型、阻塞操作
// 线程数有上限(默认CPU核数×10),防止无限创建线程
Mono.fromCallable(() -> jdbcTemplate.query("select..."))
.subscribeOn(Schedulers.boundedElastic()); // 注意:是 subscribeOn
// Schedulers.single():单线程,适合需要顺序执行的场景
// Schedulers.immediate():在当前线程执行(默认)publishOn 和 subscribeOn 是两个容易搞混的方法:
Mono.fromCallable(() -> {
System.out.println("source: " + Thread.currentThread().getName());
return fetchData();
})
.subscribeOn(Schedulers.boundedElastic()) // 影响上游(source)的执行线程
.map(data -> {
System.out.println("map1: " + Thread.currentThread().getName()); // elastic 线程
return transform(data);
})
.publishOn(Schedulers.parallel()) // 从这里开始切换线程
.map(data -> {
System.out.println("map2: " + Thread.currentThread().getName()); // parallel 线程
return data;
});简单记忆:subscribeOn 影响整个链的"起点"线程;publishOn 影响它之后的操作用哪个线程。
七、订阅和错误处理
Reactor 的链不会自己执行,需要有人订阅。
Mono<User> userMono = userRepository.findById(id);
// 方式1:block(),阻塞等待(测试用,生产慎用)
User user = userMono.block();
// 方式2:subscribe(),非阻塞订阅
userMono.subscribe(
user -> handleSuccess(user), // 成功回调
error -> handleError(error), // 错误回调
() -> System.out.println("完成") // 完成回调
);
// 方式3:在 WebFlux 控制器里直接返回 Mono,框架负责订阅
@GetMapping("/user/{id}")
public Mono<UserVO> getUser(@PathVariable Long id) {
return userRepository.findById(id).map(UserVO::from);
// WebFlux 会在合适的时候订阅这个 Mono
}错误处理的操作符:
Mono<User> userMono = userRepository.findById(id)
.onErrorReturn(new User()) // 出错返回默认值
.onErrorResume(e -> fallbackService.getUser(id)) // 出错切换到另一个 Mono
.onErrorMap(DbException.class, // 转换异常类型
e -> new ServiceException("数据库查询失败", e))
.doOnError(e -> log.error("查用户失败", e)); // 不处理,只做副作用(如日志)八、一个完整的例子把上面的东西串起来
给你看一个接近真实业务的例子:查用户详情,需要并发获取用户基本信息、最近订单、积分余额,任一服务超时则降级:
@Service
public class UserDetailService {
public Mono<UserDetailVO> getUserDetail(Long userId) {
// 三个请求并发发出
Mono<UserInfo> userInfoMono = userService.getUserInfo(userId)
.timeout(Duration.ofMillis(300))
.onErrorResume(e -> Mono.just(UserInfo.empty())); // 超时降级
Mono<List<Order>> ordersMono = orderService.getRecentOrders(userId, 5)
.timeout(Duration.ofMillis(500))
.onErrorResume(e -> Mono.just(Collections.emptyList()));
Mono<Integer> pointsMono = pointService.getBalance(userId)
.timeout(Duration.ofMillis(200))
.onErrorReturn(0); // 积分查不到就给0,不影响主流程
// zip 并发等待三个结果
return Mono.zip(userInfoMono, ordersMono, pointsMono)
.map(tuple -> UserDetailVO.builder()
.userInfo(tuple.getT1())
.recentOrders(tuple.getT2())
.pointBalance(tuple.getT3())
.build())
.doOnSuccess(vo -> log.info("用户详情查询完成: userId={}", userId))
.doOnError(e -> log.error("用户详情查询失败: userId={}", userId, e));
}
}这段代码如果用 Spring MVC + CompletableFuture 也能实现,但 Reactor 的表达力更强,错误处理更自然,调度控制也更细致。
掌握这些基础之后,WebFlux 的控制器层就是水到渠成的事了。下一篇我打算写 WebFlux + R2DBC 的实战,响应式数据库访问是 WebFlux 落地的关键一环,坑也挺多的。
