AbstractRoutingDataSource多数据源:动态切换的原理与AOP实现
AbstractRoutingDataSource多数据源:动态切换的原理与AOP实现
适读人群:有读写分离或多数据源需求的Java后端工程师 | 阅读时长:约17分钟
开篇故事
做读写分离是很多中型系统的必经之路。我之前负责的一个项目,随着业务增长,MySQL主库压力越来越大,大量的查询请求和写请求混在一起,慢查询越来越多。
DBA给我们配了主从复制,从库专门用来读,但代码层面怎么让读操作自动走从库?最粗暴的方式是每个Service方法里手动指定用哪个DataSource,但这样改动量巨大,而且逻辑和基础设施耦合,非常难维护。
后来我们用了AbstractRoutingDataSource+AOP的方案,只加了一个注解,读操作自动路由到从库,写操作走主库,核心业务代码零改动。
今天我们把这套方案从源码到实现完整讲清楚。
一、AbstractRoutingDataSource的设计思想
AbstractRoutingDataSource是Spring JDBC提供的一个抽象类,它本身实现了DataSource接口,但实际的数据库连接获取会委托给"目标DataSource"。选择哪个目标DataSource,由子类实现的determineCurrentLookupKey()方法决定。
// spring-jdbc/src/main/java/org/springframework/jdbc/datasource/lookup/
// AbstractRoutingDataSource.java
public abstract class AbstractRoutingDataSource extends AbstractDataSource
implements InitializingBean {
// 所有目标DataSource的映射:key -> DataSource
@Nullable
private Map<Object, Object> targetDataSources;
// 默认DataSource(当key找不到匹配时用这个)
@Nullable
private Object defaultTargetDataSource;
// 解析后的DataSource映射(key -> DataSource,key已解析)
@Nullable
private Map<Object, DataSource> resolvedDataSources;
// 核心方法:子类实现,返回当前应该用哪个DataSource的key
@Nullable
protected abstract Object determineCurrentLookupKey();
// getConnection时调用
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
// 根据key选择DataSource
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources,
"DataSource router not initialized");
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null &&
(this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key ["
+ lookupKey + "]");
}
return dataSource;
}
}二、源码核心路径解析
2.1 完整的读写分离架构
2.2 ThreadLocal做Key存储
关键设计:用ThreadLocal存储当前线程的数据源选择,在AOP切面里设置,在determineCurrentLookupKey()里读取。
// 数据源上下文持有者
public class DynamicDataSourceContext {
private static final ThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();
public static final String MASTER = "master";
public static final String SLAVE = "slave";
public static void set(String dataSourceKey) {
CONTEXT.set(dataSourceKey);
}
public static String get() {
return CONTEXT.get();
}
public static void clear() {
CONTEXT.remove(); // 防止内存泄漏,必须清理!
}
public static boolean isMaster() {
String current = CONTEXT.get();
return current == null || MASTER.equals(current);
}
}注意用InheritableThreadLocal而不是普通ThreadLocal,子线程可以继承父线程的上下文(在@Async场景下有用)。
2.3 动态数据源实现
public class DynamicDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
String key = DynamicDataSourceContext.get();
log.debug("Current datasource key: {}", key);
return key; // 返回null时使用defaultTargetDataSource
}
}2.4 AOP切面的执行顺序问题
这是最容易踩坑的地方:AOP切面必须在@Transactional开始之前执行,否则事务已经绑定了主库连接,切换到从库也没用。
三、完整代码示例
3.1 完整的多数据源配置和AOP实现
// 1. 数据源路由注解
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
String value() default DynamicDataSourceContext.MASTER;
}
// 2. AOP切面(Order必须小于事务切面的Order)
@Aspect
@Component
@Order(1) // 必须比@Transactional的Order小(事务默认Order=Integer.MAX_VALUE)
public class DynamicDataSourceAspect {
// 拦截有@DataSource注解的方法
@Around("@annotation(dataSource) || @within(dataSource)")
public Object around(ProceedingJoinPoint pjp, DataSource dataSource) throws Throwable {
String key = dataSource.value();
String previous = DynamicDataSourceContext.get();
try {
DynamicDataSourceContext.set(key);
return pjp.proceed();
} finally {
// 恢复之前的数据源(支持嵌套切换)
if (previous != null) {
DynamicDataSourceContext.set(previous);
} else {
DynamicDataSourceContext.clear();
}
}
}
// 基于方法名自动判断读写(不需要手动加注解)
@Around("@within(org.springframework.stereotype.Service) && " +
"!@annotation(DataSource)") // 没有显式指定数据源的Service方法
public Object autoRoute(ProceedingJoinPoint pjp) throws Throwable {
String methodName = pjp.getSignature().getName();
// 根据方法名前缀判断读写
boolean isRead = methodName.startsWith("get") ||
methodName.startsWith("find") ||
methodName.startsWith("query") ||
methodName.startsWith("list") ||
methodName.startsWith("count") ||
methodName.startsWith("exists");
String key = isRead ? DynamicDataSourceContext.SLAVE :
DynamicDataSourceContext.MASTER;
String previous = DynamicDataSourceContext.get();
try {
DynamicDataSourceContext.set(key);
return pjp.proceed();
} finally {
if (previous != null) {
DynamicDataSourceContext.set(previous);
} else {
DynamicDataSourceContext.clear();
}
}
}
}
// 3. 数据源配置
@Configuration
public class DynamicDataSourceConfig {
@Bean
@ConfigurationProperties(prefix = "spring.datasource.master")
public DataSource masterDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@ConfigurationProperties(prefix = "spring.datasource.slave")
public DataSource slaveDataSource() {
return DataSourceBuilder.create().build();
}
@Bean
@Primary // 主DataSource,Spring默认注入这个
public DataSource dynamicDataSource(
@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("slaveDataSource") DataSource slaveDataSource) {
Map<Object, Object> targetDataSources = new HashMap<>();
targetDataSources.put(DynamicDataSourceContext.MASTER, masterDataSource);
targetDataSources.put(DynamicDataSourceContext.SLAVE, slaveDataSource);
DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(targetDataSources);
dynamicDataSource.setDefaultTargetDataSource(masterDataSource); // 默认主库
return dynamicDataSource;
}
// 确保JPA/MyBatis等使用动态DataSource
@Bean
public PlatformTransactionManager transactionManager(DataSource dynamicDataSource) {
return new DataSourceTransactionManager(dynamicDataSource);
}
}
// 4. application.yml
// spring:
// datasource:
// master:
// url: jdbc:mysql://master:3306/mydb
// username: root
// password: master_pass
// driver-class-name: com.mysql.cj.jdbc.Driver
// slave:
// url: jdbc:mysql://slave:3306/mydb
// username: root
// password: slave_pass
// driver-class-name: com.mysql.cj.jdbc.Driver3.2 使用示例
@Service
public class UserService {
@Autowired
private UserRepository userRepository;
// 显式指定走从库(读操作)
@DataSource(DynamicDataSourceContext.SLAVE)
@Transactional(readOnly = true)
public List<User> findAllUsers() {
return userRepository.findAll();
}
// 自动路由:方法名以find开头,自动走从库
public Optional<User> findById(Long id) {
return userRepository.findById(id);
}
// 写操作,自动走主库(方法名没有find/get前缀)
@Transactional
public User createUser(CreateUserRequest request) {
return userRepository.save(User.from(request));
}
// 显式指定主库(强制读主库,避免主从延迟问题)
@DataSource(DynamicDataSourceContext.MASTER)
public User findByIdFromMaster(Long id) {
return userRepository.findById(id)
.orElseThrow(() -> new EntityNotFoundException("User " + id));
}
}四、踩坑实录
坑1:事务中切换数据源无效
现象:在已经开启事务的方法里调用@DataSource(SLAVE),数据源没有切换。
根因:@Transactional一旦开启事务,就会通过DataSourceUtils.getConnection()获取并绑定连接到当前事务,后续在同一事务中所有操作都用这个连接,不管ThreadLocal怎么变。
解决:确保数据源切换发生在事务开启之前(AOP Order要更小)。已经在事务中了就无法切换。
坑2:从库的主从延迟导致数据不一致
现象:写入主库后立即从从库查询,读到的是旧数据。
根因:MySQL主从复制有延迟(通常几毫秒到几秒),刚写入主库的数据在从库上还没同步。
解决:写操作完成后的查询(尤其是"查询刚创建的记录"),强制走主库:
@Transactional
public User createAndReturn(CreateUserRequest request) {
User user = userRepository.save(User.from(request));
// 不要立即从从库查,直接用刚保存的对象返回
return user;
}或者加上强制主库标记:
// 在ThreadLocal里加一个"强制主库"的标记,写操作完成后的一段时间内走主库
// (更复杂的实现,适合延迟容忍度很低的场景)坑3:多个从库的负载均衡
当有多个从库时,需要负载均衡:
public class LoadBalancedDynamicDataSource extends AbstractRoutingDataSource {
private final List<String> slaveKeys = new ArrayList<>();
private final AtomicInteger counter = new AtomicInteger(0);
@Override
protected Object determineCurrentLookupKey() {
String key = DynamicDataSourceContext.get();
if (DynamicDataSourceContext.SLAVE.equals(key) && !slaveKeys.isEmpty()) {
// 轮询负载均衡
int index = counter.getAndIncrement() % slaveKeys.size();
return slaveKeys.get(index);
}
return key;
}
public void addSlaveKey(String key) {
slaveKeys.add(key);
}
}坑4:@Async中数据源上下文丢失
现象:在@Async方法里,DynamicDataSourceContext读不到父线程设置的值。
根因:异步线程是新线程,普通ThreadLocal的值不会自动传递。
解决:用InheritableThreadLocal(如果用Spring的ThreadPoolTaskExecutor,它会把父线程的InheritableThreadLocal复制给子线程)。或者显式传递:
// 提交异步任务时,显式设置数据源
String dsKey = DynamicDataSourceContext.get();
asyncExecutor.execute(() -> {
DynamicDataSourceContext.set(dsKey);
try {
doAsyncTask();
} finally {
DynamicDataSourceContext.clear();
}
});五、总结与延伸
AbstractRoutingDataSource的设计非常优雅:通过一个抽象方法determineCurrentLookupKey()解耦了"路由逻辑"和"连接管理",配合ThreadLocal+AOP,实现了对业务代码完全透明的数据源切换。
核心要点回顾:
ThreadLocal存储当前数据源key,各线程隔离- AOP切面设置ThreadLocal,Order必须比事务切面小
- 事务开始后无法切换数据源(连接已绑定)
- 写后读要注意主从延迟,关键场景强制主库
下一篇我们进入测试专题,聊@SpringBootTest、@WebMvcTest、@DataJpaTest该怎么选。
