什么时候需要用到 multiprocessing?
场景速查表:
场景 | 是否需要 multiprocessing | 原因 |
---|---|---|
CPU 密集型任务(加密、科学计算、图像处理) | ✅ 必须 | GIL 限制,多线程无法并行 |
I/O 密集型任务(文件、网络) | ❌ 优先用多线程/异步 | 线程切换开销更小 |
需要多个 Python 解释器隔离 | ✅ 必须 | 每个进程独立内存空间 |
需要共享大量复杂对象 | ⚠️ 谨慎 | 进程间共享成本高于线程 |
结论:只要你要榨干多核 CPU,就绕不开 multiprocessing。否则可以先考虑线程或协程。
必须记忆的 7 组核心 API 与实战代码
下面把「面试时写得出、项目里用得着」的知识点浓缩成一篇 CSDN 风格的速查博客。每个点都给出最小可运行示例(MRE:Minimal Runnable Example),复制即可运行。
1. Process 基本用法(记住两段模板)
模板 A:无参函数
import multiprocessing, time
def worker():
print('Worker start', multiprocessing.current_process().name)
time.sleep(1)
if __name__ == '__main__':
p = multiprocessing.Process(target=worker, name='worker-A')
p.start()
p.join()
模板 B:带参函数(注意可 pickle)
import multiprocessing
def power(base, exp):
print(f'{base}^{exp} = {base**exp}')
if __name__ == '__main__':
for pair in [(2, 10), (3, 5)]:
multiprocessing.Process(target=power, args=pair).start()
2. 守护进程 Daemon(后台任务心跳)
import multiprocessing, time
def heartbeat():
while True:
print('heartbeat', time.strftime('%H:%M:%S'))
time.sleep(1)
if __name__ == '__main__':
d = multiprocessing.Process(target=heartbeat, daemon=True)
d.start()
time.sleep(3) # 主进程 3 秒后退出,守护进程随之被杀
3. 优雅退出:join、terminate、exitcode
import multiprocessing, time, signal
def slow():
time.sleep(5)
if __name__ == '__main__':
p = multiprocessing.Process(target=slow)
p.start()
time.sleep(1)
p.terminate() # 向子进程发 SIGTERM
p.join() # 回收资源
print('exitcode:', p.exitcode) # -15 代表被信号 15 杀死
4. 进程间通信:Queue(生产者-消费者模式)
import multiprocessing, random, time
def producer(q):
for i in range(5):
q.put(random.randint(1, 100))
time.sleep(0.2)
def consumer(q):
while True:
item = q.get()
if item is None: # 毒丸
break
print('Consumed', item)
if __name__ == '__main__':
q = multiprocessing.Queue()
p = multiprocessing.Process(target=producer, args=(q,))
c = multiprocessing.Process(target=consumer, args=(q,))
p.start(); c.start()
p.join()
q.put(None) # 发毒丸
c.join()
5. 共享状态:Manager 与 Lock
import multiprocessing
def add_to_dict(d, key, lock):
with lock: # 防止竞态
d[key] = key * key
if __name__ == '__main__':
mgr = multiprocessing.Manager()
shared_dict = mgr.dict()
lock = multiprocessing.Lock()
jobs = [multiprocessing.Process(target=add_to_dict, args=(shared_dict, i, lock))
for i in range(10)]
for j in jobs: j.start()
for j in jobs: j.join()
print(shared_dict)
6. 进程池 Pool(MapReduce 雏形)
import multiprocessing, os
def square(x):
return x * x
if __name__ == '__main__':
with multiprocessing.Pool(processes=os.cpu_count()) as pool:
result = pool.map(square, range(10))
print(result) # [0, 1, 4, 9, ... 81]
7. 自定义 Process 子类(可重用组件)
import multiprocessing, time
class TimerProcess(multiprocessing.Process):
def __init__(self, interval):
super().__init__()
self.interval = interval
def run(self):
for i in range(3):
print(f'{self.name}: {i+1}/{3}')
time.sleep(self.interval)
if __name__ == '__main__':
TimerProcess(1).start()
一张思维导图总结
multiprocessing
├─ 基本对象
│ ├─ Process
│ ├─ Queue / JoinableQueue
│ ├─ Lock / Semaphore / Condition / Event
│ └─ Manager
├─ 高级抽象
│ ├─ Pool
│ └─ 自定义 Process 子类
└─ 生命周期
├─ start / join / terminate
└─ daemon / exitcode
结语:记忆策略
- 先背 3 个模板(Process、Queue、Pool)。
- 再背 生命周期:start → join/terminate → exitcode。
- 其余 API 用的时候查表即可。