Kafka 消息可靠性保障——幂等性、事务消息、消费者精确一次语义
Kafka 消息可靠性保障——幂等性、事务消息、消费者精确一次语义
适读人群:需要在 Kafka 中保障消息不丢不重的 Java 后端开发者 | 阅读时长:约18分钟 | 核心价值:彻底搞清楚 Kafka 三种投递语义,在生产环境实现精确一次
那个"钱多了"的线上事故
我有个朋友小宁,在某互联网金融公司做后端,负责积分系统。有一次他们做了一个活动——用户完成某个行为后,通过 Kafka 消息触发积分发放。活动结束后复盘,发现有一批用户收到了双倍积分,总共多发了大概 800 万积分,按当时的兑换率折合将近 20 万元的优惠券。
排查下来,问题出在 Producer 端:Kafka Broker 确实收到了消息,但返回 ACK 时网络抖动,Producer 没收到 ACK,触发重试,同一条消息被发送了两次。Broker 收到两条一样的消息,Consumer 消费了两次,积分发了两次。
这是一个典型的消息重复投递问题,对应 Kafka 的"至少一次"(at-least-once)语义。
小宁后来问我:"Kafka 不是说可以做精确一次(exactly-once)吗?怎么实现?"
这篇文章就来彻底讲清楚这个问题。
三种投递语义,你必须先搞清楚
在讨论具体实现前,先明确三个概念:
At-Most-Once(至多一次) 消息可能丢失,但不会重复。实现方式:发完消息不等 ACK,消费到就提交 offset。适合日志、监控等允许丢失的场景。
At-Least-Once(至少一次) 消息不会丢失,但可能重复。大多数 Kafka 应用的默认状态。Producer 重试可能导致重复发送,Consumer 在处理中崩溃可能导致重复消费。
Exactly-Once(精确一次) 消息不丢不重,每条消息恰好被处理一次。实现成本最高,Kafka 通过幂等性 + 事务来支持。
Producer 端:幂等性实现原理
基本原理
Kafka 的幂等性(Idempotent Producer)在 0.11.0 版本引入,开启后 Broker 会保证同一个 Producer 发送的相同消息(相同序列号)只被持久化一次。
核心机制:
- Broker 为每个 Producer 分配一个
Producer ID(PID) - Producer 发送的每条消息都携带
Sequence Number,单调递增 - Broker 对同一 PID + Partition 的消息序列号进行校验,拒绝重复序列号
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 开启幂等性
props.put("enable.idempotence", true);
// 开启幂等性后,以下参数会被自动调整:
// acks 必须为 all(自动设置)
// retries 必须 > 0(自动设置为 Integer.MAX_VALUE)
// max.in.flight.requests.per.connection 必须 <= 5(自动校验)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);幂等性的局限: 只保证单个 Producer 会话内、单个 Partition 内的幂等,不保证跨会话(Producer 重启后 PID 变化)和跨 Partition 的幂等。
Producer 端:事务消息实现
当业务需要原子性地写入多个 Partition,或者同时操作 DB 和 Kafka,就需要事务支持。
纯 Kafka 事务(跨多个 Partition)
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
// 事务性 ID:唯一标识这个 Producer 实例,跨会话保持一致
// 相同 transactional.id 的新实例启动时会 fence 掉旧实例
props.put("transactional.id", "order-producer-001");
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 初始化事务(只调用一次)
producer.initTransactions();
try {
producer.beginTransaction();
// 原子性地写入多个 Topic/Partition
producer.send(new ProducerRecord<>("order-topic", orderId, orderJson));
producer.send(new ProducerRecord<>("inventory-topic", skuId, inventoryJson));
producer.send(new ProducerRecord<>("payment-topic", paymentId, paymentJson));
// 提交事务:三条消息要么全部可见,要么全部不可见
producer.commitTransaction();
} catch (ProducerFencedException e) {
// 被其他相同 transactional.id 的实例 fence 了,无法恢复
producer.close();
} catch (Exception e) {
// 其他异常,回滚事务
producer.abortTransaction();
log.error("事务失败,已回滚", e);
}结合数据库的事务消息模式
在实践中,更常见的场景是:既要更新数据库,又要发 Kafka 消息,需要保证两者的原子性。
纯 Kafka 事务解决不了这个问题(Kafka 事务不参与数据库事务)。这时候有一个经典的解决方案:事务发件箱模式(Transactional Outbox Pattern):
@Service
@Transactional
public class OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private OutboxEventMapper outboxEventMapper;
/**
* 创建订单:在同一个数据库事务中,既写 order 表,又写 outbox 表
* outbox 表中的消息由独立的 relay 进程异步发送到 Kafka
*/
public void createOrder(OrderRequest request) {
// 1. 写订单
Order order = buildOrder(request);
orderMapper.insert(order);
// 2. 写发件箱(同一个 DB 事务)
OutboxEvent event = new OutboxEvent();
event.setId(UUID.randomUUID().toString());
event.setAggregateType("Order");
event.setAggregateId(order.getId());
event.setEventType("OrderCreated");
event.setPayload(JSON.toJSONString(order));
event.setCreatedAt(LocalDateTime.now());
event.setStatus(EventStatus.PENDING);
outboxEventMapper.insert(event);
// DB 事务提交:order 和 outbox event 要么一起成功,要么一起失败
}
}
/**
* 发件箱 Relay:定期扫描 outbox 表,将 PENDING 事件发送到 Kafka
*/
@Component
public class OutboxEventRelay {
@Autowired
private OutboxEventMapper outboxEventMapper;
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Scheduled(fixedDelay = 1000) // 每秒执行一次
public void relay() {
List<OutboxEvent> events = outboxEventMapper.findPendingEvents(100);
for (OutboxEvent event : events) {
try {
kafkaTemplate.send("order-events", event.getAggregateId(), event.getPayload())
.addCallback(
result -> markAsPublished(event.getId()),
ex -> log.error("发送失败,event_id={}", event.getId(), ex)
);
} catch (Exception e) {
log.error("发送异常", e);
}
}
}
private void markAsPublished(String eventId) {
outboxEventMapper.updateStatus(eventId, EventStatus.PUBLISHED);
}
}这个方案的优点:
- DB 事务保证本地原子性
- Relay 进程保证消息最终发出
- 重启后从 PENDING 状态的消息继续发送,不丢消息
缺点是消息可能重复发送(Relay 崩溃后重启会重发),需要 Consumer 做幂等处理。
Consumer 端:精确一次语义实现
Consumer 端的"精确一次",核心是保证消息处理和 offset 提交的原子性。
幂等消费者设计
最简单有效的方案:让消费逻辑本身幂等,这样即使重复消费也不产生副作用。
@Component
public class IdempotentConsumer {
@Autowired
private MessageIdempotencyRepository idempotencyRepo;
@Autowired
private BusinessService businessService;
@KafkaListener(topics = "order-events", groupId = "order-processor",
containerFactory = "manualAckListenerFactory")
public void consume(ConsumerRecord<String, String> record, Acknowledgment ack) {
// 构建幂等 key:topic + partition + offset 唯一标识一条消息
String idempotencyKey = String.format("%s-%d-%d",
record.topic(), record.partition(), record.offset());
// 检查是否已处理
if (idempotencyRepo.isProcessed(idempotencyKey)) {
log.warn("消息已处理,跳过。key={}", idempotencyKey);
ack.acknowledge();
return;
}
try {
// 执行业务逻辑
businessService.processOrder(record.value());
// 标记为已处理(Redis SET NX 或 DB 唯一索引)
idempotencyRepo.markAsProcessed(idempotencyKey, Duration.ofDays(7));
// 手动提交 offset
ack.acknowledge();
} catch (Exception e) {
log.error("消息处理失败,将不提交 offset,等待重试", e);
// 不调用 ack.acknowledge(),消息会被重新投递
}
}
}Kafka Streams 的精确一次
如果你用 Kafka Streams,框架层面就支持精确一次语义:
Properties streamsConfig = new Properties();
streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-app");
streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
// 开启精确一次语义(Kafka 2.6+ 推荐使用 EXACTLY_ONCE_V2)
streamsConfig.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
StreamsConfig.EXACTLY_ONCE_V2);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");
stream
.filter((key, value) -> value != null)
.mapValues(value -> processValue(value))
.to("output-topic");
KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfig);
streams.start();注意: EXACTLY_ONCE_V2 对性能有影响,吞吐量大概会降低 20-30%,延迟也会增加。在开启前评估你的业务是否真的需要。
三大踩坑实录
坑一:开启事务后吞吐量暴跌
现象: 开启 Kafka 事务后,Producer 的 TPS 从原来的 50000/s 降到了 5000/s,差了整整 10 倍。
原因: Kafka 事务需要两阶段提交,每个事务都要向 Transaction Coordinator 发送 beginTransaction、addPartition、commitTransaction 三次 RPC。如果每条消息都是一个独立事务,RPC 开销占主导。
解法: 批量消息共享一个事务,而不是每条消息一个事务:
// 错误做法:每条消息一个事务
for (Order order : orders) {
producer.beginTransaction();
producer.send(new ProducerRecord<>("orders", order.getId(), JSON.toJSONString(order)));
producer.commitTransaction();
}
// 正确做法:批量消息共用一个事务
producer.beginTransaction();
for (Order order : orders) {
producer.send(new ProducerRecord<>("orders", order.getId(), JSON.toJSONString(order)));
}
producer.commitTransaction();批量后 TPS 恢复到 35000/s,损耗在可接受范围内。
坑二:幂等性 key 设计不当导致误判
现象: 用业务消息 ID 作为幂等 key,发现有部分合法的重复 key 被错误地过滤掉了——比如用户完成同一个任务两次(系统设计允许),但第二次触发的积分消息被幂等检查拦截了。
原因: 用业务 ID 作为幂等 key 会把"同一业务事件的不同发生"也过滤掉。Kafka 消息的天然唯一标识是 topic + partition + offset 的组合,而不是业务 ID。
解法: 幂等 key 使用 消息的唯一 ID(可以在 Producer 发送时设置到 Header,或者使用 offset 组合),业务层的重复检查由业务逻辑自己负责。
坑三:Outbox Relay 延迟影响下游实时性
现象: 使用 Outbox 模式后,下游系统收到消息的延迟从原来的 20ms 变成了 500ms-1000ms,产品方反馈体验变差。
原因: Outbox Relay 用定时任务每秒扫描一次,在最坏情况下消息会等近 1 秒才被发出。
解法: 改用数据库变更日志(CDC)方式监听 outbox 表,Debezium 可以实时捕获 INSERT 事件并立即发送到 Kafka,延迟降到 50ms 以内:
# Debezium connector 配置
connector.class: io.debezium.connector.mysql.MySqlConnector
database.server.name: orderdb
table.include.list: orderdb.outbox_events
transforms: outbox
transforms.outbox.type: io.debezium.transforms.outbox.EventRouter总结:可靠性保障选型建议
| 场景 | 推荐方案 | 说明 |
|---|---|---|
| 普通业务消息 | 幂等 Producer + 消费者幂等 | 成本低,覆盖 99% 场景 |
| 跨多 Partition 原子写 | Kafka 事务 | 注意批量提交降损耗 |
| DB + Kafka 原子性 | Outbox 模式 | 最可靠,有延迟 |
| 实时流处理 | Kafka Streams EOS | 框架原生支持 |
小宁的积分问题,最终用"幂等 Consumer + 消息 ID 去重表"解决了,成本最低,效果最好。精确一次不是一个开关,而是一套从 Producer 到 Consumer 的完整工程实践。
