Semaphore信号量
Semaphore信号量可以翻译为信号量,这个信号量代表了最多允许线程访问的数量,可以使用Semaphore(n)设定,n是信号数量,这是一个更高级的锁机制,Semaphore管理一个计数器,每次使用acquire计数器将会减一,表示可以允许线程访问的数量少了一个,使用release计数器加1,表示可允许线程访问的数量多了一个,只有占用信号量的线程数量超过信号量时候才会阻塞,也就是说计数器为0时候,若还有线程访问,则发生阻塞。
发生阻塞后就需要等待其他线程使用release这时候计数器会加1,然后被阻塞的线程就可以访问了。
在应用Semaphore信号量过程中,有时候可能会有bug造成多次release,因此有所谓的BoundedSemaphore,可以保证计数器次数不超过特定值,这时候使用BoundedSemaphore(n)设定,n是信号数量。
import threading
import time
# 需求:允许程序同时并发的线程数,如果设为3则代表最大允许线程数量为3
semaphore = threading.BoundedSemaphore(3)
def func():
if semaphore.acquire():
print(f'{threading.current_thread().name} is working...')
time.sleep(1)
print(f'{threading.current_thread().name} 完成任务...')
semaphore.release()
if __name__ == '__main__':
# 模拟程序同时并发创建大量的子线程
for i in range(10):
t = threading.Thread(target=func)
t.start()
执行结果
Barrier栅栏在线程中的使用
Barrier栅栏可以想象成赛马的栅栏,当线程抵达必须等待其他线程,当所有线程抵达时候,才放开栅栏,这些线程才可以往下执行:
import random
import threading
import time
# 定义任务函数
def player():
# 模拟延时
time.sleep(random.random())
print(f'{threading.current_thread().name} 抵达栅栏的时间是为:{time.ctime()}')
b.wait()
print(f'{threading.current_thread().name} 继续向后执行')
if __name__ == '__main__':
# 满足栅栏之后才能够往后执行
b = threading.Barrier(4)
print(f'比赛开始')
# 定义一个子线程列表
threads = []
for i in range(4):
thread = (threading.Thread(target=player))
thread.start()
threads.append(thread)
for thread in threads:
thread.join()
print('比赛结束')
执行结果
Event事件在线程中的应用说明
event事件是如何完成线程之间的通信的,看案例
import random
import threading
import time
def waiter(event):
for i in range(1,4):
print(f'{i} 等待标记flag 被设定')
event.wait()
# 等待线程如果想要继续向下执行,需要修改flag标记,只有设定线程修改了,等待线程才会往下
print(f'等待完成的时间:{time.ctime()}')
event.clear()
def setter(event):
for i in range(1, 4):
# 模拟CPU随机切换
time.sleep(random.randint(1, 3))
event.set()
print(f'设定完成:{i}')
if __name__ == '__main__':
event = threading.Event()
print('开始工作...')
waiter = threading.Thread(target=waiter, args=(event, ))
setter = threading.Thread(target=setter, args=(event, ))
waiter.start()
setter.start()
waiter.join()
setter.join()
print('工作结束')
执行结果
注意这里等待时间在使用完成之后需要被clear清空重置