第1815篇:实时个性化推荐——流式特征计算与LLM的在线推理结合
2026/4/30 · 大约 10 分钟
第1815篇:实时个性化推荐——流式特征计算与LLM的在线推理结合
推荐系统这个话题,我前前后后做了大概四年。
从最早的协同过滤,到后来的深度学习排序,现在到LLM介入推荐。每一个阶段都觉得自己在做最新的东西,然后过两年回头看又觉得当时的方案太粗糙。
这篇文章聊的是当前阶段我认为最务实的一种架构:流式特征计算 + LLM在线推理的组合。不是纯LLM推荐(太慢太贵),也不是传统推荐(精度不够),而是两者的混合。
先说清楚解决什么问题:传统推荐系统的特征计算通常是批量的——每天凑一批数据,跑离线特征,更新模型。但用户的实时行为(刚看了什么、刚搜了什么、刚加购了什么)往往是最有价值的信号,批量特征完全反映不了这种"刚刚发生"的状态变化。流式特征计算就是为了填这个空。
推荐系统的特征分层
先把特征的层次理清楚,这是架构设计的基础:
三层特征各有分工:
- 实时特征:捕捉"当下",反映用户此刻想要什么
- 近实时特征:捕捉"最近",反映用户近期的兴趣漂移
- 离线特征:捕捉"一贯",反映用户的稳定偏好
LLM在这套架构里的定位:基于实时特征+近实时特征,做语义理解,生成个性化的推荐解释,并对候选集进行语义相关性重排。
系统整体架构
注意这里的在线服务不是直接用LLM做端到端推荐,而是LLM只负责重排和解释生成,召回还是靠传统方式(向量相似度+协同过滤)。这样可以把LLM的调用控制在合理范围内。
用户行为事件模型
/**
 * A single user behavior event produced by tracking and consumed by the
 * Flink streaming feature jobs.
 *
 * <p>Lombok generates accessors, builder and both constructors.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class UserBehaviorEvent {
    private String eventId;
    private String userId;
    private String sessionId;
    private BehaviorType behaviorType;
    private String itemId;
    private String itemTitle;
    private String itemCategory;
    private Double itemPrice;
    private String searchQuery; // query text, only populated for SEARCH events
    private int dwellTimeSeconds; // dwell time on the item, in seconds
    private long timestamp; // event time, presumably epoch millis — TODO confirm producer contract
    private Map<String, String> context; // device / page and other request context

    /** Behavior categories; SKIP is treated as negative feedback downstream. */
    public enum BehaviorType {
        VIEW,     // item impression / browse
        CLICK,    // item click
        SEARCH,   // search query issued
        ADD_CART, // added to cart
        PURCHASE, // order placed
        LIKE,     // like / favorite
        SKIP      // explicit skip (negative feedback)
    }
}
Flink流式特征计算
/**
 * Flink job wiring: derives real-time and near-real-time user features
 * from the raw behavior event stream and sinks them into Redis.
 */
@Component
@Slf4j
public class UserFeatureStreamJob {

