设计一个Feed流系统:推模式、拉模式、推拉结合的大V难题
设计一个Feed流系统:推模式、拉模式、推拉结合的大V难题
适读人群:Java中高级工程师、社交类产品技术人员 | 阅读时长:约20分钟 | 难度:★★★★☆
开篇故事
我在一家社交公司做过Feed流系统。刚入职时,系统是纯推模式:博主发一条动态,后台就把这条动态的ID写到每个粉丝的消息盒子里。粉丝数小的时候没问题,但有个明星用户有3000万粉丝,她一发帖,Kafka消费任务就要往数据库写3000万条记录。高峰期一次发帖会让系统的写入延迟飙升10分钟以上。
我被安排解决这个问题,调研了微博、微信、Twitter的架构,最终把系统改成了"推拉结合"模式。改造花了3个月,踩了很多坑,但系统稳定性和延迟都提升了一个数量级。这篇文章把Feed流系统从原理到架构的完整思路分享出来。
一、需求分析与规模估算
功能需求
- 发布动态: 用户发图文/视频动态
- 查看Feed流: 用户首页看到自己关注的人发布的内容,时间倒序
- 实时性: 普通用户发帖后,粉丝5秒内看到;大V发帖,粉丝1分钟内看到
- 翻页: 支持下拉加载更多(游标分页)
- 内容过滤: 已屏蔽/已拉黑的用户内容不展示
规模估算
用户规模: 1亿注册用户,1000万DAU,同时在线100万
关注关系:
- 普通用户平均关注200人,平均粉丝200人
- 大V(头部1000人)平均粉丝500万
发帖量:
- 每天发帖:5000万条
- 平均QPS:578 QPS
- 峰值QPS:约3000 QPS
消息盒子存储估算(推模式):
- 1亿用户 × 平均200条动态ID(保留最近7天)× 8字节 = 160GB
- Redis存储所有用户的消息盒子:160GB(勉强可接受)
大V发帖的写放大估算:
- 大V发一帖 = 500万次数据库写入
- 1000个大V同时发帖 = 50亿次写入(不可接受)
结论: 纯推模式在大V场景下完全不可行,必须用推拉结合。
二、三种模式对比
2.1 推模式(Write Fanout)
优点: 读取极快(直接读消息盒子),延迟稳定
缺点: 大V发帖时写放大严重,吞吐量极限
2.2 拉模式(Pull on Read)
优点: 写入简单,存储少
缺点: 读取时要合并200个人的动态,延迟高;关注的人多时性能更差
2.3 推拉结合模式(Hybrid)
核心思路:
- 普通用户(粉丝 < 1万):推模式,发帖时写入所有粉丝的消息盒子
- 大V用户(粉丝 >= 1万):拉模式,发帖只写动态表,粉丝查看时主动拉取
- 在线粉丝:实时推送通知
- 离线粉丝:上线时通过合并策略拉取
三、系统架构设计
混合模式的查询流程:
- 从Redis读取用户的消息盒子(推模式,普通用户的动态已在里面)
- 从Redis读取用户关注的大V列表
- 对每个关注的大V,从Redis ZSet读取其最近N条动态ID
- 合并消息盒子和大V动态,去重并排序
- 批量查询动态详情(走缓存)
四、关键代码实现
4.1 发帖与扇出
@Service
@Slf4j
public class PostPublishService {
@Autowired
private PostMapper postMapper;
@Autowired
private KafkaTemplate<String, FanoutEvent> kafkaTemplate;
@Autowired
private UserRelationService relationService;
private static final int BIG_V_THRESHOLD = 10000; // 大V阈值
/**
* 发布动态
*/
@Transactional
public Post publish(Long userId, String content, List<String> imageUrls) {
// 写入动态表
Post post = Post.builder()
.id(idGenerator.nextId())
.userId(userId)
.content(content)
.imageUrls(imageUrls)
.createTime(LocalDateTime.now())
.status(PostStatus.NORMAL)
.build();
postMapper.insert(post);
// 发送扇出事件
FanoutEvent event = FanoutEvent.builder()
.postId(post.getId())
.userId(userId)
.createTime(post.getCreateTime())
.build();
kafkaTemplate.send("fanout-events", String.valueOf(userId), event);
return post;
}
}@Component
@Slf4j
public class FanoutWorker {
@Autowired
private UserRelationService relationService;
@Autowired
private StringRedisTemplate redisTemplate;
private static final int BIG_V_THRESHOLD = 10000;
private static final int INBOX_MAX_SIZE = 300; // 消息盒子最多保留300条
@KafkaListener(topics = "fanout-events", groupId = "fanout-group", concurrency = "5")
public void onFanoutEvent(FanoutEvent event) {
long followerCount = relationService.getFollowerCount(event.getUserId());
if (followerCount < BIG_V_THRESHOLD) {
// 推模式:写入所有粉丝的消息盒子
doPushFanout(event);
} else {
// 大V:更新大V动态缓存,推模式只推给在线粉丝
updateBigVCache(event);
}
}
/**
* 推模式扇出:写入粉丝的Redis消息盒子(ZSet)
* Score = 发帖时间戳,保证时间倒序
*/
private void doPushFanout(FanoutEvent event) {
long timestamp = event.getCreateTime().toInstant(ZoneOffset.UTC).toEpochMilli();
int page = 0;
List<Long> followers;
do {
followers = relationService.getFollowers(event.getUserId(), page++, 1000);
// Pipeline批量写Redis,减少网络RTT
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
followers.forEach(followerId -> {
String inboxKey = "inbox:" + followerId;
// ZSet: score=时间戳, member=postId
connection.zSetCommands().zAdd(
inboxKey.getBytes(),
timestamp,
String.valueOf(event.getPostId()).getBytes()
);
// 保持消息盒子大小不超过300条(裁剪最旧的)
connection.zSetCommands().zRemRangeByRank(
inboxKey.getBytes(), 0, -(INBOX_MAX_SIZE + 1)
);
});
return null;
});
} while (followers.size() == 1000);
}
/**
* 大V发帖:更新大V动态ZSet缓存
*/
private void updateBigVCache(FanoutEvent event) {
String bigVKey = "bigv:posts:" + event.getUserId();
long timestamp = event.getCreateTime().toInstant(ZoneOffset.UTC).toEpochMilli();
redisTemplate.opsForZSet().add(bigVKey,
String.valueOf(event.getPostId()), timestamp);
// 只保留最近100条
redisTemplate.opsForZSet().removeRange(bigVKey, 0, -(101));
redisTemplate.expire(bigVKey, 7, TimeUnit.DAYS);
}
}4.2 Feed聚合查询
@Service
@Slf4j
public class FeedQueryService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private PostService postService;
@Autowired
private UserRelationService relationService;
private static final int PAGE_SIZE = 20;
private static final int BIG_V_THRESHOLD = 10000;
/**
* 查询用户的Feed流
* @param userId 当前用户
* @param cursor 游标(上次最后一条的时间戳,首次为null)
* @return Feed列表
*/
public FeedResult getFeed(Long userId, Long cursor, int size) {
if (size > PAGE_SIZE) size = PAGE_SIZE;
long maxScore = cursor != null ? cursor : Long.MAX_VALUE;
// 1. 从消息盒子读取(推模式:普通用户的帖子)
String inboxKey = "inbox:" + userId;
Set<ZSetOperations.TypedTuple<String>> inboxItems =
redisTemplate.opsForZSet().reverseRangeByScoreWithScores(
inboxKey, Double.NEGATIVE_INFINITY, maxScore, 0, size * 2
);
List<FeedItem> feedItems = new ArrayList<>();
if (inboxItems != null) {
inboxItems.forEach(item -> feedItems.add(
new FeedItem(Long.parseLong(item.getValue()), item.getScore().longValue())
));
}
// 2. 拉取关注的大V的最新动态
List<Long> bigVFollowing = getBigVFollowingList(userId);
for (Long bigVId : bigVFollowing) {
String bigVKey = "bigv:posts:" + bigVId;
Set<ZSetOperations.TypedTuple<String>> bigVItems =
redisTemplate.opsForZSet().reverseRangeByScoreWithScores(
bigVKey, Double.NEGATIVE_INFINITY, maxScore, 0, 5 // 每个大V取最近5条
);
if (bigVItems != null) {
bigVItems.forEach(item -> feedItems.add(
new FeedItem(Long.parseLong(item.getValue()), item.getScore().longValue())
));
}
}
// 3. 合并排序,取前size条
feedItems.sort(Comparator.comparingLong(FeedItem::getTimestamp).reversed());
List<FeedItem> pageItems = feedItems.stream()
.distinct()
.limit(size)
.collect(Collectors.toList());
if (pageItems.isEmpty()) {
return new FeedResult(Collections.emptyList(), null, false);
}
// 4. 批量查询帖子详情(走缓存)
List<Long> postIds = pageItems.stream()
.map(FeedItem::getPostId)
.collect(Collectors.toList());
Map<Long, PostDetail> postMap = postService.batchGetPostDetail(postIds);
// 5. 过滤已屏蔽的用户内容
Set<Long> blockedUsers = getBlockedUsers(userId);
List<PostDetail> result = postIds.stream()
.map(postMap::get)
.filter(post -> post != null && !blockedUsers.contains(post.getUserId()))
.collect(Collectors.toList());
// 6. 计算下一页游标
long nextCursor = pageItems.get(pageItems.size() - 1).getTimestamp();
boolean hasMore = feedItems.size() > size;
return new FeedResult(result, nextCursor, hasMore);
}
/**
* 获取用户关注的大V列表(缓存)
*/
private List<Long> getBigVFollowingList(Long userId) {
String key = "following:bigv:" + userId;
List<String> cached = redisTemplate.opsForList().range(key, 0, -1);
if (cached != null && !cached.isEmpty()) {
return cached.stream().map(Long::parseLong).collect(Collectors.toList());
}
// 从关注关系DB查大V列表
List<Long> bigVList = relationService.getFollowingBigVs(userId, BIG_V_THRESHOLD);
if (!bigVList.isEmpty()) {
redisTemplate.opsForList().rightPushAll(
key, bigVList.stream().map(String::valueOf).collect(Collectors.toList())
);
redisTemplate.expire(key, 1, TimeUnit.HOURS);
}
return bigVList;
}
private Set<Long> getBlockedUsers(Long userId) {
String key = "blocked:" + userId;
Set<String> blocked = redisTemplate.opsForSet().members(key);
if (blocked == null) return Collections.emptySet();
return blocked.stream().map(Long::parseLong).collect(Collectors.toSet());
}
}五、扩展性设计
消息盒子的冷热分离
用户的消息盒子在Redis里只存最近300条(热数据),更早的数据存MySQL(冷数据)。大多数用户不会翻到300条以前,这个策略能极大减少Redis内存占用。
关注数超大用户的极端情况
假设一个用户关注了1万人(包括500个大V),查询Feed时需要:
- 读消息盒子(1次Redis操作)
- 读500个大V的动态ZSet(500次Redis操作)
500次Redis操作虽然很快(每次<1ms),但也需要500ms。
优化方案:对关注超过100个大V的用户,在后台预先聚合好大V动态到一个专用的ZSet,每次有大V发帖时更新这个聚合ZSet,查询时只需要读1个Key。
六、踩坑实录
坑1:推拉结合的时间线不一致
推拉结合后,消息盒子里的时间线(来自推模式)和大V动态的时间线(来自拉模式),在前端展示时会出现"时间跳跃":可能大V10分钟前发的帖排在普通用户5分钟前发的帖后面,因为大V的帖子被大规模粉丝同时拉取,拉取时间有差异。
解决方案:用帖子的原始发帖时间排序,而不是写入消息盒子的时间,保证时间线一致。
坑2:大V粉丝关注关系变化时缓存不一致
用户取关了一个大V,但"关注的大V列表"在Redis里缓存了1小时。这1小时内用户的Feed里仍然会出现已取关大V的动态。
解决方案:取关操作时,主动删除"following:bigv:{userId}"缓存key,强制下次查询时重新加载。
坑3:消息盒子被攻击,大量垃圾帖刷进来
有人用大量小号给用户发帖,把用户的消息盒子刷满了垃圾内容,正常关注的人的动态被挤到了300条之后,实际上消失了。
解决方案:消息盒子里只存"互关"或"白名单"内的用户发的帖子,单向关注的小号帖子不进消息盒子,而是走拉模式按需加载。
七、总结
Feed流系统的架构选型核心是一道权衡题:
| 模式 | 写成本 | 读成本 | 适用场景 |
|---|---|---|---|
| 推模式 | 高(写放大=粉丝数) | 低(直接读消息盒子) | 粉丝数少的普通用户 |
| 拉模式 | 低(只写一条) | 高(聚合N个关注者的动态) | 关注数少的用户 |
| 推拉结合 | 中等 | 中等 | 混合场景(推荐) |
推拉结合是工程实践中最均衡的方案,关键在于大V阈值的选择(通常是粉丝1万到10万之间)和合并策略的实现细节。
