目录
在网络开发中,一台服务器在同一时间内往往需要服务成千上万个客户端,因此并发编程应运而生,并发是大数据运算和网络编程必须考虑的问题。实现并发的方式有多种,如多进程、多线程、协程等,Python 支持多进程、多线程、协程技术,能够实现在同一时间内运行多个任务。本文将介绍 Python 线程的工作机制和基本应用。
【学习重点】
- 了解什么是进程和线程
- 掌握正确创建线程的方法
- 使用线程锁
- 熟悉线程之间的通信方式
一、概念简单回顾
在 《100天精通Python——基础篇 2025 第19天:并发编程启蒙——理解CPU、线程与进程的那些事》一文中我们详细讲解了一些基础概念,包括操作系统、CPU、进程、线程等,由本文开始正式进入编程阶段,故先对其中的一些概念进行一下简单的复习。
并发和并行区别:
- 并行,Parallelism: 同时做某些事,可以互不干扰的同一个时刻做几件事
- 并发,Concurrency: 也是同时做某些事,但是强调,一个时段内有事情要处理。
- 举例: 高速公路的车道,双向4车道,所有车辆(数据)可以互不干扰的在自己的车道上奔跑(传输)。在同一个时刻,每条车道上可能同时有车辆在跑,是同时发生的概念,这是并行。在一段时间内,有这么多车要通过,这是并发。并行不过是使用水平扩展方式解决并发的一种手段而已。
进程和线程:
- 进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是操作系统进行资源分配和调度的基本单位,是操作系统结构的基础。
- 进程和程序的关系:程序是源代码编译后的文件,而这些文件存放在磁盘上。当程序被操作系统加载到内存中,就是进程,进程中存放着指令和数据(资源)。一个程序的执行实例就是一个进程。它也是线程的容器。Linux 进程有父进程、子进程,Windows 的进程是平等关系。
- 在实现了线程的操作系统中,线程是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。线程,有时被称为轻量级进程(Lightweight Process,LWP),是程序执行流的最小单元。一个标准的线程由线程 ID,当前指令指针(PC)、寄存器集合和堆、栈组成(前面也提到过)。在许多系统中,创建一个线程比创建一个进程快
10-100
倍。 - 进程、线程的理解: 现代操作系统提出进程的概念,每一个进程都认为自己独占所有的计算机硬件资源。进程就是独立的王国,进程间不可以随便的共享数据。线程就是省份,同一个进程内的线程可以共享进程的资源,每一个线程拥有自己独立的堆栈。
线程的状态:
就绪(Ready): 线程能够运行,但在等待被调度。可能线程刚刚创建启动,或刚刚从阻塞中恢复,或者被其他线程抢占
运行(Running): 线程正在运行
阻塞(Blocked): 线程等待外部事件发生而无法运行,如 I/O 操作
终止(Terminated): 线程完成,或退出,或被取消
Python 中的进程和线程: 运行程序会启动一个解释器进程,线程共享一个解释器进程。
二、Python的线程开发
Python 的线程开发使用标准库 threading
。 进程靠线程执行代码,至少有一个主线程,其它线程是工作线程。主线程是第一个启动的线程。父线程: 如果线程A中启动了一个线程B,A就是B的父线程。子线程: B就是A的子线程。
2.1 Thread类
Python 中的线程是通过 threading 模块来实现的。其核心是 Thread 类,用于创建并管理线程。
import threading
Thread 类参数详解:
In [2]: threading.Thread?
Init signature:
threading.Thread(
group=None,
target=None,
name=None,
args=(),
kwargs=None,
*,
daemon=None,
)
# 参数说明:
# 1.group: 保留参数,一般设置为None
# 2.target: 线程启动后要执行的任务,即函数(写函数名)
# 3.name: 线程名,后续可以使用线程对象.name属性查看
# 4.args与kwargs: 传递给函数的参数,与正常函数传参是一样的,args传给target的位置参数,kwargs传给target的关键字参数
# 5.daemon: daemon是否为daemon(守护)线程,为True: 主线程结束时,子线程也会自动退出(daemon线程--守护线程),
# False: 主线程会等待子线程执行完毕,与缺省参数None效果是一样的,后续会专门有一小节详细讲解daemon线程与non-daemon线程
Docstring:
A class that represents a thread of control.
This class can be safely subclassed in a limited fashion. There are two ways
to specify the activity: by passing a callable object to the constructor, or
by overriding the run() method in a subclass.
Init docstring:
This constructor should always be called with keyword arguments. Arguments are:
*group* should be None; reserved for future extension when a ThreadGroup
class is implemented.
*target* is the callable object to be invoked by the run()
method. Defaults to None, meaning nothing is called.
*name* is the thread name. By default, a unique name is constructed of
the form "Thread-N" where N is a small decimal number.
*args* is a list or tuple of arguments for the target invocation. Defaults to ().
2.1.1 线程启动
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 10:54
# @Author : AmoXiang
# @File : 1.线程的启动.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
# 最简单的线程程序
def worker():
print('I"m working')
print('Finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
通过 threading.Thread 创建一个线程对象,target 是目标函数,可以使用 name 为线程指定名称。但是线程没有启动,需要调用 start 方法。线程之所以执行函数,是因为线程中就是要执行代码的,而最简单的代码封装就是函数,所以还是函数调用。函数执行完,线程也就退出了。那么,如果不让线程退出,或者让线程一直工作怎么办呢?
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 10:54
# @Author : AmoXiang
# @File : 1.线程的启动.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
# 最简单的线程程序
def worker():
# 使用while True 可以让线程一直工作下去
while True: # for i in range(10):
time.sleep(0.5)
print('I"m working')
print('Finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
t.start() # 启动
print('~' * 30) # 注意观察这行~是什么时候打印的?
控制台会先打印 '~'
,你可以这样理解,线程启动的一瞬间,我通知你了,你要去干 worker 的活,那通知到位之后,你怎么去处理我是不管的,我继续执行我后续的任务,所以在这里你可能会先看到 '~'
,当然,如果在 worker 函数中,while True 下我们如果没有设置延时操作,即 time.sleep(0.5),那么该函数中的 print('I"m working')
可能会与 print('~' * 30)
争抢控制台,所以你能先看到 I"m working
的结果也不一定。
2.1.2 线程退出
Python 没有提供线程退出的方法,线程一般在下面情况时退出:
- 线程函数内语句执行完毕
- 线程函数中抛出未处理的异常
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 11:09
# @Author : AmoXiang
# @File : 2.线程的退出.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
def worker():
for i in range(10):
time.sleep(0.5)
if i > 5:
# break # 终止循环
# return # 函数返回 finished 这个在打印的时候就看不到了
# raise 1 / 0 # 抛异常
raise RuntimeError # 抛异常
print('I am working')
print('finished')
t = threading.Thread(target=worker, name='worker')
t.start()
print('~' * 30)
Python 的线程没有优先级、没有线程组的概念,也不能被销毁、停止、挂起,那也就没有恢复、中断了。
2.1.3 线程的传参
线程传参和函数传参没什么区别,本质上就是函数传参。示例:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 11:14
# @Author : AmoXiang
# @File : 3.线程的传参.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading, time
def add(x, y):
print(f'{x} + {y} = {x + y}')
t1 = threading.Thread(target=add, args=(1, 2))
t1.start()
time.sleep(0.5)
t2 = threading.Thread(target=add, kwargs={'x': 3, 'y': 4})
t2.start()
time.sleep(0.5)
# 传参会报错哦: TypeError: add() got multiple values for argument 'x'
# 前面的知识点,这里不再赘述
# t3 = threading.Thread(target=add, args=(6,), kwargs={'x': 5})
# t3 = threading.Thread(target=add, kwargs={'x': 5}, args=(6,))
# 正确写法
t3 = threading.Thread(target=add, args=(6,), kwargs={'y': 5})
t3.start()
time.sleep(0.5)
2.1.4 threading的属性和方法
下面是 Python threading 模块中常用的一些 函数
的详细说明:
函数名 | 说明 |
---|---|
threading.active_count() |
返回当前活动线程的数量(包括主线程) |
threading.current_thread() |
返回当前调用者的控制线程的 Thread 对象 |
threading.enumerate() |
以列表形式返回当前所有存活的 Thread 对象 |
threading.get_ident() |
返回当前线程的 "线程标识符" (为一个非0整数,唯一) |
threading.get_native_id() |
返回操作系统分配的原生线程 ID(Python 3.8+) |
threading.main_thread() |
返回主 Thread 对象。一般情况下,主线程是 Python 解释器开始时创建的线程。 |
threading.settrace(func) |
为所有 threading 模块开始的线程设置追踪函数。在每个线程的 run() 方法被调用前,func 会被传递给 sys.settrace()。-----用的较少 |
threading.setprofile(func) |
为所有 threading 模块开始的线程设置性能测试函数。在每个线程的 run() 方法被调用前,func 会被传递给 sys.setprofile()。-----用的较少 |
threading.stack_size([size]) |
返回创建线程时用的堆栈大小。 |
示例1:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 11:43
# @Author : AmoXiang
# @File : 4.threading的属性和方法.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
def show_thread_info():
print('current thread = {}\nmain thread = {}\nactive count = {}'.format(threading.current_thread(),
threading.main_thread(),
threading.active_count()))
def worker():
show_thread_info()
for i in range(5):
time.sleep(1)
print('i am working')
print('finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
show_thread_info()
time.sleep(1)
t.start() # 启动
print('===end===')
示例2:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 11:49
# @Author : AmoXiang
# @File : 4.threading的属性和方法2.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
def main():
print(threading.active_count()) # 活动线程数
print(threading.enumerate()) # 存活的线程对象列表
print(threading.get_ident()) # 返回当前线程的"线程标识符"
print(threading.get_native_id())
print(threading.current_thread()) # 返回当前调用的线程对象
print(threading.main_thread()) # 主线程
print(threading.stack_size()) # 线程的堆栈大小
"""
1
[<_MainThread(MainThread, started 19868)>]
19868
19868
<_MainThread(MainThread, started 19868)>
<_MainThread(MainThread, started 19868)>
0
"""
if __name__ == "__main__":
main()
2.1.5 Thread实例的属性和方法
Thread 对象也包含多个实例方法,简单说明如下:
属性/方法 | 类型 | 说明 |
---|---|---|
start() |
方法 | 启动线程,调用后线程开始执行 |
run() |
方法 | 线程执行的具体逻辑(通常不手动调用) |
join([timeout]) |
方法 | 等待至线程中止或者指定的时间,时间由参数指定,单位为秒 |
is_alive() |
方法 | 返回线程是否处于活动的状态 |
name |
属性 | 线程名称,可读写 |
ident |
属性 | Python 层的线程 ID,创建后才有 |
native_id |
属性 | 操作系统线程 ID(Python 3.8+) |
daemon |
属性 | 是否为守护线程,必须在 start() 前设置 |
isDaemon() / setDaemon() |
方法 | 获取/设置守护线程(已被 daemon 属性取代) |
示例:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 11:56
# @Author : AmoXiang
# @File : 5.Thread实例的属性和方法.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
def worker():
for i in range(5):
time.sleep(1)
print('i am working')
print('finished')
t = threading.Thread(target=worker, name='worker') # 线程对象
# 返回结果: # worker None None 说明此刻线程还未真正的被创建
print(t.name, t.ident, t.native_id)
time.sleep(1)
t.start()
print('=====end=====')
while True:
time.sleep(1)
# 控制台输出会争抢 你会看到错乱显示,这里可以使用logging模块替代print()函数
print('{} {} {}'.format(t.name, t.ident, 'alive' if t.is_alive() else 'dead'))
if not t.is_alive():
print('{} restart'.format(t.name))
# RuntimeError: threads can only be started once
t.start() # 线程重启??此处会报错,后续会详细讲解Thread实例的start()方法与run()方法
2.1.6 start和run方法
threading.Thread 类,Thread 是 Python 提供的线程封装类。可以通过创建 Thread 对象并传入 target=函数
启动线程;也可以通过继承 Thread 并重写 run() 方法
来定义线程行为。
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 14:28
# @Author : AmoXiang
# @File : 6.自定义线程类写法.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
class SubThread(threading.Thread):
def run(self):
for i in range(3):
time.sleep(1)
msg = "子线程" + self.name + '执行,i=' + str(i) # name属性中保存的是当前线程的名字
print(msg)
if __name__ == '__main__':
print('-----主线程开始-----')
t1 = SubThread() # 创建子线程t1
t2 = SubThread() # 创建子线程t2
t1.start() # 启动子线程t1
t2.start() # 启动子线程t2
'''
-----主线程开始-----
-----主线程结束-----
子线程Thread-1执行,i=0子线程Thread-2执行,i=0
子线程Thread-2执行,i=1
子线程Thread-1执行,i=1
子线程Thread-1执行,i=2子线程Thread-2执行,i=2
'''
print('-----主线程结束-----')
start()方法 与 run() 方法的本质区别:
方法 | 作用 | 调用方式 | 是否创建新线程 |
---|---|---|---|
start() |
真正启动线程 | 系统自动调用 run() |
✅ 是(创建新线程) |
run() |
定义线程任务逻辑 | 手动调用不会新建线程 | ❌ 否(在主线程中运行) |
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 13:28
# @Author : AmoXiang
# @File : 6.start和run方法.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
def worker():
for i in range(5):
time.sleep(1)
print('i am working')
print('finished')
class SubThread(threading.Thread):
def start(self):
# 线程对象调用start()方法,会走这里逻辑,我们不知道怎么去系统调用创建一个真正的线程
# 所以如果什么都不做的话,那么真正的线程不会被创建,你可以自己进行测试
# 想要创建真正的线程,我们就去调用父类的start()方法,让它去系统调用创建真正的线程
print('start ~~~')
super(SubThread, self).start()
def run(self):
# run() 方法是线程真正要做的任务逻辑,可以通过传函数(target)或者继承重写
# 同理,如果你光打印,什么都不做,最后虽然你能看到多个线程,可是你看不到该线程干的啥活
# 最简单的方式直接调用父类的run()方法,让它处理
print('run ~~~')
super().run()
def main():
# 测试
print('-----主线程开始-----')
t = SubThread(target=worker, name='worker') # 线程对象
print('before t.start,active_count==> {}'.format(threading.active_count()))
# TODO 测试1.正确方式: 启动线程,会自动在新线程中调用 run()
# t.start() # 启动
# TODO 测试2.错误/不推荐: 手动调用 run()
t.run() # 这不会开启新线程,而是当前线程同步执行
# TODO 测试3.第2次启动线程
# t.start() # RuntimeError: threads can only be started once
# TODO 测试4.两次手动调用run()方法或者先启动线程,即调用start()方法后再次调用run,也会报错
t.run() # AttributeError: 'SubThread' object has no attribute '_target'
print('after t.start,active_count==> {}'.format(threading.active_count()))
print('-----主线程结束-----')
if __name__ == '__main__':
main()
去源码中看看 t.start() 只能调用一次的原因,跟进到父类的 start() 方法,如下:
def start(self):
"""Start the thread's activity.
It must be called at most once per thread object. It arranges for the
object's run() method to be invoked in a separate thread of control.
This method will raise a RuntimeError if called more than once on the
same thread object.
"""
if not self._initialized:
raise RuntimeError("thread.__init__() not called")
# 根据 RuntimeError: threads can only be started once 错误提示,我们可以推断到此处
# 如果self._started.is_set()为False,正常执行,self._started.is_set()返回True,则抛出异常。
if self._started.is_set():
raise RuntimeError("threads can only be started once")
with _active_limbo_lock:
_limbo[self] = self
try:
_start_new_thread(self._bootstrap, ())
except Exception:
with _active_limbo_lock:
del _limbo[self]
raise
self._started.wait()
我们来看看 self._started.is_set() 这玩意是啥:
# 在 Thread 类 __init__ 初始化方法中有这样的定义
self._started = Event()
# 定位 Event,发现是一个类,类初始化方法中有下面这样的定义
self._flag = False
# 并且发现 is_set 是 Event 类中的一个方法
def is_set(self):
"""Return true if and only if the internal flag is true."""
return self._flag
走到这里我们可以得出,第一次进入到 start() 方法中的时候,self._started.is_set() 调用返回的是 Event 类初始化时候的值,即 False,所以不会进入到 if 语句中,就不会抛出异常,那这里我们就可以猜想,在后续的操作中,是不是有哪个地方更改了这个 _flag 值,对 self._started 要留点心思,我们接着往 start() 方法下面的逻辑往后看:
# 这个函数看名字太明显了 开启一个新的线程
_start_new_thread(self._bootstrap, ())
# _bootstrap是一个函数,里面实际又调用的是 _bootstrap_inner
def _bootstrap(self):
try:
self._bootstrap_inner()
except:
if self._daemonic and _sys is None:
return
raise
# 看看: _bootstrap_inner
def _bootstrap_inner(self):
try:
self._set_ident()
self._set_tstate_lock()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
# 这里我们看到再次使用了 self._started 这个 Event 实例对象
# 调用了它的set()方法,我们定位到set()方法,看它里面做了些啥
self._started.set()
with _active_limbo_lock:
_active[self._ident] = self
del _limbo[self]
if _trace_hook:
_sys.settrace(_trace_hook)
if _profile_hook:
_sys.setprofile(_profile_hook)
try:
self.run()
except:
self._invoke_excepthook(self)
finally:
self._delete()
def set(self):
with self._cond:
# 破案了,太明显了,这个操作,也就是说我们调用了一次start方法之后,Thread对象的属性_started 指向的Event实例对象,
# 会将其属性_flag 置为True,再次调用start方法时,又根据这个值判断,是否抛出异常
# 那么前面我们已经讲解到,为True时,抛出异常,故多次调用start()方法会抛异常就是这么来的
self._flag = True
self._cond.notify_all()
多次调用 start() 方法报错,我们分析完毕了,接下来我们去看看多次调用 run() 方法为何报错?这个相对来说就比较简单了:
# 在函数_bootstrap_inner中,我们可以看到run()方法被调用
try:
self.run()
except:
self._invoke_excepthook(self)
# run()方法定义
def run(self):
try:
# 实例化Thread对象时,传入的target参数,即函数名称
# self._target ⇒ self._target = target
if self._target is not None:
self._target(*self._args, **self._kwargs)
finally:
del self._target, self._args, self._kwargs
run() 方法的逻辑很明显,使用 try...finally
结构,finally 块中的代码是一定会执行的,其逻辑是使用 del 关键字删除对象身上的 _target
属性,即我们第一次调用 run() 方法时,对象身上的 _target
属性被删除,当再次调用 run() 方法时,对象身上已经没有了 _target
属性,而在 if 判断语句中去取对象的 _target
属性,故会报 AttributeError 属性错误。
2.2 多线程
顾名思义,多个线程,一个进程中如果有多个线程运行,就是多线程,实现一种并发。其实在之前我们的演练过程中,就已经出现多线程,一个主线程与其他工作的线程,示例代码:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 15:11
# @Author : AmoXiang
# @File : 7.多线程.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
import sys
def worker(f=sys.stdout):
t = threading.current_thread()
for i in range(5):
time.sleep(1)
print('i am working', t.name, t.ident, file=f)
print('finished', file=f)
t1 = threading.Thread(target=worker, name='worker1')
t2 = threading.Thread(target=worker, name='worker2', args=(sys.stderr,))
t1.start()
t2.start()
可以看到 worker1 和 worker2 交替执行。当使用 start 方法启动线程后,进程内有多个活动的线程并行的工作,就是多线程。一个进程中至少有一个线程,并作为程序的入口,这个线程就是主线程。一个进程至少有一个主线程,其他线程称为工作线程。
2.3 线程安全
多线程执行一段代码,不会产生不确定的结果,那这段代码就是线程安全的。多线程在运行过程中,由于共享同一进程中的数据,多线程并发使用同一个数据,那么数据就有可能被相互修改,从而导致某些时刻无法确定这个数据的值,最终随着多线程运行,运行结果不可预期,这就是线程不安全。------在文章后面部分会进行详细地讲解
2.4 daemon线程
daemon 线程,有人翻译成后台线程,也有人翻译成守护线程。Python 中,构造线程的时候,可以设置 daemon 属性,这个属性必须在 start() 方法前设置好。源码 Thread 类的 __init__()
方法中:
if daemon is not None:
if daemon and not _daemon_threads_allowed():
raise RuntimeError('daemon threads are disabled in this (sub)interpreter')
self._daemonic = daemon
else:
self._daemonic = current_thread().daemon
线程 daemon 属性,如果设定就是用户的设置,否则就取当前线程的 daemon 值。主线程是 non-daemon 线程,即 daemon = False。
class _MainThread(Thread):
def __init__(self):
Thread.__init__(self, name="MainThread", daemon=False)
self._set_tstate_lock()
self._started.set()
self._set_ident()
if _HAVE_THREAD_NATIVE_ID:
self._set_native_id()
with _active_limbo_lock:
_active[self._ident] = self
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 16:47
# @Author : AmoXiang
# @File : 7.daemon线程.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import time
import threading
def foo():
time.sleep(5)
for i in range(5):
print(i)
# 主线程是non-daemon线程
t1 = threading.Thread(target=foo, daemon=False)
t1.start()
'''
RuntimeError: cannot set daemon status of active thread
daemon属性: 表示线程是否是daemon线程,这个值必须在start()之前设置,否则引发RuntimeError异常
==> t1.daemon = True
isDaemon(): 是否是daemon线程
setDaemon(): 设置为daemon线程,必须在start方法之前设置 已经被daemon属性替代了
'''
print('Main Thread Exits')
发现线程 t1 依然执行,主线程已经执行完,但是一直等着线程 t1。修改为 t1 = threading.Thread(target=foo, daemon=True)
试一试,结果程序立即结束了,进程根本没有等 daemon 线程 t1。看一个例子,看看主线程何时结束 daemon 线程:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 16:58
# @Author : AmoXiang
# @File : 7.daemon线程2.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import time
import threading
def worker(name, timeout):
time.sleep(timeout)
print('{} working'.format(name))
# 主线程 是non-daemon线程
'''
Main Thread Exits
t1 working
t2 working
'''
t1 = threading.Thread(target=worker, args=('t1', 2), daemon=True)
t1.start()
t2 = threading.Thread(target=worker, args=('t2', 3), daemon=False)
t2.start()
print('Main Thread Exits')
print('~' * 30)
# 调换2和3看看效果
'''
运行下面代码的时候记得将上面的给注释掉,调换2和3后,程序执行结果如下:
Main Thread Exits
t2 working
'''
t1 = threading.Thread(target=worker, args=('t1', 3), daemon=True)
t1.start()
t2 = threading.Thread(target=worker, args=('t2', 2), daemon=False)
t2.start()
print('Main Thread Exits')
上例说明,如果还有 non-daemon 线程在运行,进程不结束,进程也不会杀掉其它所有 daemon 线程。直到所有 non-daemon 线程全部运行结束(包括主线程),不管有没有 daemon 线程,程序退出。总结:
- 线程具有一个 daemon 属性,可以手动设置为 True 或 False,也可以不设置,则取默认值 None,如果不设置 daemon,就取当前线程的 daemon 来设置它
- 主线程是 non-daemon 线程,即 daemon = False
- 从主线程创建的所有线程的不设置 daemon 属性,则默认都是 daemon = False,也就是 non-daemon 线程
- Python 程序在没有活着的 non-daemon 线程运行时,程序退出,也就是除主线程之外剩下的只能都是 daemon 线程,主线程才能退出,否则主线程就只能等待
join() 方法,先看一个简单的例子,看看效果:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 17:08
# @Author : AmoXiang
# @File : 8.join()方法.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import time
import threading
def worker(name, timeout):
time.sleep(timeout)
print('{} working'.format(name))
t1 = threading.Thread(target=worker, args=('t1', 3), daemon=True)
t1.start()
'''
t1 working
Main Thread Exits
'''
t1.join() # 设置join,取消join对比一下
# 取消join() ==> Main Thread Exits
print('Main Thread Exits')
使用了 join() 方法后,当前线程阻塞了,daemon 线程执行完了,主线程才退出了。
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 17:08
# @Author : AmoXiang
# @File : 8.join()方法.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import time
import threading
def worker(name, timeout):
time.sleep(timeout)
print('{} working'.format(name))
t1 = threading.Thread(target=worker, args=('t1', 10), daemon=True)
t1.start()
t1.join(2)
print('~~~~~~~~~~~')
t1.join(2)
print('~~~~~~~~~~~')
print('Main Thread Exits')
def join(self, timeout=None)
:
- join() 方法是线程的标准方法之一
- 一个线程中调用另一个线程的 join() 方法,调用者将被阻塞,直到被调用线程终止,或阻塞超时一个线程可以被 join 多次
- timeout 参数指定调用者等待多久,没有设置超时,就一直等到被调用线程结束
- 调用谁的 join() 方法,就是 join() 谁,就要等谁
daemon 线程应用场景,主要应用场景有:
- 后台任务。如发送心跳包、监控,这种场景最多
- 主线程工作才有用的线程。如主线程中维护这公共的资源,主线程已经清理了,准备退出,而工作线程使用这些资源工作也没有意义了,一起退出最合适
- 随时可以被终止的线程
如果主线程退出,想所有其它工作线程一起退出,就使用 daemon=True 来创建工作线程。比如,开启一个线程定时判断 WEB 服务是否正常工作,主线程退出,工作线程也没有必须存在了,应该随着主线程退出一起退出。这种 daemon 线程一旦创建,就可以忘记它了,只用关心主线程什么时候退出就行了。daemon 线程,简化了程序员手动关闭线程的工作。
2.5 threading.local类
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 18:19
# @Author : AmoXiang
# @File : 9.threading.local类.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
def worker():
x = 0
for i in range(5):
time.sleep(0.2)
x += 1
logging.info(x)
def main():
for i in range(10):
threading.Thread(target=worker, name='t-{}'.format(i)).start()
if __name__ == '__main__':
main()
上例使用多线程,每个线程完成不同的计算任务。x 是局部变量,可以看出每一个线程的 x 是独立的,互不干扰的,为什么?因为 x 是定义在 worker() 函数内部的变量,是一个局部变量(local variable):
def worker():
x = 0 # 局部变量,每个线程各有一个副本
线程启动后,每个线程都会单独执行一次 worker() 函数,每个线程都有自己独立的栈空间(stack),函数内部的变量就存在于线程的私有栈中,互不干扰。能否改造成使用全局变量完成?
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 19:20
# @Author : AmoXiang
# @File : 9.threading.local类2.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
class A:
def __init__(self):
self.x = 0
# 全局对象
global_data = A()
def worker():
global_data.x = 0
for i in range(10):
time.sleep(0.0001)
global_data.x += 1
logging.info(global_data.x)
def main():
for i in range(10):
threading.Thread(target=worker, name='t-{}'.format(i)).start()
if __name__ == '__main__':
main()
上例虽然使用了全局对象,但是线程之间互相干扰,导致了不期望的结果,即线程不安全。能不能既使用全局对象,还能保持每个线程使用不同的数据呢?python 提供 threading.local 类,将这个类实例化得到一个全局对象,但是不同的线程使用这个对象存储的数据其他线程看不见。
# 将上面示例23行代码改为下列代码
global_data = threading.local()
执行程序,结果显示和使用局部变量的效果一样。再看 threading.local 的例子:
# -*- coding: utf-8 -*-
# @Time : 2025-05-20 19:28
# @Author : AmoXiang
# @File : 9.threading.local类3.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
# 全局对象
X = 'abc'
global_data = threading.local()
global_data.x = 100
print(global_data, global_data.x)
print('~' * 30)
time.sleep(2)
def worker():
logging.info(X)
logging.info(global_data)
logging.info(global_data.x)
worker() # 普通函数调用
print('=' * 30)
time.sleep(2)
threading.Thread(target=worker, name='worker').start() # 启动一个线程
从运行结果来看,另起一个线程打印 global_data.x 出错了,如下图所示:
AttributeError: '_thread._local' object has no attribute 'x'
,但是,global_data 打印没有出错,说明看到 global_data,但是 global_data 中的 x 看不到,这个 x 不能跨线程。要想知道其中是怎么实现的,只能去看源码,那我们就去看看。整个 local 类的代码并不多,如下:
class local:
# __slots__参考《2.6 __slots__拓展》小节的讲解
__slots__ = '_local__impl', '__dict__'
def __new__(cls, /, *args, **kw):
if (args or kw) and (cls.__init__ is object.__init__):
raise TypeError("Initialization arguments are not supported")
self = object.__new__(cls)
impl = _localimpl()
impl.localargs = (args, kw)
impl.locallock = RLock()
object.__setattr__(self, '_local__impl', impl)
# We need to create the thread dict in anticipation of
# __init__ being called, to make sure we don't call it
# again ourselves.
impl.create_dict()
return self
def __getattribute__(self, name):
with _patch(self):
return object.__getattribute__(self, name)
def __setattr__(self, name, value):
if name == '__dict__':
raise AttributeError(
"%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
with _patch(self):
return object.__setattr__(self, name, value)
def __delattr__(self, name):
if name == '__dict__':
raise AttributeError(
"%r object attribute '__dict__' is read-only"
% self.__class__.__name__)
with _patch(self):
return object.__delattr__(self, name)
这几个魔术方法在 《100天精通Python——基础篇 2025 第14天:深入掌握魔术方法与元类,玩转高级OOP技巧》我们就已经详细学习过,我这里简单提一下,如果有不懂的自己回去看看。在我们使用 threading.local() 时,首先会走 __new__()
方法中的逻辑,如下:
def __new__(cls, /, *args, **kw):
# 判断是否传入参数,如果有且没有重写__init__()方法,就会抛出异常,因为: 默认的 object.__init__() 是不接受任何参数的
if (args or kw) and (cls.__init__ is object.__init__):
raise TypeError("Initialization arguments are not supported")
# 调用父类的__new__()方法创建对象
self = object.__new__(cls)
# 跟进_localimpl,发现它是一个类,那这里相当于就是在实例化,大概扫一眼它的属性和方法
impl = _localimpl()
impl.localargs = (args, kw) # 给localargs属性赋值
impl.locallock = RLock() # 给locallock属性赋值, RLock我后面会讲
# 注意: 8-10行都是在操作impl实例
# 调用object基类的__setattr__方法为local实例添加属性,添加_local__impl属性,对应值为impl,
# 即上面_localimpl类实例化的实例
object.__setattr__(self, '_local__impl', impl)
# We need to create the thread dict in anticipation of
# __init__ being called, to make sure we don't call it
# again ourselves.
# 调用impl实例的create_dict()方法,接下来就把实例self返回了,所以我们要重点看create_dict()方法中的逻辑
impl.create_dict()
return self
class _localimpl:
"""A class managing thread-local dicts"""
__slots__ = 'key', 'dicts', 'localargs', 'locallock', '__weakref__'
def __init__(self):
# The key used in the Thread objects' attribute dicts.
# We keep it a string for speed but make it unlikely to clash with
# a "real" attribute.
self.key = '_threading_local._localimpl.' + str(id(self))
# { id(Thread) -> (ref(Thread), thread-local dict) }
self.dicts = {}
def get_dict(self):
"""Return the dict for the current thread. Raises KeyError if none
defined."""
thread = current_thread()
return self.dicts[id(thread)][1]
def create_dict(self):
"""Create a new dict for the current thread, and return it."""
localdict = {}
key = self.key
thread = current_thread()
idt = id(thread)
def local_deleted(_, key=key):
# When the localimpl is deleted, remove the thread attribute.
thread = wrthread()
if thread is not None:
del thread.__dict__[key]
def thread_deleted(_, idt=idt):
# When the thread is deleted, remove the local dict.
# Note that this is suboptimal if the thread object gets
# caught in a reference loop. We would like to be called
# as soon as the OS-level thread ends instead.
local = wrlocal()
if local is not None:
dct = local.dicts.pop(idt)
wrlocal = ref(self, local_deleted)
wrthread = ref(thread, thread_deleted)
thread.__dict__[key] = wrlocal
self.dicts[idt] = wrthread, localdict
return localdict
跟进 impl.create_dict(),如下:
def create_dict(self):
"""Create a new dict for the current thread, and return it."""
# 定义一个变量localdict,为字典类型
localdict = {}
# self.key = '_threading_local._localimpl.' + str(id(self))
# _threading_local._localimpl.拼接上_localimpl实例的地址
key = self.key
thread = current_thread() # 当前线程
idt = id(thread) # 当前线程地址值
# 函数定义跳过
def local_deleted(_, key=key):
# When the localimpl is deleted, remove the thread attribute.
thread = wrthread()
if thread is not None:
del thread.__dict__[key]
# 函数定义跳过
def thread_deleted(_, idt=idt):
# When the thread is deleted, remove the local dict.
# Note that this is suboptimal if the thread object gets
# caught in a reference loop. We would like to be called
# as soon as the OS-level thread ends instead.
local = wrlocal()
if local is not None:
dct = local.dicts.pop(idt)
# ref弱引用
wrlocal = ref(self, local_deleted)
wrthread = ref(thread, thread_deleted)
# 给当前线程实例对象__dict__设置key,value
thread.__dict__[key] = wrlocal
# dicts是_localimpl类初始化时所定义的一个属性
# 即impl实例属性dicts中多了一对数据,以当前线程id为key,值为 wrthread, localdict
# self.dicts = {'id(thread)': wrthread, localdict}
self.dicts[idt] = wrthread, localdict
return localdict # 返回了一个空字典
至此 impl.create_dict() 整个逻辑执行完毕,回到 __new__()
方法中,返回了当前 local 类的实例。知道大致逻辑之后,我们来分析一下这个报错 AttributeError: '_thread._local' object has no attribute 'x'
def worker():
logging.info(X)
logging.info(global_data)
logging.info(global_data.x)
threading.Thread(target=worker, name='worker').start()
# 启动线程,执行 worker 中的逻辑
# global_data.x 调用会走 ⇒ __getattribute__()方法逻辑
# 即 local 类的:
def __getattribute__(self, name):
with _patch(self):
return object.__getattribute__(self, name)
# 上下文,跟进
@contextmanager
def _patch(self):
# yield 后面没有东西 说明是在之前做增强
# 调用local类实例的_local__impl属性,即global_data的_local__impl属性
# 在之前我们的源码分析中_local__impl属性为impl,即_localimpl的实例
# 前面已经说过,impl dicts属性中挂着数据对:{'id(thread)': wrthread, localdict}
impl = object.__getattribute__(self, '_local__impl')
try:
# get_dict()方法就是: self.dicts[id(thread)][1]
# 即取到localdict ==> {} 大的空字典
dct = impl.get_dict()
except KeyError:
dct = impl.create_dict()
args, kw = impl.localargs
self.__init__(*args, **kw)
# 锁不用管
with impl.locallock:
# 可以看到逻辑是调用object __setattr__()方法
# self传入的是local实例,即此处给local实例的 __dict__ 设置了一个空字典
object.__setattr__(self, '__dict__', dct)
yield
def get_dict(self):
"""Return the dict for the current thread. Raises KeyError if none
defined."""
thread = current_thread()
return self.dicts[id(thread)][1]
# 上下文完成之后,回到__getattribute__,执行:
return object.__getattribute__(self, name)
# local类实例 __dict__ 为 {},那么使用基类的
# object.__getattribute__怎么能取到 x 属性呢,这就是报错的原因
小结: threading.local 类通过属性 _local__impl
属性对应的值,即 _localimpl 类的实例构建了一个大字典,存放所有线程相关的字典,即:{ id(Thread) ⇒ (ref(Thread), thread-local dict) },每一线程实例的 id 为 key,元组为 value。value 中2部分别为线程对象引用,每个线程自己的字典。运行时,threading.local 实例处在不同的线程中,就从大字典中找到当前线程相关键值对中的字典,覆盖 threading.local 实例的 __dict__
,这样就可以在不同的线程中,安全地使用线程独有的数据,做到了线程间数据隔离,如同本地变量一样安全。
2.6 __slots__拓展
Python 中的 __slots__
是一个性能优化工具,常用于减少内存占用和限制类的属性动态创建。在一些对性能敏感或者需要大量创建实例的场景中,使用 __slots__
可以带来明显好处。
什么是 __slots__
?默认情况下,Python 的类实例是通过一个叫做 __dict__
的字典来存储属性的:
class Person:
def __init__(self, name):
self.name = name
p = Person('棒棒编程修炼场')
print(p.__dict__) # {'name': '棒棒编程修炼场'}
这样虽然灵活,但会多占用内存。__slots__
的作用是:告诉 Python 不使用 __dict__
,而是使用更紧凑的内部结构来固定属性,从而节省内存。为什么使用 __slots__
?
- 节省内存:没有
__dict__
,每个实例占用的内存更少。 - 防止添加不存在的属性:限制只能使用定义好的属性,提升代码健壮性。
- 访问属性更快(略微):属性访问通过更底层的数据结构,访问速度稍有提升。
典型使用场景:
- 大量创建实例(如爬虫、数据处理、图节点对象)
- 属性固定的轻量对象(如数据库模型、配置项等)
- 嵌入式或低内存环境(如微型服务器、树莓派)
- 对性能要求极高的系统
示例1:基本使用
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 4:57
# @Author : bb_bcxlc
# @File : __slots__例子1.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
class Person:
__slots__ = ('name', 'age') # 只允许这两个属性
def __init__(self, name, age):
self.name = name
self.age = age
p = Person("棒棒编程修炼场", 18)
print(p.name)
# 下面代码执行会报错: AttributeError: 'Person' object has no attribute 'gender'
p.gender = 'female'
__dict__
声明后,Python 不会为每个实例分配 __dict__
,实例只能拥有指定的属性(更节省内存)。
# AttributeError: 'Person' object has no attribute '__dict__'. Did you mean: '__dir__'?
print(p.__dict__)
示例2:内存占用对比
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 5:01
# @Author : bb_bcxlc
# @File : __slots__例子2.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
class Normal:
def __init__(self, x, y):
self.x = x
self.y = y
class Slotted:
__slots__ = ('x', 'y')
def __init__(self, x, y):
self.x = x
self.y = y
from pympler import asizeof
# 使用 pympler 包来真实统计内存占用
# 需要使用pip命令进行安装: pip install pympler
many_a = [Normal(1, 2) for _ in range(100000)]
many_b = [Slotted(1, 2) for _ in range(100000)]
print("NoSlots total size:", asizeof.asizeof(many_a))
print("WithSlots total size:", asizeof.asizeof(many_b))
示例3:继承限制
class A:
__slots__ = ('x',)
class B(A):
pass
b = B()
b.y = 10 # 正常,子类没有 __slots__,就会自动启用 __dict__
要想子类也限制属性,需要子类也声明 __slots__
:
class B(A):
__slots__ = ('y',)
b = B()
b.x = 10
print(b.x) # 继承的属性是有的,动态添加属性不行
# b.z = 10 # AttributeError: 'B' object has no attribute 'z'
可以显式允许使用 __dict__
(脱裤子放屁,多此一举):
class Person:
__slots__ = ('name', '__dict__') # 允许动态添加属性
p = Person()
p.name = '棒棒编程修炼场'
p.age = 25 # 现在可以添加
# 棒棒编程修炼场 {'age': 25} __slots__限定的属性不会出现在__dict__中
# 转而使用一个更轻量的、固定的结构来保存属性。这正是它节省内存的关键(猜想)
# 棒棒编程修炼场 {'age': 25}
print(p.name, p.__dict__)
如果你要创建大量结构固定的对象(如数据结构、节点对象),建议使用。如果你在做性能优化、节省内存,使用 __slots__
是非常值得考虑的。
三、线程同步
概念: 线程同步是指通过某种机制,使多个线程在访问共享数据时能够有序进行,确保同一时刻只有一个线程能访问或修改特定资源,从而避免数据冲突和不一致。补充点小细节(区分两个概念):
概念 | 含义 |
---|---|
线程同步(Synchronization) | 控制多个线程对 同一共享资源 的访问 |
线程协作 / 协调(Coordination) | 让多个线程 有逻辑顺序地协作完成任务,比如:线程A等待线程B的信号 |
举个例子:
共享资源 = 厨房
"线程 A 正在做饭" → 加了锁
"线程 B 想用厨房" → 只能等 A 出来(解锁)
这就是线程同步: 只有一个人能操作厨房
Python 中实现线程同步的常见技术有:
1.互斥锁: Lock = _allocate_lock
2.可重入锁: _RLock
def RLock(*args, **kwargs):
"""Factory function that returns a new reentrant lock.
A reentrant lock must be released by the thread that acquired it. Once a
thread has acquired a reentrant lock, the same thread may acquire it again
without blocking; the thread must release it once for each time it has
acquired it.
"""
# 看源码中这个 _PyRLock 其实指向的是 _RLock类
# _PyRLock = _RLock
if _CRLock is None:
return _PyRLock(*args, **kwargs)
return _CRLock(*args, **kwargs)
3.条件变量: class Condition:
4.信号量: class Semaphore:
5.事件: class Event:
6.还有一些高级封装,如 queue.Queue 自带线程安全
接下来一一进行讲解,首先来看事件通信 Event。
3.1 Event
Event 事件,是线程间通信机制中最简单的实现,使用一个内部的标记 flag,通过 flag 的 True 或 False 的变化来进行操作。Event 实例常用方法:
方法名 | 作用描述 |
---|---|
.set() |
将事件标志设为 True ,所有正在等待这个事件的线程将被唤醒。当 flag 标志为 True 时,调用 wait() 方法的线程不会被阻塞 |
.clear() |
将事件标志重设为 False ,之后调用 wait() 方法的线程将会被阻塞,直到调用 set() 方法将内部标志再次设置为 True |
.is_set() |
检查事件标志当前是否为 True ,当且仅当内部标志为 True 时返回 True |
.wait(timeout) |
如果事件标志为 False ,阻塞线程直到它变成 True 或超时 |
简单示例:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 15:17
# @Author : bb_bcxlc
# @File : 11.event事件简单例子.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
event = threading.Event()
print(event.is_set()) # False
# event.wait() # 阻塞住了
event.set() # 将标志置为True,无返回值
print(event.is_set()) # True
event.wait() # 设置为True后,wait()方法不阻塞
print('end ~')
练习:老板雇佣了一个工人,让他生产杯子,老板一直盯着这个工人,直到生产完10个杯子。下面的代码是否能够完成功能?
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 15:52
# @Author : bb_bcxlc
# @File : 11.event练习.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import logging
import threading
import time
Format = '%(asctime)s - %(threadName)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=Format)
flag = False
def boss():
logging.info('I"m boss,watching U.')
while True:
time.sleep(1)
if flag:
break
logging.info('Good Job~~~')
def worker(count=10):
global flag
logging.info('I"m working for U.')
cups = []
while True:
time.sleep(0.5)
if len(cups) >= count:
flag = True
break
cups.append(1) # 模拟杯子
logging.info('I have finished my Job. cups: {}'.format(len(cups)))
w = threading.Thread(target=worker, name='worker')
b = threading.Thread(target=boss, name='boss')
b.start()
w.start()
上面代码基本能够完成,但上面代码中老板一直要不停的查询 worker 的状态变化。使用 Event 修改代码后如下:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 16:53
# @Author : bb_bcxlc
# @File : 11.event练习-改进.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import logging
import threading
import time
Format = '%(asctime)s - %(threadName)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=Format)
event = threading.Event() # 1:n 一个通知多个
def boss(e):
logging.info('I"m boss,watching U.')
e.wait() # 没有设置时间,则等到天荒地老
logging.info('Good Job~~~')
def worker(e, count=10):
logging.info('I"m working for U.')
cups = []
# while True:
# e.wait如果没有设置标志,返回值为False
# 使用wait阻塞等待
while not e.wait(0.5):
# time.sleep(0.5)
if len(cups) >= count:
e.set()
# break 为啥可以注释break呢?
cups.append(1) # 模拟杯子
logging.info('I have finished my Job. cups: {}'.format(len(cups)))
w = threading.Thread(target=worker, name='worker', args=(event,))
b1 = threading.Thread(target=boss, name='boss1', args=(event,))
b2 = threading.Thread(target=boss, name='boss2', args=(event,))
b1.start()
b2.start()
time.sleep(5)
w.start()
总结: 需要使用同一个 Event 对象的标记 flag,谁 wait 就是等到 flag 变为 True,或等到超时返回 False,不限制等待者的个数,通知所有等待者。测试 wait() 方法:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 17:07
# @Author : bb_bcxlc
# @File : 11.event测试wait.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
event = threading.Event()
print(event.is_set())
print(event.wait(3))
print(event.is_set())
print('~' * 30)
threading.Timer(3, lambda: event.set()).start()
print(event.wait(5))
print(event.is_set())
补充案例:模拟红绿灯交通。其中标志位设置为 True,代表绿灯,直接通行;标志位被清空,代表红灯;wait() 等待变绿灯。
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 17:15
# @Author : bb_bcxlc
# @File : 11.event红绿灯.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading, time # 导入threading和time模块
event = threading.Event() # 创建Event对象
def lighter(): # 红绿灯处理线程函数
'''0<count<2为绿灯,2<count<5为红灯,count>5重置标志'''
event.set() # 设置标志位为True
count = 0 # 递增变量,初始为0
while True:
if count > 2 and count < 5:
event.clear() # 将标志设置为False
print("\033[1;41m 现在是红灯 \033[0m")
elif count > 5:
event.set() # 设置标志位为True
count = 0 # 恢复初始值
else:
print("\033[1;42m 现在是绿灯 \033[0m")
time.sleep(1)
count += 1 # 递增变量
def car(name): # 小车处理线程函数
'''红灯停,绿灯行'''
while True:
if event.is_set(): # 当标志位为True时
print(f"[{name}] 正在开车...")
time.sleep(0.25)
else: # 当标志位为False时
print(f"[{name}] 看见了红灯,需要等几秒")
event.wait()
print(f"\033[1;34;40m 绿灯亮了,[{name}]继续开车 \033[0m")
# 开启红绿灯
light = threading.Thread(target=lighter, )
light.start()
# 开始行驶
car = threading.Thread(target=car, args=("张三",))
car.start()
3.2 线程锁Lock
定义一个全局变量 g_num,分别创建2个子线程对 g_num 执行不同的操作,并输出操作后的结果。代码如下:
from threading import Thread # 导入线程
import time
g_num = 100 # 定义一个全局变量
def plus(): # 第一个线程函数
print('-------子线程1开始------')
global g_num # 定义全局变量
g_num += 50 # 全局变量值加50
print('plus: g_num is %d' % g_num)
print('-------子线程1结束------')
def minus(): # 第二个线程函数
time.sleep(3)
print('-------子线程2开始------')
global g_num # 定义全局变量
g_num -= 50 # 全局变量值减50
print('minus: g_num is %d' % g_num)
print('-------子线程2结束------')
if __name__ == '__main__':
print('-------主线程开始------')
print('main: g_num is %d' % g_num)
t1 = Thread(target=plus) # 实例化线程t1
t2 = Thread(target=minus) # 实例化线程t2
t1.start() # 开启线程t1
t2.start() # 开启线程t2
t1.join() # 等待t1线程结束
t2.join() # 等待t2线程结束
print('-------主线程结束------')
上述代码中,定义一个全局变量 g_num,赋值为 100,然后创建2个线程。一个线程将 g_num 增加 50,一个线程将 g_num 减少 50。如果 g_num 的最终结果为 100,则说明线程之间可以共享数据。运行结果如下图所示:
从上面的例子可以得出,在一个进程内的所有线程共享全局变量,能够在不使用其他方式的前提下完成多线程之间的数据共享。由于线程可以对全局变量随意修改,这就可能造成多线程之间对全局变量的混乱操作。举个例子,订单要求生产1000个杯子,组织10个工人生产。请忽略老板,关注工人生成杯子:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 18:22
# @Author : bb_bcxlc
# @File : 12.锁的引入.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
from threading import Thread
import time
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
def worker(count=1000):
logging.info("I'm working")
while True:
if len(cups) >= count:
break
time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间
cups.append(1)
logging.info('I finished my job. cups = {}'.format(len(cups)))
for i in range(1, 11):
t = Thread(target=worker, name="w{}".format(i), args=(1000,))
t.start()
从上例的运行结果看出,多线程调度,导致了判断失效,多生产了杯子,即造成了全局变量 cups 最后的结果混乱,不准确。再以生活中的房子为例,当房子内只有一个居住者时(单线程),他可以任意时刻使用任意一个房间,如厨房、卧室和卫生间等。但是,当这个房子有多个居住者时(多线程),他就不能在任意时刻使用某些房间,如卫生间,否则就会造成混乱。如何解决这个问题呢?一个防止他人进入的简单方法,就是门上加一把锁。先到的人锁上门,后到的人就在门口排队,等锁打开再进去。如下图所示:
这就是互斥锁(Mutual exclusion,缩写 Mutex),防止多个线程同时读写某一块内存区域。互斥锁为资源引入一个状态:锁定和非锁定。某个线程要更改共享数据时,先将其锁定,此时资源的状态为 "锁定"
,其他线程不能更改;直到该线程释放资源,将资源的状态变成 "非锁定"
时,其他的线程才能再次锁定该资源。互斥锁保证了每次只有一个线程进行写入操作,从而保证了多线程情况下数据的正确性。Python 中的 Lock 类是 mutex 互斥锁。一旦一个线程获得锁,其它试图获取锁的线程将被阻塞,直到拥有锁的线程释放锁。凡是存在共享资源争抢的地方都可以使用锁,从而保证只有一个使用者可以完全使用这个资源。
在 threading 模块中使用 Lock 类可以方便地处理锁定。Lock 类有2个方法:acquire() 锁定和 release() 释放锁。示例用法如下:
import threading
mutex = threading.Lock() # 创建锁
# 获取锁定,如果有必要,需要阻塞到锁定释放为止
# 1.默认阻塞,阻塞可以设置超时时间。非阻塞时,timeout禁止设置
# 2.如果提供blocking参数并将它设置为False,当无法获取锁定时将立即返回False;如果成功获取锁定则返回True
mutex.acquire(blocking=True, timeout=-1) # 锁定
# 简单讲: 释放锁。可以从任何线程调用释放,已上锁的锁,会被重置为unlocked,未上锁的锁上调用,抛 RuntimeError 异常
mutex.release() # 释放锁
锁的基本使用:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 17:58
# @Author : bb_bcxlc
# @File : 12.锁的基本使用.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
lock = threading.Lock() # 互斥mutex
lock.acquire()
print('-' * 30)
def worker(l):
print('worker start', threading.current_thread())
l.acquire()
print('worker done', threading.current_thread())
for i in range(10):
threading.Thread(target=worker, name="w{}".format(i), args=(lock,), daemon=True).start()
print('-' * 30)
while True:
cmd = input(">>>")
if cmd == 'r': # 按r后枚举所有线程看看
lock.release()
print('released one locker')
elif cmd == 'quit':
lock.release()
break
else:
print(threading.enumerate())
print(lock.locked())
上例可以看出不管在哪一个线程中,只要对一个已经上锁的锁发起阻塞地请求,该线程就会阻塞。加锁,修改生产杯子的需求:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 19:03
# @Author : bb_bcxlc
# @File : 12.生产杯子-加锁.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import time
import logging
FORMAT = "%(asctime)s %(threadName)s %(thread)d %(message)s"
logging.basicConfig(format=FORMAT, level=logging.INFO)
cups = []
mutex = threading.Lock()
def worker(lock, count=1000):
logging.info("I'm working")
while True:
lock.acquire() # 获取锁
if len(cups) >= count:
# lock.release() # 锁位置1
break
# lock.release() # 锁位置2
time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间
cups.append(1)
# lock.release() # 锁位置3
logging.info('I finished my job. cups = {}'.format(len(cups)))
for i in range(1, 11):
threading.Thread(target=worker, name="w{}".format(i), args=(mutex, 1000)).start()
锁分析:
- 锁位置2: 假设某一个瞬间,有一个工作线程A获取了锁,len(cups) 正好有 999 个,然后就释放了锁,可以继续执行下面的语句,生产一个杯子,这地方不阻塞,但是正好杯子也没有生产完。锁释放后,其他线程就可以获得锁,线程B获得了锁,发现 len(cups) 也是 999 个,然后释放锁,然后也可以去生产一个杯子。锁释放后,其他的线程也可能获得锁。就说A和B线程都认为是 999 个,都会生产一个杯子,那么实际上最后一定会超出 1000 个。假设某个瞬间一个线程获得锁,然后发现杯子到了 1000 个,没有释放锁就直接 break 了,由于其他线程还在阻塞等待锁释放,这就成了死锁了。在多任务系统下,当一个或多个线程等待系统资源,而资源又被线程本身或其他线程占用时,就形成了死锁,如下图所示:
- 锁位置3分析: 获得锁的线程发现是 999,有资格生产杯子,生产一个,释放锁,看似很完美。问题在于,获取锁的线程发现杯子有 1000 个,直接 break,没释放锁离开了,死锁了。
- 锁位置1分析: 如果线程获得锁,发现是1000,break 前释放锁,没问题。问题在于,A线程获得锁后,发现小于1000,继续执行,其他线程获得锁全部阻塞。A线程再次执行循环后,自己也阻塞了。死锁了。
问题:究竟怎样加锁才正确呢?要在锁位置1和锁位置3同时加 release()。锁是典型必须释放的,Python 提供了上下文支持。查看 Lock 类的上下文方法,__enter__()
方法返回 bool 表示是否获得锁,__exit__()
方法中释放锁。由此上例可以修改为:
def worker(lock, count=1000):
logging.info("I'm working")
while True:
with lock:
if len(cups) >= count:
break
time.sleep(0.0001) # 为了看出线程切换效果,模拟杯子制作时间
cups.append(1)
logging.info('I finished my job. cups = {}'.format(len(cups)))
锁的应用场景: 锁适用于访问和修改同一个共享资源的时候,即读写同一个资源的时候。如果全部都是读取同一个共享资源需要锁吗?不需要。因为这时可以认为共享资源是不可变的,每一次读取它都是一样的值,所以不用加锁。使用锁的注意事项:
- 少用锁,必要时用锁。使用了锁,多线程访问被锁的资源时,就成了串行,要么排队执行,要么争抢执行。举例,高速公路上车并行跑,可是到了只开放了一个收费口,过了这个口,车辆依然可以在多车道上一起跑。过收费口的时候,如果排队一辆辆过,加不加锁一样效率相当,但是一旦出现争抢,就必须加锁一辆辆过。注意,不管加不加锁,只要是一辆辆过,效率就下降了。
- 加锁时间越短越好,不需要就立即释放锁。一定要避免死锁
不使用锁,有了效率,但是结果是错的。使用了锁,效率低下,但是结果是对的。所以,我们是为了效率要错误结果呢?还是为了对的结果,让计算机去计算吧。
3.3 递归锁RLock
在 threading 模块中,可以定义两种类型的锁:threading.Lock 和 threading.RLock。它们的区别是:Lock 不允许重复调用 acquire() 方法来获取锁,否则容易出现死锁;而 RLock 允许在同一线程中多次调用 acquire(),不会阻塞程序,这种锁也称为递归锁。在一个线程中,acquire 和 release 必须成对出现,即调用了n次 acquire() 方法,就必须调用n次的 release() 方法,这样才能真正释放所占用的锁。
示例:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 19:47
# @Author : bb_bcxlc
# @File : 12.RLock.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading # 导入threading模块
deposit = 0 # 定义变量,初始为存款余额
rlock = threading.RLock() # 创建递归锁
def run_thread(n): # 线程处理函数
global deposit # 声明为全局变量
for i in range(1000000): # 无数次重复操作,对变量执行先存后取相同的值
rlock.acquire() # 获取锁
rlock.acquire() # 在同一线程内,程序不会堵塞。
try: # 执行修改
deposit = deposit + n
deposit = deposit - n
finally:
rlock.release() # 释放锁
rlock.release() # 释放锁
# 创建2个线程,并分别传入不同的值
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
# 开始执行线程
t1.start()
t2.start()
# 阻塞线程
t1.join()
t2.join()
print(f'存款余额为:{deposit}')
3.4 同步协作Condition
Condition 是 threading 模块的一个子类,用于维护多个线程之间的同步协作。一个 Condition 对象允许一个或多个线程在被其他线程通知之前进行等待。其内部使用的也是 Lock 或者 RLock,同时增加了等待池功能。Condition 对象包含以下方法:
# 1.acquire(): 请求底层锁
# 2.release(): 释放底层锁
# 3.wait(self, timeout=None): 等待直到被通知发生超时
# 等待,直到条件计算为真。参数 predicate 为一个可调用对象,而且它的返回值可被解释为一个布尔值
# 4.wait_for(self, predicate, timeout=None):
# 默认唤醒一个等待这个条件的线程。这个方法唤醒最多n个正在等待这个条件变量的线程
# 5.notify(self, n=1):
# 6.notify_all(self): # 唤醒所有正在等待这个条件的线程
【示例】使用 Condition 来协调两个线程之间的工作,实现两个线程的交替说话。对话模拟效果如下:
张三: 床前明月光
李四: 疑是地上霜
张三: 举头望明月
李四: 低头思故乡
如果只有两句,可以使用锁机制,让某个线程先执行,本示例有多句话交替出现,适合使用 Condition。示例完整代码如下:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 19:51
# @Author : bb_bcxlc
# @File : 13.Condition.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading # 导入threading模块
class ZSThead(threading.Thread): # 张三线程类
def __init__(self, name, cond): # 初始化函数,接收说话人的姓名和Condition对象
super(ZSThead, self).__init__()
self.name = name
self.cond = cond
def run(self):
# 必须先调用with self.cond,才能使用wait()、notify()方法
with self.cond:
# 讲话
print("{}:床前明月光".format(self.name))
# 等待李四的回应
self.cond.notify() # 通知
self.cond.wait() # 等待状态
# 讲话
print("{}:举头望明月".format(self.name))
# 等待李四的回应
self.cond.notify() # 通知
self.cond.wait() # 等待状态
class LSThread(threading.Thread): # 李四线程类
def __init__(self, name, cond):
super(LSThread, self).__init__()
self.name = name
self.cond = cond
def run(self):
with self.cond:
# wait()方法不仅能获得一把锁,并且能够释放cond的大锁,
# 这样张三才能进入with self.cond中
self.cond.wait()
print(f"{self.name}:疑是地上霜")
# notify()释放wait()生成的锁
self.cond.notify() # 通知
self.cond.wait() # 等待状态
print(f"{self.name}:低头思故乡")
self.cond.notify() # 通知
c = threading.Condition() # 创建条件对象
zs = ZSThead("张三", c) # 实例化张三线程
ls = LSThread("李四", c) # 实例化李四线程
ls.start() # 李四开始说话
zs.start() # 张三接着说话
ls.start() 和 zs.start() 的启动顺序很重要,必须先启动李四,让他在那里等待,因为先启动张三时,他说了话就发出了通知,但是当时李四的线程还没有启动,并且 Condition 外面的大锁也没有释放,李四也没法获取 self.cond 这把大锁。Condition 有两层锁,一把底层锁在线程调用了 wait() 方法时就会释放,每次调用 wait() 方法后,都会创建一把锁放进 Condition 的双向队列中,等待 notify() 方法的唤醒。程序运行结果如下所示:
3.5 Queue的线程安全
queue 是一个线程安全的 FIFO/LIFO/Priority 队列模块,可以在多个线程之间安全地传递数据,内部实现已经自带了锁机制,不需要我们再手动加锁。模块提供的三种队列类型:
队列类 | 描述 |
---|---|
Queue.Queue (即 queue.Queue ) |
先进先出队列(FIFO)✅ 最常用 |
queue.LifoQueue |
后进先出队列(LIFO,类似栈) |
queue.PriorityQueue |
优先级队列,元素必须是 (priority, item) 的形式 |
常用 API 一览:
方法 | 描述 |
---|---|
put(item) |
放入一个元素,若队列满则阻塞(可配超时) |
get() |
获取一个元素,若队列空则阻塞(可配超时) |
qsize() |
当前队列元素数量(非精确,慎用) |
empty() |
判断队列是否为空(不保证绝对准确) |
full() |
判断队列是否满(同样不保证绝对准确) |
put_nowait(item) |
非阻塞放入 |
get_nowait() |
非阻塞获取 |
示例代码:
# -*- coding: utf-8 -*-
# @Time : 2025-05-24 21:18
# @Author : bb_bcxlc
# @File : 14.queue.py
# @Software: PyCharm
# @Blog: https://blog.csdn.net/xw1680
import threading
import queue
import time
import random
q = queue.Queue(maxsize=5)
# 生产者线程函数
def producer(name):
for i in range(10):
item = f"item-{i}"
q.put(item) # 会阻塞直到队列有空位
print(f"{name} 生产了 {item}")
time.sleep(random.random())
# 消费者线程函数
def consumer(name):
while True:
item = q.get() # 会阻塞直到队列非空
print(f"{name} 消费了 {item}")
time.sleep(random.random())
q.task_done()
# 启动线程
t1 = threading.Thread(target=producer, args=("生产者1",))
t2 = threading.Thread(target=consumer, args=("消费者1",), daemon=True)
t1.start()
t2.start()
t1.join()
q.join() # 等待队列任务完成
print("所有任务处理完毕")
特别注意下面的代码在多线程中使用:
import queue
q = queue.Queue(8)
if q.qsize() == 7:
q.put() # 上下两句可能被打断
if q.qsize() == 1:
q.get() # 未必会成功
如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走数据,就有可能被其他线程改了。Queue 类的 size 虽然加了锁,但是,依然不能保证立即 get、put 就能成功,因为读取大小和 get、put 方法是分开的。小结:
# 优点:
# 1.线程安全: 内部自带锁机制,避免竞态
# 2.简单高效: 使用简洁,支持阻塞/非阻塞操作
# 3.多种模式: FIFO、LIFO、优先级队列满足不同需求
# 4.可用于生产者-消费者模型: 线程间数据交互的首选方式
# 使用注意事项
# 1.empty() 和 full() 不可靠,只是近似值(文档明确说明)
# 2.task_done() 必须匹配 get(),否则 join() 会一直阻塞
# 3.队列满/空时不建议使用非阻塞操作直接判断,应通过 try-except 或 timeout 控制
# Queue 适合使用的场景
# 1.多线程中线程安全的数据通信
# 2.实现 生产者-消费者模式
# 3.控制任务并发量
# 4.异步任务调度(搭配 ThreadPoolExecutor 等)
至此今天的学习就到此结束了,笔者在这里声明,笔者写文章只是为了学习交流,以及让更多学习Python语言的读者少走一些弯路,节省时间,并不用做其他用途,如有侵权,联系博主删除即可。感谢您阅读本篇博文,希望本文能成为您编程路上的领航者。祝您阅读愉快!
好书不厌读百回,熟读课思子自知。而我想要成为全场最靓的仔,就必须坚持通过学习来获取更多知识,用知识改变命运,用博客见证成长,用行动证明我在努力。
如果我的博客对你有帮助、如果你喜欢我的博客内容,请点赞
、评论
、收藏
一键三连哦!听说点赞的人运气不会太差,每一天都会元气满满呦!如果实在要白嫖的话,那祝你开心每一天,欢迎常来我博客看看。
编码不易,大家的支持就是我坚持下去的动力。点赞后不要忘了关注
我哦!