Phaser灵活同步器:多阶段并行计算的精确控制
Phaser灵活同步器:多阶段并行计算的精确控制
适读人群:有CyclicBarrier使用经验、需要更灵活阶段同步的Java工程师 | 阅读时长:约14分钟
开篇故事
2022年,我们做了一个分布式数据处理任务:先并行处理多个数据分片(第一阶段),然后汇总结果(第二阶段),然后根据汇总结果再做一轮优化计算(第三阶段),如果结果达标就结束,否则重复第三阶段。
用CyclicBarrier能实现多阶段,但有个问题:每次循环(Barrier重置)参与者数量必须固定。但我们的优化阶段,如果某个分片已经达到精度要求,那个分片的线程就不再参与后续循环了——参与者数量在动态变化。
Phaser就是为这种场景设计的:参与者数量可以动态增减,每个参与者可以在任意阶段退出(arriveAndDeregister()),或者新参与者可以中途加入(register())。
一、Phaser的核心设计
1.1 Phaser vs CyclicBarrier
| 特性 | CyclicBarrier | Phaser |
|---|---|---|
| 参与者数量 | 固定 | 可动态增减 |
| 阶段数量 | 固定(每次重置就是一轮) | 可任意多阶段 |
| 等待方式 | await(),阻塞直到所有到达 | arriveAndAwaitAdvance(),或单独arrive() |
| 自定义阶段完成逻辑 | 只有barrierAction | onAdvance()钩子 |
| 提前退出 | 不支持(需要BrokenBarrierException) | arriveAndDeregister() |
| 支持树形结构 | 不支持 | 支持(多个Phaser形成父子关系,分散竞争) |
1.2 核心方法
Phaser phaser = new Phaser(N); // N个初始参与者
// 到达并等待所有人
phaser.arriveAndAwaitAdvance();
// 只到达,不等待(当前线程继续执行,不阻塞)
phaser.arrive();
// 到达并注销(退出后续阶段)
phaser.arriveAndDeregister();
// 注册新参与者
phaser.register();
// 注册多个
phaser.bulkRegister(n);
// 等待某个阶段完成(当前线程不是参与者时也可以用)
phaser.awaitAdvance(phaseNumber);
// 获取当前阶段号
phaser.getPhase();
// 获取已到达/未到达的参与者数量
phaser.getArrivedParties();
phaser.getUnarrivedParties();二、核心机制:onAdvance与阶段推进
2.1 state编码
Phaser的状态用一个long类型的state字段编码,包含四个信息:
- 阶段号(phase):高位,标识当前是第几阶段
- 参与者数(parties):中间位,总注册参与者数
- 已到达数(unarrived):低位,还没调用arrive的参与者数
- 终止标志:最高位为1时Phaser已终止
这种编码方式让一个volatile long可以存储所有状态,CAS操作时原子性修改。
2.2 分层Phaser(树形结构)
当参与者数量很多(成百上千)时,所有线程到达同一个Phaser的CAS竞争会很激烈。Phaser支持分层:
Phaser root = new Phaser();
Phaser child1 = new Phaser(root, 10); // 子Phaser,向root报告
Phaser child2 = new Phaser(root, 10);
// 子Phaser内部的10个线程协调
// 子Phaser完成后,作为一个整体向root汇报
// root看到两个子Phaser都报告完成,才触发onAdvance分层减少了对root Phaser的直接CAS操作,类似分段锁的思路。
三、完整代码实现
3.1 多阶段计算:参与者动态退出
package com.laozhang.concurrent.phaser;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
/**
* Phaser实战:迭代优化算法中的多阶段同步
*
* 场景:K-means聚类算法并行化
* - 第一阶段:所有线程并行初始化数据分片
* - 后续阶段:每轮迭代,并行计算每个数据点的归属
* - 如果某个线程的数据分片已收敛(变化<阈值),该线程退出后续迭代
* - 所有线程都退出或达到最大迭代次数时结束
*
* 测试环境:JDK 7+(Phaser是JDK 7新增的)
*/
public class PhaserMultiPhaseDemo {
private static final int WORKER_COUNT = 8;
private static final int MAX_ITERATIONS = 20;
private static final double CONVERGENCE_THRESHOLD = 0.01;
public static void main(String[] args) throws InterruptedException {
// 追踪是否所有线程都收敛了
boolean[] converged = new boolean[WORKER_COUNT];
Phaser phaser = new Phaser(WORKER_COUNT) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.printf("=== 阶段%d完成,当前参与者数:%d ===%n",
phase, registeredParties);
// 如果达到最大迭代次数,或者没有参与者了,终止
if (phase >= MAX_ITERATIONS - 1 || registeredParties == 0) {
System.out.println("Phaser终止:" +
(registeredParties == 0 ? "所有线程已收敛" : "达到最大迭代次数"));
return true; // 返回true终止Phaser
}
return false; // 继续下一阶段
}
};
Thread[] workers = new Thread[WORKER_COUNT];
for (int i = 0; i < WORKER_COUNT; i++) {
final int workerId = i;
workers[i] = new Thread(() -> {
// 第一阶段:初始化
double[] data = initData(workerId);
System.out.printf("[Worker%d] 初始化完成%n", workerId);
phaser.arriveAndAwaitAdvance(); // 等待所有人完成初始化
// 迭代优化阶段
double change = Double.MAX_VALUE;
while (!phaser.isTerminated()) {
// 检查是否已经收敛
if (change < CONVERGENCE_THRESHOLD) {
System.out.printf("[Worker%d] 在阶段%d收敛,退出后续迭代%n",
workerId, phaser.getPhase());
converged[workerId] = true;
phaser.arriveAndDeregister(); // 退出,不再参与后续阶段
return;
}
// 执行一轮迭代
change = iterate(workerId, data, phaser.getPhase());
System.out.printf("[Worker%d] 阶段%d完成,变化量=%.4f%n",
workerId, phaser.getPhase(), change);
// 等待本阶段所有参与者完成
int nextPhase = phaser.arriveAndAwaitAdvance();
if (nextPhase < 0) {
System.out.printf("[Worker%d] Phaser已终止,退出%n", workerId);
return;
}
}
}, "worker-" + i);
}
for (Thread w : workers) w.start();
for (Thread w : workers) w.join();
// 统计收敛情况
int convergedCount = 0;
for (boolean c : converged) if (c) convergedCount++;
System.out.printf("%n总结:%d个Worker提前收敛,%d个Worker坚持到了最后%n",
convergedCount, WORKER_COUNT - convergedCount);
}
static double[] initData(int workerId) {
double[] data = new double[100];
for (int i = 0; i < 100; i++) data[i] = ThreadLocalRandom.current().nextDouble();
return data;
}
static double iterate(int workerId, double[] data, int phase) {
// 模拟迭代计算(越往后变化量越小,模拟收敛过程)
try { Thread.sleep(10 + ThreadLocalRandom.current().nextInt(20)); }
catch (InterruptedException e) { Thread.currentThread().interrupt(); }
// 越早收敛的worker,workerId越小(模拟不同分片的收敛速度)
double baseChange = Math.max(0, 1.0 - phase * 0.1 - workerId * 0.05);
return baseChange * (0.5 + ThreadLocalRandom.current().nextDouble() * 0.5);
}
}3.2 分层Phaser:大规模并行任务
package com.laozhang.concurrent.phaser;
import java.util.concurrent.Phaser;
import java.util.concurrent.ThreadLocalRandom;
/**
* 分层Phaser:大规模并行任务减少CAS竞争
*
* 场景:100个工作线程分成10组,每组10个线程
* - 每组内用子Phaser协调
* - 所有组用根Phaser协调
* - 减少对根Phaser的直接CAS操作(从100次减少到10次)
*
* 测试环境:JDK 7+
*/
public class HierarchicalPhaserDemo {
private static final int GROUP_COUNT = 5;
private static final int WORKERS_PER_GROUP = 4;
private static final int PHASES = 3;
public static void main(String[] args) throws InterruptedException {
// 根Phaser:0个直接参与者(由子Phaser管理)
Phaser rootPhaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int parties) {
System.out.printf("[根Phaser] 阶段%d完成,参与子Phaser数:%d%n",
phase, parties);
return phase >= PHASES - 1; // 完成PHASES个阶段后终止
}
};
// 创建子Phaser和工作线程
Thread[] allWorkers = new Thread[GROUP_COUNT * WORKERS_PER_GROUP];
int workerIdx = 0;
for (int g = 0; g < GROUP_COUNT; g++) {
final int groupId = g;
// 子Phaser注册到根Phaser
Phaser childPhaser = new Phaser(rootPhaser, WORKERS_PER_GROUP);
for (int w = 0; w < WORKERS_PER_GROUP; w++) {
final int workerId = w;
allWorkers[workerIdx++] = new Thread(() -> {
for (int phase = 0; phase < PHASES; phase++) {
// 模拟工作
doWork(groupId, workerId, phase);
System.out.printf("[组%d-Worker%d] 阶段%d完成%n",
groupId, workerId, phase);
// 在子Phaser上同步(不直接CAS根Phaser)
childPhaser.arriveAndAwaitAdvance();
}
childPhaser.arriveAndDeregister();
}, String.format("group%d-worker%d", groupId, workerId));
}
}
// 启动所有线程
long start = System.currentTimeMillis();
for (Thread w : allWorkers) w.start();
for (Thread w : allWorkers) w.join();
System.out.printf("总耗时:%dms,完成%d个阶段,共%d个Worker%n",
System.currentTimeMillis() - start, PHASES,
GROUP_COUNT * WORKERS_PER_GROUP);
}
static void doWork(int groupId, int workerId, int phase) {
try {
Thread.sleep(50 + ThreadLocalRandom.current().nextInt(50));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}四、踩坑实录
坑1:arriveAndDeregister()后继续调用arrive()导致IllegalStateException
报错现象: java.lang.IllegalStateException: Attempted arrival of unregistered party
原因分析: 调用arriveAndDeregister()后,该线程已经从Phaser中注销。后续不能再调用任何arrive*方法,Phaser不认识这个"非注册参与者"。
phaser.arriveAndDeregister(); // 注销
// ... 一些其他逻辑
phaser.arriveAndAwaitAdvance(); // 错误!已经注销了解法: arriveAndDeregister()后,该线程不再参与任何Phaser操作。通常在循环退出时调用,代表线程完全退出。
坑2:onAdvance里抛异常导致Phaser永久终止
报错现象: 某次onAdvance()抛了RuntimeException,所有等待线程永远无法推进,Phaser永久挂起。
原因分析: 如果onAdvance()抛出异常,Phaser会进入终止状态(isTerminated()返回true),所有等待线程的arriveAndAwaitAdvance()返回负数。这可能不是期望的行为,导致线程在不知情的情况下退出。
解法: onAdvance()方法里做好异常处理,不让异常逃逸:
@Override
protected boolean onAdvance(int phase, int registeredParties) {
try {
doPhaseCompletion(phase);
} catch (Exception e) {
log.error("阶段{}完成时出错", phase, e);
// 决定:继续(return false)还是终止(return true)
}
return false; // 继续
}坑3:await阶段号超出范围
报错现象: phaser.awaitAdvance(phase)挂起很久,实际Phaser早已推进到更高阶段。
原因分析: awaitAdvance(phase)等待Phaser推进到phase+1。如果Phaser已经推进到phase+2甚至更高,这个调用会立刻返回(因为条件已经满足)。但如果传入的阶段号是错误的(比如当前阶段已经是5,但传入的是3),行为可能不符合预期。
解法: 确认awaitAdvance传入的阶段号是当前阶段或稍早的阶段:
int currentPhase = phaser.getPhase();
phaser.awaitAdvance(currentPhase); // 等待当前阶段完成坑4:动态注册新参与者的时机问题
报错现象: 在阶段N中途动态register()了新参与者,但阶段N在新参与者到达之前就完成了(因为旧参与者已经全部到达),新参与者实际上没有参与阶段N。
原因分析: register()是立即生效的,新参与者被注册后,需要在当前阶段到来时arrive(),否则这个阶段等待的参与者数量增加了,但新参与者还没到达,可能导致阶段推进变慢。
如果在阶段N完成后(所有旧参与者到达,阶段已推进到N+1)才register,新参与者从阶段N+1开始参与,没有问题。
解法: 动态注册最好在阶段完成后(比如在onAdvance()钩子里,或者在awaitAdvance返回后)进行,时机更可控。
五、总结与延伸
Phaser的独特优势总结:
- 动态参与者管理:最核心的优势,允许线程随时加入或退出
- 无限阶段:不需要预先定义阶段数,onAdvance()控制终止时机
- onAdvance钩子:每阶段完成时执行自定义逻辑(汇总、检查收敛等)
- 分层结构:大规模并发时减少CAS竞争
- 非阻塞arrive():允许到达后继续执行,在合适时机再等待
何时用Phaser而不是CyclicBarrier:
- 参与者数量会变化
- 需要在某些条件下提前终止循环
- 需要阶段完成时的复杂回调逻辑
- 参与者数量很多(用分层Phaser)
JDK 7+ 推荐: 在JDK 7+的新项目中,Phaser完全可以替代CyclicBarrier,功能是CyclicBarrier的超集,API更灵活。
