Spring Boot 异步任务深度实战——@Async 到线程池到任务编排全套方案
Spring Boot 异步任务深度实战——@Async 到线程池到任务编排全套方案
适读人群:需要处理异步任务、提升接口响应速度的 Java 工程师 | 阅读时长:约18分钟 | 核心价值:掌握 @Async 原理、线程池配置调优、CompletableFuture 任务编排的完整实战方案
一、某同学的接口超时事故
某同学在做一个用户注册接口,需求很简单:注册成功后发欢迎短信、发邮件、记录注册日志。他的实现是同步串行的:DB 写用户 → 发短信(800ms)→ 发邮件(600ms)→ 写日志(200ms),整个接口耗时 1600ms 以上,产品经理催了三次说"注册接口太慢了"。
他找我,问能不能优化。我说:"你这个发短信、发邮件、写日志,都不影响注册结果,为什么要让用户等着这些操作完成?"
他说:"那我用多线程?"
我说:"用 @Async,Spring 帮你管线程池,别自己 new Thread。"
结果他加了 @Async 之后踩了一个坑:接口确实快了,但短信发送失败了也没有任何日志,因为异常被线程池吞掉了。再后来,他配置线程池的时候又踩了坑:用了 Spring 默认的 SimpleAsyncTaskExecutor,每次都 new 一个新线程,高并发时线程数暴涨,最终 OOM。
这两个坑,是 @Async 的经典入门级陷阱。这篇文章把从 @Async 到线程池配置到任务编排的完整方案给你。
二、@Async 的正确打开方式
2.1 开启异步支持
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync // 开启异步支持,必须加这个注解
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}2.2 配置自定义线程池(重点,不能用默认的)
Spring Boot 默认的 SimpleAsyncTaskExecutor 不是线程池,它每次提交任务都创建新线程,不做复用。高并发时会把系统线程资源耗尽,生产环境绝对不能用这个默认配置。
package com.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 异步线程池配置。
* 实现 AsyncConfigurer 并重写 getAsyncExecutor(),
* 让所有 @Async 方法默认使用此线程池。
*/
@Configuration
public class AsyncConfig implements AsyncConfigurer {
/**
* 默认异步线程池,适用于轻量 IO 类任务(发短信、发邮件、写日志等)。
* 核心线程数 10,最大线程数 50,队列容量 200。
* 当队列满且线程达到最大值时,使用 CallerRunsPolicy 由调用方线程执行,
* 起到背压作用,不会丢任务。
*/
@Override
@Bean("defaultAsyncExecutor")
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10); // 核心线程数
executor.setMaxPoolSize(50); // 最大线程数
executor.setQueueCapacity(200); // 任务队列容量
executor.setKeepAliveSeconds(60); // 空闲线程存活时间(秒)
executor.setThreadNamePrefix("async-default-"); // 线程名前缀,方便排查
// 拒绝策略:CallerRunsPolicy 让提交任务的线程自己执行,起背压效果
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true); // 优雅关闭
executor.setAwaitTerminationSeconds(30); // 最多等 30 秒
executor.initialize();
return executor;
}
/**
* 重量级任务线程池(比如发送批量报表、大文件处理),
* 与默认池隔离,避免抢占轻量任务的资源。
*/
@Bean("heavyTaskExecutor")
public Executor heavyTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(50);
executor.setThreadNamePrefix("async-heavy-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
executor.initialize();
return executor;
}
}2.3 使用 @Async
package com.example.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@Service
public class NotificationService {
private static final Logger log = LoggerFactory.getLogger(NotificationService.class);
/**
* 发送欢迎短信,使用默认异步线程池。
* 注意:@Async 方法必须是 public,且不能在同类中直接调用
*(Spring AOP 代理机制限制,同类内调用会绕过代理)。
*/
@Async("defaultAsyncExecutor")
public void sendWelcomeSms(String phone) {
try {
log.info("[Async] 发送短信开始。phone={}, thread={}", phone,
Thread.currentThread().getName());
// 模拟短信发送耗时 800ms
Thread.sleep(800);
log.info("[Async] 发送短信完成。phone={}", phone);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("[Async] 发送短信被中断。phone={}", phone, e);
} catch (Exception e) {
// 异步方法里一定要捕获异常并记录,否则异常会被吞掉!
log.error("[Async] 发送短信失败。phone={}", phone, e);
}
}
/**
* 发送欢迎邮件,返回 Future 可以追踪执行状态。
*/
@Async("defaultAsyncExecutor")
public java.util.concurrent.Future<Boolean> sendWelcomeEmail(String email) {
try {
Thread.sleep(600);
log.info("[Async] 发送邮件完成。email={}", email);
return new org.springframework.scheduling.annotation.AsyncResult<>(true);
} catch (Exception e) {
log.error("[Async] 发送邮件失败。email={}", email, e);
return new org.springframework.scheduling.annotation.AsyncResult<>(false);
}
}
}三、CompletableFuture 任务编排
当异步任务之间有依赖关系(串行、并行、聚合),用 @Async 已经不够,需要 CompletableFuture。
3.1 场景:用户画像聚合——并行查三个数据源
package com.example.service;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
/**
* 用户画像聚合服务。
* 查询用户画像需要聚合:基本信息(DB)+ 行为标签(Redis)+ 推荐数据(算法服务)。
* 三个来源完全独立,并行查询可以把总耗时从串行的 ~500ms 降到 ~200ms(最慢的那个)。
*/
@Service
public class UserProfileService {
private static final Logger log = LoggerFactory.getLogger(UserProfileService.class);
private final Executor defaultAsyncExecutor;
public UserProfileService(Executor defaultAsyncExecutor) {
this.defaultAsyncExecutor = defaultAsyncExecutor;
}
public UserProfileVO getUserProfile(Long userId) {
long start = System.currentTimeMillis();
// 并行启动三个异步查询
CompletableFuture<UserBaseInfo> baseFuture = CompletableFuture.supplyAsync(
() -> queryBaseInfo(userId), defaultAsyncExecutor);
CompletableFuture<UserTagInfo> tagFuture = CompletableFuture.supplyAsync(
() -> queryTagInfo(userId), defaultAsyncExecutor);
CompletableFuture<UserRecommendInfo> recommendFuture = CompletableFuture.supplyAsync(
() -> queryRecommendInfo(userId), defaultAsyncExecutor);
// 等待全部完成,聚合结果
CompletableFuture<UserProfileVO> profileFuture = CompletableFuture.allOf(
baseFuture, tagFuture, recommendFuture
).thenApply(v -> {
UserBaseInfo base = baseFuture.join();
UserTagInfo tags = tagFuture.join();
UserRecommendInfo recommend = recommendFuture.join();
UserProfileVO vo = new UserProfileVO();
vo.setUserId(userId);
vo.setBaseInfo(base);
vo.setTagInfo(tags);
vo.setRecommendInfo(recommend);
return vo;
});
try {
// 超时控制:最多等 2 秒,超时返回降级结果
UserProfileVO result = profileFuture.get(2, java.util.concurrent.TimeUnit.SECONDS);
log.info("[Profile] 用户画像查询完成。userId={}, cost={}ms",
userId, System.currentTimeMillis() - start);
return result;
} catch (java.util.concurrent.TimeoutException e) {
log.warn("[Profile] 用户画像查询超时,返回降级结果。userId={}", userId);
return buildFallbackProfile(userId);
} catch (Exception e) {
log.error("[Profile] 用户画像查询异常。userId={}", userId, e);
return buildFallbackProfile(userId);
}
}
private UserBaseInfo queryBaseInfo(Long userId) {
// 模拟查 DB,耗时 ~150ms
return new UserBaseInfo();
}
private UserTagInfo queryTagInfo(Long userId) {
// 模拟查 Redis,耗时 ~80ms
return new UserTagInfo();
}
private UserRecommendInfo queryRecommendInfo(Long userId) {
// 模拟调推荐服务,耗时 ~200ms
return new UserRecommendInfo();
}
private UserProfileVO buildFallbackProfile(Long userId) {
UserProfileVO vo = new UserProfileVO();
vo.setUserId(userId);
return vo;
}
// 内部 VO 类(简化定义)
public static class UserProfileVO {
private Long userId;
private UserBaseInfo baseInfo;
private UserTagInfo tagInfo;
private UserRecommendInfo recommendInfo;
public void setUserId(Long userId) { this.userId = userId; }
public void setBaseInfo(UserBaseInfo baseInfo) { this.baseInfo = baseInfo; }
public void setTagInfo(UserTagInfo tagInfo) { this.tagInfo = tagInfo; }
public void setRecommendInfo(UserRecommendInfo recommendInfo) { this.recommendInfo = recommendInfo; }
public Long getUserId() { return userId; }
}
public static class UserBaseInfo {}
public static class UserTagInfo {}
public static class UserRecommendInfo {}
}3.2 场景:有依赖关系的串行任务编排
/**
* 下单流程:先创建订单 → 扣减库存 → 发送通知(可并行)
* 订单 ID 是后续步骤的依赖,所以创建订单必须先完成。
*/
public void placeOrder(OrderRequest request) {
CompletableFuture.supplyAsync(() -> createOrder(request), defaultAsyncExecutor)
.thenApplyAsync(orderId -> {
// 依赖 orderId,扣减库存
deductInventory(orderId, request.getSkuId(), request.getQuantity());
return orderId;
}, defaultAsyncExecutor)
.thenAcceptAsync(orderId -> {
// 发送通知(短信+邮件并行)
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> sendOrderSms(orderId), defaultAsyncExecutor),
CompletableFuture.runAsync(() -> sendOrderEmail(orderId), defaultAsyncExecutor)
).join();
}, defaultAsyncExecutor)
.exceptionally(ex -> {
log.error("[Order] 下单流程异常", ex);
// 触发补偿逻辑
return null;
});
}四、踩坑实录
坑1:@Async 同类内部调用不生效
现象:Service A 的方法 methodA 调用了本类的 methodB,methodB 加了 @Async,但 methodB 明明在主线程里执行。
原因:Spring @Async 基于 AOP 代理实现,同类内部调用绕过了代理,直接调用了 this.methodB(),不走代理,@Async 失效。这个坑我也踩过,调试了半天。
解法:把 methodB 抽到另一个 Spring Bean 里,或者在本类里注入自身代理(不推荐),或者用 AopContext.currentProxy() 获取代理对象再调用。推荐方案是独立 Bean。
坑2:异步异常被吞,无任何日志
现象:异步任务里的业务异常从来没有报警,但数据却是错的。
原因:异步任务抛出的异常不会传播到调用方(调用方已经返回了),默认情况下 Spring 只打一行 WARN 日志,如果日志级别配置不当,直接丢失。
解法:为 AsyncConfigurer 实现 getAsyncUncaughtExceptionHandler():
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (throwable, method, params) -> {
log.error("[Async] 异步任务执行异常。method={}, params={}",
method.getName(), Arrays.toString(params), throwable);
// 可以在这里发告警通知
};
}坑3:线程池队列打满,任务开始被拒绝
现象:高峰期出现 RejectedExecutionException,部分异步通知丢失。
原因:任务提交速度超过了线程池处理速度,队列满了,触发拒绝策略 AbortPolicy 直接抛异常丢弃任务。
解法:改用 CallerRunsPolicy,让调用方线程降速执行(背压效果),不会丢任务。同时增加核心线程数和队列容量,并对任务量做监控告警,在队列水位达到 70% 时预警。
五、线程池参数调优建议
我自己总结的参数调优规则:
| 任务类型 | 核心线程数 | 最大线程数 | 队列容量 |
|---|---|---|---|
| IO 密集型(网络调用、DB) | CPU核数 × 2 | CPU核数 × 4 | 200~500 |
| CPU 密集型(计算、加解密) | CPU核数 | CPU核数 + 1 | 50~100 |
| 轻量通知类(短信、日志) | 10~20 | 50 | 200~500 |
一个 4 核服务器,IO 密集型任务建议:核心线程 8,最大线程 16,队列 200。
监控线程池:使用 ThreadPoolTaskExecutor 的 getThreadPoolExecutor() 可以获取到底层的 ThreadPoolExecutor,从而获取活跃线程数、队列大小等指标,集成到 Micrometer 或 Prometheus 做实时监控。
