Python 生成器与迭代器深度实战——yield from、无限数据流、内存优化
2026/4/30大约 6 分钟
Python 生成器与迭代器深度实战——yield from、无限数据流、内存优化
适读人群:想掌握 Python 惰性求值、处理大数据流的工程师 | 阅读时长:约 15 分钟 | 核心价值:用生成器处理无限数据流,将内存消耗从 GB 降到 MB
那次 OOM 让我彻底明白了生成器
组里有个同学叫小潘,写了一个日志分析脚本,需要读取一个 20GB 的日志文件,统计某类错误出现的次数。他的第一版代码:
with open("app.log") as f:
lines = f.readlines() # 把整个文件读进内存
errors = [line for line in lines if "ERROR" in line]
print(f"错误数: {len(errors)}")结果服务器直接 OOM(内存不足)报警,进程被杀掉了。
我帮他改了一行:
with open("app.log") as f:
error_count = sum(1 for line in f if "ERROR" in line)
print(f"错误数: {error_count}")内存从几十 GB 降到了几 MB,而且还更快了,因为不需要构建中间列表。
这就是生成器的威力:惰性求值,按需计算,内存友好。
一、迭代器协议:从底层理解
任何实现了 __iter__ 和 __next__ 方法的对象都是迭代器:
class CountUp:
"""从 start 计数到 stop 的迭代器"""
def __init__(self, start: int, stop: int):
self.current = start
self.stop = stop
def __iter__(self):
return self # 迭代器返回自身
def __next__(self) -> int:
if self.current >= self.stop:
raise StopIteration # 迭代结束
value = self.current
self.current += 1
return value
for n in CountUp(1, 5):
print(n) # 1 2 3 4和 Java Iterable/Iterator 对比:
| Java | Python |
|---|---|
Iterable<T> | __iter__ |
Iterator<T> | __iter__ + __next__ |
hasNext() | 无显式方法(StopIteration 信号结束) |
next() | __next__() |
二、生成器:yield 的魔法
生成器是写起来像普通函数,但用 yield 返回值的特殊函数。每次调用 __next__ 时,函数从上次 yield 处继续执行:
def count_up(start: int, stop: int):
"""生成器函数,等价于上面的 CountUp 类"""
current = start
while current < stop:
yield current # 暂停,返回值,保持状态
current += 1
gen = count_up(1, 5)
print(next(gen)) # 1
print(next(gen)) # 2
print(list(gen)) # [3, 4]
# 无限生成器
def natural_numbers(start: int = 1):
n = start
while True:
yield n
n += 1
# 配合 itertools 使用
import itertools
first_10 = list(itertools.islice(natural_numbers(), 10))
print(first_10) # [1, 2, ..., 10]三、生成器表达式 vs 列表推导
# 列表推导:立即计算,全部放入内存
squares_list = [x**2 for x in range(1_000_000)] # ~8MB 内存
# 生成器表达式:惰性求值,几乎不占内存
squares_gen = (x**2 for x in range(1_000_000)) # 几百字节
# 当你只需要遍历一次,用生成器表达式
total = sum(x**2 for x in range(1_000_000)) # 只需 sum 最后结果
# 当你需要多次访问、随机索引,用列表
squares = [x**2 for x in range(1000)]
print(squares[42]) # 可以随机访问四、yield from:委托生成器
yield from 是 Python 3.3+ 的语法,用来委托给另一个可迭代对象:
def flatten(nested):
"""递归展平嵌套列表"""
for item in nested:
if isinstance(item, list):
yield from flatten(item) # 递归委托
else:
yield item
nested = [1, [2, 3], [4, [5, 6]], 7]
print(list(flatten(nested))) # [1, 2, 3, 4, 5, 6, 7]
# 组合多个生成器
def chain_generators(*iterables):
"""等价于 itertools.chain"""
for iterable in iterables:
yield from iterable
result = list(chain_generators([1, 2], [3, 4], range(5, 8)))
print(result) # [1, 2, 3, 4, 5, 6, 7]五、完整可运行示例:大文件处理管道
#!/usr/bin/env python3
"""
生成器与迭代器实战:构建内存高效的数据处理管道
"""
import io
import itertools
import time
from typing import Generator, Iterable, Iterator
# ===== 1. 无限序列生成器 =====
def fibonacci() -> Generator[int, None, None]:
"""斐波那契数列(无限)"""
a, b = 0, 1
while True:
yield a
a, b = b, a + b
def primes() -> Generator[int, None, None]:
"""质数生成器(无限,筛法)"""
yield 2
found = [2]
candidate = 3
while True:
if all(candidate % p != 0 for p in found):
found.append(candidate)
yield candidate
candidate += 2
# ===== 2. 文件处理管道 =====
def read_lines(text: str) -> Generator[str, None, None]:
"""逐行读取(模拟文件)"""
for line in text.splitlines():
yield line.strip()
def filter_non_empty(lines: Iterable[str]) -> Generator[str, None, None]:
"""过滤空行"""
for line in lines:
if line:
yield line
def parse_log_entry(lines: Iterable[str]) -> Generator[dict, None, None]:
"""解析日志条目"""
for line in lines:
parts = line.split("|", maxsplit=3)
if len(parts) >= 3:
yield {
"level": parts[0].strip(),
"timestamp": parts[1].strip(),
"message": parts[2].strip() if len(parts) > 2 else "",
}
def filter_by_level(entries: Iterable[dict], level: str) -> Generator[dict, None, None]:
"""按日志级别过滤"""
for entry in entries:
if entry["level"] == level:
yield entry
def batch(iterable: Iterable, size: int) -> Generator[list, None, None]:
"""将可迭代对象分批"""
it = iter(iterable)
while True:
chunk = list(itertools.islice(it, size))
if not chunk:
break
yield chunk
# ===== 3. 内存对比演示 =====
def demo_memory_comparison():
print("=== 内存使用对比 ===\n")
import sys
data = range(1_000_000)
# 列表推导:全部存入内存
list_result = [x * 2 for x in data]
list_size = sys.getsizeof(list_result)
# 生成器:惰性,几乎不占内存
gen_result = (x * 2 for x in data)
gen_size = sys.getsizeof(gen_result)
print(f"列表推导占用内存: {list_size:,} bytes ({list_size/1024/1024:.2f} MB)")
print(f"生成器表达式占用内存: {gen_size} bytes")
print(f"内存节省比例: {list_size / gen_size:.0f}x\n")
# ===== 4. 处理管道演示 =====
def demo_pipeline():
print("=== 日志处理管道演示 ===\n")
# 模拟日志文件内容
log_content = """
INFO | 2024-01-01 10:00:01 | 服务启动
ERROR | 2024-01-01 10:00:15 | 数据库连接失败
INFO | 2024-01-01 10:00:16 | 重试连接
ERROR | 2024-01-01 10:00:20 | 超时
WARNING | 2024-01-01 10:00:21 | 使用备用连接
INFO | 2024-01-01 10:01:00 | 处理请求 #1
ERROR | 2024-01-01 10:02:00 | 内存不足
INFO | 2024-01-01 10:02:01 | GC 触发
"""
# 构建处理管道(全程惰性求值)
pipeline = filter_by_level(
parse_log_entry(
filter_non_empty(
read_lines(log_content)
)
),
level="ERROR",
)
errors = list(pipeline)
print(f"发现 {len(errors)} 条错误:")
for err in errors:
print(f" [{err['timestamp']}] {err['message']}")
print()
# ===== 5. 无限序列 + islice =====
def demo_infinite_sequences():
print("=== 无限序列演示 ===\n")
# 前 10 个斐波那契数
fibs = list(itertools.islice(fibonacci(), 10))
print(f"前10个斐波那契数: {fibs}")
# 前 10 个质数
prime_list = list(itertools.islice(primes(), 10))
print(f"前10个质数: {prime_list}")
# 斐波那契数中的偶数
even_fibs = list(itertools.islice(
(f for f in fibonacci() if f % 2 == 0),
8,
))
print(f"前8个偶斐波那契数: {even_fibs}")
print()
# ===== 6. 分批处理演示 =====
def demo_batching():
print("=== 分批处理演示 ===\n")
def process_batch(items: list[int]) -> dict:
return {"count": len(items), "sum": sum(items), "max": max(items)}
total_items = 0
for i, chunk in enumerate(batch(range(100), size=15), 1):
result = process_batch(chunk)
total_items += result["count"]
print(f"批次{i}: {result}")
print(f"总处理: {total_items} 条\n")
def main():
demo_memory_comparison()
demo_pipeline()
demo_infinite_sequences()
demo_batching()
if __name__ == "__main__":
main()六、踩坑实录 1:生成器只能遍历一次
gen = (x**2 for x in range(5))
print(list(gen)) # [0, 1, 4, 9, 16]
print(list(gen)) # [] —— 生成器耗尽了!
# 解法:如果需要多次遍历,转成列表,或封装成函数
def squares_gen():
return (x**2 for x in range(5)) # 每次调用返回新的生成器
print(list(squares_gen())) # [0, 1, 4, 9, 16]
print(list(squares_gen())) # [0, 1, 4, 9, 16]七、踩坑实录 2:在 for 循环里修改被迭代的列表
# 错误:迭代列表的同时修改它,会跳过元素
items = [1, 2, 3, 4, 5]
for item in items:
if item % 2 == 0:
items.remove(item) # 修改正在迭代的列表!
print(items) # [1, 3, 5]? 不对!会跳过某些元素
# 正确:先过滤,赋值给新列表
items = [1, 2, 3, 4, 5]
items = [x for x in items if x % 2 != 0] # 或用生成器
print(items) # [1, 3, 5]八、踩坑实录 3:itertools 的惰性陷阱
import itertools
# 陷阱:itertools 的大多数函数返回迭代器,不是列表
grouped = itertools.groupby([1, 1, 2, 2, 3], key=lambda x: x)
# 如果在消费分组之前移动了迭代器,分组数据会丢失!
for key, group in grouped:
print(key, list(group)) # 正确:立即消费 group
# 错误:先获取所有 key,再消费 group
groups_list = list(grouped) # 这样 group 迭代器已经被移动,数据丢失总结
生成器与迭代器的核心价值:
- 内存优化:大数据集不要一次性加载,用生成器流式处理
- 管道模式:多个生成器串联,构建可组合的数据处理流水线
- 无限序列:用
yield+while True+itertools.islice处理无限数据 - yield from:委托给子生成器,简化递归迭代
- inerttools 宝库:
chain、islice、groupby、batched(3.12+)是日常必用
