第2211篇:多模态数据的存储与管理——图文混合数据的工程架构
2026/4/30大约 6 分钟
第2211篇:多模态数据的存储与管理——图文混合数据的工程架构
适读人群:需要设计多模态数据存储方案的架构师或高级工程师 | 阅读时长:约15分钟 | 核心价值:图文混合数据的存储架构设计,含对象存储、向量数据库、元数据库的协同方案
在多模态系统里,数据存储是容易被低估的问题。
初期往往随便存:图片存本地磁盘,向量存内存里,元数据存关系型数据库,没有统一设计。结果系统稍微大一点,问题就来了:图片找不到了(磁盘路径变了)、向量重新计算很慢(没有持久化)、元数据和图片数据不同步(两个系统分别维护)。
多模态数据存储的挑战在于:它同时涉及三种完全不同的数据类型:
- 非结构化大文件(图片/视频/音频):适合对象存储
- 高维向量(Embedding):适合向量数据库
- 结构化元数据(创建时间、标签、关系):适合关系型数据库
三者需要协同工作,同时保证一致性。
一、多模态数据的存储架构
/**
* 多模态内容的统一数据模型
*/
@Entity
@Table(name = "multimodal_content")
@Data
@Builder
public class MultimodalContent {
@Id
private String contentId; // UUID,全局唯一ID
private String contentType; // image/video/audio/document
private String mimeType; // image/jpeg, video/mp4等
// 对象存储引用
private String storageKey; // OSS中的key
private String storageUrl; // CDN访问URL
private Long fileSize; // 字节数
// 向量索引引用
private Long vectorId; // Milvus中的向量ID
private String vectorCollection; // 使用的向量集合名称
// 内容元数据
private String title;
private String description; // AI生成的描述
private String tags; // JSON格式的标签列表
// 图片专属元数据
private Integer imageWidth;
private Integer imageHeight;
// 处理状态
private String processingStatus; // PENDING/PROCESSING/COMPLETED/FAILED
private LocalDateTime createdAt;
private LocalDateTime updatedAt;
// 关联关系
private String parentId; // 父内容ID(如:视频的关键帧)
private String businessId; // 业务关联ID(如:商品ID、文章ID)
private String businessType; // 关联业务类型
}二、对象存储的工程封装
@Service
public class ObjectStorageService {
private final OSSClient ossClient;
@Value("${storage.oss.bucket-name}")
private String bucketName;
@Value("${storage.oss.cdn-domain}")
private String cdnDomain;
/**
* 上传内容到对象存储
* 自动生成按日期分组的key
*/
public StorageResult upload(byte[] contentBytes, String mimeType, String contentId) {
// 按日期分组存储(避免单目录文件数过多)
String datePrefix = LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy/MM/dd"));
String extension = getExtension(mimeType);
String storageKey = String.format("multimodal/%s/%s.%s", datePrefix, contentId, extension);
ObjectMetadata metadata = new ObjectMetadata();
metadata.setContentType(mimeType);
metadata.setContentLength(contentBytes.length);
try {
ossClient.putObject(new PutObjectRequest(bucketName, storageKey,
new ByteArrayInputStream(contentBytes), metadata));
String accessUrl = cdnDomain + "/" + storageKey;
return new StorageResult(storageKey, accessUrl, contentBytes.length);
} catch (OSSException e) {
throw new StorageException("OSS上传失败: " + e.getMessage(), e);
}
}
/**
* 下载内容
*/
public byte[] download(String storageKey) {
try (OSSObject obj = ossClient.getObject(bucketName, storageKey)) {
return obj.getObjectContent().readAllBytes();
} catch (Exception e) {
throw new StorageException("OSS下载失败: " + storageKey, e);
}
}
/**
* 生成临时访问URL(适合私有Bucket)
*/
public String generatePresignedUrl(String storageKey, Duration expires) {
Date expiration = Date.from(Instant.now().plus(expires));
URL url = ossClient.generatePresignedUrl(bucketName, storageKey, expiration);
return url.toString();
}
/**
* 删除内容
*/
public void delete(String storageKey) {
ossClient.deleteObject(bucketName, storageKey);
}
private String getExtension(String mimeType) {
return switch (mimeType) {
case "image/jpeg" -> "jpg";
case "image/png" -> "png";
case "image/webp" -> "webp";
case "video/mp4" -> "mp4";
case "audio/wav" -> "wav";
case "application/pdf" -> "pdf";
default -> "bin";
};
}
public record StorageResult(String storageKey, String accessUrl, long fileSize) {}
}三、向量数据库的统一管理
@Service
public class VectorStoreManager {
private final MilvusServiceClient milvusClient;
// 多模态向量集合的Schema定义
private static final Map<String, CollectionSchema> COLLECTION_SCHEMAS = Map.of(
"image_vectors", new CollectionSchema(768, "图片CLIP向量"),
"text_vectors", new CollectionSchema(1024, "文字BGE向量"),
"multimodal_vectors", new CollectionSchema(768, "图文统一向量(BGE-Visualized)")
);
/**
* 初始化所有需要的向量集合
*/
@PostConstruct
public void initializeCollections() {
for (Map.Entry<String, CollectionSchema> entry : COLLECTION_SCHEMAS.entrySet()) {
String collectionName = entry.getKey();
CollectionSchema schema = entry.getValue();
if (!collectionExists(collectionName)) {
createCollection(collectionName, schema.dimension());
log.info("创建向量集合: {}", collectionName);
}
}
}
/**
* 插入向量并返回向量ID
*/
public long insertVector(String collectionName, float[] vector, String contentId) {
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(collectionName)
.withFields(List.of(
InsertParam.Field.newBuilder()
.withName("content_id")
.withDataType(DataType.VarChar)
.withValues(List.of(contentId))
.build(),
InsertParam.Field.newBuilder()
.withName("embedding")
.withDataType(DataType.FloatVector)
.withValues(List.of(vector))
.build()
))
.build();
R<MutationResult> result = milvusClient.insert(insertParam);
return result.getData().getIDs().getIntId().getData(0);
}
/**
* 向量相似搜索
*/
public List<VectorSearchResult> search(String collectionName, float[] queryVector,
int topK, String filter) {
SearchParam searchParam = SearchParam.newBuilder()
.withCollectionName(collectionName)
.withMetricType(MetricType.COSINE)
.withFloatVectors(List.of(queryVector))
.withTopK(topK)
.withExpr(filter)
.withOutFields(List.of("content_id"))
.build();
R<SearchResults> result = milvusClient.search(searchParam);
List<VectorSearchResult> searchResults = new ArrayList<>();
// 解析结果(省略具体实现)
return searchResults;
}
/**
* 批量插入(高效)
*/
public void batchInsert(String collectionName, List<float[]> vectors,
List<String> contentIds) {
if (vectors.size() != contentIds.size()) {
throw new IllegalArgumentException("向量数量和ID数量不匹配");
}
InsertParam insertParam = InsertParam.newBuilder()
.withCollectionName(collectionName)
.withFields(List.of(
InsertParam.Field.newBuilder()
.withName("content_id").withDataType(DataType.VarChar)
.withValues(contentIds).build(),
InsertParam.Field.newBuilder()
.withName("embedding").withDataType(DataType.FloatVector)
.withValues(vectors).build()
))
.build();
milvusClient.insert(insertParam);
}
private boolean collectionExists(String collectionName) {
R<Boolean> result = milvusClient.hasCollection(
HasCollectionParam.newBuilder().withCollectionName(collectionName).build());
return Boolean.TRUE.equals(result.getData());
}
private void createCollection(String collectionName, int dimension) {
// 省略具体建表逻辑
}
public record CollectionSchema(int dimension, String description) {}
public record VectorSearchResult(String contentId, float score) {}
}四、内容处理的事务一致性
三个存储系统的写入需要保证一致性,否则会出现"向量存了但元数据没存"的不一致情况:
@Service
public class MultimodalContentService {
private final ObjectStorageService objectStorage;
private final VectorStoreManager vectorStore;
private final MultimodalContentRepository contentRepo;
private final EmbeddingService embeddingService;
/**
* 保存多模态内容(三步原子化)
*/
@Transactional
public String saveContent(byte[] contentBytes, String mimeType,
String businessId, String businessType) {
String contentId = UUID.randomUUID().toString();
// 步骤1:上传到对象存储(非事务操作,需要特别处理)
ObjectStorageService.StorageResult storageResult;
try {
storageResult = objectStorage.upload(contentBytes, mimeType, contentId);
} catch (Exception e) {
throw new ContentSaveException("对象存储上传失败", e);
}
// 步骤2:生成向量并存入向量数据库
long vectorId;
try {
float[] embedding = embeddingService.embed(contentBytes, mimeType);
vectorId = vectorStore.insertVector("multimodal_vectors", embedding, contentId);
} catch (Exception e) {
// 向量存储失败,回滚:删除已上传的对象
try { objectStorage.delete(storageResult.storageKey()); } catch (Exception ignored) {}
throw new ContentSaveException("向量存储失败", e);
}
// 步骤3:保存元数据到关系型数据库(在@Transactional中,失败会回滚)
MultimodalContent content = MultimodalContent.builder()
.contentId(contentId)
.contentType(getContentType(mimeType))
.mimeType(mimeType)
.storageKey(storageResult.storageKey())
.storageUrl(storageResult.accessUrl())
.fileSize(storageResult.fileSize())
.vectorId(vectorId)
.vectorCollection("multimodal_vectors")
.processingStatus("COMPLETED")
.businessId(businessId)
.businessType(businessType)
.createdAt(LocalDateTime.now())
.updatedAt(LocalDateTime.now())
.build();
contentRepo.save(content);
return contentId;
}
/**
* 删除多模态内容(三个存储都需要清理)
*/
public void deleteContent(String contentId) {
MultimodalContent content = contentRepo.findById(contentId)
.orElseThrow(() -> new ContentNotFoundException(contentId));
// 并行删除三个存储(都是最终一致性,失败可以重试)
List<CompletableFuture<Void>> deleteTasks = List.of(
CompletableFuture.runAsync(() -> objectStorage.delete(content.getStorageKey())),
CompletableFuture.runAsync(() -> vectorStore.deleteVector(
content.getVectorCollection(), content.getVectorId())),
CompletableFuture.runAsync(() -> contentRepo.deleteById(contentId))
);
CompletableFuture.allOf(deleteTasks.toArray(new CompletableFuture[0]))
.exceptionally(e -> {
log.error("内容删除部分失败,contentId={},需要人工清理: {}",
contentId, e.getMessage());
return null;
})
.join();
}
private String getContentType(String mimeType) {
if (mimeType.startsWith("image/")) return "image";
if (mimeType.startsWith("video/")) return "video";
if (mimeType.startsWith("audio/")) return "audio";
return "document";
}
}五、数据治理与清理策略
随着业务增长,多模态数据的存储成本会显著上升。需要主动的数据治理:
@Service
public class MultimodalDataGovernance {
/**
* 识别"孤儿向量":向量存在但元数据不存在
*/
@Scheduled(cron = "0 0 2 * * *") // 每天凌晨2点
public void cleanOrphanVectors() {
// 从Milvus获取所有content_id
// 检查哪些content_id在关系型数据库中不存在
// 删除这些孤儿向量
log.info("开始清理孤儿向量...");
// 具体实现省略
}
/**
* 冷热数据分层:超过180天未访问的内容转移到归档存储
*/
@Scheduled(cron = "0 0 3 * * SUN") // 每周日凌晨3点
public void archiveColdContent() {
LocalDateTime cutoffDate = LocalDateTime.now().minusDays(180);
List<MultimodalContent> coldContent = contentRepo.findByLastAccessedBefore(cutoffDate);
for (MultimodalContent content : coldContent) {
// 转移到归档存储(成本更低,但访问延迟更高)
objectStorage.moveToArchive(content.getStorageKey());
content.setStorageUrl("ARCHIVED");
contentRepo.save(content);
}
log.info("归档冷数据: {}条", coldContent.size());
}
}多模态数据的存储架构不是一个可以事后补的事情,需要在系统设计初期就规划好。特别是三个存储系统的一致性问题,如果初期不处理,后期数据不一致了很难修。
