Spring Kafka最佳实践:@KafkaListener的并发、错误处理与手动提交
Spring Kafka最佳实践:@KafkaListener的并发、错误处理与手动提交
适读人群:每天用Spring Kafka写消费者、但总感觉配置不够优雅的Java工程师 | 阅读时长:约16分钟
开篇故事
每次Code Review Spring Kafka消费者代码,我都会看到类似的问题:
@KafkaListener(topics = "order-events")
public void consume(String message) {
// 直接处理,没有错误处理,没有手动提交
processOrder(message);
}这段代码用自动提交,处理失败时消息默默丢失。没有错误处理,下游接口超时时会让消费者线程卡住。没有并发配置,默认单线程,完全浪费了多Partition的并行能力。
我在团队里推行了一套Spring Kafka最佳实践模板,帮团队避开了大量坑。今天把这套实践完整分享出来。
一、@KafkaListener的核心机制
@KafkaListener背后是ConcurrentMessageListenerContainer,理解这个类的工作机制是配置好Spring Kafka的基础。
concurrency参数:创建多少个Consumer线程,每个线程负责一个或多个Partition。最佳值 = Partition数量(或Partition数量的因数)。
二、完整最佳实践配置
2.1 容器工厂配置
/**
* Spring Kafka容器工厂最佳实践配置
*/
@Configuration
@Slf4j
public class KafkaListenerConfig {
/**
* 通用消费者工厂(手动提交)
*/
@Bean
public ConsumerFactory<String, String> consumerFactory(
@Value("${kafka.bootstrap-servers}") String bootstrapServers,
@Value("${kafka.consumer.group-id}") String groupId) {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); // 手动提交
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 300000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 15000);
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5000);
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
CooperativeStickyAssignor.class.getName());
return new DefaultKafkaConsumerFactory<>(props);
}
/**
* 默认容器工厂
* 特性:手动提交、重试3次、超过重试发到死信Topic
*/
@Bean("defaultContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String>
defaultContainerFactory(ConsumerFactory<String, String> cf) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
// 并发度:根据Topic的Partition数设置
// 可以通过@KafkaListener的concurrency属性覆盖
factory.setConcurrency(4);
ContainerProperties props = factory.getContainerProperties();
props.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 错误处理器:重试3次,超过后发到死信Topic
factory.setCommonErrorHandler(buildErrorHandler());
return factory;
}
/**
* 批量处理容器工厂
* 特性:批量消费、批量提交
*/
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String>
batchContainerFactory(ConsumerFactory<String, String> cf) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf);
factory.setBatchListener(true); // 批量消费
factory.setConcurrency(8);
ContainerProperties props = factory.getContainerProperties();
props.setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交
factory.setCommonErrorHandler(buildBatchErrorHandler());
return factory;
}
/**
* 构建错误处理器
* DefaultErrorHandler:自动重试,超过次数后发到死信Topic
*/
private DefaultErrorHandler buildErrorHandler() {
// 死信发布器:处理失败的消息发到 <topic>.DLT
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate(),
(record, exception) -> {
// 自定义死信Topic路由:原Topic + ".DLT"
String dlTopic = record.topic() + ".DLT";
log.error("消息发往死信队列: topic={}, partition={}, offset={}, error={}",
record.topic(), record.partition(), record.offset(),
exception.getMessage());
return new TopicPartition(dlTopic, -1); // -1 = 自动分配分区
}
);
// 重试配置:最多重试3次,每次间隔1秒
FixedBackOff backOff = new FixedBackOff(1000L, 3L);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, backOff);
// 不可重试的异常(直接发死信,不重试)
errorHandler.addNotRetryableExceptions(
JsonParseException.class, // JSON格式错误,重试没意义
IllegalArgumentException.class // 参数非法,重试没意义
);
// 可重试的异常(默认所有异常都重试)
// errorHandler.addRetryableExceptions(SQLException.class);
return errorHandler;
}
private CommonErrorHandler buildBatchErrorHandler() {
// 批量处理的错误处理器(更复杂,需要处理批次中部分失败的情况)
return new DefaultErrorHandler(new FixedBackOff(500L, 2L));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
// 复用上面配置的Producer
return new KafkaTemplate<>(producerFactory());
}
private ProducerFactory<String, String> producerFactory() {
Map<String, Object> configs = new HashMap<>();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka1:9092");
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(configs);
}
}2.2 消费者最佳实践模板
/**
* 标准消费者模板(单条处理)
* 包含:手动提交、错误处理、幂等检查、监控埋点
*/
@Component
@Slf4j
public class StandardOrderConsumer {
private final OrderService orderService;
private final MeterRegistry meterRegistry;
@KafkaListener(
id = "orderConsumer", // 容器ID(用于动态管理)
topics = "${kafka.topic.order-events}", // 从配置读Topic名
groupId = "${kafka.consumer.group-id}",
containerFactory = "defaultContainerFactory",
concurrency = "#{T(Math).min(@kafkaPartitionCount, 12)}" // SpEL:min(Partition数, 12)
)
public void consume(
ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
long startTime = System.currentTimeMillis();
String topic = record.topic();
int partition = record.partition();
long offset = record.offset();
log.debug("消费消息: topic={}, partition={}, offset={}", topic, partition, offset);
try {
// 1. 反序列化(快速失败,格式错误直接抛出不可重试异常)
OrderEvent event = deserialize(record.value());
// 2. 业务处理
orderService.processEvent(event);
// 3. 提交offset
acknowledgment.acknowledge();
// 4. 监控埋点
long latency = System.currentTimeMillis() - startTime;
meterRegistry.counter("kafka.consume.success",
"topic", topic).increment();
meterRegistry.timer("kafka.consume.latency", "topic", topic)
.record(latency, TimeUnit.MILLISECONDS);
} catch (JsonParseException e) {
// 消息格式错误:不可重试,直接ACK(会被ErrorHandler发到DLT)
log.error("消息格式错误,发往死信: partition={}, offset={}",
partition, offset, e);
acknowledgment.acknowledge(); // 提交,避免阻塞
meterRegistry.counter("kafka.consume.format.error", "topic", topic).increment();
throw e; // 抛出让ErrorHandler处理(发到DLT)
} catch (Exception e) {
// 其他异常:可重试(由ErrorHandler决定)
log.error("消费失败,等待重试: partition={}, offset={}",
partition, offset, e);
meterRegistry.counter("kafka.consume.error", "topic", topic).increment();
throw e; // 不提交,ErrorHandler会重试
}
}
/**
* 死信Topic消费者(处理最终失败的消息)
*/
@KafkaListener(
topics = "${kafka.topic.order-events}.DLT",
groupId = "${kafka.consumer.group-id}-dlt"
)
public void consumeDeadLetter(ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
log.error("死信消息: partition={}, offset={}, key={}",
record.partition(), record.offset(), record.key());
// 1. 记录到数据库
saveDltRecord(record);
// 2. 发送告警
sendAlarm(record);
// 3. 提交offset(死信消息已处理)
acknowledgment.acknowledge();
}
private OrderEvent deserialize(String value) {
try {
return new ObjectMapper().readValue(value, OrderEvent.class);
} catch (Exception e) {
throw new JsonParseException(null, "消息反序列化失败: " + value, e);
}
}
private void saveDltRecord(ConsumerRecord<String, String> record) {}
private void sendAlarm(ConsumerRecord<String, String> record) {}
}2.3 批量消费者模板
/**
* 批量消费者模板(适合高吞吐场景)
*/
@Component
@Slf4j
public class BatchLogConsumer {
@KafkaListener(
topics = "user-behavior-log",
groupId = "log-consumer-group",
containerFactory = "batchContainerFactory",
concurrency = "8"
)
public void consume(
List<ConsumerRecord<String, String>> records,
Acknowledgment acknowledgment) {
if (records.isEmpty()) {
acknowledgment.acknowledge();
return;
}
long startTime = System.currentTimeMillis();
int success = 0;
int fail = 0;
// 批量处理
List<UserBehaviorLog> logs = new ArrayList<>(records.size());
for (ConsumerRecord<String, String> record : records) {
try {
UserBehaviorLog logEntry = parseLog(record.value());
if (logEntry != null) {
logs.add(logEntry);
}
} catch (Exception e) {
fail++;
log.warn("日志解析失败,跳过: offset={}", record.offset());
}
}
// 批量写入DB
if (!logs.isEmpty()) {
try {
behaviorLogRepository.batchSave(logs);
success = logs.size();
} catch (Exception e) {
log.error("批量写入失败,将重试: size={}", logs.size(), e);
throw e; // 抛出,整批重试
}
}
// 批量提交offset
acknowledgment.acknowledge();
long cost = System.currentTimeMillis() - startTime;
log.info("批次完成: total={}, success={}, fail={}, cost={}ms",
records.size(), success, fail, cost);
}
@Autowired
private UserBehaviorLogRepository behaviorLogRepository;
private UserBehaviorLog parseLog(String value) {
try {
return new ObjectMapper().readValue(value, UserBehaviorLog.class);
} catch (Exception e) {
return null;
}
}
}四、踩坑实录
坑1:concurrency配置超过Partition数导致空线程
配置了concurrency=20,但Topic只有12个Partition,启动后有8个Consumer线程完全空闲(没有分配到Partition),浪费资源并且增加了Rebalance时间。
解决:concurrency = min(Partition数, 期望并发数)。通过SpEL动态计算是最优雅的方式。
坑2:ErrorHandler配置了但重试没生效
配置了DefaultErrorHandler,以为消费失败会自动重试。但消费方法里使用了try-catch把异常吃掉了,没有向上抛出,ErrorHandler根本感知不到异常,直接认为消费成功并提交offset。
正确做法:不可重试的异常在catch里处理(ACK + 发告警),可重试的异常必须向上抛出,让ErrorHandler决定是否重试。
坑3:手动提交ACK的时机错误
在事务方法内提交ACK,事务回滚后ACK已经提交了(offset已经推进),消息被认为消费成功但业务实际上失败了。
正确做法:ACK应该在事务提交之后调用,不能在事务内部。
坑4:批量消费时部分失败的处理
批量消费时,500条消息里有一条解析失败,整批抛出异常重试。重试时同样是那500条,那一条仍然会失败,形成无限循环。
解决方案:
- 解析失败的消息单独处理(跳过+告警),不影响整批的成功提交
- 或者配置
BatchErrorHandler,支持对批次中失败的消息单独处理
坑5:@KafkaListener的id不唯一
两个Bean方法使用了相同的id参数,Spring Kafka启动时报错:A @KafkaListener with the id 'xxx' already exists。
规则:每个@KafkaListener的id必须全局唯一,用业务含义命名,方便通过KafkaListenerEndpointRegistry动态管理。
五、总结:Spring Kafka最佳实践清单
- 永远使用手动提交(
MANUAL_IMMEDIATE),不用自动提交 - concurrency等于Partition数(或Partition数的因数),充分利用并行
- 配置DefaultErrorHandler,设置重试次数和死信Topic
- 区分不可重试异常(格式错误)和可重试异常(网络超时),不可重试直接发DLT
- 监听死信Topic,有消费者处理最终失败的消息
- 所有消费者必须有幂等保障(见第435期)
- 消费方法内的异常必须向上抛出,不要在catch里吃掉
下一篇(第447期)讲消息队列监控体系,Lag积压、消费速率、Broker内存的完整指标,告诉你在生产环境应该监控哪些数据。
