Kafka 深度实战——Producer 调优、Consumer Group 再均衡、消息积压处理
Kafka 深度实战——Producer 调优、Consumer Group 再均衡、消息积压处理
适读人群:使用 Kafka 做消息中间件的 Java 后端开发者 | 阅读时长:约20分钟 | 核心价值:掌握 Kafka 生产环境调优三板斧,彻底解决积压问题
凌晨三点的告警
两年前,我在一家做金融数据平台的公司。那是一个周五深夜,我正准备睡觉,手机突然响了——是运维同事发来的告警:Kafka 消息积压量超过 5000 万,还在以每秒 20 万的速度增长。
我立刻开远程,连上生产环境。第一眼看到监控大屏,心里就凉了半截——Consumer Group 的 lag(消费延迟)已经高达 5400 万条,而消费速率只有可怜的每秒 3000 条,Producer 写入速率是每秒 23000 条,差距悬殊。
按照这个趋势,不到两小时,消息就会开始因为 retention 时间到期而丢失。
我们的数据团队那边正在跑一个超大批量的数据清洗任务,把几千万条历史记录通过 Kafka 管道推送到下游分析系统。Producer 吞吐量远超 Consumer 处理能力,消息开始堆积。
那一夜我们用了四个小时才把积压消化掉,中途还遇到了一次 Rebalance 风暴,消费者集体停工了将近 10 分钟。
复盘那次事故,我意识到自己对 Kafka 的理解还停留在"能用"的层面,距离"用好"还差很远。这篇文章就是那次事故的完整总结,加上后来两年里的持续深挖。
Part 1:Producer 调优——把吞吐量压榨到极限
很多人用 Kafka Producer 就是 new 一个 KafkaProducer,然后 send(),对参数完全不管。这样做在低负载下没问题,但一旦上量,就会开始出问题。
Producer 核心参数详解
Properties props = new Properties();
// 基础配置
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 可靠性配置
// acks=all:等所有副本确认,最强可靠性但延迟高
// acks=1:只等 leader 确认(默认值,推荐大多数场景)
// acks=0:不等确认,最高吞吐但有丢失风险
props.put("acks", "1");
// 重试配置(配合幂等性使用)
props.put("retries", 3);
props.put("retry.backoff.ms", 100);
// 批量发送配置——这是吞吐量调优的关键!
// batch.size:每个 Partition 的批次大小(字节),默认 16KB,可调到 64KB 或 128KB
props.put("batch.size", 65536); // 64KB
// linger.ms:等待时间,让消息积累成批次再发送
// 0=立刻发送(低延迟),100=等100ms(高吞吐)
props.put("linger.ms", 10);
// buffer.memory:Producer 端缓冲区总大小,默认 32MB
// 当缓冲区满时,send() 会阻塞,超过 max.block.ms 后抛异常
props.put("buffer.memory", 67108864); // 64MB
// compression.type:压缩算法
// none/gzip/snappy/lz4/zstd,推荐 lz4(速度快,压缩率不错)
props.put("compression.type", "lz4");
// max.in.flight.requests.per.connection:未确认请求数量
// >1 时可能导致消息重排序;开启幂等后强制为 1 或者 5
props.put("max.in.flight.requests.per.connection", 5);
// 开启幂等性——避免重试导致的重复消息
props.put("enable.idempotence", true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);高吞吐场景的 Producer 封装
@Component
public class HighThroughputKafkaProducer {
private final KafkaProducer<String, String> producer;
private final ScheduledExecutorService flushExecutor;
public HighThroughputKafkaProducer() {
Properties props = buildHighThroughputProps();
this.producer = new KafkaProducer<>(props);
// 定时 flush,避免 linger.ms 过长导致消息延迟
this.flushExecutor = Executors.newSingleThreadScheduledExecutor();
this.flushExecutor.scheduleAtFixedRate(producer::flush, 100, 100, TimeUnit.MILLISECONDS);
}
public void sendAsync(String topic, String key, String value,
Callback callback) {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, key, value);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
// 记录失败,可以写到本地文件或告警
log.error("消息发送失败,topic={}, key={}, error={}",
topic, key, exception.getMessage());
if (callback != null) {
callback.onCompletion(metadata, exception);
}
} else {
log.debug("消息发送成功,topic={}, partition={}, offset={}",
topic, metadata.partition(), metadata.offset());
}
});
}
private Properties buildHighThroughputProps() {
Properties props = new Properties();
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());
props.put("acks", "1");
props.put("batch.size", 131072); // 128KB
props.put("linger.ms", 20);
props.put("buffer.memory", 134217728); // 128MB
props.put("compression.type", "lz4");
props.put("max.in.flight.requests.per.connection", 5);
props.put("enable.idempotence", true);
return props;
}
}实测效果: 调优前单 Producer 线程 TPS 约 8000/s,调优后(batch.size=128KB, linger.ms=20, lz4压缩)TPS 达到 85000/s,提升约 10 倍,同时网络带宽占用降低 60%(压缩的功劳)。
Part 2:Consumer Group 再均衡——最容易踩的坑
Rebalance(再均衡)是 Consumer Group 的协调机制,当有消费者加入、离开,或者分区数变化时会触发。Rebalance 期间,所有消费者停止消费,等待新的分区分配结果。
这就是为什么我们那次事故中,消费者集体停工了 10 分钟。
Rebalance 触发的常见原因
- 消费者
session.timeout.ms超时:心跳没在规定时间内发到 Coordinator,被认为宕机了 - 消费者
max.poll.interval.ms超时:两次poll()调用间隔超过限制,认为消费者卡死了 - 消费者频繁重启:发布新版本、OOM 重启等
优化 Rebalance 频率
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
consumerProps.put("group.id", "data-processing-group");
consumerProps.put("key.deserializer", StringDeserializer.class.getName());
consumerProps.put("value.deserializer", StringDeserializer.class.getName());
// session.timeout.ms:Coordinator 认为消费者宕机的超时时间
// 增大这个值可以减少因为 GC Pause、短暂网络抖动导致的误判 Rebalance
// 建议:45000(45秒),默认是 10000
consumerProps.put("session.timeout.ms", 45000);
// heartbeat.interval.ms:心跳发送间隔
// 必须小于 session.timeout.ms 的 1/3
consumerProps.put("heartbeat.interval.ms", 3000);
// max.poll.interval.ms:两次 poll() 的最大间隔
// 如果你的消息处理逻辑很慢(比如要调用外部API),必须加大这个值
// 否则消费者会被踢出 Group,触发 Rebalance
consumerProps.put("max.poll.interval.ms", 600000); // 10分钟
// max.poll.records:每次 poll() 获取的最大消息数
// 减小这个值,让每批次处理时间更短,避免超过 max.poll.interval.ms
consumerProps.put("max.poll.records", 500); // 默认 500,可酌情减小
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);使用静态成员身份减少 Rebalance
Kafka 2.3+ 引入了 group.instance.id,为每个消费者指定一个稳定的 ID。重启后使用相同 ID 重新加入,不触发 Rebalance:
consumerProps.put("group.instance.id", "consumer-instance-1");
// 配合 session.timeout.ms 使用:重启在 timeout 内完成就不会触发 RebalancePart 3:消息积压处理——生产环境救命指南
消息积压是最让人头疼的 Kafka 故障。以下是我总结的分级处理方案:
积压诊断
# 查看消费组 lag
kafka-consumer-groups.sh --bootstrap-server kafka1:9092 \
--group data-processing-group --describe
# 输出示例:
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# data-topic 0 1000000 6000000 5000000
# data-topic 1 800000 5500000 4700000如果 LAG 持续增长,说明消费速率 < 生产速率,需要扩容消费者。
方案一:临时扩大消费者数量
// 临时启动更多消费者实例,分担积压
// 注意:消费者数量不能超过 Partition 数量,多余的消费者空跑
// 如果 Partition 数=4,最多同时 4 个消费者有效消费
// 先检查 Partition 数
AdminClient adminClient = AdminClient.create(adminProps);
DescribeTopicsResult result = adminClient.describeTopics(
Collections.singletonList("data-topic"));
TopicDescription desc = result.all().get().get("data-topic");
int partitionCount = desc.partitions().size();
System.out.println("Partition 数量: " + partitionCount);方案二:新建 Topic + 迁移消费
当 Partition 数不足时,无法通过简单扩容消费者解决。此时可以:
- 创建一个新 Topic,Partition 数设置为更大的值(比如 32)
- 部署一个"搬运"程序,从旧 Topic 消费,写入新 Topic
- 增加消费新 Topic 的消费者数量
// 搬运程序:从旧 Topic 消费,写入新 Topic,提升并发度
@Component
public class TopicMigrationWorker {
@Autowired
private KafkaProducer<String, String> producer;
public void migrate(String oldTopic, String newTopic, int batchSize) {
KafkaConsumer<String, String> consumer = createMigrationConsumer();
consumer.subscribe(Collections.singletonList(oldTopic));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
List<Future<RecordMetadata>> futures = new ArrayList<>();
for (ConsumerRecord<String, String> record : records) {
ProducerRecord<String, String> newRecord =
new ProducerRecord<>(newTopic, record.key(), record.value());
futures.add(producer.send(newRecord));
}
// 等待所有发送完成
for (Future<RecordMetadata> future : futures) {
future.get();
}
// 手动提交 offset
consumer.commitSync();
log.info("已迁移 {} 条消息", records.count());
}
}
}方案三:消费者端异步并行处理
很多积压是因为消费者串行处理,改成并行后吞吐量可以提升 5-10 倍:
@Component
public class ParallelConsumer {
private final ExecutorService workerPool =
new ThreadPoolExecutor(
16, // 核心线程数
32, // 最大线程数
60L, // keepAliveTime
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), // 队列容量
new ThreadPoolExecutor.CallerRunsPolicy() // 满了就在当前线程执行
);
@KafkaListener(topics = "data-topic", groupId = "parallel-group",
concurrency = "4") // 4个消费者线程并行
public void consume(List<ConsumerRecord<String, String>> records,
Acknowledgment ack) {
// 将消息分批提交给线程池并行处理
List<CompletableFuture<Void>> futures = records.stream()
.map(record -> CompletableFuture.runAsync(
() -> processRecord(record), workerPool))
.collect(Collectors.toList());
// 等待所有处理完成
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// 手动提交 offset
ack.acknowledge();
}
private void processRecord(ConsumerRecord<String, String> record) {
// 实际业务处理逻辑
try {
businessService.process(record.value());
} catch (Exception e) {
// 记录失败,写入失败队列或死信 Topic
log.error("处理失败,offset={}, error={}", record.offset(), e.getMessage());
}
}
}三大踩坑实录
坑一:linger.ms 设太大导致消息延迟告警
现象: 将 linger.ms 从 0 调到 500 后,吞吐量确实提升了,但监控告警说消息端到端延迟从 50ms 增加到了 600ms,产品方投诉消息推送不及时。
原因: linger.ms=500 意味着 Producer 最多等 500ms 再发批次,这 500ms 都加到了端到端延迟里。
解法: 根据业务对延迟的容忍度来设置 linger.ms。实时性要求高的 Topic,linger.ms 保持 5-10ms;允许延迟的离线类 Topic,可以设 100-200ms。不同业务场景配置不同的 Producer 实例。
坑二:Consumer 消费慢导致无限 Rebalance
现象: 生产环境消费者不断输出 Rebalance 日志,实际消费量极低。查看日志发现:消费者刚分配到分区,还没消费完就被踢出去了,然后重新加入,再被踢出,陷入死循环。
原因: 消费者的业务逻辑里有一个外部 HTTP 调用,超时时间设置的是 30 秒,而 max.poll.interval.ms 默认是 300000(5分钟)。当外部服务变慢,单批次 500 条消息的处理时间超过了 5 分钟,触发 Rebalance。
解法:
- 减小
max.poll.records到 50,让每批次处理时间可控 - 增大
max.poll.interval.ms到 1800000(30分钟) - 外部 HTTP 调用改成异步,并添加熔断器
坑三:Partition 数扩容后导致消息乱序
现象: 扩容 Partition 数后,相同 key 的消息到了不同 Partition,下游消费时出现乱序处理,导致部分业务状态机跳转异常。
原因: Kafka 通过对 key 取 hash 再对 Partition 数取模来决定消息路由。Partition 数变了,同一个 key 的路由结果就变了。旧 Partition 里的历史消息 key=A 在 Partition 0,新增后 key=A 可能路由到 Partition 5。新旧消息在不同 Partition 里,消费顺序不保证。
解法: Partition 扩容前,确认所有旧消息都已消费完(lag=0)再扩容。或者在消费端对同一 key 的消息加业务层排序(通过消息中的序列号)。
生产环境配置模板
总结一份我们内部使用的生产配置:
# application.yml
spring:
kafka:
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
producer:
acks: 1
batch-size: 131072 # 128KB
linger-ms: 10
buffer-memory: 134217728 # 128MB
compression-type: lz4
properties:
enable.idempotence: true
max.in.flight.requests.per.connection: 5
consumer:
group-id: ${spring.application.name}-group
auto-offset-reset: latest
enable-auto-commit: false # 关闭自动提交,手动控制
max-poll-records: 200
properties:
session.timeout.ms: 45000
heartbeat.interval.ms: 3000
max.poll.interval.ms: 600000
listener:
type: batch # 批量消费
ack-mode: manual # 手动 ack
concurrency: 4 # 4个并发消费线程写在最后
Kafka 调优没有通用公式,但有方法论:先理解原理,再看监控指标,然后根据瓶颈点调整参数,最后验证效果。
那次凌晨三点的事故,让我把 Kafka 文档从头到尾啃了一遍,也逼着我把生产环境的每一个参数都搞清楚了来龙去脉。代价有点大,但值得。
后来我们把这套调优方案固化成了公司内部的 Kafka 使用规范,新接入的业务团队直接套用,少踩了很多坑。
