在CPU密集型任务中,Python的multiprocessing
模块是突破GIL限制的关键工具。multiprocessing.Pool
(进程池)和multiprocessing.Process
(独立进程)是最常用的两种并行化方案,但其设计思想和适用场景截然不同。本文结合代码示例和性能对比,解析二者的核心差异及最佳实践。
一、multiprocessing.Process
:精细控制单个进程
核心特性
- 手动管理生命周期:通过
start()
启动进程,join()
等待结束,适合非均质任务调度。 - 跨平台限制:Windows系统需将进程代码包裹在
if __name__ == '__main__':
中,避免子进程递归创建。 - 进程间通信(IPC):需借助
Queue
、Pipe
或共享内存(如Value
/Array
)传递数据。
典型代码结构:
from multiprocessing import Process
def worker(num):
print(f"Worker {num} running")
if __name__ == '__main__':
processes = []
for i in range(3):
p = Process(target=worker, args=(i,))
processes.append(p)
p.start() # 启动进程
for p in processes:
p.join() # 阻塞至进程结束
适用场景:
✅ 需要精确控制每个进程的任务逻辑
✅ 进程执行时间差异大(如实时响应外部事件)
✅ 复杂IPC需求(如双向数据流)
二、multiprocessing.Pool
:批量任务的自动化调度
核心优势
- 进程复用:固定数量的工作进程反复处理任务,避免频繁创建/销毁开销。
- 任务分发API:
map(func, iterable)
:阻塞式,按顺序返回结果apply_async(func, args)
:非阻塞,通过get()
异步获取结果。
- 资源约束:通过
processes
参数限制并发数(默认等于CPU核心数)。
基础用法示例:
from multiprocessing import Pool
import time
def task(msg):
print(f"Start: {msg}")
time.sleep(2)
return f"End: {msg}"
if __name__ == '__main__':
with Pool(processes=3) as pool: # 限制3个进程
results = pool.apply_async(task, ("Hello", ))
print(results.get()) # 阻塞等待结果
# 批量提交任务
multiple_results = [pool.apply_async(task, (i,)) for i in range(4)]
print([res.get() for res in multiple_results])
关键操作:
pool.close()
:禁止新任务提交pool.join()
:等待所有子进程退出
适用场景:
✅ 处理大量同构任务(如数据分块处理)
✅ 需要自动负载均衡
✅ 简化并行代码结构
三、Pool vs Process 关键差异总结
特性 | multiprocessing.Pool |
multiprocessing.Process |
---|---|---|
进程管理 | 自动维护进程池,复用工作进程 | 手动创建/销毁单个进程 |
任务调度 | 支持map /apply_async 等高级分发 |
需自行实现任务分配逻辑 |
阻塞行为 | apply 为阻塞,apply_async 为非阻塞 |
完全依赖join() 控制阻塞 |
内存开销 | 较低(进程复用) | 较高(频繁创建新进程) |
适用任务类型 | 均匀任务(如批量计算) | 异构任务或需实时响应场景 |
四、性能陷阱与最佳实践
避免全局变量拷贝
Pool的任务函数需可序列化,避免包含大对象(可通过initializer
预加载资源):def init_pool(): global large_data # 子进程初始化时加载 large_data = load_heavy_model() pool = Pool(initializer=init_pool)
进程池不适用复杂IPC
Pool的任务函数无法直接使用multiprocessing.Queue
,需改用Manager().Queue()
:from multiprocessing import Manager manager = Manager() task_queue = manager.Queue() # 进程池安全的队列
超时控制与容错
apply_async
支持timeout
参数,避免僵尸进程:result = pool.apply_async(long_task, args=(...)) try: output = result.get(timeout=30) # 30秒超时 except TimeoutError: print("Task timed out")