    /**
     * Attaches all feature operators to the incoming behavior stream.
     *
     * <p>NOTE(review): the event-time windows below require watermarks to be
     * assigned upstream — not visible in this file, confirm at the source.
     */
    public void buildFeatureStreams(DataStream<UserBehaviorEvent> behaviorStream) {
        // Partition by user so all events of a user hit the same operator instance.
        KeyedStream<UserBehaviorEvent, String> keyedStream =
                behaviorStream.keyBy(UserBehaviorEvent::getUserId);

        // ===== Real-time feature 1: last-N behavior sequence =====
        keyedStream
                .process(new RecentBehaviorFeatureFunction(10)) // keep the 10 most recent behaviors
                .addSink(new RedisRealTimeFeatureSink("rt_recent_behaviors"));

        // ===== Near-real-time feature 1: category preference over the last hour =====
        keyedStream
                .window(SlidingEventTimeWindows.of(
                        Duration.ofHours(1),
                        Duration.ofMinutes(5) // slide every 5 minutes
                ))
                .aggregate(new CategoryPreferenceAggregator())
                .addSink(new RedisNearRealTimeFeatureSink("nrt_category_pref_1h"));

        // ===== Near-real-time feature 2: per-session search-intent extraction =====
        keyedStream
                .window(SessionWindows.withGap(Duration.ofMinutes(30))) // 30-minute session gap
                .process(new SessionIntentExtractor())
                .addSink(new RedisNearRealTimeFeatureSink("nrt_session_intent"));

        // ===== Near-real-time feature 3: price sensitivity =====
        keyedStream
                .window(TumblingEventTimeWindows.of(Duration.ofHours(24)))
                .aggregate(new PriceSensitivityAggregator())
                .addSink(new RedisNearRealTimeFeatureSink("nrt_price_sensitivity"));
    }

    /**
     * Maintains the most recent N behaviors per user as a bounded queue:
     * O(1) insert/evict per event, emitting the full sequence on every update.
     *
     * <p>NOTE(review): {@code ValueState<ArrayDeque>} falls back to generic
     * (Kryo) serialization in Flink — confirm this is acceptable for state
     * size and schema evolution.
     */
    static class RecentBehaviorFeatureFunction
            extends KeyedProcessFunction<String, UserBehaviorEvent, UserRecentBehaviors> {

        private final int maxSize; // max behaviors retained per user

        private ValueState<ArrayDeque<SimpleBehavior>> behaviorQueueState;

        RecentBehaviorFeatureFunction(int maxSize) {
            this.maxSize = maxSize;
        }

        @Override
        public void open(Configuration parameters) {
            behaviorQueueState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("recent-behaviors",
                            TypeInformation.of(new TypeHint<ArrayDeque<SimpleBehavior>>() {}))
            );
        }

        @Override
        public void processElement(UserBehaviorEvent event,
                                   Context ctx,
                                   Collector<UserRecentBehaviors> out) throws Exception {
            ArrayDeque<SimpleBehavior> queue = behaviorQueueState.value();
            if (queue == null) queue = new ArrayDeque<>();
            // Append the newest behavior.
            queue.addLast(new SimpleBehavior(
                    event.getItemId(),
                    event.getItemTitle(),
                    event.getItemCategory(),
                    event.getBehaviorType().name(),
                    event.getTimestamp()
            ));
            // Evict the oldest entries once over capacity.
            while (queue.size() > maxSize) {
                queue.pollFirst();
            }
            behaviorQueueState.update(queue);
            // Emit the updated feature value.
            out.collect(UserRecentBehaviors.builder()
                    .userId(event.getUserId())
                    .behaviors(new ArrayList<>(queue))
                    .updatedAt(event.getTimestamp())
                    .build());
        }
    }

    /**
     * Computes the weighted category-preference distribution within a window.
     * Stronger signals (purchase, add-to-cart) carry larger weights; SKIP is
     * counted negatively.
     */
    static class CategoryPreferenceAggregator
            implements AggregateFunction<UserBehaviorEvent,
            Map<String, Double>,
            CategoryPreference> {

        @Override
        public Map<String, Double> createAccumulator() {
            return new HashMap<>();
        }

        @Override
        public Map<String, Double> add(UserBehaviorEvent event, Map<String, Double> acc) {
            if (event.getItemCategory() == null) return acc;
            // Weight per behavior type — stronger intent, bigger weight.
            double weight = switch (event.getBehaviorType()) {
                case PURCHASE -> 5.0;
                case ADD_CART -> 3.0;
                case LIKE -> 2.0;
                case CLICK -> 1.0;
                case VIEW -> 0.5;
                case SKIP -> -1.0; // negative feedback
                default -> 0.0;
            };
            acc.merge(event.getItemCategory(), weight, Double::sum);
            return acc;
        }

        @Override
        public CategoryPreference getResult(Map<String, Double> acc) {
            // Normalize by the sum of absolute scores to obtain a distribution.
            double total = acc.values().stream().mapToDouble(Math::abs).sum();
            if (total == 0) return CategoryPreference.empty();
            Map<String, Double> normalized = new HashMap<>();
            acc.forEach((cat, score) -> normalized.put(cat, score / total));
            // Top-5 positively-scored categories, best first.
            List<String> topCategories = normalized.entrySet().stream()
                    .filter(e -> e.getValue() > 0)
                    .sorted(Map.Entry.<String, Double>comparingByValue().reversed())
                    .limit(5)
                    .map(Map.Entry::getKey)
                    .collect(Collectors.toList());
            return CategoryPreference.builder()
                    .distribution(normalized)
                    .topCategories(topCategories)
                    .build();
        }

        @Override
        public Map<String, Double> merge(Map<String, Double> a, Map<String, Double> b) {
            Map<String, Double> merged = new HashMap<>(a);
            b.forEach((k, v) -> merged.merge(k, v, Double::sum));
            return merged;
        }
    }
}
在线推荐服务:特征汇聚+LLM重排
/**
 * Online serving entry point: parallel feature fetch → candidate recall →
 * (conditional) LLM rerank, with rule-based and hot-item fallbacks.
 */
