分库分表ShardingSphere:分片键选择、跨库查询、数据迁移方案
分库分表ShardingSphere:分片键选择、跨库查询、数据迁移方案
适读人群:高级Java工程师、架构师 | 阅读时长:约20分钟 | 技术栈:Spring Boot 3.x、ShardingSphere 5.x、MySQL 8.0
开篇故事
2019年,我们的订单表数据量突破了 5 亿条,单表查询已经慢到用户无法接受——哪怕是按主键查询,走的是主键索引,也要 200ms 以上。MySQL 的 B+ 树在数据量这个量级下,树高已经达到了 5 层,每次查询需要 5 次磁盘 IO。
我们开始讨论分库分表方案。技术负责人点名要用 ShardingSphere(当时叫 Sharding-JDBC),理由是对业务代码侵入最小,配置完直接用,应用层无感知。
结果第一个踩的坑,就是分片键选择错误:我们按 user_id % 16 分了 16 个表,但有一个查询场景是"查询某个商家下的所有订单"(按 seller_id 查),这个查询没有 user_id,ShardingSphere 不知道去哪个分片查,就去了所有 16 个分片查,然后合并结果——这就是"全路由"问题,16 个分片的查询性能叠加在一起,比分库分表之前还慢。
这次经历让我深入研究了分片键的选择逻辑,以及跨库查询的各种处理策略。
一、核心问题分析
分库分表的本质
分库分表是把一张大表的数据,按某种规则拆分到多个数据库/表中,每个分片存储部分数据,通过中间件(ShardingSphere)对应用层透明地路由请求。
核心问题:分片键的选择决定了 90% 的查询是否能精确路由。 选错了分片键,要么高频查询走全路由(性能比不分还差),要么数据分布不均匀(热点分片)。
分库 vs 分表 vs 分库分表
只分表:数据在同一个数据库实例里,用多张表存储,解决了单表数据量大的问题,但没解决单库的连接数和 IO 瓶颈。
只分库:数据分散在多个数据库实例里,每个库只有一张表,解决了连接数和 IO 瓶颈,但每个库的单表数据量可能还是很大。
分库分表:同时做,彻底解决数据量和数据库压力问题,复杂度最高。
二、ShardingSphere 核心配置
分片键选择原则
好的分片键需满足:
- 高频查询条件:大多数查询都带这个字段,实现精确路由
- 数据分布均匀:这个字段的值哈希后分布均匀,不产生热点
- 不可变性:分片后数据不能在分片间移动,因此分片键的值不能更改
- 不泄露信息:避免使用自增 ID(暴露数据量),推荐用雪花 ID
订单表的分片键经典选择:user_id(按用户维度查询的场景)或 order_id(按订单查询的场景),取决于你的高频查询模式。
Spring Boot 配置
spring:
datasource:
driver-class-name: org.apache.shardingsphere.driver.ShardingSphereDriver
url: jdbc:shardingsphere:classpath:sharding-config.yaml# sharding-config.yaml
databaseName: order_sharding
dataSources:
ds0:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://db0:3306/order_db?useSSL=false&characterEncoding=UTF-8
username: root
password: "password"
maximumPoolSize: 50
ds1:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://db1:3306/order_db?useSSL=false&characterEncoding=UTF-8
username: root
password: "password"
maximumPoolSize: 50
rules:
- !SHARDING
tables:
t_order:
actualDataNodes: ds${0..1}.t_order_${0..7} # 2库 x 8表 = 16个分片
databaseStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: db_mod_algo
tableStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: table_mod_algo
keyGenerateStrategy:
column: order_id
keyGeneratorName: snowflake
t_order_item:
actualDataNodes: ds${0..1}.t_order_item_${0..7}
databaseStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: db_mod_algo
tableStrategy:
standard:
shardingColumn: user_id
shardingAlgorithmName: table_mod_algo
# 绑定表(t_order 和 t_order_item 分片键相同,绑定后 JOIN 不需要跨库)
bindingTables:
- t_order, t_order_item
# 广播表(全局表,所有分片都有完整数据,如省份表、类目表)
broadcastTables:
- t_category
- t_province
shardingAlgorithms:
db_mod_algo:
type: MOD
props:
sharding-count: 2 # 2个数据库
table_mod_algo:
type: MOD
props:
sharding-count: 8 # 每库8个表
keyGenerators:
snowflake:
type: SNOWFLAKE
props:
worker-id: ${WORKER_ID:0}复杂分片算法(非均匀分片)
/**
* 自定义分片算法:按时间范围分片(适合历史订单归档)
*/
public class DateRangeShardingAlgorithm implements StandardShardingAlgorithm<LocalDate> {
@Override
public String doSharding(Collection<String> availableTargetNames,
PreciseShardingValue<LocalDate> shardingValue) {
LocalDate date = shardingValue.getValue();
int year = date.getYear();
int month = date.getMonthValue();
// 按年月路由到不同表:t_order_202301, t_order_202302...
String tableSuffix = String.format("%d%02d", year, month);
return "t_order_" + tableSuffix;
}
@Override
public Collection<String> doSharding(Collection<String> availableTargetNames,
RangeShardingValue<LocalDate> shardingValue) {
// 范围查询:返回日期范围内的所有分片
Range<LocalDate> range = shardingValue.getValueRange();
return availableTargetNames.stream()
.filter(name -> {
String suffix = name.substring(name.lastIndexOf("_") + 1);
LocalDate tableDate = LocalDate.parse(suffix + "01",
DateTimeFormatter.ofPattern("yyyyMMdd"));
return !range.hasUpperBound() || !tableDate.isAfter(range.upperEndpoint())
&& (!range.hasLowerBound() || !tableDate.isBefore(range.lowerEndpoint()));
})
.collect(Collectors.toList());
}
}三、跨库查询的处理策略
策略一:业务层补全分片键
最彻底的解决方案,在执行无分片键查询之前,先查询获取分片键:
@Service
@Slf4j
public class OrderQueryService {
@Autowired
private OrderMapper orderMapper;
/**
* 按商家查询订单(原始分片键是 user_id,seller_id 不能直接用于分片路由)
*/
public Page<Order> queryOrdersBySellerNoFullRoute(Long sellerId, int page, int size) {
// 先查 ES 获取对应的 user_id 列表(ES 按 seller_id 建了索引)
List<Long> userIds = orderEsRepository.findUserIdsBySellerId(sellerId, page, size);
if (userIds.isEmpty()) {
return Page.empty();
}
// 再按 user_id 查数据库(精确路由)
return orderMapper.selectByUserIds(userIds);
}
}策略二:ES 作为查询层
将订单数据同步到 ES,非分片键维度的查询走 ES,ES 返回订单 ID 后再按主键查库:
@Service
public class SellerOrderService {
@Autowired
private ElasticsearchClient esClient;
@Autowired
private OrderMapper orderMapper;
/**
* 查询商家订单:ES 获取 ID 列表 -> 精确路由查数据库
*/
public List<Order> getSellerOrders(Long sellerId, int page, int size) throws IOException {
// ES 查询订单 ID(ES 按 seller_id 索引,查询极快)
SearchResponse<OrderDoc> esResponse = esClient.search(s -> s
.index("orders")
.from((page - 1) * size)
.size(size)
.query(q -> q.term(t -> t.field("seller_id").value(sellerId)))
.sort(so -> so.field(f -> f.field("create_time").order(SortOrder.Desc))),
OrderDoc.class
);
List<Long> orderIds = esResponse.hits().hits().stream()
.map(hit -> Long.parseLong(hit.id()))
.collect(Collectors.toList());
if (orderIds.isEmpty()) {
return Collections.emptyList();
}
// 按 order_id 精确路由查库(order_id 包含 user_id 信息,可以计算出分片)
return orderMapper.selectByIds(orderIds);
}
}策略三:异构索引(冗余表)
对于超高频的非分片键查询,创建一张以该字段为分片键的冗余表,通过异步同步保持数据一致:
-- 按 seller_id 分片的订单索引表(只存ID,不存完整订单数据)
CREATE TABLE t_order_seller_index_0 (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
seller_id BIGINT NOT NULL,
order_id BIGINT NOT NULL,
user_id BIGINT NOT NULL,
create_time DATETIME NOT NULL,
KEY idx_seller_create (seller_id, create_time)
) ENGINE=InnoDB;四、数据迁移方案
生产中分库分表的数据迁移是最难的部分,需要保证在迁移过程中业务不中断。经典的"双写迁移"方案:
@Service
@Slf4j
public class OrderMigrationService {
@Autowired
private OldOrderMapper oldOrderMapper;
@Autowired
private NewOrderMapper newOrderMapper; // 走 ShardingSphere 路由
// 迁移开关(从 Nacos 配置中心动态控制)
@Value("${migration.enabled:false}")
private boolean migrationEnabled;
@Value("${migration.read-from-new:false}")
private boolean readFromNew;
/**
* 双写:同时写新旧表
*/
@Transactional
public void createOrder(Order order) {
// 写旧表
oldOrderMapper.insert(order);
// 双写到新分片表
if (migrationEnabled) {
try {
newOrderMapper.insert(order);
} catch (Exception e) {
// 新表写失败不影响旧表(只记录日志,后续补偿)
log.error("双写新表失败,order_id={}", order.getId(), e);
recordFailedWrite(order.getId());
}
}
}
/**
* 批量历史数据迁移(每次处理1000条)
*/
@Scheduled(fixedDelay = 5000)
public void migrateHistoryData() {
if (!migrationEnabled) {
return;
}
Long lastMigratedId = getMigrationProgress();
List<Order> orders = oldOrderMapper.selectForMigration(lastMigratedId, 1000);
if (orders.isEmpty()) {
log.info("历史数据迁移完成");
return;
}
int successCount = 0;
for (Order order : orders) {
try {
newOrderMapper.insertIgnore(order); // INSERT IGNORE,避免双写重复
successCount++;
} catch (Exception e) {
log.error("迁移失败,order_id={}", order.getId(), e);
}
}
Long maxId = orders.stream().mapToLong(Order::getId).max().orElse(lastMigratedId);
saveMigrationProgress(maxId);
log.info("本批迁移完成,成功={}/{}, maxId={}", successCount, orders.size(), maxId);
}
/**
* 数据一致性校验(迁移后的验证)
*/
public ConsistencyCheckResult checkConsistency(Long startId, Long endId) {
List<Order> oldOrders = oldOrderMapper.selectRange(startId, endId);
List<Order> newOrders = newOrderMapper.selectByIds(
oldOrders.stream().map(Order::getId).collect(Collectors.toList()));
Map<Long, Order> newOrderMap = newOrders.stream()
.collect(Collectors.toMap(Order::getId, o -> o));
List<Long> missingIds = new ArrayList<>();
List<Long> inconsistentIds = new ArrayList<>();
for (Order old : oldOrders) {
Order newOrder = newOrderMap.get(old.getId());
if (newOrder == null) {
missingIds.add(old.getId());
} else if (!old.equals(newOrder)) {
inconsistentIds.add(old.getId());
}
}
return ConsistencyCheckResult.builder()
.total(oldOrders.size())
.missingCount(missingIds.size())
.inconsistentCount(inconsistentIds.size())
.missingIds(missingIds)
.inconsistentIds(inconsistentIds)
.build();
}
private Long getMigrationProgress() {
// 从 Redis 或 DB 读取迁移进度
return 0L;
}
private void saveMigrationProgress(Long id) {
// 保存迁移进度
}
private void recordFailedWrite(Long orderId) {
// 记录双写失败,供后续补偿
}
}五、踩坑实录
坑一:分片键选错,全路由拖垮性能(开篇故事复盘)
按 user_id 分片后,按 seller_id 查询会触发全路由,查所有 16 个分片。改造方案:引入 ES 作为查询索引,seller_id 相关的查询走 ES 获取 order_id 列表,再按分片键精确路由查库。
坑二:分片数量后期难以扩容
最初我们分了 16 个表(每库 8 个),后来数据量增长超预期,想从 16 个表扩到 32 个表。这时发现扩容非常麻烦:所有数据的路由规则都变了(从 %8 变成 %16),需要全量数据迁移,而且迁移过程中还在不断写入新数据。
教训:分片数量一开始就要预留足够的空间,推荐直接分 64 或 128 张表(即使现在数据少,空表的开销可以忽略),以后扩容只需要增加物理节点,不需要改变分片规则。
坑三:跨分片的 JOIN 不支持
ShardingSphere 不支持跨分片的 JOIN 操作(两张表的分片键不同时的 JOIN)。我们有一个报表查询需要 t_order JOIN t_user,t_order 按 user_id 分片,t_user 是单表,这个 JOIN 需要特殊处理。
解决方案:t_user 设置为广播表(所有分片都有完整数据),这样每个分片内的 JOIN 都能正常执行,ShardingSphere 合并结果。
六、总结
分库分表的工程决策:
一、分片键是最关键的决策,要分析高频查询场景,选择覆盖率最高的字段。
二、分片数量要预留,一开始就分足够多的逻辑分片(如 64 张),以后加物理节点。
三、跨分片查询用 ES 补全,ES 作为查询索引,精确路由查库,是处理非分片键查询的最佳实践。
四、数据迁移用双写方案,迁移过程中业务不停,双写 + 数据追赶 + 一致性校验 + 读切换,这五步是标准流程。
五、不是所有系统都需要分库分表,MySQL 单表 1 亿以内都是可以优化的,达到瓶颈之前可以先考虑读写分离、归档历史数据等方案。
