Python Multiprocessing in Practice: multiprocessing, Process Pools, Shared Memory, and IPC
Audience: Python engineers who need to get past the GIL and run CPU-bound work in parallel | Reading time: ~16 minutes | Core value: master the complete Python multiprocessing toolkit and wring full performance out of every core
The First Time the GIL Stumped Me

Back in my Java days, multithreading was a reflex: for CPU-bound work, spin up several threads, saturate every core, done.

After moving to Python, I naturally wrote a multithreaded version of an image-processing program, and the result floored me: 4 CPU cores, 4 threads, and total CPU usage stuck at 25%. Exactly as slow as a single thread.

A colleague explained: Python has the GIL (Global Interpreter Lock), so only one thread executes Python bytecode at any given moment. For CPU-bound tasks, multithreading can even end up slower than a single thread because of lock contention.

To truly use multiple cores, you need multiprocessing: each process gets its own Python interpreter and its own GIL, so execution is genuinely parallel.

This article lays out the complete Python multiprocessing playbook.
1. The GIL Limit: When to Use Multiprocessing

| Task type | Recommended approach |
|---|---|
| I/O-bound (network, files, database) | asyncio or multithreading |
| CPU-bound (computation, encryption, image processing) | multiprocessing |
| Mixed (I/O plus some computation) | asyncio + ProcessPoolExecutor |
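For the third row (mixed workloads), the idea is that the event loop hands CPU-heavy calls off to a process pool and stays responsive to I/O in the meantime. A minimal sketch, assuming the `cpu_work` function and `numbers` list defined in the benchmark below:

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor

async def main() -> None:
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as pool:
        # run_in_executor offloads the blocking computation to the pool,
        # leaving the event loop free to service I/O
        results = await asyncio.gather(
            *(loop.run_in_executor(pool, cpu_work, n) for n in numbers)
        )
        print(results)

if __name__ == "__main__":
    asyncio.run(main())
```

To see why the task type matters, here is the same CPU-bound job run three ways: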
```python
import time
import concurrent.futures

def cpu_work(n: int) -> int:
    """CPU-bound: count the divisors of n"""
    count = 0
    for i in range(1, n + 1):
        if n % i == 0:
            count += 1
    return count

numbers = [5_000_000 + i for i in range(8)]

if __name__ == "__main__":  # process-spawning code belongs under this guard (see section 7)
    # Single thread
    start = time.perf_counter()
    results = [cpu_work(n) for n in numbers]
    print(f"Single thread: {time.perf_counter() - start:.2f}s")

    # Multithreaded (GIL-bound, no faster than a single thread)
    start = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_work, numbers))
    print(f"Multithreaded: {time.perf_counter() - start:.2f}s")  # roughly as slow

    # Multiprocess (sidesteps the GIL, true parallelism)
    start = time.perf_counter()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(cpu_work, numbers))
    print(f"Multiprocess: {time.perf_counter() - start:.2f}s")  # ~4x faster
```

2. multiprocessing Basics
2.1 Process: Manual Management
```python
import multiprocessing as mp
import os

def worker(name: str, queue: mp.Queue) -> None:
    """Worker process"""
    pid = os.getpid()
    print(f"[{name}] PID={pid} starting")
    result = sum(range(1_000_000))
    queue.put((name, result))
    print(f"[{name}] done")

if __name__ == "__main__":  # processes must be started under the __main__ guard
    result_queue = mp.Queue()
    processes = []
    for i in range(4):
        p = mp.Process(target=worker, args=(f"Worker-{i}", result_queue))
        processes.append(p)
        p.start()
    # Drain the queue before joining: if a child still has data buffered
    # in the queue's pipe, joining it first can deadlock
    for _ in processes:
        name, result = result_queue.get()
        print(f"{name} result: {result}")
    for p in processes:
        p.join()  # wait for every process to finish
```
2.2 Pool: Process Pools

```python
from multiprocessing import Pool
import time

def process_item(item: dict) -> dict:
    """Process a single item"""
    time.sleep(0.01)  # simulate work
    return {"id": item["id"], "result": item["value"] ** 2}

def add(a, b):
    # must live at module top level so child processes can import it by name
    return a + b

if __name__ == "__main__":
    items = [{"id": i, "value": i} for i in range(100)]
    with Pool(processes=4) as pool:
        # map: ordered results, blocks until everything finishes
        results = pool.map(process_item, items)
        # imap: lazy iteration, friendlier on memory
        for result in pool.imap(process_item, items, chunksize=10):
            pass
        # starmap: multiple arguments per call
        sums = pool.starmap(add, [(1, 2), (3, 4), (5, 6)])
```
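Beyond the `map` family, `Pool.apply_async` submits tasks one at a time and reports back through callbacks, which helps when results should be handled as they arrive. A minimal sketch reusing `process_item` from above:

```python
from multiprocessing import Pool

if __name__ == "__main__":
    items = [{"id": i, "value": i} for i in range(10)]
    results, errors = [], []
    with Pool(processes=4) as pool:
        for item in items:
            # apply_async returns immediately; the callbacks run in the parent
            pool.apply_async(
                process_item,
                (item,),
                callback=results.append,
                error_callback=errors.append,
            )
        pool.close()  # no more submissions
        pool.join()   # wait for outstanding tasks to finish
    print(f"{len(results)} succeeded, {len(errors)} failed")
```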
3. ProcessPoolExecutor: The More Modern API

```python
from concurrent.futures import ProcessPoolExecutor, as_completed

def analyze_text(text: str) -> dict:
    """Mock text analysis"""
    words = text.lower().split()
    return {
        "word_count": len(words),
        "unique_words": len(set(words)),
        "avg_word_length": sum(len(w) for w in words) / len(words) if words else 0,
    }

if __name__ == "__main__":
    texts = [
        "Python is a great programming language for AI development",
        "Java is widely used in enterprise backend systems",
        "Machine learning requires large amounts of training data",
        "Deep learning models need GPU acceleration to train",
    ]
    with ProcessPoolExecutor(max_workers=4) as executor:
        # submit: one task at a time, returns a Future
        futures = {executor.submit(analyze_text, text): i for i, text in enumerate(texts)}
        # as_completed: handle whichever future finishes first
        for future in as_completed(futures):
            idx = futures[future]
            try:
                result = future.result(timeout=10)
                print(f"text {idx}: {result}")
            except Exception as e:
                print(f"text {idx} failed: {e}")
```
4. Shared Memory: Efficient Data Sharing Between Processes

```python
from multiprocessing import shared_memory
import numpy as np

def producer():
    # Create a shared-memory block and back a numpy array with it
    shm = shared_memory.SharedMemory(create=True, size=1000 * 8)  # 1000 float64 values
    arr = np.ndarray((1000,), dtype=np.float64, buffer=shm.buf)
    arr[:] = np.random.random(1000)
    return shm.name, shm  # pass the name to consumers; keep shm alive here

def consumer(shm_name: str) -> float:
    # Attach to an existing shared-memory block by name
    existing_shm = shared_memory.SharedMemory(name=shm_name, create=False)
    arr = np.ndarray((1000,), dtype=np.float64, buffer=existing_shm.buf)
    result = float(arr.sum())
    existing_shm.close()
    return result

# When all consumers are done, the creator must release the block:
# shm.close(), then shm.unlink()
```
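Wiring the two sides together: only the block's name (a short string) crosses the process boundary, while the 8 KB of array data stays in place. A minimal driver sketch along the lines of the functions above:

```python
import multiprocessing as mp
from multiprocessing import shared_memory
import numpy as np

def child(shm_name: str, out: mp.Queue) -> None:
    shm = shared_memory.SharedMemory(name=shm_name)  # attach by name
    arr = np.ndarray((1000,), dtype=np.float64, buffer=shm.buf)
    out.put(float(arr.sum()))  # only the small result travels through the queue
    shm.close()

if __name__ == "__main__":
    shm = shared_memory.SharedMemory(create=True, size=1000 * 8)
    arr = np.ndarray((1000,), dtype=np.float64, buffer=shm.buf)
    arr[:] = np.random.random(1000)
    out: mp.Queue = mp.Queue()
    p = mp.Process(target=child, args=(shm.name, out))
    p.start()
    print(f"sum computed in child: {out.get():.2f}")
    p.join()
    shm.close()
    shm.unlink()  # the creator releases the OS resource
```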
4.1 Value and Array: Simple Shared State

```python
from multiprocessing import Value, Array, Process
import ctypes

def increment_counter(counter, n: int):
    for _ in range(n):
        with counter.get_lock():  # the lock is mandatory!
            counter.value += 1

if __name__ == "__main__":
    counter = Value(ctypes.c_int, 0)               # shared integer
    arr = Array(ctypes.c_double, [1.0, 2.0, 3.0])  # shared array
    processes = [
        Process(target=increment_counter, args=(counter, 10000))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Final count: {counter.value}")  # should be 40000
```

5. IPC: Queue and Pipe
```python
from multiprocessing import Process, Queue, Pipe
import time

# ===== Queue: multiple producers, multiple consumers =====
SENTINEL = None  # shutdown signal

def producer(queue: Queue, items: list) -> None:
    for item in items:
        queue.put(item)
        time.sleep(0.01)
    queue.put(SENTINEL)  # send the shutdown signal

def consumer(queue: Queue, result_queue: Queue, worker_id: int) -> None:
    processed = 0
    while True:
        item = queue.get()
        if item is SENTINEL:
            queue.put(SENTINEL)  # put the sentinel back so other consumers can exit too
            break
        result = item ** 2
        result_queue.put((worker_id, result))
        processed += 1
    result_queue.put((worker_id, f"DONE:{processed}"))

# ===== Pipe: duplex communication =====
def pipe_worker(conn):
    while True:
        msg = conn.recv()
        if msg == "QUIT":
            break
        conn.send(f"processed: {msg}")
    conn.close()
```
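The parent holds the other end of the pipe. A minimal driver for `pipe_worker` above (assuming it sits at module top level so spawn children can import it):

```python
from multiprocessing import Pipe, Process

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()  # duplex by default
    p = Process(target=pipe_worker, args=(child_conn,))
    p.start()
    for msg in ["hello", "world"]:
        parent_conn.send(msg)
        print(parent_conn.recv())  # -> processed: hello / processed: world
    parent_conn.send("QUIT")  # tell the worker to exit
    p.join()
```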
6. A Complete Runnable Example: A Parallel Data-Processing Pipeline

```python
#!/usr/bin/env python3
"""
A parallel data-processing pipeline built on Python multiprocessing
"""
import math
import multiprocessing as mp
import os
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
def is_prime(n: int) -> bool:
"""判断是否为质数(CPU 密集型)"""
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
for i in range(3, int(math.sqrt(n)) + 1, 2):
if n % i == 0:
return False
return True
def count_primes_in_range(start: int, end: int) -> tuple[int, int, int]:
"""统计区间内的质数个数"""
count = sum(1 for n in range(start, end) if is_prime(n))
return start, end, count
def demo_process_pool():
print("=== 进程池并行计算质数 ===\n")
ranges = [
(1, 100_000),
(100_000, 200_000),
(200_000, 300_000),
(300_000, 400_000),
(400_000, 500_000),
(500_000, 600_000),
(600_000, 700_000),
(700_000, 800_000),
]
    # Single process
start = time.perf_counter()
results_single = [count_primes_in_range(s, e) for s, e in ranges]
single_time = time.perf_counter() - start
total = sum(r[2] for r in results_single)
print(f"单进程: {single_time:.2f}s,共 {total} 个质数")
    # Multiprocess
n_workers = min(mp.cpu_count(), len(ranges))
start = time.perf_counter()
with ProcessPoolExecutor(max_workers=n_workers) as executor:
futures = [
executor.submit(count_primes_in_range, s, e)
for s, e in ranges
]
results_multi = [f.result() for f in futures]
multi_time = time.perf_counter() - start
total = sum(r[2] for r in results_multi)
print(f"多进程({n_workers}核): {multi_time:.2f}s,共 {total} 个质数")
print(f"加速比: {single_time / multi_time:.2f}x\n")
def producer_worker(task_queue: mp.Queue, n_consumers: int) -> None:
"""生产任务"""
for i in range(1, 51):
task_queue.put(i * 10000)
for _ in range(n_consumers):
        task_queue.put(None)  # shutdown signal
def consumer_worker(task_queue: mp.Queue, result_queue: mp.Queue, worker_id: int) -> None:
"""消费任务"""
while True:
task = task_queue.get()
if task is None:
break
        # CPU-bound work: count the primes in [task, task + 10000)
count = sum(1 for n in range(task, task + 10000) if is_prime(n))
result_queue.put((worker_id, task, count))
    result_queue.put((worker_id, -1, -1))  # completion signal
def demo_producer_consumer():
print("=== 生产者-消费者多进程管道 ===\n")
n_consumers = min(mp.cpu_count(), 4)
task_queue = mp.Queue(maxsize=20)
result_queue = mp.Queue()
    # Start the producer
p = mp.Process(target=producer_worker, args=(task_queue, n_consumers))
p.start()
    # Start the consumers
consumers = []
for i in range(n_consumers):
c = mp.Process(target=consumer_worker, args=(task_queue, result_queue, i))
c.start()
consumers.append(c)
    # Collect results until every consumer signals completion
completed = 0
results = []
while completed < n_consumers:
worker_id, task, count = result_queue.get()
if task == -1:
completed += 1
else:
results.append((task, count))
p.join()
for c in consumers:
c.join()
results.sort()
print(f"完成 {len(results)} 个任务")
total_primes = sum(c for _, c in results)
print(f"共找到 {total_primes} 个质数\n")
if __name__ == "__main__":
demo_process_pool()
    demo_producer_consumer()
```

7. Pitfall 1: The spawn Trap on Windows
```python
# Wrong: on Windows, starting processes without the __main__ guard
# creates child processes endlessly
import multiprocessing as mp

def worker():
    print("working")

# On Windows, every child re-imports this module and starts yet another process
mp.Process(target=worker).start()

# Right: always put process-starting code under the __main__ guard
if __name__ == "__main__":
    mp.Process(target=worker).start()
```
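The root cause is the start method: Windows (and macOS since Python 3.8) uses spawn, which re-imports the main module in every child, while Linux defaults to fork. You can pin the start method explicitly; a minimal sketch reusing `worker` from the snippet above:

```python
import multiprocessing as mp

def worker():
    print("working")

if __name__ == "__main__":
    # Option 1: set it globally, exactly once, in the main module
    mp.set_start_method("spawn")  # "fork"/"forkserver" are Unix-only
    p1 = mp.Process(target=worker)
    p1.start()
    p1.join()

    # Option 2: a context keeps the choice local to one pool or process
    ctx = mp.get_context("spawn")
    p2 = ctx.Process(target=worker)
    p2.start()
    p2.join()
```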
8. Pitfall 2: Unpicklable Objects Can't Cross Process Boundaries

```python
# Wrong: a lambda cannot be pickled, so it cannot be shipped to a child process
from multiprocessing import Pool

def bad_example():
    with Pool(4) as pool:
        pool.map(lambda x: x**2, range(10))  # PicklingError

# Right: use a module-level function or functools.partial
import functools

def power(x, exp):
    return x ** exp

if __name__ == "__main__":  # the guard applies here too
    with Pool(4) as pool:
        squares = pool.map(functools.partial(power, exp=2), range(10))
```

9. Pitfall 3: Shared State Without a Lock Means Data Races
```python
from multiprocessing import Value, Process
import ctypes

# Note: in real code, pass the counter to children as an argument (as in 4.1);
# under spawn, module-level globals are re-created per process, not shared
counter = Value(ctypes.c_int, 0)

# Wrong: without a lock, concurrent updates from several processes get lost
def unsafe_increment():
    for _ in range(10000):
        counter.value += 1  # not an atomic operation!

# Right: use get_lock()
def safe_increment():
    for _ in range(10000):
        with counter.get_lock():
            counter.value += 1
```

Summary
The core Python multiprocessing toolkit:

- ProcessPoolExecutor: the recommended default; modern API, integrates with Future
- multiprocessing.Pool: the classic API; map/imap suit batch workloads
- Queue/Pipe: inter-process communication; Queue is many-to-many, Pipe is point-to-point
- Value/Array/SharedMemory: shared state, and locking is mandatory
- Windows compatibility: always start processes under if __name__ == "__main__"