@Service
@Slf4j
@RequiredArgsConstructor // FIX: final dependencies previously had no constructor to inject them
public class OnlineRecommendationService {

    private final FeatureStore featureStore;         // Redis feature reads
    private final RecallService recallService;       // candidate recall (vector + CF)
    private final LLMRerankService llmRerankService; // LLM rerank + explanations
    private final MeterRegistry meterRegistry;       // latency metrics

    /**
     * Serves a single recommendation request. Never propagates feature or
     * recall failures to the caller — every failure path degrades to
     * rule-based ranking or hot items.
     */
    public RecommendationResult recommend(RecommendRequest request) {
        long startTime = System.currentTimeMillis();

        // Step 1: fetch all features in parallel (serial fetches would blow the budget).
        // NOTE(review): supplyAsync runs on ForkJoinPool.commonPool(); blocking Redis
        // calls there can starve the pool under load — consider a dedicated executor.
        CompletableFuture<UserRecentBehaviors> recentBehaviorsFuture =
                CompletableFuture.supplyAsync(() ->
                        featureStore.getRecentBehaviors(request.getUserId()));
        CompletableFuture<CategoryPreference> categoryPrefFuture =
                CompletableFuture.supplyAsync(() ->
                        featureStore.getCategoryPreference(request.getUserId()));
        CompletableFuture<UserProfile> userProfileFuture =
                CompletableFuture.supplyAsync(() ->
                        featureStore.getUserProfile(request.getUserId()));

        // Step 2: wait for the features under a hard 200 ms budget.
        UserRecentBehaviors recentBehaviors;
        CategoryPreference categoryPref;
        UserProfile userProfile;
        try {
            CompletableFuture.allOf(recentBehaviorsFuture, categoryPrefFuture, userProfileFuture)
                    .get(200, TimeUnit.MILLISECONDS); // feature-fetch timeout: 200 ms
            recentBehaviors = recentBehaviorsFuture.get();
            categoryPref = categoryPrefFuture.get();
            userProfile = userProfileFuture.get();
        } catch (TimeoutException e) {
            log.warn("Feature fetch timeout for user: {}", request.getUserId());
            // Graceful degradation — FIX: keep whatever futures DID complete in time
            // instead of discarding all three as the previous version did.
            recentBehaviors = completedOrDefault(recentBehaviorsFuture, UserRecentBehaviors.empty());
            categoryPref = completedOrDefault(categoryPrefFuture, CategoryPreference.empty());
            userProfile = completedOrDefault(userProfileFuture, null);
            if (userProfile == null) {
                // Offline profile is essential and cheap: last-resort synchronous fetch.
                userProfile = featureStore.getUserProfile(request.getUserId());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // FIX: preserve the interrupt flag
            return fallbackRecommendation(request);
        } catch (Exception e) {
            log.error("Feature fetch failed", e);
            return fallbackRecommendation(request);
        }

        // Step 3: recall the candidate set.
        UserContext userContext = buildUserContext(recentBehaviors, categoryPref, userProfile);
        List<CandidateItem> candidates = recallService.recall(userContext, 50); // recall 50 candidates
        if (candidates.isEmpty()) {
            return RecommendationResult.empty();
        }

        // Step 4: decide whether the LLM rerank is worth its cost for this request.
        boolean useLLM = shouldUseLLM(request, recentBehaviors);
        List<RankedItem> rankedItems;
        if (useLLM) {
            rankedItems = llmRerankService.rerankWithExplanation(
                    userContext, candidates, request.getPageSize());
        } else {
            // Degrade to rule-based ranking.
            rankedItems = ruleBasedRank(candidates, userContext, request.getPageSize());
        }

        long elapsed = System.currentTimeMillis() - startTime;
        meterRegistry.timer("recommendation.latency")
                .record(elapsed, TimeUnit.MILLISECONDS);

        return RecommendationResult.builder()
                .items(rankedItems)
                .usedLLM(useLLM)
                .totalTimeMs(elapsed)
                .build();
    }

    /**
     * Returns the future's value if it already completed normally, otherwise
     * the given default. getNow throws unchecked CompletionException /
     * CancellationException for failed or cancelled futures.
     */
    private static <T> T completedOrDefault(CompletableFuture<T> future, T defaultValue) {
        try {
            return future.getNow(defaultValue);
        } catch (RuntimeException e) {
            return defaultValue;
        }
    }

    /**
     * Gate for LLM reranking — the key cost-control lever: only rerank when
     * the extra quality is likely to pay off.
     */
    private boolean shouldUseLLM(RecommendRequest request, UserRecentBehaviors recentBehaviors) {
        // 1) Need enough real-time behavior for the LLM to reason about.
        if (recentBehaviors.getBehaviors().size() < 3) return false;
        // 2) Skip evening peak hours so the LLM never becomes the bottleneck.
        int hour = LocalDateTime.now().getHour();
        if (hour >= 20 && hour <= 22) return false;
        // 3) Active users only (better ROI).
        if (request.getUserTier() == UserTier.INACTIVE) return false;
        // 4) Home feed only — personalized explanations fit it better than search pages.
        return request.getScene() == RecommendScene.HOME_FEED;
    }

    /** Assembles the per-request user context consumed by recall and rerank. */
    private UserContext buildUserContext(UserRecentBehaviors recent,
                                         CategoryPreference pref,
                                         UserProfile profile) {
        return UserContext.builder()
                .recentBehaviors(recent.getBehaviors())
                .preferredCategories(pref.getTopCategories())
                .priceRange(profile.getTypicalPriceRange())
                .ageGroup(profile.getAgeGroup())
                .gender(profile.getGender())
                .build();
    }

    /**
     * Rule-based fallback ranking.
     * BUG FIX: the previous version sorted ascending, returning the LOWEST
     * scored candidates, and computed every score twice. Scores are now
     * computed once per item and sorted descending.
     */
    private List<RankedItem> ruleBasedRank(List<CandidateItem> candidates,
                                           UserContext context, int pageSize) {
        Map<CandidateItem, Double> scores = new java.util.IdentityHashMap<>();
        for (CandidateItem candidate : candidates) {
            scores.put(candidate, calculateRuleScore(candidate, context));
        }
        return candidates.stream()
                .sorted(Comparator.comparingDouble((CandidateItem item) -> scores.get(item))
                        .reversed()) // highest score first
                .limit(pageSize)
                .map(item -> RankedItem.builder()
                        .item(item)
                        .explanation("为你推荐") // generic explanation (no LLM)
                        .score(scores.get(item))
                        .build())
                .collect(Collectors.toList());
    }

    /** Simple multiplicative score: base × category-match boost × price-range boost. */
    private double calculateRuleScore(CandidateItem item, UserContext context) {
        double score = item.getBaseScore();
        // Boost preferred categories.
        if (context.getPreferredCategories().contains(item.getCategory())) {
            score *= 1.3;
        }
        // Boost items inside the user's typical price range.
        if (context.getPriceRange() != null
                && item.getPrice() >= context.getPriceRange().getMin()
                && item.getPrice() <= context.getPriceRange().getMax()) {
            score *= 1.2;
        }
        return score;
    }

    /** Full degradation path: hot items with a generic explanation. */
    private RecommendationResult fallbackRecommendation(RecommendRequest request) {
        List<CandidateItem> hotItems = recallService.getHotItems(request.getPageSize());
        List<RankedItem> items = hotItems.stream()
                .map(item -> RankedItem.builder().item(item).explanation("热门推荐").build())
                .collect(Collectors.toList());
        return RecommendationResult.builder().items(items).usedLLM(false).build();
    }
}
LLM重排与个性化解释生成
/**
 * LLM-based reranking that also produces a short personalized explanation
 * per item. Falls back to base-score order on any LLM or parse failure.
 */
@Service
@Slf4j
@RequiredArgsConstructor // FIX: final dependency previously had no constructor to inject it
public class LLMRerankService {

