Testcontainers + Kafka 实战——消息队列集成测试的正确姿势
Testcontainers + Kafka 实战——消息队列集成测试的正确姿势
适读人群:Java 后端开发者、消息驱动架构实践者 | 阅读时长:约 17 分钟 | 核心价值:用真实 Kafka 测试消息生产消费全链路,告别 Mock 掩盖的顺序和幂等问题
那是一个让我至今记忆犹新的 P0 故障。
我们的支付完成通知链路:支付服务发 Kafka 消息,订单服务消费消息更新订单状态,同时触发发货。整个链路用 Mock 测了个遍,Mock KafkaTemplate.send()、Mock @KafkaListener,一切正常,愉快上线。
上线第三天,监控告警:一批订单支付成功但发货服务没有收到消息。查日志,支付服务的 Kafka 消息发出去了,订单服务也消费了。但再往下游看,消费者在处理消息时抛了一个序列化异常,被全局异常处理器吞掉了,消息被标记为已消费,但下游发货服务完全没有被通知到。
问题出在消息体的序列化上:支付服务发的是 PaymentEvent,订单服务反序列化时用的是 OrderPaymentEvent,字段名有一个不匹配(paymentId vs payment_id,驼峰和下划线的问题)。Mock 测试里,消费者直接接收 Java 对象,序列化反序列化这一步根本没有走。
那之后,Kafka 集成测试成了我们团队的标配。今天这篇,把完整方案写出来。
一、Kafka 测试的特殊挑战
Kafka 集成测试比数据库集成测试复杂一些,原因在于它是异步的:
- 消息发送是异步的——
send()返回不代表消息被消费了 - 消费者处理有延迟——需要等待机制
- offset 管理——测试之间的消息可能互相干扰
- 序列化/反序列化——生产者和消费者的序列化配置必须一致
这些特性决定了 Kafka 集成测试必须用真实 Broker,而不能靠 Mock。
二、依赖配置
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<!-- Kafka 专用模块 -->
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<!-- Spring Boot Testcontainers -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-testcontainers</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Awaitility 用于异步断言 -->
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>三、Kafka 容器配置
@SpringBootTest
@Testcontainers
class KafkaBaseTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
.withStartupTimeout(Duration.ofMinutes(3));
@DynamicPropertySource
static void configureKafka(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
// 消费者配置
registry.add("spring.kafka.consumer.auto-offset-reset", () -> "earliest");
registry.add("spring.kafka.consumer.group-id", () -> "test-group");
// 序列化配置
registry.add("spring.kafka.producer.key-serializer",
() -> "org.apache.kafka.common.serialization.StringSerializer");
registry.add("spring.kafka.producer.value-serializer",
() -> "org.springframework.kafka.support.serializer.JsonSerializer");
registry.add("spring.kafka.consumer.key-deserializer",
() -> "org.apache.kafka.common.serialization.StringDeserializer");
registry.add("spring.kafka.consumer.value-deserializer",
() -> "org.springframework.kafka.support.serializer.JsonDeserializer");
registry.add("spring.kafka.consumer.properties.spring.json.trusted.packages",
() -> "*");
}
}四、完整的消息生产消费集成测试
被测的支付事件流程:
// 事件类
@Data
@AllArgsConstructor
@NoArgsConstructor
public class PaymentCompletedEvent {
private String paymentId;
private Long orderId;
private BigDecimal amount;
private String currency;
private LocalDateTime completedAt;
}
// 生产者
@Service
@RequiredArgsConstructor
public class PaymentEventProducer {
private final KafkaTemplate<String, PaymentCompletedEvent> kafkaTemplate;
private static final String TOPIC = "payment.completed";
public void publishPaymentCompleted(PaymentCompletedEvent event) {
kafkaTemplate.send(TOPIC, event.getPaymentId(), event);
}
}
// 消费者
@Component
@RequiredArgsConstructor
@Slf4j
public class OrderPaymentConsumer {
private final OrderService orderService;
@KafkaListener(topics = "payment.completed", groupId = "order-service")
public void handlePaymentCompleted(PaymentCompletedEvent event) {
log.info("收到支付完成消息: paymentId={}, orderId={}", event.getPaymentId(), event.getOrderId());
orderService.markOrderAsPaid(event.getOrderId(), event.getPaymentId());
}
}集成测试:
@SpringBootTest
@Testcontainers
class PaymentEventIntegrationTest {
@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
@Autowired
private PaymentEventProducer producer;
@Autowired
private OrderRepository orderRepository;
@Test
void 支付完成事件_发布后_订单状态更新() {
// given
Order order = orderRepository.save(Order.builder()
.status(OrderStatus.PAYMENT_PENDING)
.amount(new BigDecimal("299.00"))
.build());
PaymentCompletedEvent event = new PaymentCompletedEvent(
"PAY-" + UUID.randomUUID(),
order.getId(),
new BigDecimal("299.00"),
"CNY",
LocalDateTime.now()
);
// when - 发布事件
producer.publishPaymentCompleted(event);
// then - 等待消费者处理(异步操作需要等待)
await()
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> {
Order updated = orderRepository.findById(order.getId()).orElseThrow();
assertThat(updated.getStatus()).isEqualTo(OrderStatus.PAID);
assertThat(updated.getPaymentId()).startsWith("PAY-");
});
}
@Test
void 支付事件_序列化反序列化_字段完整保留() {
// given - 这个测试专门验证序列化
PaymentCompletedEvent original = new PaymentCompletedEvent(
"PAY-12345",
1001L,
new BigDecimal("1299.00"),
"CNY",
LocalDateTime.of(2024, 1, 15, 10, 30, 0)
);
List<PaymentCompletedEvent> received = new ArrayList<>();
// 直接用 KafkaConsumer 消费,验证序列化
KafkaConsumer<String, PaymentCompletedEvent> consumer = createTestConsumer();
consumer.subscribe(Collections.singletonList("payment.completed"));
// when
producer.publishPaymentCompleted(original);
// then
await().atMost(Duration.ofSeconds(15)).until(() -> {
ConsumerRecords<String, PaymentCompletedEvent> records =
consumer.poll(Duration.ofMillis(500));
records.forEach(r -> received.add(r.value()));
return !received.isEmpty();
});
assertThat(received).hasSize(1);
PaymentCompletedEvent deserialized = received.get(0);
assertThat(deserialized.getPaymentId()).isEqualTo("PAY-12345");
assertThat(deserialized.getOrderId()).isEqualTo(1001L);
assertThat(deserialized.getAmount()).isEqualByComparingTo("1299.00");
assertThat(deserialized.getCurrency()).isEqualTo("CNY");
}
private KafkaConsumer<String, PaymentCompletedEvent> createTestConsumer() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-verification-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, PaymentCompletedEvent.class.getName());
return new KafkaConsumer<>(props);
}
}五、三个深度踩坑实录
坑 1:消费者 group id 复用导致消息被跳过
现象: 第一次跑测试正常,第二次跑同样的测试,消费者没有收到消息,await 超时。
原因: Kafka 消费者的 offset 是按 group.id 记录的。第一次测试消费了消息,offset 推进了。第二次测试同一个 group.id 启动,发现 offset 已经到最新位置,不再消费之前的消息。
解法: 每个测试使用随机 group.id,或者在 @BeforeEach 里重置 offset:
// 方式一:每次随机 group.id
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-" + UUID.randomUUID());
// 方式二:配置 auto.offset.reset=earliest(仅对新 group 生效)
// 配合随机 group.id 使用
// 方式三:在测试配置里,消费者 group.id 加随机后缀
@DynamicPropertySource
static void config(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.consumer.group-id",
() -> "test-group-" + System.currentTimeMillis());
}坑 2:事务消息在测试环境下消费者收不到
现象: 生产者开启了事务(spring.kafka.producer.transaction-id-prefix),但测试里消费者始终收不到消息,await 超时。
原因: Kafka 事务消息默认的消费隔离级别是 read_committed,消费者只能看到已提交的事务。测试中如果 @Transactional 管理的事务还没提交(测试方法结束后才提交),消费者在 await 等待期间永远看不到消息。
解法:
// 方式一:把消息发送放在 @Transactional 方法之外
// 方式二:在测试里显式提交
@Test
void 事务消息测试() {
// 手动管理事务,确保消息在 await 开始前已发送并提交
transactionTemplate.execute(status -> {
producer.publishPaymentCompleted(event);
return null;
}); // 事务在这里提交
// 此时再 await
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
// 断言
});
}
// 方式三:消费者隔离级别改为 read_uncommitted(仅测试环境)
registry.add("spring.kafka.consumer.properties.isolation.level",
() -> "read_uncommitted");坑 3:Kafka 容器和 Zookeeper 版本不兼容
现象: 使用 confluentinc/cp-kafka:6.x 启动报错,容器日志显示 ZooKeeper 连接失败。
原因: 较新版的 Testcontainers Kafka 模块会自动处理 KRaft 模式(无 ZooKeeper),但如果手动指定老版本镜像,可能有兼容性问题。
解法: 统一使用 confluentinc/cp-kafka:7.5.0 以上版本,Testcontainers 1.19+ 支持 KRaft 模式:
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0"))
// KRaft 模式,不需要 ZooKeeper
.withKraft();六、Dead Letter Queue 测试
消费者处理失败时消息的去向,也需要集成测试验证:
@Test
void 消费者处理异常_消息进入DLQ() {
// given - 发送一个会触发处理失败的消息
PaymentCompletedEvent badEvent = new PaymentCompletedEvent(
"PAY-INVALID",
-1L, // 不存在的订单 ID,消费者会抛异常
BigDecimal.ZERO,
null, // null currency 会触发 NPE
LocalDateTime.now()
);
// when
producer.publishPaymentCompleted(badEvent);
// then - 等待消息进入 DLQ(Dead Letter Topic)
KafkaConsumer<String, String> dlqConsumer = createDLQConsumer();
dlqConsumer.subscribe(Collections.singletonList("payment.completed.DLT"));
await().atMost(Duration.ofSeconds(30)).until(() -> {
ConsumerRecords<String, String> records = dlqConsumer.poll(Duration.ofMillis(500));
return !records.isEmpty();
});
}七、性能测试:大批量消息
@Test
void 批量消息发送_1000条_全部被消费() {
int messageCount = 1000;
CountDownLatch latch = new CountDownLatch(messageCount);
// 注册计数监听器
kafkaListenerEndpointRegistry.getListenerContainer("paymentListener")
.setBatchErrorHandler((e, data) -> latch.countDown());
// 发送 1000 条消息
for (int i = 0; i < messageCount; i++) {
producer.publishPaymentCompleted(new PaymentCompletedEvent(
"PAY-" + i, (long) i, BigDecimal.ONE, "CNY", LocalDateTime.now()));
}
// 等待全部消费完毕
assertThat(latch.await(60, TimeUnit.SECONDS)).isTrue();
}Kafka 集成测试的投入比数据库测试高,但回报也更高。那次序列化导致的 P0 故障之后,我们从没再因为消息体格式问题上线出过问题。
