Spring AI Event System: An Event-Driven Extension Architecture for AI Applications
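1. Opening Story: The Coupling Nightmare
In September 2025, Xiao Li, a backend engineer at a SaaS company, received a "simple" requirement: every time the company's AI Q&A system finishes an answer, it must also do the following:
- Record a usage log (needed by the billing system)
- Update the user's AI usage statistics (shown on the dashboard)
- Send a WeChat notification (when the AI gives an important suggestion)
- Trigger the recommendation system (update interest tags based on the user's question)
- Evaluate answer quality asynchronously (a background AI scoring system)
Xiao Li thought it over and simply appended the code after the AI call: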
public String aiAnswer(String userId, String question) {
    // AI call (the core logic, just a few lines)
    String answer = chatClient.prompt()
            .user(question)
            .call()
            .content();
    // Follow-up 1: usage record (synchronous, now coupled in)
    usageService.record(userId, question, answer);
    // Follow-up 2: statistics update (synchronous, now coupled in)
    statisticsService.update(userId, question.length());
    // Follow-up 3: WeChat notification (synchronous, coupled in, and slow)
    if (answer.contains("重要建议")) {
        wechatService.sendNotification(userId, answer); // network call, 300ms+
    }
    // Follow-up 4: recommendation system (synchronous, now coupled in)
    recommendationService.updateInterestTags(userId, question);
    // Follow-up 5: quality scoring (synchronous, coupled in, and very slow)
    qualityService.evaluate(userId, question, answer); // another AI call, 1000ms+
    return answer;
}
Three weeks later, Xiao Li found that:
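- The endpoint's response time went from 1.2 s to 3.8 s (dragged down by the WeChat notification and quality scoring)
- A failed usage record made the whole endpoint return an error (it should never affect the user experience)
- Adding a sixth follow-up action meant editing this method again (a violation of the open-closed principle)
- Unit tests had to mock 5 dependencies (testing hell)
The product manager was already pushing for a seventh: when a user asks the same kind of question more than 5 times in a row, push a link to a related course.
Xiao Li broke down. He went to Lao Zhang for advice, and Lao Zhang said just one thing:
"This is a classic horizontal-extension problem. What you need is an event-driven architecture. When the AI finishes answering, publish an event and let whoever cares about it handle it themselves."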
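2. The Value of Event-Driven Design in AI Applications
2.1 Core Idea
2.2 Four Advantages of Event-Driven Design
| Dimension | Coupled architecture | Event-driven architecture |
|---|---|---|
| Response time | 3.8s (serial execution) | 1.3s (asynchronous, parallel) |
| Extensibility | Modify the core method | Just add a new listener |
| Unit testing | 5 mocked dependencies | Only verify that the event is published |
| Fault isolation | Notification failure → endpoint error | Notification failure → only the notification is affected |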
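The response-time and fault-isolation rows can be illustrated without Spring at all. The following is a minimal plain-JDK sketch, not the article's implementation: `IsolatedDispatch`, `answerAndPublish`, and `demo` are hypothetical names, and the two listeners stand in for the WeChat notification and the statistics update.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

class IsolatedDispatch {
    /** Hands the answer to every listener on the pool and returns without waiting for them. */
    static String answerAndPublish(String answer, List<Consumer<String>> listeners, ExecutorService pool) {
        for (Consumer<String> listener : listeners) {
            pool.execute(() -> {
                try {
                    listener.accept(answer);
                } catch (RuntimeException e) {
                    // A failing listener is contained here; the caller and other listeners are unaffected
                    System.err.println("listener failed: " + e.getMessage());
                }
            });
        }
        return answer;
    }

    /** Runs one failing and one healthy listener; returns how often the healthy one ran. */
    static int demo() {
        AtomicInteger statsUpdates = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Consumer<String>> listeners = List.of(
                a -> { throw new IllegalStateException("WeChat gateway down"); },
                a -> statsUpdates.incrementAndGet());
        String returned = answerAndPublish("42", listeners, pool);
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return returned.equals("42") ? statsUpdates.get() : -1;
    }
}
```

The caller gets its answer back regardless of what the listeners do, which is the property the table's last row describes.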
2.3 Core Components of the Spring Event System
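Spring's event system has three roles: an event object, a publisher (`ApplicationEventPublisher`), and listeners selected by event type (`@EventListener`). The sketch below is a plain-Java stand-in for those roles so that it runs without a Spring context. `MiniEventBus`, `AnswerProduced`, and `MiniEventBusDemo` are hypothetical names, not Spring classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java stand-in for publisher plus multicaster; not a Spring class. */
class MiniEventBus {
    private record Registration<T>(Class<T> type, Consumer<T> listener) {}

    private final List<Registration<?>> registrations = new ArrayList<>();

    /** Plays the role of @EventListener: register interest in one event type. */
    <T> void subscribe(Class<T> type, Consumer<T> listener) {
        registrations.add(new Registration<>(type, listener));
    }

    /** Plays the role of ApplicationEventPublisher.publishEvent: deliver to matching listeners. */
    void publish(Object event) {
        for (Registration<?> r : registrations) {
            deliver(r, event);
        }
    }

    private static <T> void deliver(Registration<T> r, Object event) {
        if (r.type().isInstance(event)) {
            r.listener().accept(r.type().cast(event));
        }
    }
}

/** Hypothetical event type carrying what listeners need. */
record AnswerProduced(String userId, String answer) {}

class MiniEventBusDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniEventBus bus = new MiniEventBus();
        bus.subscribe(AnswerProduced.class, e -> log.add("usage:" + e.userId()));
        bus.subscribe(AnswerProduced.class, e -> log.add("stats:" + e.answer().length()));
        bus.subscribe(String.class, s -> log.add("never")); // different type, so never called
        bus.publish(new AnswerProduced("u1", "hello"));
        return log;
    }
}
```

The publisher never names its listeners, so a sixth or seventh follow-up action becomes one more `subscribe` call rather than an edit to the core method.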
3. Project Dependency Configuration
3.1 Complete pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.4</version>
</parent>
<groupId>com.laozhang.ai</groupId>
<artifactId>spring-ai-event-demo</artifactId>
<version>1.0.0</version>
<properties>
<java.version>21</java.version>
<spring-ai.version>1.0.0-M6</spring-ai.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Spring AI -->
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-openai-spring-boot-starter</artifactId>
</dependency>
<!-- RocketMQ (for the distributed event scenario) -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.3.0</version>
</dependency>
<!-- Redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- JPA + MySQL -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
</dependency>
<!-- Micrometer metrics -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<!-- Test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.ai</groupId>
<artifactId>spring-ai-bom</artifactId>
<version>${spring-ai.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<repositories>
<repository>
<id>spring-milestones</id>
<url>https://repo.spring.io/milestone</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3.2 application.yml
server:
  port: 8080
spring:
  application:
    name: spring-ai-event-demo
  ai:
    openai:
      api-key: ${DASHSCOPE_API_KEY}
      # Note: Spring AI appends its completions path (default /v1/chat/completions) to base-url; verify the resulting URL
      base-url: https://dashscope.aliyuncs.com/compatible-mode/v1
      chat:
        options:
          model: qwen-plus
          temperature: 0.7
  datasource:
    url: jdbc:mysql://localhost:3306/ai_events?useUnicode=true&characterEncoding=utf8
    username: ${DB_USER:root}
    password: ${DB_PASSWORD:password}
    driver-class-name: com.mysql.cj.jdbc.Driver
  jpa:
    hibernate:
      ddl-auto: update
    show-sql: false
    properties:
      hibernate:
        dialect: org.hibernate.dialect.MySQLDialect
  data:
    redis:
      host: localhost
      port: 6379
  # Async thread pool settings
  task:
    execution:
      pool:
        core-size: 10
        max-size: 50
        queue-capacity: 200
        keep-alive: 60s
      thread-name-prefix: ai-event-
# RocketMQ settings
rocketmq:
  name-server: localhost:9876
  producer:
    group: ai-event-producer
    send-message-timeout: 3000
  consumer:
    group: ai-event-consumer
# Event system settings
ai:
  events:
    quality-evaluation:
      enabled: true
      min-response-length: 50
      threshold-score: 0.6
    alert:
      low-satisfaction-threshold: 0.4
      consecutive-low-count: 3
    audit:
      retention-days: 90
      storage: database # database / elasticsearch
4. AI Event Definitions: The Core Event Class Hierarchy
4.1 Event Base Class
package com.laozhang.ai.event;
import lombok.Getter;
import org.springframework.context.ApplicationEvent;
import java.time.Instant;
/**
 * Base class for AI events.
 * Every AI-related event extends this class.
 */
@Getter
public abstract class BaseAiEvent extends ApplicationEvent {
private final String eventId;
private final String userId;
private final String sessionId;
private final Instant occurredAt;
protected BaseAiEvent(Object source, String userId, String sessionId) {
super(source);
this.eventId = java.util.UUID.randomUUID().toString();
this.userId = userId;
this.sessionId = sessionId;
this.occurredAt = Instant.now();
}
}
4.2 AI Request Event
package com.laozhang.ai.event;
import lombok.*;
/**
 * AI request event, published before the AI is called
 */
@Getter
@ToString(callSuper = true)
public class AiRequestEvent extends BaseAiEvent {
/** The user's original question */
private final String userMessage;
/** System prompt (after desensitization) */
private final String systemPrompt;
/** AI model in use */
private final String modelName;
/** Estimated token count */
private final int estimatedTokens;
/** Timestamp at which the request was issued */
private final long requestTimestamp;
public AiRequestEvent(Object source, String userId, String sessionId,
String userMessage, String systemPrompt,
String modelName, int estimatedTokens) {
super(source, userId, sessionId);
this.userMessage = userMessage;
this.systemPrompt = systemPrompt;
this.modelName = modelName;
this.estimatedTokens = estimatedTokens;
this.requestTimestamp = System.currentTimeMillis();
}
}
4.3 AI Response Event
package com.laozhang.ai.event;
import lombok.*;
/**
 * AI response event, published after the AI response is received
 */
@Getter
@ToString(callSuper = true)
public class AiResponseEvent extends BaseAiEvent {
/** Content of the AI's answer */
private final String responseContent;
/** Input tokens actually consumed */
private final int inputTokens;
/** Output tokens actually consumed */
private final int outputTokens;
/** Total tokens consumed */
private final int totalTokens;
/** Response latency in milliseconds */
private final long elapsedMs;
/** Quality score (0-1), filled in asynchronously by the AI scoring system */
private volatile Double qualityScore;
/** Whether the answer came from the cache */
private final boolean fromCache;
/** ID of the associated request event */
private final String requestEventId;
public AiResponseEvent(Object source, String userId, String sessionId,
String responseContent, int inputTokens, int outputTokens,
long elapsedMs, boolean fromCache, String requestEventId) {
super(source, userId, sessionId);
this.responseContent = responseContent;
this.inputTokens = inputTokens;
this.outputTokens = outputTokens;
this.totalTokens = inputTokens + outputTokens;
this.elapsedMs = elapsedMs;
this.fromCache = fromCache;
this.requestEventId = requestEventId;
}
public void setQualityScore(Double score) {
this.qualityScore = score;
}
}
4.4 AI Error Event
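The error event below carries an `ErrorType`, which means the publisher needs some rule for turning an exception into one of those values. The article does not show that rule in this part, so the following is only a sketch under assumptions: `ErrorClassifier` and `classify` are hypothetical names, the enum is copied here so the block compiles on its own, and only JDK exception types are mapped.

```java
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.TimeoutException;

class ErrorClassifier {
    /** Local copy of the enum values used by AiErrorEvent, so this sketch is self-contained. */
    enum ErrorType { TIMEOUT, RATE_LIMITED, CONTENT_FILTERED, TOKEN_EXCEEDED, API_ERROR, NETWORK_ERROR, UNKNOWN }

    /** Walks the cause chain from the outermost exception inward and returns the first match. */
    static ErrorType classify(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            // Check timeouts first: SocketTimeoutException is itself an IOException
            if (t instanceof SocketTimeoutException || t instanceof TimeoutException) {
                return ErrorType.TIMEOUT;
            }
            if (t instanceof IOException) {
                return ErrorType.NETWORK_ERROR;
            }
        }
        return ErrorType.UNKNOWN;
    }
}
```

Rate limiting, content filtering, and token limits would have to be recognized from the provider's HTTP status or error body, which depends on the client library and is left out here.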
package com.laozhang.ai.event;
import lombok.*;
/**
 * AI error event, published when an AI call fails
 */
@Getter
public class AiErrorEvent extends BaseAiEvent {
/** Error type enum */
public enum ErrorType {
TIMEOUT, // timed out
RATE_LIMITED, // rate limited
CONTENT_FILTERED, // content was filtered
TOKEN_EXCEEDED, // token limit exceeded
API_ERROR, // API error
NETWORK_ERROR, // network error
UNKNOWN // unknown error
}
private final String userMessage;
private final ErrorType errorType;
private final String errorMessage;
private final Throwable cause;
private final int retryCount;
public AiErrorEvent(Object source, String userId, String sessionId,
String userMessage, ErrorType errorType,
String errorMessage, Throwable cause, int retryCount) {
super(source, userId, sessionId);
this.userMessage = userMessage;
this.errorType = errorType;
this.errorMessage = errorMessage;
this.cause = cause;
this.retryCount = retryCount;
}
}
4.5 User Satisfaction Event
package com.laozhang.ai.event;
import lombok.*;
/**
 * User satisfaction feedback event, published when a user gives a thumbs-up, thumbs-down, or rating
 */
@Getter
public class UserFeedbackEvent extends BaseAiEvent {
public enum FeedbackType {
THUMBS_UP, // thumbs-up
THUMBS_DOWN, // thumbs-down
RATING, // rating (1-5)
COMMENT // comment
}
private final String responseId;
private final FeedbackType feedbackType;
private final Double rating; // 1.0-5.0
private final String comment;
private final boolean isPositive; // simplified positive/negative flag
public UserFeedbackEvent(Object source, String userId, String sessionId,
String responseId, FeedbackType feedbackType,
Double rating, String comment) {
super(source, userId, sessionId);
this.responseId = responseId;
this.feedbackType = feedbackType;
this.rating = rating;
this.comment = comment;
this.isPositive = determinePositive(feedbackType, rating);
}
private boolean determinePositive(FeedbackType type, Double rating) {
return switch (type) {
case THUMBS_UP -> true;
case THUMBS_DOWN -> false;
case RATING -> rating != null && rating >= 3.5;
case COMMENT -> true; // a comment counts as positive engagement
};
}
}
5. Event Listeners: Handling AI Events Asynchronously
5.1 Async Configuration
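The executor configured below uses a rejection handler that runs overflow tasks on the submitting thread. It is worth seeing what that means in practice: once the pool and its queue are full, the thread that published the event pays for the listener's work. The following plain-JDK sketch reproduces the behaviour with a deliberately tiny pool. `CallerRunsDemo` and `overflowThreadName` are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CallerRunsDemo {
    /** Saturates a 1-thread, 1-slot pool and returns the name of the thread that ran the overflow task. */
    static String overflowThreadName() {
        CountDownLatch release = new CountDownLatch(1);
        String[] ranOn = new String[1];
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1),
                (task, exec) -> {
                    // Same shape as the handler in the configuration: run on the submitting thread
                    if (!exec.isShutdown()) {
                        task.run();
                    }
                });
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        pool.execute(blocker); // occupies the single worker thread
        pool.execute(blocker); // fills the single queue slot
        pool.execute(() -> ranOn[0] = Thread.currentThread().getName()); // rejected, so it runs right here
        release.countDown();
        pool.shutdown();
        return ranOn[0];
    }
}
```

This acts as back-pressure rather than dropping events, at the cost of request latency under load. Whether that trade-off is right depends on how much delay the endpoint can tolerate.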
package com.laozhang.ai.config;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import java.util.concurrent.Executor;
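/**
 * Thread pool configuration for asynchronous event handling
 */
@Configuration
@EnableAsync
public class AsyncEventConfig implements org.springframework.scheduling.annotation.AsyncConfigurer {
    /**
     * Dedicated thread pool for AI events.
     * Kept separate from Spring's default executor so that AI event handling cannot affect the main business flow.
     */
    @Bean(name = "aiEventExecutor")
    public Executor aiEventExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(30);
        executor.setQueueCapacity(500);
        executor.setKeepAliveSeconds(60);
        executor.setThreadNamePrefix("ai-event-");
        // Rejection policy: log, then fall back to running the task on the calling thread
        executor.setRejectedExecutionHandler((r, exec) -> {
            org.slf4j.LoggerFactory.getLogger(AsyncEventConfig.class)
                    .warn("[AI-EVENT] Thread pool full, executing in caller thread");
            if (!exec.isShutdown()) {
                r.run();
            }
        });
        executor.initialize();
        return executor;
    }
    /**
     * Handler for exceptions that escape void @Async methods.
     * Supplied through AsyncConfigurer, the hook that @EnableAsync consults;
     * a plain @Bean of this type is typically not picked up on its own.
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params) -> {
            org.slf4j.LoggerFactory.getLogger(AsyncEventConfig.class)
                    .error("[AI-EVENT] Async exception in method={}, error={}",
                            method.getName(), ex.getMessage(), ex);
        };
    }
}
6. Hands-On 1: Automatically Recording Usage Statistics After AI Calls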
6.1 Usage Statistics Data Model
package com.laozhang.ai.domain;
import jakarta.persistence.*;
import lombok.*;
import java.time.LocalDate;
import java.time.LocalDateTime;
/**
 * Daily AI usage statistics table
 */
@Entity
@Table(name = "ai_usage_daily",
uniqueConstraints = @UniqueConstraint(columnNames = {"user_id", "stat_date", "model_name"}))
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class AiUsageDaily {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "user_id", nullable = false, length = 64)
private String userId;
@Column(name = "stat_date", nullable = false)
private LocalDate statDate;
@Column(name = "model_name", nullable = false, length = 50)
private String modelName;
@Column(name = "request_count")
private int requestCount;
@Column(name = "success_count")
private int successCount;
@Column(name = "error_count")
private int errorCount;
@Column(name = "input_tokens")
private long inputTokens;
@Column(name = "output_tokens")
private long outputTokens;
@Column(name = "total_tokens")
private long totalTokens;
/** Estimated cost, in fen (1/100 yuan) */
@Column(name = "estimated_cost_fen")
private long estimatedCostFen;
/** Average response time in milliseconds */
@Column(name = "avg_elapsed_ms")
private double avgElapsedMs;
@Column(name = "cache_hit_count")
private int cacheHitCount;
@Column(name = "created_at")
private LocalDateTime createdAt;
@Column(name = "updated_at")
private LocalDateTime updatedAt;
@PrePersist
void onCreate() {
createdAt = LocalDateTime.now();
updatedAt = LocalDateTime.now();
}
@PreUpdate
void onUpdate() {
updatedAt = LocalDateTime.now();
}
}
6.2 Usage Statistics Repository
package com.laozhang.ai.repository;
import com.laozhang.ai.domain.AiUsageDaily;
import org.springframework.data.jpa.repository.*;
import org.springframework.data.repository.query.Param;
import org.springframework.stereotype.Repository;
import java.time.LocalDate;
import java.util.List;
import java.util.Optional;
@Repository
public interface AiUsageDailyRepository extends JpaRepository<AiUsageDaily, Long> {
Optional<AiUsageDaily> findByUserIdAndStatDateAndModelName(
String userId, LocalDate statDate, String modelName);
@Query("SELECT SUM(u.totalTokens) FROM AiUsageDaily u " +
"WHERE u.userId = :userId AND u.statDate BETWEEN :start AND :end")
Long sumTokensByUserAndDateRange(@Param("userId") String userId,
@Param("start") LocalDate start,
@Param("end") LocalDate end);
@Query("SELECT u FROM AiUsageDaily u WHERE u.statDate = :date " +
"ORDER BY u.totalTokens DESC")
List<AiUsageDaily> findTopUsersByDate(@Param("date") LocalDate date,
org.springframework.data.domain.Pageable pageable);
}
6.3 Usage Statistics Listener
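The listener below estimates cost per request in whole fen using the prices quoted in its comment (0.4 fen per thousand input tokens, 1.2 fen per thousand output tokens). One consequence is easy to miss: a typical small request costs less than one fen, so casting each request's cost to `long` records 0. A possible alternative, shown as a sketch only, is to accumulate in a finer unit. `UsageMath`, `costMicroFen`, and the micro-fen unit are assumptions for illustration, not part of the article's schema.

```java
class UsageMath {
    /**
     * Cost in micro-fen (1 fen = 1_000_000 micro-fen), computed in integer arithmetic.
     * 0.4 fen per 1000 input tokens  = 400 micro-fen per input token;
     * 1.2 fen per 1000 output tokens = 1200 micro-fen per output token.
     */
    static long costMicroFen(int inputTokens, int outputTokens) {
        return inputTokens * 400L + outputTokens * 1200L;
    }

    /** Incremental mean: folds sample x into a mean that so far covers n - 1 samples. */
    static double updateMean(double oldMean, long n, double x) {
        return oldMean + (x - oldMean) / n;
    }
}
```

For 500 input and 300 output tokens the per-request cast yields 0 fen, while the integer form keeps the full 0.56 fen as 560,000 micro-fen. Converting to fen only when reporting avoids losing the small amounts.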
package com.laozhang.ai.event.listener;
import com.laozhang.ai.domain.AiUsageDaily;
import com.laozhang.ai.event.AiResponseEvent;
import com.laozhang.ai.event.AiErrorEvent;
import com.laozhang.ai.repository.AiUsageDailyRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import java.time.LocalDate;
/**
 * Event listener for AI usage statistics
 *
 * Listens for AI response and error events and updates usage statistics asynchronously
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class UsageStatisticsListener {
private final AiUsageDailyRepository usageRepository;
    /**
     * Listens for AI response events and updates usage statistics.
     * {@code @Async} keeps this work off the publishing thread.
     * Caveat: this is a read-modify-write, so concurrent events for the same
     * user and day can lose updates unless the row is locked or updated atomically.
     */
    @Async("aiEventExecutor")
    @EventListener
    @Transactional
    public void onAiResponse(AiResponseEvent event) {
        try {
            String userId = event.getUserId();
            LocalDate today = LocalDate.now();
            String modelName = "qwen-plus"; // hard-coded; AiResponseEvent does not carry the model name yet
            // Find or create today's statistics record
            AiUsageDaily daily = usageRepository
                    .findByUserIdAndStatDateAndModelName(userId, today, modelName)
                    .orElseGet(() -> AiUsageDaily.builder()
                            .userId(userId)
                            .statDate(today)
                            .modelName(modelName)
                            .build());
            // Update the counters
            daily.setRequestCount(daily.getRequestCount() + 1);
            daily.setSuccessCount(daily.getSuccessCount() + 1);
            daily.setInputTokens(daily.getInputTokens() + event.getInputTokens());
            daily.setOutputTokens(daily.getOutputTokens() + event.getOutputTokens());
            daily.setTotalTokens(daily.getTotalTokens() + event.getTotalTokens());
            // Update cache-hit statistics
            if (event.isFromCache()) {
                daily.setCacheHitCount(daily.getCacheHitCount() + 1);
            }
            // Running average over successful responses only: error events also
            // increment requestCount but carry no elapsed time
            int successes = daily.getSuccessCount();
            double newAvg = daily.getAvgElapsedMs()
                    + (event.getElapsedMs() - daily.getAvgElapsedMs()) / successes;
            daily.setAvgElapsedMs(newAvg);
            // Estimated cost (qwen-plus as an example: input 0.004 yuan per 1K tokens, output 0.012 yuan per 1K tokens).
            // The cast truncates to whole fen per request, so small requests contribute 0.
            long costFen = (long) (event.getInputTokens() / 1000.0 * 0.4 +
                    event.getOutputTokens() / 1000.0 * 1.2);
            daily.setEstimatedCostFen(daily.getEstimatedCostFen() + costFen);
            usageRepository.save(daily);
            log.debug("[USAGE] Updated: userId={}, tokens={}, cost={} fen",
                    userId, event.getTotalTokens(), costFen);
        } catch (Exception e) {
            log.error("[USAGE] Failed to update statistics for userId={}",
                    event.getUserId(), e);
        }
    }
/**
 * Listens for AI error events and records error statistics
 */
@Async("aiEventExecutor")
@EventListener
@Transactional
public void onAiError(AiErrorEvent event) {
try {
LocalDate today = LocalDate.now();
AiUsageDaily daily = usageRepository
.findByUserIdAndStatDateAndModelName(event.getUserId(), today, "qwen-plus")
.orElse(AiUsageDaily.builder()
.userId(event.getUserId())
.statDate(today)
.modelName("qwen-plus")
.build());
daily.setRequestCount(daily.getRequestCount() + 1);
daily.setErrorCount(daily.getErrorCount() + 1);
usageRepository.save(daily);
log.warn("[USAGE] Error recorded: userId={}, errorType={}",
event.getUserId(), event.getErrorType());
} catch (Exception e) {
log.error("[USAGE] Failed to record error for userId={}", event.getUserId(), e);
}
}
}
7. Hands-On 2: AI Answer Quality Scoring Trigger
7.1 Quality Score Data Model
package com.laozhang.ai.domain;
import jakarta.persistence.*;
import lombok.*;
import java.time.LocalDateTime;
/**
 * Quality score record for an AI answer
 */
@Entity
@Table(name = "ai_quality_score")
@Data
@NoArgsConstructor
@Builder
@AllArgsConstructor
public class AiQualityScore {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "response_event_id", nullable = false, unique = true)
private String responseEventId;
@Column(name = "user_id", nullable = false)
private String userId;
@Column(name = "user_question", columnDefinition = "TEXT")
private String userQuestion;
@Column(name = "ai_response", columnDefinition = "TEXT")
private String aiResponse;
/** Automatic AI score (0.0-1.0) */
@Column(name = "auto_score")
private Double autoScore;
/** Scoring dimensions */
@Column(name = "relevance_score")
private Double relevanceScore; // relevance
@Column(name = "accuracy_score")
private Double accuracyScore; // accuracy
@Column(name = "completeness_score")
private Double completenessScore; // completeness
@Column(name = "clarity_score")
private Double clarityScore; // clarity
/** User feedback score (derived from user behavior) */
@Column(name = "user_score")
private Double userScore;
@Column(name = "evaluated_at")
private LocalDateTime evaluatedAt;
@Column(name = "created_at")
private LocalDateTime createdAt;
@PrePersist
void onCreate() {
createdAt = LocalDateTime.now();
}
}
7.2 Quality Evaluation Service
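The service below asks a judge model for bare JSON and parses the reply directly. In practice a model may still wrap its JSON in a Markdown fence or add a sentence around it, and may return a score outside the requested range. A small pre-processing step can make the parse more forgiving. This is a sketch under assumptions: `JudgeOutput`, `extractJsonObject`, and `clampScore` are hypothetical helpers, not part of the article's code.

```java
class JudgeOutput {
    /** Returns the text from the first '{' to the last '}', or null if no such span exists. */
    static String extractJsonObject(String raw) {
        if (raw == null) {
            return null;
        }
        int start = raw.indexOf('{');
        int end = raw.lastIndexOf('}');
        return (start >= 0 && end > start) ? raw.substring(start, end + 1) : null;
    }

    /** Clamps a model-reported score into [0.0, 1.0]; NaN becomes 0.0. */
    static double clampScore(double score) {
        if (Double.isNaN(score)) {
            return 0.0;
        }
        return Math.max(0.0, Math.min(1.0, score));
    }
}
```

The extracted string can then be handed to Jackson as before. When `extractJsonObject` returns null, falling back to the default score is the natural choice.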
package com.laozhang.ai.service;
import com.laozhang.ai.domain.AiQualityScore;
import com.laozhang.ai.repository.AiQualityScoreRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
/**
 * Quality evaluation service for AI answers.
 * Uses an AI model to judge another AI's answer (the Meta-AI pattern).
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class QualityEvaluationService {
private final ChatClient qualityJudgeClient;
private final AiQualityScoreRepository scoreRepository;
private static final String EVALUATION_PROMPT = """
你是一个专业的AI回答质量评估专家。请评估以下AI回答的质量:
用户问题:{question}
AI回答:{answer}
请从以下4个维度进行评分(每个维度0.0-1.0分),并给出整体评分:
1. 相关性(relevance):回答是否切合用户问题
2. 准确性(accuracy):回答内容是否正确
3. 完整性(completeness):回答是否完整覆盖了问题
4. 清晰度(clarity):表达是否清晰易懂
请以JSON格式输出,不要包含任何其他文字:
{
"relevance": 0.0,
"accuracy": 0.0,
"completeness": 0.0,
"clarity": 0.0,
"overall": 0.0,
"brief_reason": "一句话评语"
}
""";
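/**
 * Evaluates the quality of an AI answer
 *
 * @param eventId ID of the response event
 * @param userId user ID
 * @param question the user's question
 * @param answer the AI's answer
 * @return the quality score object
 */
public AiQualityScore evaluate(String eventId, String userId,
String question, String answer) {
// Answers that are too short are not evaluated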
if (answer == null || answer.length() < 50) {
return buildDefaultScore(eventId, userId, question, answer);
}
try {
String evaluationResult = qualityJudgeClient.prompt()
.system("你是质量评估专家,只输出JSON,不输出其他内容。")
.user(EVALUATION_PROMPT
.replace("{question}", question)
.replace("{answer}", answer))
.call()
.content();
return parseEvaluationResult(evaluationResult, eventId, userId, question, answer);
} catch (Exception e) {
log.error("[QUALITY] Evaluation failed for eventId={}: {}", eventId, e.getMessage());
return buildDefaultScore(eventId, userId, question, answer);
}
}
private AiQualityScore parseEvaluationResult(String json, String eventId,
String userId, String question, String answer) {
// Parse the JSON scoring result with Jackson
try {
com.fasterxml.jackson.databind.ObjectMapper mapper =
new com.fasterxml.jackson.databind.ObjectMapper();
var node = mapper.readTree(json);
AiQualityScore score = AiQualityScore.builder()
.responseEventId(eventId)
.userId(userId)
.userQuestion(question.length() > 1000 ? question.substring(0, 1000) : question)
.aiResponse(answer.length() > 2000 ? answer.substring(0, 2000) : answer)
.relevanceScore(node.get("relevance").asDouble())
.accuracyScore(node.get("accuracy").asDouble())
.completenessScore(node.get("completeness").asDouble())
.clarityScore(node.get("clarity").asDouble())
.autoScore(node.get("overall").asDouble())
.evaluatedAt(LocalDateTime.now())
.build();
return scoreRepository.save(score);
} catch (Exception e) {
log.warn("[QUALITY] JSON parse failed: {}", e.getMessage());
return buildDefaultScore(eventId, userId, question, answer);
}
}
private AiQualityScore buildDefaultScore(String eventId, String userId,
String question, String answer) {
return AiQualityScore.builder()
.responseEventId(eventId)
.userId(userId)
.userQuestion(question)
.aiResponse(answer)
.autoScore(0.5) // default to a middle score
.evaluatedAt(LocalDateTime.now())
.build();
}
}
7.3 Quality Evaluation Listener
package com.laozhang.ai.event.listener;
import com.laozhang.ai.event.AiResponseEvent;
import com.laozhang.ai.service.QualityEvaluationService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Listener that scores AI answer quality.
 * Triggers the evaluation asynchronously so the main flow is not affected.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class QualityEvaluationListener {
private final QualityEvaluationService qualityEvaluationService;
@Value("${ai.events.quality-evaluation.enabled:true}")
private boolean evaluationEnabled;
@Value("${ai.events.quality-evaluation.min-response-length:50}")
private int minResponseLength;
@Async("aiEventExecutor")
@EventListener
public void onAiResponse(AiResponseEvent event) {
if (!evaluationEnabled) return;
// Only evaluate answers that are not from the cache and are long enough
if (event.isFromCache()) return;
if (event.getResponseContent() == null ||
event.getResponseContent().length() < minResponseLength) return;
log.debug("[QUALITY-EVAL] Triggering evaluation for eventId={}", event.getEventId());
try {
qualityEvaluationService.evaluate(
event.getEventId(),
event.getUserId(),
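// Note: the user's question has to come from the event.
// Either carry it on AiResponseEvent or look it up by eventId; the literal below is only a placeholder.
"user question (obtain from the event context)",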
event.getResponseContent()
);
log.debug("[QUALITY-EVAL] Evaluation completed for eventId={}", event.getEventId());
} catch (Exception e) {
// A scoring failure must not affect the main flow
log.warn("[QUALITY-EVAL] Evaluation failed for eventId={}: {}",
event.getEventId(), e.getMessage());
}
}
}
8. Hands-On 3: Automatic Alerts When User Satisfaction Is Low
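One of the alert conditions in this section is a run of consecutive negative feedback from the same user. The rule is easier to reason about, and to unit-test, as a small in-memory model before Redis is involved. The sketch below assumes that a positive feedback resets the streak and that the streak restarts after an alert fires. `ConsecutiveNegativeTracker` is a hypothetical class, not part of the article's code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ConsecutiveNegativeTracker {
    private final Map<String, Integer> streaks = new ConcurrentHashMap<>();
    private final int threshold;

    ConsecutiveNegativeTracker(int threshold) {
        this.threshold = threshold;
    }

    /** Records one feedback; returns true when it completes a streak of `threshold` negatives. */
    boolean record(String userId, boolean positive) {
        if (positive) {
            streaks.remove(userId); // a positive feedback breaks the streak
            return false;
        }
        int streak = streaks.merge(userId, 1, Integer::sum);
        if (streak >= threshold) {
            streaks.remove(userId); // alert fired, start counting again
            return true;
        }
        return false;
    }
}
```

With a threshold of 3, the sequence negative, negative, positive, negative, negative, negative fires exactly once, on the last feedback.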
package com.laozhang.ai.event.listener;
import com.laozhang.ai.event.UserFeedbackEvent;
import com.laozhang.ai.service.AlertService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.event.EventListener;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
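/**
 * Alert listener for user satisfaction
 *
 * Conditions that trigger an alert:
 * 1. A single negative feedback (thumbs-down or poor rating)
 * 2. N consecutive negative feedbacks from the same user
 * 3. A global negative rate above the threshold within a time window
 */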
@Component
@Slf4j
@RequiredArgsConstructor
public class SatisfactionAlertListener {
private final StringRedisTemplate redisTemplate;
private final AlertService alertService;
@Value("${ai.events.alert.low-satisfaction-threshold:0.4}")
private double lowSatisfactionThreshold;
@Value("${ai.events.alert.consecutive-low-count:3}")
private int consecutiveLowCount;
private static final String USER_NEG_COUNT_KEY = "ai:alert:neg:user:";
private static final String GLOBAL_NEG_COUNT_KEY = "ai:alert:neg:global:";
private static final String GLOBAL_TOTAL_COUNT_KEY = "ai:alert:total:global:";
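    @Async("aiEventExecutor")
    @EventListener
    public void onUserFeedback(UserFeedbackEvent event) {
        // Update global statistics
        updateGlobalStats(event.isPositive());
        if (event.isPositive()) {
            // A positive feedback breaks the streak, so the "consecutive" count stays accurate
            redisTemplate.delete(USER_NEG_COUNT_KEY + event.getUserId());
        } else {
            handleNegativeFeedback(event);
        }
    }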
private void handleNegativeFeedback(UserFeedbackEvent event) {
String userId = event.getUserId();
// 1. Check the user's consecutive negative feedback count
String userKey = USER_NEG_COUNT_KEY + userId;
Long consecutiveNeg = redisTemplate.opsForValue().increment(userKey);
redisTemplate.expire(userKey, 1, TimeUnit.HOURS);
if (consecutiveNeg != null && consecutiveNeg >= consecutiveLowCount) {
log.warn("[ALERT] User consecutive negative feedback: userId={}, count={}",
userId, consecutiveNeg);
alertService.sendAlert(AlertService.AlertLevel.WARNING,
String.format("用户[%s]连续%d次给出负面反馈,可能需要人工介入",
userId, consecutiveNeg));
// Reset the counter (alert already sent)
redisTemplate.delete(userKey);
}
// 2. Check the global negative rate
checkGlobalNegativeRate();
}
private void checkGlobalNegativeRate() {
String negKey = GLOBAL_NEG_COUNT_KEY + getCurrentHourKey();
String totalKey = GLOBAL_TOTAL_COUNT_KEY + getCurrentHourKey();
String negStr = redisTemplate.opsForValue().get(negKey);
String totalStr = redisTemplate.opsForValue().get(totalKey);
if (negStr != null && totalStr != null) {
long neg = Long.parseLong(negStr);
long total = Long.parseLong(totalStr);
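if (total >= 100) { // require at least 100 samples before the rate is meaningful
double negRate = (double) neg / total;
if (negRate > lowSatisfactionThreshold) {
// SLF4J only understands {} placeholders, so format the numbers first
log.warn("[ALERT] Global negative rate HIGH: {}% (threshold: {}%)",
String.format("%.1f", negRate * 100), String.format("%.1f", lowSatisfactionThreshold * 100));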
alertService.sendAlert(AlertService.AlertLevel.HIGH,
String.format("全局负面反馈率过高: %.1f%%,超过阈值%.1f%%",
negRate * 100, lowSatisfactionThreshold * 100));
}
}
}
}
private void updateGlobalStats(boolean isPositive) {
String hourKey = getCurrentHourKey();
String totalKey = GLOBAL_TOTAL_COUNT_KEY + hourKey;
redisTemplate.opsForValue().increment(totalKey);
redisTemplate.expire(totalKey, 2, TimeUnit.HOURS);
if (!isPositive) {
String negKey = GLOBAL_NEG_COUNT_KEY + hourKey;
redisTemplate.opsForValue().increment(negKey);
redisTemplate.expire(negKey, 2, TimeUnit.HOURS);
}
}
private String getCurrentHourKey() {
return java.time.LocalDateTime.now()
.format(java.time.format.DateTimeFormatter.ofPattern("yyyyMMddHH"));
}
}
AlertService Implementation
package com.laozhang.ai.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
/**
 * Alert service.
 * Can be wired to DingTalk, WeCom (Enterprise WeChat), PagerDuty, or other alerting channels.
 */
@Service
@Slf4j
public class AlertService {
public enum AlertLevel { INFO, WARNING, HIGH, CRITICAL }
public void sendAlert(AlertLevel level, String message) {
log.warn("[ALERT][{}] {}", level, message);
// TODO: integrate a real alerting channel
// dingTalkService.send(level, message);
// weworkService.send(level, message);
}
}
9. Event Sourcing: Event-Based Auditing of AI Operations
9.1 Audit Log Data Model
package com.laozhang.ai.domain;
import jakarta.persistence.*;
import lombok.*;
import java.time.Instant;
/**
 * Audit log for AI operations.
 * Used for compliance audits and for tracing problems back to their source.
 */
@Entity
@Table(name = "ai_audit_log",
indexes = {
@Index(name = "idx_user_time", columnList = "user_id, occurred_at"),
@Index(name = "idx_session", columnList = "session_id"),
@Index(name = "idx_event_type", columnList = "event_type")
})
@Data
@NoArgsConstructor
@AllArgsConstructor
@Builder
public class AiAuditLog {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "event_id", nullable = false, unique = true, length = 64)
private String eventId;
@Column(name = "event_type", nullable = false, length = 50)
private String eventType;
@Column(name = "user_id", nullable = false, length = 64)
private String userId;
@Column(name = "session_id", length = 64)
private String sessionId;
/** User input (after desensitization) */
@Column(name = "user_input", columnDefinition = "TEXT")
private String userInput;
/** Summary of the AI output (first 200 characters) */
@Column(name = "response_summary", length = 500)
private String responseSummary;
@Column(name = "model_name", length = 50)
private String modelName;
@Column(name = "input_tokens")
private int inputTokens;
@Column(name = "output_tokens")
private int outputTokens;
@Column(name = "elapsed_ms")
private long elapsedMs;
@Column(name = "is_blocked")
private boolean blocked;
@Column(name = "block_reason", length = 200)
private String blockReason;
@Column(name = "client_ip", length = 50)
private String clientIp;
@Column(name = "user_agent", length = 200)
private String userAgent;
@Column(name = "occurred_at", nullable = false)
private Instant occurredAt;
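    /** Masks sensitive data in user input */
    public static String desensitize(String input) {
        if (input == null) return null;
        // Mask emails and ID numbers before phone numbers, so the 11-digit phone pattern
        // cannot match inside a longer ID number; phones keep their first 3 and last 4 digits
        return input
                .replaceAll("[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\\.[a-zA-Z]{2,}", "[邮箱已脱敏]")
                .replaceAll("(?<!\\d)(?:\\d{17}[\\dXx]|\\d{15})(?![\\dXx])", "[ID号已脱敏]")
                .replaceAll("(?<!\\d)(1[3-9]\\d)\\d{4}(\\d{4})(?!\\d)", "$1****$2");
    }
}
9.2 Audit Listener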
package com.laozhang.ai.event.listener;
import com.laozhang.ai.domain.AiAuditLog;
import com.laozhang.ai.event.*;
import com.laozhang.ai.repository.AiAuditLogRepository;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
/**
 * Audit listener for AI operations.
 * Records the audit trail of every AI interaction.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class AuditLogListener {
private final AiAuditLogRepository auditLogRepository;
@Async("aiEventExecutor")
@EventListener
public void onAiResponse(AiResponseEvent event) {
try {
AiAuditLog log = AiAuditLog.builder()
.eventId(event.getEventId())
.eventType("AI_RESPONSE")
.userId(event.getUserId())
.sessionId(event.getSessionId())
.responseSummary(truncate(event.getResponseContent(), 200))
.inputTokens(event.getInputTokens())
.outputTokens(event.getOutputTokens())
.elapsedMs(event.getElapsedMs())
.blocked(false)
.occurredAt(event.getOccurredAt())
.build();
auditLogRepository.save(log);
} catch (Exception e) {
AuditLogListener.log.error("[AUDIT] Failed to save audit log", e);
}
}
@Async("aiEventExecutor")
@EventListener
public void onAiError(AiErrorEvent event) {
try {
AiAuditLog log = AiAuditLog.builder()
.eventId(event.getEventId())
.eventType("AI_ERROR")
.userId(event.getUserId())
.sessionId(event.getSessionId())
.userInput(AiAuditLog.desensitize(event.getUserMessage()))
.blocked(true)
.blockReason(event.getErrorType().name() + ": " + event.getErrorMessage())
.occurredAt(event.getOccurredAt())
.build();
auditLogRepository.save(log);
} catch (Exception e) {
AuditLogListener.log.error("[AUDIT] Failed to save error audit log", e);
}
}
private String truncate(String text, int maxLen) {
if (text == null) return null;
return text.length() <= maxLen ? text : text.substring(0, maxLen) + "...";
}
}
10. Distributed AI Events: Cross-Service Event Propagation with RocketMQ
10.1 Architecture Diagram
10.2 Event Bridge (Local Events → MQ)
package com.laozhang.ai.event.bridge;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.laozhang.ai.event.AiResponseEvent;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
 * Bridge from AI events to RocketMQ.
 * Converts local Spring events into distributed MQ messages.
 */
@Component
@Slf4j
@RequiredArgsConstructor
public class AiEventMqBridge {
private final RocketMQTemplate rocketMQTemplate;
private final ObjectMapper objectMapper;
private static final String AI_EVENT_TOPIC = "ai-events";
@Async("aiEventExecutor")
@EventListener
public void bridgeAiResponseEvent(AiResponseEvent event) {
try {
// Build the MQ message body (kept lightweight: the full response content is not sent)
Map<String, Object> message = new HashMap<>();
message.put("eventId", event.getEventId());
message.put("eventType", "AI_RESPONSE");
message.put("userId", event.getUserId());
message.put("sessionId", event.getSessionId());
message.put("inputTokens", event.getInputTokens());
message.put("outputTokens", event.getOutputTokens());
message.put("totalTokens", event.getTotalTokens());
message.put("elapsedMs", event.getElapsedMs());
message.put("fromCache", event.isFromCache());
message.put("occurredAt", event.getOccurredAt().toString());
String jsonBody = objectMapper.writeValueAsString(message);
// 发送到RocketMQ,tags用于消费者过滤
rocketMQTemplate.syncSend(
AI_EVENT_TOPIC + ":AI_RESPONSE",
MessageBuilder.withPayload(jsonBody)
.setHeader("userId", event.getUserId())
.setHeader("eventId", event.getEventId())
.build()
);
log.debug("[MQ-BRIDGE] AI response event sent to MQ: eventId={}", event.getEventId());
} catch (Exception e) {
// MQ发送失败不影响主流程
log.error("[MQ-BRIDGE] Failed to send event to MQ: {}", e.getMessage());
}
}
}
10.3 Core AI service (event publisher)
package com.laozhang.ai.service;

import com.laozhang.ai.event.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.model.ChatResponse;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Service;

/**
 * Core AI answering service.
 * Single responsibility: call the AI and publish events.
 * Follow-up actions are handled asynchronously by listeners.
 */
@Service
@Slf4j
@RequiredArgsConstructor
public class AiAnswerService {

    private final ChatClient chatClient;
    private final ApplicationEventPublisher eventPublisher;

    public String answer(String userId, String sessionId, String question) {
        long start = System.currentTimeMillis();

        // 1. Publish the request event (for pre-processing such as logging and rate limiting)
        AiRequestEvent requestEvent = new AiRequestEvent(
                this, userId, sessionId, question, null, "qwen-plus",
                estimateTokens(question)
        );
        // Use the ID of the published request event so the response can be correlated with it
        String requestEventId = requestEvent.getEventId();
        eventPublisher.publishEvent(requestEvent);

        try {
            // 2. Call the AI (core logic)
            ChatResponse chatResponse = chatClient.prompt()
                    .user(question)
                    .advisors(a -> a.param("userId", userId)
                            .param("sessionId", sessionId))
                    .call()
                    .chatResponse();
            String answer = chatResponse.getResult().getOutput().getContent();
            long elapsed = System.currentTimeMillis() - start;
            int inputTokens = extractInputTokens(chatResponse);
            int outputTokens = extractOutputTokens(chatResponse);

            // 3. Publish the response event (decouples every follow-up action)
            AiResponseEvent responseEvent = new AiResponseEvent(
                    this, userId, sessionId,
                    answer, inputTokens, outputTokens,
                    elapsed, false, requestEventId
            );
            eventPublisher.publishEvent(responseEvent);

            // The core method only returns the result; the event system handles the rest
            return answer;
        } catch (Exception e) {
            long elapsed = System.currentTimeMillis() - start;
            log.warn("[AI] Call failed after {} ms: {}", elapsed, e.getMessage());

            // 4. Publish the error event, then rethrow so the caller still sees the failure
            AiErrorEvent errorEvent = new AiErrorEvent(
                    this, userId, sessionId, question,
                    AiErrorEvent.ErrorType.API_ERROR,
                    e.getMessage(), e, 0
            );
            eventPublisher.publishEvent(errorEvent);
            throw e;
        }
    }

    // Rough estimate only: 0.75 tokens per character
    private int estimateTokens(String text) {
        return text != null ? (int) (text.length() * 0.75) : 0;
    }

    // Usage metadata may be missing; fall back to 0 rather than fail the request
    private int extractInputTokens(ChatResponse response) {
        try {
            return response.getMetadata().getUsage().getPromptTokens().intValue();
        } catch (Exception e) {
            return 0;
        }
    }

    private int extractOutputTokens(ChatResponse response) {
        try {
            return response.getMetadata().getUsage().getGenerationTokens().intValue();
        } catch (Exception e) {
            return 0;
        }
    }
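}
11. Performance data and architectural impact
11.1 Before and after the event-driven refactoring
| Metric | Before (coupled) | After (event-driven) | Improvement |
|---|---|---|---|
| P50 API latency | 3800 ms | 1280 ms | 66% lower |
| P99 API latency | 8500 ms | 2800 ms | 67% lower |
| API timeout rate | 2.3% | 0.2% | 91% lower |
| Effort to add a listener | Modify the core method | Add a Listener class | No change to core code |
| Mocks needed to unit-test the core method | 5 | 1 | 80% fewer |
| Blast radius of a failing listener | Entire endpoint unavailable | Only that listener | Fault isolation |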
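11.2 Async thread pool monitoring data
Thread pool figures from an AI platform at a large tech company (5 million calls per day):
- Core threads: 10; maximum threads: 30
- Active threads at peak (20:00-22:00): 18
- Task queue backlog: 15 on average (peak 120)
- Average task execution time: 8 ms (mostly DB writes)
- Task rejection rate: 0.003%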
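The figures above map onto a bounded thread pool roughly as follows. This is a minimal plain-JDK sketch, not the platform's actual configuration: the core and maximum sizes come from the list above, while the class name `AiEventPoolSketch`, the queue capacity passed in, the 60-second keep-alive and the count-and-drop rejection policy are all assumptions made for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/** Plain-JDK sketch of a bounded pool for event listeners (hypothetical class). */
final class AiEventPoolSketch {

    /** Counts tasks refused because both the queue and the pool were full. */
    static final AtomicLong rejected = new AtomicLong();

    static ThreadPoolExecutor create(int queueCapacity) {
        return new ThreadPoolExecutor(
                10,                                       // core threads (from the figures above)
                30,                                       // maximum threads (from the figures above)
                60, TimeUnit.SECONDS,                     // idle keep-alive: assumed value
                new ArrayBlockingQueue<>(queueCapacity),  // bounded queue: capacity is an assumption
                r -> {
                    Thread t = new Thread(r, "ai-event-sketch");
                    t.setDaemon(true);
                    return t;
                },
                (task, pool) -> rejected.incrementAndGet()); // assumed policy: count and drop
    }

    /** Rejected tasks as a percentage of all submitted tasks. */
    static double rejectionRatePercent(long rejectedTasks, long submittedTasks) {
        return submittedTasks == 0 ? 0.0 : 100.0 * rejectedTasks / submittedTasks;
    }
}
```

Two things are worth keeping in mind when reading such numbers. A `ThreadPoolExecutor` only starts threads beyond the core size once its queue is full, so the queue capacity decides when the pool grows from 10 towards 30. And if every call submitted exactly one task (an assumption), a 0.003% rejection rate at 5 million calls per day would mean about 150 dropped tasks per day, which is why the rejection policy deserves an explicit choice.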
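12. FAQ
Q1: Spring's @EventListener is synchronous by default. How do I make it asynchronous?
A: Add @Async to the listener method and enable it with @EnableAsync; @EventListener alone never runs asynchronously. Also note that an @Async method cannot be final, private or static, and it must belong to a bean managed by the Spring container.
Q2: What is the difference between @TransactionalEventListener and @EventListener?
A: @TransactionalEventListener runs after the transaction commits by default. It suits cases like "send an email after user registration", where the follow-up action must fire only once the database write has succeeded. AI events generally do not need to be bound to a transaction, so @EventListener is enough.
Q3: Does an exception in an event listener affect the main transaction?
A: An exception in an asynchronous listener (with @Async) does not reach the publishing thread. Spring hands it to an AsyncUncaughtExceptionHandler, and the default handler only logs it, so configure your own if you need alerting or retries. An uncaught exception in a synchronous listener (without @Async) propagates to the publisher.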
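The difference between the two cases can be illustrated without Spring. The following is a plain-JDK analogy, not Spring's implementation: `ListenerFailureSketch`, `publishSync`, `publishAsync` and `asyncFailures` are hypothetical names, and the catch block merely plays the role that an uncaught-exception handler plays for @Async listeners.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Plain-JDK analogy of sync vs. async listener failure (hypothetical class). */
final class ListenerFailureSketch {

    /** Failures from async listeners are recorded here instead of reaching the publisher. */
    static final List<Throwable> asyncFailures = new CopyOnWriteArrayList<>();

    /** Synchronous dispatch: the listener runs on the publisher's thread. */
    static void publishSync(String event, Consumer<String> listener) {
        listener.accept(event); // an exception thrown here propagates to the caller
    }

    /** Asynchronous dispatch: the listener runs on a pool thread. */
    static void publishAsync(String event, Consumer<String> listener, ExecutorService pool) {
        pool.execute(() -> {
            try {
                listener.accept(event);
            } catch (RuntimeException ex) {
                // Stand-in for an uncaught-exception handler: record, do not rethrow
                asyncFailures.add(ex);
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        Consumer<String> failing = e -> {
            throw new IllegalStateException("listener failed: " + e);
        };

        try {
            publishSync("evt-1", failing);
        } catch (IllegalStateException ex) {
            System.out.println("publisher saw: " + ex.getMessage());
        }

        ExecutorService pool = Executors.newSingleThreadExecutor();
        publishAsync("evt-2", failing, pool); // returns normally; the publisher sees nothing
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("recorded async failures: " + asyncFailures.size());
    }
}
```

In a Spring application the corresponding hook is the AsyncUncaughtExceptionHandler returned from AsyncConfigurer.getAsyncUncaughtExceptionHandler(); it is only consulted for @Async methods that return void.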
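Q4: How do I process events in a fixed order?
A: When several listeners handle the same event type, the @Order annotation controls their execution order, but only for synchronous listeners. Asynchronous listeners give no ordering guarantee. If order matters, call the services one after another inside a single listener.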
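The "single listener, several ordered steps" approach can be sketched in plain Java as follows. `OrderedStepsListener` and its methods are hypothetical names; in a real application `onEvent` would be the body of one @EventListener method, and each step would call one of the services.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-JDK sketch: one listener that runs its steps in a fixed order (hypothetical class). */
final class OrderedStepsListener<E> {

    private final List<Consumer<E>> steps = new ArrayList<>();

    /** Registers a step; steps run in registration order. */
    OrderedStepsListener<E> then(Consumer<E> step) {
        steps.add(step);
        return this;
    }

    /** Runs every step on the calling thread, one after another. */
    void onEvent(E event) {
        for (Consumer<E> step : steps) {
            step.accept(event); // a failing step stops the steps after it
        }
    }

    public static void main(String[] args) {
        List<String> trace = new ArrayList<>();
        new OrderedStepsListener<String>()
                .then(e -> trace.add("usage:" + e))
                .then(e -> trace.add("stats:" + e))
                .onEvent("evt-1");
        System.out.println(trace);
    }
}
```

Because all steps share one thread, the whole listener can still be made asynchronous without losing the order between steps. The trade-off is that the steps are no longer isolated from each other: an exception in one step skips the rest unless each step is wrapped in its own try/catch.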
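Q5: What if a large volume of AI events puts too much write pressure on the database?
A: There are several options: 1) batch writes (accumulate events, then flush them in batches); 2) write to a message queue and consume asynchronously; 3) use a time-series or columnar database (InfluxDB/ClickHouse) instead of MySQL; 4) write only to Redis and sync to MySQL periodically.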
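Option 1 can be sketched as a small in-memory buffer. This is a minimal plain-JDK illustration under stated assumptions: `BatchingBuffer` is a hypothetical class, and the `sink` stands in for whatever performs the real batch insert (for example a JDBC batch), which is not shown here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-JDK sketch of batched writes: buffer events, hand them to a sink in batches. */
final class BatchingBuffer<E> {

    private final int batchSize;
    private final Consumer<List<E>> sink; // hypothetical stand-in for a batch insert
    private final List<E> buffer = new ArrayList<>();

    BatchingBuffer(int batchSize, Consumer<List<E>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Called once per event; flushes when the buffer reaches batchSize. */
    synchronized void add(E event) {
        buffer.add(event);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes out whatever is buffered; a no-op when the buffer is empty. */
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<E> batch = new ArrayList<>(buffer); // copy so the sink never sees later changes
        buffer.clear();
        sink.accept(batch);
    }
}
```

A listener would call `add` for each event, and `flush` should also be invoked on a timer and at shutdown so that a partial batch is not left behind. Note the trade-offs of this simple version: the sink runs while the lock is held, so a slow write blocks other listeners, and events still in the buffer are lost if the process crashes.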
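Q6: How do I debug the event flow? How do I find out why an event was not handled?
A: Turn on Spring's debug logging with logging.level.org.springframework.context.event=DEBUG, which gives more visibility into listener registration and event dispatch. It also helps to give every event a unique ID (BaseAiEvent in this article already does), so that a single event can be traced across log lines.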
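Closing
Xiao Li refactored the code around an event-driven architecture. The core AI method shrank from 117 lines to 35, and all five follow-up actions were decoupled into independent Listeners. API latency dropped from 3.8 seconds to 1.3 seconds, and the timeout rate fell from 2.3% to 0.2%.
The product manager came back pushing for that seventh requirement. This time Xiao Li only had to add a CourseRecommendationListener, about 30 lines of core code, without touching the AI call logic at all.
That is the appeal of event-driven architecture: whoever cares about an event handles it themselves, instead of everyone crowding into the same method.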