    // FIX: ObjectMapper is thread-safe and expensive to build — share one
    // instance instead of allocating per response.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final ChatLanguageModel chatModel;

    /**
     * Reranks the candidate set with the LLM and attaches an explanation to
     * each returned item. Returns at most {@code pageSize} items.
     */
    public List<RankedItem> rerankWithExplanation(UserContext context,
                                                  List<CandidateItem> candidates,
                                                  int pageSize) {
        // Only send the top-20 candidates (not all 50) to bound token cost.
        List<CandidateItem> top20 = candidates.stream()
                .sorted(Comparator.comparingDouble(CandidateItem::getBaseScore).reversed())
                .limit(20)
                .collect(Collectors.toList());
        String prompt = buildRerankPrompt(context, top20, pageSize);
        try {
            String response = chatModel.generate(prompt);
            return parseRerankResponse(response, top20, pageSize);
        } catch (Exception e) {
            log.error("LLM rerank failed, falling back to rule-based", e);
            // Degrade to base-score order with a generic explanation.
            return top20.stream().limit(pageSize)
                    .map(item -> RankedItem.builder().item(item).explanation("为你推荐").build())
                    .collect(Collectors.toList());
        }
    }

    /** Builds the rerank prompt: user features + numbered candidates + JSON output spec. */
    private String buildRerankPrompt(UserContext context,
                                     List<CandidateItem> candidates,
                                     int pageSize) {
        StringBuilder sb = new StringBuilder();
        sb.append("你是电商个性化推荐专家。根据用户特征对候选商品进行排序,并生成个性化推荐理由。\n\n");
        sb.append("## 用户特征\n");
        sb.append("最近浏览: ");
        context.getRecentBehaviors().stream().limit(5).forEach(b ->
                sb.append(b.getItemTitle()).append("(").append(b.getBehaviorType()).append("), "));
        sb.append("\n");
        sb.append("偏好品类: ").append(String.join(", ", context.getPreferredCategories())).append("\n");
        sb.append("价格区间: ").append(context.getPriceRange()).append("\n\n");
        sb.append("## 候选商品\n");
        for (int i = 0; i < candidates.size(); i++) {
            CandidateItem item = candidates.get(i);
            sb.append(String.format("%d. [%s] %s - ¥%.0f (%s)\n",
                    i + 1, item.getItemId(), item.getTitle(), item.getPrice(), item.getCategory()));
        }
        sb.append("\n请选出最适合该用户的").append(pageSize).append("件商品,");
        sb.append("按推荐优先级排序,并为每件商品生成一句个性化推荐理由(15字以内)。\n\n");
        sb.append("以JSON数组返回,格式:[{\"itemId\": \"xxx\", \"reason\": \"xxx\"}, ...]");
        return sb.toString();
    }

