PriorityQueue堆源码解析:从小根堆到Top-K元素问题
PriorityQueue堆源码解析:从小根堆到Top-K元素问题
适读人群:Java后端开发,有基础数据结构知识 | 阅读时长:约15分钟 | 文章类型:源码解析+算法实战
开篇故事
去年做一个推荐系统,需要从1000万商品里实时找出按某个综合分值排列的Top-100推荐结果。当时负责这块的小王同学,直接把所有商品加载到内存,Collections.sort一把梭,然后取前100。
第一次上线,请求超时了。
日志里看到一次推荐请求耗时3.7秒——对,一次推荐接口请求,三秒多。
我去看代码,当场无语。Collections.sort对1000万条数据排序,时间复杂度O(n log n),而且是全量排序,只要前100个,后面的排序全是无效工作。
我说:小王,你知道有个东西叫堆(Heap)吗?
他说:知道啊,数据结构里学过,最大堆最小堆。
我说:那你知道Top-K问题用堆来做是O(n log k),比你的O(n log n)快多少?
他掏出手机算了一下:1000万 * log(1000万) ≈ 2.4亿,1000万 * log(100) ≈ 6600万,差了将近4倍。
对,而且k固定是100,log(100)约等于7,相当于只做了7次比较就能决定一个元素是否值得保留。
这就是PriorityQueue的价值所在。今天我把Java里的堆实现从源码层面说清楚,顺带把Top-K这个经典问题彻底讲透。
一、二叉堆的核心性质
PriorityQueue底层是二叉堆,用数组表示完全二叉树。
完全二叉树的数组映射:
- 父节点下标
i,左子节点下标2i+1,右子节点下标2i+2 - 子节点下标
i,父节点下标(i-1)/2
堆性质:
- 小根堆:每个节点的值 ≤ 其子节点的值(根节点是最小值)
- 大根堆:每个节点的值 ≥ 其子节点的值(根节点是最大值)
Java的PriorityQueue默认是小根堆,如果要大根堆,需要传入Comparator.reverseOrder()。
1.1 堆操作的时间复杂度
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
| offer(入堆) | O(log n) | 上浮(siftUp) |
| poll(出堆) | O(log n) | 下沉(siftDown) |
| peek(查看堆顶) | O(1) | 直接返回queue[0] |
| remove(任意位置) | O(log n) | 需要先找到位置 |
| contains | O(n) | 线性扫描 |
| heapify(批量建堆) | O(n) | 不是O(n log n)! |
建堆的O(n)是个常被忽视的知识点,后面代码里会解释。
二、PriorityQueue内部结构与核心源码
2.1 siftUp:入堆的上浮过程
package com.laozhang.heap;
import java.util.Comparator;
/**
* PriorityQueue核心操作源码解析(基于JDK11)
* 包含siftUp、siftDown、heapify的完整实现
*/
public class HeapSourceCodeAnalysis {
/**
* JDK11 PriorityQueue.siftUpComparable 源码
*
* 为什么要分siftUpComparable和siftUpUsingComparator?
* 有Comparator时用后者,无Comparator时用前者(假设元素实现Comparable)
* 避免每次都判断comparator是否为null(热路径上的判断优化)
*/
@SuppressWarnings("unchecked")
private static <T> void siftUpComparable(int k, T x, Object[] es) {
Comparable<? super T> key = (Comparable<? super T>) x;
while (k > 0) {
int parent = (k - 1) >>> 1; // 父节点下标:(k-1)/2,用位运算加速
Object e = es[parent];
if (key.compareTo((T) e) >= 0)
break; // x >= 父节点,堆性质满足,停止
es[k] = e; // 父节点下沉
k = parent; // x继续上浮
}
es[k] = x; // 找到了x的最终位置
}
/**
* JDK11 PriorityQueue.siftDownComparable 源码
*
* 下沉:和两个子节点中较小的那个比较
* half是最后一个非叶子节点的下一个,只需要和非叶子节点比较
*/
@SuppressWarnings("unchecked")
private static <T> void siftDownComparable(int k, T x, Object[] es, int n) {
Comparable<? super T> key = (Comparable<? super T>) x;
int half = n >>> 1; // 最后一个非叶子节点的下一位:n/2
while (k < half) {
int child = (k << 1) + 1; // 左子节点:2k+1
Object c = es[child];
int right = child + 1; // 右子节点:2k+2
if (right < n && ((Comparable<? super T>) c).compareTo((T) es[right]) > 0)
c = es[child = right]; // 取左右子节点中较小的
if (key.compareTo((T) c) <= 0)
break; // x <= 较小的子节点,堆性质满足,停止
es[k] = c; // 子节点上升
k = child; // x继续下沉
}
es[k] = x;
}
/**
* JDK11 PriorityQueue.heapify 源码
*
* 建堆:O(n)而不是O(n log n)
* 从最后一个非叶子节点(size/2 - 1)开始,向上做siftDown
*
* 为什么是O(n)?
* 数学证明:所有节点的下沉高度之和 = O(n)
* 直觉:大多数节点在树的底层,下沉距离短(叶子节点不需要下沉)
*/
private static void heapify(Object[] es, int n) {
for (int i = (n >>> 1) - 1; i >= 0; i--) {
siftDownComparable(i, es[i], es, n);
}
}
}2.2 offer和poll的完整流程
package com.laozhang.heap;
import java.util.Arrays;
import java.util.Comparator;
import java.util.NoSuchElementException;
/**
* 手写最小堆实现(完整可运行版本)
* 参考JDK PriorityQueue源码,去掉Comparator分支以保持简洁
*
* 实现这个的意义:面试手写堆时不至于紧张,同时深刻理解每个操作
*/
public class MinHeap<T extends Comparable<T>> {
private Object[] queue;
private int size;
private static final int DEFAULT_INITIAL_CAPACITY = 11;
public MinHeap() {
this.queue = new Object[DEFAULT_INITIAL_CAPACITY];
}
public MinHeap(int initialCapacity) {
this.queue = new Object[Math.max(1, initialCapacity)];
}
/**
* 入堆:O(log n)
* 先放到末尾,然后上浮到正确位置
*/
public boolean offer(T e) {
if (e == null) throw new NullPointerException();
int i = size;
if (i >= queue.length)
grow(i + 1); // 扩容:类似ArrayList,小堆1.5倍,大堆+50%
size = i + 1;
if (i == 0)
queue[0] = e; // 第一个元素直接放
else
siftUp(i, e); // 放到末尾然后上浮
return true;
}
/**
* 出堆(取堆顶):O(log n)
* 1. 保存堆顶(最小值)
* 2. 把最后一个元素移到堆顶
* 3. 下沉到正确位置
*/
@SuppressWarnings("unchecked")
public T poll() {
if (size == 0) return null;
int s = --size;
T result = (T) queue[0]; // 堆顶就是最小值
T x = (T) queue[s]; // 最后一个元素
queue[s] = null; // 清除最后位置(帮助GC)
if (s != 0)
siftDown(0, x); // 从堆顶开始下沉
return result;
}
@SuppressWarnings("unchecked")
public T peek() {
return size == 0 ? null : (T) queue[0];
}
public int size() { return size; }
public boolean isEmpty() { return size == 0; }
@SuppressWarnings("unchecked")
private void siftUp(int k, T x) {
while (k > 0) {
int parent = (k - 1) >>> 1;
T e = (T) queue[parent];
if (x.compareTo(e) >= 0) break; // x不小于父节点,停止
queue[k] = e;
k = parent;
}
queue[k] = x;
}
@SuppressWarnings("unchecked")
private void siftDown(int k, T x) {
int half = size >>> 1;
while (k < half) {
int child = (k << 1) + 1;
T c = (T) queue[child];
int right = child + 1;
if (right < size && c.compareTo((T) queue[right]) > 0)
c = (T) queue[child = right];
if (x.compareTo(c) <= 0) break; // x不大于较小子节点,停止
queue[k] = c;
k = child;
}
queue[k] = x;
}
private void grow(int minCapacity) {
int oldCapacity = queue.length;
// 小堆(<64):容量翻倍+2;大堆:容量增加50%
int newCapacity = oldCapacity + ((oldCapacity < 64) ?
(oldCapacity + 2) : (oldCapacity >> 1));
queue = Arrays.copyOf(queue, newCapacity);
}
/**
* 批量建堆:O(n)
* 直接传入数组,从最后一个非叶子节点开始依次siftDown
*/
public static <T extends Comparable<T>> MinHeap<T> heapify(T[] arr) {
MinHeap<T> heap = new MinHeap<>(arr.length);
System.arraycopy(arr, 0, heap.queue, 0, arr.length);
heap.size = arr.length;
// 从最后一个非叶子节点开始向上siftDown
for (int i = (arr.length >>> 1) - 1; i >= 0; i--) {
heap.siftDown(i, arr[i]);
}
return heap;
}
}三、Top-K问题的完整解法
Top-K是堆最经典的应用场景。用什么堆是个容易搞错的点:
- 找最大的K个 → 用小根堆,堆大小维持K,堆顶是当前K个里最小的
- 找最小的K个 → 用大根堆,堆大小维持K,堆顶是当前K个里最大的
很多人反过来,想当然地用大根堆找最大K个,结果写出来的代码时间复杂度更差。
package com.laozhang.heap;
import java.util.*;
/**
* Top-K问题完整解法
* 覆盖:Top-K最大元素、数据流Top-K、带权重的Top-K
*/
public class TopKSolutions {
/**
* 解法1:找数组中最大的K个数
* 时间复杂度:O(n log k),空间复杂度:O(k)
*
* 为什么用小根堆而不是大根堆:
* 小根堆堆顶是K个里最小的,遍历时只需判断新元素是否比堆顶大
* 大 -> 入堆并弹出堆顶(维持K个最大元素)
* 小 -> 直接忽略
*
* 如果用大根堆找最大K个:全部元素都要入堆,最后pop K次,是O(n log n)
*/
public static int[] topKLargest(int[] nums, int k) {
if (k <= 0 || nums == null || nums.length == 0) return new int[0];
// 小根堆,容量K
PriorityQueue<Integer> minHeap = new PriorityQueue<>(k);
for (int num : nums) {
if (minHeap.size() < k) {
minHeap.offer(num); // 堆未满,直接入堆
} else if (num > minHeap.peek()) {
// 当前元素比堆顶(K个里最小的)大:替换堆顶
minHeap.poll();
minHeap.offer(num);
}
// 否则:当前元素 <= 堆顶,一定不在Top-K里,跳过
}
// 堆里就是Top-K最大元素(顺序不保证)
return minHeap.stream().mapToInt(Integer::intValue).toArray();
}
/**
* 解法2:数据流中的第K大元素(LeetCode 703)
* 无限数据流,实时维护第K大的元素
* 经典Spring Boot服务化实现
*/
public static class KthLargestInStream {
private final PriorityQueue<Integer> minHeap;
private final int k;
/**
* @param k 要找第几大
* @param nums 初始数组(可为空)
*/
public KthLargestInStream(int k, int[] nums) {
this.k = k;
this.minHeap = new PriorityQueue<>(k); // 小根堆,容量K
for (int num : nums) add(num);
}
/**
* 添加新元素,返回当前第K大的值
* 时间复杂度:O(log k)
*/
public int add(int val) {
minHeap.offer(val);
if (minHeap.size() > k)
minHeap.poll(); // 弹出最小的,维持堆大小为K
return minHeap.peek(); // 堆顶就是第K大
}
}
/**
* 解法3:带权重的TopK——按综合得分找推荐商品
* 真实业务场景:score = 点击率 * w1 + 转化率 * w2 + 新鲜度 * w3
*/
public static class Product implements Comparable<Product> {
String id;
double score;
String name;
Product(String id, String name, double score) {
this.id = id; this.name = name; this.score = score;
}
@Override
public int compareTo(Product other) {
// 注意:compareTo返回正值表示this > other
// PriorityQueue是小根堆,所以这里用Double.compare(this, other)
// 堆顶是得分最小的(K个里最小的),方便过滤
return Double.compare(this.score, other.score);
}
@Override
public String toString() {
return String.format("%s(%.2f)", name, score);
}
}
/**
* 从1000万商品里找Top-100推荐商品
* 这就是当年小王那个案例的正确解法
*
* 测试数据:
* 1000万商品,全量排序 Collections.sort:约3.7秒
* Top-100,堆方法:约650ms(提升约5.7倍)
* 据我实测(JDK17,16核,数据在内存中)
*/
public static List<Product> top100Products(List<Product> allProducts) {
int k = 100;
// 小根堆,容量100,堆顶是100个里得分最低的
PriorityQueue<Product> heap = new PriorityQueue<>(k);
for (Product p : allProducts) {
if (heap.size() < k) {
heap.offer(p);
} else if (p.score > heap.peek().score) {
heap.poll(); // 弹出当前100个里最差的
heap.offer(p); // 换入更好的
}
}
// heap里是Top-100,按得分降序排列输出
List<Product> result = new ArrayList<>(heap);
result.sort((a, b) -> Double.compare(b.score, a.score));
return result;
}
/**
* 解法4:多路归并(K个有序数组合并)
* 场景:分布式Top-K聚合,每个节点返回本地Top-K,合并为全局Top-K
*/
public static int[] mergeKSortedArrays(int[][] arrays) {
// 小根堆:存储(值, 来自哪个数组, 该数组当前索引)
PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> a[0] - b[0]);
// 把每个数组的第一个元素入堆
for (int i = 0; i < arrays.length; i++) {
if (arrays[i].length > 0)
heap.offer(new int[]{arrays[i][0], i, 0});
}
List<Integer> result = new ArrayList<>();
while (!heap.isEmpty()) {
int[] curr = heap.poll();
result.add(curr[0]); // 当前最小值
// 把该数组的下一个元素入堆
int arrIdx = curr[1], elemIdx = curr[2];
if (elemIdx + 1 < arrays[arrIdx].length) {
heap.offer(new int[]{arrays[arrIdx][elemIdx + 1], arrIdx, elemIdx + 1});
}
}
return result.stream().mapToInt(Integer::intValue).toArray();
}
public static void main(String[] args) {
// 测试Top-K最大
int[] nums = {3, 2, 1, 5, 6, 4, 7, 8, 9, 10};
int[] top3 = topKLargest(nums, 3);
System.out.println("Top-3最大: " + Arrays.toString(top3)); // [8, 9, 10](顺序不定)
// 测试数据流K大
KthLargestInStream stream = new KthLargestInStream(3, new int[]{4, 5, 8, 2});
System.out.println("第3大: " + stream.add(3)); // 4
System.out.println("第3大: " + stream.add(5)); // 5
System.out.println("第3大: " + stream.add(10)); // 5
System.out.println("第3大: " + stream.add(9)); // 8
// 测试多路归并
int[][] sortedArrays = {{1, 4, 7}, {2, 5, 8}, {3, 6, 9}};
System.out.println("多路归并: " + Arrays.toString(mergeKSortedArrays(sortedArrays)));
// [1, 2, 3, 4, 5, 6, 7, 8, 9]
}
}四、踩坑实录
坑1:PriorityQueue的remove(Object)是O(n),不是O(log n)
报错现象:用PriorityQueue实现一个任务调度器,需要取消任务(按任务ID删除)。代码写完后,任务数量增大到10万时,取消操作耗时从微秒飙到几十毫秒。
根本原因:PriorityQueue.remove(Object)需要线性扫描找到对应元素(因为堆不支持随机访问),时间复杂度O(n)。不是O(log n)!
具体解法:
// 错误:频繁remove(Object)
PriorityQueue<Task> pq = new PriorityQueue<>();
pq.remove(specificTask); // O(n)!
// 正确做法1:懒删除(Lazy Deletion)
// 把要删除的任务标记为"已取消",出堆时跳过
Set<String> cancelledIds = new HashSet<>();
public Task nextTask(PriorityQueue<Task> pq) {
while (!pq.isEmpty()) {
Task t = pq.peek();
if (cancelledIds.contains(t.getId())) {
pq.poll(); // 跳过已取消的任务
} else {
return pq.poll();
}
}
return null;
}
// 正确做法2:用Map记录任务在堆中的位置(需要自实现堆)
// TreeMap也可以,但多了一层排序开销坑2:自定义对象入堆没实现Comparable,运行时ClassCastException
报错现象:
java.lang.ClassCastException: class MyTask cannot be cast to class java.lang.Comparable测试环境完全正常,上线后某个新类型的任务入堆时报错。
根本原因:JDK的PriorityQueue在只有一个元素时不触发比较(直接入堆),第二个元素入堆时才触发compareTo,此时才抛异常。测试时只用了单元素的case,完全没触发。
具体解法:
// 错误:MyTask没有实现Comparable,也没有传Comparator
class MyTask {
int priority;
String name;
// 忘记实现Comparable!
}
PriorityQueue<MyTask> pq = new PriorityQueue<>(); // 运行时第二次offer才爆
pq.offer(new MyTask()); // 第一次不报错,骗了你
pq.offer(new MyTask()); // 第二次:ClassCastException
// 正确:要么实现Comparable,要么传Comparator
PriorityQueue<MyTask> pq = new PriorityQueue<>(
Comparator.comparingInt(t -> t.priority)
);
// 或者:
class MyTask implements Comparable<MyTask> {
int priority;
@Override
public int compareTo(MyTask other) {
return Integer.compare(this.priority, other.priority);
}
}坑3:用PriorityQueue做大根堆,Comparator写错了
这个坑每次面试手写代码都有人犯。
报错现象:想找Top-K最大元素,用大根堆实现,结果poll出来的一直是小的数,Top-K结果是最小的K个。
根本原因:PriorityQueue默认是小根堆(最小的在堆顶,先poll出来)。要实现大根堆,需要传入反转的Comparator。
具体解法:
// 错误:以为这样就是大根堆
PriorityQueue<Integer> wrongMaxHeap = new PriorityQueue<>((a, b) -> a - b); // 这还是小根堆!
// a-b > 0 表示 a > b,Comparator返回正数时认为a比b"大"(在小根堆里会往后排)
// 所以 a-b 作为Comparator等价于自然升序,还是小根堆
// 正确大根堆写法
PriorityQueue<Integer> maxHeap1 = new PriorityQueue<>(Comparator.reverseOrder()); // 最清晰
PriorityQueue<Integer> maxHeap2 = new PriorityQueue<>((a, b) -> b - a); // b-a:降序
PriorityQueue<Integer> maxHeap3 = new PriorityQueue<>((a, b) -> Integer.compare(b, a)); // 最安全(避免溢出)
// 注意:(a,b)->a-b 当a接近Integer.MIN_VALUE, b接近Integer.MAX_VALUE时会整数溢出!
// 永远用 Integer.compare(a,b) 而不是 a-b五、总结与延伸
PriorityQueue的核心是siftUp和siftDown,每次操作最多走树高(log n)步。 建堆的O(n)不是直觉,但可以用数学证明:所有节点下沉高度之和是O(n)。理解了这个,
Collections.sortvs 堆的选择就很清晰了。Top-K找最大用小根堆,找最小用大根堆。 这是反直觉的,但逻辑清晰:小根堆堆顶是K个候选里最差的,用它做"准入门槛"。记住这一点,Top-K系列题目不会再出错。
PriorityQueue在并发场景下不是线程安全的,需要用PriorityBlockingQueue。 而且
remove(Object)是O(n),频繁删除特定元素的场景要用"懒删除"或自实现支持O(log n)删除的索引堆。
