抽象工厂模式:多产品族切换与Spring多数据源的工程实践
抽象工厂模式:多产品族切换与Spring多数据源的工程实践
适读人群:中高级Java开发者 | 阅读时长:约25分钟 | 模式类型:创建型
开篇故事
2021年,我参与了一个集团级别的中台改造项目。集团下面有十几个子公司,各自使用不同的技术栈:有的用 MySQL,有的用 Oracle,有的用 TiDB,甚至还有一个子公司的历史系统还跑在 DB2 上。中台需要统一接入所有这些数据库,并且根据请求来源路由到正确的数据库。
这还不是最复杂的部分。每种数据库还配套了不同的缓存策略(有用 Redis 的,有用 Memcached 的),以及不同的消息队列(Kafka、RocketMQ、RabbitMQ)。一套请求进来,需要同时适配"正确的数据库"+"正确的缓存"+"正确的消息队列",这三者必须来自同一套技术体系,不能混用。
举个例子:A子公司的数据请求,必须走 MySQL + Redis + Kafka 这套组合;B子公司的请求,必须走 Oracle + Redis + RocketMQ 这套组合。把这套"成套切换"的需求交给工厂方法模式来处理会非常麻烦——每次切换要改三个地方。
这就是抽象工厂模式的典型应用场景。它解决的核心问题是:创建一族相关或相互依赖的对象,确保同族对象一起被创建,不同族的对象不会被混用。
一、模式动机:工厂方法模式的局限性
工厂方法模式只能创建一种类型的产品,每个具体工厂只生产一种具体产品。当我们需要一次性创建多种类型的产品,且这些产品之间存在关联约束时,工厂方法模式就力不从心了。
抽象工厂模式的核心思想:
- 将相关的产品组成产品族(Product Family)
- 一个具体工厂负责创建整个产品族的所有产品
- 切换产品族只需要切换工厂,不需要修改客户代码
在 Spring 生态中,这个模式体现得非常明显:
DataSource+TransactionManager+JdbcTemplate是一个完整的数据访问产品族- 切换数据库(MySQL → Oracle)意味着这三者都需要切换,且切换后的三个对象必须来自同一实现
二、模式结构
三、Spring 源码中的抽象工厂分析
3.1 Spring 的数据访问层就是一个产品族设计
Spring 的数据访问抽象层本身就体现了抽象工厂模式的思想。以 Spring Data JPA 为例:
EntityManagerFactory(产品A)PlatformTransactionManager(产品B)JpaRepository(产品C)
这三者必须配套使用,JPA 的 EntityManagerFactory 不能和 JDBC 的 DataSourceTransactionManager 混用。Spring 通过 @EnableJpaRepositories 注解和 JpaRepositoryConfigExtension 等配置类,确保了同一产品族的对象被一起创建和配置。
3.2 AbstractRoutingDataSource:多数据源路由
AbstractRoutingDataSource 是 Spring 提供的多数据源路由基类,它是实现动态数据源切换的关键:
// Spring 源码中的 AbstractRoutingDataSource(简化版)
public abstract class AbstractRoutingDataSource extends AbstractDataSource
implements InitializingBean {
// 存储所有数据源的Map
private Map<Object, Object> targetDataSources;
// 默认数据源
private Object defaultTargetDataSource;
// 解析后的数据源Map(key已经被解析为实际对象)
private Map<Object, DataSource> resolvedDataSources;
private DataSource resolvedDefaultDataSource;
@Override
public void afterPropertiesSet() {
if (this.targetDataSources == null) {
throw new IllegalArgumentException("Property 'targetDataSources' is required");
}
this.resolvedDataSources = CollectionUtils.newHashMap(this.targetDataSources.size());
this.targetDataSources.forEach((key, value) -> {
Object lookupKey = resolveSpecifiedLookupKey(key);
DataSource dataSource = resolveSpecifiedDataSource(value);
this.resolvedDataSources.put(lookupKey, dataSource);
});
if (this.defaultTargetDataSource != null) {
this.resolvedDefaultDataSource = resolveSpecifiedDataSource(this.defaultTargetDataSource);
}
}
// 关键方法:获取当前应该使用的数据源
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
protected DataSource determineTargetDataSource() {
Object lookupKey = determineCurrentLookupKey(); // 子类实现,返回当前应使用的数据源key
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
// 抽象方法:由子类决定当前使用哪个数据源
protected abstract Object determineCurrentLookupKey();
}四、生产级代码实现
4.1 完整的多租户多数据源系统
/**
* 数据访问产品族接口(Abstract Factory)
* 定义了一套完整的数据访问层:数据源 + 事务管理 + 缓存
*/
public interface TenantInfrastructureFactory {
/**
* 创建租户专属数据源
*/
DataSource createDataSource(TenantDatabaseConfig config);
/**
* 创建事务管理器(必须与DataSource配套)
*/
PlatformTransactionManager createTransactionManager(DataSource dataSource);
/**
* 创建JDBC访问模板
*/
JdbcTemplate createJdbcTemplate(DataSource dataSource);
/**
* 创建缓存管理器(与数据层配套的缓存)
*/
CacheManager createCacheManager(TenantCacheConfig cacheConfig);
/**
* 工厂支持的数据库类型
*/
DatabaseType supportedType();
}
public enum DatabaseType {
MYSQL, ORACLE, TIDB, POSTGRESQL
}
/**
* 租户数据库配置
*/
@Data
@Builder
public class TenantDatabaseConfig {
private String tenantId;
private DatabaseType dbType;
private String host;
private int port;
private String databaseName;
private String username;
private String password;
private int maxPoolSize;
private int minIdle;
private long connectionTimeout;
private long idleTimeout;
}
/**
* MySQL 产品族工厂(Concrete Factory A)
*/
@Component
@Slf4j
public class MySQLInfrastructureFactory implements TenantInfrastructureFactory {
@Override
public DataSource createDataSource(TenantDatabaseConfig config) {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(String.format(
"jdbc:mysql://%s:%d/%s?useUnicode=true&characterEncoding=utf8&serverTimezone=Asia/Shanghai&useSSL=false",
config.getHost(), config.getPort(), config.getDatabaseName()
));
hikariConfig.setUsername(config.getUsername());
hikariConfig.setPassword(config.getPassword());
hikariConfig.setDriverClassName("com.mysql.cj.jdbc.Driver");
hikariConfig.setMaximumPoolSize(config.getMaxPoolSize());
hikariConfig.setMinimumIdle(config.getMinIdle());
hikariConfig.setConnectionTimeout(config.getConnectionTimeout());
hikariConfig.setIdleTimeout(config.getIdleTimeout());
hikariConfig.setPoolName("HikariPool-" + config.getTenantId() + "-mysql");
// MySQL特定优化
hikariConfig.addDataSourceProperty("cachePrepStmts", "true");
hikariConfig.addDataSourceProperty("prepStmtCacheSize", "250");
hikariConfig.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
log.info("Creating MySQL DataSource for tenant: {}", config.getTenantId());
return new HikariDataSource(hikariConfig);
}
@Override
public PlatformTransactionManager createTransactionManager(DataSource dataSource) {
// MySQL使用标准的DataSourceTransactionManager
DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource);
txManager.setDefaultTimeout(30); // 30秒事务超时
return txManager;
}
@Override
public JdbcTemplate createJdbcTemplate(DataSource dataSource) {
JdbcTemplate template = new JdbcTemplate(dataSource);
template.setQueryTimeout(15); // 15秒查询超时
template.setFetchSize(1000); // 批量读取大小
return template;
}
@Override
public CacheManager createCacheManager(TenantCacheConfig cacheConfig) {
// MySQL产品族配套Redis缓存
RedisConnectionFactory connectionFactory = createRedisConnectionFactory(cacheConfig);
RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofMinutes(cacheConfig.getDefaultTtlMinutes()))
.serializeKeysWith(RedisSerializationContext.SerializationPair
.fromSerializer(new StringRedisSerializer()))
.serializeValuesWith(RedisSerializationContext.SerializationPair
.fromSerializer(new GenericJackson2JsonRedisSerializer()));
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(cacheConfiguration)
.build();
}
@Override
public DatabaseType supportedType() {
return DatabaseType.MYSQL;
}
private RedisConnectionFactory createRedisConnectionFactory(TenantCacheConfig config) {
RedisStandaloneConfiguration redisConfig = new RedisStandaloneConfiguration();
redisConfig.setHostName(config.getRedisHost());
redisConfig.setPort(config.getRedisPort());
redisConfig.setPassword(RedisPassword.of(config.getRedisPassword()));
redisConfig.setDatabase(config.getRedisDatabase());
return new LettuceConnectionFactory(redisConfig);
}
}
/**
* Oracle 产品族工厂(Concrete Factory B)
*/
@Component
@Slf4j
public class OracleInfrastructureFactory implements TenantInfrastructureFactory {
@Override
public DataSource createDataSource(TenantDatabaseConfig config) {
HikariConfig hikariConfig = new HikariConfig();
hikariConfig.setJdbcUrl(String.format(
"jdbc:oracle:thin:@%s:%d:%s",
config.getHost(), config.getPort(), config.getDatabaseName()
));
hikariConfig.setUsername(config.getUsername());
hikariConfig.setPassword(config.getPassword());
hikariConfig.setDriverClassName("oracle.jdbc.OracleDriver");
hikariConfig.setMaximumPoolSize(config.getMaxPoolSize());
hikariConfig.setMinimumIdle(config.getMinIdle());
hikariConfig.setConnectionTimeout(config.getConnectionTimeout());
// Oracle特定配置
hikariConfig.setConnectionTestQuery("SELECT 1 FROM DUAL");
hikariConfig.addDataSourceProperty("oracle.net.CONNECT_TIMEOUT", "10000");
hikariConfig.setPoolName("HikariPool-" + config.getTenantId() + "-oracle");
log.info("Creating Oracle DataSource for tenant: {}", config.getTenantId());
return new HikariDataSource(hikariConfig);
}
@Override
public PlatformTransactionManager createTransactionManager(DataSource dataSource) {
// Oracle也使用DataSourceTransactionManager
DataSourceTransactionManager txManager = new DataSourceTransactionManager(dataSource);
txManager.setDefaultTimeout(60); // Oracle操作通常更慢,超时时间更长
return txManager;
}
@Override
public JdbcTemplate createJdbcTemplate(DataSource dataSource) {
JdbcTemplate template = new JdbcTemplate(dataSource);
template.setQueryTimeout(30);
template.setFetchSize(500);
return template;
}
@Override
public CacheManager createCacheManager(TenantCacheConfig cacheConfig) {
// Oracle产品族也使用Redis,但TTL策略不同
RedisConnectionFactory connectionFactory = createRedisConnectionFactory(cacheConfig);
RedisCacheConfiguration cacheConfiguration = RedisCacheConfiguration.defaultCacheConfig()
.entryTtl(Duration.ofHours(2)) // Oracle场景缓存时间更长
.disableCachingNullValues();
return RedisCacheManager.builder(connectionFactory)
.cacheDefaults(cacheConfiguration)
.build();
}
@Override
public DatabaseType supportedType() {
return DatabaseType.ORACLE;
}
private RedisConnectionFactory createRedisConnectionFactory(TenantCacheConfig config) {
// 同MySQL,略
return null;
}
}4.2 动态数据源路由与租户上下文
/**
* 租户上下文:ThreadLocal存储当前线程的租户信息
*/
public class TenantContext {
private static final ThreadLocal<String> CURRENT_TENANT = new InheritableThreadLocal<>();
public static void setTenantId(String tenantId) {
CURRENT_TENANT.set(tenantId);
}
public static String getTenantId() {
return CURRENT_TENANT.get();
}
public static void clear() {
CURRENT_TENANT.remove();
}
}
/**
* 动态多租户数据源
*/
public class TenantRoutingDataSource extends AbstractRoutingDataSource {
@Override
protected Object determineCurrentLookupKey() {
String tenantId = TenantContext.getTenantId();
if (StringUtils.isEmpty(tenantId)) {
return "default"; // 没有租户上下文时使用默认数据源
}
return tenantId;
}
}
/**
* 租户基础设施注册中心
* 负责管理所有租户的数据访问产品族
*/
@Component
@Slf4j
public class TenantInfrastructureRegistry implements InitializingBean {
@Autowired
private List<TenantInfrastructureFactory> factories;
@Autowired
private TenantConfigRepository tenantConfigRepository;
// 工厂Map:按数据库类型索引
private Map<DatabaseType, TenantInfrastructureFactory> factoryMap;
// 租户基础设施缓存
private final ConcurrentHashMap<String, TenantInfrastructure> tenantInfrastructures
= new ConcurrentHashMap<>();
@Override
public void afterPropertiesSet() {
// 构建工厂注册表
factoryMap = factories.stream()
.collect(Collectors.toMap(
TenantInfrastructureFactory::supportedType,
Function.identity()
));
log.info("Registered {} infrastructure factories: {}",
factoryMap.size(), factoryMap.keySet());
// 预加载所有活跃租户的基础设施
List<TenantConfig> activeTenants = tenantConfigRepository.findAllActive();
activeTenants.forEach(this::initTenantInfrastructure);
log.info("Initialized infrastructure for {} active tenants", activeTenants.size());
}
public TenantInfrastructure getInfrastructure(String tenantId) {
return tenantInfrastructures.computeIfAbsent(tenantId, id -> {
TenantConfig config = tenantConfigRepository.findByTenantId(id)
.orElseThrow(() -> new IllegalArgumentException("Unknown tenant: " + id));
return initTenantInfrastructure(config);
});
}
private TenantInfrastructure initTenantInfrastructure(TenantConfig config) {
DatabaseType dbType = config.getDatabaseType();
TenantInfrastructureFactory factory = factoryMap.get(dbType);
if (factory == null) {
throw new UnsupportedOperationException(
"No factory for database type: " + dbType + " (tenant: " + config.getTenantId() + ")");
}
// 使用抽象工厂创建完整的产品族
TenantDatabaseConfig dbConfig = buildDatabaseConfig(config);
TenantCacheConfig cacheConfig = buildCacheConfig(config);
DataSource dataSource = factory.createDataSource(dbConfig);
PlatformTransactionManager txManager = factory.createTransactionManager(dataSource);
JdbcTemplate jdbcTemplate = factory.createJdbcTemplate(dataSource);
CacheManager cacheManager = factory.createCacheManager(cacheConfig);
TenantInfrastructure infrastructure = TenantInfrastructure.builder()
.tenantId(config.getTenantId())
.dataSource(dataSource)
.transactionManager(txManager)
.jdbcTemplate(jdbcTemplate)
.cacheManager(cacheManager)
.dbType(dbType)
.build();
tenantInfrastructures.put(config.getTenantId(), infrastructure);
log.info("Infrastructure initialized for tenant: {} ({})", config.getTenantId(), dbType);
return infrastructure;
}
private TenantDatabaseConfig buildDatabaseConfig(TenantConfig config) {
return TenantDatabaseConfig.builder()
.tenantId(config.getTenantId())
.dbType(config.getDatabaseType())
.host(config.getDbHost())
.port(config.getDbPort())
.databaseName(config.getDbName())
.username(config.getDbUsername())
.password(config.getDbPassword())
.maxPoolSize(config.getDbMaxPoolSize())
.minIdle(5)
.connectionTimeout(30000)
.idleTimeout(600000)
.build();
}
private TenantCacheConfig buildCacheConfig(TenantConfig config) {
return TenantCacheConfig.builder()
.redisHost(config.getRedisHost())
.redisPort(config.getRedisPort())
.redisPassword(config.getRedisPassword())
.redisDatabase(config.getRedisDatabase())
.defaultTtlMinutes(config.getCacheTtlMinutes())
.build();
}
}
/**
* 租户基础设施容器(产品族)
*/
@Data
@Builder
public class TenantInfrastructure {
private String tenantId;
private DatabaseType dbType;
private DataSource dataSource;
private PlatformTransactionManager transactionManager;
private JdbcTemplate jdbcTemplate;
private CacheManager cacheManager;
}4.3 业务层使用产品族
/**
* 租户感知的订单仓储实现
* 通过TenantContext自动路由到正确的数据访问产品族
*/
@Repository
@Slf4j
public class TenantAwareOrderRepository {
@Autowired
private TenantInfrastructureRegistry registry;
public Order findById(String tenantId, Long orderId) {
TenantInfrastructure infra = registry.getInfrastructure(tenantId);
// 根据数据库类型选择不同的SQL方言
String sql = buildFindByIdSql(infra.getDbType());
return infra.getJdbcTemplate().queryForObject(sql,
new Object[]{orderId},
(rs, rowNum) -> mapOrder(rs)
);
}
public void save(String tenantId, Order order) {
TenantInfrastructure infra = registry.getInfrastructure(tenantId);
// 使用该租户专属的事务管理器
TransactionTemplate txTemplate = new TransactionTemplate(infra.getTransactionManager());
txTemplate.execute(status -> {
String sql = "INSERT INTO orders (id, amount, status, created_at) VALUES (?, ?, ?, ?)";
infra.getJdbcTemplate().update(sql,
order.getId(), order.getAmount(), order.getStatus(),
Timestamp.valueOf(order.getCreatedAt())
);
return null;
});
}
private String buildFindByIdSql(DatabaseType dbType) {
return switch (dbType) {
case MYSQL, TIDB -> "SELECT * FROM orders WHERE id = ? LIMIT 1";
case ORACLE -> "SELECT * FROM orders WHERE id = ? AND ROWNUM = 1";
case POSTGRESQL -> "SELECT * FROM orders WHERE id = ? FETCH FIRST 1 ROWS ONLY";
};
}
private Order mapOrder(ResultSet rs) throws SQLException {
return Order.builder()
.id(rs.getLong("id"))
.amount(rs.getBigDecimal("amount"))
.status(rs.getString("status"))
.createdAt(rs.getTimestamp("created_at").toLocalDateTime())
.build();
}
}
/**
* 拦截器:从请求头中提取租户ID,设置到TenantContext
*/
@Component
public class TenantInterceptor implements HandlerInterceptor {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
String tenantId = request.getHeader("X-Tenant-Id");
if (StringUtils.isEmpty(tenantId)) {
// 也可以从JWT token、域名等方式提取租户ID
tenantId = extractTenantFromJwt(request);
}
if (StringUtils.isEmpty(tenantId)) {
response.sendError(HttpStatus.BAD_REQUEST.value(), "Missing tenant identifier");
return false;
}
TenantContext.setTenantId(tenantId);
return true;
}
@Override
public void afterCompletion(HttpServletRequest request, HttpServletResponse response,
Object handler, Exception ex) throws Exception {
// 请求结束后必须清理,防止ThreadLocal泄漏
TenantContext.clear();
}
private String extractTenantFromJwt(HttpServletRequest request) {
// 从Authorization header解析JWT获取租户ID
String authorization = request.getHeader("Authorization");
if (authorization != null && authorization.startsWith("Bearer ")) {
// JWT解析逻辑...
return "tenant-from-jwt";
}
return null;
}
}五、与相关模式的对比与选型
抽象工厂 vs 工厂方法
| 维度 | 工厂方法 | 抽象工厂 |
|---|---|---|
| 产品数量 | 一种产品 | 一族多种产品 |
| 切换成本 | 只换一个工厂 | 换一个工厂,自动切换整族产品 |
| 扩展方向 | 纵向:增加新产品类型 | 横向:增加新产品族 |
| 适用场景 | 创建逻辑多态 | 多套相关对象配套使用 |
抽象工厂 vs Builder 模式
- 抽象工厂:创建多个不同类型的对象,这些对象来自同一"族"。
- Builder:创建同一个复杂对象,通过不同的步骤和配置来定制。
如果你的场景是"我需要一台电脑(主机 + 显示器 + 键盘)",这是抽象工厂。 如果你的场景是"我需要一台可以自定义 CPU、内存、硬盘的电脑",这是 Builder。
六、踩坑实录
坑一:产品族不完整导致的运行时错误
在实现第一版时,TenantInfrastructure 里的 CacheManager 我忘记初始化了,直接设置为 null。Service 层的代码调用 infra.getCacheManager().getCache("order") 直接 NPE。
更糟糕的是这个错误不是在启动时报出来的,而是在真实请求进来时才暴露,而且只影响特定租户。
教训:抽象工厂模式的产品族必须是完整的,任何成员缺失都应该在工厂创建阶段立即失败,而不是等到使用时。在 TenantInfrastructureRegistry.initTenantInfrastructure() 中加入完整性校验:
private void validateInfrastructure(TenantInfrastructure infra) {
Objects.requireNonNull(infra.getDataSource(), "DataSource is required");
Objects.requireNonNull(infra.getTransactionManager(), "TransactionManager is required");
Objects.requireNonNull(infra.getJdbcTemplate(), "JdbcTemplate is required");
Objects.requireNonNull(infra.getCacheManager(), "CacheManager is required");
// 尝试连接,验证数据源可用性
try (Connection conn = infra.getDataSource().getConnection()) {
log.debug("DataSource connectivity verified for tenant: {}", infra.getTenantId());
} catch (SQLException e) {
throw new IllegalStateException("DataSource connectivity check failed for tenant: "
+ infra.getTenantId(), e);
}
}坑二:ThreadLocal 在异步场景下的租户上下文丢失
线程池里的任务线程不继承父线程的 ThreadLocal,导致异步处理时 TenantContext.getTenantId() 返回 null。当时花了半天才找到根因。
解决方案:使用 InheritableThreadLocal(已经用了),但它只在线程创建时继承,线程池里的线程是复用的,无法自动继承。正确做法是在提交任务时显式传递租户 ID:
String tenantId = TenantContext.getTenantId();
CompletableFuture.runAsync(() -> {
TenantContext.setTenantId(tenantId); // 显式设置
try {
asyncBusinessLogic();
} finally {
TenantContext.clear(); // 使用后清理
}
}, tenantAwareExecutor);或者使用 TransmittableThreadLocal(阿里巴巴开源的 TTL 库),它专门解决线程池场景下的 ThreadLocal 传递问题。
坑三:产品族切换时旧连接池的资源释放
动态添加新租户没问题,但当某个租户下线(数据迁移完毕,不再使用)时,我们没有及时关闭旧的 HikariDataSource,导致这个租户的连接池一直占用着数据库连接,白白浪费资源。
正确做法是在 TenantInfrastructureRegistry 中实现 DisposableBean 或 @PreDestroy,在 Spring 容器关闭时或租户下线时,正确关闭 HikariDataSource:
public void removeTenant(String tenantId) {
TenantInfrastructure infra = tenantInfrastructures.remove(tenantId);
if (infra != null && infra.getDataSource() instanceof HikariDataSource hikari) {
hikari.close(); // 关闭连接池,释放数据库连接
log.info("Infrastructure cleaned up for tenant: {}", tenantId);
}
}七、总结
抽象工厂模式是处理"成套创建"需求的最优解。在 Spring 多数据源、多租户场景中,它的核心价值是:
- 一致性保证:同一租户/环境的所有基础设施对象来自同一工厂,确保它们相互兼容。
- 切换成本最低:切换工厂实现即可切换整族产品,调用方代码无需修改。
- 扩展点明确:新增数据库类型,只需实现一个新的工厂类,不修改已有代码。
抽象工厂的缺点是"难以支持新的产品种类"——如果需要在产品族中增加一个新成员(比如给 TenantInfrastructureFactory 增加 createMessageQueueClient() 方法),就需要修改所有工厂实现,改动面很大。这是抽象工厂模式本身无法克服的代价,需要在设计初期就想清楚产品族的边界。
