Awaitility 实战——异步代码测试的最优雅解决方案
Awaitility 实战——异步代码测试的最优雅解决方案
适读人群:Java 后端开发者、测试工程师 | 阅读时长:约 15 分钟 | 核心价值:掌握 Awaitility 的完整用法,优雅解决所有异步代码的测试等待问题
异步代码的测试,是让很多开发者放弃写集成测试的重要原因之一。
我见过太多这样的代码:
// 糟糕的等待方式 - 硬睡 5 秒
producer.sendMessage(event);
Thread.sleep(5000); // 希望 5 秒够消费者处理完
assertThat(repository.count()).isEqualTo(1);这段代码有三个问题:
- 不稳定:网络抖动或机器慢时,5 秒不够,测试随机失败。
- 浪费时间:99% 情况下消费者 1 秒内就处理完了,但还是要等 5 秒。
- 难以诊断:测试失败时,不知道是"5 秒后还没处理完"还是"处理完了但结果不对"。
有一次我们的 CI 因为机器负载高,所有用 Thread.sleep 等待的测试全部失败了,然后开发者不断把睡眠时间从 5 秒改到 10 秒再改到 20 秒,CI 时间越来越长。这是一个恶性循环。
Awaitility 是解决这个问题的正确工具。今天这篇,把它的完整用法和各种场景的最佳实践写出来。
一、Awaitility 的核心思路
Awaitility 是"轮询等待"的优雅封装:
- 设定一个最大等待时间(超过就失败)
- 设定轮询间隔(每隔多久检查一次条件)
- 设定等待条件(什么情况下认为"完成了")
内部实现:每隔 pollInterval 检查一次条件,条件满足则立刻通过,超过 atMost 时间还不满足则断言失败。
二、依赖引入
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.1</version>
<scope>test</scope>
</dependency>// 静态导入,代码更简洁
import static org.awaitility.Awaitility.*;
import static org.awaitility.Durations.*;
import static org.hamcrest.Matchers.*;三、基础用法:从最简到最复杂
3.1 最简单的用法
// 等待直到条件满足,最多等 10 秒
await().atMost(10, TimeUnit.SECONDS).until(() -> repository.count() == 1);
// 或者更简洁
await().atMost(TEN_SECONDS).until(() -> repository.count() == 1);3.2 推荐的 untilAsserted 方式(最推荐)
await()
.atMost(Duration.ofSeconds(10))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> {
Order order = repository.findById(orderId).orElseThrow();
assertThat(order.getStatus()).isEqualTo(OrderStatus.COMPLETED);
assertThat(order.getPaymentId()).isNotNull();
});untilAsserted 的优势:可以用 AssertJ 或 Hamcrest 的断言,失败时有清晰的错误消息。
3.3 带 Callable 的用法
// 等待方法返回特定值
await()
.atMost(Duration.ofSeconds(10))
.until(
() -> orderService.getOrderStatus(orderId),
equalTo(OrderStatus.COMPLETED)
);四、真实业务场景测试
场景一:Kafka 消费者异步处理
@Test
void 支付完成消息_消费者处理后_订单状态更新() {
// given
Order order = createPendingOrder();
// when
kafkaTemplate.send("payment.completed", PaymentCompletedEvent.builder()
.orderId(order.getId())
.paymentId("PAY-001")
.amount(order.getTotalAmount())
.build());
// then - 优雅等待,不是 Thread.sleep
await()
.atMost(Duration.ofSeconds(15))
.pollInterval(Duration.ofMillis(200))
.untilAsserted(() -> {
Order updated = orderRepository.findById(order.getId()).orElseThrow();
// 验证多个条件
assertThat(updated.getStatus()).isEqualTo(OrderStatus.PAID);
assertThat(updated.getPaymentId()).isEqualTo("PAY-001");
assertThat(updated.getPaidAt()).isNotNull();
});
}场景二:异步发送邮件
@Test
void 注册成功后_发送欢迎邮件() {
// given - Mock 邮件服务,记录调用
List<String> sentEmails = new CopyOnWriteArrayList<>();
doAnswer(invocation -> {
sentEmails.add(invocation.getArgument(0));
return null;
}).when(emailService).sendWelcomeEmail(anyString());
// when
userService.register("newuser@example.com", "username", "password");
// then - 等待异步邮件发送完成
await()
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
assertThat(sentEmails).containsExactly("newuser@example.com"));
}场景三:定时任务执行验证
@Test
void 定时任务_每分钟执行一次_清理过期数据() {
// given - 插入一条已过期的记录
Session expiredSession = sessionRepository.save(Session.builder()
.userId(1001L)
.token("expired-token")
.expiresAt(LocalDateTime.now().minusHours(1))
.build());
// when - 触发定时任务(或等待它自动运行)
sessionCleanupTask.cleanup(); // 手动触发,方便测试
// then
await()
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
assertThat(sessionRepository.findById(expiredSession.getId())).isEmpty());
}场景四:WebSocket/SSE 事件接收
@Test
void WebSocket_服务端推送消息_客户端接收到() throws Exception {
List<String> receivedMessages = new CopyOnWriteArrayList<>();
// 建立 WebSocket 连接
StompSession session = connectWebSocket();
session.subscribe("/user/queue/notifications", new StompFrameHandler() {
@Override
public Type getPayloadType(StompHeaders headers) {
return String.class;
}
@Override
public void handleFrame(StompHeaders headers, Object payload) {
receivedMessages.add((String) payload);
}
});
// when - 触发服务端推送
notificationService.sendToUser(userId, "您有一条新消息");
// then
await()
.atMost(Duration.ofSeconds(5))
.untilAsserted(() ->
assertThat(receivedMessages)
.hasSize(1)
.contains("您有一条新消息"));
}五、高级用法:条件、超时、忽略异常
5.1 忽略特定异常(等待期间的异常不算失败)
await()
.atMost(Duration.ofSeconds(10))
.ignoreException(EmptyResultDataAccessException.class) // 数据还没写入时忽略这个异常
.untilAsserted(() -> {
Order order = orderRepository.findById(orderId).orElseThrow();
assertThat(order.getStatus()).isEqualTo(OrderStatus.PAID);
});5.2 poll delay(第一次检查延迟)
await()
.pollDelay(Duration.ofSeconds(2)) // 等 2 秒后才开始检查(已知处理需要 2 秒)
.atMost(Duration.ofSeconds(15))
.pollInterval(Duration.ofMillis(500))
.untilAsserted(() -> {
// ...
});5.3 自定义 Poll 间隔策略
// 斐波那契策略:先快速轮询,然后逐渐减慢(类似指数退避)
await()
.atMost(Duration.ofSeconds(30))
.pollInterval(new FibonacciPollInterval(Duration.ofMillis(100)))
.untilAsserted(() -> {
// ...
});
// 固定间隔
await()
.atMost(Duration.ofSeconds(15))
.pollInterval(Duration.ofSeconds(1))
.untilAsserted(() -> {
// ...
});5.4 全局 Awaitility 配置
// 在 @BeforeAll 或测试基类初始化里配置
@BeforeAll
static void configureAwaitility() {
Awaitility.setDefaultTimeout(Duration.ofSeconds(30));
Awaitility.setDefaultPollInterval(Duration.ofMillis(200));
Awaitility.setDefaultPollDelay(Duration.ZERO);
}六、三个踩坑实录
坑 1:until() 吞异常导致假通过
现象: 测试通过了,但实际上条件从来没有真正满足,只是 until() 在等待超时时吞掉了异常,返回了 false。
原因: until(Callable) 在 callable 抛出异常时,默认把它当作条件不满足(返回 false),继续等待,而不是立即失败。超时后才失败,但错误信息是"等待超时",而不是真正的异常信息。
解法: 优先使用 untilAsserted() 而不是 until(Callable):
// 危险:异常被吞掉
await().until(() -> {
Order order = orderRepository.findById(id).orElseThrow(); // 可能抛异常
return order.getStatus() == OrderStatus.PAID;
});
// 安全:异常会立即传播,测试立即失败并有正确的错误信息
await().untilAsserted(() -> {
Order order = orderRepository.findById(id).orElseThrow();
assertThat(order.getStatus()).isEqualTo(OrderStatus.PAID);
});坑 2:等待时间设置不合理导致 CI 随机失败
现象: 本地 CI 稳定,但生产 CI runner 负载高时,设置的等待时间不够,测试随机失败。
原因: 等待时间写死了(比如 5 秒),但 CI 机器负载高时,Kafka 消费者的处理可能延迟更久。
解法: 等待时间留足余量,同时对"应该快速完成"的场景和"可能慢"的场景区别对待:
// 快速操作(数据库读写):5 秒足够
await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {...});
// 消息队列(涉及网络):15 秒
await().atMost(Duration.ofSeconds(15)).untilAsserted(() -> {...});
// 复杂异步任务(涉及多步骤):30 秒
await().atMost(Duration.ofSeconds(30)).untilAsserted(() -> {...});
// 定义常量,统一管理
// 不要每个测试里都随意写时间坑 3:并行测试中 Awaitility 条件竞争
现象: 并行跑两个测试,每个测试都在等待 "repository.count() == 1",但实际上两个测试各自插入了数据,count 变成了 2,两个测试都永远等不到 == 1 的条件,都超时失败。
原因: 并行测试共享数据库,互相干扰了等待条件。
解法:
// 不要用 count() 这种全局条件,用基于 ID 的条件
await().untilAsserted(() -> {
// 基于具体 ID 断言,不受其他测试影响
Order order = orderRepository.findById(specificOrderId).orElseThrow();
assertThat(order.getStatus()).isEqualTo(OrderStatus.PAID);
});七、Awaitility 使用原则
1. 优先 untilAsserted 而不是 until:前者失败时给出清晰的断言错误信息。
2. 等待时间要有余量:本地测试快不代表 CI 也快。基于类型设置合理的超时。
3. 基于状态断言,不基于时间:有了 Awaitility,代码里不应该再出现 Thread.sleep。
4. 等待条件要与具体业务绑定:用 ID、唯一标识来断言,而不是全局的 count()。
Awaitility 让异步测试从"玄学"变成了"工程"。有了它,异步代码不再是集成测试的障碍。
