WebFlux 背压机制实战——数据流速不一致时怎么不把服务压垮
WebFlux 背压机制实战——数据流速不一致时怎么不把服务压垮
适读人群:了解 WebFlux 基础,需要处理高吞吐数据流的工程师 | 阅读时长:约13分钟 | 核心价值:背压的本质、实际场景、可落地的实现方案
背压(Backpressure)是响应式编程里讲得最多、但在实际项目里用对的人最少的概念之一。
我第一次接触背压是看文档,当时感觉懂了。但等到真正遇到问题,才意识到自己之前"懂的"其实是字面意思,根本没理解它解决的是什么场景。
触发我认真思考这个问题的,是一个数据导出服务。那个服务需要把数据库里的数据批量读出来,处理之后写到 CSV 文件,再压缩上传到 OSS。数据库里大概有几百万条记录。
最初的实现简单粗暴:
// 最初的实现,看起来很"响应式"
userRepository.findAll() // Flux<User>,从数据库流式读取
.map(user -> buildCsvRow(user))
.buffer(1000) // 凑1000条再写
.flatMap(batch -> csvWriter.writeBatch(batch))
.subscribe();跑起来发现:内存一直在涨,最终 OOM 重启。
问题出在哪?findAll() 从数据库读数据的速度,比 csvWriter.writeBatch() 写文件的速度快很多。数据库读出来的数据在内存里堆积,最终把内存撑爆了。
这就是背压要解决的问题。
一、背压的本质:生产者和消费者的速率不匹配
背压是一个来自流体力学的比喻:管道里的流体,如果下游堵了,压力会反向传导给上游,让上游减缓速率。
在数据流中:
- 生产者(Publisher):发出数据,可能很快
- 消费者(Subscriber):处理数据,可能很慢
- 背压:消费者告诉生产者"我能处理多少,你就发多少",不要把我淹没
响应式流规范(Reactive Streams,就是 java.util.concurrent.Flow 背后的那套接口)的核心机制就是背压:
// Reactive Streams 规范的核心接口(简化版)
interface Subscriber<T> {
void onSubscribe(Subscription s); // 订阅成功,拿到 Subscription
void onNext(T t); // 收到一条数据
void onError(Throwable t); // 出错
void onComplete(); // 完成
}
interface Subscription {
void request(long n); // 消费者主动告诉生产者:我准备好接收 n 条数据了
void cancel(); // 取消订阅
}关键是 request(n):消费者控制节奏,拉式(pull)而不是推式(push)。
二、WebFlux 默认的背压策略
在实际使用 Reactor 时,你不需要直接操作 Subscription,框架帮你处理了。但你需要理解几种背压策略:
策略1:ERROR(默认)
// 如果生产者产生数据的速度超过消费者请求的速度,抛出 OverflowException
Flux.interval(Duration.ofMillis(1)) // 每1ms发一条数据,非常快
.onBackpressureError() // 明确设置:溢出就报错
.subscribe(data -> {
Thread.sleep(100); // 消费者处理很慢,每100ms处理一条
process(data);
});
// 结果:很快就会抛出 reactor.core.Exceptions$OverflowException策略2:DROP(丢弃)
// 消费者来不及处理的数据直接丢弃
Flux.interval(Duration.ofMillis(1))
.onBackpressureDrop(dropped -> log.warn("数据被丢弃: {}", dropped))
.subscribe(data -> {
Thread.sleep(100);
process(data);
});
// 适合场景:数据丢失可接受的实时监控、日志采样等策略3:LATEST(只保留最新)
// 只保留最新的那条数据,旧的全部丢弃
Flux.interval(Duration.ofMillis(1))
.onBackpressureLatest()
.subscribe(data -> {
Thread.sleep(100);
process(data);
});
// 适合场景:传感器数据、股票行情,只关心最新状态策略4:BUFFER(缓冲)
// 把来不及处理的数据先缓冲,默认缓冲无限大(危险!)
Flux.interval(Duration.ofMillis(1))
.onBackpressureBuffer() // 无限缓冲,OOM 警告!
.subscribe(data -> {
Thread.sleep(100);
process(data);
});
// 实际应该设置缓冲上限
Flux.interval(Duration.ofMillis(1))
.onBackpressureBuffer(
10000, // 最多缓冲1万条
dropped -> log.warn("缓冲区满,丢弃: {}", dropped), // 满了之后的处理
BufferOverflowStrategy.DROP_OLDEST // 丢最老的
)
.subscribe(data -> process(data));三、回到那个数据导出的问题
现在用背压的视角重新看:
// 问题的根源:flatMap 默认并发度是 256,会预先 request 大量数据
userRepository.findAll()
.buffer(1000)
.flatMap(batch -> csvWriter.writeBatch(batch))
// flatMap 的内部会提前拉取数据,不等写操作完成
.subscribe();flatMap 默认会并发订阅 256 个内层 Publisher,这意味着它会同时请求大量上游数据。当写文件速度跟不上,数据就在内存里堆积。
修复方案一:降低 flatMap 的并发度
userRepository.findAll()
.buffer(1000)
.flatMap(
batch -> csvWriter.writeBatch(batch),
1 // 并发度设为1,变成串行:等上一批写完,再处理下一批
)
.subscribe();修复方案二:用 concatMap 代替 flatMap
// concatMap 就是并发度为1的 flatMap,保证顺序且完全串行
userRepository.findAll()
.buffer(1000)
.concatMap(batch -> csvWriter.writeBatch(batch))
.subscribe();修复方案三:控制上游的读取速度(更彻底的背压)
userRepository.findAll()
.buffer(1000)
.concatMap(batch ->
csvWriter.writeBatch(batch)
.delaySubscription(Duration.ofMillis(50)) // 稍微延迟,给 IO 喘气时间
)
.subscribe();我最终选的是方案二,串行写入,简单可靠。数据量几百万条,哪怕慢一点,稳定是第一位的。跑完花了大约17分钟,但内存一直稳定在300MB以内,没有任何问题。
四、限流:控制并发请求数
另一个背压的实际场景:你在调一个下游接口,但那个接口有 QPS 限制。你不能让 Flux 无限制地并发调。
// 场景:批量查询用户信息,下游接口限制 QPS 100
Flux.fromIterable(userIds) // 可能有几千个 ID
.flatMap(
id -> userApiClient.getUserInfo(id),
20 // 最多20个并发请求,不要打爆下游
)
.collectList()
.subscribe();如果需要更精确的 QPS 控制:
// 用 delayElements 控制发射速率:每10ms发一个请求(最多100 QPS)
Flux.fromIterable(userIds)
.delayElements(Duration.ofMillis(10)) // 限速
.flatMap(id -> userApiClient.getUserInfo(id))
.subscribe();或者用 window + delaySubscription 做窗口限流:
// 每100ms处理一批,每批最多10个并发
Flux.fromIterable(userIds)
.window(10) // 每10个为一批
.concatMap(batch ->
batch.flatMap(id -> userApiClient.getUserInfo(id))
.collectList()
.delaySubscription(Duration.ofMillis(100)) // 每批间隔100ms
)
.flatMap(Flux::fromIterable) // 展平 List
.subscribe();五、WebSocket 和 SSE 场景的背压
在推送场景(SSE、WebSocket)里,背压更重要,因为服务端可能推数据很快,但客户端处理慢。
// SSE 推送,服务端每秒生产100条数据,但客户端只能每秒处理10条
@GetMapping(value = "/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<DataVO>> streamData() {
return dataService.getDataStream()
// WebFlux SSE 会自动处理背压,客户端读慢了,服务端发慢点
// 但如果生产速度实在太快,需要显式处理
.onBackpressureDrop(data -> log.warn("SSE 客户端处理太慢,丢弃数据"))
.map(data -> ServerSentEvent.<DataVO>builder()
.data(data)
.build());
}六、背压监控
光处理还不够,你还得知道背压有没有真正生效,有没有数据丢失。
// 在关键位置加监控
Flux<Data> dataStream = sourceFlux
.doOnNext(d -> metrics.counter("producer.emitted").increment())
.onBackpressureDrop(d -> {
metrics.counter("backpressure.dropped").increment();
log.warn("背压丢弃数据,考虑调整处理速度");
})
.doOnNext(d -> metrics.counter("consumer.received").increment());通过监控 producer.emitted 和 consumer.received 的差值,就能知道有多少数据被背压丢弃了。如果这个值持续增大,说明系统处理能力跟不上数据生产速度,需要扩容或者优化处理逻辑。
背压这个话题,说起来很"高大上",但落地场景其实就是两类:批量处理的速率控制,和调用下游时的并发控制。把这两类场景处理好,80%的场景就够了。
下一篇写 WebFlux 与 Spring Security 的整合,响应式鉴权是个经常踩坑的地方,因为原来 MVC 那套 SecurityContext 存储方式在 WebFlux 里完全不一样了。
