Java 消息队列测试实战——Kafka、RabbitMQ 消费者/生产者集成测试
Java 消息队列测试实战——Kafka、RabbitMQ 消费者/生产者集成测试
适读人群:Java 后端开发者、消息驱动架构实践者 | 阅读时长:约 17 分钟 | 核心价值:系统掌握 Kafka 和 RabbitMQ 的生产者/消费者集成测试方法,覆盖重试、死信、幂等等复杂场景
我们团队同时用了两个消息队列:Kafka 用于高吞吐的日志和事件流,RabbitMQ 用于需要精确路由和优先级的业务消息。
两个队列,测试难度不一样。Kafka 的主要挑战是异步和 offset;RabbitMQ 的主要挑战是 Exchange/Queue 的路由规则验证,以及死信队列(DLQ)的测试。
有一次,我们的 RabbitMQ 路由配置出了问题:一个 Exchange 的 binding key 写错了,导致某类消息无法路由到正确的 Queue,静默丢失。因为测试时用的是 Mock,Mock 直接调用了消费者方法,绕过了 Exchange 路由这一层,问题在测试阶段完全发现不了,直接到了生产。
那次之后,消息队列的集成测试成了我们团队的核心标准之一:Exchange、Queue、Binding 的配置,必须在真实的消息队列容器里验证。
今天这篇,把 Kafka 和 RabbitMQ 的集成测试完整方案都写出来。
一、Kafka 集成测试:完整方案
1.1 依赖配置
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>kafka</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>1.2 生产者测试
@SpringBootTest
@Testcontainers
class OrderEventProducerIntegrationTest {
@Container
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
@DynamicPropertySource
static void configureKafka(DynamicPropertyRegistry registry) {
registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
registry.add("spring.kafka.producer.value-serializer",
() -> "org.springframework.kafka.support.serializer.JsonSerializer");
}
@Autowired
private OrderEventProducer producer;
@Test
void 发布订单创建事件_消息内容正确() {
// given
OrderCreatedEvent event = OrderCreatedEvent.builder()
.orderId("ORDER-001")
.userId(1001L)
.totalAmount(new BigDecimal("299.00"))
.items(List.of(OrderItemEvent.of("PROD-100", 2, new BigDecimal("149.50"))))
.createdAt(LocalDateTime.now())
.build();
// when
producer.publishOrderCreated(event);
// then - 用原生 KafkaConsumer 验证消息内容
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-verifier-" + UUID.randomUUID());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
List<String> messages = new ArrayList<>();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(Collections.singletonList("order.created"));
await().atMost(Duration.ofSeconds(10)).until(() -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
records.forEach(r -> messages.add(r.value()));
return !messages.isEmpty();
});
}
assertThat(messages).hasSize(1);
// 验证消息内容(JSON)
assertThatJson(messages.get(0))
.isObject()
.containsKey("orderId")
.containsKey("userId")
.containsKey("totalAmount");
}
}1.3 消费者测试(含幂等性验证)
@SpringBootTest
@Testcontainers
class InventoryUpdateConsumerIntegrationTest {
@Container
@ServiceConnection
static KafkaContainer kafka = new KafkaContainer(
DockerImageName.parse("confluentinc/cp-kafka:7.5.0"));
@Container
@ServiceConnection
static MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.36");
@Autowired
private KafkaTemplate<String, OrderCreatedEvent> kafkaTemplate;
@Autowired
private InventoryRepository inventoryRepository;
@BeforeEach
void setUp() {
inventoryRepository.save(Inventory.builder()
.productId("PROD-100")
.stock(50)
.build());
}
@Test
void 订单创建消息_库存扣减正确() {
// given
OrderCreatedEvent event = buildOrderEvent("ORDER-001", "PROD-100", 5);
// when
kafkaTemplate.send("order.created", event.getOrderId(), event);
// then
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
Inventory inventory = inventoryRepository.findByProductId("PROD-100").orElseThrow();
assertThat(inventory.getStock()).isEqualTo(45); // 50 - 5 = 45
});
}
@Test
void 幂等性测试_同一消息重复消费_库存只扣减一次() {
// given
OrderCreatedEvent event = buildOrderEvent("ORDER-002", "PROD-100", 3);
// when - 发送同一条消息两次(模拟重试)
kafkaTemplate.send("order.created", event.getOrderId(), event);
kafkaTemplate.send("order.created", event.getOrderId(), event);
// then - 等待处理
await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {
// 库存只应该扣减一次
Inventory inventory = inventoryRepository.findByProductId("PROD-100").orElseThrow();
assertThat(inventory.getStock()).isEqualTo(47); // 50 - 3 = 47,不是 50 - 3 - 3 = 44
});
}
private OrderCreatedEvent buildOrderEvent(String orderId, String productId, int quantity) {
return OrderCreatedEvent.builder()
.orderId(orderId)
.items(List.of(OrderItemEvent.of(productId, quantity, BigDecimal.TEN)))
.build();
}
}二、RabbitMQ 集成测试
2.1 依赖配置
<dependencies>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>rabbitmq</artifactId>
<version>1.19.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>2.2 Exchange 路由规则验证
@SpringBootTest
@Testcontainers
class NotificationRoutingIntegrationTest {
@Container
static RabbitMQContainer rabbitmq = new RabbitMQContainer("rabbitmq:3.12-management")
.withStartupTimeout(Duration.ofMinutes(2));
@DynamicPropertySource
static void configureRabbitMQ(DynamicPropertyRegistry registry) {
registry.add("spring.rabbitmq.host", rabbitmq::getHost);
registry.add("spring.rabbitmq.port", () -> rabbitmq.getMappedPort(5672));
registry.add("spring.rabbitmq.username", rabbitmq::getAdminUsername);
registry.add("spring.rabbitmq.password", rabbitmq::getAdminPassword);
}
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
private BlockingQueue<NotificationMessage> receivedMessages;
@BeforeEach
void setUp() {
receivedMessages = new LinkedBlockingQueue<>();
// 注册测试监听器
amqpAdmin.declareQueue(new Queue("test-verify-queue", false, false, true));
}
@Test
void Topic_Exchange_路由规则_按类型路由到正确队列() {
// given - 发送不同类型的通知
NotificationMessage emailNotification = new NotificationMessage(
"user@test.com", "ORDER_COMPLETED", "您的订单已完成");
NotificationMessage smsNotification = new NotificationMessage(
"13800138000", "PAYMENT_SUCCESS", "支付成功");
NotificationMessage pushNotification = new NotificationMessage(
"device-token-001", "PROMOTION", "限时折扣开始了");
// when - 发送到 Topic Exchange,routing key 包含通知类型
rabbitTemplate.convertAndSend("notification.exchange", "email.ORDER_COMPLETED",
emailNotification);
rabbitTemplate.convertAndSend("notification.exchange", "sms.PAYMENT_SUCCESS",
smsNotification);
rabbitTemplate.convertAndSend("notification.exchange", "push.PROMOTION",
pushNotification);
// then - 验证消息路由到了正确的 Queue
// email.* -> email-notification-queue
// sms.* -> sms-notification-queue
// push.* -> push-notification-queue
await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
Integer emailQueueCount = amqpAdmin.getQueueProperties("email-notification-queue")
.entrySet().stream()
.filter(e -> e.getKey().equals("QUEUE_MESSAGE_COUNT"))
.mapToInt(e -> (Integer) e.getValue())
.findFirst()
.orElse(0);
assertThat(emailQueueCount).isEqualTo(1);
});
}
@Test
void Dead_Letter_Queue_消费失败后_消息进入DLQ() {
// given - 发送一条会触发处理失败的消息
NotificationMessage invalidMessage = new NotificationMessage(
null, // null 接收者会触发异常
"ORDER_COMPLETED",
"测试消息"
);
// when
rabbitTemplate.convertAndSend("notification.exchange", "email.ORDER_COMPLETED",
invalidMessage);
// then - 等待消息进入 DLQ
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {
// 检查 DLQ 里有消息
Object dlqMessage = rabbitTemplate.receiveAndConvert(
"notification.dlq", 1000);
assertThat(dlqMessage).isNotNull();
});
}
}三、三个踩坑实录
坑 1:Kafka 消息顺序测试误判
现象: 测试断言消息按顺序被消费(先 A 后 B),但偶发失败,消费顺序随机。
原因: Kafka 只保证同一 Partition 内的消息有序。如果生产者发送两条消息时没有指定相同的 key,消息可能分配到不同 Partition,消费者消费顺序不确定。
解法:
// 需要保证顺序的消息,必须用相同的 key
kafkaTemplate.send("order.events", orderId, firstEvent); // 同一个 orderId
kafkaTemplate.send("order.events", orderId, secondEvent); // 同一个 orderId -> 同一个 Partition
// 或者在测试里明确指定只使用一个 Partition
@Container
static KafkaContainer kafka = new KafkaContainer(...)
.withEnv("KAFKA_NUM_PARTITIONS", "1"); // 只用一个 Partition坑 2:RabbitMQ 的 Queue/Exchange 在测试间残留
现象: 第一个测试创建了 Queue,第二个测试因为 Queue 已存在且配置冲突而失败。
原因: RabbitMQ 容器复用时,第一个测试创建的 Queue 和 Exchange 在第二个测试开始时仍然存在。
解法:
@AfterEach
void cleanupRabbitResources(@Autowired AmqpAdmin amqpAdmin) {
// 清理测试创建的 Queue 和 Exchange
amqpAdmin.deleteQueue("test-verify-queue");
amqpAdmin.deleteExchange("test-exchange");
}
// 或者使用 auto-delete=true 的 Queue(消费者断开后自动删除)
@Bean
Queue testQueue() {
return new Queue("test-queue", false, false, true); // autoDelete=true
}坑 3:消息消费的并发控制问题
现象: 生产者发送 10 条消息,消费者配置了 3 个并发消费线程,测试里断言"消息被消费了 10 次",但 await 超时。
原因: 3 个并发线程各自有自己的事务和状态,计数器(AtomicInteger)在测试类里是一个实例变量,但如果消费者 Bean 是单例,计数器共享没问题;如果有多个 Spring 上下文,就会有问题。
解法:
// 使用线程安全的计数器,并确保消费者 Bean 能访问到它
private static final CountDownLatch messageLatch = new CountDownLatch(10);
@TestConfiguration
public static class TestConfig {
@Bean
@Primary
public MessageListener testMessageListener() {
return message -> {
// 处理消息
messageLatch.countDown();
};
}
}
@Test
void 10条消息全部被消费() throws InterruptedException {
for (int i = 0; i < 10; i++) {
producer.send("message-" + i);
}
assertThat(messageLatch.await(30, TimeUnit.SECONDS)).isTrue();
}四、消息队列测试的通用模式
| 测试场景 | 测试重点 | 推荐方法 |
|---|---|---|
| 消息发送 | 消息内容、key、header 正确 | 用原生 Consumer 消费并断言 |
| 消息消费 | 业务逻辑正确执行 | 发送消息后用 await 等待并断言数据库 |
| 幂等性 | 重复消息只处理一次 | 发送相同消息 N 次,断言业务效果只有 1 次 |
| 失败重试 | 消费失败后自动重试 | 模拟异常,断言最终处理成功或进入 DLQ |
| 路由规则 | 消息路由到正确目标 | 发送消息,验证 Queue 里有消息 |
| 顺序性 | 同一实体的事件有序 | 发送多条同 key 消息,验证消费顺序 |
消息队列是微服务架构里最容易出"幽灵 Bug"的地方——出问题时表现很奇怪,日志不完整,复现困难。把常见场景的集成测试都覆盖到,是防止这类问题最有效的手段。