    /**
     * Parses the LLM's JSON array into ranked items, keeping the LLM's order.
     * Hardened against malformed output: missing "itemId" fields, duplicate or
     * hallucinated ids are skipped, and the result is capped at pageSize.
     */
    private List<RankedItem> parseRerankResponse(String response,
                                                 List<CandidateItem> candidates,
                                                 int pageSize) {
        try {
            // FIX: merge function guards against duplicate itemIds in the
            // candidate set (toMap would otherwise throw IllegalStateException).
            Map<String, CandidateItem> itemMap = candidates.stream()
                    .collect(Collectors.toMap(CandidateItem::getItemId, item -> item, (a, b) -> a));
            JsonNode array = MAPPER.readTree(extractJsonArray(response));
            List<RankedItem> result = new ArrayList<>();
            for (JsonNode node : array) {
                if (result.size() >= pageSize) break; // FIX: honor the requested page size
                // FIX: path()/asText(default) are null-safe; get("itemId") could NPE.
                String itemId = node.path("itemId").asText(null);
                if (itemId == null) continue;
                String reason = node.path("reason").asText("为你推荐");
                // remove() makes duplicate ids from the LLM naturally ignored.
                CandidateItem item = itemMap.remove(itemId);
                if (item != null) {
                    result.add(RankedItem.builder()
                            .item(item)
                            .explanation(reason)
                            .build());
                }
            }
            return result;
        } catch (Exception e) {
            log.warn("Failed to parse LLM rerank response: {}", e.getMessage());
            // FIX: keep this fallback consistent with the caller's — cap at pageSize.
            return candidates.stream().limit(pageSize)
                    .map(item -> RankedItem.builder().item(item).explanation("为你推荐").build())
                    .collect(Collectors.toList());
        }
    }

