Exchanger使用场景:两个线程之间的数据交换与流水线模式
Exchanger使用场景:两个线程之间的数据交换与流水线模式
适读人群:Java并发开发者、对不常见并发工具感兴趣的工程师 | 阅读时长:约13分钟
开篇故事
2021年,我们做了一个图片处理流水线系统:读取线程从磁盘读取原始图片,处理线程对图片做压缩/滤镜处理。这两个操作需要高效衔接,不能让读取线程或处理线程空等太久。
最初的设计是用BlockingQueue:读取线程往队列里塞,处理线程从队列取。但我们发现,每次读取的图片批次大小固定,总是恰好一个批次交给处理线程,再换一个批次。
同事老王说:这不就是Exchanger的典型场景吗?两个线程拿着各自的"桶",交换一次,读取线程拿到空桶继续填,处理线程拿到满桶继续处理。
Exchanger的特点:两个线程,分别持有一个对象,在汇合点交换,然后各自继续。比BlockingQueue更简单直接,没有队列的额外开销,完美的双缓冲(Double Buffer)实现。
一、Exchanger的核心机制
1.1 工作原理
Exchanger<V>提供一个汇合点,允许两个线程互相交换对象:
Exchanger<String> exchanger = new Exchanger<>();
// 线程1
String result1 = exchanger.exchange("从线程1来的数据");
// result1是线程2交换过来的数据
// 线程2
String result2 = exchanger.exchange("从线程2来的数据");
// result2是线程1交换过来的数据关键行为: 第一个调用exchange()的线程会阻塞,直到第二个线程也调用exchange(),此时两个线程同时获得对方的数据,然后各自继续执行。
1.2 适用场景
- 双缓冲:一个线程写入缓冲区,另一个线程处理缓冲区,交换缓冲区
- 流水线:生产者和消费者以"批次"为单位交换数据
- 遗传算法:两个"种群"互相交换基因
- 游戏:两个玩家交换游戏状态(对战场景)
1.3 不适用场景
- 超过两个线程的协调(用CyclicBarrier)
- 不对称的数据传输(一方只发不接收,用BlockingQueue)
- 需要超时的数据传输(
exchange(V, long, TimeUnit)可以设超时,但有限制)
二、核心机制解析
2.1 Exchanger的内部实现
Exchanger内部维护一个"槽位"(Slot),当第一个线程到达时,把数据存入槽位并等待;当第二个线程到达时,取出槽位中的数据,将自己的数据写入,然后唤醒第一个线程。
JDK 8对多核CPU做了优化:引入了多个槽位(arena),减少高并发下的竞争。但Exchanger本质上只支持两个线程,arena优化针对的是多对线程同时使用同一个Exchanger实例(每对线程竞争不同的arena槽)。
三、完整代码实现
3.1 双缓冲流水线:图片处理
package com.laozhang.concurrent.exchanger;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Exchanger实战:双缓冲流水线图片处理
*
* 核心思想:
* - 读取线程维护一个"写入缓冲区",不断往里填图片
* - 处理线程维护一个"处理缓冲区",不断处理里面的图片
* - 当读取线程填满/处理线程处理完后,两个线程在Exchanger处交换缓冲区
* - 避免了读取和处理的顺序等待(提高CPU利用率)
*
* 对比BlockingQueue:
* - 无需中间队列的内存开销
* - 天然批次处理(一次性交换整个缓冲区)
* - 两线程速度不匹配时,快的那个会等慢的(内置背压)
*
* 测试环境:JDK 11
*/
public class DoubleBufPipelineDemo {
private static final int BATCH_SIZE = 100; // 每批图片数量
private static final int TOTAL_BATCHES = 10;
static class ImageBatch {
private final List<byte[]> images = new ArrayList<>(BATCH_SIZE);
private int batchId = -1;
void clear() { images.clear(); batchId = -1; }
void addImage(byte[] img) { images.add(img); }
void setBatchId(int id) { this.batchId = id; }
int size() { return images.size(); }
int getBatchId() { return batchId; }
List<byte[]> getImages() { return images; }
}
public static void main(String[] args) throws InterruptedException {
Exchanger<ImageBatch> exchanger = new Exchanger<>();
// 读取线程:生成/读取图片数据
Thread readerThread = new Thread(() -> {
Random random = new Random();
ImageBatch fillBatch = new ImageBatch(); // 当前填充中的批次
for (int batchNo = 0; batchNo < TOTAL_BATCHES; batchNo++) {
fillBatch.clear();
fillBatch.setBatchId(batchNo);
// 读取一批图片(模拟磁盘IO)
for (int i = 0; i < BATCH_SIZE; i++) {
byte[] fakeImage = new byte[1024]; // 1KB假图片数据
random.nextBytes(fakeImage);
fillBatch.addImage(fakeImage);
}
System.out.printf("[Reader] 第%d批次读取完成(%d张),准备交换%n",
batchNo, fillBatch.size());
try {
long start = System.currentTimeMillis();
// 交换:把填满的批次给处理线程,拿回空批次对象继续填
fillBatch = exchanger.exchange(fillBatch, 5, TimeUnit.SECONDS);
System.out.printf("[Reader] 交换完成(等待了%dms)%n",
System.currentTimeMillis() - start);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (TimeoutException e) {
System.err.println("[Reader] 交换超时!");
break;
}
}
// 发送结束信号(空批次)
try {
fillBatch.clear();
fillBatch.setBatchId(-1); // -1表示结束
exchanger.exchange(fillBatch, 5, TimeUnit.SECONDS);
} catch (Exception e) {
Thread.currentThread().interrupt();
}
System.out.println("[Reader] 读取完成");
}, "reader");
// 处理线程:处理图片
Thread processorThread = new Thread(() -> {
// 初始给处理线程一个空批次(用于交换)
ImageBatch processBatch = new ImageBatch();
long totalProcessed = 0;
while (true) {
try {
// 交换:把空批次给读取线程,拿回满批次
processBatch = exchanger.exchange(processBatch, 5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
} catch (TimeoutException e) {
System.err.println("[Processor] 等待超时!");
break;
}
if (processBatch.getBatchId() == -1) {
System.out.println("[Processor] 收到结束信号");
break;
}
// 处理这批图片(模拟CPU密集处理)
long start = System.currentTimeMillis();
for (byte[] image : processBatch.getImages()) {
// 模拟图片压缩处理
compressImage(image);
totalProcessed++;
}
System.out.printf("[Processor] 第%d批次处理完成(%d张,耗时%dms)%n",
processBatch.getBatchId(),
processBatch.size(),
System.currentTimeMillis() - start);
}
System.out.printf("[Processor] 处理完成,共处理%d张图片%n", totalProcessed);
}, "processor");
long start = System.currentTimeMillis();
processorThread.start(); // 先启动处理线程(等待读取线程)
readerThread.start();
readerThread.join();
processorThread.join();
System.out.printf("流水线完成,总耗时:%dms%n", System.currentTimeMillis() - start);
}
static void compressImage(byte[] image) {
// 模拟处理耗时
long sum = 0;
for (byte b : image) sum += b;
// 防止JIT优化掉
}
}3.2 遗传算法中的Exchanger:两个种群交换基因
package com.laozhang.concurrent.exchanger;
import java.util.*;
import java.util.concurrent.Exchanger;
/**
* Exchanger实战:遗传算法并行优化
*
* 场景:用两个并行线程维护两个种群,周期性交换部分个体(基因交流)
* 这比单线程遗传算法更快找到全局最优解(避免局部最优)
*
* 测试环境:JDK 11
*/
public class GeneticAlgorithmDemo {
private static final int POPULATION_SIZE = 100;
private static final int GENE_LENGTH = 20;
private static final int GENERATIONS = 50;
private static final int EXCHANGE_INTERVAL = 10; // 每10代交换一次
private static final int EXCHANGE_COUNT = 10; // 每次交换10个个体
// 个体(染色体)
static class Individual {
int[] genes = new int[GENE_LENGTH];
double fitness;
Individual() {
Random r = new Random();
for (int i = 0; i < GENE_LENGTH; i++) genes[i] = r.nextInt(2);
this.fitness = calcFitness();
}
double calcFitness() {
int sum = 0;
for (int g : genes) sum += g;
return sum; // 简化:基因中1的个数作为适应度
}
}
/**
* 进化一个种群(简化版)
*/
static List<Individual> evolve(List<Individual> population, Random random) {
// 选择(按适应度)
population.sort((a, b) -> Double.compare(b.fitness, a.fitness));
List<Individual> newPop = new ArrayList<>(population.subList(0, POPULATION_SIZE / 2));
// 交叉
while (newPop.size() < POPULATION_SIZE) {
Individual parent = newPop.get(random.nextInt(newPop.size()));
Individual child = new Individual();
System.arraycopy(parent.genes, 0, child.genes, 0, GENE_LENGTH);
// 变异
if (random.nextDouble() < 0.1) {
child.genes[random.nextInt(GENE_LENGTH)] ^= 1;
}
child.fitness = child.calcFitness();
newPop.add(child);
}
return newPop;
}
public static void main(String[] args) throws InterruptedException {
Exchanger<List<Individual>> exchanger = new Exchanger<>();
double[] bestFitness = {0};
Thread population1 = new Thread(() -> {
Random random = new Random(42);
List<Individual> pop = new ArrayList<>();
for (int i = 0; i < POPULATION_SIZE; i++) pop.add(new Individual());
for (int gen = 0; gen < GENERATIONS; gen++) {
pop = evolve(pop, random);
// 每隔EXCHANGE_INTERVAL代,交换部分个体
if ((gen + 1) % EXCHANGE_INTERVAL == 0) {
// 取最优的EXCHANGE_COUNT个个体用于交换
pop.sort((a, b) -> Double.compare(b.fitness, a.fitness));
List<Individual> toExchange = new ArrayList<>(pop.subList(0, EXCHANGE_COUNT));
try {
List<Individual> received = exchanger.exchange(toExchange);
// 用接收到的个体替换最差的几个
int replaceStart = POPULATION_SIZE - EXCHANGE_COUNT;
for (int i = 0; i < EXCHANGE_COUNT; i++) {
pop.set(replaceStart + i, received.get(i));
}
System.out.printf("[种群1] 第%d代,交换后最佳适应度:%.1f%n",
gen + 1, pop.get(0).fitness);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
pop.sort((a, b) -> Double.compare(b.fitness, a.fitness));
synchronized (bestFitness) {
bestFitness[0] = Math.max(bestFitness[0], pop.get(0).fitness);
}
}, "Population-1");
Thread population2 = new Thread(() -> {
Random random = new Random(123);
List<Individual> pop = new ArrayList<>();
for (int i = 0; i < POPULATION_SIZE; i++) pop.add(new Individual());
for (int gen = 0; gen < GENERATIONS; gen++) {
pop = evolve(pop, random);
if ((gen + 1) % EXCHANGE_INTERVAL == 0) {
pop.sort((a, b) -> Double.compare(b.fitness, a.fitness));
List<Individual> toExchange = new ArrayList<>(pop.subList(0, EXCHANGE_COUNT));
try {
List<Individual> received = exchanger.exchange(toExchange);
int replaceStart = POPULATION_SIZE - EXCHANGE_COUNT;
for (int i = 0; i < EXCHANGE_COUNT; i++) {
pop.set(replaceStart + i, received.get(i));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}
pop.sort((a, b) -> Double.compare(b.fitness, a.fitness));
synchronized (bestFitness) {
bestFitness[0] = Math.max(bestFitness[0], pop.get(0).fitness);
}
}, "Population-2");
population1.start();
population2.start();
population1.join();
population2.join();
System.out.printf("最终最佳适应度:%.1f(理论最大值:%d)%n",
bestFitness[0], GENE_LENGTH);
}
}四、踩坑实录
坑1:Exchanger只有一方调用,另一方永远阻塞
报错现象: 程序hang住,一个线程一直在exchanger.exchange()里等待。
原因分析: Exchanger严格要求恰好两个线程参与每次交换。如果某一方提前退出(异常、被中断、逻辑错误),另一方会永远等待。
解法:
- 使用
exchange(V, long, TimeUnit)设置超时,防止无限等待 - 确保两个线程的循环次数匹配,或有一致的退出信号
坑2:单线程多次调用Exchanger
报错现象: 一个线程对同一个Exchanger连续调用两次exchange(),第一次自己和自己交换,返回了自己的数据,第二次又阻塞。
原因分析: Exchanger在同一个线程调用两次时,取决于是否有另一个线程也在等待。如果是单线程测试,会死锁(等待自己)。
解法: Exchanger只能用于两个不同线程之间,不要在单线程里自己和自己交换。
坑3:交换的对象被共享修改
报错现象: 线程A把列表L1交换给线程B后,线程A又继续修改L1,线程B看到的L1数据也被修改了。
原因分析: exchange()交换的是对象引用,不是对象的深拷贝。线程A交换出去L1,线程B持有L1的引用;如果线程A还保有L1的引用并修改它,线程B看到的也会变化。
这就是双缓冲模式的正确用法:交换后,线程A应该只操作它新拿到的对象,不再操作原来那个对象。
解法: 交换后,各自只操作自己持有的那个缓冲区。使用完整的双缓冲协议:线程A只写bufferA,线程B只处理bufferB,交换后角色互换。
坑4:Exchanger和超时一起用,抛TimeoutException后状态不清晰
报错现象: 线程1调用exchange(data, 1, TimeUnit.SECONDS)超时,抛TimeoutException。此时线程2正好也到了,但交换没有发生,线程2一直阻塞。
原因分析: 当一方超时退出等待时,它的数据被"撤回",对方下次的exchange()需要等新的伙伴。这意味着超时可能导致一轮完整的交换丢失,影响数据流的连续性。
解法: 超时后,需要处理"半段交换"的情况。超时的那方应该重新进行exchange()(重试),或者通知对方超时,双方都重置并重试。具体策略取决于业务场景。
五、总结与延伸
Exchanger是并发工具库里"小众但精准"的工具:
| 特性 | 值 |
|---|---|
| 参与线程数 | 严格2个 |
| 阻塞行为 | 先到的线程等待后到的 |
| 是否支持超时 | 是(exchange(V, long, TimeUnit)) |
| 适用模式 | 双缓冲、流水线、遗传算法 |
| JDK版本 | JDK 5+ |
正确使用Exchanger的关键:
- 确保两个线程以相同的频率到达交换点(不然一方会一直等)
- 交换后只操作自己新拿到的对象
- 使用超时防止无限等待
- 生产环境中加异常处理和退出机制
