1. 背景&目标
背景:一个任务可分为多个阶段(各个阶段非CPU密集型任务,而是属于IO密集型任务),希望每个阶段能够交给各自的线程去执行。
目标:构建支持多并发的稳定的程序高效处理上述问题的程序,要求能够灵活设置并发。
2. show me the code
假设任务分为三个阶段,分别是download、resize和upload,代码采用管道将三个阶段进行拼接。
三个阶段的处理简化为三个函数,处理结果通过Queue传到下个阶段,各个阶段可以创建不同的线程去消费Queue中的数据,直到所有数据处理完成。代码如下:
from threading import Thread
from queue import Queue
import time
my_queue = Queue()
class ClosableQueue(Queue):
SENTINEL = object()
def close(self):
self.put(self.SENTINEL)
def __iter__(self):
while True:
item = self.get()
try:
if item is self.SENTINEL:
# 退出机制
return
yield item
finally:
self.task_done()
class Worker(Thread):
def __init__(self, func, in_queue, out_queue):
super().__init__()
self.func = func
self.in_queue = in_queue
self.out_queue = out_queue
def run(self):
# 确保线程退出的机制:在生产者-消费者模型中,通常需要使用特殊标志(如 SENTINEL)通知消费者线程结束循环。
for item in self.in_queue:
result = self.func(item)
self.out_queue.put(result)
def download(obj):
print(f'[download] id= {id(obj)}')
time.sleep(0.1)
return obj
def resize(obj):
print(f'[resize] id= {id(obj)}')
time.sleep(0.01)
return obj
def upload(obj):
print(f'[upload] id= {id(obj)}')
time.sleep(1)
return obj
def start_threads(count, *args):
threads = [Worker(*args) for _ in range(count)]
for thread in threads:
thread.start()
return threads
def stop_threads(closable_queue, threads):
# close次数根据threads次数来,保障每个每个线程都能正确关闭
for _ in threads:
closable_queue.close()
# 阻塞调用线程,直到队列中的所有任务都被处理完成
# 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。
# 每次调用 .task_done(),任务计数器会减少 1。
# .join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成(通过调用 .task_done())
closable_queue.join()
for thread in threads:
thread.join()
if __name__ == '__main__':
download_queue = ClosableQueue()
resize_queue = ClosableQueue()
upload_queue = ClosableQueue()
done_queue = ClosableQueue()
download_threads = start_threads(3, download, download_queue, resize_queue)
resize_threads = start_threads(4, resize, resize_queue, upload_queue)
upload_threads = start_threads(5, upload, upload_queue, done_queue)
obj = object()
for _ in range(1000):
download_queue.put(obj)
stop_threads(download_queue, download_threads)
stop_threads(resize_queue, resize_threads)
stop_threads(upload_queue, upload_threads)
print(done_queue.qsize(), 'items fininished')
3.知识点
上述代码涉及到几个知识点,挺有意思的:
- 当我们想用queue来传递数据时,头疼的点在于:
①下游任务该怎么判断上游生产了数据呢?轮巡有点不优雅,可能会造成性能影响。
②上游任务啥时候告诉下游数据生产完毕了呢?可以通过插入一个特殊的数据告诉下游生产完毕了。
③队列该设置多大呢?如果下游数据消费不过来,上游一直生产数据插入到队列,容易oom。
④怎么判断中间队列的数据消费完毕了呢?即如何优雅地结束程序。
上述代码利用Queue类的特性比较优雅地解决了上述的几个问题:
① Queue非常优雅,设置size之后,如果size满了,put方法会阻塞,直到数据被消费了才可以往里面添加数据。当queue为空,get方法会阻塞,直到有数据进来。
② 如代码中SENTINEL对象,如果调用close方法,就往队列中插入一个哨兵对象,告诉下游,上游数据生产完毕了。
③ 代码中没有设置队列大小,但是Queue支持设置。
④ Queue中.join() 方法会一直阻塞,直到任务计数器降为 0,也就是队列中的所有任务都被标记为完成。
其中,任务计数器的原理为:
- 每次向队列中添加一个任务(通过 .put()),队列内部的任务计数器会增加 1。
- 每次调用 .task_done(),任务计数器会减少 1。
注意看,改写__iter__方法时,每次获取一个元素都调用了一次task_done()方法,即告诉任务计数器需要-1了。
4. 总结
这段代码还是比较精髓,其中关于队列Queue的用法,关于threads的用法和线程中共享数据的用法值得学习。