第2488篇:AI增强的消息系统——在IM和通知系统中引入AI能力
2026/4/30大约 7 分钟
第2488篇:AI增强的消息系统——在IM和通知系统中引入AI能力
适读人群:Java后端工程师、消息系统开发者、AI工程师 | 阅读时长:约14分钟 | 核心价值:掌握在IM和通知系统中集成AI能力的工程方法
我们团队有一个企业内部 IM 系统,用了好几年了。某次做年度技术规划,产品经理提了一个需求:能不能在 IM 里加一个 AI 助手,帮用户处理一些重复性的工作?
具体需求是:
- 自动帮用户起草回复消息
- 对长对话做摘要,让用户快速了解错过的内容
- 智能过滤通知,重要的立即推,不重要的攒起来批量推
- 在群聊里识别 Action Item,自动提醒相关人员
听起来都是小功能,但做完之后用户反馈非常好。
我们当时遇到的技术挑战比预期的多,这篇把经验整理出来。
一、整体架构设计
在消息系统里加 AI,不能"直接在消息链路上加 AI 调用"——消息的延迟要求通常是毫秒级的,而 LLM 调用动辄几秒,不能阻塞主链路。
正确的架构是异步旁路模式:
这个设计的关键:主链路不依赖 AI,AI 处理是旁路异步的。主链路出了问题不影响 AI,AI 出了问题也不影响主链路。
二、智能通知优先级系统
最先上线的功能是"通知降噪",用户反馈改善最明显。
@Service
@Slf4j
public class NotificationPriorityService {
private final ChatClient chatClient;
private final UserNotificationProfileService profileService;
private final RedisTemplate<String, String> redisTemplate;
// 判断通知优先级
public NotificationPriority assessPriority(
Notification notification,
String userId) {
// 1. 快速规则判断(不走 AI)
NotificationPriority quickPriority = quickRuleAssess(notification, userId);
if (quickPriority == NotificationPriority.CRITICAL) {
return NotificationPriority.CRITICAL; // @提到本人、紧急关键词,直接最高优先级
}
// 2. 获取用户的通知偏好配置
UserNotificationProfile profile = profileService.get(userId);
// 3. 检查内容是否与用户关注的话题相关
boolean isRelevantTopic = isRelevantToUser(notification.getContent(), profile);
if (!isRelevantTopic && notification.getType() == NotificationType.GROUP_CHAT) {
return NotificationPriority.LOW; // 与用户无关的群消息降级
}
// 4. 对于中等复杂度的通知,用轻量模型评估
return aiAssessPriority(notification, profile);
}
private NotificationPriority quickRuleAssess(Notification notification, String userId) {
String content = notification.getContent();
String mentions = notification.getMentions();
// @了本人,高优先级
if (mentions != null && mentions.contains(userId)) {
return NotificationPriority.HIGH;
}
// 包含紧急关键词,最高优先级
String[] urgentKeywords = {"紧急", "故障", "宕机", "urgent", "ASAP", "immediately"};
for (String keyword : urgentKeywords) {
if (content.toLowerCase().contains(keyword.toLowerCase())) {
return NotificationPriority.CRITICAL;
}
}
// 私信,高优先级
if (notification.getType() == NotificationType.DIRECT_MESSAGE) {
return NotificationPriority.HIGH;
}
return NotificationPriority.MEDIUM; // 默认中优先级,再由 AI 细化
}
private NotificationPriority aiAssessPriority(
Notification notification,
UserNotificationProfile profile) {
// 检查缓存(相似内容不重复调用 AI)
String cacheKey = "notify:priority:" + hashContent(notification.getContent());
String cached = redisTemplate.opsForValue().get(cacheKey);
if (cached != null) {
return NotificationPriority.valueOf(cached);
}
String prompt = String.format("""
判断以下消息对该用户的优先级。
用户角色:%s
用户关注领域:%s
当前时间:%s(工作时间 9:00-18:00)
消息来源:%s
消息内容:%s
优先级定义:
- HIGH:用户需要尽快查看(10分钟内)
- MEDIUM:用户可以在1小时内查看
- LOW:可以攒到合适时间批量查看
只返回优先级,不要解释:HIGH / MEDIUM / LOW
""",
profile.getUserRole(),
String.join("、", profile.getFocusTopics()),
LocalTime.now().toString(),
notification.getSenderName(),
notification.getContent()
);
String result = chatClient.call(prompt).trim().toUpperCase();
NotificationPriority priority;
try {
priority = NotificationPriority.valueOf(result);
} catch (IllegalArgumentException e) {
priority = NotificationPriority.MEDIUM; // 解析失败默认中优先级
}
// 缓存结果(30分钟)
redisTemplate.opsForValue().set(cacheKey, priority.name(), Duration.ofMinutes(30));
return priority;
}
private boolean isRelevantToUser(String content, UserNotificationProfile profile) {
String contentLower = content.toLowerCase();
return profile.getFocusTopics().stream()
.anyMatch(topic -> contentLower.contains(topic.toLowerCase()));
}
private String hashContent(String content) {
// 只取前100字符计算哈希,避免长消息导致不同的缓存 key
return String.valueOf((content.substring(0, Math.min(100, content.length()))).hashCode());
}
}2.1 智能通知批处理调度
@Service
@Slf4j
public class SmartNotificationBatcher {
private final NotificationPriorityService priorityService;
private final NotificationDeliveryService deliveryService;
// 根据优先级决定通知投递策略
public void scheduleNotification(Notification notification, String userId) {
NotificationPriority priority = priorityService.assessPriority(notification, userId);
switch (priority) {
case CRITICAL:
// 立即投递,不做任何延迟
deliveryService.deliverImmediately(notification, userId);
break;
case HIGH:
// 2分钟内投递(合并同一来源的多条消息)
batchScheduler.schedule(userId, notification, Duration.ofMinutes(2));
break;
case MEDIUM:
// 每30分钟批量投递一次
batchScheduler.schedule(userId, notification, Duration.ofMinutes(30));
break;
case LOW:
// 在下一个"合适时间"投递(工作结束前或第二天早上)
LocalDateTime nextBatchTime = calculateNextBatchTime();
batchScheduler.scheduleAt(userId, notification, nextBatchTime);
break;
}
}
// 批量投递时,对多条低优先级消息做聚合摘要
public void deliverBatch(String userId, List<Notification> notifications) {
if (notifications.size() == 1) {
deliveryService.deliver(notifications.get(0), userId);
return;
}
// 多条消息聚合成摘要通知
String summary = generateBatchSummary(notifications);
Notification batchNotification = Notification.builder()
.type(NotificationType.BATCH_SUMMARY)
.content(summary)
.originalNotifications(notifications)
.userId(userId)
.build();
deliveryService.deliver(batchNotification, userId);
}
private String generateBatchSummary(List<Notification> notifications) {
String notificationTexts = notifications.stream()
.map(n -> n.getSenderName() + ": " + n.getContent())
.collect(Collectors.joining("\n"));
return chatClient.call("""
请用2-3句话总结以下消息,帮助用户快速了解主要内容:
""" + notificationTexts + """
格式:直接输出总结,不要"总结:"等前缀。
""");
}
private LocalDateTime calculateNextBatchTime() {
LocalDateTime now = LocalDateTime.now();
LocalTime endOfWork = LocalTime.of(18, 0);
LocalTime startOfWork = LocalTime.of(9, 0);
if (now.toLocalTime().isBefore(endOfWork)) {
return now.toLocalDate().atTime(endOfWork);
} else {
return now.toLocalDate().plusDays(1).atTime(startOfWork);
}
}
}三、长对话摘要功能
@Service
@Slf4j
public class ConversationSummaryService {
private final ChatClient chatClient;
private final MessageRepository messageRepo;
// 生成未读消息摘要(用户回来看一眼就知道聊了啥)
public ConversationSummary generateUnreadSummary(
String conversationId,
String userId,
Instant lastReadTime) {
// 获取用户未读的消息
List<Message> unreadMessages = messageRepo.findUnread(
conversationId, lastReadTime, Instant.now());
if (unreadMessages.isEmpty()) {
return ConversationSummary.empty();
}
// 太少的消息不需要摘要
if (unreadMessages.size() <= 5) {
return ConversationSummary.noSummaryNeeded(unreadMessages.size());
}
// 构建消息文本
String conversationText = unreadMessages.stream()
.map(m -> String.format("[%s] %s: %s",
m.getTimestamp().toString().substring(11, 16),
m.getSenderName(),
m.getContent()))
.collect(Collectors.joining("\n"));
String prompt = String.format("""
以下是你不在线期间群聊的消息记录(共%d条):
%s
请生成一个简洁的摘要,包含:
1. 主要讨论了什么(2-3句话)
2. 是否有需要你关注的事项(有则列出,无则说"无需关注")
3. 是否有未解决的问题(有则列出)
格式简洁,用中文,方便快速阅读。
""", unreadMessages.size(), conversationText);
String summaryText = chatClient.call(prompt);
// 提取 Action Items(需要当前用户跟进的事)
List<String> actionItems = extractActionItemsForUser(
conversationText, userId);
return ConversationSummary.builder()
.conversationId(conversationId)
.unreadCount(unreadMessages.size())
.summaryText(summaryText)
.actionItems(actionItems)
.generatedAt(Instant.now())
.build();
}
// 从对话中提取针对特定用户的 Action Items
private List<String> extractActionItemsForUser(String conversation, String userId) {
String prompt = String.format("""
从以下对话中提取需要 %s 跟进的 Action Items。
只提取明确指向该用户或与其职责相关的事项。
对话:
%s
以JSON数组格式返回:["action1", "action2"]
如果没有相关 Action Items,返回:[]
""", userId, conversation);
try {
String response = chatClient.call(prompt);
return objectMapper.readValue(response.trim(), new TypeReference<List<String>>() {});
} catch (Exception e) {
log.warn("提取 Action Items 失败", e);
return Collections.emptyList();
}
}
}四、AI 辅助起草回复
@Service
@Slf4j
public class ReplyDraftService {
private final ChatClient chatClient;
private final UserCommunicationStyleService styleService;
// 基于上下文为用户生成回复草稿
public List<ReplyDraft> generateDrafts(
String conversationId,
Message lastMessage,
String userId) {
// 获取用户的沟通风格偏好
CommunicationStyle style = styleService.getStyle(userId);
// 获取最近的上下文消息
List<Message> context = messageRepo.findRecent(conversationId, 5);
String contextText = context.stream()
.map(m -> m.getSenderName() + ": " + m.getContent())
.collect(Collectors.joining("\n"));
String prompt = String.format("""
场景:企业内部IM
沟通风格要求:%s
对话上下文:
%s
最新消息:%s: %s
请为用户生成3个风格不同的回复草稿:
1. 简洁确认型(一句话)
2. 详细回应型(3-5句话)
3. 询问澄清型(如果信息不够明确)
返回JSON格式:
[
{"type": "brief", "content": "回复内容"},
{"type": "detailed", "content": "回复内容"},
{"type": "clarify", "content": "回复内容"}
]
""",
style.getDescription(),
contextText,
lastMessage.getSenderName(),
lastMessage.getContent()
);
try {
String response = chatClient.call(prompt);
List<ReplyDraft> drafts = objectMapper.readValue(response,
new TypeReference<List<ReplyDraft>>() {});
// 标记为 AI 生成
drafts.forEach(d -> d.setAiGenerated(true));
return drafts;
} catch (Exception e) {
log.error("生成回复草稿失败", e);
return Collections.emptyList();
}
}
}五、关键的工程注意事项
异步处理,不阻塞主链路。这是最重要的一条。AI 功能必须异步处理,失败了就静默降级,不能影响消息的正常收发。
用户感知的透明度。AI 生成的内容要标注"AI建议",让用户知道这不是人写的。用户修改了 AI 草稿,也要记录下来用于改进。
隐私保护。对话内容非常敏感。AI 处理只能在企业私有化部署的模型上进行,绝对不能把内部对话发给第三方 API。
可关闭性。每个 AI 功能都要有开关,用户可以选择关闭所有 AI 辅助功能。
延迟可见性。即使是异步处理,也要在 UI 上对摘要等功能显示"生成中..."状态,让用户知道在处理。
这几个原则做到了,用户才会真正信任和接受 IM 里的 AI 功能。
