消息积压处理:百万条消息堆积时的紧急扩容方案
消息积压处理:百万条消息堆积时的紧急扩容方案
适读人群:在生产环境处理过或担心遇到消息积压的Java工程师 | 阅读时长:约16分钟
开篇故事
那是一个普通的周五下午,距离下班还有一小时。
监控告警突然疯狂响起:订单消费者组Lag从0暴增到230万,消费速率从每秒1.2万条骤降到每秒300条。顿时明白:出大事了。
赶紧排查:消费者服务正常运行(进程没挂),CPU使用率才20%,内存正常。那为什么消费速率骤降?
最后发现原因是:消费者调用的一个下游库存服务突然响应变慢(DBA在生产库做了一个慢查询分析),每次调用从平均50ms变成了平均4秒,消费速率直接从1.2万TPS降到300TPS,但生产者速率还是1.2万TPS,积压以每秒约1.2万条的速度在增长。
此时距离下班只有45分钟,我必须在1小时内把积压降下来,否则会影响到晚高峰订单处理。
今天把这次处理的完整过程和经验全部分享出来,算是一份生产事故现场操作手册。
一、消息积压的成因分类
解决积压之前,必须先搞清楚根因,否则"扩容消费者"可能根本解决不了问题。
关键判断:先看消费者是否存活(进程、CPU、内存),再看下游依赖是否正常(DB、HTTP接口延迟),再看是否有异常消息在无限重试。不同根因对应不同处理方案。
二、紧急处理的分级策略
三、完整处理方案
3.1 第一步:快速诊断脚本
#!/bin/bash
# Kafka消息积压快速诊断脚本
# 用法:./check-lag.sh <bootstrap-server> <group-id> <topic>
BOOTSTRAP=$1
GROUP=$2
TOPIC=$3
echo "===== Kafka积压诊断 ====="
echo "时间: $(date)"
echo ""
# 1. 查看消费组Lag
echo ">>> 消费组Lag汇总:"
kafka-consumer-groups.sh \
--bootstrap-server $BOOTSTRAP \
--describe \
--group $GROUP \
--topic $TOPIC
echo ""
# 2. 查看Topic分区信息
echo ">>> Topic分区信息:"
kafka-topics.sh \
--bootstrap-server $BOOTSTRAP \
--describe \
--topic $TOPIC
echo ""
# 3. 实时监控Lag变化(每5秒刷新一次)
echo ">>> 实时Lag变化(Ctrl+C退出):"
while true; do
TOTAL_LAG=$(kafka-consumer-groups.sh \
--bootstrap-server $BOOTSTRAP \
--describe \
--group $GROUP \
2>/dev/null | grep $TOPIC | awk '{sum += $NF} END {print sum}')
echo "$(date '+%H:%M:%S') Total Lag: $TOTAL_LAG"
sleep 5
done3.2 方案A:直接扩容消费者(最常用)
适用场景:消费者处理逻辑正常,只是数量不够。
/**
* K8s环境动态扩容消费者
* 核心思路:增加Pod数量,同时确保消费者数量不超过Partition数量
*/
@Service
@Slf4j
public class ConsumerScalingService {
private final KubernetesClient k8sClient;
/**
* 紧急扩容消费者
* @param deploymentName Deployment名称
* @param targetReplicas 目标副本数(不能超过Partition数)
*/
public void scaleConsumerDeployment(String deploymentName, int targetReplicas) {
// 获取当前Partition数(消费者数不能超过这个值)
int partitionCount = getPartitionCount("order-events");
int safeReplicas = Math.min(targetReplicas, partitionCount);
if (safeReplicas < targetReplicas) {
log.warn("目标副本数{}超过Partition数{},限制为{}",
targetReplicas, partitionCount, safeReplicas);
}
// 执行扩容
k8sClient.apps().deployments()
.inNamespace("production")
.withName(deploymentName)
.scale(safeReplicas);
log.info("消费者扩容完成: deployment={}, replicas={}",
deploymentName, safeReplicas);
}
private int getPartitionCount(String topic) {
// 通过Kafka AdminClient获取分区数
return 12; // 示例
}
}3.3 方案B:临时消费者加速消化(积压量巨大时)
当积压超过100万条,光扩容现有消费者可能速度太慢。可以创建一个"临时加速消费者",专门以最高速度消化积压,消化完后销毁。
/**
* 临时加速消费者
* 特点:极简逻辑,不做非必要操作,只处理核心业务
* 消化完积压后删除此消费者组
*/
@Component
@Slf4j
public class EmergencyConsumer {
private final OrderService orderService;
private final AtomicLong processedCount = new AtomicLong(0);
private final AtomicLong startTime = new AtomicLong(System.currentTimeMillis());
/**
* 临时消费者:从Topic头部开始消费,快速处理积压
* 注意:这里用了新的消费者组名,不会影响原消费者组的offset
*/
@KafkaListener(
topics = "order-events",
groupId = "order-emergency-consumer-group-${random.uuid}", // 每次启动新group
containerFactory = "emergencyContainerFactory",
properties = {
"max.poll.records=2000", // 每次拉取2000条
"fetch.min.bytes=524288", // 512KB再返回
"fetch.max.wait.ms=100", // 最多等100ms
"enable.auto.commit=true", // 自动提交减少开销
"auto.commit.interval.ms=1000"
}
)
public void consumeEmergency(List<ConsumerRecord<String, String>> records) {
// 批量处理,减少DB/HTTP调用次数
List<OrderEvent> events = records.stream()
.map(r -> parseEvent(r.value()))
.filter(Objects::nonNull)
.collect(Collectors.toList());
// 批量写入DB(减少网络往返)
orderService.batchProcessEvents(events);
long count = processedCount.addAndGet(records.size());
long elapsed = (System.currentTimeMillis() - startTime.get()) / 1000;
if (count % 10000 == 0) {
log.info("紧急消费进度: 已处理={}条, 速率={}条/秒",
count, elapsed > 0 ? count / elapsed : 0);
}
}
private OrderEvent parseEvent(String value) {
try {
return new ObjectMapper().readValue(value, OrderEvent.class);
} catch (Exception e) {
log.warn("消息解析失败,跳过: {}", value);
return null;
}
}
}
/**
* 紧急消费者的高性能容器工厂
*/
@Bean("emergencyContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> emergencyContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
// 最大并发数等于Partition数
factory.setConcurrency(12);
factory.setBatchListener(true);
ContainerProperties props = factory.getContainerProperties();
props.setAckMode(ContainerProperties.AckMode.BATCH);
return factory;
}3.4 方案C:Topic分流(万不得已的大招)
当积压太严重,且消费逻辑改动较大,可以用"新Topic接新消息,老消费者专心消化积压"的方案:
/**
* 生产者:积压处理期间路由到新Topic
*/
@Service
public class AdaptiveProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private volatile boolean emergencyMode = false; // 由运维手动开关
public void sendOrderEvent(OrderEvent event) {
// 积压处理期间,新消息发到备用Topic
String topic = emergencyMode ? "order-events-new" : "order-events";
kafkaTemplate.send(topic, event.getOrderNo(),
JsonUtil.toJson(event));
}
// 暴露开关接口,运维通过接口切换
@PostMapping("/admin/emergency-mode")
public void setEmergencyMode(@RequestParam boolean enable) {
this.emergencyMode = enable;
log.warn("紧急模式切换: {}", enable ? "开启(新消息发到备用Topic)" : "关闭");
}
}3.5 方案D:动态调整消费者线程数
/**
* 动态调整消费者线程数(不重启服务)
*/
@RestController
@Slf4j
public class ConsumerAdminController {
private final KafkaListenerEndpointRegistry registry;
public ConsumerAdminController(KafkaListenerEndpointRegistry registry) {
this.registry = registry;
}
/**
* 动态调整并发度
* PUT /admin/consumer/concurrency?listenerId=orderConsumer&concurrency=12
*/
@PutMapping("/admin/consumer/concurrency")
public ResponseEntity<String> adjustConcurrency(
@RequestParam String listenerId,
@RequestParam int concurrency) {
MessageListenerContainer container = registry.getListenerContainer(listenerId);
if (container == null) {
return ResponseEntity.notFound().build();
}
if (container instanceof ConcurrentMessageListenerContainer) {
ConcurrentMessageListenerContainer<?, ?> concurrent =
(ConcurrentMessageListenerContainer<?, ?>) container;
int current = concurrent.getConcurrency();
concurrent.setConcurrency(concurrency);
// 需要重启容器使配置生效
container.stop();
container.start();
log.info("消费者并发度调整: listenerId={}, {} -> {}",
listenerId, current, concurrency);
return ResponseEntity.ok(
String.format("并发度已调整: %d -> %d", current, concurrency));
}
return ResponseEntity.badRequest().body("不支持动态调整并发度的容器类型");
}
/**
* 查看所有消费者状态
*/
@GetMapping("/admin/consumer/status")
public Map<String, Object> getConsumerStatus() {
Map<String, Object> status = new HashMap<>();
registry.getListenerContainerIds().forEach(id -> {
MessageListenerContainer container = registry.getListenerContainer(id);
Map<String, Object> info = new HashMap<>();
info.put("running", container.isRunning());
if (container instanceof ConcurrentMessageListenerContainer) {
info.put("concurrency",
((ConcurrentMessageListenerContainer<?,?>) container).getConcurrency());
}
status.put(id, info);
});
return status;
}
}四、踩坑实录
坑1:扩容消费者数量超过Partition数
紧急扩容,从5个Pod扩到30个。结果:只有12个Pod(等于Partition数)有分配,另外18个Pod空跑,浪费资源,但没有任何错误提示,以为扩容成功了。
规则:有效消费者数 = min(消费者实例数, Partition数)。扩容时先检查Partition数。
坑2:积压处理期间触发大量Rebalance
扩容时,每加入一个新Pod就触发一次Rebalance,连续加入10个Pod就触发了10次Rebalance,每次Rebalance期间全停,本来想加速消费,反而在最初10分钟因为Rebalance什么都没消费。
解决方案:配置group.instance.id(静态成员),减少Rebalance。或者一次性扩容到目标数量,而不是一个个加。
坑3:下游DB不堪重负,越消费越慢
把消费者从10个扩到30个,消费速率反而从1万/秒降到了5000/秒。原因:DB连接数从200增加到600,DB CPU 100%,所有DB操作变慢,消费者都在等DB。
解决方案:消费端做批量写入(INSERT ... ON DUPLICATE KEY UPDATE),把多个消息合并为一次DB操作。同时监控DB连接数和CPU,不要盲目扩消费者。
坑4:紧急消费者组的offset管理问题
用了临时消费者组(新groupId)消化积压,消化完后想切回原消费者组。但原消费者组的offset一直停在积压时的位置,切回后需要重新消费从积压开始到现在的所有消息,又产生了重复消费。
解决方案:紧急消费完成后,手动将原消费者组的offset重置到最新位置:
# 停止原消费者组
# 重置offset到最新
kafka-consumer-groups.sh \
--bootstrap-server kafka1:9092 \
--group order-consumer-group \
--topic order-events \
--reset-offsets \
--to-latest \
--execute坑5:消息积压导致Broker磁盘告警
Kafka默认保留消息7天,平时每天20GB,积压了2天相当于额外40GB,加上原来的140GB,接近磁盘容量200GB,触发了告警。
应对:临时调整Topic的保留时间,把不重要的Topic从7天改成1天:
kafka-configs.sh \
--bootstrap-server kafka1:9092 \
--entity-type topics \
--entity-name log-events \
--alter \
--add-config retention.ms=86400000 # 1天五、消息积压预防体系
处理完积压只是救火,预防才是正道。
监控告警层:
- 消费Lag超过阈值(如10万)触发P2告警
- 消费速率下降超过50%触发P1告警
- Lag增长速率为正(在增长)触发即时通知
容量规划层:
- 消费者数量 = Partition数(充分利用并行)
- Partition数 = 峰值TPS / 单消费者TPS * 1.5(留50%余量)
- 消费者处理能力 = 消费速率 >= 1.5 * 生产速率
代码设计层:
- 消费逻辑不能有无限重试(设置最大重试次数+死信)
- 下游依赖必须有超时设置(HTTP、DB查询)
- 消费者批量处理比单条处理效率高5-10倍
消息积压是MQ最常见的线上问题,有了这套处理手册,下次再遇到就不慌了。
下一篇(第441期)讲Kafka Exactly-Once语义,幂等Producer和事务API如何组合实现精确一次语义,和应用层幂等的区别在哪里。
