AI应用的消息推送:AI生成内容的实时触达设计
2026/10/1大约 14 分钟消息推送WebPush实时通知Spring AIJava
AI应用的消息推送:AI生成内容的实时触达设计
开篇故事:李薇的"已读不回"
2026年1月,某教育科技公司的产品经理李薇在用户调研中听到了一个频繁出现的抱怨:
"我提交了一篇作文让AI批改,然后就去干别的了。等我回来,完全不知道有没有改完。每次都要去刷新页面,很烦。"
李薇数了数,她们的AI作文批改平台日均提交量3.2万篇,平均批改时间45秒。按照用户调研的数据:
- 67%的用户会在提交后30秒内离开当前页面
- 43%的用户需要刷新3次以上才能看到结果
- 因为"不知道有没有完成",用户平均等待时间是实际处理时间的2.8倍
- 18%的用户因为等不到结果,直接放弃了这次批改
这不是AI质量问题,这是通知问题。
李薇的团队用了两周时间,为AI任务结果实现了完整的推送通知体系:
- 浏览器WebPush通知(即使离开页面也能收到)
- 应用内SSE实时更新(停留页面的用户即时看到结果)
- 微信服务号模板消息(接入微信的用户)
上线后的数据:
- 用户平均等待体验时间从125秒降低到48秒(实际处理时间没变)
- "等待相关"的负面反馈下降了71%
- 30天留存率提升了13%
这就是"最后一公里"通知的力量。本文将带你实现AI异步任务的完整推送通知体系。
TL;DR
- 核心问题:AI任务是异步的,用户需要知道"结果回来了"
- 四种方案对比:轮询/SSE/WebSocket/WebPush各自的适用场景
- 推荐组合:SSE(页面内)+ WebPush(离开页面后)
- Spring AI集成:通过观察者模式桥接AI任务完成事件与推送系统
- 重要细节:弱网重连、消息去重、用户授权最佳实践
一、AI异步任务的通知挑战
1.1 为什么AI任务需要异步通知?
同步等待的用户体验公式:
实际等待时间 × 不确定感 = 用户感知的痛苦程度
例:等45秒(不知道是否完成) >> 等45秒(进度条+预计剩余时间)AI任务的特点决定了异步通知的必要性:
- 处理时间不可预测:简单任务3秒,复杂任务3分钟
- 用户不愿意盯着屏幕等:特别是>10秒的任务
- 多端使用:用户可能切换到手机/另一个标签
1.2 四种通知方案对比
| 方案 | 实时性 | 场景 | 优点 | 缺点 |
|---|---|---|---|---|
| 短轮询 | 低(延迟=轮询间隔) | 简单场景降级方案 | 实现简单 | 浪费资源,延迟高 |
| 长轮询 | 中 | 不支持SSE的老浏览器 | 兼容性好 | 服务端连接占用 |
| SSE | 高 | 用户在页面内 | 轻量,单向,天然支持重连 | 用户离开页面失效 |
| WebSocket | 高 | 需要双向通信的场景 | 双向,低延迟 | 实现复杂,连接保持成本 |
| WebPush | 中(秒级) | 用户已离开页面/应用 | 跨平台,无需保持连接 | 需要用户授权 |
推荐组合策略:
用户提交AI任务
│
├── 用户在页面内? → SSE实时推送
│
├── 用户离开页面? → WebPush通知
│
└── 用户在微信内? → 微信模板消息二、SSE实现:页面内实时更新
2.1 Spring Boot SSE服务端
// AiTaskNotificationController.java
@RestController
@RequestMapping("/api/notifications")
@Slf4j
public class AiTaskNotificationController {
private final AiTaskService aiTaskService;
private final SseEmitterManager sseEmitterManager;
// 建立SSE连接
@GetMapping(value = "/stream/{userId}",
produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(
@PathVariable String userId,
@RequestHeader(value = "Last-Event-ID", required = false) String lastEventId) {
// 超时时间30分钟(用户会话时长)
SseEmitter emitter = new SseEmitter(30 * 60 * 1000L);
sseEmitterManager.addEmitter(userId, emitter);
// 设置回调
emitter.onCompletion(() -> {
sseEmitterManager.removeEmitter(userId, emitter);
log.debug("SSE连接关闭: {}", userId);
});
emitter.onTimeout(() -> {
sseEmitterManager.removeEmitter(userId, emitter);
log.debug("SSE连接超时: {}", userId);
});
emitter.onError(ex -> {
sseEmitterManager.removeEmitter(userId, emitter);
log.warn("SSE连接错误 [{}]: {}", userId, ex.getMessage());
});
// 发送初始连接确认
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(System.currentTimeMillis()))
.name("connected")
.data(Map.of("status", "connected", "userId", userId)));
// 如果有lastEventId,重发未接收的消息
if (lastEventId != null) {
resendMissedEvents(userId, lastEventId, emitter);
}
} catch (IOException e) {
emitter.completeWithError(e);
}
return emitter;
}
// 提交AI任务
@PostMapping("/tasks/submit")
public ResponseEntity<TaskSubmitResponse> submitTask(
@RequestBody AiTaskRequest request,
@AuthenticationPrincipal UserDetails user) {
String taskId = UUID.randomUUID().toString();
// 异步执行AI任务,完成后通过事件通知
aiTaskService.submitAsync(taskId, request, user.getUsername());
return ResponseEntity.accepted()
.body(TaskSubmitResponse.builder()
.taskId(taskId)
.status("PROCESSING")
.estimatedTimeSeconds(aiTaskService.estimateTime(request))
.build());
}
// 重发未接收的消息
private void resendMissedEvents(String userId, String lastEventId,
SseEmitter emitter) {
long lastId = Long.parseLong(lastEventId);
// 从消息缓存中获取lastEventId之后的消息
List<CachedEvent> missedEvents = sseEmitterManager
.getMissedEvents(userId, lastId);
for (CachedEvent event : missedEvents) {
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(event.getId()))
.name(event.getType())
.data(event.getData()));
} catch (IOException e) {
break;
}
}
}
}2.2 SSE发射器管理器
// SseEmitterManager.java
@Component
@Slf4j
public class SseEmitterManager {
// 用户ID -> 多个Emitter(同一用户多个标签页)
private final ConcurrentHashMap<String, CopyOnWriteArrayList<SseEmitter>>
userEmitters = new ConcurrentHashMap<>();
// 消息缓存(用于重连时重发)
private final Cache<String, List<CachedEvent>> eventCache;
public SseEmitterManager() {
this.eventCache = Caffeine.newBuilder()
.expireAfterWrite(5, TimeUnit.MINUTES)
.maximumSize(10000)
.build();
}
public void addEmitter(String userId, SseEmitter emitter) {
userEmitters.computeIfAbsent(userId, k -> new CopyOnWriteArrayList<>())
.add(emitter);
log.debug("添加SSE连接: {} (当前连接数: {})",
userId, getUserEmitterCount(userId));
}
public void removeEmitter(String userId, SseEmitter emitter) {
List<SseEmitter> emitters = userEmitters.get(userId);
if (emitters != null) {
emitters.remove(emitter);
if (emitters.isEmpty()) {
userEmitters.remove(userId);
}
}
}
// 向用户发送通知
public void sendToUser(String userId, String eventType, Object data) {
List<SseEmitter> emitters = userEmitters.get(userId);
if (emitters == null || emitters.isEmpty()) {
log.debug("用户[{}]没有活跃的SSE连接", userId);
return;
}
long eventId = System.currentTimeMillis();
String jsonData = serializeData(data);
// 缓存消息,供重连后重发
cacheEvent(userId, eventId, eventType, jsonData);
// 向所有活跃连接发送
List<SseEmitter> failedEmitters = new ArrayList<>();
for (SseEmitter emitter : emitters) {
try {
emitter.send(SseEmitter.event()
.id(String.valueOf(eventId))
.name(eventType)
.data(jsonData, MediaType.APPLICATION_JSON));
} catch (IOException e) {
failedEmitters.add(emitter);
}
}
// 清理失败的连接
failedEmitters.forEach(e -> removeEmitter(userId, e));
}
private void cacheEvent(String userId, long eventId,
String type, String data) {
String cacheKey = "sse:events:" + userId;
List<CachedEvent> events = eventCache.get(cacheKey, k -> new ArrayList<>());
events.add(new CachedEvent(eventId, type, data));
// 只保留最近50条
if (events.size() > 50) {
events.remove(0);
}
}
public List<CachedEvent> getMissedEvents(String userId, long afterEventId) {
String cacheKey = "sse:events:" + userId;
List<CachedEvent> allEvents = eventCache.getIfPresent(cacheKey);
if (allEvents == null) return Collections.emptyList();
return allEvents.stream()
.filter(e -> e.getId() > afterEventId)
.toList();
}
private int getUserEmitterCount(String userId) {
List<SseEmitter> emitters = userEmitters.get(userId);
return emitters == null ? 0 : emitters.size();
}
}2.3 AI任务完成事件监听
// AiTaskEventListener.java
@Component
@Slf4j
public class AiTaskEventListener {
private final SseEmitterManager sseEmitterManager;
private final WebPushService webPushService;
private final WechatNotificationService wechatService;
// 监听AI任务完成事件
@EventListener
@Async("notificationExecutor")
public void onAiTaskCompleted(AiTaskCompletedEvent event) {
String userId = event.getUserId();
String taskId = event.getTaskId();
log.info("AI任务完成: taskId={}, userId={}", taskId, userId);
AiTaskResult result = event.getResult();
// 构建通知数据
Map<String, Object> notification = Map.of(
"taskId", taskId,
"status", result.getStatus(),
"previewText", extractPreview(result.getContent()),
"completedAt", result.getCompletedAt().toString(),
"processingTimeMs", result.getProcessingTimeMs()
);
// 1. SSE推送(如果用户在页面内)
sseEmitterManager.sendToUser(userId, "task_completed", notification);
// 2. WebPush(无论用户是否在页面内,都发)
webPushService.sendToUser(userId, notification);
// 3. 微信通知(如果用户绑定了微信)
wechatService.sendTaskCompletionNotification(userId, result);
}
// 监听AI任务进度更新
@EventListener
@Async("notificationExecutor")
public void onAiTaskProgress(AiTaskProgressEvent event) {
Map<String, Object> progressData = Map.of(
"taskId", event.getTaskId(),
"progress", event.getProgress(),
"message", event.getMessage()
);
sseEmitterManager.sendToUser(
event.getUserId(), "task_progress", progressData);
}
// 监听AI任务失败
@EventListener
@Async("notificationExecutor")
public void onAiTaskFailed(AiTaskFailedEvent event) {
Map<String, Object> failData = Map.of(
"taskId", event.getTaskId(),
"error", event.getErrorMessage(),
"canRetry", event.isRetryable()
);
// SSE通知
sseEmitterManager.sendToUser(
event.getUserId(), "task_failed", failData);
}
private String extractPreview(String content) {
if (content == null) return "";
return content.length() > 100 ?
content.substring(0, 100) + "..." : content;
}
}三、Spring AI集成:任务状态管理
3.1 AI异步任务服务
// AiTaskService.java
@Service
@Slf4j
public class AiTaskService {
private final ChatClient chatClient;
private final AiTaskRepository taskRepository;
private final ApplicationEventPublisher eventPublisher;
private final ThreadPoolTaskExecutor aiTaskExecutor;
// 提交异步AI任务
public String submitAsync(String taskId, AiTaskRequest request, String userId) {
// 保存任务记录
AiTask task = AiTask.builder()
.id(taskId)
.userId(userId)
.type(request.getType())
.input(request.getInput())
.status(TaskStatus.PENDING)
.submittedAt(LocalDateTime.now())
.build();
taskRepository.save(task);
// 提交异步执行
aiTaskExecutor.execute(() -> executeTask(taskId, request, userId));
return taskId;
}
private void executeTask(String taskId, AiTaskRequest request, String userId) {
try {
// 更新状态为处理中
updateTaskStatus(taskId, TaskStatus.PROCESSING);
// 发送进度通知
eventPublisher.publishEvent(new AiTaskProgressEvent(
this, taskId, userId, 0, "开始处理..."));
long startTime = System.currentTimeMillis();
// 执行AI任务(流式处理,可以推送实时进度)
StringBuilder contentBuilder = new StringBuilder();
chatClient.prompt()
.system(getSystemPrompt(request.getType()))
.user(request.getInput())
.stream()
.chatResponse()
.doOnNext(response -> {
String token = response.getResult().getOutput().getContent();
if (token != null) {
contentBuilder.append(token);
// 每积累50个字符推送一次进度
if (contentBuilder.length() % 50 == 0) {
eventPublisher.publishEvent(new AiTaskProgressEvent(
this, taskId, userId,
Math.min(95, contentBuilder.length() / 5),
"生成中..."));
}
}
})
.blockLast();
// 任务完成
String result = contentBuilder.toString();
long processingTime = System.currentTimeMillis() - startTime;
// 更新数据库
AiTaskResult taskResult = AiTaskResult.builder()
.taskId(taskId)
.content(result)
.status(TaskStatus.COMPLETED)
.processingTimeMs(processingTime)
.completedAt(LocalDateTime.now())
.build();
taskRepository.saveResult(taskResult);
// 发布完成事件
eventPublisher.publishEvent(new AiTaskCompletedEvent(
this, taskId, userId, taskResult));
log.info("AI任务完成: {} (耗时: {}ms)", taskId, processingTime);
} catch (Exception e) {
log.error("AI任务失败: {}", taskId, e);
updateTaskStatus(taskId, TaskStatus.FAILED);
eventPublisher.publishEvent(new AiTaskFailedEvent(
this, taskId, userId, e.getMessage(), isRetryable(e)));
}
}
// 预估处理时间(用于前端显示)
public int estimateTime(AiTaskRequest request) {
return switch (request.getType()) {
case ESSAY_CORRECTION -> 40; // 作文批改约40秒
case TRANSLATION -> 15; // 翻译约15秒
case SUMMARY -> 20; // 摘要约20秒
case CODE_REVIEW -> 30; // 代码审查约30秒
default -> 30;
};
}
}四、WebPush实现:离开页面也能收到通知
4.1 WebPush服务端实现
WebPush需要VAPID密钥对,浏览器在后台接收通知:
// WebPushService.java
@Service
@Slf4j
public class WebPushService {
private final PushService pushService; // java-webpush库
private final PushSubscriptionRepository subscriptionRepository;
private final ObjectMapper objectMapper;
@Value("${webpush.vapid.public-key}")
private String vapidPublicKey;
@Value("${webpush.vapid.private-key}")
private String vapidPrivateKey;
@PostConstruct
public void init() throws GeneralSecurityException {
Security.addProvider(new BouncyCastleProvider());
pushService = new PushService()
.setVapidPublicKey(vapidPublicKey)
.setVapidPrivateKey(vapidPrivateKey)
.setSubject("mailto:admin@yourapp.com");
}
// 注册推送订阅
public void subscribe(String userId, PushSubscriptionRequest subscription) {
PushSubscription entity = PushSubscription.builder()
.userId(userId)
.endpoint(subscription.getEndpoint())
.p256dh(subscription.getKeys().getP256dh())
.auth(subscription.getKeys().getAuth())
.userAgent(subscription.getUserAgent())
.createdAt(LocalDateTime.now())
.build();
subscriptionRepository.save(entity);
log.info("用户[{}]注册WebPush订阅成功", userId);
}
// 发送推送通知
public void sendToUser(String userId, Map<String, Object> data) {
List<PushSubscription> subscriptions =
subscriptionRepository.findByUserId(userId);
if (subscriptions.isEmpty()) {
log.debug("用户[{}]没有WebPush订阅", userId);
return;
}
// 构建通知payload
PushNotificationPayload payload = PushNotificationPayload.builder()
.title("AI处理完成 ✓")
.body((String) data.getOrDefault("previewText", "你的AI任务已完成"))
.icon("/icons/ai-complete.png")
.badge("/icons/badge.png")
.tag((String) data.get("taskId")) // 相同tag的通知会替换
.requireInteraction(false)
.data(data)
.actions(List.of(
new NotificationAction("view", "查看结果"),
new NotificationAction("dismiss", "关闭")
))
.build();
String payloadJson;
try {
payloadJson = objectMapper.writeValueAsString(payload);
} catch (Exception e) {
log.error("序列化推送payload失败", e);
return;
}
// 向所有设备发送
List<PushSubscription> invalidSubscriptions = new ArrayList<>();
for (PushSubscription sub : subscriptions) {
try {
Notification notification = new Notification(
sub.getEndpoint(),
sub.getP256dh(),
sub.getAuth(),
payloadJson
);
HttpResponse response = pushService.send(notification);
if (response.getStatusLine().getStatusCode() == 410) {
// 410 Gone:订阅已失效,需要删除
invalidSubscriptions.add(sub);
}
} catch (Exception e) {
log.warn("发送WebPush失败 [userId={}]: {}", userId, e.getMessage());
}
}
// 删除失效的订阅
if (!invalidSubscriptions.isEmpty()) {
subscriptionRepository.deleteAll(invalidSubscriptions);
log.info("清理 {} 个失效的WebPush订阅 [userId={}]",
invalidSubscriptions.size(), userId);
}
}
}4.2 前端Service Worker
// service-worker.js(放在网站根目录)
// 监听推送事件
self.addEventListener('push', event => {
if (!event.data) return;
const data = event.data.json();
const options = {
body: data.body,
icon: data.icon || '/icons/app-icon.png',
badge: data.badge || '/icons/badge.png',
tag: data.tag,
data: data.data,
actions: data.actions || [],
requireInteraction: data.requireInteraction || false,
// 振动模式
vibrate: [200, 100, 200]
};
event.waitUntil(
self.registration.showNotification(data.title, options)
);
});
// 监听通知点击
self.addEventListener('notificationclick', event => {
event.notification.close();
if (event.action === 'dismiss') {
return;
}
// 点击通知或"查看结果"按钮
const taskId = event.notification.data?.taskId;
const url = taskId ? `/tasks/${taskId}` : '/';
event.waitUntil(
clients.matchAll({ type: 'window' }).then(clientList => {
// 如果应用已打开,聚焦并导航
for (const client of clientList) {
if (client.url.includes(self.location.origin) && 'focus' in client) {
client.focus();
client.postMessage({ type: 'NAVIGATE', url });
return;
}
}
// 否则打开新窗口
if (clients.openWindow) {
return clients.openWindow(url);
}
})
);
});// 前端:注册Service Worker和推送订阅
// push-manager.js
const VAPID_PUBLIC_KEY = 'YOUR_VAPID_PUBLIC_KEY_HERE';
async function requestPushPermission() {
// 请求通知权限
const permission = await Notification.requestPermission();
if (permission !== 'granted') {
console.log('用户拒绝了通知权限');
return false;
}
return true;
}
async function subscribeToPush(userId) {
// 检查支持情况
if (!('serviceWorker' in navigator) || !('PushManager' in window)) {
console.log('浏览器不支持WebPush');
return;
}
const permitted = await requestPushPermission();
if (!permitted) return;
// 注册Service Worker
const registration = await navigator.serviceWorker.register('/service-worker.js');
// 检查是否已经订阅
let subscription = await registration.pushManager.getSubscription();
if (!subscription) {
// 创建新订阅
subscription = await registration.pushManager.subscribe({
userVisibleOnly: true,
applicationServerKey: urlBase64ToUint8Array(VAPID_PUBLIC_KEY)
});
}
// 发送订阅信息到服务端
await fetch('/api/notifications/push/subscribe', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
userId: userId,
endpoint: subscription.endpoint,
keys: {
p256dh: btoa(String.fromCharCode(...new Uint8Array(
subscription.getKey('p256dh')))),
auth: btoa(String.fromCharCode(...new Uint8Array(
subscription.getKey('auth'))))
},
userAgent: navigator.userAgent
})
});
console.log('WebPush订阅成功!');
}
// 建立SSE连接
function setupSSE(userId, onEvent) {
const eventSource = new EventSource(`/api/notifications/stream/${userId}`);
eventSource.addEventListener('task_completed', event => {
const data = JSON.parse(event.data);
onEvent('task_completed', data);
});
eventSource.addEventListener('task_progress', event => {
const data = JSON.parse(event.data);
onEvent('task_progress', data);
});
eventSource.addEventListener('task_failed', event => {
const data = JSON.parse(event.data);
onEvent('task_failed', data);
});
eventSource.onerror = () => {
// SSE断开时自动重连(浏览器内置)
console.log('SSE重连中...');
};
return eventSource;
}
// 工具函数
function urlBase64ToUint8Array(base64String) {
const padding = '='.repeat((4 - base64String.length % 4) % 4);
const base64 = (base64String + padding)
.replace(/-/g, '+').replace(/_/g, '/');
const rawData = window.atob(base64);
return Uint8Array.from([...rawData].map(c => c.charCodeAt(0)));
}五、微信服务号通知
5.1 微信模板消息推送
// WechatNotificationService.java
@Service
@Slf4j
public class WechatNotificationService {
private final WxMpService wxMpService;
private final UserWechatBindingRepository bindingRepository;
// 任务完成通知模板
private static final String TASK_COMPLETE_TEMPLATE_ID = "your_template_id_here";
public void sendTaskCompletionNotification(String userId, AiTaskResult result) {
// 查询用户是否绑定了微信
Optional<UserWechatBinding> binding = bindingRepository.findByUserId(userId);
if (binding.isEmpty()) {
log.debug("用户[{}]未绑定微信,跳过微信通知", userId);
return;
}
String openId = binding.get().getOpenId();
try {
WxMpTemplateMessage message = WxMpTemplateMessage.builder()
.toUser(openId)
.templateId(TASK_COMPLETE_TEMPLATE_ID)
.url("https://yourapp.com/tasks/" + result.getTaskId())
.build();
// 添加模板变量
message.addData(new WxMpTemplateData("first", "您的AI任务已完成 ✓"));
message.addData(new WxMpTemplateData("keyword1",
getTaskTypeName(result.getTaskType())));
message.addData(new WxMpTemplateData("keyword2",
formatDuration(result.getProcessingTimeMs())));
message.addData(new WxMpTemplateData("keyword3",
result.getCompletedAt().format(DateTimeFormatter.ofPattern("MM-dd HH:mm"))));
message.addData(new WxMpTemplateData("remark",
"点击查看完整结果"));
wxMpService.getTemplateMsgService().sendTemplateMsg(message);
log.info("微信通知发送成功: userId={}, taskId={}",
userId, result.getTaskId());
} catch (WxErrorException e) {
log.error("微信通知发送失败: {}", e.getMessage());
}
}
private String formatDuration(long ms) {
if (ms < 1000) return ms + "毫秒";
if (ms < 60000) return (ms / 1000) + "秒";
return (ms / 60000) + "分" + ((ms % 60000) / 1000) + "秒";
}
}六、通知优先级与去重
6.1 通知路由策略
// NotificationRouter.java
@Service
@Slf4j
public class NotificationRouter {
private final SseEmitterManager sseManager;
private final WebPushService webPushService;
private final WechatNotificationService wechatService;
private final RedisTemplate<String, Object> redisTemplate;
// 统一的通知发送入口
public void send(NotificationMessage message) {
// 消息去重(防止重复通知)
if (isDuplicate(message)) {
log.debug("通知去重: {}", message.getDeduplicationKey());
return;
}
markAsSent(message);
// 根据用户的在线状态选择通知渠道
UserOnlineStatus status = getUserOnlineStatus(message.getUserId());
switch (status) {
case ACTIVE_ON_PAGE:
// 用户在页面上,只用SSE
sseManager.sendToUser(
message.getUserId(),
message.getEventType(),
message.getData());
break;
case IDLE_ON_PAGE:
// 用户打开着页面但不活跃,SSE + 轻量提示
sseManager.sendToUser(
message.getUserId(),
message.getEventType(),
message.getData());
break;
case AWAY_FROM_PAGE:
// 用户离开了页面,WebPush
sseManager.sendToUser( // 缓存,等用户回来时推送
message.getUserId(),
message.getEventType(),
message.getData());
webPushService.sendToUser(
message.getUserId(),
message.getData());
break;
case OFFLINE:
// 用户完全离线,WebPush + 微信(如果重要)
webPushService.sendToUser(
message.getUserId(),
message.getData());
if (message.getPriority() == NotificationPriority.HIGH) {
wechatService.sendTaskCompletionNotification(
message.getUserId(),
(AiTaskResult) message.getData().get("result"));
}
break;
}
}
// 用Redis实现消息去重
private boolean isDuplicate(NotificationMessage message) {
String key = "notification:dedup:" + message.getDeduplicationKey();
Boolean isNew = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofMinutes(5));
return Boolean.FALSE.equals(isNew);
}
private void markAsSent(NotificationMessage message) {
// 已在isDuplicate中标记
}
// 通过Redis的心跳键判断用户在线状态
private UserOnlineStatus getUserOnlineStatus(String userId) {
String lastActiveKey = "user:lastactive:" + userId;
String lastActive = (String) redisTemplate.opsForValue().get(lastActiveKey);
if (lastActive == null) return UserOnlineStatus.OFFLINE;
long lastActiveTime = Long.parseLong(lastActive);
long idleSeconds = (System.currentTimeMillis() - lastActiveTime) / 1000;
if (idleSeconds < 30) return UserOnlineStatus.ACTIVE_ON_PAGE;
if (idleSeconds < 300) return UserOnlineStatus.IDLE_ON_PAGE;
return UserOnlineStatus.AWAY_FROM_PAGE;
}
}七、配置与性能优化
7.1 线程池配置
// NotificationConfig.java
@Configuration
public class NotificationConfig {
@Bean("notificationExecutor")
public ThreadPoolTaskExecutor notificationExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(50);
executor.setQueueCapacity(1000);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("notification-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
@Bean("aiTaskExecutor")
public ThreadPoolTaskExecutor aiTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(20);
executor.setMaxPoolSize(100);
executor.setQueueCapacity(500);
executor.setKeepAliveSeconds(120);
executor.setThreadNamePrefix("ai-task-");
executor.initialize();
return executor;
}
}八、常见问题 FAQ
Q1:SSE在Nginx后面会被断开怎么处理?
A:Nginx默认会缓冲响应,导致SSE消息延迟。需要配置:
location /api/notifications/stream/ {
proxy_pass http://backend;
proxy_http_version 1.1;
proxy_set_header Connection '';
proxy_buffering off;
proxy_cache off;
proxy_read_timeout 3600s;
chunked_transfer_encoding on;
}Q2:WebPush通知iOS Safari支持情况?
A:iOS Safari从16.4(2023年3月)开始支持WebPush,但需要用户将网站"添加到主屏幕"。建议:
- 引导iOS用户添加到主屏幕
- 同时提供微信/短信备选通知渠道
- 使用Progressive Enhancement:检测支持情况再显示订阅按钮
Q3:用户量很大时SSE服务端连接数问题?
A:每个SSE连接占用一个线程(传统Servlet)或一个连接(响应式):
- 传统:最多支持约10k并发SSE连接
- 响应式(WebFlux + SSE):可支持100k+
- 对于高并发场景,推荐切换到WebFlux的
Flux<ServerSentEvent> - 或者使用Nginx的event stream代理层,减少直连服务端的连接数
Q4:如何处理重复通知(用户同时在多个设备上)?
A:三层去重:
- 任务级去重:同一taskId只触发一次通知事件(Redis防重)
- 用户级策略:已有SSE活跃连接时,不发送WebPush
- 设备级:WebPush的
tag字段相同时,新通知会替换旧通知
Q5:通知失败要重试吗?
A:按优先级:
- 高优先级(任务失败):重试3次,间隔指数增加
- 低优先级(任务完成):失败即放弃,用户刷新页面可以看到结果
- 所有通知:记录失败日志,定时任务清理过期的失败记录
九、总结
AI异步任务的通知体系建议按需分层实现:
| 阶段 | 实现 | 解决的问题 |
|---|---|---|
| 最小可用 | 短轮询(30s间隔) | 总比没有强 |
| 基础 | SSE + 进度条 | 在线用户的实时反馈 |
| 进阶 | SSE + WebPush | 覆盖离线用户 |
| 完整 | SSE + WebPush + 微信 | 多渠道100%触达 |
李薇的案例告诉我们:AI的质量固然重要,但如果用户在等待过程中就放弃了,再好的AI也没有价值。
通知系统的投入回报比极高——两周的工程投入,换来了18%的任务完成率提升。这笔账,值得算。
