python-并发编程

发布于:2025-07-30 ⋅ 阅读:(28) ⋅ 点赞:(0)


前言

  1. 进程(Process):正在运行的程序,是系统进行资源分配的最小单位。每个进程都有自己独立的内存空间和系统资源
  2. 线程(Thread):运行在进程之上,系统进行调度的最小单位。一个进程可以包含多个线程,它们共享进程的内存空间和资源
  3. 协程(Coroutine):协程是一种用户态的轻量级线程,又称微线程。协程的调度完全由用户控制,不需要上下文切换的开销,因此执行效率极高
  4. 并行(Parallelism):指多个任务在同一时刻同时执行,需要多核 CPU 的支持
  5. 并发(Concurrency):指多个任务在同一时间段内交替执行,通过时间片轮转或协作式调度实现

进程和线程

  1. 一个进程可以有一个以上的线程,进程之间都是独立的,一个进程内的线程共享这个进程空间
  2. 同一个进程内的线程是可以直接通信的,进程要想通信,必须通过内核空间实现
  3. 创建新的线程很简单,创建新的进程需要对父进程进行克隆,所有的进程都是由另外一个进程创建的
  4. 一个线程可以控制和操作同一个进程内的其他线程,而进程只能操作子进程
  5. 一个主线程的改变可能会影响其他进程,而父进程不会影响子进程

多进程 安全性高 开销大 占用空间大 上下文切换开销大 分布式支持
多线程 安全性低 开销小 占用空间小 上下文切换开销小 不支持


一、多线程(threading)

1.1 多线程的使用

以从网页中下载图片为例

下载一张图片

def download_image(url, path):
    response = requests.get(url)
    with open(path, 'wb') as f:
        f.write(response.content)

url = 'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg'

path = '1.jpg'
download_image(url, path)

下载5张图片

import requests
import time

def runtime(func):
    def runtime_inner(*args, **kwargs): 
        start = time.time()
        result = func(*args, **kwargs) 
        end = time.time()
        print(f"执行函数{func.__name__}花费了{end - start}s")
        return result
    return runtime_inner

def download_image(url, path):
    response = requests.get(url)
    with open(path, 'wb') as f:
        f.write(response.content)
        
@runtime
def main():
    for i in range(5):
        download_image(url, str(i) + ".jpg")

url = 'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg'
main()
# 执行函数main花费了1.8841361999511719s

使用多线程下载5张图片

import threading
@runtime
def main():
    print("start...")
    t_list = []
    for i in range(5):
        # 创建线程对象
        # target -->    指定传入一个callable对象    做什么
        # args  -->     指定方法需要传入的参数     元组类型  (1,)
        t = threading.Thread(target=download_image, args=(url, str(i) + ".jpg"))
        # 启动线程  start --> run   <--target
        # 默认情况为前台线程 主线程要等待子线程结束才退出
        # 设置后台线程    主线程执行结束,子线程也要退出
        t.daemon = True
        # t.setDaemon(True)   # 设置为后台线程,在start之前设置
        t.start()
        t_list.append(t)
        for t in t_list:
            t.join()  # 阻塞当前环境上下文,直到t的线程执行完成
        # 去掉t.jon()     执行函数main花费了0.0039768218994140625s


print("start...")
main()
print("end...")

# start...
# start...
# 执行函数main花费了0.6438114643096924s
# end...

1.2 自定义线程类

class MyThread(threading.Thread):
    def __init__(self, num):
        super().__init__()
        self.num = num

    def run(self):
        print(f"running...{self.num}")
        
t1 = MyThread(1)
t2 = MyThread(2)
t1.start()
t2.start()

使用自定义线程类,创建多线程下载图片

class MyThread(threading.Thread):
    def __init__(self, url, path):
        super().__init__()
        self.url = url
        self.path = path

    def run(self):
        response = requests.get(self.url)
        with open(self.path, 'wb') as f:
            f.write(response.content)
            
url = 'https://pic35.photophoto.cn/20150511/0034034892281415_b.jpg'
for i in range(5):
    t = MyThread(url, str(i) + ".jpg")
    t.start()

1.3 线程锁

为什么需要线程锁?

公共资源进行访问修改,存在资源争抢,造成脏数据

解决公共资源竞争
限制同一时刻只有一个线程可以访问公共资源

1.3.1 互斥锁

import threading
import time
from threading import Lock, RLock

num = 0
def sum_num(i):
    # lock.acquire()  # 获取锁
    with lock:
        global num
        time.sleep(1)
        num += i
    print(num)
    # lock.release()  # 释放锁

