Kafka三端消息保障:生产者acks、Broker副本同步、消费者提交
Kafka三端消息保障:生产者acks、Broker副本同步、消费者提交
适读人群:需要构建可靠消息系统的Java工程师,关注消息丢失与重复消费问题 | 阅读时长:约17分钟
开篇故事
两年前,我们的积分系统出了一次让我至今后怕的事故。
用户完成订单后,订单服务发送一条"积分增加"消息到Kafka,积分服务消费后给用户增加积分。系统运行了大半年没出问题,直到某天凌晨,我们的一台Kafka Broker因为磁盘故障突然下线。
故障处理完后,运营发来报告:有约4600个用户订单对应的积分没有增加,但用户已经购买成功,客诉如雪片般飞来。
排查下来发现问题出在Producer端:当时我们的Producer配置是acks=1(只等Leader确认),这台下线的Broker恰好是多个Partition的Leader。Producer接收到ACK后就认为发送成功了,但Follower还没来得及同步,Leader就宕机了,消息从那4600条记录起就永久丢失了。
后来我花了两周时间把Kafka的消息保障机制从生产端到消费端完整研究了一遍,设计了新的三端可靠性方案。今天完整分享出来。
一、消息保障的三个层次
Kafka的消息保障分三个端:
Producer端 ──────→ Broker端 ──────→ Consumer端
(acks配置) (副本同步) (offset提交)
↓ ↓ ↓
生产可靠性 存储可靠性 消费可靠性每个端都有可能造成消息丢失或重复,必须三端配合才能实现端到端的消息保障。
二、生产者端:acks参数深度解析
2.1 三种acks模式对比
2.2 acks=1时的消息丢失场景
这正是我们事故的根因。看这个时序:
2.3 acks=all的正确配套配置
只配acks=all是不够的,还需要配合Broker端的min.insync.replicas:
# Broker端配置(server.properties 或 Topic级别配置)
# 至少有2个ISR副本确认才算写入成功
min.insync.replicas=2
# 副本数=3,加上min.insync.replicas=2,可以容忍1个副本故障
default.replication.factor=3
# Producer端配置
acks=all
# 注意:acks=all + min.insync.replicas=2 的组合意味着:
# 如果ISR中只剩1个副本,写入会失败(抛出NotEnoughReplicasException)
# 这是"宁可失败,不可丢数据"的设计选择如果ISR副本数 < min.insync.replicas,Producer会收到NotEnoughReplicasException,这时候要做好重试和告警。
2.4 生产者发送失败处理
/**
* 三种acks模式下的Producer配置工厂
* 根据业务对可靠性和延迟的不同要求选择合适的配置
*/
@Configuration
public class KafkaProducerStrategyConfig {
/**
* 高可靠配置:适用于支付、积分、库存等核心业务
* acks=all + min.insync.replicas=2 + 幂等
* 延迟:P99约15-25ms
*/
@Bean("reliableProducerFactory")
public ProducerFactory<String, String> reliableProducerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
configs.put(ProducerConfig.ACKS_CONFIG, "all");
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
// 幂等模式下,in-flight请求数不能超过5
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000); // 2分钟内必须成功
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
/**
* 高吞吐配置:适用于日志、埋点等允许少量丢失的业务
* acks=1 + 批量发送
* 吞吐量:约普通配置的3-5倍
*/
@Bean("highThroughputProducerFactory")
public ProducerFactory<String, String> highThroughputProducerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
configs.put(ProducerConfig.ACKS_CONFIG, "1");
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072); // 128KB
configs.put(ProducerConfig.LINGER_MS_CONFIG, 20); // 凑满20ms发一批
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L); // 64MB缓冲
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
}三、Broker端:副本同步的可靠性保障
3.1 消息持久化流程
3.2 刷盘策略的权衡
Kafka默认是异步刷盘(写到PageCache就返回),依赖副本机制保证数据安全:
# Broker端刷盘配置
# 每写10000条消息刷一次盘(0=不主动刷,依赖OS)
log.flush.interval.messages=10000
# 每1秒刷一次盘
log.flush.interval.ms=1000
# 生产中通常两个都不配,完全依赖OS刷盘策略
# 依靠副本机制而非刷盘保证可靠性,性能更好为什么不建议配置强制刷盘?因为每次都等磁盘写入会让吞吐量下降90%以上。有了3副本机制,即使机器突然断电(数据在PageCache未刷盘),只要还有2个副本存活,数据就不会丢失。
3.3 消息压缩与解压缩的位置
这个细节很多人搞错:
| 端点 | 操作 | 说明 |
|---|---|---|
| Producer | 压缩 | 一批消息压缩后发送 |
| Broker | 不解压(通常) | 直接存储压缩数据,节省磁盘 |
| Consumer | 解压 | 拉取后解压读取 |
但有一种情况Broker会解压:当Broker端配置了compression.type=producer(默认值)时,保持Producer的压缩格式;如果配置了其他格式(如snappy),Broker会先解压再用新格式重新压缩,这会显著降低Broker性能。
四、消费者端:offset提交策略
4.1 自动提交vs手动提交
自动提交的问题:默认每5秒提交一次(auto.commit.interval.ms=5000),如果消费者在提交后、消息处理完前宕机,消息丢失。如果在消息处理中途提交,也会丢失。
手动提交的问题:提交前宕机,消息会被重复消费。
4.2 手动提交的正确实现
/**
* 消费者端三种提交策略示例
* 根据业务对丢失和重复的容忍度选择
*/
@Service
@Slf4j
public class OrderEventConsumer {
private final OrderService orderService;
private final RedisTemplate<String, String> redisTemplate;
public OrderEventConsumer(OrderService orderService,
RedisTemplate<String, String> redisTemplate) {
this.orderService = orderService;
this.redisTemplate = redisTemplate;
}
/**
* 策略1:处理完批次后同步提交
* 保证:不丢失(至少一次消费,可能重复)
* 适用:有幂等保障的业务
*/
@KafkaListener(
topics = "order-events",
groupId = "order-group-v1",
containerFactory = "manualAckContainerFactory"
)
public void consumeWithBatchCommit(
List<ConsumerRecord<String, String>> records,
Acknowledgment acknowledgment) {
boolean allSuccess = true;
for (ConsumerRecord<String, String> record : records) {
try {
// 业务处理
OrderEvent event = parseEvent(record.value());
orderService.processEvent(event);
} catch (Exception e) {
log.error("处理失败,partition={}, offset={}",
record.partition(), record.offset(), e);
allSuccess = false;
// 策略:失败不影响整体提交,依赖幂等重试
// 或者:将失败消息发到死信队列
}
}
// 整个批次处理完后提交
// 注意:这里提交的是整个批次,不是单条
acknowledgment.acknowledge();
log.info("批次提交完成,共处理{}条,成功={}", records.size(), allSuccess);
}
/**
* 策略2:逐条处理逐条提交(最精确,但性能最低)
* 保证:最多一条消息重复消费
* 适用:对重复消费极敏感的场景
*/
@KafkaListener(
topics = "payment-events",
groupId = "payment-group",
containerFactory = "manualAckContainerFactory"
)
public void consumeWithPerRecordCommit(
ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
String msgId = record.topic() + "-" + record.partition() + "-" + record.offset();
// 幂等检查:Redis中检查该消息是否已处理
Boolean isProcessed = redisTemplate.hasKey("processed:" + msgId);
if (Boolean.TRUE.equals(isProcessed)) {
log.info("消息已处理,跳过: {}", msgId);
acknowledgment.acknowledge();
return;
}
try {
// 业务处理
PaymentEvent event = parsePaymentEvent(record.value());
orderService.processPayment(event);
// 标记已处理(TTL=24小时,避免Redis无限增长)
redisTemplate.opsForValue().set("processed:" + msgId, "1",
Duration.ofHours(24));
// 处理成功后提交
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("支付事件处理失败: {}", msgId, e);
// 不调用acknowledge(),消息将被重新投递
// 但要注意:如果永久性失败,会导致消费卡住
// 应该有最大重试次数控制,超过后发死信队列
throw e;
}
}
/**
* 策略3:异步批量提交(性能最好,有小概率重复)
* 适用:对性能要求极高,能容忍少量重复的场景
*/
@KafkaListener(
topics = "log-events",
groupId = "log-group",
containerFactory = "manualAckContainerFactory"
)
public void consumeWithAsyncCommit(
List<ConsumerRecord<String, String>> records,
Acknowledgment acknowledgment,
Consumer<?, ?> consumer) {
// 处理消息
records.forEach(r -> processLogEvent(r.value()));
// 异步提交(不等待确认,性能更好)
// 注意:异步提交失败时不自动重试,需要在Rebalance监听器中同步提交保底
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
records.stream()
.collect(Collectors.groupingBy(
r -> new TopicPartition(r.topic(), r.partition()),
Collectors.maxBy(Comparator.comparingLong(ConsumerRecord::offset))
))
.forEach((tp, maxRecord) ->
maxRecord.ifPresent(r ->
offsets.put(tp, new OffsetAndMetadata(r.offset() + 1))
)
);
consumer.commitAsync(offsets, (committedOffsets, exception) -> {
if (exception != null) {
log.warn("异步提交失败,下次Rebalance时会同步提交: {}",
exception.getMessage());
}
});
}
private OrderEvent parseEvent(String value) {
// JSON解析逻辑
return new ObjectMapper().readValue(value, OrderEvent.class);
}
private PaymentEvent parsePaymentEvent(String value) {
return new ObjectMapper().readValue(value, PaymentEvent.class);
}
private void processLogEvent(String value) {
// 日志处理逻辑
}
}4.3 三端配置最佳实践
/**
* 端到端消息保障配置矩阵
* 根据业务场景选择对应的配置组合
*/
@Configuration
public class E2EMessageGuaranteeConfig {
/**
* 场景1:支付、核心交易 - At Least Once + 幂等
* 配置要点:
* - Producer: acks=all, enable.idempotence=true
* - Broker: min.insync.replicas=2, replication.factor=3
* - Consumer: 手动提交 + 业务幂等
*
* 性能基准(3节点集群,3副本):
* - 写入吞吐:约5万TPS
* - P99延迟:约25ms
* - 消息保证:不丢失,可能重复,业务幂等兜底
*/
@Bean("paymentKafkaTemplate")
public KafkaTemplate<String, String> paymentKafkaTemplate() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
configs.put(ProducerConfig.ACKS_CONFIG, "all");
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); // 严格顺序
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}
/**
* 场景2:用户行为日志 - At Most Once(可接受少量丢失)
* 配置要点:
* - Producer: acks=0, 高批量
* - Broker: min.insync.replicas=1
* - Consumer: 自动提交
*
* 性能基准:
* - 写入吞吐:约50万TPS
* - P99延迟:约1-2ms
* - 消息保证:可能丢失,不重复
*/
@Bean("logKafkaTemplate")
public KafkaTemplate<String, String> logKafkaTemplate() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
configs.put(ProducerConfig.ACKS_CONFIG, "0");
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144); // 256KB
configs.put(ProducerConfig.LINGER_MS_CONFIG, 50);
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(configs));
}
}四、踩坑实录
坑1:消息发送成功但Broker内存没持久化导致丢失
某次机房停电,Kafka所有节点同时掉电(没有UPS)。重启后发现最近15秒写入的消息全部丢失——因为数据还在PageCache没来得及刷盘。
这是副本机制也保证不了的场景:所有副本同时掉电,PageCache全部丢失。
解决方案:机房层面保证UPS,Kafka集群跨机架或跨机房部署(replica.rack.aware=true),至少两个副本在不同电路上。
坑2:acks=all但消息仍然丢失的神奇案例
有同学来找我说配了acks=all还是丢消息,排查了半天发现是:他同时配置了unclean.leader.election.enable=true,且某个Partition的ISR只剩一个Leader,acks=all退化为acks=1,Leader宕机后OSR成为新Leader,消息丢失。
解决方案:acks=all必须搭配min.insync.replicas>=2和unclean.leader.election.enable=false才能真正防止丢失。
坑3:消费者手动提交死锁导致消费卡住
有一段代码是这样写的:在处理消息的事务方法内部调用acknowledgment.acknowledge(),而Spring事务使用了同步代码块,导致acknowledge阻塞等待事务提交,事务提交又等acknowledge完成,形成死锁。
解决方案:事务提交后再调用acknowledge,不要在事务方法内部提交offset。
坑4:重复提交offset导致的性能问题
自动提交模式下,有人把auto.commit.interval.ms从5000设置成了100(以为提交越频繁越可靠),结果Coordinator端每秒处理大量offset提交请求,CPU飙升20%。
正确认知:offset提交频率不影响消息可靠性,只影响重启后重复消费的数量。手动提交按批次提交即可,不需要每条消息都提交。
坑5:Consumer端幂等检查的Redis TTL设置不合理
用Redis做幂等检查时,TTL设置为2小时,但消费积压时消息从生产到消费可能超过2小时,导致"已处理"的标记过期,消息被重复处理。
解决方案:Redis幂等TTL要大于消息可能积压的最长时间,或者将幂等状态持久化到数据库,用消息的业务唯一键做唯一索引。
五、总结与延伸
三端消息保障的核心要点:
| 端 | 关键配置 | 保障内容 | 代价 |
|---|---|---|---|
| Producer | acks=all, enable.idempotence=true | 不丢失,不重复发送 | 延迟增加5-15ms |
| Broker | min.insync.replicas=2, replication.factor=3 | 存储不丢失 | 存储成本3倍 |
| Consumer | 手动提交 + 业务幂等 | 不重复消费(业务层保证) | 代码复杂度增加 |
记住一个原则:Kafka只能保证at-least-once(至少一次)的消息投递,exactly-once(恰好一次)需要应用层配合实现幂等。这在第441期Exactly-Once语义会专门讲。
下一篇(第434期)讲RocketMQ事务消息,这是另一套思路——把消息发送和本地事务绑定,解决"发消息和写数据库的原子性"问题。
