ArrayBlockingQueue vs LinkedBlockingQueue:有界无界与内存布局
ArrayBlockingQueue vs LinkedBlockingQueue:有界无界与内存布局
适读人群:Java后端工程师、关心生产者-消费者场景性能的开发者 | 阅读时长:约15分钟
开篇故事
2021年,我们的消息处理系统做了一次架构升级,核心是一个内存队列,消息从上游系统流入,多个消费线程从队列取出处理。
第一版用的是LinkedBlockingQueue(有界,容量5000),上线稳定跑了几个月。
后来做容量规划时,同事提出改用ArrayBlockingQueue,理由是"数组比链表的内存布局更紧凑,缓存友好"。我们在测试环境压测,发现两者性能差别不大,ArrayBlockingQueue的吞吐量甚至比LinkedBlockingQueue还略低一点。
这个结果出乎意料。研究下来发现,两者的锁设计是关键差异——LinkedBlockingQueue用了两把锁(putLock + takeLock),而ArrayBlockingQueue只有一把锁。
锁分离让LinkedBlockingQueue的生产者和消费者在很多场景下不互相阻塞,整体吞吐量更高。
今天把这两个队列的底层设计讲清楚。
一、核心设计差异
1.1 数据结构
ArrayBlockingQueue:
- 内部使用
Object[]数组存储元素 - 维护
takeIndex(出队指针)和putIndex(入队指针) - 环形缓冲区设计:指针到达数组末尾后回绕到开头
- 一把
ReentrantLock控制读写(putLock == takeLock == lock)
LinkedBlockingQueue:
- 内部使用单向链表(
Node<E>)存储元素,head和last指针 - 两把
ReentrantLock:putLock(入队锁)和takeLock(出队锁) - 两个
Condition:notEmpty(等待非空的消费者)和notFull(等待非满的生产者) count字段是AtomicInteger(要被两把锁分别访问,用原子整数保证一致性)
1.2 容量设计
ArrayBlockingQueue:必须指定容量,且容量固定(创建时分配好数组)LinkedBlockingQueue:可选指定容量,默认是Integer.MAX_VALUE(实际无界)
二、核心机制深度解析
2.1 LinkedBlockingQueue的双锁设计
LinkedBlockingQueue最精妙的设计是入队和出队使用不同的锁,因为链表的head和tail是独立的:
- 入队操作只访问
last指针(链表尾部) - 出队操作只访问
head指针(链表头部) - 两个操作互相不干扰(前提是队列不空不满)
唯一的共享状态是count(元素数量),用AtomicInteger处理,避免需要全局锁。
但当队列满或空时,生产者/消费者需要等待,这时涉及跨锁的signal操作:
// LinkedBlockingQueue.put()的简化逻辑
public void put(E e) throws InterruptedException {
Node<E> node = new Node<>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
notFull.await(); // 队满,等待非满信号
}
enqueue(node);
int c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal(); // 还有空间,唤醒其他生产者
} finally {
putLock.unlock();
}
// 关键:如果count从0变成1,需要通知消费者(跨锁信号)
if (c == 0)
signalNotEmpty(); // 在takeLock下唤醒消费者
}2.2 内存布局对比
ArrayBlockingQueue:数组元素连续存储在内存中。CPU预取缓存行时,连续的元素会被一起加载到L1/L2缓存。遍历访问效率高(缓存命中率高)。但初始化时需要一次性分配所有空间。
LinkedBlockingQueue:每个Node对象分散在堆中,节点之间只有指针关联。访问元素需要追随指针,可能导致缓存未命中(cache miss)。每个Node还需要额外的对象头开销(16字节)和next指针(4/8字节)。
在现代CPU(L3缓存8-32MB)上,如果队列里的元素不大(几十字节以内),ArrayBlockingQueue的缓存命中优势比较明显;但如果元素很大(KB级别),缓存命中优势消失。
2.3 GC影响
ArrayBlockingQueue:出队时将items[takeIndex]设为null(帮助GC),但数组本身持续存活,不产生新的垃圾。GC压力低。
LinkedBlockingQueue:每次出队,head Node成为垃圾,需要GC回收。高吞吐下,每秒可能产生几十万个Node垃圾对象,对Young GC有一定压力。
三、完整代码实现
3.1 性能对比测试
package com.laozhang.concurrent.queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;
/**
* ArrayBlockingQueue vs LinkedBlockingQueue性能对比
*
* 测试场景:
* 4个生产者线程,4个消费者线程
* 每个生产者生产100万条消息
* 队列容量:1000
*
* 测试环境:JDK 11,8核机器
* 参考结果:
* LinkedBlockingQueue:约3200ms(双锁,生产消费并发)
* ArrayBlockingQueue:约4100ms(单锁,生产消费互斥)
*
* 注意:结果受GC和JIT影响,需要预热后测试
*/
public class BlockingQueuePerformanceTest {
private static final int PRODUCERS = 4;
private static final int CONSUMERS = 4;
private static final int MESSAGES_PER_PRODUCER = 1_000_000;
private static final int QUEUE_CAPACITY = 1000;
static class Message {
final int producerId;
final int seqNo;
final long timestamp;
Message(int producerId, int seqNo) {
this.producerId = producerId;
this.seqNo = seqNo;
this.timestamp = System.nanoTime();
}
}
private static long test(BlockingQueue<Message> queue, String name)
throws InterruptedException {
CountDownLatch producerDone = new CountDownLatch(PRODUCERS);
CountDownLatch consumerDone = new CountDownLatch(CONSUMERS);
AtomicLong totalMessages = new AtomicLong(0);
long totalToConsume = (long) PRODUCERS * MESSAGES_PER_PRODUCER;
// 启动消费者
for (int i = 0; i < CONSUMERS; i++) {
new Thread(() -> {
long consumed = 0;
while (consumed < totalToConsume / CONSUMERS) {
try {
Message msg = queue.poll(1, TimeUnit.SECONDS);
if (msg != null) {
consumed++;
totalMessages.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
consumerDone.countDown();
}, name + "-consumer").start();
}
long start = System.currentTimeMillis();
// 启动生产者
for (int i = 0; i < PRODUCERS; i++) {
final int producerId = i;
new Thread(() -> {
for (int seq = 0; seq < MESSAGES_PER_PRODUCER; seq++) {
try {
queue.put(new Message(producerId, seq));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
producerDone.countDown();
}, name + "-producer").start();
}
producerDone.await();
consumerDone.await();
long elapsed = System.currentTimeMillis() - start;
System.out.printf("[%s] 耗时:%dms,消息数:%d,吞吐量:%,.0f/s%n",
name, elapsed, totalMessages.get(),
totalMessages.get() * 1000.0 / elapsed);
return elapsed;
}
public static void main(String[] args) throws InterruptedException {
// 预热
test(new LinkedBlockingQueue<>(QUEUE_CAPACITY), "预热");
Thread.sleep(3000);
// 正式测试
long lbqTime = test(new LinkedBlockingQueue<>(QUEUE_CAPACITY), "LinkedBlockingQueue");
Thread.sleep(2000);
long abqTime = test(new ArrayBlockingQueue<>(QUEUE_CAPACITY), "ArrayBlockingQueue");
Thread.sleep(2000);
System.out.printf("%n对比:LinkedBlockingQueue vs ArrayBlockingQueue = %d vs %dms%n",
lbqTime, abqTime);
System.out.printf("LBQ比ABQ快:%.1f%%%n",
100.0 * (abqTime - lbqTime) / abqTime);
}
}3.2 实际使用:生产者-消费者模式
package com.laozhang.concurrent.queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 实用的生产者-消费者实现
*
* 选型建议:
* - 需要严格有界(防OOM):ArrayBlockingQueue(必须有界)或LinkedBlockingQueue(capacity)
* - 高吞吐生产消费并发:LinkedBlockingQueue
* - 需要按优先级处理:PriorityBlockingQueue(无界!小心OOM)
* - 延迟处理:DelayQueue
*
* 这里展示一个带监控的LinkedBlockingQueue生产消费示例
* 测试环境:JDK 11
*/
public class ProducerConsumerDemo {
// 订单处理队列(有界,容量500)
private final LinkedBlockingQueue<String> orderQueue =
new LinkedBlockingQueue<>(500);
private final AtomicInteger produced = new AtomicInteger(0);
private final AtomicInteger consumed = new AtomicInteger(0);
private volatile boolean running = true;
/**
* 生产者:模拟接收上游订单
*/
public void startProducer(int producerCount) {
for (int i = 0; i < producerCount; i++) {
final int id = i;
Thread t = new Thread(() -> {
while (running) {
try {
String order = "ORDER-" + id + "-" + produced.incrementAndGet();
// put:队满则阻塞(背压)
boolean offered = orderQueue.offer(order, 100, TimeUnit.MILLISECONDS);
if (!offered) {
System.out.printf("[生产者%d] 队列满,丢弃:%s%n", id, order);
}
Thread.sleep(2);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "producer-" + i);
t.setDaemon(true);
t.start();
}
}
/**
* 消费者:处理订单
*/
public void startConsumer(int consumerCount) {
for (int i = 0; i < consumerCount; i++) {
final int id = i;
Thread t = new Thread(() -> {
while (running || !orderQueue.isEmpty()) {
try {
String order = orderQueue.poll(500, TimeUnit.MILLISECONDS);
if (order != null) {
// 模拟处理耗时
Thread.sleep(5);
consumed.incrementAndGet();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "consumer-" + i);
t.setDaemon(true);
t.start();
}
}
/**
* 打印实时状态
*/
public void printStats() {
System.out.printf("生产:%d,消费:%d,队列积压:%d,队列使用率:%.1f%%%n",
produced.get(), consumed.get(),
orderQueue.size(),
100.0 * orderQueue.size() / 500);
}
public static void main(String[] args) throws InterruptedException {
ProducerConsumerDemo demo = new ProducerConsumerDemo();
// 3个生产者,5个消费者
demo.startProducer(3);
demo.startConsumer(5);
// 运行5秒,每秒打印状态
for (int i = 0; i < 5; i++) {
Thread.sleep(1000);
demo.printStats();
}
demo.running = false;
Thread.sleep(1000);
System.out.printf("%n最终:生产%d,消费%d,队列剩余%d%n",
demo.produced.get(), demo.consumed.get(), demo.orderQueue.size());
}
}四、踩坑实录
坑1:LinkedBlockingQueue默认无界导致OOM
报错现象: 系统在某次流量高峰时OOM崩溃,堆dump中发现LinkedBlockingQueue的Node对象占了几个GB。
原因分析: new LinkedBlockingQueue<>()(无参构造)默认容量是Integer.MAX_VALUE,实际是无界队列。消费者处理速度跟不上生产者时,队列无限积压,最终内存耗尽。
// 危险:无界队列,OOM风险
BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
// 安全:有界队列
BlockingQueue<Message> queue = new LinkedBlockingQueue<>(1000);解法: 永远为LinkedBlockingQueue指定容量,配合合理的拒绝策略(背压/丢弃/降级)。
坑2:ArrayBlockingQueue的公平模式性能很差
报错现象: 使用了new ArrayBlockingQueue<>(1000, true)(公平模式),吞吐量比非公平模式低了60%以上。
原因分析: ArrayBlockingQueue支持可选的公平模式(FIFO,等待最久的线程优先获锁)。公平模式底层用new ReentrantLock(true),启用AQS公平锁,每次获锁都要检查等待队列,开销远大于非公平模式的直接CAS。
大多数场景不需要严格公平,公平模式徒增开销。
解法: 除非业务强制需要公平,否则用默认的非公平模式(new ArrayBlockingQueue<>(capacity))。
坑3:drainTo操作的边界情况
报错现象: 使用queue.drainTo(list, maxElements)批量取元素,发现取出的元素数可能少于maxElements,但循环里没有处理这种情况,导致部分消息漏处理。
原因分析: drainTo一次性取尽队列中当前存在的元素(最多maxElements个)。如果队列当前不足maxElements个元素,实际取出的数量 < maxElements,这是正常行为。
// 期望取100个,但队列当前只有37个,actual=37
int actual = queue.drainTo(list, 100);
if (actual < 100) {
// 需要处理取到的actual个元素,而不是等到满100个
}解法: 处理drainTo返回的实际数量,不要假设一定等于maxElements。
坑4:take和put与shutdown的竞争
报错现象: 服务关闭时,消费者线程抛InterruptedException,但消息已经入队还没有被消费,消息丢失。
原因分析: 关闭服务时,通常会interrupt()工作线程。消费者线程在take()阻塞时收到中断,抛出InterruptedException,如果catch后直接return,队列中剩余消息就丢了。
解法:
// 消费者线程的正确关闭逻辑
Thread consumer = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
Message msg = queue.poll(100, TimeUnit.MILLISECONDS);
if (msg != null) process(msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// 不立即退出,先排空队列
}
}
// 退出前排空队列
List<Message> remaining = new ArrayList<>();
queue.drainTo(remaining);
remaining.forEach(this::process);
});五、总结与延伸
选型决策:
| 队列类型 | 适合场景 |
|---|---|
LinkedBlockingQueue(capacity) | 高吞吐生产消费、IO密集消费者 |
ArrayBlockingQueue(capacity) | 内存敏感场景(无动态分配)、需要预分配内存 |
SynchronousQueue | 直接移交,不需要缓冲 |
PriorityBlockingQueue | 按优先级处理任务(注意无界!) |
DelayQueue | 延迟任务调度 |
LinkedTransferQueue | JDK 7+,比LinkedBlockingQueue更高性能的无界队列 |
JDK 7新增LinkedTransferQueue:底层使用无锁的CAS+链表,在高并发场景下吞吐量优于LinkedBlockingQueue,但无界,需要外部控制容量。
