Kafka性能调优:batch.size、linger.ms、压缩算法的权衡取舍
Kafka性能调优:batch.size、linger.ms、压缩算法的权衡取舍
适读人群:需要提升Kafka写入吞吐量、降低端到端延迟的Java工程师 | 阅读时长:约17分钟
开篇故事
去年大促前,我们的埋点数据采集系统遇到了一个严重的性能瓶颈:峰值期间,单个Kafka集群每秒需要处理约60万条埋点日志(平均消息大小约300字节),但实测写入TPS只有22万,大量消息堆积在Producer的buffer里等待发送,Producer端的buffer已经接近耗尽,开始抛出BufferExhaustedException。
这个系统跑了一年多,从没做过性能调优,全是默认配置。我花了三天时间系统调优,最终TPS从22万提升到了78万,提升了3.5倍,缓解了大促的压力。
今天把这次调优的思路和具体参数全部分享出来。
一、Kafka Producer发送原理
理解调优参数之前,必须先搞清楚消息是如何从业务代码到达Broker的。
核心组件:
- RecordAccumulator:每个TopicPartition有一个对应的双端队列,消息先进入这里缓冲
- Sender线程:独立线程,从RecordAccumulator取批次数据,异步发送到Broker
- batch.size:一个批次(ProducerBatch)的最大字节数
- linger.ms:Sender等待批次凑满的最长时间
二、关键参数的工作机制
2.1 batch.size + linger.ms 的协同效应
linger.ms=0(默认):消息进入批次后几乎立即发送,延迟最低(约1-2ms),但批次通常很小(可能只有1条消息),网络利用率低,实际吞吐量有限。
linger.ms=5-20ms:等待最多5-20ms凑批,批次更大,每次网络请求携带更多消息,吞吐量显著提升。代价是增加了最大5-20ms的额外延迟。
batch.size:批次大小上限,不是越大越好。太小:批次经常被linger.ms触发而不是batch满触发,效率低。太大:消息太少时整个批次内存浪费,GC压力增加。
2.2 压缩算法对比
| 算法 | 压缩比 | CPU消耗 | 适用场景 |
|---|---|---|---|
| none | 1:1(无压缩) | 极低 | 消息本身已压缩,或CPU极度敏感 |
| gzip | 高(3:1-10:1) | 高 | 磁盘/带宽敏感,CPU充裕 |
| snappy | 中(2:1-4:1) | 低 | 通用场景,最常用 |
| lz4 | 中(2:1-4:1) | 极低 | 追求低延迟+高吞吐,推荐 |
| zstd | 高(3:1-8:1) | 中 | Kafka 2.1+,综合最优 |
实测数据(300字节JSON消息,单机12核):
| 压缩算法 | TPS | P99延迟 | CPU使用率 | 磁盘空间节省 |
|---|---|---|---|---|
| none | 42万 | 8ms | 15% | 0% |
| lz4 | 78万 | 9ms | 22% | 55% |
| snappy | 65万 | 10ms | 28% | 52% |
| zstd | 71万 | 11ms | 35% | 68% |
| gzip | 38万 | 15ms | 65% | 72% |
结论:lz4是最推荐的选择(高压缩比,低CPU消耗)。zstd在磁盘空间敏感时更优。gzip只在CPU不是瓶颈、带宽极度稀缺时使用。
2.3 buffer.memory 和 max.block.ms
当RecordAccumulator的内存使用达到buffer.memory(默认32MB),业务线程调用producer.send()会阻塞,等待最多max.block.ms(默认60秒)。如果超时,抛出BufferExhaustedException。
开篇说的故障就是这个问题:buffer.memory耗尽,业务线程阻塞60秒后抛出异常。
三、完整调优配置示例
3.1 高吞吐场景配置(日志/埋点)
/**
* 高吞吐Producer配置
* 目标:最大化TPS,对P99延迟要求宽松(< 100ms)
*
* 调优后基准(测试环境,3节点集群,12核CPU,万兆网卡):
* - TPS: 75-85万(消息体300字节,lz4压缩)
* - P50延迟: 6ms
* - P99延迟: 35ms
* - CPU使用率: 25-30%
*/
@Bean("highThroughputProducer")
public KafkaTemplate<String, String> highThroughputProducer() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092,kafka3:9092");
// ===== 核心吞吐量配置 =====
// 批次大小:128KB。默认16KB太小,实测128KB时吞吐量比16KB提升约3倍
// 太大(>512KB)会导致内存浪费和GC压力增加
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 131072); // 128KB
// 等待时间:20ms。给批次充分的时间凑满
// 如果不关心延迟,可以设置更大(50-100ms)
props.put(ProducerConfig.LINGER_MS_CONFIG, 20);
// 压缩算法:lz4。高吞吐+低CPU消耗的最佳选择
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");
// 缓冲区大小:128MB。默认32MB在高流量时容易耗尽
// 计算方式:峰值TPS * 平均消息大小 * 缓冲时间(秒)
// 100万TPS * 300字节 * 0.4秒 = 120MB,取128MB
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 134217728L); // 128MB
// 阻塞等待时间:5秒。默认60秒太长,5秒内解决不了说明系统有问题
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5000);
// ===== 可靠性配置 =====
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 日志场景可以接受少量丢失
props.put(ProducerConfig.RETRIES_CONFIG, 3);
// ===== 网络优化 =====
// 每个Broker最大并行请求数,增加可提升吞吐
props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
// 每次请求最大字节数(要大于batch.size)
props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 10485760); // 10MB
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}3.2 低延迟场景配置(实时通知)
/**
* 低延迟Producer配置
* 目标:最低P99延迟,对TPS要求适中(< 5万)
*
* 调优后基准:
* - P50延迟: 2ms
* - P99延迟: 8ms
* - TPS: ~4万
*/
@Bean("lowLatencyProducer")
public KafkaTemplate<String, String> lowLatencyProducer() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092,kafka3:9092");
// 批次大小小,凑满快,不需要等linger.ms
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 4096); // 4KB
// 不等待,消息进入批次后立即发送
props.put(ProducerConfig.LINGER_MS_CONFIG, 0);
// 不压缩,减少CPU耗时
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "none");
// 高可靠
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// 请求超时:低延迟场景要快速失败
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}3.3 Broker端性能优化配置
# server.properties 核心性能参数
# ===== 网络线程 =====
# 处理网络请求的线程数,默认3,建议设为CPU核数
num.network.threads=8
# 处理IO请求的线程数(读写磁盘),建议设为CPU核数*2
num.io.threads=16
# ===== 内存 =====
# Producer/Consumer Socket缓冲区大小
socket.send.buffer.bytes=1048576 # 1MB
socket.receive.buffer.bytes=1048576 # 1MB
socket.request.max.bytes=104857600 # 100MB(单次请求最大)
# ===== 日志写入 =====
# 每个TopicPartition的写线程数(增加并行度)
num.replica.fetchers=4
# ===== PageCache优化 =====
# 不要设置日志刷盘间隔,依赖OS和副本机制
# log.flush.interval.messages=10000 # 不建议配置
# log.flush.interval.ms=1000 # 不建议配置
# ===== 日志清理 =====
log.retention.hours=168 # 7天
log.segment.bytes=536870912 # 512MB
log.retention.check.interval.ms=300000 # 5分钟检查一次3.4 消费者端性能优化
/**
* 高吞吐消费者配置
* 关键参数:fetch.min.bytes、fetch.max.wait.ms、max.partition.fetch.bytes
*/
@Bean
public ConsumerFactory<String, String> highThroughputConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"kafka1:9092,kafka2:9092,kafka3:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "high-throughput-consumer");
// 每次fetch至少返回多少字节的数据才响应
// 默认1字节(有数据立即返回),设大可以减少fetch请求数,提高吞吐
// 代价:增加延迟(等到accumulate足够数据才返回)
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 65536); // 64KB
// 最大等待时间:即使fetch.min.bytes没凑满,也要在500ms后返回
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 500);
// 单个Partition每次fetch最大字节数,默认1MB
// 如果消息很大,需要调大
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 5242880); // 5MB
// 每次poll最大消息数
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}四、踩坑实录
坑1:batch.size过大导致内存OOM
把batch.size调到了10MB,想着越大越好。结果:每个TopicPartition一个批次,100个Partition就是100个批次,每个批次最大10MB,理论上需要1GB内存(buffer.memory=32MB根本不够,直接OOM)。
正确认知:batch.size是单个批次的上限,不是每次都分配这么大,批次按需增长。但多个Partition同时写入时,内存消耗是叠加的。一般设置到512KB已经足够,不需要更大。
坑2:linger.ms设太大导致CPU GC问题
linger.ms设了500ms,批次里积累了大量消息(数万条),批次很大,GC时整个批次对象无法被回收(Sender线程还持有引用),触发Full GC。
建议:linger.ms不要超过100ms,20ms通常已经是一个很好的平衡点。
坑3:压缩导致Broker CPU飙升
Producer端发了lz4压缩的消息,但Broker配置了compression.type=gzip(Topic级别),Broker要把lz4数据解压再用gzip重新压缩,CPU飙升到80%。
解决方案:把Topic的compression.type设置为producer(保持Producer的压缩格式):
kafka-topics.sh --alter --topic your-topic \
--config compression.type=producer \
--bootstrap-server kafka1:9092坑4:消费端fetch.min.bytes设太大导致延迟
把fetch.min.bytes设到了1MB,结果在低流量时期(晚上12点后),消息积累不到1MB就一直等待fetch.max.wait.ms(500ms)才拉取,消费延迟平均增加了400ms。
原则:fetch.min.bytes要根据实际消息量设置,不能在高峰配置后不管低谷的情况。
坑5:没有监控Producer发送延迟分布
调优后以为没问题,但有用户反映偶尔消息延迟很高。后来加了监控才发现P999延迟高达800ms,原来是某台Broker磁盘响应偶尔很慢,导致acks等待时间很长。
务必监控:Producer端的record-send-rate、record-error-rate、request-latency-avg/max,以及Broker端的request-handler-avg-idle-percent。
五、调优参数速查表
| 参数 | 默认值 | 高吞吐推荐 | 低延迟推荐 | 说明 |
|---|---|---|---|---|
| batch.size | 16384(16KB) | 131072(128KB) | 4096(4KB) | 批次大小上限 |
| linger.ms | 0 | 20 | 0 | 批次等待时间 |
| compression.type | none | lz4 | none | 压缩算法 |
| buffer.memory | 33554432(32MB) | 134217728(128MB) | 33554432 | 缓冲区总大小 |
| max.in.flight.requests.per.connection | 5 | 5 | 1 | 并行请求数 |
| acks | 1 | 1 | all | 确认模式 |
调优不是一次性工作,需要根据实际业务流量持续监控和调整。建议用Prometheus+Grafana建立Kafka指标监控看板,跟踪关键指标变化。
下一篇(第439期)讲延迟消息的4种实现方案,RocketMQ定时消息、RabbitMQ TTL+死信、Redis ZSet、时间轮,每种方案的原理和适用边界全部讲清楚。
