Python多线程编程

发布于:2024-05-06 ⋅ 阅读:(26) ⋅ 点赞:(0)

1、python并发模型介绍

Python并发编程是指在Python中同时执行多个任务的技术。在传统的程序中,任务是按照顺序依次执行的,而在并发编程中,多个任务可以同时执行。

1.1、python支持的并发模型

  1. 多线程:使用threading模块可以创建和管理多个线程。可以使用Thread类来创建线程,并使用start()方法启动线程。使用join()方法可以等待线程执行完毕。
  2. 使用multiprocessing模块可以创建和管理多个进程。可以使用Process类来创建进程,并使用start()方法启动进程。使用join()方法可以等待进程执行完毕。
  3. 协程:使用asyncio模块可以编写协程代码。使用async关键字定义一个协程函数,使用await关键字可以等待一个协程执行完毕。

本文主要讲述python多线程并发模型。

1.2、与java多线程模型的区别:

  • Python与java都支持基于线程与锁的并发模型。然而,由于GIL(全局解释器锁)的存在,Python在多线程并发编程中性能受限。而Java的多线程性能相对较好,能够充分利用多核处理器的优势。
  • 库和生态系统:Java拥有丰富的并发编程库和框架,如线程池、CountDownLatch、CyclicBarrier等,可以很方便地处理各种并发场景。而Python的并发编程库相对较少,主要的库有threading、multiprocessing和asyncio等,比Java的并发编程库要简单一些。
  • GIL对I/O密集型任务的影响较小:对于I/O密集型的任务,由于线程在等待I/O操作时会释放GIL,因此多线程编程的性能可能会有所提升。这是因为线程在等待I/O时可以让出GIL的控制权,让其他线程有机会执行。

综上所述:Python与java都支持并发编程,然而,python的全局解释器锁对Python的多线程编程产生了一些限制。但Python提供了一些替代方案来实现并发编程,如多进程编程(使用multiprocessing模块)和协程(使用asyncio模块),这些技术可以绕过GIL的限制,实现并发执行。因此,Python的线程模型适用于IO密集型任务,而对于CPU密集型任务,由于GIL的存在,多线程并不一定能够提高性能。对于CPU密集型任务,建议使用多进程并发模型。

2、多线程编程语法

在Python中,可以使用threading模块来实现多线程编程。

以下是threading.Thread类常用的属性和方法:

Thread的属性
name 线程名称。可以通过构造函数的name参数指定,也可以通过getName()setName()方法来获取和设置
ident 线程的标识符
daemon 线程是否为守护线程。守护线程会在主线程结束时自动退出,而非守护线程会等待所有非守护线程执行完毕才会退出
target 线程要执行的目标函数
args 传递给目标函数的参数,以元组的形式
Thread的方法
start() 启动线程,使线程进入就绪状态,可以开始执行target指定的函数。
join(timeout=None) 等待线程结束。可以设置timeout参数来设置等待时间
is_alive() 判断线程是否处于活动状态
getName() 获取线程名称
setName(name) 设置线程名称

下面是一个简单的示例:

import threading

def thread_function(name):
    print("Thread {} started".format(name))
    # 线程执行的代码
    print("Thread {} finished".format(name))

# 创建线程对象
thread1 = threading.Thread(target=thread_function, args=("Thread 1",))
thread2 = threading.Thread(target=thread_function, args=("Thread 2",))

# 启动线程
thread1.start()
thread2.start()

# 等待线程执行完毕
thread1.join()
thread2.join()

print("All threads finished")
 

3、多线程编程实例

3.1、多线程下载网页内容(IO密集型,最适合多线程模式)

import threading
import requests

def download(url):
    response = requests.get(url)
    print(f"Downloaded {url}: {len(response.content)} bytes")

# 创建待下载的网页列表
urls = [
    "https://www.baidu.com",
    "https://www.github.com",
    "https://www.python.org"
]

# 创建线程列表
threads = []

# 创建并启动线程
for url in urls:
    thread = threading.Thread(target=download, args=(url,))
    thread.start()
    threads.append(thread)

# 等待所有线程执行完毕
for thread in threads:
    thread.join()

print("All downloads finished")

3.2、线程死锁演示

要懂得线程为什么死锁,我们最好先知道怎样的线程并发会导致死锁。其实,死锁的本质是由于,多条线程以不一致的顺序访问临界资源。

import threading

# 创建互斥锁
lock1 = threading.Lock()
lock2 = threading.Lock()