    /**
     * Extracts the outermost JSON array from free-form LLM text.
     * @throws IllegalArgumentException when no bracketed array is present
     */
    private String extractJsonArray(String text) {
        int start = text.indexOf('[');
        int end = text.lastIndexOf(']');
        if (start >= 0 && end > start) {
            return text.substring(start, end + 1);
        }
        throw new IllegalArgumentException("No JSON array found in response");
    }
}
特征存储:Redis数据结构选型
实时特征存在Redis里,数据结构选型很重要,直接影响读写性能:
/**
 * Redis-backed feature store. Data-structure choices per feature:
 * List for recent behaviors, Sorted Set for category preference,
 * Hash for the offline user profile.
 */
@Service
@RequiredArgsConstructor // FIX: final dependency previously had no constructor to inject it
public class FeatureStore {

    private final RedisTemplate<String, Object> redisTemplate;

    /**
     * Recent behavior sequence — Redis List; LPUSH + LTRIM makes a bounded queue.
     * NOTE(review): three round-trips per event; consider pipelining or a Lua
     * script if write QPS is high.
     */
    public void saveRecentBehaviors(String userId, SimpleBehavior behavior) {
        String key = "rt:behaviors:" + userId;
        redisTemplate.opsForList().leftPush(key, behavior);
        redisTemplate.opsForList().trim(key, 0, 9); // keep only the 10 most recent
        redisTemplate.expire(key, Duration.ofHours(24));
    }

