并发容器选型指南:7种容器的适用场景与性能对比
并发容器选型指南:7种容器的适用场景与性能对比
适读人群:Java后端工程师、需要在多线程场景选择合适容器的开发者 | 阅读时长:约16分钟
开篇故事
代码评审时,我最常看到的并发容器使用问题有两类:
第一类:用了错误的容器导致bug。最典型的是用HashMap在多线程场景,JDK 7时出现死循环,JDK 8时出现数据丢失。
第二类:用了"安全的"容器,但不合适。比如把Hashtable当线程安全Map用,全方法加了synchronized,并发性能极差;或者把CopyOnWriteArrayList用在写频繁的场景,GC风暴。
今天系统梳理7种常用并发容器,从读多写少到写多读少,从有序到无序,把选型决策树讲清楚。
一、7种并发容器概览
| 容器 | 类型 | 线程安全方式 | JDK版本 | 特点 |
|---|---|---|---|---|
Hashtable | Map | 方法级synchronized | 1.0 | 全局锁,极低并发性,不推荐 |
Collections.synchronizedMap | Map | 方法级synchronized | 1.2 | 包装器,同Hashtable,不推荐 |
ConcurrentHashMap | Map | 分段锁/CAS | 1.5 | 高并发Map首选 |
CopyOnWriteArrayList | List | 写时复制 | 1.5 | 读多写少List |
CopyOnWriteArraySet | Set | 基于COWArrayList | 1.5 | 读多写少Set |
ConcurrentLinkedQueue | Queue | 无锁CAS | 1.5 | 高吞吐无界并发队列 |
ConcurrentSkipListMap | 有序Map | 跳表+CAS | 1.6 | 支持并发的有序Map |
二、核心选型决策流程
2.1 ConcurrentHashMap的分代演进
JDK 7的ConcurrentHashMap:分段锁(Segment),默认16个Segment,每个Segment是一个小HashMap,各自有独立的ReentrantLock。并发度 = Segment数量(最高16)。
JDK 8的ConcurrentHashMap:彻底重写,放弃Segment,改用CAS + synchronized:
- get操作:完全无锁(volatile读节点)
- put操作:
- 如果桶为空:CAS写入头节点(无锁)
- 如果桶有节点:synchronized(头节点) + 链表/红黑树操作(最细粒度的锁)
- 并发度 = 数组长度(最高可达16384以上)
JDK 8的resize:多线程协作扩容,每个线程领取一段桶区间并行迁移,transfer数组标记迁移进度。
2.2 ConcurrentSkipListMap:并发有序Map
跳表(Skip List)是一种概率性数据结构,兼顾有序性和并发性:
- 查找、插入、删除:平均O(log n)
- 天然支持范围查询(
subMap,headMap,tailMap) - 无锁实现(使用CAS),并发性优于
TreeMap + ReadWriteLock
三、完整代码实现
3.1 各容器并发读写的正确用法
package com.laozhang.concurrent.collection;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* 7种并发容器的使用示例与选型对比
*
* 演示各种容器在并发场景下的正确用法和关键注意点
* 测试环境:JDK 11
*/
public class ConcurrentContainerGuide {
// ===== 1. ConcurrentHashMap =====
static void demoConcurrentHashMap() throws InterruptedException {
ConcurrentHashMap<String, AtomicLong> wordCount = new ConcurrentHashMap<>();
// 正确的原子性操作:computeIfAbsent + atomicLong
// 不能用 map.get(key) 然后 map.put(key, value) —— 这不是原子的!
String[] words = {"hello", "world", "hello", "java", "world", "hello"};
Thread[] threads = new Thread[3];
for (int t = 0; t < 3; t++) {
threads[t] = new Thread(() -> {
for (String word : words) {
wordCount.computeIfAbsent(word, k -> new AtomicLong(0))
.incrementAndGet(); // 原子递增
}
});
threads[t].start();
}
for (Thread t : threads) t.join();
System.out.println("词频统计:" + wordCount);
// hello=9, world=6, java=3
// 原子性的条件更新
wordCount.compute("java", (k, v) -> v == null ? new AtomicLong(1) : v);
// merge操作(合并两个Map的值)
ConcurrentHashMap<String, Long> map1 = new ConcurrentHashMap<>();
ConcurrentHashMap<String, Long> map2 = new ConcurrentHashMap<>();
map1.put("a", 1L); map2.put("a", 2L); map2.put("b", 3L);
map2.forEach((k, v) ->
map1.merge(k, v, Long::sum)); // 原子合并
System.out.println("合并后:" + map1); // {a=3, b=3}
}
// ===== 2. ConcurrentSkipListMap =====
static void demoConcurrentSkipListMap() {
ConcurrentSkipListMap<Long, String> scoreBoard = new ConcurrentSkipListMap<>();
// 模拟竞赛排行榜(按分数排序)
scoreBoard.put(95L, "张三");
scoreBoard.put(87L, "李四");
scoreBoard.put(99L, "王五");
scoreBoard.put(76L, "赵六");
// 有序的范围查询:分数在80-95之间的选手
ConcurrentNavigableMap<Long, String> subMap = scoreBoard.subMap(80L, true, 95L, true);
System.out.println("80-95分的选手:" + subMap); // {87=李四, 95=张三}
// 最高分
System.out.println("最高分:" + scoreBoard.lastEntry()); // 99=王五
// 并发更新
scoreBoard.put(99L, "王五(冠军)"); // CAS保证原子性
}
// ===== 3. CopyOnWriteArrayList =====
static void demoCopyOnWriteArrayList() throws InterruptedException {
CopyOnWriteArrayList<String> listeners = new CopyOnWriteArrayList<>();
// 适合事件监听器列表:注册少,触发多
listeners.add("EmailNotifier");
listeners.add("SMSNotifier");
listeners.add("PushNotifier");
// 并发触发事件:读取监听器列表(无锁)
Thread[] eventTriggers = new Thread[100];
for (int i = 0; i < 100; i++) {
final int eventId = i;
eventTriggers[i] = new Thread(() -> {
// 迭代期间,即使有其他线程增删监听器,也不会CME
for (String listener : listeners) {
notifyListener(listener, "EVENT-" + eventId);
}
});
eventTriggers[i].start();
}
// 偶尔添加新监听器(低频写)
listeners.add("LoggingNotifier");
for (Thread t : eventTriggers) t.join();
}
static void notifyListener(String listener, String event) {
// 模拟通知
}
// ===== 4. ConcurrentLinkedQueue =====
static void demoConcurrentLinkedQueue() throws InterruptedException {
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
// 适合高吞吐的任务队列(不需要阻塞)
Thread producer = new Thread(() -> {
for (int i = 0; i < 1000; i++) {
queue.offer("task-" + i);
}
});
Thread consumer = new Thread(() -> {
int consumed = 0;
while (consumed < 1000) {
String task = queue.poll(); // 不阻塞,没有任务返回null
if (task != null) {
consumed++;
}
// 如果需要等待,用Thread.yield()或sleep,或者换用BlockingQueue
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
System.out.println("ConcurrentLinkedQueue剩余:" + queue.size());
}
public static void main(String[] args) throws InterruptedException {
System.out.println("=== ConcurrentHashMap ===");
demoConcurrentHashMap();
System.out.println("\n=== ConcurrentSkipListMap ===");
demoConcurrentSkipListMap();
System.out.println("\n=== CopyOnWriteArrayList ===");
demoCopyOnWriteArrayList();
System.out.println("\n=== ConcurrentLinkedQueue ===");
demoConcurrentLinkedQueue();
}
}3.2 并发容器性能基准测试
package com.laozhang.concurrent.collection;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 并发容器性能对比
*
* 测试:16读线程 + 4写线程,各执行100万次操作,Map大小稳定在1000个key
*
* 参考结果(JDK 11,8核机器):
* Hashtable: 约4800ms(全局锁,最慢)
* synchronizedHashMap: 约4600ms(略优于Hashtable)
* ConcurrentHashMap: 约380ms (最快,约12倍提升)
* ConcurrentSkipListMap:约620ms(有序,略慢于CHM)
*
* 测试环境:JDK 11
*/
public class ContainerBenchmark {
private static final int READER_COUNT = 16;
private static final int WRITER_COUNT = 4;
private static final int OPERATIONS = 500_000;
private static final int MAP_SIZE = 1000;
private static long benchmark(Map<String, Integer> map, String name)
throws InterruptedException {
// 预填充数据
for (int i = 0; i < MAP_SIZE; i++) {
map.put("key" + i, i);
}
CountDownLatch latch = new CountDownLatch(READER_COUNT + WRITER_COUNT);
long start = System.currentTimeMillis();
// 读线程
for (int i = 0; i < READER_COUNT; i++) {
new Thread(() -> {
Random r = new Random();
for (int op = 0; op < OPERATIONS; op++) {
map.get("key" + r.nextInt(MAP_SIZE));
}
latch.countDown();
}).start();
}
// 写线程(put覆盖,大小不变)
for (int i = 0; i < WRITER_COUNT; i++) {
new Thread(() -> {
Random r = new Random();
for (int op = 0; op < OPERATIONS / 4; op++) {
map.put("key" + r.nextInt(MAP_SIZE), r.nextInt());
}
latch.countDown();
}).start();
}
latch.await();
long elapsed = System.currentTimeMillis() - start;
System.out.printf("[%s] 耗时:%dms%n", name, elapsed);
return elapsed;
}
public static void main(String[] args) throws InterruptedException {
// 预热
benchmark(new ConcurrentHashMap<>(), "预热");
Thread.sleep(2000);
long ht = benchmark(new Hashtable<>(), "Hashtable");
Thread.sleep(1000);
long sm = benchmark(Collections.synchronizedMap(new HashMap<>()),
"synchronizedMap");
Thread.sleep(1000);
long chm = benchmark(new ConcurrentHashMap<>(), "ConcurrentHashMap");
Thread.sleep(1000);
long cslm = benchmark(new ConcurrentSkipListMap<>(), "ConcurrentSkipListMap");
System.out.printf("%n性能提升倍数(相对Hashtable):%n");
System.out.printf("ConcurrentHashMap:%.1fx%n", (double) ht / chm);
System.out.printf("ConcurrentSkipListMap:%.1fx%n", (double) ht / cslm);
}
}四、踩坑实录
坑1:ConcurrentHashMap的复合操作不是原子的
报错现象: if (!map.containsKey(key)) map.put(key, value) 在并发下出现重复put,两个线程同时通过了containsKey检查,都执行了put。
原因分析: containsKey和put是两个独立的原子操作,组合起来不是原子的。高并发下两个操作之间可能有其他线程介入。
解法: 使用putIfAbsent(key, value) 或 computeIfAbsent(key, mapper) —— 这是ConcurrentHashMap提供的原子复合操作。
坑2:ConcurrentHashMap的size()不是精确实时值
报错现象: 用map.size()控制缓存容量,但实际缓存条目超过了设定上限。
原因分析: ConcurrentHashMap的size()方法(JDK 8)使用CounterCell累加各段的计数,是一个"估算值",在高并发下可能不精确。不能用size()做精确的容量控制。
解法: 如果需要精确计数,维护一个独立的AtomicInteger计数器,或者使用LongAdder。
坑3:Collections.unmodifiableMap不是线程安全的
报错现象: 用Collections.unmodifiableMap(map)包装了一个HashMap,以为不能修改就是线程安全的,结果在读取时抛ConcurrentModificationException。
原因分析: unmodifiableMap只是防止通过这个包装视图修改,但原始的HashMap如果在其他地方被修改,迭代unmodifiableMap仍然会CME。
"不可修改"≠"线程安全"。
解法: 如果需要线程安全的只读Map,要么用ConcurrentHashMap(读写都安全),要么在"发布"前先完成所有修改,再发布(利用final语义的安全发布)。
坑4:ConcurrentHashMap的forEach不是快照迭代
报错现象: 用forEach遍历ConcurrentHashMap时,发现遍历过程中put/remove的元素有时可见有时不可见,行为不一致。
原因分析: ConcurrentHashMap的迭代器是"弱一致性"的(weakly consistent):迭代过程中,已经迭代过的部分不会被再次遍历,但正在迭代的部分对写操作可见(可能看到,也可能看不到)。不像CopyOnWriteArrayList的迭代器是快照。
这是设计上的取舍:CopyOnWrite保证快照一致性但有复制开销;CHM的弱一致性允许更高的并发。
解法: 如果需要遍历时的强一致性快照,先new HashMap<>(concurrentMap)复制一份,再遍历副本。
五、总结与延伸
选型速查卡:
| 场景 | 推荐容器 | 备注 |
|---|---|---|
| 并发Map(最常用) | ConcurrentHashMap | 首选,几乎所有Map场景 |
| 有序并发Map | ConcurrentSkipListMap | 需要范围查询/排序时 |
| 读多写少List | CopyOnWriteArrayList | 写操作 < 1次/秒 且 元素数 < 1000 |
| 事件监听器列表 | CopyOnWriteArrayList | Spring内部也这么用 |
| 高吞吐无界队列 | ConcurrentLinkedQueue | 非阻塞,需外部控制流量 |
| 阻塞有界队列 | LinkedBlockingQueue(n) | 最常用的阻塞队列 |
| 需要Key集合 | ConcurrentHashMap.newKeySet() | 比CopyOnWriteArraySet性能好 |
JDK 8+的Map强化方法(都是原子操作):
computeIfAbsent(k, fn):key不存在时计算并putcompute(k, fn):对key做原子计算(fn接收旧值,返回新值)merge(k, v, fn):key不存在时put v,存在时用fn合并putIfAbsent(k, v):key不存在时putreplace(k, old, new):只有旧值匹配时才更新