# 定义线程1
def thread1():
    # 获取锁1
    lock1.acquire()
    print("Thread 1 acquired lock 1")

    # 延迟一段时间,模拟线程1在处理一些任务
    import time
    time.sleep(1)

    # 获取锁2
    lock2.acquire()
    print("Thread 1 acquired lock 2")

    # 释放锁2
    lock2.release()

    # 释放锁1
    lock1.release()


# 定义线程2
def thread2():
    # 获取锁2
    lock2.acquire()
    print("Thread 2 acquired lock 2")

    # 延迟一段时间,模拟线程2在处理一些任务
    import time
    time.sleep(1)

    # 获取锁1
    lock1.acquire()
    print("Thread 2 acquired lock 1")

    # 释放锁1
    lock1.release()

    # 释放锁2
    lock2.release()


# 创建线程1和线程2
t1 = threading.Thread(target=thread1)
t2 = threading.Thread(target=thread2)

# 启动线程1和线程2
t1.start()
t2.start()

# 等待线程1和线程2执行完毕
t1.join()
t2.join()

# 死锁,下面打印将不会输出
print("threads dead lock, both waiting for each other")

线程1已经获取锁1,期待得到锁2;线程2已经获取锁2,期待得到锁1。两条线程以相反的顺序获取竞争资源,互不谦让, 导致死锁。

解决方案之一,可以通过计算两个锁的hash值,确保锁的访问顺序与hash值的大小顺序相同(若hash相同再采用别的指标)。

3.3、生产者消费者模型

每次涉及到并发情景,都喜欢用生产者消费者模式,因为它太经典啦

2个面包师同时生产面包,5个顾客同时取面包。

import threading
import time
import random
from queue import Queue

# 创建一个队列作为生产者和消费者之间的共享缓冲区
buffer = Queue(maxsize=5)
persons = []


class Producer(threading.Thread):
    number = 0

    def __init__(self, id):
        super().__init__()
        self.id = id

    def run(self):
        for _ in range(10):
            buffer.put(Producer.number)
            Producer.number += 1
            print(f"Producer {self.id} produced {Producer.number}")
            time.sleep(random.randint(1, 3))


class Consumer(threading.Thread):
    def __init__(self, id):
        super().__init__()
        self.id = id

    def run(self):
        while True:
            item = buffer.get()
            print(f"Consumer {self.id} consumed {item}")
            time.sleep(random.randint(3, 5))


# 创建并启动生产者线程
for i in range(2):
    producer = Producer(i)
    persons.append(producer)
    producer.start()

# 创建并启动消费者线程
for i in range(5):
    consumer = Consumer(i)
    persons.append(consumer)
    consumer.start()

# 主线程等待生产者和消费者线程执行完毕
for person in persons:
    person.join()

print("All items have been produced and consumed")

3.4、实现CountDownLatch

JavaJUC包里有一个工具类CountDownLatch,用于控制一个或多个线程等待其他线程完成操作后再继续执行。在Python里,我们也可以通过threading模块的Thread类和Condition对象来实现。

import threading


class CountDownLatch:
    def __init__(self, count):
        self.count = count
        self.lock = threading.Condition()

    def count_down(self):
        with self.lock:
            self.count -= 1
            if self.count <= 0:
                self.lock.notify_all()

    def wait(self):
        with self.lock:
            while self.count > 0:
                self.lock.wait()


def worker(latch):
    print("Worker started")
    # 模拟工作时间
    import time
    time.sleep(1)
    print("Worker finished")
    latch.count_down()


if __name__ == "__main__":
    # 创建一个CountDownLatch对象,初始计数为2
    latch = CountDownLatch(2)
    # 创建2个线程并启动
    threading.Thread(target=worker, args=(latch,)).start()
    threading.Thread(target=worker, args=(latch,)).start()
    # 等待所有线程完成工作
    latch.wait()
    print("All workers finished")

3.5、两条线程依次打印ABA或者BAB

主要通过互斥量演示线程间的协助

import threading


class Worker(threading.Thread):
    def __init__(self, name, mutex):
        super().__init__()
        self.name = name
        self.lock = mutex

    def run(self):
        while True:
            with self.lock:
                print(self.name)
                self.lock.notify_all()
                self.lock.wait()


if __name__ == "__main__":
    lock = threading.Condition()
    worker1 = Worker("A", lock)
    worker2 = Worker("B", lock)

    worker1.start()
    worker2.start()

    worker1.join()
    worker2.join()