    /** Reads the full recent-behavior list; returns an empty sequence when absent. */
    public UserRecentBehaviors getRecentBehaviors(String userId) {
        String key = "rt:behaviors:" + userId;
        List<Object> raw = redisTemplate.opsForList().range(key, 0, -1);
        List<SimpleBehavior> behaviors = raw == null ? Collections.emptyList() :
                raw.stream()
                        .filter(o -> o instanceof SimpleBehavior)
                        .map(o -> (SimpleBehavior) o)
                        .collect(Collectors.toList());
        return UserRecentBehaviors.builder()
                .userId(userId)
                .behaviors(behaviors)
                .build();
    }

    /**
     * Category preference — Sorted Set ordered by score, so top-K is native.
     * NOTE(review): delete-then-write is not atomic; a concurrent reader can
     * observe an empty key. Consider writing to a temp key and RENAME-ing.
     */
    public void saveCategoryPreference(String userId, Map<String, Double> distribution) {
        String key = "nrt:cat_pref:" + userId;
        // Replace old data wholesale.
        redisTemplate.delete(key);
        distribution.forEach((category, score) ->
                redisTemplate.opsForZSet().add(key, category, score));
        redisTemplate.expire(key, Duration.ofHours(2));
    }

    /** Reads the top-5 preferred categories, highest score first. */
    public CategoryPreference getCategoryPreference(String userId) {
        String key = "nrt:cat_pref:" + userId;
        Set<ZSetOperations.TypedTuple<Object>> tuples =
                redisTemplate.opsForZSet().reverseRangeWithScores(key, 0, 4); // top5
        if (tuples == null || tuples.isEmpty()) return CategoryPreference.empty();
        List<String> topCategories = tuples.stream()
                .map(t -> (String) t.getValue())
                .collect(Collectors.toList());
        return CategoryPreference.builder().topCategories(topCategories).build();
    }

    /**
     * Offline user profile — Hash, allowing field-level updates by the batch job.
     * Missing profiles fall back to a default profile.
     */
    public UserProfile getUserProfile(String userId) {
        String key = "offline:profile:" + userId;
        Map<Object, Object> entries = redisTemplate.opsForHash().entries(key);
        if (entries.isEmpty()) return UserProfile.defaultProfile();
        return UserProfile.builder()
                .userId(userId)
                .ageGroup((String) entries.get("age_group"))
                .gender((String) entries.get("gender"))
                .typicalPriceRange(parsePriceRange((String) entries.get("price_range")))
                .build();
    }

    /**
     * Parses a "min-max" string into a PriceRange.
     * FIX: malformed numbers or an inverted range now fall back to the default
     * range instead of throwing NumberFormatException out of getUserProfile.
     */
    private PriceRange parsePriceRange(String priceRange) {
        if (priceRange == null || priceRange.isBlank()) return PriceRange.defaultRange();
        String[] parts = priceRange.split("-");
        if (parts.length != 2) return PriceRange.defaultRange();
        try {
            double min = Double.parseDouble(parts[0].trim());
            double max = Double.parseDouble(parts[1].trim());
            return min <= max ? new PriceRange(min, max) : PriceRange.defaultRange();
        } catch (NumberFormatException e) {
            return PriceRange.defaultRange();
        }
    }
}
成本与效果的平衡
这套架构上线后,我们做了一个AB测试:
- 对照组(传统规则推荐):点击率3.2%,转化率0.8%
- 实验组A(流式特征+LLM重排):点击率4.1%,转化率1.1%
- 实验组B(只有流式特征,无LLM):点击率3.8%,转化率0.95%
LLM重排带来了额外的提升,但也带来了额外的成本。经过计算,LLM重排在30%的请求上启用(满足上文那几个条件),每天LLM调用费用约160元人民币,对应的转化率提升带来的GMV增量远超这个数字,ROI是正的。
但如果全量启用LLM重排,不做过滤,费用会涨到500+元/天,而效果提升可能只有10-15%,划不来。选择性地使用LLM才是务实的工程决策。