# 创建锁对象
lock = Lock()
# lock = RLock()

t_list = []
for i in range(10):
    t = threading.Thread(target=sum_num, args=(1,))
    t.start()
    t_list.append(t)

[t.join() for t in t_list]
print("end...")

# Lock 原始锁  获取锁之前不做判断,直到获取到锁为止
# RLock 重入锁 获取锁之前先判断,如果有这把锁了就 立即返回

r1 = Lock()
r2 = RLock()
r1.acquire()
# print("lock1 acquired 1")
# r1.acquire()    # 死锁
# print("lock1 acquired 2")

r2.acquire()
print("lock1 acquired 1")
r2.acquire()
print("lock1 acquired 2")

死锁 程序设计不到位

尽量避免产生死锁

  1. 尽量避免同一个线程对多lock进行锁定
  2. 多个线程需要对多个lock进行锁定,尽量保证他们以相同的顺序获取锁
  3. 设置超时
# 当两个线程以相反顺序调用 transfer时,会发生死锁
class Account:
    def __init__(self, id, balance, lock):
        self.id = id
        self.balance = balance
        self.lock = lock

    # 取钱
    def withdraw(self, amount):
        self.balance -= amount
    # 存钱
    def deposit(self, amount):
        self.balance += amount
    # 查看余额
    def get_balance(self):
        return self.balance

def transfer(from_id, to_id, amount):
    if from_id.lock.acquire():
        from_id.withdraw(amount)
        time.sleep(1)
        print("wait...end")
        if to_id.lock.acquire():
            to_id.deposit(amount)
            to_id.lock.release()
        from_id.lock.release()
    print(f"{from_id.id}{to_id.id}转了{amount}元")

huang = Account("huang", 10000, RLock())
zhang = Account("zhang", 20000, RLock())

t1 = threading.Thread(target=transfer, args=(huang, zhang, 5000))
t2 = threading.Thread(target=transfer, args=(zhang, huang, 2000))

t1.start()
t2.start()

t1.join()
t2.join()
print(huang.get_balance())
print(zhang.get_balance())

1.3.2 信号量

信号量允许指定数量的线程同时执行

from threading import BoundedSemaphore
num = 0
def sum_num(i):
    # lock.acquire()  # 获取锁
    with lock:
        global num
        time.sleep(1)
        num += i
    print(num)
    # lock.release()  # 释放锁

# 信号量锁对象,最多允许2和线程同时执行
lock = BoundedSemaphore(2)

t_list = []
for i in range(10):
    t = threading.Thread(target=sum_num, args=(1,))
    t.start()
    t_list.append(t)

[t.join() for t in t_list]
print("end...")

1.4 全局解释器锁(GIL)

GIL全称Global Interpreter Lock

GIL 和 Python 语言没有任何关系,只是因为历史原因导致在官方推荐的解释器Cpython中遗留的问题(Jpython无此类问题)

每个线程在执行的过程中都需要先获取GIL,保证同一时刻同一个进程内只有一个线程可以执行代码

GIL最基本的行为只有两个:

  1. 当前执行的线程持有GIL
  2. 当线程遇到io阻塞时,会释放GIL
    计算密集型(cpu) 使用多进程
    io密集型(频繁阻塞等待) 使用多线程

二、多进程(multiprocessing)

2.1 使用os库创建多进程

import os, time
# linux系统中
result = os.fork()

# 父进程运行时result为子进程的pid
# 子进程运行时这个result就是0
print("outerside pid is:", result)

if result == 0:
    print("child process")
    #time.sleep(60)
    print("child pid is:", os.getpid())
    print("child-parent pid is:", os.getppid())
else:
    print("parent process")
    #time.sleep(60)
    print("parent pid is:", os.getpid())

僵尸进程: 子进程退出,父进程没有调用wait或者waitpid取获取子进程的状态 --> 无time.sleep()
那么这个子进程的进程描述符就依然存在系统中,这种进程称之为僵尸进程

孤儿进程: 父进程退出,子进程还在运行,那么这个子进程就会称为孤儿进程 --> 取消父进程的sleep(60)注释,保留子进程的sleep(60)
孤儿进程会被pid为1的进程所收养


2.2 多进程的使用

import multiprocessing
from multiprocessing import Process, current_process
import time

lst = []
def task(i):
    print(current_process().name, i, 'start...')
    time.sleep(2)
    lst.append(i)
    print(lst)
    print(current_process().name, i, 'end...')

if __name__ == '__main__':
    for i in range(10):
        p = Process(target=task, args=(i,))
        p.start()
        
