第1707篇:MyBatis Plus在AI应用中的扩展——自动填充与智能字段处理
第1707篇:MyBatis Plus在AI应用中的扩展——自动填充与智能字段处理
说实话,在AI应用里用MyBatis Plus,我最初觉得没什么特别的——不就是普通的CRUD吗?
直到开始做AI对话记录的持久化,才发现需要处理很多"AI特有"的字段:向量(float数组)、JSON格式的工具调用参数、模型名与Token消耗的自动记录、对话轮次的自动累加……这些如果每次手动处理,代码会非常啰嗦而且容易遗漏。
MyBatis Plus的自动填充、类型处理器(TypeHandler)、自定义字段策略,这些特性组合起来,能优雅地解决上面这些问题。今天来一一讲透。
一、AI应用的数据库表设计
先看看我们需要持久化什么样的数据。以一个AI对话系统为例,核心表结构:
-- 对话会话表
CREATE TABLE conversation_session (
id VARCHAR(36) PRIMARY KEY,
user_id VARCHAR(36) NOT NULL,
title VARCHAR(255),
model_name VARCHAR(100), -- 使用的模型
total_turns INT DEFAULT 0, -- 对话轮次(自动累加)
total_tokens INT DEFAULT 0, -- 累计token消耗
session_config JSON, -- 会话配置(JSON)
system_prompt TEXT,
status VARCHAR(20) DEFAULT 'ACTIVE',
created_at DATETIME(3),
updated_at DATETIME(3),
last_active_at DATETIME(3),
created_by VARCHAR(36),
deleted TINYINT(1) DEFAULT 0
);
-- 对话消息表
CREATE TABLE conversation_message (
id VARCHAR(36) PRIMARY KEY,
session_id VARCHAR(36) NOT NULL,
role VARCHAR(20) NOT NULL, -- user/assistant/system/tool
content TEXT,
tool_calls JSON, -- 工具调用(JSON)
tool_call_id VARCHAR(100), -- 工具调用ID
model_name VARCHAR(100),
prompt_tokens INT DEFAULT 0,
completion_tokens INT DEFAULT 0,
total_tokens INT DEFAULT 0,
latency_ms INT DEFAULT 0,
finish_reason VARCHAR(50),
embedding BLOB, -- 消息的向量表示(可选)
metadata JSON,
created_at DATETIME(3),
created_by VARCHAR(36),
deleted TINYINT(1) DEFAULT 0,
INDEX idx_session_id (session_id),
INDEX idx_created_at (created_at)
);这个表设计有几个AI特有的字段:JSON类型(session_config、tool_calls)、向量类型(embedding)、需要自动填充的字段(created_at、total_turns)。
二、实体类设计与MyBatis Plus注解
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName("conversation_session")
public class ConversationSession {
@TableId(type = IdType.ASSIGN_UUID)
private String id;
private String userId;
private String title;
private String modelName;
// 自动填充字段
@TableField(fill = FieldFill.INSERT)
private Integer totalTurns;
@TableField(fill = FieldFill.INSERT)
private Integer totalTokens;
// JSON字段,需要自定义TypeHandler
@TableField(typeHandler = SessionConfigTypeHandler.class)
private SessionConfig sessionConfig;
private String systemPrompt;
@TableField(fill = FieldFill.INSERT)
private String status;
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createdAt;
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime updatedAt;
@TableField(fill = FieldFill.INSERT_UPDATE)
private LocalDateTime lastActiveAt;
@TableField(fill = FieldFill.INSERT)
private String createdBy;
// 逻辑删除
@TableLogic
private Integer deleted;
}
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
@TableName(value = "conversation_message", autoResultMap = true) // 注意autoResultMap=true
public class ConversationMessage {
@TableId(type = IdType.ASSIGN_UUID)
private String id;
private String sessionId;
private String role;
private String content;
// JSON字段
@TableField(typeHandler = ToolCallsTypeHandler.class)
private List<ToolCall> toolCalls;
private String toolCallId;
private String modelName;
private Integer promptTokens;
private Integer completionTokens;
private Integer totalTokens;
private Long latencyMs;
private String finishReason;
// 向量字段
@TableField(typeHandler = EmbeddingTypeHandler.class)
private float[] embedding;
// 元数据JSON
@TableField(typeHandler = MetadataTypeHandler.class)
private Map<String, Object> metadata;
@TableField(fill = FieldFill.INSERT)
private LocalDateTime createdAt;
@TableField(fill = FieldFill.INSERT)
private String createdBy;
@TableLogic
private Integer deleted;
}这里有个重要细节:当用了自定义TypeHandler的字段时,@TableName 注解需要加 autoResultMap = true,否则查询出来的数据这些字段会是null。这是MyBatis Plus的一个常见坑。
三、自定义TypeHandler:处理JSON字段
MyBatis Plus对JSON字段的处理需要自定义TypeHandler:
// 会话配置(存储为JSON)
public record SessionConfig(
String modelOverride,
Double temperatureOverride,
Integer maxTokensOverride,
Boolean streamEnabled,
Map<String, String> customHeaders
) {}
// SessionConfig的TypeHandler
@MappedTypes(SessionConfig.class)
@MappedJdbcTypes(JdbcType.VARCHAR)
public class SessionConfigTypeHandler extends BaseTypeHandler<SessionConfig> {
private final ObjectMapper objectMapper;
public SessionConfigTypeHandler() {
this.objectMapper = new ObjectMapper();
this.objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
}
@Override
public void setNonNullParameter(PreparedStatement ps, int i,
SessionConfig parameter, JdbcType jdbcType)
throws SQLException {
try {
ps.setString(i, objectMapper.writeValueAsString(parameter));
} catch (JsonProcessingException e) {
throw new SQLException("无法序列化SessionConfig", e);
}
}
@Override
public SessionConfig getNullableResult(ResultSet rs, String columnName)
throws SQLException {
return parseJson(rs.getString(columnName));
}
@Override
public SessionConfig getNullableResult(ResultSet rs, int columnIndex)
throws SQLException {
return parseJson(rs.getString(columnIndex));
}
@Override
public SessionConfig getNullableResult(CallableStatement cs, int columnIndex)
throws SQLException {
return parseJson(cs.getString(columnIndex));
}
private SessionConfig parseJson(String json) {
if (json == null || json.isBlank()) return null;
try {
return objectMapper.readValue(json, SessionConfig.class);
} catch (JsonProcessingException e) {
log.error("SessionConfig JSON解析失败: {}", json, e);
return null;
}
}
}
// 泛型JSON TypeHandler基类(减少重复代码)
public abstract class JsonTypeHandler<T> extends BaseTypeHandler<T> {
private static final ObjectMapper MAPPER = new ObjectMapper()
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
private final Class<T> type;
private final JavaType javaType;
protected JsonTypeHandler(Class<T> type) {
this.type = type;
this.javaType = MAPPER.getTypeFactory().constructType(type);
}
// 支持复杂泛型类型(如 List<ToolCall>)
protected JsonTypeHandler(JavaType javaType) {
this.type = null;
this.javaType = javaType;
}
@Override
public void setNonNullParameter(PreparedStatement ps, int i, T parameter, JdbcType jdbcType)
throws SQLException {
try {
ps.setString(i, MAPPER.writeValueAsString(parameter));
} catch (JsonProcessingException e) {
throw new SQLException("JSON序列化失败", e);
}
}
@Override
public T getNullableResult(ResultSet rs, String columnName) throws SQLException {
return parse(rs.getString(columnName));
}
@Override
public T getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
return parse(rs.getString(columnIndex));
}
@Override
public T getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
return parse(cs.getString(columnIndex));
}
@SuppressWarnings("unchecked")
private T parse(String json) {
if (json == null || json.isBlank()) return null;
try {
return MAPPER.readValue(json, javaType);
} catch (JsonProcessingException e) {
log.error("JSON解析失败,类型: {}, JSON: {}", javaType, json, e);
return null;
}
}
}
// 工具调用列表的TypeHandler(List<ToolCall> 类型)
@MappedTypes(List.class)
public class ToolCallsTypeHandler extends JsonTypeHandler<List<ToolCall>> {
public ToolCallsTypeHandler() {
super(new ObjectMapper().getTypeFactory()
.constructCollectionType(List.class, ToolCall.class));
}
}
// Map<String, Object> 的TypeHandler
@MappedTypes(Map.class)
public class MetadataTypeHandler extends JsonTypeHandler<Map<String, Object>> {
public MetadataTypeHandler() {
super(new ObjectMapper().getTypeFactory()
.constructMapType(Map.class, String.class, Object.class));
}
}四、向量字段的TypeHandler
向量(float[])需要专门处理,因为数据库存的是BLOB:
@MappedTypes(float[].class)
@MappedJdbcTypes(JdbcType.BLOB)
public class EmbeddingTypeHandler extends BaseTypeHandler<float[]> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i,
float[] parameter, JdbcType jdbcType)
throws SQLException {
ps.setBytes(i, floatArrayToBytes(parameter));
}
@Override
public float[] getNullableResult(ResultSet rs, String columnName) throws SQLException {
byte[] bytes = rs.getBytes(columnName);
return bytes != null ? bytesToFloatArray(bytes) : null;
}
@Override
public float[] getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
byte[] bytes = rs.getBytes(columnIndex);
return bytes != null ? bytesToFloatArray(bytes) : null;
}
@Override
public float[] getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
byte[] bytes = cs.getBytes(columnIndex);
return bytes != null ? bytesToFloatArray(bytes) : null;
}
private byte[] floatArrayToBytes(float[] floats) {
ByteBuffer buffer = ByteBuffer.allocate(floats.length * 4);
buffer.order(ByteOrder.LITTLE_ENDIAN);
for (float f : floats) {
buffer.putFloat(f);
}
return buffer.array();
}
private float[] bytesToFloatArray(byte[] bytes) {
ByteBuffer buffer = ByteBuffer.wrap(bytes);
buffer.order(ByteOrder.LITTLE_ENDIAN);
float[] floats = new float[bytes.length / 4];
for (int i = 0; i < floats.length; i++) {
floats[i] = buffer.getFloat();
}
return floats;
}
}如果用的是pgvector,向量列类型是 vector(1536),需要不同的TypeHandler:
// pgvector的TypeHandler(以文本格式存储向量)
@MappedTypes(float[].class)
public class PgVectorTypeHandler extends BaseTypeHandler<float[]> {
@Override
public void setNonNullParameter(PreparedStatement ps, int i,
float[] parameter, JdbcType jdbcType)
throws SQLException {
// pgvector格式: [0.1, 0.2, 0.3, ...]
StringBuilder sb = new StringBuilder("[");
for (int j = 0; j < parameter.length; j++) {
if (j > 0) sb.append(",");
sb.append(parameter[j]);
}
sb.append("]");
ps.setObject(i, sb.toString(), java.sql.Types.OTHER);
}
@Override
public float[] getNullableResult(ResultSet rs, String columnName) throws SQLException {
String value = rs.getString(columnName);
return parseVectorString(value);
}
// ... 其他方法省略
private float[] parseVectorString(String value) {
if (value == null) return null;
// 移除括号,分割并解析
String[] parts = value.replaceAll("[\\[\\]]", "").split(",");
float[] result = new float[parts.length];
for (int i = 0; i < parts.length; i++) {
result[i] = Float.parseFloat(parts[i].trim());
}
return result;
}
}五、自动填充处理器:AI字段的自动化
自动填充是MyBatis Plus最实用的特性之一。我们扩展它来处理AI相关字段的自动化:
@Component
public class AIFieldAutoFillHandler implements MetaObjectHandler {
private final UserContextHolder userContextHolder; // 获取当前用户
@Override
public void insertFill(MetaObject metaObject) {
LocalDateTime now = LocalDateTime.now();
// 基础时间字段
fillIfNull(metaObject, "createdAt", now);
fillIfNull(metaObject, "updatedAt", now);
fillIfNull(metaObject, "lastActiveAt", now);
// 当前用户ID
fillIfNull(metaObject, "createdBy", getCurrentUserId());
// 逻辑删除默认值
fillIfNull(metaObject, "deleted", 0);
// AI会话特有字段
fillIfNull(metaObject, "totalTurns", 0);
fillIfNull(metaObject, "totalTokens", 0);
fillIfNull(metaObject, "status", "ACTIVE");
// 消息特有字段
fillIfNull(metaObject, "promptTokens", 0);
fillIfNull(metaObject, "completionTokens", 0);
fillIfNull(metaObject, "totalTokens", 0);
fillIfNull(metaObject, "latencyMs", 0L);
}
@Override
public void updateFill(MetaObject metaObject) {
// 更新时自动刷新时间
setFieldValByName("updatedAt", LocalDateTime.now(), metaObject);
setFieldValByName("lastActiveAt", LocalDateTime.now(), metaObject);
}
// 只在字段为null时填充(不覆盖已有值)
private void fillIfNull(MetaObject metaObject, String fieldName, Object value) {
if (metaObject.hasSetter(fieldName)) {
Object existingValue = metaObject.getValue(fieldName);
if (existingValue == null) {
setFieldValByName(fieldName, value, metaObject);
}
}
}
private String getCurrentUserId() {
try {
return userContextHolder.getCurrentUserId();
} catch (Exception e) {
return "system";
}
}
}六、智能Mapper:Token消耗自动累计
AI对话里,每次消息创建后都需要更新会话的累计token和轮次。用MyBatis Plus的Mapper扩展来处理:
@Mapper
public interface ConversationSessionMapper extends BaseMapper<ConversationSession> {
// 原子累加token和轮次(避免并发冲突)
@Update("""
UPDATE conversation_session
SET total_tokens = total_tokens + #{tokens},
total_turns = total_turns + #{turns},
last_active_at = NOW(3),
updated_at = NOW(3)
WHERE id = #{sessionId}
""")
int accumulateStats(@Param("sessionId") String sessionId,
@Param("tokens") int tokens,
@Param("turns") int turns);
// 按状态统计会话数
@Select("""
SELECT status, COUNT(*) as count
FROM conversation_session
WHERE user_id = #{userId} AND deleted = 0
GROUP BY status
""")
List<Map<String, Object>> countByStatusForUser(@Param("userId") String userId);
// 查询活跃会话(最近7天有消息的)
@Select("""
SELECT s.* FROM conversation_session s
WHERE s.user_id = #{userId}
AND s.deleted = 0
AND s.last_active_at > DATE_SUB(NOW(), INTERVAL 7 DAY)
ORDER BY s.last_active_at DESC
LIMIT #{limit}
""")
@Results(autoMapping = true, value = {
@Result(column = "session_config", property = "sessionConfig",
typeHandler = SessionConfigTypeHandler.class)
})
List<ConversationSession> findRecentActiveSessions(@Param("userId") String userId,
@Param("limit") int limit);
}
@Mapper
public interface ConversationMessageMapper extends BaseMapper<ConversationMessage> {
// 批量插入消息(性能更好)
int insertBatch(@Param("messages") List<ConversationMessage> messages);
// 查询会话的所有消息(按时间排序)
@Select("""
SELECT * FROM conversation_message
WHERE session_id = #{sessionId} AND deleted = 0
ORDER BY created_at ASC
""")
@Results(autoMapping = true, value = {
@Result(column = "tool_calls", property = "toolCalls",
typeHandler = ToolCallsTypeHandler.class),
@Result(column = "metadata", property = "metadata",
typeHandler = MetadataTypeHandler.class)
})
List<ConversationMessage> findBySessionId(@Param("sessionId") String sessionId);
// 统计会话的token消耗
@Select("""
SELECT
SUM(prompt_tokens) as totalPromptTokens,
SUM(completion_tokens) as totalCompletionTokens,
SUM(total_tokens) as totalTokens,
AVG(latency_ms) as avgLatencyMs,
COUNT(*) as messageCount
FROM conversation_message
WHERE session_id = #{sessionId} AND role = 'assistant' AND deleted = 0
""")
TokenUsageStats getTokenUsageStats(@Param("sessionId") String sessionId);
@Data
class TokenUsageStats {
private Long totalPromptTokens;
private Long totalCompletionTokens;
private Long totalTokens;
private Double avgLatencyMs;
private Long messageCount;
}
}七、Service层:把所有组件整合起来
@Service
@Transactional
public class ConversationService {
private final ConversationSessionMapper sessionMapper;
private final ConversationMessageMapper messageMapper;
private final ChatClient chatClient;
// 创建新会话
public ConversationSession createSession(String userId, String title,
SessionConfig config) {
ConversationSession session = ConversationSession.builder()
.userId(userId)
.title(title)
.modelName(config.modelOverride() != null ? config.modelOverride() : "gpt-4o")
.sessionConfig(config)
// totalTurns、totalTokens、status、createdAt 等字段由自动填充处理
.build();
sessionMapper.insert(session);
return session;
}
// 发送消息并获取AI响应
public ConversationMessage sendMessage(String sessionId, String userContent) {
// 1. 查询会话
ConversationSession session = sessionMapper.selectById(sessionId);
if (session == null) {
throw new SessionNotFoundException(sessionId);
}
// 2. 保存用户消息
ConversationMessage userMsg = ConversationMessage.builder()
.sessionId(sessionId)
.role("user")
.content(userContent)
.build();
messageMapper.insert(userMsg);
// 3. 获取历史消息(用于上下文)
List<ConversationMessage> history = messageMapper.findBySessionId(sessionId);
// 4. 调用AI
long startTime = System.currentTimeMillis();
ChatResponse response = chatClient.prompt()
.system(session.getSystemPrompt())
.messages(buildChatMessages(history))
.call()
.chatResponse();
long latency = System.currentTimeMillis() - startTime;
// 5. 保存AI响应消息
int promptTokens = response.getMetadata().getUsage().getPromptTokens().intValue();
int completionTokens = response.getMetadata().getUsage().getCompletionTokens().intValue();
ConversationMessage assistantMsg = ConversationMessage.builder()
.sessionId(sessionId)
.role("assistant")
.content(response.getResult().getOutput().getContent())
.modelName(session.getModelName())
.promptTokens(promptTokens)
.completionTokens(completionTokens)
.totalTokens(promptTokens + completionTokens)
.latencyMs(latency)
.finishReason(response.getResult().getMetadata().getFinishReason())
.build();
messageMapper.insert(assistantMsg);
// 6. 原子更新会话统计(+1轮次,+本次token消耗)
sessionMapper.accumulateStats(
sessionId,
promptTokens + completionTokens,
1
);
return assistantMsg;
}
// 查询会话历史
@Transactional(readOnly = true)
public List<ConversationMessage> getHistory(String sessionId) {
return messageMapper.findBySessionId(sessionId);
}
// 获取Token消耗统计
@Transactional(readOnly = true)
public ConversationMessageMapper.TokenUsageStats getStats(String sessionId) {
return messageMapper.getTokenUsageStats(sessionId);
}
private List<Message> buildChatMessages(List<ConversationMessage> history) {
return history.stream()
.map(msg -> switch (msg.getRole()) {
case "user" -> new UserMessage(msg.getContent());
case "assistant" -> new AssistantMessage(msg.getContent());
case "system" -> new SystemMessage(msg.getContent());
default -> new UserMessage(msg.getContent());
})
.toList();
}
}八、条件构造器的AI场景用法
MyBatis Plus的 LambdaQueryWrapper 在AI场景里也有很多实用用法:
@Service
public class ConversationSearchService {
private final ConversationSessionMapper sessionMapper;
private final ConversationMessageMapper messageMapper;
// 搜索用户的会话(多条件)
public List<ConversationSession> searchSessions(SessionSearchQuery query) {
LambdaQueryWrapper<ConversationSession> wrapper = new LambdaQueryWrapper<ConversationSession>()
.eq(ConversationSession::getUserId, query.userId())
.eq(query.status() != null, ConversationSession::getStatus, query.status())
.eq(query.modelName() != null, ConversationSession::getModelName, query.modelName())
.ge(query.minTurns() != null, ConversationSession::getTotalTurns, query.minTurns())
.between(query.startTime() != null && query.endTime() != null,
ConversationSession::getCreatedAt, query.startTime(), query.endTime())
.like(query.keyword() != null, ConversationSession::getTitle, query.keyword())
.orderByDesc(ConversationSession::getLastActiveAt);
return sessionMapper.selectList(wrapper);
}
// 查找高消耗会话(用于成本分析)
public List<ConversationSession> findHighCostSessions(int tokenThreshold) {
return sessionMapper.selectList(
new LambdaQueryWrapper<ConversationSession>()
.ge(ConversationSession::getTotalTokens, tokenThreshold)
.orderByDesc(ConversationSession::getTotalTokens)
);
}
// 查询某个时间范围内的消息数量(用于用量统计)
public long countMessagesInRange(String userId, LocalDateTime start, LocalDateTime end) {
return messageMapper.selectCount(
new LambdaQueryWrapper<ConversationMessage>()
.eq(ConversationMessage::getRole, "user") // 只统计用户消息
.between(ConversationMessage::getCreatedAt, start, end)
.inSql(ConversationMessage::getSessionId,
"SELECT id FROM conversation_session WHERE user_id = '" + userId + "'")
);
}
public record SessionSearchQuery(
String userId,
String status,
String modelName,
Integer minTurns,
LocalDateTime startTime,
LocalDateTime endTime,
String keyword
) {}
}九、分页查询与性能优化
AI对话历史的分页查询是个高频操作,需要注意性能:
@Service
public class ConversationPaginationService {
private final ConversationSessionMapper sessionMapper;
// 标准分页(小数据量)
public IPage<ConversationSession> getSessionPage(String userId, int pageNum, int pageSize) {
Page<ConversationSession> page = new Page<>(pageNum, pageSize);
LambdaQueryWrapper<ConversationSession> wrapper = new LambdaQueryWrapper<ConversationSession>()
.eq(ConversationSession::getUserId, userId)
.orderByDesc(ConversationSession::getLastActiveAt)
.select(
// 只查需要的字段,不查大字段
ConversationSession::getId,
ConversationSession::getTitle,
ConversationSession::getModelName,
ConversationSession::getTotalTurns,
ConversationSession::getTotalTokens,
ConversationSession::getStatus,
ConversationSession::getLastActiveAt,
ConversationSession::getCreatedAt
// 注意:不查systemPrompt和sessionConfig(大字段)
);
return sessionMapper.selectPage(page, wrapper);
}
// 游标分页(大数据量,避免深分页)
public List<ConversationSession> getSessionsAfter(
String userId, LocalDateTime cursor, int limit) {
return sessionMapper.selectList(
new LambdaQueryWrapper<ConversationSession>()
.eq(ConversationSession::getUserId, userId)
.lt(cursor != null, ConversationSession::getLastActiveAt, cursor)
.orderByDesc(ConversationSession::getLastActiveAt)
.last("LIMIT " + limit)
);
}
}小结
MyBatis Plus在AI应用中的扩展要点:
- autoResultMap = true:使用TypeHandler的实体类必须加这个,否则JSON字段查出来是null
- 泛型TypeHandler基类:减少重复代码,
JsonTypeHandler<T>只写一次,各类型继承即可 - 向量存储:BLOB + ByteBuffer的float编解码,pgvector场景用字符串格式
- 原子累加:Token消耗统计用
UPDATE total = total + delta,避免并发问题 - 自动填充:
fillIfNull而不是setFieldValByName,保留业务层的主动设置 - 分页优化:用
.select()指定需要的字段,避免查询大字段;大数据量用游标分页
这些细节加起来,能让AI应用的数据持久化层既简洁又健壮。
