Java 并发代码测试实战——多线程测试工具、竞争条件检测、Awaitility
Java 并发代码测试实战——多线程测试工具、竞争条件检测、Awaitility
适读人群:写过多线程代码但不知道怎么测试、被随机出现的并发 Bug 折磨过的 Java 工程师 | 阅读时长:约14分钟 | 核心价值:掌握并发代码测试的核心工具和方法,让多线程 Bug 在测试阶段暴露
那个只在高并发下才出现的 Bug
2021年,我们的秒杀系统在某个活动开始后的第一秒出现了超卖:数据库里显示库存卖出了 115 件,但实际库存只有 100 件。
问题代码很"经典":
@Service
public class FlashSaleService {
private int stock = 100; // 实例变量,没有加锁
public boolean purchase(Long userId, Long productId) {
if (stock > 0) { // check
stock--; // act(check 和 act 之间没有原子保证)
createOrder(userId, productId);
return true;
}
return false;
}
}Check-Then-Act 问题,没有原子性保证。100 个线程同时读到 stock > 0,都执行了 stock--。
更让人沮丧的是:这个代码在单线程测试下完全正常。你测 10 次,买 10 次,库存从 100 变成 90,一切正常。只有真正并发时才暴露。
并发 Bug 就是这么隐蔽。这篇文章讲怎么测出来它。
并发测试的核心挑战
并发测试有三个核心难题:
- 不确定性:并发 Bug 依赖线程调度顺序,很难稳定复现
- 原子性验证困难:传统断言是单线程的,无法直接验证并发场景
- 竞争窗口太小:有些竞争条件的窗口只有几纳秒,测试很难命中
基础方案:CountDownLatch + 多线程
最基础的方法:手动创建多个线程并发执行,用 CountDownLatch 同步。
@Test
void testFlashSale_shouldNotOversell() throws InterruptedException {
FlashSaleService service = new FlashSaleService(100); // 库存 100
int threadCount = 200; // 200 个线程同时抢购
CountDownLatch startLatch = new CountDownLatch(1); // 起跑枪
CountDownLatch doneLatch = new CountDownLatch(threadCount); // 等所有线程完成
AtomicInteger successCount = new AtomicInteger(0);
AtomicInteger failCount = new AtomicInteger(0);
for (int i = 0; i < threadCount; i++) {
final long userId = i + 1;
new Thread(() -> {
try {
startLatch.await(); // 所有线程等待,同时开始,增加竞争
boolean result = service.purchase(userId, 100L);
if (result) successCount.incrementAndGet();
else failCount.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneLatch.countDown();
}
}).start();
}
startLatch.countDown(); // 发令枪,所有线程同时开始
doneLatch.await(5, TimeUnit.SECONDS); // 等待所有线程完成
// 关键断言:成功购买数 <= 初始库存
assertTrue(successCount.get() <= 100,
"超卖了!成功购买数:" + successCount.get() + ",库存:100");
assertEquals(100, successCount.get() + failCount.get() > 0 ?
successCount.get() + failCount.get() : threadCount);
assertEquals(0, service.getStock(), "库存应该卖完");
}这个测试会稳定暴露前面那段有 Bug 的代码:successCount 会大于 100。
ExecutorService:更优雅的并发测试
@Test
void testConcurrentIncrement_atomicCounter_shouldBeThreadSafe() throws Exception {
AtomicInteger counter = new AtomicInteger(0);
int threadCount = 100;
int incrementPerThread = 1000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
futures.add(executor.submit(() -> {
for (int j = 0; j < incrementPerThread; j++) {
counter.incrementAndGet();
}
return null;
}));
}
// 等待所有任务完成,捕获可能的异常
for (Future<Void> future : futures) {
future.get(10, TimeUnit.SECONDS);
}
executor.shutdown();
assertTrue(executor.awaitTermination(5, TimeUnit.SECONDS));
assertEquals(threadCount * incrementPerThread, counter.get(),
"原子计数器应该是线程安全的");
}
@Test
void testNonAtomicCounter_shouldShowRaceCondition() throws Exception {
// 这个测试预期会失败(用来验证非线程安全的代码确实有问题)
final int[] unsafeCounter = {0}; // 非原子操作
int threadCount = 100;
int incrementPerThread = 1000;
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
List<Future<Void>> futures = new ArrayList<>();
for (int i = 0; i < threadCount; i++) {
futures.add(executor.submit(() -> {
for (int j = 0; j < incrementPerThread; j++) {
unsafeCounter[0]++; // 非线程安全!
}
return null;
}));
}
for (Future<Void> future : futures) {
future.get(10, TimeUnit.SECONDS);
}
executor.shutdown();
// 非原子操作的计数器,在并发下结果应该小于预期值
assertTrue(unsafeCounter[0] < threadCount * incrementPerThread,
"非线程安全的计数器在并发下应该丢失更新");
}Awaitility:优雅处理异步断言
Awaitility 是专门处理"异步代码测试"的工具。当你的代码在另一个线程里异步完成某个操作,需要等待才能断言时,它比 Thread.sleep() 优雅得多。
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.2.0</version>
<scope>test</scope>
</dependency>基础用法:替换 Thread.sleep
// 不好的写法:用 sleep 等待异步操作完成
@Test
void testAsyncEmailSending_bad() throws Exception {
emailService.sendAsync("user@test.com", "Welcome");
Thread.sleep(3000); // 等 3 秒,希望邮件发完了……
verify(emailSender).send(eq("user@test.com"), any());
}
// 好的写法:用 Awaitility 轮询等待
@Test
void testAsyncEmailSending_good() {
emailService.sendAsync("user@test.com", "Welcome");
await()
.atMost(5, TimeUnit.SECONDS) // 最多等 5 秒
.pollInterval(100, TimeUnit.MILLISECONDS) // 每 100ms 检查一次
.untilAsserted(() -> {
verify(emailSender).send(eq("user@test.com"), any());
});
}等待状态变化
@Test
void testOrderProcessing_shouldEventuallyBePaid() {
Long orderId = orderService.createOrder(request).getOrderId();
// 异步支付处理,轮询等待订单状态变为 PAID
await()
.atMost(10, TimeUnit.SECONDS)
.pollDelay(500, TimeUnit.MILLISECONDS) // 初始延迟 500ms 后开始轮询
.pollInterval(200, TimeUnit.MILLISECONDS)
.until(() -> {
Order order = orderRepository.findById(orderId).orElseThrow();
return OrderStatus.PAID == order.getStatus();
});
Order finalOrder = orderRepository.findById(orderId).orElseThrow();
assertEquals(OrderStatus.PAID, finalOrder.getStatus());
assertNotNull(finalOrder.getPaymentTime());
}等待消息队列消费
@Test
void testOrderCreated_eventShouldBeConsumed() {
AtomicBoolean eventConsumed = new AtomicBoolean(false);
// 注册一个测试监听器
testEventListener.onOrderCreated(event -> eventConsumed.set(true));
orderService.createOrder(request);
// 等待消息消费完成
await()
.atMost(Duration.ofSeconds(5))
.untilTrue(eventConsumed);
}使用 CyclicBarrier 精准控制竞争
CyclicBarrier 可以让多个线程精确地在同一时刻执行某段代码,最大化竞争概率:
@Test
void testDoubleSpend_shouldBePreventedByConcurrencyControl() throws Exception {
Long accountId = createAccountWithBalance(new BigDecimal("100"));
int concurrentWithdrawals = 5;
BigDecimal withdrawAmount = new BigDecimal("80"); // 每次取 80,5 个同时取,只有 1 个应该成功
CyclicBarrier barrier = new CyclicBarrier(concurrentWithdrawals);
CountDownLatch done = new CountDownLatch(concurrentWithdrawals);
AtomicInteger successCount = new AtomicInteger(0);
for (int i = 0; i < concurrentWithdrawals; i++) {
new Thread(() -> {
try {
barrier.await(); // 所有线程在这里等齐,然后同时冲
boolean success = accountService.withdraw(accountId, withdrawAmount);
if (success) successCount.incrementAndGet();
} catch (Exception e) {
// 并发异常,记录但继续
} finally {
done.countDown();
}
}).start();
}
done.await(10, TimeUnit.SECONDS);
// 只应该有一笔取款成功(余额 100,取 80,只够取一次)
assertEquals(1, successCount.get(),
"并发取款:只应该有1次成功,实际成功了:" + successCount.get());
// 余额应该是 20(被取了一次 80)
BigDecimal finalBalance = accountService.getBalance(accountId);
assertEquals(0, new BigDecimal("20").compareTo(finalBalance));
}踩坑实录三则
踩坑一:并发测试里的断言失败信息不准确
现象:多线程测试失败,但 JUnit 的错误信息只显示最后一个断言失败,不显示是哪个线程的操作导致了问题。
原因:子线程里的 AssertionError 不会自动传播到主线程,会被 catch 掉或者变成线程未捕获的异常。
解法:把子线程里的异常收集起来,在主线程统一检查:
List<Throwable> errors = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < threadCount; i++) {
executor.submit(() -> {
try {
// 执行测试逻辑
} catch (Throwable t) {
errors.add(t); // 收集所有异常
}
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
// 主线程检查
assertTrue(errors.isEmpty(),
"并发测试失败,错误:" + errors.stream()
.map(Throwable::getMessage)
.collect(Collectors.joining(", ")));踩坑二:Awaitility 超时时,错误信息不清楚是哪个条件没满足
现象:await().atMost(5, SECONDS).until(...) 超时,但报错信息只说"Condition was not fulfilled in 5 seconds",不知道条件值是什么。
解法:用 untilAsserted() 代替 until(),可以提供更清楚的失败信息:
await()
.atMost(5, TimeUnit.SECONDS)
.untilAsserted(() -> {
Order order = orderRepository.findById(orderId).orElseThrow();
// AssertJ 的断言失败信息更详细
assertThat(order.getStatus())
.as("订单[%d]状态应该是PAID,实际是%s", orderId, order.getStatus())
.isEqualTo(OrderStatus.PAID);
});踩坑三:测试多次运行才失败(间歇性失败)
现象:并发测试有时通过有时失败,不是每次都能复现。
原因:这实际上说明你的测试是有效的——它真的发现了并发问题,只是竞争窗口不是每次都被命中。
解法:不要因为"有时候通过"就认为代码没问题。间歇性失败意味着存在真实的竞争条件。要么增加并发度(更多线程),要么使用线程模型工具(如 jcstress)来更系统地测试。
也可以用 @RepeatedTest(100) 多次运行,增加暴露概率:
@RepeatedTest(100) // 重复 100 次,捕获间歇性并发 Bug
void testConcurrentOperation() {
// 并发测试逻辑
}并发代码测试是 Java 测试里最难的部分。没有银弹——多线程 Bug 的本质是调度不确定性。但通过 CountDownLatch 控制并发时机、Awaitility 处理异步等待、CyclicBarrier 最大化竞争窗口,可以把大多数并发 Bug 从生产环境赶到测试阶段。
这比在凌晨接到告警、排查超卖 Bug 要好多了。
