AI 应用的 Streaming 背压处理——客户端来不及消费怎么办
AI 应用的 Streaming 背压处理——客户端来不及消费怎么办
做 AI 应用的人都喜欢流式响应——用户能看到文字一个一个地蹦出来,比盯着加载转圈要好多了。
但流式响应有一个问题,我在前期系统压测的时候才意识到:服务端产生数据的速度,和客户端消费数据的速度,不一定是匹配的。
具体场景是这样的:我们做了一个文档生成功能,服务端接收到请求后,调用 LLM 流式生成内容,然后通过 WebSocket 推给客户端。在低并发情况下一切正常,但并发上来之后,某些客户端会出现内容丢失的情况——前几句话正常显示,然后突然跳到结尾,中间一大段内容不见了。
排查之后发现:服务端的 LLM 响应速度大约是 50 token/s,某些客户端(特别是网络状况差的移动端)消费速度可能只有 20 token/s。当缓冲区满了之后,旧数据直接被丢弃了。
这就是背压(Backpressure)问题。
什么是背压
背压是响应式编程里的核心概念:当数据的生产速度超过消费速度时,需要一种机制让消费方告诉生产方"慢一点",而不是让数据堆积或者丢失。
想象一条水管:水源的水压很高,但水桶装得慢,水溢出来了。解决办法不是加大水桶,而是给水管加一个阀门——根据水桶的当前状态调节水流速度。
在流式 AI 响应中:
- 生产方:LLM 模型,产生 token 的速度由模型决定
- 消费方:客户端,受限于网络带宽、UI 渲染速度
- 传输通道:服务端到客户端的 HTTP/WebSocket 连接
当生产速度 > 消费速度,不处理背压的后果是:
- 服务端内存溢出(缓冲区无限增长)
- 数据丢失(缓冲区满后新数据被抛弃)
- 客户端 OOM(一次收到大量数据,来不及处理)
Spring WebFlux 的背压机制
Spring WebFlux 基于 Project Reactor,Reactor 的 Flux 和 Mono 原生支持背压。
Reactor 的背压通过 Subscriber.request(n) 机制实现:消费方在能消费多少数据时,就向生产方请求多少数据。生产方不会主动推送超过请求量的数据。
这个机制的关键是:消费方控制节奏,生产方配合。
在 AI 应用中,LLM 的流式输出本质上是一个 Flux,Spring AI 的 ChatClient.stream() 返回的就是 Flux<String>,可以直接参与 Reactor 的背压体系。
核心实现:背压控制的流式 AI 响应
Maven 依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
<version>1.0.0-M6</version>
</dependency>基础流式响应(无背压控制)
先看看没有背压控制是什么样的:
@RestController
@RequestMapping("/api/stream")
public class NaiveStreamController {
private final ChatClient chatClient;
@GetMapping(value = "/naive", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> naiveStream(@RequestParam String question) {
// 这里没有任何背压控制
// 如果客户端来不及消费,Spring WebFlux 会自动处理一部分
// 但当客户端网络极慢时,服务端会持续积压数据
return chatClient.prompt()
.user(question)
.stream()
.content();
}
}这个实现在大多数情况下"能用",因为 Spring WebFlux 和 HTTP/2 有一些内置的流控机制。但在极端情况下(客户端极慢、服务端高并发)仍然会出问题。
带背压控制的实现
@RestController
@RequestMapping("/api/stream")
@Slf4j
public class BackpressureStreamController {
private final ChatClient chatClient;
/**
* 带背压控制的 SSE 流式响应
* 使用 onBackpressureBuffer 策略:在背压发生时缓冲数据
*/
@GetMapping(value = "/buffered", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> bufferedStream(@RequestParam String question,
@RequestParam(defaultValue = "200") int bufferSize) {
return chatClient.prompt()
.user(question)
.stream()
.content()
// 当下游消费慢时,缓冲最多 bufferSize 个元素
// 超出缓冲区时会抛出 BufferOverflowException
.onBackpressureBuffer(bufferSize,
dropped -> log.warn("背压缓冲区溢出,丢弃 token"),
BufferOverflowStrategy.DROP_OLDEST)
// 错误处理:背压溢出时返回错误信号
.onErrorResume(e -> {
log.error("流式响应出错", e);
return Flux.just("[ERROR: " + e.getMessage() + "]");
});
}
/**
* 带速率限制的流式响应
* 限制服务端产生数据的速率,避免生产过快
*/
@GetMapping(value = "/rate-limited", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> rateLimitedStream(@RequestParam String question) {
return chatClient.prompt()
.user(question)
.stream()
.content()
// 每 50ms 最多发送一个 token
// 这会强制降低生产速率,但会增加整体响应时间
.delayElements(Duration.ofMillis(20))
.onBackpressureBuffer(100);
}
/**
* 带超时的流式响应
* 如果客户端长时间不消费,主动关闭连接
*/
@GetMapping(value = "/timeout", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> timeoutStream(@RequestParam String question) {
return chatClient.prompt()
.user(question)
.stream()
.content()
.onBackpressureBuffer(500)
// 如果单个元素在 30 秒内没有被消费,触发超时
.timeout(Duration.ofSeconds(30))
.onErrorResume(TimeoutException.class, e -> {
log.warn("流式响应超时,问题:{}", question);
return Flux.just("[TIMEOUT]");
});
}
}背压策略选择
不同场景适合不同的背压策略:
@Service
@Slf4j
public class AdaptiveStreamService {
private final ChatClient chatClient;
/**
* 根据客户端类型选择背压策略
*/
public Flux<String> streamWithStrategy(String question, ClientType clientType) {
Flux<String> rawStream = chatClient.prompt()
.user(question)
.stream()
.content();
return switch (clientType) {
// PC 客户端:网络好,缓冲区可以小一点
case PC_BROWSER -> rawStream
.onBackpressureBuffer(200, BufferOverflowStrategy.ERROR);
// 移动端:网络可能不稳定,需要更大的缓冲区和更宽容的策略
case MOBILE -> rawStream
.onBackpressureBuffer(500,
dropped -> log.debug("移动端背压:丢弃旧数据"),
BufferOverflowStrategy.DROP_OLDEST);
// API 调用方:可能有自己的消费速率控制,给最大缓冲
case API_CLIENT -> rawStream
.onBackpressureLatest() // 只保留最新的,适合只关心最终结果的调用方
.collectList() // 收集完整结果
.flatMapMany(Flux::fromIterable);
// 批处理:不在乎实时性,等完整结果
case BATCH -> rawStream
.onBackpressureDrop() // 背压时直接丢弃(批处理通常重试整个请求)
.onErrorContinue((e, item) -> log.warn("批处理丢弃 item: {}", item));
};
}
public enum ClientType {
PC_BROWSER, MOBILE, API_CLIENT, BATCH
}
}完整的背压监控系统
光处理背压还不够,还要能看到背压发生的情况,才能在系统层面做优化:
@Component
@Slf4j
public class BackpressureMonitor {
private final MeterRegistry meterRegistry;
// 背压事件计数器
private final Counter backpressureDropCounter;
private final Counter backpressureOverflowCounter;
private final Timer streamDurationTimer;
public BackpressureMonitor(MeterRegistry meterRegistry) {
this.meterRegistry = meterRegistry;
this.backpressureDropCounter = Counter.builder("ai.stream.backpressure.drop")
.description("背压导致的数据丢弃次数")
.register(meterRegistry);
this.backpressureOverflowCounter = Counter.builder("ai.stream.backpressure.overflow")
.description("背压缓冲区溢出次数")
.register(meterRegistry);
this.streamDurationTimer = Timer.builder("ai.stream.duration")
.description("流式响应持续时间")
.register(meterRegistry);
}
/**
* 包装 Flux,添加监控
*/
public Flux<String> monitoredStream(Flux<String> stream, String clientId) {
AtomicInteger totalTokens = new AtomicInteger(0);
AtomicInteger droppedTokens = new AtomicInteger(0);
Timer.Sample sample = Timer.start(meterRegistry);
return stream
.doOnNext(token -> {
totalTokens.incrementAndGet();
})
.onBackpressureBuffer(500,
dropped -> {
droppedTokens.incrementAndGet();
backpressureDropCounter.increment();
log.warn("客户端 {} 背压丢弃 token,已丢弃 {} 个",
clientId, droppedTokens.get());
},
BufferOverflowStrategy.DROP_OLDEST)
.doOnComplete(() -> {
sample.stop(streamDurationTimer);
log.info("流式响应完成,客户端:{},总 token:{},丢弃:{}",
clientId, totalTokens.get(), droppedTokens.get());
if (droppedTokens.get() > 0) {
backpressureOverflowCounter.increment();
// 报告背压事件,供后续分析
reportBackpressureEvent(clientId, totalTokens.get(), droppedTokens.get());
}
})
.doOnError(e -> {
log.error("流式响应出错,客户端:{}", clientId, e);
});
}
private void reportBackpressureEvent(String clientId, int total, int dropped) {
double dropRate = (double) dropped / total;
if (dropRate > 0.1) {
log.warn("高背压告警:客户端 {} 丢弃率 {:.1f}%,请检查客户端网络状况",
clientId, dropRate * 100);
}
}
}WebFlux 背压在服务端的传导
一个容易忽视的问题:背压不只存在于服务端到客户端,还存在于服务端内部。
当服务端同时处理大量流式请求时,LLM API 的响应速度是有上限的,服务端内部也需要做流量控制:
@Service
@Slf4j
public class RateLimitedLLMService {
private final ChatClient chatClient;
// 并发流式请求限制
private final Semaphore concurrencyLimiter = new Semaphore(50);
// 全局 token 速率限制(每秒最多 10000 token)
private final RateLimiter tokenRateLimiter = RateLimiter.create(10000);
/**
* 带并发控制的流式响应
*/
public Flux<String> controlledStream(String question, String requestId) {
return Flux.create(sink -> {
// 非阻塞地尝试获取并发许可
if (!concurrencyLimiter.tryAcquire()) {
sink.error(new TooManyRequestsException("当前并发请求过多,请稍后重试"));
return;
}
Flux<String> llmStream = chatClient.prompt()
.user(question)
.stream()
.content();
llmStream.subscribe(
token -> {
// 速率限制:确保 token 产生速率不超过上限
tokenRateLimiter.acquire();
sink.next(token);
},
error -> {
concurrencyLimiter.release();
sink.error(error);
},
() -> {
concurrencyLimiter.release();
sink.complete();
}
);
});
}
/**
* 基于令牌桶的自适应背压
* 当系统负载高时,自动降低流式响应速率
*/
public Flux<String> adaptiveStream(String question) {
return chatClient.prompt()
.user(question)
.stream()
.content()
.flatMap(token -> {
// 根据系统负载动态调整延迟
Duration delay = calculateAdaptiveDelay();
return Mono.just(token).delayElement(delay);
}, 1) // 并发度为 1,确保顺序
.onBackpressureBuffer(300);
}
private Duration calculateAdaptiveDelay() {
// 根据 CPU 使用率、并发请求数等指标动态计算延迟
double loadFactor = getCurrentSystemLoad();
if (loadFactor < 0.5) {
return Duration.ZERO;
} else if (loadFactor < 0.7) {
return Duration.ofMillis(10);
} else if (loadFactor < 0.9) {
return Duration.ofMillis(30);
} else {
return Duration.ofMillis(50);
}
}
private double getCurrentSystemLoad() {
// 从监控指标获取系统负载
// 这里简化为固定值
return 0.3;
}
}实际压测中发现的问题
我用 JMeter 做了一个压测,模拟 100 个并发客户端,每个客户端消费速度有差异(模拟移动端/PC 端的差异):
场景 1:无背压控制
- 50% 的慢速客户端出现内容丢失
- 某些请求端对端延迟超过 10 秒(数据在缓冲区积压)
- 服务端内存在 5 分钟内增长了 800MB
场景 2:onBackpressureBuffer(500)
- 内容丢失几乎消失
- 慢速客户端平均延迟增加了约 3 秒(缓冲等待时间)
- 服务端内存稳定在增长约 200MB 后停止
场景 3:自适应背压 + 速率限制
- 内容丢失完全消失
- 平均响应延迟增加 1.5 秒
- 服务端内存增长稳定,没有飙升
从压测数据来看,背压控制的代价是延迟轻微增加,换来的是稳定性和数据完整性的大幅提升。对于 AI 生成场景,这个代价是值得的。
常见背压策略总结
| 策略 | 适用场景 | 副作用 |
|---|---|---|
onBackpressureBuffer(n) | 大多数场景,默认推荐 | 内存消耗增加,溢出时报错 |
onBackpressureBuffer(n, DROP_OLDEST) | 流式内容,允许丢弃旧数据 | 内容可能不连续 |
onBackpressureDrop() | 丢失可接受的场景 | 内容可能大量丢失 |
onBackpressureLatest() | 只关心最新状态的场景 | 中间过程全部丢失 |
delayElements() | 需要精确控制速率 | 整体延迟增加 |
总结
背压不是一个高深的概念,本质就是流量控制——让消费方有能力告诉生产方"你产生得太快了"。
Spring WebFlux + Reactor 给了我们完整的背压基础设施,但要用好它需要:
- 了解你的生产速度和消费速度分别是多少
- 选择合适的背压策略(Buffer/Drop/Latest)
- 加监控,知道背压事件发生的频率和严重程度
- 针对不同客户端类型做差异化处理
AI 应用的流式响应,不是"开启 stream 模式"就完了,背压处理是生产级实现必须考虑的问题。
