Kafka架构深度解析:Partition副本、ISR机制与Leader选举
Kafka架构深度解析:Partition副本、ISR机制与Leader选举
适读人群:有Kafka使用经验、想深入理解内部机制的Java工程师 | 阅读时长:约18分钟
开篇故事
2021年双十一前夕,我们团队负责的订单消息系统出了一次让我至今记忆深刻的事故。
那天凌晨两点,运维同学发来告警:Kafka集群中有一个Broker节点磁盘IO飙升到100%,随后这台机器直接失联。我从被窝里爬起来,打开监控看到的景象是——消费者Lag从零突然蹿到了120万条,订单状态更新全部卡住,客服系统开始大量报错。
我当时的第一反应是:Leader挂了,剩下的副本会不会选出新Leader?等了大概40秒,新Leader选出来了,消费端开始恢复。但这40秒里,有大量消息写入失败,Producer端开始抛出NotLeaderForPartitionException。
事故复盘时,老板问我:"为什么选举要40秒这么久?能不能更快?" 我一时语塞,因为我当时对Kafka内部的Leader选举机制并不够了解,只知道个大概。
从那次事故之后,我开始系统地研究Kafka的副本机制和选举流程。这篇文章就是我这几年研究和踩坑的总结,把Partition副本、ISR机制、Leader选举这三件事讲透。
一、Partition与副本的本质
1.1 为什么要有Partition
很多人觉得Kafka的Partition就是"分片",这没错,但没说到本质。Partition的核心价值有两点:
水平扩展吞吐量:一个Topic的吞吐量受限于单个机器的处理能力。把数据切成多个Partition,每个Partition可以放在不同Broker上,多个Producer可以并发写入不同Partition,突破单机瓶颈。我们生产环境一个高峰期Topic,单Partition写入峰值大约120MB/s,配置了12个Partition分布在3台Broker上,整体吞吐达到了1.2GB/s以上。
并发消费:消费者组里的每个消费者实例只能消费一个或多个Partition,Partition数量是消费并发度的上限。这个上限很关键,后面讲Rebalance时还会提到。
1.2 副本的角色划分
每个Partition有且只有一个Leader副本和若干Follower副本。
Leader负责所有读写请求——注意,Kafka默认所有读写都走Leader,Follower只负责从Leader同步数据。这和某些数据库的主从分离不同,Kafka的Follower不承担读流量(Kafka 2.4之后引入了replica.selector.class可以配置从Follower读,但生产中很少用)。
Follower从Leader拉取数据,而不是Leader推送。Follower会定期向Leader发送Fetch请求,Leader返回新增的消息数据。这个拉取间隔由replica.fetch.min.bytes和replica.fetch.wait.max.ms共同决定,默认拉取等待最大500ms。
1.3 副本数量的选择
生产中副本数一般设置为3,这是一个经过验证的平衡点:
- 副本数=1:没有容错能力,机器宕机消息立即不可用
- 副本数=2:只能容忍1台机器故障,且选举时可能出现脑裂
- 副本数=3:可以容忍1台故障,多数派(2/3)确保数据安全
- 副本数=5:容灾能力更强,但写放大严重,同步开销大
我们大部分Topic都用3副本,只有少数对数据可靠性要求极高的Topic(比如支付流水)会用5副本。
二、ISR机制深度解析
2.1 ISR是什么
ISR全称In-Sync Replicas,即同步副本集合。这是Kafka保证数据安全性的核心机制。
ISR中的副本被认为与Leader保持"足够同步"的状态。只有ISR中的副本才有资格在Leader宕机时被选为新的Leader。如果Leader宕机时ISR只剩Leader自己,就会出现严重问题。
2.2 副本如何加入和踢出ISR
副本加入ISR的条件:Follower的LEO(Log End Offset,日志末端偏移量)与Leader的LEO差距在允许范围内。
副本被踢出ISR的条件(满足任一即触发):
- Follower超过
replica.lag.time.max.ms(默认30秒)没有向Leader发送Fetch请求 - Follower的同步进度落后Leader超过
replica.lag.max.messages条消息(Kafka 0.9之后这个参数已移除,统一用时间来判断)
踢出ISR的动作由Leader负责,Leader会周期性检查每个Follower的最后一次Fetch时间,如果超时就把它踢出ISR,并把变更通知给Controller,Controller再同步给ZooKeeper(或KRaft模式下的元数据日志)。
2.3 HW与LEO的关系
这是面试最爱考的点,也是理解消息可见性的关键。
- LEO(Log End Offset):每个副本自己写到哪里了,是"自己的进度"
- HW(High Watermark,高水位):所有ISR副本中最小的LEO,是"大家都确认写完的进度"
消费者只能消费到HW以下的消息,HW之后的消息虽然已经写入Leader,但还没被所有ISR副本确认,对消费者不可见。
2.4 ISR收缩带来的问题
生产中遇到过这样的情况:某台Broker机器GC时间过长,导致Follower无法及时Fetch,被踢出ISR。这时ISR只剩Leader和另一个Follower,min.insync.replicas如果设置为2就不影响写入,但如果设置为3就会导致Producer写入失败。
我们线上有一段时间频繁出现ISR收缩告警,排查发现是某台机器的JVM老年代GC耗时平均在8秒,而replica.lag.time.max.ms默认是30秒,看起来没问题。但问题是GC期间Follower Fetch请求全部暂停,最坏情况下连续两次GC可能超过30秒,这就触发了ISR收缩。
解决方案:调整JVM参数减少Full GC频率,同时把replica.lag.time.max.ms调整到60秒,并对ISR收缩配置告警但不设为P0级别。
三、Leader选举完整实现
3.1 Controller的角色
Kafka集群中有且只有一个Controller,它负责:
- 监听Broker上下线,触发Leader选举
- 管理Partition状态机和副本状态机
- 维护集群元数据,推送给其他Broker
Controller本身也是一个普通Broker,通过在ZooKeeper的/controller节点抢占注册来成为Controller。第一个成功写入的Broker成为Controller,其他Broker监听这个节点,一旦节点消失就立即竞争。
KRaft模式(Kafka 2.8引入,3.x成熟)下,ZooKeeper被完全替代,Controller通过Raft协议选举,元数据通过内置的@metadata日志维护,这也是老架构中"Leader选举需要40秒"变成"通常在几秒内完成"的关键原因之一。
3.2 Leader选举触发时机
3.3 选举算法:PreferredLeader选举
Kafka还有一种主动选举机制叫Preferred Leader选举,目的是让Leader重新回到"优先副本"(AR列表第一个副本)所在的Broker,避免集群负载不均。
auto.leader.rebalance.enable=true时,Controller会定期(默认300秒)检查每个Broker的Leader分布,如果某个Broker的Leader比例低于leader.imbalance.per.broker.percentage(默认10%),就触发Preferred Leader选举。
3.4 Java代码:监控Leader分布
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*;
import java.util.concurrent.ExecutionException;
/**
* Kafka Leader分布监控工具
* 检查各Broker上的Leader数量分布,用于判断是否需要手动触发Rebalance
*/
@Component
public class KafkaLeaderMonitor {
private final AdminClient adminClient;
public KafkaLeaderMonitor(@Value("${kafka.bootstrap-servers}") String bootstrapServers) {
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000);
props.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 30000);
this.adminClient = AdminClient.create(props);
}
/**
* 获取各Broker的Leader分布情况
* @param topicName Topic名称
* @return Map<brokerId, leaderCount>
*/
public Map<Integer, Long> getLeaderDistribution(String topicName)
throws ExecutionException, InterruptedException {
DescribeTopicsResult result = adminClient.describeTopics(
Collections.singletonList(topicName)
);
Map<String, TopicDescription> topicDescMap = result.all().get();
TopicDescription topicDesc = topicDescMap.get(topicName);
Map<Integer, Long> leaderDistribution = new HashMap<>();
for (TopicPartitionInfo partitionInfo : topicDesc.partitions()) {
int leaderId = partitionInfo.leader().id();
leaderDistribution.merge(leaderId, 1L, Long::sum);
// 打印ISR信息
List<Node> isr = partitionInfo.isr();
List<Node> replicas = partitionInfo.replicas();
if (isr.size() < replicas.size()) {
log.warn("Partition {} ISR收缩!ISR={}/{}, 不在ISR的副本={}",
partitionInfo.partition(),
isr.size(), replicas.size(),
getRemovedFromISR(replicas, isr)
);
}
}
return leaderDistribution;
}
/**
* 触发Preferred Leader选举(让Leader回到优先副本)
*/
public void triggerPreferredLeaderElection(String topicName)
throws ExecutionException, InterruptedException {
DescribeTopicsResult descResult = adminClient.describeTopics(
Collections.singletonList(topicName)
);
Map<String, TopicDescription> topicDesc = descResult.all().get();
Set<TopicPartition> partitions = new HashSet<>();
for (TopicPartitionInfo info : topicDesc.get(topicName).partitions()) {
partitions.add(new TopicPartition(topicName, info.partition()));
}
// 触发选举
ElectLeadersResult electResult = adminClient.electLeaders(
ElectionType.PREFERRED, partitions
);
Map<TopicPartition, Optional<Throwable>> results = electResult.partitions().get();
results.forEach((tp, error) -> {
if (error.isPresent()) {
log.error("Partition {} 选举失败: {}", tp, error.get().getMessage());
} else {
log.info("Partition {} Preferred Leader选举成功", tp);
}
});
}
private List<Integer> getRemovedFromISR(List<Node> replicas, List<Node> isr) {
Set<Integer> isrIds = new HashSet<>();
isr.forEach(n -> isrIds.add(n.id()));
List<Integer> removed = new ArrayList<>();
replicas.forEach(n -> {
if (!isrIds.contains(n.id())) {
removed.add(n.id());
}
});
return removed;
}
}3.5 Java代码:生产者配置与ISR感知
import org.apache.kafka.clients.producer.*;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
/**
* 与ISR机制配合的生产者配置
* 核心参数说明:
* - acks=all:等待ISR所有副本确认,最强可靠性
* - min.insync.replicas:最少多少个ISR副本确认才算成功(Broker端配置)
* - retries:失败重试次数
* - enable.idempotence:开启幂等,配合acks=all使用
*/
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
// 基础连接配置
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092,kafka2:9092,kafka3:9092");
// 可靠性配置 - 与ISR直接相关
// acks=all 表示等待ISR中所有副本都写入才返回ACK
// 配合Broker的min.insync.replicas=2,确保至少2个副本写入
configs.put(ProducerConfig.ACKS_CONFIG, "all");
// 重试配置
configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
configs.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 100);
// 幂等性(要求acks=all,retries>0,max.in.flight.requests.per.connection<=5)
configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 性能配置
configs.put(ProducerConfig.BATCH_SIZE_CONFIG, 65536); // 64KB批次
configs.put(ProducerConfig.LINGER_MS_CONFIG, 5); // 最多等5ms凑批
configs.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // 32MB缓冲区
configs.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // LZ4压缩
// 请求超时
configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> template = new KafkaTemplate<>(producerFactory());
// 设置Producer监听器,监控发送结果
template.setObservationEnabled(true);
return template;
}
}
/**
* 带重试和降级的消息发送服务
*/
@Service
@Slf4j
public class ReliableMessageSender {
private final KafkaTemplate<String, String> kafkaTemplate;
public ReliableMessageSender(KafkaTemplate<String, String> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
/**
* 发送消息,自动处理NotLeaderForPartitionException
* 当Leader切换时,Producer会感知到元数据变化并重新路由
*/
public CompletableFuture<SendResult<String, String>> sendWithFallback(
String topic, String key, String value) {
return kafkaTemplate.send(topic, key, value)
.thenApply(result -> {
RecordMetadata metadata = result.getRecordMetadata();
log.info("消息发送成功: topic={}, partition={}, offset={}, latency={}ms",
metadata.topic(),
metadata.partition(),
metadata.offset(),
System.currentTimeMillis() - metadata.timestamp()
);
return result;
})
.exceptionally(ex -> {
// Leader切换期间可能抛出此异常,Producer会自动重试
if (ex.getCause() instanceof NotLeaderOrFollowerException) {
log.warn("Leader切换中,消息将自动重试: topic={}, key={}", topic, key);
}
// 其他异常记录报警
log.error("消息发送失败: topic={}, key={}, error={}",
topic, key, ex.getMessage());
throw new RuntimeException(ex);
});
}
}四、踩坑实录
坑1:unclean.leader.election.enable=true 导致消息丢失
这个坑我们在压测环境踩过,差点在生产上重现。
场景:Kafka集群3节点,某个Partition的ISR只剩Leader一个(两个Follower已经落后太多被踢出ISR)。此时Leader宕机,由于unclean.leader.election.enable=true(老版本Kafka的默认值!),Controller从OSR(Outside Sync Replicas)中随机选了一个落后的副本作为新Leader。
这个新Leader的LEO比原Leader少了约3万条消息,相当于这3万条已经被ACK了的消息"凭空消失"了。
解决方案:
# Broker配置,绝对不要在生产环境开启unclean election
unclean.leader.election.enable=false
# 同时确保min.insync.replicas >= 2
min.insync.replicas=2如果业务对数据丢失零容忍,这两个参数必须检查。
坑2:Controller频繁切换导致选举风暴
有一次,ZooKeeper集群出现网络抖动,导致/controller节点频繁消失和重新写入,引发Controller频繁切换。每次Controller切换都会触发全量Partition元数据同步,大量的网络IO导致Broker处理能力下降,进而引发更多超时,形成恶性循环。
我们当时的监控显示:Controller切换5分钟内发生了47次,集群中大量请求超时,消费者Lag从0暴增到80万。
解决方案:
- 独立部署ZooKeeper集群,不与Kafka混部
- 升级到Kafka 3.x使用KRaft模式,彻底去掉ZooKeeper依赖
- 短期应急:调大
zookeeper.session.timeout.ms到30000,减少误判
坑3:replica.fetch.max.bytes 配置不当导致同步异常慢
生产中遇到Follower同步一直跟不上Leader,ISR频繁收缩,但网络和磁盘看起来都很正常。排查了很久才发现是replica.fetch.max.bytes(默认1MB)配置太小,而我们的消息体平均在800KB,一次Fetch只能拿到1条消息,同步效率极低。
调整后:
# Broker端配置
replica.fetch.max.bytes=10485760 # 调整到10MB
replica.fetch.min.bytes=1 # 最小拉取1字节(立即返回)
replica.fetch.wait.max.ms=500 # 最长等待500ms
# 同时要调整message.max.bytes
message.max.bytes=10485760 # 单条消息最大10MB调整后Follower同步延迟从平均1200ms降到了80ms,ISR收缩问题完全消失。
坑4:HW更新延迟导致消费者重复消费
这是一个比较隐蔽的坑。Leader宕机并恢复后,新旧Leader之间的HW不一致可能导致消费者重复消费。
具体场景:
- Leader(A) HW=100,Follower(B) HW=95(B的HW更新有延迟)
- A宕机,B成为新Leader,B的HW=95
- 消费者已经消费到offset=98
- B成为Leader后,消费者重新从HW=95开始消费,产生重复消费
解决方案:消费者必须做幂等处理,不能依赖Kafka保证"绝对不重复"。这也是Exactly-Once语义需要额外机制保障的原因,我们在第441期专门讲。
坑5:日志段(LogSegment)切换影响消费延迟
不是ISR相关,但和Partition副本机制有关。我们观察到每隔1小时,某个消费者组的消费延迟会有一次短暂飙升,持续约30秒。排查发现是日志段切换时,Follower同步新Segment文件有一个初始化延迟,导致短暂的ISR抖动。
解决方案:调整日志段滚动时间:
# 默认1GB或7天滚动一次日志段,降低滚动频率
log.segment.bytes=536870912 # 512MB
log.roll.ms=86400000 # 24小时五、总结与延伸
Kafka的Partition副本机制是整个高可用体系的基础,ISR机制是可靠性的核心保障,Leader选举则是容灾恢复的关键路径。
几个重要结论:
ISR不是越大越好,ISR过大意味着写入必须等更多副本确认,延迟增加;ISR过小意味着可靠性降低,要根据业务场景权衡
min.insync.replicas。Leader选举速度取决于Controller感知速度,ZooKeeper模式下通常需要15-60秒,KRaft模式通常在5秒内。生产中应该容忍一定时间的不可用,同时Producer要配置合适的重试策略。
unclean.leader.election.enable=false是保底配置,任何对数据可靠性有要求的系统都必须这样设置。副本同步的三个关键参数需要联动调整:
replica.fetch.max.bytes、message.max.bytes、replica.lag.time.max.ms,任何一个配置不当都可能导致ISR收缩。
下一篇(第432期)我们讲消费者组Rebalance,这也是Kafka使用中另一个让人头疼的问题,触发条件、过程、以及如何减少其影响,实际项目中能用上的干货。
