消息幂等性三种方案:业务唯一键、数据库唯一索引、Redis去重
消息幂等性三种方案:业务唯一键、数据库唯一索引、Redis去重
适读人群:饱受重复消费问题困扰的Java工程师,想彻底搞清楚幂等方案选型 | 阅读时长:约16分钟
开篇故事
我们积分系统有一段难忘的历史。
2020年某天,消费者实例因为机器重启,触发了一次Rebalance,部分offset没来得及提交。重启后,这批消息被重新消费了一遍。结果:有9800个用户的积分被重复增加了一次,每人多了10-500积分不等。
整个修复过程极其痛苦:我们要找出哪些用户被重复加了积分,然后一一扣回。因为积分可以兑换礼品,有些用户已经用掉了多出来的积分,扣回时还要单独处理。这件事从发现到完全处理完,花了我们整整三天。
这件事之后,我在所有消费者里强制要求做幂等设计,并总结了三套方案供不同场景选择。今天全部分享出来。
一、幂等的本质与设计原则
幂等(Idempotent)的数学定义是:f(f(x)) = f(x),操作执行多次的结果和执行一次相同。
消息消费场景的幂等:同一条消息无论被消费多少次,对系统的最终状态影响与消费一次相同。
设计幂等方案时,有三个核心问题要回答:
- 唯一键从哪来:消息ID?业务唯一键?
- 幂等状态存哪里:Redis?数据库?
- 并发重复怎么处理:悲观锁?乐观锁?唯一索引?
二、三种幂等方案详解
2.1 方案一:业务唯一键去重
核心思路:依靠业务本身的唯一性约束判断是否重复。
比如"给用户增加积分"这个操作,每次积分增加对应一个唯一的业务事件(订单号+积分类型),直接查业务表里是否已有该记录。
优点:
- 不需要额外存储空间(业务表本身就有数据)
- 天然和业务绑定,不需要维护单独的幂等状态
缺点:
- 每次都要查业务表,有额外数据库压力
- 需要明确的业务唯一键,不适合"幂等键不好定义"的场景
适用场景:积分、优惠券发放、账单生成等有明确业务唯一键的操作。
2.2 方案二:数据库唯一索引
核心思路:利用数据库唯一索引的冲突检测机制。
在消费记录表上建唯一索引,重复消费时INSERT会抛出DuplicateKeyException,捕获这个异常认为是已处理,正常返回。
优点:
- 利用数据库事务,天然原子性,不会出现并发下的重复处理
- 不需要Redis等额外中间件
- 数据库持久化,不会因Redis重启丢失幂等状态
缺点:
- 消费记录表会持续增长,需要定期清理历史数据
- 每条消息都要写DB,对数据库有压力(通常可以接受,INSERT很快)
适用场景:中等流量、对强一致性要求高、没有Redis或Redis不可靠的场景。
2.3 方案三:Redis SET NX去重
核心思路:用Redis的原子性SET NX(Set if Not eXists)作为分布式锁。
SET key value NX EX seconds:原子性地"如果键不存在则设置,同时设置过期时间"。返回OK说明是第一次处理,返回nil说明已经处理过。
优点:
- 速度最快(Redis内存操作,微秒级)
- 不占用数据库资源
- 适合高频消息场景
缺点:
- Redis宕机或重启导致幂等状态丢失,可能出现重复处理
- TTL设置不当(TTL过短,历史消息重新出现时Redis key已过期)
- Redis和业务DB不在同一事务,极端情况下可能出现不一致
适用场景:高流量、允许极小概率重复、Redis高可用的场景(日志、埋点、通知)。
三、完整Java代码实现
3.1 方案一:业务唯一键实现
/**
* 积分消费者:业务唯一键幂等方案
*/
@Component
@Slf4j
public class PointsConsumer {
private final PointsRepository pointsRepository;
private final PointsRecordRepository pointsRecordRepository;
@KafkaListener(
topics = "points-events",
groupId = "points-consumer-group"
)
@Transactional(rollbackFor = Exception.class)
public void consume(ConsumerRecord<String, String> record,
Acknowledgment acknowledgment) {
PointsEvent event = parseEvent(record.value());
String businessKey = event.getOrderNo() + ":" + event.getPointsType();
// 幂等检查:查业务记录表
boolean exists = pointsRecordRepository.existsByBusinessKey(businessKey);
if (exists) {
log.info("积分已发放,跳过: businessKey={}", businessKey);
acknowledgment.acknowledge();
return;
}
// 增加积分
pointsRepository.increasePoints(event.getUserId(), event.getPoints());
// 记录积分流水(包含businessKey,下次查询用)
PointsRecord pointsRecord = new PointsRecord();
pointsRecord.setUserId(event.getUserId());
pointsRecord.setPoints(event.getPoints());
pointsRecord.setPointsType(event.getPointsType());
pointsRecord.setOrderNo(event.getOrderNo());
pointsRecord.setBusinessKey(businessKey);
pointsRecord.setCreateTime(LocalDateTime.now());
pointsRecordRepository.save(pointsRecord);
acknowledgment.acknowledge();
log.info("积分发放成功: userId={}, points={}, businessKey={}",
event.getUserId(), event.getPoints(), businessKey);
}
}3.2 方案二:数据库唯一索引实现
/**
* 消息消费幂等处理器:数据库唯一索引方案
* 核心:INSERT msg_id,重复时捕获DuplicateKeyException
*/
@Component
@Slf4j
public class IdempotentConsumeHandler {
private final ConsumeLogRepository consumeLogRepository;
/**
* 幂等执行消费逻辑
*
* @param msgId 消息唯一ID(Kafka的topic+partition+offset,或业务消息ID)
* @param bizKey 业务键(用于日志追踪)
* @param consumer 业务处理逻辑(Lambda)
* @return true=正常处理,false=幂等跳过
*/
@Transactional(rollbackFor = Exception.class)
public boolean executeIdempotent(String msgId, String bizKey,
Runnable consumer) {
try {
// 先插入消费记录(唯一索引在msg_id字段)
// 如果已存在,会抛出DuplicateKeyException
ConsumeLog log = new ConsumeLog();
log.setMsgId(msgId);
log.setBizKey(bizKey);
log.setConsumeTime(LocalDateTime.now());
log.setStatus(ConsumeStatus.PROCESSING);
consumeLogRepository.save(log);
// INSERT成功,执行业务逻辑
consumer.run();
// 更新状态为完成
consumeLogRepository.updateStatus(msgId, ConsumeStatus.DONE);
return true;
} catch (DataIntegrityViolationException e) {
// 唯一索引冲突:已处理过
if (e.getCause() instanceof SQLIntegrityConstraintViolationException) {
log.info("消息已处理(幂等),跳过: msgId={}, bizKey={}", msgId, bizKey);
return false;
}
throw e;
}
}
}
/**
* 使用幂等处理器的消费者示例
*/
@Component
@Slf4j
public class OrderEventConsumer {
private final IdempotentConsumeHandler idempotentHandler;
private final OrderService orderService;
@KafkaListener(topics = "order-events", groupId = "order-consumer-group")
public void consume(ConsumerRecord<String, String> record,
Acknowledgment ack) {
// 构建消息唯一ID:topic + partition + offset 保证全局唯一
String msgId = String.format("%s-%d-%d",
record.topic(), record.partition(), record.offset());
OrderEvent event = parseEvent(record.value());
boolean processed = idempotentHandler.executeIdempotent(
msgId,
"order:" + event.getOrderNo(),
() -> orderService.processOrderCreated(event)
);
if (!processed) {
log.info("幂等跳过: msgId={}", msgId);
}
ack.acknowledge();
}
}
// 消费记录表DDL(唯一索引是关键)
/*
CREATE TABLE consume_log (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
msg_id VARCHAR(128) NOT NULL COMMENT '消息唯一ID',
biz_key VARCHAR(256) COMMENT '业务键',
status VARCHAR(32) COMMENT '处理状态',
consume_time DATETIME COMMENT '消费时间',
UNIQUE KEY uk_msg_id (msg_id) -- 唯一索引保证幂等
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
*/3.3 方案三:Redis NX实现
/**
* Redis幂等处理器
* 使用SET NX + EX保证原子性
*/
@Component
@Slf4j
public class RedisIdempotentHandler {
private static final String IDEMPOTENT_PREFIX = "idempotent:";
private static final long DEFAULT_TTL_HOURS = 24;
private final StringRedisTemplate redisTemplate;
public RedisIdempotentHandler(StringRedisTemplate redisTemplate) {
this.redisTemplate = redisTemplate;
}
/**
* Redis幂等检查
*
* @param msgId 消息唯一ID
* @param ttlHours 幂等状态保留时间(小时),需要大于消息可能积压的最长时间
* @return true=第一次处理,false=重复消息
*/
public boolean isFirstTime(String msgId, long ttlHours) {
String key = IDEMPOTENT_PREFIX + msgId;
// SET key value NX EX seconds - 原子操作
Boolean success = redisTemplate.opsForValue().setIfAbsent(
key,
String.valueOf(System.currentTimeMillis()),
Duration.ofHours(ttlHours)
);
return Boolean.TRUE.equals(success);
}
/**
* 带重试保障的幂等消费
* 处理Redis和DB不在同一事务的极端情况:
* 1. Redis SET成功 -> 业务处理失败 -> 消息重投 -> Redis key已存在 -> 被幂等跳过
* 解决:业务处理失败时删除Redis key,允许重新处理
*/
public void executeIdempotent(String msgId, Runnable consumer) {
String key = IDEMPOTENT_PREFIX + msgId;
if (!isFirstTime(msgId, DEFAULT_TTL_HOURS)) {
log.info("重复消息,跳过: msgId={}", msgId);
return;
}
try {
consumer.run();
} catch (Exception e) {
// 业务处理失败,删除Redis key,允许重新投递时重新处理
redisTemplate.delete(key);
log.error("业务处理失败,已清除幂等状态: msgId={}", msgId);
throw e;
}
}
}
/**
* 高频日志事件消费者:Redis幂等方案
*/
@Component
@Slf4j
public class UserBehaviorLogConsumer {
private final RedisIdempotentHandler idempotentHandler;
private final BehaviorLogRepository logRepository;
@KafkaListener(
topics = "user-behavior-log",
groupId = "log-consumer-group",
containerFactory = "highThroughputContainerFactory"
)
public void consume(List<ConsumerRecord<String, String>> records,
Acknowledgment ack) {
int skipped = 0;
int processed = 0;
for (ConsumerRecord<String, String> record : records) {
String msgId = record.topic() + ":" + record.partition() + ":" + record.offset();
// 使用Redis幂等(允许极小概率重复)
if (!idempotentHandler.isFirstTime(msgId, 48)) {
skipped++;
continue;
}
try {
UserBehaviorLog logEvent = parseLogEvent(record.value());
logRepository.save(logEvent);
processed++;
} catch (Exception e) {
log.error("日志保存失败: msgId={}", msgId, e);
// 日志允许丢失,继续处理下一条
}
}
ack.acknowledge();
log.info("批次处理完成: total={}, processed={}, skipped(幂等)={}",
records.size(), processed, skipped);
}
}四、踩坑实录
坑1:用Kafka的offset作为幂等键的误区
很多人用topic+partition+offset作为幂等键,看起来全局唯一,但有个问题:offset是由Kafka分配的,同一条业务消息在不同时刻可能有不同的offset。
场景:消费者A拉取了消息offset=100,正要处理时Rebalance,partition被转移给消费者B,B从上次提交的offset=95重新消费,这条消息到B这里可能还是offset=100(同一条消息,相同offset),幂等键相同,OK,没问题。
但如果Producer重新发了这条消息(业务重试),新消息的offset会是新的(比如200),幂等键不同,会被当做新消息处理,实际上业务层面是重复的。
结论:如果要防业务层重复,幂等键必须用业务唯一键(订单号+操作类型),不能只用Kafka的offset。
坑2:Redis幂等和DB事务不原子导致的问题
1. Redis SET NX 成功(标记处理中)
2. 开启DB事务
3. DB操作
4. DB提交事务成功
5. 应用宕机(没有返回ACK)
6. 消息重投到另一个消费者
7. Redis key还在 -> 被幂等跳过 -> 实际上DB已经提交了,没问题
但反过来:
1. Redis SET NX 成功
2. DB事务提交失败
3. 应用宕机
4. 消息重投
5. Redis key还在 -> 被幂等跳过 -> DB没有数据!!!解决方案:业务失败时主动删除Redis key(如上面代码所示),或者用DB唯一索引方案避免这个问题。
坑3:幂等键TTL设置过短
积压消息是最容易踩的坑。系统出现故障,消息积压了3天。Redis幂等键TTL=24小时,3天后重新消费时,24小时前的幂等键已经过期,消息被重复处理。
规则:Redis幂等TTL >= 消息最大可能积压时间 + 一个安全余量(建议2倍)。如果消息最多积压3天,TTL应该设置为7天。
坑4:并发重复消息的竞争条件
两个消费者实例同时收到同一条重复消息(理论上不常见,但Rebalance期间可能发生),同时查DB发现记录不存在,同时执行INSERT,其中一个会失败(DuplicateKeyException)。这其实是正确的,但代码里必须正确处理这个异常,不要把它当做业务错误抛出去。
坑5:幂等消费记录表无限增长
consume_log表每天新增数百万条记录,没有配置清理任务,三个月后表大小超过50GB,严重影响INSERT性能(因为唯一索引维护变慢),最终导致消费速度下降。
解决方案:
- 按月分表,老表定期归档或删除
- 或者用消息offset作为分区键,只保留最近N天的幂等记录
- 配置定时清理任务,清理超过TTL的记录
五、三种方案选型总结
| 方案 | 适用场景 | 优点 | 缺点 | 吞吐量参考 |
|---|---|---|---|---|
| 业务唯一键 | 有明确业务唯一键,强一致 | 贴合业务,无额外依赖 | 需要业务支持 | 1-2万TPS |
| DB唯一索引 | 中等流量,强一致,无Redis | 强一致,有历史记录 | DB压力,需清理 | 2-5万TPS |
| Redis SET NX | 高流量,允许极小概率重复 | 极快(微秒级),低DB压力 | Redis故障丢幂等状态 | 10万+TPS |
生产中我们通常是组合使用:Redis NX做第一道快速过滤(拦截大部分重复),DB唯一索引做第二道兜底(防止Redis失效时的重复)。这样既保证了性能,又保证了可靠性。
下一篇(第436期)讲Kafka vs RocketMQ vs RabbitMQ的完整选型决策矩阵,三个主流MQ各有什么优劣,什么场景用哪个,给你一个清晰的判断框架。
