第1927篇:多数据中心AI服务的数据同步——向量数据的跨区域复制策略
第1927篇:多数据中心AI服务的数据同步——向量数据的跨区域复制策略
多区域部署这个话题,在我开始做AI项目之前,我以为很简单——不就是把数据复制一份到另一个地方嘛。
等真正做了一个跨国法律知识库项目,我才意识到自己太天真了。
那个项目要在中国大陆、香港、新加坡各部署一套AI服务,服务当地的用户。三个区域有共同的知识库(企业通用法规),也有各自独立的知识库(当地法律文档)。用户在大陆搜索,请求打到大陆的AI服务;在新加坡搜索,打到新加坡的AI服务。
问题来了:
- 知识库里的文档是向量形式存储的(用于语义检索),不是普通文本
- 大陆的运营人员更新了一条法规,这个更新需要同步到香港和新加坡
- 但三个区域的网络质量不一样,有时候跨区同步会延迟
- 万一大陆同步失败,香港的用户还在用旧数据,会产生法律风险
这是一个典型的多数据中心AI数据同步问题。今天把我们的解决方案整理出来,希望对做类似架构的同学有帮助。
向量数据同步的特殊挑战
普通关系型数据的跨区同步,方案已经很成熟了:MySQL的binlog复制、Redis的主从同步。但向量数据不一样,有几个特殊挑战:
挑战一:数据量大。一个包含100万条文档的向量知识库,每条向量1536维(float32),光向量数据就有:100万 × 1536 × 4字节 = 6GB。每次全量同步代价很高。
挑战二:向量不能"部分更新"。普通文本更新可以只同步变更的字段,但向量是一个整体——如果文档内容变了,需要重新用Embedding模型计算,然后把整个向量替换掉。
挑战三:不同区域的Embedding模型可能不同。大陆用A模型,香港用B模型(比如不同的多语言版本),那么同一条文档在两个区域的向量表示是不同的!直接同步向量数据无法跨模型使用。
挑战四:索引需要重建。向量数据库(Milvus、Qdrant等)有内部索引(HNSW、IVF等),直接同步数据后可能需要重建索引,耗时很长。
架构设计:同步原文,不同步向量
针对上面的挑战,我们采用的核心策略是:同步原始文档,各区域独立向量化。
这个方案的优点:
- 同步的是原始文档,数据量远小于向量(文本 vs 浮点数组)
- 各区域用自己的Embedding模型,不存在模型兼容性问题
- 即使同步延迟,原始文档数据是完整的,随时可以重新向量化
变更捕获:用CDC追踪文档变更
源区域的文档更新,需要通过变更数据捕获(CDC)转换为同步消息:
@Component
public class DocumentChangeCapture {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
private static final String SYNC_TOPIC = "ai.docs.sync";
/**
* 捕获文档插入事件
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onDocumentCreated(DocumentCreatedEvent event) {
publishSyncMessage(SyncOperationType.CREATE, event.getDocument());
}
/**
* 捕获文档更新事件
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onDocumentUpdated(DocumentUpdatedEvent event) {
publishSyncMessage(SyncOperationType.UPDATE, event.getDocument());
}
/**
* 捕获文档删除事件
*/
@TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
public void onDocumentDeleted(DocumentDeletedEvent event) {
publishSyncMessage(SyncOperationType.DELETE, event.getDocumentId());
}
private void publishSyncMessage(SyncOperationType opType, Object data) {
DocumentSyncMessage message = DocumentSyncMessage.builder()
.messageId(UUID.randomUUID().toString())
.operationType(opType)
.sourceRegion("cn-beijing")
.publishedAt(System.currentTimeMillis())
.schemaVersion("1.0")
.data(serializeData(opType, data))
.build();
// 用文档ID作为消息Key,保证同一文档的消息有序
String partitionKey = extractDocumentId(opType, data);
kafkaTemplate.send(SYNC_TOPIC, partitionKey, serialize(message))
.whenComplete((result, ex) -> {
if (ex != null) {
log.error("同步消息发布失败: opType={}, docId={}",
opType, partitionKey, ex);
// 这里可以把失败的消息写入本地补偿表
saveToCompensationTable(message);
}
});
}
}这里有个重要细节:用@TransactionalEventListener(phase = AFTER_COMMIT)而不是@EventListener。这确保只有事务提交成功后才发布同步消息,避免事务回滚后同步消息已经发出的问题。
同步消息格式设计
@Data
@Builder
public class DocumentSyncMessage {
private String messageId;
private SyncOperationType operationType; // CREATE, UPDATE, DELETE, FULL_SYNC
private String sourceRegion; // 来源区域
private long publishedAt;
private String schemaVersion; // 消息格式版本,用于兼容性控制
// 操作数据
private SyncData data;
@Data
public static class SyncData {
private String documentId;
private String collectionName; // 属于哪个知识库集合
// CREATE/UPDATE才有的字段
private String title;
private String content; // 原始文本内容
private String contentType; // "markdown", "plain", "html"
private Map<String, Object> metadata; // 文档元数据(分类、标签等)
// 用于向量化的提示(有时候我们需要对内容做预处理再向量化)
private String embeddingHint;
// 版本控制
private Long version; // 文档版本号(用于解决乱序问题)
private String checksum; // 内容的MD5(用于验证同步完整性)
}
}目标区域的同步消费者
@Component
public class DocumentSyncConsumer {
@Autowired
private DocumentRepository documentRepo;
@Autowired
private EmbeddingService embeddingService;
@Autowired
private VectorStoreClient vectorStore;
@Autowired
private SyncStateTracker syncTracker;
@KafkaListener(
topics = "ai.docs.sync",
groupId = "doc-sync-sg", // 新加坡区域的消费者组
containerFactory = "syncContainerFactory"
)
@Transactional
public void handleSyncMessage(ConsumerRecord<String, String> record,
Acknowledgment ack) {
DocumentSyncMessage message = deserialize(record.value());
String docId = message.getData().getDocumentId();
try {
// 版本检查:防止乱序导致旧版本覆盖新版本
if (!shouldProcess(docId, message.getData().getVersion())) {
log.info("跳过过时的同步消息: docId={}, version={}",
docId, message.getData().getVersion());
ack.acknowledge();
return;
}
switch (message.getOperationType()) {
case CREATE, UPDATE -> handleUpsert(message);
case DELETE -> handleDelete(message);
case FULL_SYNC -> handleFullSync(message);
}
// 记录同步状态
syncTracker.recordSyncSuccess(docId, message.getPublishedAt());
ack.acknowledge();
} catch (EmbeddingException e) {
// 向量化失败:加入重试队列
log.error("向量化失败,加入重试: docId={}", docId, e);
syncTracker.recordSyncFailure(docId, "EMBEDDING_FAILED", e.getMessage());
// 先提交offset,用补偿机制重试,而不是阻塞Kafka消费
ack.acknowledge();
scheduleRetry(message);
} catch (Exception e) {
log.error("同步失败: docId={}", docId, e);
syncTracker.recordSyncFailure(docId, "UNKNOWN", e.getMessage());
// 不提交offset,让消息重新被消费
throw new RuntimeException("同步处理失败", e);
}
}
private void handleUpsert(DocumentSyncMessage message) {
SyncData data = message.getData();
// 1. 更新文档数据库
Document doc = Document.builder()
.id(data.getDocumentId())
.collectionName(data.getCollectionName())
.title(data.getTitle())
.content(data.getContent())
.metadata(data.getMetadata())
.version(data.getVersion())
.syncedAt(Instant.now())
.build();
documentRepo.save(doc);
// 2. 计算向量(用本地的Embedding模型)
String textForEmbedding = buildEmbeddingText(data);
float[] vector = embeddingService.embed(textForEmbedding);
// 3. 更新向量数据库
VectorDocument vDoc = VectorDocument.builder()
.id(data.getDocumentId())
.vector(vector)
.payload(Map.of(
"title", data.getTitle(),
"collection", data.getCollectionName(),
"version", data.getVersion(),
"content_preview", data.getContent().substring(0,
Math.min(200, data.getContent().length()))
))
.build();
vectorStore.upsert(data.getCollectionName(), vDoc);
log.info("文档同步完成: docId={}, collection={}",
data.getDocumentId(), data.getCollectionName());
}
private void handleDelete(DocumentSyncMessage message) {
String docId = message.getData().getDocumentId();
String collection = message.getData().getCollectionName();
documentRepo.deleteById(docId);
vectorStore.delete(collection, docId);
log.info("文档删除同步完成: docId={}", docId);
}
private boolean shouldProcess(String docId, Long version) {
Long currentVersion = documentRepo.getVersion(docId);
if (currentVersion == null) return true; // 不存在,直接处理
return version > currentVersion; // 只处理版本号更大的更新
}
private String buildEmbeddingText(SyncData data) {
// 向量化时用标题+内容,比只用内容检索质量更好
if (data.getEmbeddingHint() != null) {
return data.getEmbeddingHint();
}
return data.getTitle() + "\n\n" + data.getContent();
}
}同步延迟监控
跨区同步一定有延迟,关键是要知道延迟有多大、是否超过了业务可接受的范围:
@Component
public class SyncLagMonitor {
@Autowired
private SyncStateRepository syncStateRepo;
@Autowired
private AlertService alertService;
// 各区域的同步延迟告警阈值(分钟)
private static final Map<String, Integer> LAG_THRESHOLDS = Map.of(
"sg", 5, // 新加坡,允许5分钟延迟
"hk", 3, // 香港,允许3分钟延迟
"us-west", 15 // 美国西部,跨太平洋,允许15分钟延迟
);
@Scheduled(fixedDelay = 60_000)
public void checkSyncLag() {
List<SyncState> lagStats = syncStateRepo.getRecentSyncStats();
for (SyncState stat : lagStats) {
long lagMinutes = Duration.between(
stat.getSourcePublishedAt(), Instant.now()
).toMinutes();
int threshold = LAG_THRESHOLDS.getOrDefault(stat.getTargetRegion(), 10);
if (lagMinutes > threshold) {
log.warn("[同步延迟告警] 区域:{}, 延迟:{}分钟, 阈值:{}分钟, 最慢文档:{}",
stat.getTargetRegion(), lagMinutes, threshold,
stat.getSlowestDocumentId());
alertService.sendAlert(AlertLevel.WARNING,
String.format("区域%s同步延迟%d分钟", stat.getTargetRegion(), lagMinutes));
}
}
}
// 暴露Prometheus指标
@Bean
public Gauge syncLagGauge(MeterRegistry registry,
SyncStateRepository syncStateRepo) {
return Gauge.builder("ai.sync.lag.minutes", syncStateRepo,
repo -> repo.getMaxLagMinutes())
.description("当前最大同步延迟(分钟)")
.register(registry);
}
}全量同步:基线建立与数据一致性校验
新区域上线时,或者怀疑同步出了问题时,需要做全量同步:
@Service
public class FullSyncService {
@Autowired
private DocumentRepository sourceRepo;
@Autowired
private DocumentSyncConsumer syncConsumer;
@Autowired
private ChecksumVerifier checksumVerifier;
private static final int BATCH_SIZE = 100;
/**
* 对指定集合做全量同步
* 按批次处理,避免内存溢出
*/
public FullSyncReport fullSync(String collectionName, String targetRegion) {
log.info("开始全量同步: collection={}, targetRegion={}", collectionName, targetRegion);
long totalDocs = sourceRepo.countByCollection(collectionName);
long processedDocs = 0;
long failedDocs = 0;
List<String> failedDocIds = new ArrayList<>();
// 分批处理
long offset = 0;
while (offset < totalDocs) {
List<Document> batch = sourceRepo.findByCollection(
collectionName, offset, BATCH_SIZE
);
for (Document doc : batch) {
try {
DocumentSyncMessage syncMessage = buildFullSyncMessage(doc, targetRegion);
syncConsumer.handleSyncMessage(
buildRecord(syncMessage), new NoopAcknowledgment()
);
processedDocs++;
} catch (Exception e) {
failedDocs++;
failedDocIds.add(doc.getId());
log.error("全量同步文档失败: docId={}", doc.getId(), e);
}
}
offset += batch.size();
log.info("全量同步进度: {}/{}", offset, totalDocs);
}
// 同步完成后做一致性校验
ConsistencyCheckResult checkResult = checksumVerifier.verify(
collectionName, targetRegion
);
return new FullSyncReport(
totalDocs, processedDocs, failedDocs,
failedDocIds, checkResult
);
}
}一致性校验:布隆过滤器校验
全量同步后,如何快速验证两个区域的数据是否一致?用布隆过滤器做近似校验:
@Component
public class ChecksumVerifier {
/**
* 用布隆过滤器近似校验两个区域的数据一致性
* 准确率约99%,适合快速验证
*/
public ConsistencyCheckResult verify(String collectionName,
String targetRegion) {
// 获取源区域所有文档的ID+版本号
List<String> sourceFingerprints = sourceRepo
.findByCollection(collectionName)
.stream()
.map(doc -> doc.getId() + ":" + doc.getVersion())
.collect(Collectors.toList());
// 获取目标区域所有文档的ID+版本号(通过跨区API或数据库)
List<String> targetFingerprints = targetRegionClient
.getDocumentFingerprints(collectionName, targetRegion);
// 比对
Set<String> sourceSet = new HashSet<>(sourceFingerprints);
Set<String> targetSet = new HashSet<>(targetFingerprints);
// 找出不一致的文档
Set<String> onlyInSource = new HashSet<>(sourceSet);
onlyInSource.removeAll(targetSet); // 源有,目标没有(需要同步)
Set<String> onlyInTarget = new HashSet<>(targetSet);
onlyInTarget.removeAll(sourceSet); // 目标有,源没有(可能是多余的)
return ConsistencyCheckResult.builder()
.totalSource(sourceSet.size())
.totalTarget(targetSet.size())
.missingInTarget(onlyInSource.size())
.extraInTarget(onlyInTarget.size())
.consistencyRate(calculateConsistencyRate(sourceSet, targetSet))
.inconsistentDocIds(onlyInSource) // 需要补同步的文档ID
.build();
}
private double calculateConsistencyRate(Set<String> source, Set<String> target) {
long commonCount = source.stream().filter(target::contains).count();
return (double) commonCount / source.size() * 100;
}
}冲突解决:多主写入的数据冲突
如果两个区域都支持写入(双活架构),就会有数据冲突的问题。比如大陆和香港同时更新了同一个文档的不同字段:
@Component
public class ConflictResolver {
/**
* 解决同一文档在不同区域的并发更新冲突
* 策略:Last-Write-Wins(最后写入的版本获胜)
*/
public Document resolveConflict(Document localDoc, DocumentSyncMessage remoteMessage) {
SyncData remote = remoteMessage.getData();
// 比较版本号
if (remote.getVersion() > localDoc.getVersion()) {
// 远端版本更新,用远端数据
return Document.fromSyncData(remote);
} else if (remote.getVersion() < localDoc.getVersion()) {
// 本地版本更新,保留本地数据
return localDoc;
} else {
// 版本号相同但内容不同,这是真正的冲突
// 策略:保留本地版本,将远端版本存为草稿供人工审核
saveConflictForReview(localDoc, remote);
log.warn("发现版本冲突,已保存待审: docId={}, localRegion={}, remoteRegion={}",
localDoc.getId(), "sg", remoteMessage.getSourceRegion());
return localDoc;
}
}
private void saveConflictForReview(Document local, SyncData remote) {
ConflictRecord conflict = ConflictRecord.builder()
.documentId(local.getId())
.localVersion(local.getVersion())
.localContent(local.getContent())
.remoteVersion(remote.getVersion())
.remoteContent(remote.getContent())
.remoteRegion(remote.getDocumentId()) // 这里取源区域
.detectedAt(Instant.now())
.status(ConflictStatus.PENDING_REVIEW)
.build();
conflictRepo.save(conflict);
}
}网络分区下的降级策略
跨区网络断了,目标区域的AI服务需要继续工作,但可能用的是稍旧的数据:
@Component
public class SyncAwareSearchService {
@Autowired
private VectorStoreClient vectorStore;
@Autowired
private SyncLagMonitor lagMonitor;
public SearchResult search(String query, String collectionName) {
SearchResult result = vectorStore.search(query, collectionName);
// 在结果中附加数据新鲜度信息
long lagMinutes = lagMonitor.getCurrentLag();
if (lagMinutes > 5) {
result.setDataFreshness(DataFreshness.STALE);
result.setDataNote(String.format(
"注:当前数据与主库存在%d分钟延迟,如需最新数据请重试",
lagMinutes
));
} else {
result.setDataFreshness(DataFreshness.FRESH);
}
return result;
}
}让调用方知道当前数据的新鲜度,是诚实且负责任的做法。特别对于法律、医疗这类对数据时效性敏感的场景,必须明确告知。
总结:向量数据跨区同步的核心原则
整理一下几个核心原则:
原则一:同步原文,不同步向量。不同区域可能用不同的Embedding模型,向量无法跨模型复用。让各区域自己向量化,保证灵活性。
原则二:增量同步而非全量同步。用CDC捕获变更,不要每次全量同步,除非做一致性校验。
原则三:异步化同步,本地先服务。同步消息走Kafka,目标区域用自己的数据提供服务,不要等同步完成再服务。
原则四:版本号解决乱序。消息乱序是必然的,用版本号让消费者自己判断是否需要处理。
原则五:数据新鲜度要透明。让调用方知道当前用的是不是最新数据,特别是对数据时效性敏感的业务。
