Kafka Streams流处理:窗口聚合与实时统计的实现
Kafka Streams流处理:窗口聚合与实时统计的实现
适读人群:需要在Kafka上做实时统计计算,想了解Kafka Streams的Java工程师 | 阅读时长:约17分钟
开篇故事
我们的运营大屏需要实时展示:过去1分钟的下单量、过去5分钟的支付额、过去1小时的活跃用户数。
最初的方案是:消费者消费Kafka消息,实时更新Redis计数器,大屏读Redis。这个方案写了几百行代码,处理窗口计算(1分钟窗口要在61秒时清零)时尤其麻烦,还踩了不少并发Bug。
后来用Kafka Streams重写了这个功能,代码量缩短到了1/3,窗口计算框架帮我们处理了所有的状态管理和时间窗口逻辑。更重要的是,Kafka Streams是无状态化部署的(状态存在Kafka内部),扩缩容比原来的方案简单多了。
今天把Kafka Streams的窗口聚合讲透。
一、Kafka Streams核心概念
KStream:表示无界的消息流,每条消息都是一个独立的事件。
KTable:表示有状态的键值对,代表某个Key在当前时间点的最新值。
Window:将无界流按时间切成有界片段,在每个窗口内做聚合计算。
二、四种时间窗口类型
| 窗口类型 | 特点 | 适用场景 |
|---|---|---|
| Tumbling Window(滚动窗口) | 固定大小,不重叠 | 每分钟统计,每小时统计 |
| Hopping Window(跳跃窗口) | 固定大小,可重叠(有步长) | 过去5分钟(每1分钟更新) |
| Sliding Window(滑动窗口) | 随新消息滑动 | 每来一条消息重新计算 |
| Session Window(会话窗口) | 按活跃间隔分组 | 用户会话分析 |
三、完整代码实现
3.1 基础配置
/**
* Kafka Streams基础配置
*/
@Configuration
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfig() {
Map<String, Object> props = new HashMap<>();
// 基础配置
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-stats-streams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
// 默认序列化
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
// 状态存储目录(RocksDB)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams/state");
// 线程数(每个线程处理一个或多个Partition)
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
// Exactly-Once语义(需要Kafka 0.11+)
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
// 提交间隔(默认100ms,影响状态更新延迟)
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
return new KafkaStreamsConfiguration(props);
}
}3.2 滚动窗口:每分钟订单量统计
/**
* 每分钟订单量统计(Tumbling Window)
*/
@Configuration
@Slf4j
public class OrderCountStreamConfig {
// 自定义OrderEvent的Serde
private final Serde<OrderEvent> orderEventSerde = buildOrderSerde();
@Bean
public KStream<String, OrderEvent> orderCountStream(StreamsBuilder streamsBuilder) {
// 1. 从输入Topic创建KStream
KStream<String, String> rawStream = streamsBuilder.stream("order-events",
Consumed.with(Serdes.String(), Serdes.String()));
// 2. 反序列化消息
KStream<String, OrderEvent> orderStream = rawStream
.mapValues(value -> {
try {
return new ObjectMapper().readValue(value, OrderEvent.class);
} catch (Exception e) {
log.warn("消息解析失败: {}", value);
return null;
}
})
.filter((key, value) -> value != null); // 过滤解析失败的消息
// 3. 按userId分组,统计每分钟每用户的下单量(1分钟滚动窗口)
KTable<Windowed<String>, Long> orderCountByUser = orderStream
.selectKey((key, order) -> order.getUserId()) // 以userId为key
.groupByKey(Grouped.with(Serdes.String(), orderEventSerde))
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))) // 1分钟滚动窗口
.count(Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(
"order-count-by-user-store")
.withKeySerde(Serdes.String())
.withValueSerde(Serdes.Long())
);
// 4. 转换为可读的KStream并输出
orderCountByUser
.toStream()
.map((windowedKey, count) -> {
String userId = windowedKey.key();
long windowStart = windowedKey.window().start();
long windowEnd = windowedKey.window().end();
String statsKey = userId;
String statsValue = String.format(
"{\"userId\":\"%s\",\"count\":%d,\"windowStart\":%d,\"windowEnd\":%d}",
userId, count, windowStart, windowEnd
);
return KeyValue.pair(statsKey, statsValue);
})
.to("order-count-stats", Produced.with(Serdes.String(), Serdes.String()));
return orderStream;
}
private Serde<OrderEvent> buildOrderSerde() {
return Serdes.serdeFrom(
(topic, order) -> {
try {
return new ObjectMapper().writeValueAsBytes(order);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
(topic, bytes) -> {
try {
return new ObjectMapper().readValue(bytes, OrderEvent.class);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
);
}
}3.3 跳跃窗口:过去5分钟(每分钟更新)
/**
* 过去5分钟支付额统计(每分钟滑动更新)
* Hopping Window: size=5min, advance=1min
*/
@Bean
public KStream<String, String> paymentAmountStream(StreamsBuilder streamsBuilder) {
KStream<String, String> rawStream = streamsBuilder.stream("payment-events");
KTable<Windowed<String>, Double> paymentAmountTable = rawStream
.mapValues(v -> {
try {
return new ObjectMapper().readValue(v, PaymentEvent.class);
} catch (Exception e) { return null; }
})
.filter((k, v) -> v != null)
.selectKey((k, payment) -> payment.getMerchantId()) // 按商户ID统计
.groupByKey()
.windowedBy(
// size=5min,advance=1min:每分钟更新一次5分钟窗口内的总额
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
.advanceBy(Duration.ofMinutes(1))
)
.aggregate(
() -> 0.0, // 初始值
(key, payment, total) -> total + payment.getAmount().doubleValue(),
Materialized.<String, Double, WindowStore<Bytes, byte[]>>as(
"payment-amount-store")
.withValueSerde(Serdes.Double())
);
paymentAmountTable
.toStream()
.mapValues((windowedKey, total) ->
String.format("{\"merchantId\":\"%s\",\"total\":%.2f}",
windowedKey.key(), total))
.to("payment-amount-stats");
return rawStream;
}3.4 查询窗口内的状态(交互式查询)
/**
* 通过Interactive Queries查询Streams内部状态
* 用于大屏实时展示,不需要等消息输出到Output Topic
*/
@Service
@Slf4j
public class OrderStatsQueryService {
private final KafkaStreams kafkaStreams;
public OrderStatsQueryService(KafkaStreams kafkaStreams) {
this.kafkaStreams = kafkaStreams;
}
/**
* 查询某个用户在指定时间窗口内的下单量
*/
public Long getOrderCount(String userId, long fromTimestamp, long toTimestamp) {
ReadOnlyWindowStore<String, Long> windowStore =
kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"order-count-by-user-store",
QueryableStoreTypes.windowStore()
)
);
// 查询时间范围内的所有窗口值
WindowStoreIterator<Long> iterator = windowStore.fetch(
userId,
Instant.ofEpochMilli(fromTimestamp),
Instant.ofEpochMilli(toTimestamp)
);
long totalCount = 0;
while (iterator.hasNext()) {
KeyValue<Long, Long> entry = iterator.next();
totalCount += entry.value;
}
iterator.close();
return totalCount;
}
/**
* 查询所有商户过去5分钟的支付总额Top10
*/
public List<Map.Entry<String, Double>> getTopMerchantPayment() {
ReadOnlyWindowStore<String, Double> windowStore =
kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"payment-amount-store",
QueryableStoreTypes.windowStore()
)
);
long now = System.currentTimeMillis();
long fiveMinutesAgo = now - 5 * 60 * 1000;
Map<String, Double> merchantTotals = new HashMap<>();
// 遍历所有Key(实际生产中应该维护一个商户列表,按需查询)
KeyValueIterator<Windowed<String>, Double> allEntries =
windowStore.fetchAll(
Instant.ofEpochMilli(fiveMinutesAgo),
Instant.ofEpochMilli(now)
);
while (allEntries.hasNext()) {
KeyValue<Windowed<String>, Double> entry = allEntries.next();
merchantTotals.merge(entry.key.key(), entry.value, Double::sum);
}
allEntries.close();
return merchantTotals.entrySet().stream()
.sorted(Map.Entry.<String, Double>comparingByValue().reversed())
.limit(10)
.collect(Collectors.toList());
}
}四、踩坑实录
坑1:窗口结束后还有迟到数据
实时系统中,由于网络延迟,某些事件的时间戳可能比Kafka消息的到达时间早很多。比如手机APP离线后上传的用户行为数据,事件发生时间是10分钟前,但现在才到Kafka。
Kafka Streams默认TimeWindows.ofSizeWithNoGrace():窗口关闭后(窗口结束时间到了),迟到的消息会被丢弃。
解决方案:配置Grace Period(宽限期),允许迟到消息进入已关闭的窗口:
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(1), Duration.ofMinutes(5))
// 窗口1分钟,额外等5分钟接受迟到数据坑2:状态存储路径权限问题
Kafka Streams使用RocksDB存储本地状态(状态目录STATE_DIR_CONFIG),如果目录不存在或没有写权限,流处理应用启动失败,报FAILED状态。
解决方案:确保STATE_DIR目录存在且有写权限,在Docker/K8s中通过Volume挂载持久化状态目录。
坑3:应用重启时状态重建耗时过长
本地状态被清除(Pod重启、磁盘清空)后,Kafka Streams需要从Kafka的Changelog Topic重建本地状态,如果状态数据量大(几GB),重建需要几十分钟,期间应用处于Restoring状态,无法正常处理新消息。
解决方案:配置Standby Replicas,在另一个实例上维护热备状态:
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);坑4:KTable的更新频率导致Output Topic消息量爆炸
每来一条消息,窗口KTable就更新一次,KTable的每次更新都会向Output Topic发送一条消息。如果每秒处理10万条消息,Output Topic每秒就有10万条更新消息,下游消费者根本消费不过来。
解决方案:用suppress()操作抑制中间结果,只在窗口关闭时输出最终结果:
.suppress(
Suppressed.untilWindowCloses(
Suppressed.BufferConfig.maxBytes(1024 * 1024 * 100L) // 100MB缓冲
)
)五、总结
Kafka Streams的优势:
- 无需额外集群:直接在应用内运行,不需要Flink/Spark集群
- 内置状态管理:RocksDB + Changelog Topic,自动处理容错和恢复
- 精确语义:支持Exactly-Once(Kafka 0.11+)
- 弹性伸缩:增加实例自动分担Partition,无需手动分配
局限性:适合中等规模的流处理(单集群数十万TPS),超大规模场景(百万TPS以上)建议用Flink。
下一篇(第449期)讲消息序列化选型,JSON vs Protobuf vs Avro,在Kafka场景下的性能对比,告诉你什么时候值得换序列化方案。
