迭代器模式:Java Iterator协议与MyBatis游标查询的大数据处理
迭代器模式:Java Iterator协议与MyBatis游标查询的大数据处理
适读人群:中高级Java开发者 | 阅读时长:约20分钟 | 模式类型:行为型
开篇故事
2021年,我接手了一个数据迁移任务:把一张有5000万条记录的订单表,从旧系统迁移到新系统。
第一版代码写的是:
List<Order> allOrders = orderMapper.selectAll(); // 内存直接 OOM500万条数据加载到内存,应用直接 OOM 挂掉了。
第二版改成了分页查询:
int pageSize = 1000;
int pageNum = 0;
while (true) {
List<Order> batch = orderMapper.selectByPage(pageNum, pageSize);
if (batch.isEmpty()) break;
migrateBatch(batch);
pageNum++;
}分页查询解决了内存问题,但性能极差:5000万条数据,要执行5万次分页查询,每次查询都要计算 OFFSET,数据库的 OFFSET 越大越慢(全表扫描),最后几页的查询要花几分钟。
第三版用了 MyBatis 的游标查询(Cursor):
try (Cursor<Order> cursor = orderMapper.selectAllWithCursor()) {
for (Order order : cursor) { // 迭代器模式!
migrateOrder(order);
}
}游标查询利用迭代器模式,每次只从数据库拉取一小批数据,内存占用稳定,而且不需要 OFFSET,性能比分页快了几十倍。
这个经历让我对迭代器模式有了非常直观的认识。
一、模式动机:统一遍历不同数据结构
迭代器模式(Iterator Pattern):提供一种方法,顺序访问一个聚合对象中的各个元素,而不暴露该对象的内部表示。
核心价值:
- 封装遍历逻辑:调用者不需要知道集合内部的存储方式(数组、链表、树、数据库游标)。
- 统一接口:
for-each语法糖背后,所有可迭代对象都使用相同的接口。 - 惰性求值:迭代器可以在真正需要时才计算下一个元素(懒加载),这是游标查询高效的关键。
二、Java Iterator 协议源码分析
2.1 Iterator 与 Iterable 接口
// Iterator 接口(迭代器)
public interface Iterator<E> {
boolean hasNext(); // 是否还有元素
E next(); // 返回下一个元素(并推进游标)
// 删除当前元素(可选操作)
default void remove() {
throw new UnsupportedOperationException("remove");
}
// Java 8 新增:遍历所有剩余元素
default void forEachRemaining(Consumer<? super E> action) {
Objects.requireNonNull(action);
while (hasNext()) action.accept(next());
}
}
// Iterable 接口(可迭代对象)
public interface Iterable<T> {
Iterator<T> iterator(); // 工厂方法:创建迭代器
default void forEach(Consumer<? super T> action) { ... }
default Spliterator<T> spliterator() { ... }
}for-each 循环是语法糖,编译后等价于:
// for (Order order : orders) {...}
// 编译后等价于:
Iterator<Order> iter = orders.iterator();
while (iter.hasNext()) {
Order order = iter.next();
// 循环体
}2.2 ArrayList 的迭代器实现
// ArrayList.Itr 内部类(具体迭代器)
private class Itr implements Iterator<E> {
int cursor; // 下一个要返回的元素索引
int lastRet = -1; // 上一次返回的元素索引(用于remove())
int expectedModCount = modCount; // 快速失败:记录期望的修改计数
public boolean hasNext() {
return cursor != size;
}
@SuppressWarnings("unchecked")
public E next() {
checkForComodification(); // 检查并发修改
int i = cursor;
if (i >= size) throw new NoSuchElementException();
Object[] elementData = ArrayList.this.elementData;
if (i >= elementData.length) throw new ConcurrentModificationException();
cursor = i + 1;
return (E) elementData[lastRet = i];
}
public void remove() {
if (lastRet < 0) throw new IllegalStateException();
checkForComodification();
try {
ArrayList.this.remove(lastRet);
cursor = lastRet;
lastRet = -1;
expectedModCount = modCount;
} catch (IndexOutOfBoundsException ex) {
throw new ConcurrentModificationException();
}
}
// 快速失败机制
final void checkForComodification() {
if (modCount != expectedModCount)
throw new ConcurrentModificationException();
}
}三、MyBatis 游标查询的迭代器实现
3.1 MyBatis Cursor 接口
// MyBatis 的游标接口(实现了 Closeable 和 Iterable)
public interface Cursor<T> extends Closeable, Iterable<T> {
boolean isOpen();
boolean isConsumed();
int getCurrentIndex();
}
// DefaultCursor 的实现(简化版)
public class DefaultCursor<T> implements Cursor<T> {
private final CursorIterator cursorIterator = new CursorIterator();
private boolean iteratorRetrieved;
private final DefaultResultSetHandler resultSetHandler;
private final ResultMap resultMap;
private final ResultSetWrapper rsw;
private final RowBounds rowBounds;
protected enum CursorStatus {
CREATED, OPEN, CLOSED, CONSUMED
}
@Override
public Iterator<T> iterator() {
if (iteratorRetrieved) {
throw new IllegalStateException("Cannot open more than one iterator on a Cursor");
}
iteratorRetrieved = true;
return cursorIterator;
}
// 内部迭代器:每次调用 next() 从 ResultSet 中取一条记录
private class CursorIterator implements Iterator<T> {
T object;
int iteratorIndex = -1;
@Override
public boolean hasNext() {
if (object == null) {
object = fetchNextUsingRowBound(); // 惰性:用到时才从DB拉取
}
return object != null;
}
@Override
public T next() {
T next = object;
object = null; // 清空,下次 hasNext 会拉取下一条
if (next == null) {
throw new NoSuchElementException();
}
return next;
}
}
protected T fetchNextObjectFromDatabase() {
// 从ResultSet中读取下一行并映射为Java对象
// 这是真正的数据库IO操作,但只读取一行
return resultSetHandler.getRowValue(rsw, resultMap, null);
}
}四、生产级代码实现
4.1 大数据处理的游标查询完整示例
/**
* MyBatis Mapper:定义游标查询方法
*/
@Mapper
public interface OrderMigrationMapper {
/**
* 游标方式查询所有订单(不会把数据全部加载到内存)
* 需要在事务中执行,否则游标会被关闭
*/
@Options(resultSetType = ResultSetType.FORWARD_ONLY, fetchSize = 1000)
@Select("SELECT * FROM orders WHERE created_at < #{beforeDate} ORDER BY id")
Cursor<Order> selectAllForMigration(@Param("beforeDate") LocalDate beforeDate);
/**
* 普通分批查询(用于支持断点续传)
*/
@Select("SELECT * FROM orders WHERE id > #{lastId} ORDER BY id LIMIT #{batchSize}")
List<Order> selectBatchAfter(@Param("lastId") Long lastId, @Param("batchSize") int batchSize);
}
/**
* 数据迁移服务
*/
@Service
@Slf4j
public class OrderMigrationService {
@Autowired
private OrderMigrationMapper mapper;
@Autowired
private NewSystemOrderRepository newSystemRepository;
@Autowired
private MigrationProgressRepository progressRepository;
/**
* 方案一:使用游标迭代器(内存友好,适合全量迁移)
* 注意:需要开启事务,SqlSession在事务结束前保持打开
*/
@Transactional(rollbackFor = Exception.class, timeout = 3600)
public MigrationResult migrateWithCursor(LocalDate beforeDate) {
log.info("Starting cursor-based migration for orders before {}", beforeDate);
MigrationResult result = new MigrationResult();
List<NewOrder> buffer = new ArrayList<>(1000);
try (Cursor<Order> cursor = mapper.selectAllForMigration(beforeDate)) {
for (Order order : cursor) { // 迭代器模式:每次只取一条
try {
NewOrder newOrder = convertToNewFormat(order);
buffer.add(newOrder);
// 每1000条批量写入
if (buffer.size() >= 1000) {
newSystemRepository.batchInsert(buffer);
result.incrementSuccess(buffer.size());
buffer.clear();
log.info("Migrated {} orders so far, current id: {}",
result.getSuccessCount(), order.getId());
}
} catch (Exception e) {
result.incrementFail();
log.warn("Failed to migrate order {}: {}", order.getId(), e.getMessage());
}
}
// 处理剩余的buffer
if (!buffer.isEmpty()) {
newSystemRepository.batchInsert(buffer);
result.incrementSuccess(buffer.size());
}
}
log.info("Cursor migration completed: success={}, fail={}",
result.getSuccessCount(), result.getFailCount());
return result;
}
}
/**
* 自定义迭代器:分页查询包装为迭代器(支持断点续传)
* 适合数据量极大且需要支持中断恢复的场景
*/
public class PageableOrderIterator implements Iterator<Order>, Closeable {
private final OrderMigrationMapper mapper;
private final int batchSize;
private Long lastId;
private Queue<Order> currentBatch;
private boolean exhausted = false;
public PageableOrderIterator(OrderMigrationMapper mapper, Long startFromId, int batchSize) {
this.mapper = mapper;
this.lastId = startFromId;
this.batchSize = batchSize;
this.currentBatch = new ArrayDeque<>();
fetchNextBatch(); // 预取第一批
}
@Override
public boolean hasNext() {
if (!currentBatch.isEmpty()) return true;
if (exhausted) return false;
fetchNextBatch();
return !currentBatch.isEmpty();
}
@Override
public Order next() {
if (!hasNext()) throw new NoSuchElementException();
Order order = currentBatch.poll();
lastId = order.getId(); // 记录最后读取的ID(用于断点续传)
return order;
}
private void fetchNextBatch() {
if (exhausted) return;
List<Order> batch = mapper.selectBatchAfter(lastId, batchSize);
if (batch.isEmpty()) {
exhausted = true;
} else {
currentBatch.addAll(batch);
}
}
/**
* 获取当前处理到的ID(用于保存断点)
*/
public Long getCurrentId() { return lastId; }
@Override
public void close() {
currentBatch.clear();
exhausted = true;
}
}
/**
* 通用的惰性流转换工具:将迭代器转换为 Stream
* (支持 Java 8 Stream API 的链式操作)
*/
public final class IteratorUtils {
private IteratorUtils() {}
/**
* 将 Iterator 转为 Stream(惰性的)
*/
public static <T> Stream<T> toStream(Iterator<T> iterator) {
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED),
false // false = 串行Stream
);
}
/**
* 将游标查询结果转为 Stream,自动关闭资源
*/
public static <T> Stream<T> cursorToStream(Cursor<T> cursor) {
return StreamSupport.stream(cursor.spliterator(), false)
.onClose(() -> {
try {
cursor.close();
} catch (IOException e) {
throw new RuntimeException("Failed to close cursor", e);
}
});
}
}
// 使用Stream API处理游标数据
@Transactional
public void processWithStream() {
Cursor<Order> cursor = mapper.selectAllForMigration(LocalDate.now());
IteratorUtils.cursorToStream(cursor)
.filter(order -> order.getStatus() == OrderStatus.COMPLETED)
.map(this::convertToNewFormat)
.collect(Collectors.groupingBy(o -> o.getCreatedDate()))
.forEach((date, orders) -> {
newSystemRepository.batchInsert(orders);
log.info("Migrated {} orders for date {}", orders.size(), date);
});
}五、踩坑实录
坑一:游标查询在事务外执行
MyBatis 游标查询必须在数据库事务中进行,因为游标依赖底层的 ResultSet 保持打开状态。如果方法没有 @Transactional,SqlSession 会在方法返回后自动关闭,游标迭代会抛出 CursorIsClosedException。
这是游标查询最常见的坑,排查时要检查方法上是否有正确的 @Transactional。
坑二:ConcurrentModificationException
在 for-each 循环中修改集合(非通过迭代器的 remove() 方法)会触发快速失败机制,抛出 ConcurrentModificationException。这是因为 Java 的迭代器在创建时记录了集合的 modCount,遍历期间如果 modCount 变化,就会抛出异常。
解决方案:遍历时收集要删除的元素,遍历结束后再批量删除;或者使用迭代器的 remove() 方法;或者使用 CopyOnWriteArrayList(但性能较差)。
坑三:游标的 fetchSize 设置
@Options(fetchSize = 1000) 告诉 JDBC 驱动每次从数据库拉取 1000 条数据放入本地缓冲区。如果不设置,MySQL 默认 fetchSize = Integer.MIN_VALUE(流式读取,真正的每条拉取),网络开销极大;Oracle 默认 fetchSize = 10,效率也很低。
合理设置 fetchSize 是游标查询性能调优的关键,建议根据单条记录大小和网络条件设置为 500-5000。
六、总结
迭代器模式是 Java 最基础、使用最广泛的设计模式之一,每次 for-each 循环都在使用它。
在大数据处理场景中,迭代器的"惰性"特性尤为重要:
- MyBatis Cursor 将数据库游标包装成迭代器,实现了真正的流式读取,是处理超大表的最佳方案。
- 自定义分页迭代器 将分页查询包装成迭代器,对调用者透明,还能支持断点续传。
- Stream API 底层也是迭代器,配合
onClose()可以自动管理资源生命周期。
核心原则:遍历逻辑(如何取下一个元素)应该与聚合对象(数据存储在哪里)分离,这正是迭代器模式的精髓。
