Kafka Streams 流处理实战——实时数据管道的 Java 实现方案
Kafka Streams 流处理实战——实时数据管道的 Java 实现方案
适读人群:需要构建实时数据处理管道的 Java 后端开发者 | 阅读时长:约18分钟 | 核心价值:掌握 Kafka Streams 核心 API,实现生产级别的实时流处理管道
从"每天凌晨跑批"到"实时计算"
我有个做数据平台的朋友小沈,他们的报表系统曾经是这样工作的:每天凌晨 2 点,跑一个批处理 Job,把昨天的订单数据聚合计算,生成各种运营报表。早上 8 点,运营团队来上班,看到的是昨天的数据。
产品经理有一天提了个需求:能不能让运营实时看到当前的数据,比如当前销售额、实时转化率?
小沈最开始的方案:把批处理频率改成每 5 分钟一次。但这带来了新问题:5 分钟的 Job 要扫描 5 分钟内所有订单,数据量大时,Job 本身可能要跑超过 5 分钟,就卡死了。
后来他们引入了 Kafka Streams,把"批计算"变成了"流计算"——订单创建时就实时累加到指标里,延迟从小时级降到了秒级。
Kafka Streams 是什么
Kafka Streams 是 Kafka 官方的流处理库,有几个核心特点:
- 纯 Java 库:不需要独立的集群(不像 Flink),直接嵌入应用程序
- 基于 Kafka:输入、输出都是 Kafka Topic,状态存储也可以用 Kafka
- 支持有状态操作:聚合、Join、时间窗口
- 精确一次语义:配置
EXACTLY_ONCE_V2后保证不重不漏
适用场景:
- 实时数据转换(格式转换、过滤、清洗)
- 实时聚合(统计、求和、计数)
- 流 Join(把两条流的数据合并)
- 复杂事件检测(CEP)
核心概念
KStream:无限的记录流,每条记录都会被处理(类比数据库的 INSERT 流水)
KTable:变更日志流,同一个 key 的新记录会覆盖旧记录(类比数据库的当前表)
GlobalKTable:全量数据表,所有分区的数据都在每个实例上(用于小表广播 Join)
快速入门:实时订单统计
@Configuration
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-analytics-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 开启精确一次语义
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE_V2);
// 状态目录(RocksDB 存储路径)
props.put(StreamsConfig.STATE_DIR_CONFIG, "/var/kafka-streams");
return new KafkaStreamsConfiguration(props);
}
}@Component
public class OrderAnalyticsPipeline {
@Autowired
private StreamsBuilder streamsBuilder;
@PostConstruct
public void buildPipeline() {
// 输入:订单事件 Topic
KStream<String, String> orderStream =
streamsBuilder.stream("order-events");
// 解析订单,过滤支付成功的事件
KStream<String, OrderEvent> paidOrders = orderStream
.mapValues(json -> JSON.parseObject(json, OrderEvent.class))
.filter((key, order) -> "PAID".equals(order.getStatus()));
// 按商品分类统计实时销售额
paidOrders
.groupBy((key, order) -> order.getCategoryId()) // 按分类分组
.aggregate(
() -> new CategoryStats(), // 初始值
(categoryId, order, stats) -> { // 聚合逻辑
stats.setTotalAmount(
stats.getTotalAmount().add(order.getAmount()));
stats.setOrderCount(stats.getOrderCount() + 1);
return stats;
},
Materialized.<String, CategoryStats, KeyValueStore<Bytes, byte[]>>
as("category-sales-store") // 状态存储名称(可对外查询)
.withValueSerde(new CategoryStatsSerde())
)
.toStream()
.to("category-sales-realtime"); // 输出到结果 Topic
log.info("订单分析 Pipeline 构建完成");
}
}时间窗口聚合:5 分钟滚动统计
@Component
public class WindowedAnalyticsPipeline {
@PostConstruct
public void buildWindowedPipeline() {
KStream<String, OrderEvent> orderStream = streamsBuilder
.<String, String>stream("order-events")
.mapValues(json -> JSON.parseObject(json, OrderEvent.class))
.filter((k, v) -> "PAID".equals(v.getStatus()));
// 5分钟滚动窗口统计每个品牌的销售额
orderStream
.groupBy((key, order) -> order.getBrandId())
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
BrandWindowStats::new,
(brandId, order, stats) -> {
stats.addOrder(order.getAmount(), order.getOrderId());
return stats;
},
Materialized.<String, BrandWindowStats, WindowStore<Bytes, byte[]>>
as("brand-window-store")
.withValueSerde(new BrandWindowStatsSerde())
)
.toStream()
.map((windowedKey, stats) -> {
// windowedKey 包含 key 和时间窗口信息
String key = windowedKey.key();
Window window = windowedKey.window();
stats.setWindowStart(window.start());
stats.setWindowEnd(window.end());
return KeyValue.pair(key, stats);
})
.mapValues(JSON::toJSONString)
.to("brand-window-sales");
}
}流 Join:订单 + 用户信息实时关联
@Component
public class OrderUserJoinPipeline {
@PostConstruct
public void buildJoinPipeline() {
// 订单流(实时流)
KStream<String, OrderEvent> orderStream = streamsBuilder
.<String, String>stream("order-events")
.mapValues(json -> JSON.parseObject(json, OrderEvent.class))
.selectKey((k, order) -> order.getUserId()); // 以 userId 为 key
// 用户信息表(KTable,同一用户的新信息覆盖旧信息)
// 这里假设有一个用户信息变更的 Topic(CDC 产出)
KTable<String, UserInfo> userTable = streamsBuilder
.<String, String>table("user-profile-changes")
.mapValues(json -> JSON.parseObject(json, UserInfo.class));
// Join:订单 + 用户信息
orderStream
.join(
userTable,
(order, user) -> {
// 合并订单和用户信息
EnrichedOrder enriched = new EnrichedOrder();
enriched.setOrderId(order.getOrderId());
enriched.setAmount(order.getAmount());
enriched.setUserId(order.getUserId());
if (user != null) {
enriched.setUserLevel(user.getLevel());
enriched.setUserCity(user.getCity());
}
return enriched;
}
)
.mapValues(JSON::toJSONString)
.to("enriched-orders");
}
}状态查询:实时查询聚合结果
Kafka Streams 的状态存储支持交互式查询,让其他服务可以实时读取聚合结果:
@Service
public class StreamsStateQueryService {
@Autowired
private KafkaStreams kafkaStreams;
/**
* 查询某品牌的实时销售统计
*/
public CategoryStats getCategoryStats(String categoryId) {
// 从状态存储中读取聚合结果(本地 RocksDB)
ReadOnlyKeyValueStore<String, CategoryStats> store =
kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"category-sales-store",
QueryableStoreTypes.keyValueStore()
)
);
CategoryStats stats = store.get(categoryId);
return stats != null ? stats : new CategoryStats();
}
/**
* 查询所有分类的实时统计(遍历 store)
*/
public List<CategoryStats> getAllCategoryStats() {
ReadOnlyKeyValueStore<String, CategoryStats> store =
kafkaStreams.store(
StoreQueryParameters.fromNameAndType(
"category-sales-store",
QueryableStoreTypes.keyValueStore()
)
);
List<CategoryStats> result = new ArrayList<>();
KeyValueIterator<String, CategoryStats> iterator = store.all();
while (iterator.hasNext()) {
result.add(iterator.next().value);
}
iterator.close();
return result;
}
}三大踩坑实录
坑一:时间窗口数据积累不准,总是有几分钟的延迟
现象: 5 分钟时间窗口的聚合结果,实际上要等 10-15 分钟才能看到,数据远没有预期的实时。
原因: Kafka Streams 的时间窗口基于事件时间(消息中的时间戳),但消息到达 Kafka 时可能有延迟(网络延迟、Consumer lag)。为了等待迟到的数据,Kafka Streams 会等到水位(watermark)超过窗口结束时间后才输出窗口结果。默认的 grace period(宽限期)可能导致窗口等待很久。
解法: 明确设置 grace period:
// 设置 grace period 为 0,不等待迟到数据(适合实时性要求高的场景)
TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))
// 或者设置短暂的 grace period(等待最多 30 秒的迟到数据)
TimeWindows.ofSizeAndGrace(Duration.ofMinutes(5), Duration.ofSeconds(30))坑二:多实例部署时状态查询失败
现象: 单实例运行时状态查询正常;扩展到 3 个实例后,有些查询请求返回 KeyQueryMetadata not found,错误。
原因: Kafka Streams 在多实例下,不同分区的状态存储在不同的实例上。查询某个 key 时,如果这个 key 的分区在其他实例上,本地查不到,需要通过网络转发查询。
解法: 使用 Kafka Streams 的 Metadata 接口找到数据所在的实例,再转发:
public CategoryStats getCategoryStats(String categoryId) {
// 找到这个 key 在哪个实例上
KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
"category-sales-store", categoryId, Serdes.String().serializer());
// 如果在当前实例,直接查本地
if (isLocalHost(metadata.activeHost())) {
return queryLocalStore(categoryId);
}
// 如果在其他实例,通过 HTTP 转发查询
String remoteHost = metadata.activeHost().host() + ":" + metadata.activeHost().port();
return restTemplate.getForObject(
"http://" + remoteHost + "/internal/streams/category/" + categoryId,
CategoryStats.class);
}坑三:Rebalance 期间流处理暂停,数据积压
现象: 扩容或缩容实例时,Kafka Streams 会触发 Rebalance,Rebalance 期间所有实例停止处理,输入 Topic 积压越来越多,恢复后需要一段时间才能消化完积压。
原因: Kafka Streams 的 Rebalance 需要重新分配分区和状态存储,分配完成之前所有实例不处理数据。状态存储迁移(把 RocksDB 的数据从一个实例迁移到另一个实例)会耗费时间。
解法: 使用 Kafka Streams 2.6+ 的 KIP-664(Static Membership),减少不必要的 Rebalance:
// 每个实例设置唯一的 Group Instance ID
props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG,
System.getenv("POD_NAME")); // K8s 中用 Pod Name同时使用 Standby Replicas(状态备份副本),Rebalance 时直接从备副本恢复,不需要重建状态:
props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);与 Flink 的对比
经常有人问我:Kafka Streams 和 Flink,该用哪个?
简单说:
- Kafka Streams:适合轻量级流处理、已有 Kafka 基础、不想维护独立集群、Java 团队
- Flink:适合复杂流处理(复杂 CEP、多流 Join、ML Pipeline)、需要极低延迟(毫秒级)、有专职大数据团队
小沈的场景(实时销售统计)用 Kafka Streams 完全够,代码量少、运维简单、没有额外的 Flink 集群。
