设计一个实时计数器:Redis、DB、内存缓存的一致性与性能权衡
设计一个实时计数器:Redis、DB、内存缓存的一致性与性能权衡
适读人群:Java中高级工程师、需要做高并发计数的技术人员 | 阅读时长:约16分钟 | 难度:★★★☆☆
开篇故事
视频网站的播放量计数,这是个看起来最简单的功能,但我们踩过不少坑。
最初的实现:用户每次播放,执行一次 UPDATE video SET play_count = play_count + 1 WHERE id = ?。单个视频千万级播放量时,这条更新语句成了热点行锁,MySQL的QPS一上来,延迟就飙升。
换成Redis INCR后,性能问题解决了,但新问题来了:Redis宕机时计数全部丢失,恢复后数据回到了几小时前的快照。某个大V的视频播放量从5000万瞬间"跌"到4800万,社区里炸了锅。
折腾了几个月,我们最终建立了一套多层计数架构,把性能、可靠性和一致性都做到了平衡点。这篇文章就把计数器这个"小而不简单"的系统设计讲透。
一、需求分析与规模估算
功能分类
计数器场景五花八门,先分个类:
精确计数(强一致): 库存数量、优惠券剩余数量、付费会员数。这类计数不能有误差,必须强一致。
准确计数(最终一致): 订单数量、用户余额变动次数。允许秒级延迟,但不能丢。
近似计数(弱一致): 视频播放量、文章阅读数、商品点击数。允许千分之一以内的误差,重视性能。
UV计数(去重计数): 页面独立访客数。需要去重,精确UV与近似UV方案差异很大。
规模估算(以视频播放量为例)
读写QPS:
- 热门视频:每秒1万次播放 = 写QPS 10000
- 读播放量(展示在视频列表):读QPS约写的5倍 = 50000 QPS
存储规模:
- 视频总数:1000万条
- 每条视频的播放量计数:8字节(Long类型)
- 总大小:1000万 × 8字节 = 80MB(极小,内存完全放得下)
关键挑战: 热点视频的写QPS极高(10000/s),MySQL单行锁完全扛不住。
可靠性要求:
- 计数丢失容忍度:允许最多丢失最近5分钟的数据(机器宕机时)
- 误差容忍度:允许0.1%的误差
二、三种主流方案对比
方案一:直接写数据库(不推荐)
UPDATE counter SET count = count + 1 WHERE id = ?;问题: 单行热点锁,高并发下TPS极低(MySQL行锁每秒约2000次更新,4000 QPS就挂了)
方案二:Redis INCR(高性能但有可靠性风险)
redisTemplate.opsForValue().increment("view:count:" + videoId);问题: Redis宕机或主从切换时可能丢数据(AOF/RDB都有窗口期)
方案三:内存计数 + 定期刷库(最常用)
应用内存维护计数,批量异步写入Redis/DB。
问题: 机器宕机时内存数据丢失
方案四:多级缓存 + 持久化异步刷(推荐)
本文重点讲这个方案,把性能、可靠性和一致性都做到满意水平。
三、系统架构设计
核心设计思路:
写入路径:本地内存缓冲(LongAdder)→ 批量刷Redis → 异步写MySQL。
读取路径:本地读缓存 → Redis → MySQL(三级缓存)。
可靠性保证:Kafka做Write-Ahead Log。每次本地缓冲刷出时,先发Kafka消息(记录本次刷出的增量),Kafka消费者写MySQL。即使Redis宕机,也可以从Kafka回放重建计数。
四、关键代码实现
4.1 本地计数缓冲器(高性能写入)
@Component
@Slf4j
public class LocalCounterBuffer {
// 每个视频一个LongAdder,LongAdder比AtomicLong在高并发下性能更好
// 原理:分段累加,减少CAS竞争
private final ConcurrentHashMap<String, LongAdder> bufferMap =
new ConcurrentHashMap<>();
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private KafkaTemplate<String, CounterFlushEvent> kafkaTemplate;
// 记录已注册的Key,用于定时刷新
private final Set<String> registeredKeys = ConcurrentHashMap.newKeySet();
/**
* 计数+1
* 这是最热的路径,必须极致快,只有内存操作
*/
public void increment(String counterKey) {
bufferMap.computeIfAbsent(counterKey, k -> {
registeredKeys.add(k);
return new LongAdder();
}).increment();
}
/**
* 批量+N(用于批处理场景)
*/
public void add(String counterKey, long delta) {
bufferMap.computeIfAbsent(counterKey, k -> {
registeredKeys.add(k);
return new LongAdder();
}).add(delta);
}
/**
* 定时刷新:每5秒把缓冲区的增量刷入Redis
* 非阻塞,失败不影响下一次刷新
*/
@Scheduled(fixedDelay = 5000)
public void flushToRedis() {
if (registeredKeys.isEmpty()) return;
// 快照:把当前所有Key的增量取出(并重置为0)
Map<String, Long> snapshot = new HashMap<>();
for (String key : registeredKeys) {
LongAdder adder = bufferMap.get(key);
if (adder != null) {
long delta = adder.sumThenReset(); // 原子取出并重置
if (delta > 0) {
snapshot.put(key, delta);
}
}
}
if (snapshot.isEmpty()) return;
// 1. Pipeline批量写Redis(减少网络RTT)
try {
redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
snapshot.forEach((key, delta) -> {
connection.stringCommands().incrBy(
("counter:" + key).getBytes(), delta
);
});
return null;
});
} catch (Exception e) {
log.error("批量刷新Redis失败", e);
// Redis失败时:把增量放回缓冲区
snapshot.forEach((key, delta) -> {
LongAdder adder = bufferMap.computeIfAbsent(key, k -> new LongAdder());
adder.add(delta); // 增量放回,等下次重试
});
return;
}
// 2. 发送Kafka消息(Write-Ahead Log,保证可靠性)
CounterFlushEvent event = CounterFlushEvent.builder()
.deltas(snapshot)
.timestamp(System.currentTimeMillis())
.build();
kafkaTemplate.send("counter-flush", event)
.addCallback(
result -> log.debug("计数刷新事件发送成功"),
ex -> log.error("计数刷新事件发送失败", ex)
);
log.debug("计数缓冲区刷新完成, keys={}, total_delta={}",
snapshot.size(), snapshot.values().stream().mapToLong(Long::longValue).sum());
}
}4.2 读取服务(三级缓存)
@Service
public class CounterReadService {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private CounterMapper counterMapper;
@Autowired
private LocalCounterBuffer localBuffer;
// 本地读缓存:30秒TTL,热点计数不用每次读Redis
private final Cache<String, Long> readCache = Caffeine.newBuilder()
.maximumSize(100_000)
.expireAfterWrite(30, TimeUnit.SECONDS)
.build();
/**
* 获取计数值
* 三级缓存:本地内存 → Redis → MySQL
*/
public long getCount(String counterKey) {
// 第一级:本地Caffeine缓存
Long cached = readCache.getIfPresent(counterKey);
if (cached != null) return cached;
// 第二级:Redis
String redisKey = "counter:" + counterKey;
String redisVal = redisTemplate.opsForValue().get(redisKey);
if (redisVal != null) {
long count = Long.parseLong(redisVal);
readCache.put(counterKey, count);
return count;
}
// 第三级:MySQL
Long dbCount = counterMapper.getCount(counterKey);
long count = dbCount != null ? dbCount : 0L;
// 回填Redis(带随机TTL防雪崩)
long ttl = 3600 + ThreadLocalRandom.current().nextLong(600);
redisTemplate.opsForValue().set(redisKey, String.valueOf(count),
ttl, TimeUnit.SECONDS);
readCache.put(counterKey, count);
return count;
}
/**
* 批量获取计数(用于视频列表展示)
*/
public Map<String, Long> batchGetCount(List<String> counterKeys) {
Map<String, Long> result = new HashMap<>();
List<String> missingKeys = new ArrayList<>();
// 先查本地缓存
for (String key : counterKeys) {
Long cached = readCache.getIfPresent(key);
if (cached != null) {
result.put(key, cached);
} else {
missingKeys.add(key);
}
}
if (missingKeys.isEmpty()) return result;
// Pipeline批量查Redis
List<Object> redisResults = redisTemplate.executePipelined(
(RedisCallback<Object>) connection -> {
missingKeys.forEach(key ->
connection.stringCommands().get(("counter:" + key).getBytes())
);
return null;
}
);
List<String> dbMissingKeys = new ArrayList<>();
for (int i = 0; i < missingKeys.size(); i++) {
String key = missingKeys.get(i);
Object val = redisResults.get(i);
if (val != null) {
long count = Long.parseLong(new String((byte[]) val));
result.put(key, count);
readCache.put(key, count);
} else {
dbMissingKeys.add(key);
}
}
// 批量查MySQL(极少数情况)
if (!dbMissingKeys.isEmpty()) {
Map<String, Long> dbResult = counterMapper.batchGetCount(dbMissingKeys);
dbResult.forEach((key, count) -> {
result.put(key, count);
readCache.put(key, count);
// 异步回填Redis
redisTemplate.opsForValue().set(
"counter:" + key, String.valueOf(count),
3600, TimeUnit.SECONDS
);
});
}
return result;
}
}4.3 UV计数(HyperLogLog实现)
对于需要去重的UV计数,精确统计需要存所有访问过的用户ID(内存消耗大),可以用HyperLogLog近似统计:
@Service
public class UVCounterService {
@Autowired
private StringRedisTemplate redisTemplate;
/**
* 记录一次UV
* HyperLogLog:内存消耗固定12KB,误差率0.81%
*/
public void recordUV(String pageKey, String userId) {
String key = "uv:" + pageKey + ":" + getCurrentDateStr();
redisTemplate.opsForHyperLogLog().add(key, userId);
// 数据保留7天
redisTemplate.expire(key, 7, TimeUnit.DAYS);
}
/**
* 获取今日UV
*/
public long getTodayUV(String pageKey) {
String key = "uv:" + pageKey + ":" + getCurrentDateStr();
Long count = redisTemplate.opsForHyperLogLog().size(key);
return count != null ? count : 0L;
}
/**
* 合并多天UV(求某个时间段内的独立访客数)
* HyperLogLog支持合并操作,不是简单相加(避免重复计用户)
*/
public long getRangeUV(String pageKey, LocalDate startDate, LocalDate endDate) {
List<String> keys = startDate.datesUntil(endDate.plusDays(1))
.map(date -> "uv:" + pageKey + ":" + date.format(DateTimeFormatter.BASIC_ISO_DATE))
.collect(Collectors.toList());
if (keys.isEmpty()) return 0;
// 合并多个HyperLogLog到临时Key
String mergeKey = "uv:merge:" + System.currentTimeMillis();
try {
redisTemplate.opsForHyperLogLog().union(mergeKey, keys.toArray(new String[0]));
Long count = redisTemplate.opsForHyperLogLog().size(mergeKey);
return count != null ? count : 0L;
} finally {
redisTemplate.delete(mergeKey);
}
}
private String getCurrentDateStr() {
return LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE);
}
}五、扩展性设计
从1万QPS扩展到100万QPS
本地缓冲区方案天然支持极高QPS: 本地内存操作的QPS上限是CPU处理能力(每秒数百万次),唯一的瓶颈是5秒一次的批量刷新。如果增量太大,Pipeline写Redis也能轻松处理。
热点Key问题: 某个超级爆款视频,每秒10万次播放,对应Redis的counter key每5秒收到一次INCRBY(增量5万),Redis单Key吞吐完全够用。
多数据中心场景: 每个数据中心维护独立的本地缓冲,定期合并到中心Redis,读取时合并各数据中心的值(近似计数可接受)。
六、踩坑实录
坑1:LongAdder的sumThenReset不是原子的
sumThenReset() 内部是先sum(累加各分段),再reset(重置各分段),这两个操作不是原子的。在sum和reset之间,如果有新的increment到来,这个increment会被reset掉,但没有被算进sum里,导致计数丢失。
解决方案:换用sum()取出当前值,然后记录这个值,与下次sum的差值作为增量发送,不用reset。
// 错误方式
long delta = adder.sumThenReset();
// 正确方式(记录上次sum的值,计算差值)
Map<String, Long> lastSumMap = new ConcurrentHashMap<>();
long currentSum = adder.sum();
long lastSum = lastSumMap.getOrDefault(key, 0L);
long delta = currentSum - lastSum;
lastSumMap.put(key, currentSum);坑2:Redis重启后计数回到MySQL快照值
Redis没有持久化(或AOF写盘延迟),重启后从MySQL加载数据。但MySQL是5分钟前的快照,加上本地缓冲还没刷入,导致计数回退了几分钟。
解决方案:Kafka作为WAL,Redis宕机后从Kafka最近的消息回放增量,恢复到最新状态。
坑3:定时刷新的5秒间隔导致计数"跳跃"
每5秒批量刷新一次,用户在视频详情页每隔几秒刷新,会看到播放量从"10000"瞬间跳到"10547"(一次刷新带来了547个用户的增量),视觉效果不好。
解决方案:前端做视觉平滑处理,展示时用动画递增;或者把刷新间隔缩短到1秒(性能损耗极小,因为用的是批量Pipeline)。
七、总结
计数器的选型决策树:
是否需要强一致?
是 → 直接写DB(加行锁),适合库存/余额
否 →
读QPS是否很高(>1万)?
是 → 三级缓存(本地+Redis+DB)
否 →
写QPS是否很高(>2000)?
是 → 本地缓冲+批量刷新
否 → Redis INCR即可计数器看似简单,但涉及的是"高频写 + 频繁读"的经典矛盾,本地缓冲批量刷是解决这个矛盾的通用思路。结合Kafka做WAL,可以在性能和可靠性之间找到很好的平衡点。