# 进程之间资源隔离

# Process-4 3 start...
# Process-5 4 start...
# Process-2 1 start...
# Process-1 0 start...
# Process-6 5 start...
# Process-8 7 start...
# Process-10 9 start...
# Process-3 2 start...
# Process-9 8 start...
# Process-7 6 start...
# [3]
# Process-4 3 end...
# [4]
# Process-5 4 end...
# [1]
# Process-2 1 end...
# [0]
# Process-1 0 end...
# [5]
# Process-6 5 end...
# [7]
# Process-8 7 end...
# [2][9]
#
# Process-10 9 end...
# Process-3 2 end...
# [8]
# Process-9 8 end...
# [6]
# Process-7 6 end...

2.3 自定义进程类

import multiprocessing
class MyProcess(multiprocessing.Process):
    def __init__(self, i):
        super().__init__()
        self.i = i

    def run(self):
        print(self.name, self.i, 'start...')
        print(self.name, self.i, 'end...')

if __name__ == '__main__':
    for i in range(10):
        p = MyProcess(i)
        p.start()

2.4 进程通信方式

2.4.1 管道(pipe)

传递二进制数据流,消息之间没有明确界限
半双工的通信方式,本质上就是内核空间中固定大小的缓冲区 (只能从一边到另一边)
匿名管道 适用有亲缘关系的进程
命名管道 无亲缘关系也可以进行访问

2.4.2 消息队列(message queues)

消息队列是保存在内核中的消息链表,有明确的界限,支持多种数据类型传入

发送方和接收方不需要同时存在,消息可持久化

2.4.3 信号量(semaphores)

信号量就是一个计数器,用于控制最多n个进程对共享资源访问
p 操作(申请资源)-> 将信号量的值减 1
v 操作(释放资源) -> 将信号量的值加 1

若信号量 ≥ 0,表示资源可用,进程继续执行
若信号量 < 0,表示资源已被耗尽,进程被阻塞并放入等待队列

2.4.4 共享内存(shared memory)

多个进程通过映射共享同一片物理内存区域,这是最快的进程通信(IPC)方式
直接读写速度最快
配合信号量或者互斥锁来使用

2.4.5 信号(signal)

信号是最古老的进程通信方式,是一种异步通知机制,用来通知进程,控制进程的一些行为

ctrl + c 停止信号
ctrl + z 终止信号

2.4.5 socket通信

套接字,通常用于不同主机之间的通信
支持全双工通信,数据按字节流传输

分布式系统,跨网络通信


2.5 进程池

有效的降低频繁创建销毁线程多带来的额外开销

from multiprocessing import Pool, current_process
import time

lst = []
def task(i):
    print(current_process().name, i, 'start...')
    time.sleep(1)
    lst.append(i)
    print(lst)
    print(current_process().name, i, 'end...')
# 每个进程独立空间,互相隔离
if __name__ == "__main__":
    # 创建进程池,建议进程数和cpu核数一致
    # maxtaskperchild   指定每个子进程最多可以处理多少任务,防止过多的内存占用
    p = Pool(processes=4, maxtasksperchild=3)
    for i in range(20):
        # 进程池接受任务
        p.apply_async(func=task, args=(i,))
    # 关闭进程池,不接受任务
    p.close()
    # 阻塞当前环境,直到p子进程执行完成。如果没有join,父进程退出,子进程也会退出
    p.join()
    print("end...")

三、协程(coroutine)

协程是一种用户态的轻量级线程,协程的调度完全由用户控制。协程拥有自己的寄存器上下文和栈。又称为微线程,纤程

import asyncio

async def func1():
    print(1)
    await asyncio.sleep(1)
    print(2)

async def func2():
    print(3)
    await asyncio.sleep(1)
    print(4)

# 创建任务列表
tasks = [asyncio.ensure_future(func1()), asyncio.ensure_future(func2())]

# 生成事件循环 -- 监听
loop = asyncio.get_event_loop()
# 运行
loop.run_until_complete(asyncio.wait(tasks))

# 依次输出:1 3 2 4

总结

并发编程是提升 Python 程序效率的核心手段
选择方式需遵循 “任务类型优先” 原则:

  • CPU 密集型→多进程
  • I/O 密集型→协程(高并发)或多线程(简单场景)

同时,需注意同步机制(避免资源竞争)、GIL 限制(多线程的局限性)、进程 / 线程开销(控制数量)等问题,才能写出高效、可靠的并发程序


网站公告

今日签到

点亮在社区的每一天
去签到