事件溯源 + AI——用历史行为数据做个性化
事件溯源 + AI——用历史行为数据做个性化
适读人群:做 AI 应用的后端工程师 | 阅读时长:约14分钟 | 核心价值:用完整行为历史而非当前状态做真正的个性化 AI 响应
有个产品经理跟我说,他们的 AI 助手"很了解用户"。我问他,了解到什么程度?他说,用户在 Profile 页面填了职业、兴趣、技能,AI 会根据这些来回答问题。
我没说什么,但心里清楚:这不叫了解用户,这叫用户自我汇报。
真正了解一个人,看的是他做了什么,而不是他说自己是什么人。用户填表说"对 Python 感兴趣",但他过去三个月点击的全是 Java 相关内容,问的问题全是 Spring Boot 配置,你猜他真正需要什么?
这就是为什么我开始把事件溯源的思路引入 AI 个性化的原因。
事件溯源的核心思想
事件溯源(Event Sourcing)是一个在 DDD 圈子里被讨论很多年的模式。传统方式是存"当前状态",事件溯源是存"导致当前状态的所有事件"。
传统方式:
用户表:{ user_id: 123, skill_level: "中级", interested_topics: ["Python", "ML"] }事件溯源:
事件流:
- 2024-01-05: 用户阅读了《Python 入门》
- 2024-01-12: 用户收藏了《机器学习基础》
- 2024-02-01: 用户提问"如何用 sklearn 做分类"
- 2024-02-15: 用户阅读了《神经网络实战》5次
- 2024-03-01: 用户搜索"LangChain agent"
- 2024-03-10: 用户提问"如何实现自定义 tool"从这个事件流,你能读出来这个用户的完整学习轨迹:从 Python 基础 → ML 基础 → 实战 → AI 应用开发。这个信息比"Python,ML"这两个 tag 丰富得多。
我在项目里的实践
去年我做了一个面向开发者的 AI 学习助手。用户可以问问题、浏览内容,AI 根据用户的情况给出个性化解答。
一开始就是那种 Profile-based 的方案,效果一般。用户填的信息经常过时,或者压根不填,AI 的回答没什么针对性。
后来我改造成了行为事件驱动的方案,效果明显提升。下面把核心代码逻辑都写出来。
事件采集设计
先设计事件的数据结构:
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Dict, Optional
from uuid import uuid4
import json
@dataclass
class UserEvent:
event_id: str = field(default_factory=lambda: str(uuid4()))
user_id: str = ""
event_type: str = "" # article_view / question_asked / content_saved / search / feedback
event_data: Dict[str, Any] = field(default_factory=dict)
timestamp: datetime = field(default_factory=datetime.utcnow)
session_id: Optional[str] = None
def to_dict(self) -> dict:
return {
"event_id": self.event_id,
"user_id": self.user_id,
"event_type": self.event_type,
"event_data": self.event_data,
"timestamp": self.timestamp.isoformat(),
"session_id": self.session_id
}事件类型设计要做取舍,不是什么都要采集。我只采集有信息量的事件:
# 事件类型定义
EVENT_TYPES = {
"article_view": {
"fields": ["article_id", "title", "tags", "duration_seconds", "scroll_depth"],
"weight": 1.0 # 普通阅读
},
"article_view_repeat": {
"fields": ["article_id", "title", "tags", "view_count"],
"weight": 2.0 # 重复阅读,信号更强
},
"question_asked": {
"fields": ["question_text", "context_tags", "question_category"],
"weight": 3.0 # 主动提问,最强信号
},
"content_saved": {
"fields": ["content_id", "title", "tags"],
"weight": 2.5 # 保存收藏
},
"search_performed": {
"fields": ["query", "result_clicked"],
"weight": 1.5
},
"ai_feedback": {
"fields": ["response_id", "rating", "comment"],
"weight": 4.0 # 用户对 AI 回答的反馈,极高价值
}
}采集代码放在各个业务模块里,用异步方式写库,不影响主流程:
import asyncio
from typing import Optional
class EventCollector:
def __init__(self, db_client, redis_client):
self.db = db_client
self.redis = redis_client
async def track(self, user_id: str, event_type: str,
data: dict, session_id: Optional[str] = None):
event = UserEvent(
user_id=user_id,
event_type=event_type,
event_data=data,
session_id=session_id
)
# 异步写入,不阻塞主流程
asyncio.create_task(self._persist_event(event))
# 实时更新 Redis 里的热数据(最近行为缓存)
asyncio.create_task(self._update_hot_cache(event))
async def _persist_event(self, event: UserEvent):
await self.db.execute(
"""INSERT INTO user_events
(event_id, user_id, event_type, event_data, timestamp, session_id)
VALUES ($1, $2, $3, $4, $5, $6)""",
event.event_id, event.user_id, event.event_type,
json.dumps(event.event_data), event.timestamp, event.session_id
)
async def _update_hot_cache(self, event: UserEvent):
"""把最近50条事件缓存到 Redis"""
key = f"user_events_hot:{event.user_id}"
event_json = json.dumps(event.to_dict())
pipe = self.redis.pipeline()
pipe.lpush(key, event_json)
pipe.ltrim(key, 0, 49) # 只保留最近50条
pipe.expire(key, 86400 * 7) # 7天过期
await pipe.execute()行为历史的摘要压缩
这是整个方案里最关键的环节。用户可能有几千条行为记录,不可能全部塞进 prompt,得摘要压缩成有用的上下文。
我用了两层压缩:
第一层:规则压缩(便宜、快)
from collections import Counter, defaultdict
from datetime import timedelta
class BehaviorSummarizer:
async def get_rule_based_summary(self, user_id: str,
lookback_days: int = 30) -> dict:
"""用规则统计用户行为模式,不用 LLM"""
cutoff = datetime.utcnow() - timedelta(days=lookback_days)
events = await self.db.fetch(
"""SELECT event_type, event_data, timestamp
FROM user_events
WHERE user_id = $1 AND timestamp > $2
ORDER BY timestamp DESC""",
user_id, cutoff
)
if not events:
return {"is_new_user": True}
# 统计各类标签的出现频次
tag_counter = Counter()
question_topics = []
recent_questions = []
total_reading_time = 0
for e in events:
data = e['event_data'] if isinstance(e['event_data'], dict) \
else json.loads(e['event_data'])
if e['event_type'] in ('article_view', 'article_view_repeat'):
tags = data.get('tags', [])
weight = 2.0 if e['event_type'] == 'article_view_repeat' else 1.0
for tag in tags:
tag_counter[tag] += weight
total_reading_time += data.get('duration_seconds', 0)
elif e['event_type'] == 'question_asked':
q_text = data.get('question_text', '')
question_topics.append(q_text)
if len(recent_questions) < 5:
recent_questions.append(q_text)
elif e['event_type'] == 'content_saved':
for tag in data.get('tags', []):
tag_counter[tag] += 2.5
top_tags = [tag for tag, _ in tag_counter.most_common(8)]
return {
"is_new_user": False,
"active_interest_tags": top_tags,
"recent_questions": recent_questions,
"total_reading_minutes": int(total_reading_time / 60),
"event_count": len(events),
"question_count": len(question_topics)
}第二层:AI 压缩(用于生成自然语言画像,每天跑一次)
async def generate_ai_profile(self, user_id: str) -> str:
"""用 LLM 生成用户自然语言画像,结果缓存到数据库"""
# 取最近90天的规则摘要
rule_summary = await self.get_rule_based_summary(user_id, lookback_days=90)
# 取最近20条提问记录
recent_q = await self.db.fetch(
"""SELECT event_data->>'question_text' as question
FROM user_events
WHERE user_id = $1 AND event_type = 'question_asked'
ORDER BY timestamp DESC LIMIT 20""",
user_id
)
questions_text = "\n".join([f"- {q['question']}" for q in recent_q])
prompt = f"""根据以下用户行为数据,生成一段简洁的用户技术画像(100字以内):
活跃兴趣标签(按权重排序):{', '.join(rule_summary.get('active_interest_tags', []))}
最近提问:
{questions_text}
30天内阅读时长:{rule_summary.get('total_reading_minutes', 0)}分钟
要求:
- 描述用户的技术方向和当前水平
- 指出用户正在学习或关注的具体领域
- 语气客观,不夸大
- 不超过100字"""
response = await llm_client.chat(
messages=[{"role": "user", "content": prompt}],
model="gpt-4o-mini" # 生成画像用便宜模型
)
profile_text = response.content
# 缓存到数据库
await self.db.execute(
"""INSERT INTO user_ai_profiles (user_id, profile_text, generated_at)
VALUES ($1, $2, NOW())
ON CONFLICT (user_id) DO UPDATE
SET profile_text = $2, generated_at = NOW()""",
user_id, profile_text
)
return profile_text把行为数据喂给 AI
这是最终使用的地方,在每次 AI 对话时注入用户上下文:
class PersonalizedAIAssistant:
async def get_user_context(self, user_id: str) -> str:
"""组装用户上下文,注入 system prompt"""
# 热数据:Redis 里最近行为(实时)
hot_events_raw = await self.redis.lrange(f"user_events_hot:{user_id}", 0, 9)
recent_events = [json.loads(e) for e in hot_events_raw]
recent_actions = []
for e in recent_events:
if e['event_type'] == 'question_asked':
q = e['event_data'].get('question_text', '')[:80]
recent_actions.append(f"刚才提问:{q}")
elif e['event_type'] in ('article_view', 'article_view_repeat'):
title = e['event_data'].get('title', '')
recent_actions.append(f"阅读了:{title}")
# 冷数据:数据库里的 AI 画像(每天更新)
profile_row = await self.db.fetchrow(
"SELECT profile_text FROM user_ai_profiles WHERE user_id = $1",
user_id
)
ai_profile = profile_row['profile_text'] if profile_row else "新用户,无历史数据"
# 规则摘要(最近7天,实时计算)
recent_summary = await self.summarizer.get_rule_based_summary(user_id, lookback_days=7)
recent_tags = ', '.join(recent_summary.get('active_interest_tags', [])[:5])
context = f"""[用户背景]
技术画像:{ai_profile}
最近7天关注话题:{recent_tags}
最近行为:
{";".join(recent_actions[:5]) if recent_actions else "暂无"}"""
return context
async def chat(self, user_id: str, user_message: str,
conversation_history: list) -> str:
"""带个性化上下文的对话"""
user_context = await self.get_user_context(user_id)
system_prompt = f"""你是一个技术学习助手。根据用户背景给出针对性的回答。
{user_context}
回答原则:
- 根据用户的技术水平调整解释深度,不要解释他已经熟悉的基础概念
- 如果用户最近在学某个方向,尽量结合这个方向给例子
- 回答要具体,给代码时优先用用户熟悉的语言"""
messages = [{"role": "system", "content": system_prompt}]
messages.extend(conversation_history[-6:]) # 只带最近3轮对话
messages.append({"role": "user", "content": user_message})
response = await llm_client.chat(messages=messages, model="gpt-4o")
# 记录这次提问事件
await self.event_collector.track(
user_id=user_id,
event_type="question_asked",
data={"question_text": user_message, "session_id": None}
)
return response.content实际效果和踩坑
跑了两个月,用户对话质量评分从 3.2 提升到 4.1(满分5分),主要体现在:
- AI 不再给高级用户解释他们早就知道的东西
- 给代码示例时会用用户熟悉的框架
- 上下文连贯性更好,不用每次都重新介绍自己的背景
踩过的坑:
坑一:事件太多,AI 画像生成成本高。
早期每次对话都实时生成 AI 画像,token 消耗巨大。后来改成每天凌晨批量跑,用规则摘要作实时补充,成本降了 80%。
坑二:用户有隐私顾虑。
上线后有用户问"你们记录了我什么数据"。后来我在设置页加了行为数据查看和删除功能,同时在 Privacy Policy 里说清楚。这个不能省。
坑三:冷启动问题。
新用户没有历史数据,个性化没法做。我的解法是:新用户引导填3个标签(只要3个,不是填表),这个初始化数据维持到积累够30条真实行为为止。
坑四:行为数据质量差。
用户快速翻页浏览了100篇文章,停留时间每篇只有3秒,这种行为不该算有意义的阅读。后来加了 duration_seconds > 30 的过滤条件,数据质量明显提升。
事件溯源做个性化,核心不是技术多复杂,而是你采集的信号有没有真实的信息量。用户点击了什么、停留了多久、反复看了什么、主动问了什么,这些是真实意图的体现。从这里出发做个性化,比用户填表靠谱得多。
