WebFlux 与 Kafka 集成——响应式消息处理管道实战
WebFlux 与 Kafka 集成——响应式消息处理管道实战
适读人群:需要在 WebFlux 项目中接入 Kafka 的工程师 | 阅读时长:约14分钟 | 核心价值:响应式 Kafka 消费和生产的完整实现,包含背压、批量处理、错误重试
大概是前年冬天,我在做一个日志分析平台的重构。原来的架构是:Kafka consumer 拉消息,同步写入 Elasticsearch,简单粗暴。
问题在于:Kafka 的消费速度很快,每秒能拉几千条消息;但 Elasticsearch 的写入是批量的,每次批量写入有延迟,大约180到220ms一批。消费者根本跟不上,消息队列的 lag 一直在增加。
当时我们的解决方案是加消费者实例,但这个方向本质上是在绕开问题。
后来重构那个平台的时候,我选了 Spring WebFlux + Reactor Kafka,用响应式的方式处理消息流,背压机制自然地解决了速率不匹配的问题。这篇文章是那次重构的总结。
一、依赖配置
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<!-- Reactor Kafka,响应式 Kafka 客户端 -->
<groupId>io.projectreactor.kafka</groupId>
<artifactId>reactor-kafka</artifactId>
<version>1.3.21</version>
</dependency>二、响应式 Kafka 消费者
传统 Kafka 消费是这样的:while(true) { records = poll(); process(records); },典型的轮询阻塞模式。
Reactor Kafka 的 KafkaReceiver 把这个过程包装成了 Flux<ReceiverRecord>:
@Configuration
public class KafkaConfig {
@Bean
public ReceiverOptions<String, String> receiverOptions(
@Value("${kafka.bootstrap-servers}") String bootstrapServers,
@Value("${kafka.consumer.group-id}") String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// 手动提交 offset,由 Reactor Kafka 管理
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500); // 每次拉取最多500条
return ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton("log-events"));
}
}消费者实现:
@Service
@Slf4j
public class LogEventConsumer implements ApplicationRunner {
@Autowired
private ReceiverOptions<String, String> receiverOptions;
@Autowired
private ElasticsearchWriter esWriter;
@Override
public void run(ApplicationArguments args) {
KafkaReceiver.create(receiverOptions)
.receive() // 返回 Flux<ReceiverRecord<String, String>>
.doOnNext(record -> log.debug("收到消息: offset={}, key={}",
record.offset(), record.key()))
// 批量处理:每500条或每2秒,取先到的那个
.bufferTimeout(500, Duration.ofSeconds(2))
.flatMap(
batch -> processBatch(batch),
1 // 串行处理,等上一批处理完再处理下一批(背压!)
)
.doOnError(e -> log.error("消息处理异常", e))
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))
.maxBackoff(Duration.ofMinutes(1))) // 无限重试,但有退避
.subscribe();
}
private Mono<Void> processBatch(List<ReceiverRecord<String, String>> batch) {
return Flux.fromIterable(batch)
.map(record -> parseLogEvent(record.value()))
.collectList()
.flatMap(events -> esWriter.bulkIndex(events)) // 批量写 ES
.doOnSuccess(v -> {
// 批量提交 offset
batch.forEach(record -> record.receiverOffset().acknowledge());
log.info("批次处理完成: {} 条", batch.size());
})
.onErrorResume(e -> {
log.error("批次处理失败: {} 条", batch.size(), e);
// 批次失败时怎么处理?我的策略是:写入死信队列,然后 acknowledge
// 不 acknowledge 的话,这批消息会一直重复消费,可能导致死循环
return writeToDlq(batch)
.doOnSuccess(v -> batch.forEach(r -> r.receiverOffset().acknowledge()));
});
}
}三、背压如何在这里发挥作用
flatMap(processBatch, 1) 这里的并发度 1 是关键。
当 processBatch 在处理(等待 ES 写入)时,上游的 bufferTimeout 会暂停收集,而 KafkaReceiver 会停止从 Kafka 拉取消息(减慢 poll() 频率)。
这就是响应式背压在实际场景里的工作方式:不是你去调用 request(n),而是整个链的速率自然地被最慢的那个环节"限速"了。
我在重构后的平台里观察过这个效果:ES 写入慢的时候,Kafka consumer 的 poll() 间隔会自动增加,消息 lag 稳定在一个合理范围内,不再无限增长。
四、响应式 Kafka 生产者
@Configuration
public class KafkaProducerConfig {
@Bean
public SenderOptions<String, String> senderOptions(
@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 等所有 replica 确认
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.LINGER_MS_CONFIG, 10); // 批量发送,等10ms攒一批
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
return SenderOptions.create(props);
}
@Bean
public KafkaSender<String, String> kafkaSender(SenderOptions<String, String> options) {
return KafkaSender.create(options);
}
}@Service
public class EventProducer {
@Autowired
private KafkaSender<String, String> kafkaSender;
// 发送单条消息
public Mono<Void> sendEvent(String topic, String key, String value) {
SenderRecord<String, String, String> record = SenderRecord.create(
new ProducerRecord<>(topic, key, value), key);
return kafkaSender.send(Mono.just(record))
.doOnNext(result -> {
RecordMetadata metadata = result.recordMetadata();
log.debug("消息发送成功: topic={}, partition={}, offset={}",
metadata.topic(), metadata.partition(), metadata.offset());
})
.then();
}
// 批量发送
public Mono<Void> sendBatch(String topic, List<LogEvent> events) {
Flux<SenderRecord<String, String, String>> records = Flux.fromIterable(events)
.map(event -> SenderRecord.create(
new ProducerRecord<>(topic, event.getId(), event.toJson()),
event.getId()));
return kafkaSender.send(records)
.doOnNext(result -> {
if (result.exception() != null) {
log.error("消息发送失败: key={}", result.correlationMetadata(),
result.exception());
}
})
.filter(result -> result.exception() != null)
.count()
.doOnNext(failCount -> {
if (failCount > 0) {
log.warn("批次中有 {} 条消息发送失败", failCount);
}
})
.then();
}
}五、处理消费顺序和并发
有时候需要按 key 顺序处理,同一个 key 的消息不能并发:
KafkaReceiver.create(receiverOptions)
.receive()
// 按 key 分组,同一 key 的消息顺序处理
.groupBy(record -> record.key())
.flatMap(groupedFlux ->
groupedFlux
.concatMap(record -> processRecord(record)) // 同 key 串行
, 20) // 不同 key 可以并发,最多20个 key 同时处理
.subscribe();六、监控和可观测性
Reactor Kafka 暴露了一些指标,可以接入 Micrometer:
// 在配置里启用 Micrometer 集成
Map<String, Object> props = new HashMap<>();
// ... 基础配置
props.put("reactor.kafka.metrics.enabled", true);
ReceiverOptions<String, String> options = ReceiverOptions.<String, String>create(props)
.subscription(Collections.singleton("log-events"))
.addAssignListener(partitions -> log.info("分区分配: {}", partitions))
.addRevokeListener(partitions -> log.info("分区撤销: {}", partitions));还可以手动埋点:
.doOnNext(record -> {
Metrics.counter("kafka.received.count",
"topic", record.topic(),
"partition", String.valueOf(record.partition())).increment();
})七、重构效果
那个日志分析平台重构之后的对比:
| 指标 | 重构前 | 重构后 |
|---|---|---|
| 峰值消费速率 | 约1200条/s(受限于线程池) | 约4700条/s |
| Kafka lag(高峰期) | 持续增长,最高约50万条 | 稳定在5000条以内 |
| ES 写入失败率 | 约0.7%(超时) | 约0.03% |
| Consumer 实例数 | 8个 | 3个 |
| 单实例内存 | ~1.2GB | ~480MB |
消费者实例从8个减到3个,内存从1.2GB降到480MB,消费速率提升了接近4倍,这是那次重构最直观的收益。
Kafka + WebFlux 的组合在日志处理、事件流处理、数据管道这些场景里非常合适。关键是理解背压的工作方式,以及合理配置并发度,避免上游被打爆。
下一篇聊 Java 21 虚拟线程,很多人问我:虚拟线程出来了,WebFlux 还有必要用吗?这个问题我有自己的看法,下篇详细说。
