CAS与ABA问题:AtomicStampedReference的版本号解决方案
CAS与ABA问题:AtomicStampedReference的版本号解决方案
适读人群:理解基本CAS操作、想深入了解无锁编程的Java工程师 | 阅读时长:约15分钟
开篇故事
2019年,我在一个金融支付团队工作,负责账户余额的并发扣款逻辑。当时为了"极致性能",用了CAS做无锁扣款:
// 简化版本
AtomicInteger balance = new AtomicInteger(100);
// 扣款30元
while (true) {
int current = balance.get();
int newBalance = current - 30;
if (newBalance < 0) throw new RuntimeException("余额不足");
if (balance.compareAndSet(current, newBalance)) break;
}上线后,系统偶发出现"余额不足"的错误回滚,但账户实际余额是够的。日志里看到账户余额100,扣款30,CAS失败,重试,余额还是100,但第二次CAS也失败了,第三次成功...但整个过程中用户感受到了多次重试导致的延迟。
更严重的是,压测时发现一个极小概率的bug:在高并发下,有个账户余额最终多扣了一次。排查了三天,最终发现是ABA问题。
过程是这样的:
- 线程T1读到balance=100,准备CAS从100改为70(扣30)
- T2先执行,把balance从100改为70(扣30)
- T3执行,把balance从70改回100(充值30,新的充值到账)
- T1的CAS:100→70,成功!但此时T1扣的那30元,其实是用T3充进来的那30元抵消的
账面余额对了,但逻辑上T1操作了"上一个版本的100",而不是"当前版本的100"。
这就是经典的ABA问题。
一、CAS原理与ABA问题
1.1 CAS的底层实现
CAS(Compare-And-Swap)是一条CPU原子指令。在x86架构上,对应LOCK CMPXCHG指令:
LOCK CMPXCHG [内存地址], 新值
; 如果 [内存地址] == 预期值(EAX/RAX寄存器),则写入新值,ZF=1
; 否则,把 [内存地址] 的值加载到EAX/RAX,ZF=0LOCK前缀保证这是一个原子操作(不可被其他CPU中断),并且触发总线锁或缓存锁定。
Java的AtomicInteger.compareAndSet(expect, update)底层调用Unsafe.compareAndSwapInt(),最终编译成这条CPU指令。
1.2 ABA问题的本质
CAS只比较值是否等于预期,不关心这个值经历了哪些变化。
如果:
- 变量从A变成B
- 然后从B变回A
此时CAS会认为"变量没有变化"(值还是A),但实际上已经发生了两次修改。
什么场景下ABA会引发问题?
关键在于业务语义:如果业务不关心"中间变化过程",ABA无害(比如简单计数器,100→70→100没有语义问题)。但如果业务需要确保"自从上次读取后没有被修改过",ABA就是bug。
二、AtomicStampedReference解决方案
2.1 AtomicStampedReference的内部实现
AtomicStampedReference内部封装了一个Pair<T>,把引用和版本号(stamp)打包在一起,对整个Pair做CAS:
// AtomicStampedReference内部的Pair类
private static class Pair<T> {
final T reference;
final int stamp;
static <T> Pair<T> of(T reference, int stamp) {
return new Pair<T>(reference, stamp);
}
}
// 字段
private volatile Pair<V> pair;
// CAS操作:同时比较reference和stamp
public boolean compareAndSet(V expectedReference,
V newReference,
int expectedStamp,
int newStamp) {
Pair<V> current = pair;
return expectedReference == current.reference
&& expectedStamp == current.stamp
&& ((newReference == current.reference && newStamp == current.stamp)
|| casPair(current, Pair.of(newReference, newStamp)));
}注意:每次stamp变化都会创建一个新的Pair对象,这会产生GC压力。如果stamp变化非常频繁,考虑用AtomicMarkableReference(只有1位的mark标志,不是完整的版本号)来降低开销。
2.2 AtomicMarkableReference
当你不需要具体的版本号,只需要知道"有没有被修改过",用AtomicMarkableReference:
AtomicMarkableReference<String> ref = new AtomicMarkableReference<>("value", false);
// 标记为已修改
ref.compareAndSet("value", "value", false, true);
// 只有当value相同且mark为false时才CAS
boolean success = ref.compareAndSet("value", "newValue", false, false);三、完整代码实现
3.1 演示ABA问题与修复
package com.laozhang.concurrent.cas;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* ABA问题演示与AtomicStampedReference修复
*
* 场景:模拟两个线程操作同一个"栈"
* - 使用AtomicInteger模拟栈顶指针(复现ABA问题)
* - 使用AtomicStampedReference修复
*
* 测试环境:JDK 11
*/
public class ABADemo {
// ===== 有ABA问题的版本 =====
static AtomicInteger aba_counter = new AtomicInteger(100);
/**
* 演示ABA问题:T1读到100,T2把100改为70再改回100
* T1的CAS从100→70成功,但T1"以为"账户没有变化
*/
public static void demonstrateABA() throws InterruptedException {
System.out.println("=== ABA问题演示 ===");
System.out.println("初始值:" + aba_counter.get());
// T2线程:快速执行两次操作,模拟ABA
Thread t2 = new Thread(() -> {
// ABA序列
aba_counter.compareAndSet(100, 70); // A→B
System.out.println("[T2] 100→70: " + aba_counter.get());
aba_counter.compareAndSet(70, 100); // B→A
System.out.println("[T2] 70→100: " + aba_counter.get());
}, "T2");
// T1线程:读到100,准备CAS
int t1_read = aba_counter.get();
System.out.println("[T1] 读到值:" + t1_read);
// 让T2先完成ABA操作
t2.start();
t2.join();
// T1执行CAS:认为值没变(还是100),成功
boolean result = aba_counter.compareAndSet(t1_read, 70);
System.out.println("[T1] CAS(100→70):" + result + ",当前值:" + aba_counter.get());
System.out.println("问题:T1不知道期间发生了 100→70→100 的变化");
}
// ===== AtomicStampedReference修复版本 =====
// 初始值100,初始stamp=1
static AtomicStampedReference<Integer> stamped_balance =
new AtomicStampedReference<>(100, 1);
/**
* 使用AtomicStampedReference防止ABA
*/
public static void fixWithStampedReference() throws InterruptedException {
System.out.println("\n=== AtomicStampedReference修复 ===");
// T1读取当前值和版本号
int[] t1_stamp = new int[1];
Integer t1_value = stamped_balance.get(t1_stamp);
System.out.printf("[T1] 读取:value=%d, stamp=%d%n", t1_value, t1_stamp[0]);
// T2线程:ABA操作(每次操作stamp+1)
Thread t2 = new Thread(() -> {
int[] stamp = new int[1];
Integer val = stamped_balance.get(stamp);
// A→B,stamp: 1→2
boolean r1 = stamped_balance.compareAndSet(100, 70, 1, 2);
System.out.printf("[T2] CAS(100→70, stamp 1→2):%b, 当前:value=%d, stamp=%d%n",
r1, stamped_balance.getReference(), stamped_balance.getStamp());
// B→A,stamp: 2→3
boolean r2 = stamped_balance.compareAndSet(70, 100, 2, 3);
System.out.printf("[T2] CAS(70→100, stamp 2→3):%b, 当前:value=%d, stamp=%d%n",
r2, stamped_balance.getReference(), stamped_balance.getStamp());
}, "T2");
t2.start();
t2.join();
// T1尝试CAS:期望value=100, stamp=1
// 现在实际stamp=3(T2修改了两次),CAS失败!
boolean result = stamped_balance.compareAndSet(
t1_value, 70, // 期望value=100, 新value=70
t1_stamp[0], t1_stamp[0] + 1 // 期望stamp=1, 新stamp=2
);
System.out.printf("[T1] CAS(100→70, stamp %d→%d):%b%n",
t1_stamp[0], t1_stamp[0] + 1, result);
System.out.printf("当前:value=%d, stamp=%d%n",
stamped_balance.getReference(), stamped_balance.getStamp());
System.out.println("T1的CAS失败了!检测到了ABA变化");
}
public static void main(String[] args) throws InterruptedException {
demonstrateABA();
fixWithStampedReference();
}
}3.2 无锁栈的ABA问题与修复
package com.laozhang.concurrent.cas;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicStampedReference;
/**
* 无锁栈(Treiber Stack)的ABA问题与修复
*
* 无锁栈使用AtomicReference存储栈顶节点
* ABA场景:
* 1. T1读到栈顶是节点A(下一个是节点B)
* 2. T2弹出A和B,推入C和A(A重新成为栈顶,但next已经不是B而是C)
* 3. T1 CAS 成功,但T1以为A.next还是B,其实已经是C了
* 结果:栈结构被破坏,节点C丢失
*
* 测试环境:JDK 11
*/
public class LockFreeStackDemo {
static class Node<T> {
final T value;
Node<T> next;
Node(T value) { this.value = value; }
}
// ===== 有ABA问题的无锁栈 =====
static class ABAStack<T> {
private final AtomicReference<Node<T>> top = new AtomicReference<>();
public void push(T value) {
Node<T> newNode = new Node<>(value);
Node<T> currentTop;
do {
currentTop = top.get();
newNode.next = currentTop;
} while (!top.compareAndSet(currentTop, newNode));
}
public T pop() {
Node<T> currentTop;
Node<T> newTop;
do {
currentTop = top.get();
if (currentTop == null) return null;
newTop = currentTop.next;
} while (!top.compareAndSet(currentTop, newTop));
return currentTop.value;
}
}
// ===== 用AtomicStampedReference修复的无锁栈 =====
static class SafeStack<T> {
// stamp表示操作次数,每次push/pop时+1
private final AtomicStampedReference<Node<T>> top =
new AtomicStampedReference<>(null, 0);
public void push(T value) {
Node<T> newNode = new Node<>(value);
int[] stamp = new int[1];
Node<T> currentTop;
do {
currentTop = top.get(stamp);
newNode.next = currentTop;
} while (!top.compareAndSet(currentTop, newNode, stamp[0], stamp[0] + 1));
}
public T pop() {
int[] stamp = new int[1];
Node<T> currentTop;
Node<T> newTop;
do {
currentTop = top.get(stamp);
if (currentTop == null) return null;
newTop = currentTop.next;
} while (!top.compareAndSet(currentTop, newTop, stamp[0], stamp[0] + 1));
return currentTop.value;
}
public boolean isEmpty() {
return top.getReference() == null;
}
}
public static void main(String[] args) throws InterruptedException {
// 测试SafeStack的基本功能
SafeStack<Integer> stack = new SafeStack<>();
// 单线程基本测试
stack.push(1);
stack.push(2);
stack.push(3);
System.out.println("弹出顺序(应为3,2,1):");
System.out.print(stack.pop() + " "); // 3
System.out.print(stack.pop() + " "); // 2
System.out.println(stack.pop()); // 1
// 多线程并发push/pop测试
SafeStack<Integer> concurrentStack = new SafeStack<>();
int[] sumPushed = {0};
int[] sumPopped = {0};
Thread[] pushers = new Thread[4];
Thread[] poppers = new Thread[4];
for (int i = 0; i < 4; i++) {
final int base = i * 1000;
pushers[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
concurrentStack.push(base + j);
synchronized (sumPushed) { sumPushed[0] += base + j; }
}
});
poppers[i] = new Thread(() -> {
for (int j = 0; j < 1000; j++) {
Integer val = concurrentStack.pop();
if (val != null) {
synchronized (sumPopped) { sumPopped[0] += val; }
}
}
});
}
for (Thread t : pushers) t.start();
for (Thread t : poppers) t.start();
for (Thread t : pushers) t.join();
for (Thread t : poppers) t.join();
// 排空剩余
Integer val;
while ((val = concurrentStack.pop()) != null) {
synchronized (sumPopped) { sumPopped[0] += val; }
}
System.out.println("push总和:" + sumPushed[0]);
System.out.println("pop总和:" + sumPopped[0]);
System.out.println("数据一致:" + (sumPushed[0] == sumPopped[0]));
}
}四、踩坑实录
坑1:stamp用int类型可能溢出
报错现象: 系统运行很长时间后,AtomicStampedReference的stamp从Integer.MAX_VALUE溢出变成Integer.MIN_VALUE,之后重新递增,最终再次匹配到某个旧的stamp值,ABA重现。
原因分析: int类型的stamp是32位,最大值约21亿。如果每次操作stamp+1,每秒100万次操作,约35分钟后溢出。溢出后从MIN_VALUE开始,理论上可能碰撞。
解法: 如果操作极其频繁,考虑用long类型的stamp(但Java没有内置的AtomicStampedLongReference,需要自己实现或用AtomicLong存一个编码了reference和stamp的复合值)。对于一般业务场景,int的21亿次足够,但要有意识。
坑2:CAS自旋在高竞争下CPU飙升
报错现象: 高并发下(几百个线程),CPU使用率接近100%,但实际有效吞吐量很低。分析发现CAS失败率极高,大量线程在自旋重试。
原因分析: CAS自旋适合低竞争场景。如果有N个线程同时CAS同一个变量,每轮只有1个成功,其他N-1个失败并重试。竞争越激烈,失败率越高,自旋开销越大。最坏情况下,系统在"无意义的重试"上消耗大量CPU。
解法:
- 减少竞争范围(分段、sharding),如
LongAdder把一个long分成多个Cell,减少单点竞争 - 加入退避策略(exponential backoff):失败后等待随机时间再重试
- 竞争非常激烈时,回退到锁(synchronized或ReentrantLock)反而更好
坑3:AtomicReference的CAS对比用的是引用相等(==)而非equals
报错现象: 用AtomicReference<String>做CAS,偶发CAS失败,即使字符串内容相同。
原因分析: AtomicReference.compareAndSet(expect, update)用==(引用相等)比较,不是equals()(内容相等)。两个内容相同但地址不同的String对象,CAS会失败。
AtomicReference<String> ref = new AtomicReference<>("hello");
String expect = new String("hello"); // 新建String对象,与ref的引用不同
boolean result = ref.compareAndSet(expect, "world"); // false!解法: 对于AtomicReference,必须保持对象引用的一致性(不要每次new一个新对象来作为expect值)。用ref.get()取出引用,用这个引用做CAS。
坑4:AtomicStampedReference的get方法需要传入数组获取stamp
报错现象: 代码里stamped.get()只能拿到reference,stamp不知道怎么取,误用了getStamp(),但getReference()和getStamp()是两次调用,不是原子的。
原因分析: AtomicStampedReference.get(int[] stampHolder)是原子性地同时返回reference和stamp的正确方式:
int[] stamp = new int[1];
T value = stamped.get(stamp); // 同时获取value和stamp,原子的
// stamp[0] 就是当前的stamp值
// 错误:两次非原子调用
T value = stamped.getReference(); // 这两行之间可能有并发修改
int s = stamped.getStamp();解法: 始终用get(int[] stampHolder)同时获取value和stamp,保证读取的一致性。
五、总结与延伸
CAS是Java无锁并发的基石,理解ABA问题是写出正确无锁代码的前提。
ABA问题出现的条件:
- CAS操作基于值比较
- 值从A→B→A后,CAS无法区分"一直是A"和"变过了又回来"
- 业务逻辑关心这种中间变化
AtomicStampedReference的代价:
- 每次更新创建新Pair对象(GC压力)
- 原子操作比普通AtomicReference稍慢
- 只适合版本号增长速度远低于stamp溢出阈值的场景
实际开发中的选择:
- 简单计数/标志:
AtomicInteger/AtomicBoolean,ABA通常无害 - 对象引用不需要关心中间变化:
AtomicReference - 对象引用需要检测中间变化:
AtomicStampedReference - 只需要知道"是否被修改过"(1bit):
AtomicMarkableReference - 高竞争计数:
LongAdder(分段减少CAS竞争)
