Stream懒加载内部实现:为什么你的filter没有执行几次
Stream懒加载内部实现:为什么你的filter没有执行几次
适读人群:Java中级开发者,用过Stream但想了解原理 | 阅读时长:约15分钟 | 文章类型:源码深度解析
开篇故事
有次同事小林来问我一个问题,说他加了很多日志想调试Stream的执行顺序,结果看懵了:
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
long count = numbers.stream()
.filter(n -> { System.out.println("filter: " + n); return n % 2 == 0; })
.map(n -> { System.out.println("map: " + n); return n * 2; })
.limit(3)
.count();他说:我以为先打10个filter,再打5个map(偶数),实际输出完全不是这样。
实际输出是:
filter: 1
filter: 2
map: 2
filter: 3
filter: 4
map: 4
filter: 5
filter: 6
map: 6然后就结束了,count=3。他数了数:filter只执行了6次(不是10次),map只执行了3次(不是5次)。
我说:这就是Stream的懒加载(Lazy Evaluation)。
他:什么是懒加载?
我说:Stream的中间操作(filter/map/sorted等)不会立即执行,而是记录在一个操作链里。真正开始执行是在遇到终止操作(count/collect/forEach等)时,而且是按元素一个一个穿过整个操作链,而不是一个操作处理完所有元素再到下一个操作。这才是为什么filter、map是交替出现的。
然后配合limit(3),只需要3个通过filter和map的元素,找到第3个就立刻停止,不再处理后面的元素。所以filter只执行了6次,而不是10次。
这个懒加载设计是Stream性能的关键,今天我们把内部实现说清楚。
一、Stream操作的两种类型
| 类型 | 示例 | 是否立即执行 | 是否消耗元素 |
|---|---|---|---|
| 中间操作(Intermediate) | filter, map, sorted, limit, skip | 否,只记录 | 否 |
| 终止操作(Terminal) | collect, count, forEach, findFirst | 是,触发整个Pipeline | 是 |
关键原则:没有终止操作,Stream什么都不做。
// 这段代码不会打印任何东西!
Stream<Integer> stream = Stream.of(1, 2, 3)
.filter(n -> { System.out.println("filter: " + n); return true; });
// filter操作只是被"记录"了,没有执行
// 加上终止操作才会触发执行
stream.collect(java.util.stream.Collectors.toList()); // 现在才执行filter二、Stream内部Pipeline的实现原理
2.1 ReferencePipeline的内部结构
package com.laozhang.stream;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
/**
* Stream懒加载原理解析(基于JDK11)
*
* Stream操作链的本质:
* 每个中间操作创建一个新的ReferencePipeline对象
* 这些对象形成一条链(双向链表结构)
* 终止操作触发时,从尾到头构建Sink链,然后驱动数据穿过
*/
public class StreamLazyEvaluation {
/**
* 简化版的Sink链演示
* Sink = "水槽",元素像水一样从源头流经每个Sink
*
* JDK源码里:Sink接口是Stream.reduce/collect等操作的核心抽象
* 每个中间操作的accept方法决定是否将元素传递给下游Sink
*/
static abstract class Sink<T> implements Consumer<T> {
Sink<T> downstream; // 下游Sink(链表结构)
Sink(Sink<T> downstream) {
this.downstream = downstream;
}
// 开始处理(初始化状态)
void begin(long size) {
if (downstream != null) downstream.begin(size);
}
// 处理完成
void end() {
if (downstream != null) downstream.end();
}
// 是否可以取消(用于short-circuit操作如limit/findFirst)
boolean cancellationRequested() {
return false;
}
}
/**
* Filter操作的Sink:不满足条件的元素不传给下游
*/
static class FilterSink<T> extends Sink<T> {
private final Predicate<T> predicate;
FilterSink(Predicate<T> predicate, Sink<T> downstream) {
super(downstream);
this.predicate = predicate;
}
@Override
public void accept(T t) {
System.out.println(" FilterSink.accept: " + t);
if (predicate.test(t)) {
downstream.accept(t); // 满足条件,传给下游
// 不满足就不传,这就是filter的语义
}
}
}
/**
* Map操作的Sink:把元素转换后传给下游
*/
@SuppressWarnings("unchecked")
static class MapSink<T, R> extends Sink {
private final Function<T, R> mapper;
private final Sink<R> downstream;
MapSink(Function<T, R> mapper, Sink<R> downstream) {
super(null); // 强制用自己的downstream字段
this.mapper = mapper;
this.downstream = downstream;
}
@Override
public void accept(Object t) {
System.out.println(" MapSink.accept: " + t);
downstream.accept(mapper.apply((T)t)); // 转换后传给下游
}
}
/**
* 终止Sink:收集最终结果
*/
static class CollectSink<T> extends Sink<T> {
private final List<T> result = new java.util.ArrayList<>();
CollectSink() { super(null); }
@Override
public void accept(T t) {
System.out.println(" CollectSink.accept: " + t);
result.add(t);
}
public List<T> getResult() { return result; }
}
/**
* 演示Stream的执行顺序
* 手动构建Sink链,模拟JDK Stream的执行过程
*/
public static void demonstrateExecutionOrder() {
System.out.println("=== 手动模拟Stream执行顺序 ===");
// 构建Sink链(从尾到头构建,JDK里叫wrapSink)
CollectSink<Integer> collectSink = new CollectSink<>();
MapSink<Integer, Integer> mapSink = new MapSink<>(n -> n * 2, collectSink);
FilterSink<Integer> filterSink = new FilterSink<>(n -> n % 2 == 0, (Sink<Integer>)(Object)mapSink);
// 驱动数据源(模拟forEach)
List<Integer> source = Arrays.asList(1, 2, 3, 4);
System.out.println("开始处理...");
for (Integer num : source) {
System.out.println("处理元素: " + num);
filterSink.accept(num);
}
System.out.println("结果: " + collectSink.getResult());
// 输出验证:每个元素是"一穿到底"的,不是批量处理
}
/**
* 真实的JDK Stream验证:懒加载+short-circuit
*/
public static void verifyLazyEvaluation() {
System.out.println("\n=== JDK Stream执行顺序验证 ===");
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
System.out.println("filter+map+limit(3).count():");
long count = numbers.stream()
.filter(n -> { System.out.println(" filter: " + n); return n % 2 == 0; })
.map(n -> { System.out.println(" map: " + n); return n * 2; })
.limit(3)
.count();
System.out.println("count=" + count);
// 只打印filter 1-6,map 2/4/6,然后停止
// 因为limit(3)触发short-circuit,找到3个就停
}
public static void main(String[] args) {
demonstrateExecutionOrder();
verifyLazyEvaluation();
}
}2.2 有状态操作与无状态操作的区别
package com.laozhang.stream;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
* Stream有状态操作(Stateful)vs 无状态操作(Stateless)
*
* 无状态:filter, map, peek, flatMap
* 每个元素独立处理,不需要记录之前元素的信息
*
* 有状态:sorted, distinct, limit, skip
* 需要记录之前元素的信息才能处理当前元素
*
* 有状态操作会"打断"流水线,可能需要等所有元素到齐才能继续
*/
public class StatefulVsStatelessOps {
/**
* sorted是有状态的:必须等所有元素都到达才能排序
* 这意味着sorted会把流水线"卡住"
*
* filter()+map():流水线正常,元素一个一个穿过
* sorted()之后的操作:必须等sorted收集完所有元素再继续
*/
public static void sortedBreaksPipeline() {
List<Integer> nums = Arrays.asList(5, 3, 1, 4, 2);
System.out.println("=== filter -> sorted -> map 执行顺序 ===");
List<Integer> result = nums.stream()
.filter(n -> {
System.out.println("filter: " + n);
return n > 1;
})
.sorted((a, b) -> {
System.out.println("sorted comparing: " + a + " vs " + b);
return a - b;
})
.map(n -> {
System.out.println("map: " + n);
return n * 10;
})
.collect(Collectors.toList());
System.out.println("结果: " + result);
// 输出顺序:先是所有filter(4个通过的),然后是多次sorted比较,最后是所有map
// sorted打断了流水线!
}
/**
* 性能陷阱:错误的sorted位置
* sorted放在filter之前:对所有元素排序(开销大)
* sorted放在filter之后:只对通过filter的元素排序(开销小)
*/
public static void sortedPositionMatters() {
List<Integer> bigList = java.util.stream.IntStream.range(0, 1_000_000)
.boxed().collect(Collectors.toList());
// 错误:先排序再过滤(排100万个元素)
long start = System.currentTimeMillis();
long count1 = bigList.stream()
.sorted() // 先排序100万个
.filter(n -> n % 100 == 0) // 再过滤,只剩1万个
.count();
System.out.println("先排序后过滤: " + (System.currentTimeMillis() - start) + "ms, count=" + count1);
// 据我测试JDK17:约350ms
// 正确:先过滤再排序(只排1万个元素)
start = System.currentTimeMillis();
long count2 = bigList.stream()
.filter(n -> n % 100 == 0) // 先过滤,只剩1万个
.sorted() // 再排序1万个
.count();
System.out.println("先过滤后排序: " + (System.currentTimeMillis() - start) + "ms, count=" + count2);
// 据我测试JDK17:约15ms,快约23倍
}
/**
* distinct()的内部实现:用HashSet去重
* 每个元素都要查HashSet,开销是O(1)均摊,但有内存开销
* 不适合超大流(HashSet会占很多内存)
*/
public static void distinctInternals() {
// distinct()等价于维护一个内部HashSet
// 只有不在HashSet中的元素才传给下游
List<Integer> withDuplicates = Arrays.asList(1, 2, 2, 3, 3, 3, 4);
List<Integer> distinct = withDuplicates.stream()
.distinct() // 内部:HashSet<Object> seen = new HashSet<>()
.collect(Collectors.toList());
System.out.println("distinct结果: " + distinct); // [1, 2, 3, 4]
// 大数据量场景:distinct()内存占用和元素数成正比
// 1000万不重复整数 distinct():约240MB HashSet内存
// 这种场景建议用外部排序或BitSet去重
}
public static void main(String[] args) {
sortedBreaksPipeline();
System.out.println();
sortedPositionMatters();
System.out.println();
distinctInternals();
}
}四、踩坑实录
坑1:Stream被消费后不能再次使用
报错现象:
java.lang.IllegalStateException: stream has already been operated upon or closed在某个方法里拿到一个Stream,分别传给两个逻辑处理,第二次使用时报错。
根本原因:Stream是一次性的,终止操作调用后,Stream就处于"关闭"状态,不能再次使用。这是Stream设计的基本约束。
具体解法:
// 错误:复用同一个Stream
Stream<String> stream = list.stream().filter(s -> s.length() > 3);
long count = stream.count(); // 消费了stream
stream.forEach(System.out::println); // 报 IllegalStateException!
// 正确做法1:每次从源头重新创建Stream
long count = list.stream().filter(s -> s.length() > 3).count();
list.stream().filter(s -> s.length() > 3).forEach(System.out::println);
// 正确做法2:先collect成集合,再操作
List<String> filtered = list.stream().filter(s -> s.length() > 3).collect(Collectors.toList());
long count = filtered.size();
filtered.forEach(System.out::println);
// 正确做法3:用Supplier延迟创建
Supplier<Stream<String>> streamSupplier = () -> list.stream().filter(s -> s.length() > 3);
long count = streamSupplier.get().count();
streamSupplier.get().forEach(System.out::println);坑2:parallel Stream的执行顺序不确定,不能依赖顺序
报错现象:把一个普通Stream改成parallelStream()后,业务处理结果出现了不一致,因为一些代码依赖了处理顺序。
根本原因:parallelStream()使用ForkJoinPool.commonPool()并行处理元素,元素的处理顺序不确定。如果你的逻辑依赖顺序(比如累计计算、有状态的map),并行Stream会给出错误结果。
具体解法:
// 错误:parallel Stream中有顺序依赖的操作
List<Integer> results = new ArrayList<>();
numbers.parallelStream().forEach(n -> results.add(n * 2)); // 线程不安全!
// 正确:parallel Stream只用于无状态操作,结果用collect汇总
List<Integer> results = numbers.parallelStream()
.map(n -> n * 2) // 无状态映射,可以并行
.collect(Collectors.toList()); // collect是线程安全的汇总
// 注意:parallel Stream适合CPU密集型操作(计算量大)
// 对于IO密集型(数据库查询)或数据量小(<10000)的场景,parallel可能反而更慢
// 因为ForkJoinPool的线程调度开销比单线程处理小数据集还大坑3:在Stream中进行IO操作,导致ForkJoinPool线程耗尽
报错现象:用parallelStream()对每个元素做HTTP请求,系统卡死,后续所有请求超时。
根本原因:parallelStream()默认用ForkJoinPool.commonPool(),线程数等于CPU核数(通常8-16)。大量IO等待会把commonPool的所有线程都占满,连其他依赖ForkJoinPool的操作(比如CompletableFuture的默认线程池)也全部卡住。
具体解法:
// 错误:用parallelStream()做IO
List<String> results = urls.parallelStream()
.map(url -> httpGet(url)) // IO操作,线程会阻塞等待
.collect(Collectors.toList());
// 正确:IO操作用自定义线程池 + CompletableFuture
ExecutorService ioPool = Executors.newFixedThreadPool(200); // IO线程池,数量大
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(() -> httpGet(url), ioPool))
.collect(Collectors.toList());
List<String> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
ioPool.shutdown();
// 如果确实要用parallelStream(),用自定义ForkJoinPool
ForkJoinPool customPool = new ForkJoinPool(50); // 自定义线程数
List<String> results = customPool.submit(
() -> urls.parallelStream().map(url -> httpGet(url)).collect(Collectors.toList())
).get();五、总结与延伸
Stream的懒加载让"中间操作的组合"成为零开销的抽象。 filter+map+limit的组合,实际执行时是一个元素穿过整个操作链,而不是三次全量遍历。这让复杂的数据处理Pipeline与手写循环在性能上几乎等价,却有好得多的可读性。
有状态操作(sorted/distinct)是Stream的性能陷阱。 它们会打断流水线,强制等待所有元素到达。优化原则:把filter尽量放在sorted之前,减少需要排序的元素数量,这可以带来数十倍的性能差距。
parallelStream()不是免费的午餐。 它适合CPU密集型、无状态的操作。IO密集型用CompletableFuture+自定义线程池,数据量小的场景用普通Stream(ForkJoinPool调度开销可能超过并行收益)。
