我的需求如下:现有一批任务,使用进程池执行,每个任务执行耗时不一样,任务并发执行期间,需要每隔一段时间监控任务执行进度
直接贴代码:
import multiprocessing
import time
import random
from multiprocessing import Pool, Manager
def worker(task_id, task_status, lock):
"""任务执行函数,模拟不同耗时操作"""
try:
# 记录任务开始状态
with lock:
task_status[task_id] = "running"
# 随机生成任务执行时间(30秒到30分钟)
execution_time = random.uniform(1, 30)
time.sleep(execution_time)
# 更新任务完成状态
with lock:
task_status[task_id] = "completed"
return f"Task {task_id} completed"
except Exception as e:
with lock:
task_status[task_id] = f"failed: {str(e)}"
return f"Task {task_id} failed"
def monitor_tasks(task_status, total_tasks, lock):
"""任务监控函数,每30秒输出状态"""
while True:
time.sleep(10)
with lock:
running = [k for k, v in task_status.items() if v == "running"]
completed = [k for k, v in task_status.items() if v == "completed"]
failed = [k for k, v in task_status.items() if v == "failed"]
remaining = total_tasks - len(running) - len(completed)
print(f"\n=== 系统状态监控 ===")
print(f"正在执行的任务数: {len(running)}")
print(f"任务详情: {running}")
print(f"剩余任务数: {remaining}")
print(f"已完成任务数: {len(completed)}")
print(f"失败任务数: {len(failed)}")
print("====================")
if __name__ == '__main__':
# 初始化管理器
manager = Manager()
lock = manager.Lock()
# 创建任务状态字典(任务ID: 状态)
task_status = manager.dict()
total_tasks = 100
# 初始化所有任务状态为"pending"
for i in range(total_tasks):
task_status[i] = "pending"
# 创建进程池
with Pool(processes=16) as pool:
# 提交所有任务
for task_id in range(total_tasks):
pool.apply_async(worker, args=(task_id, task_status, lock))
# 启动监控线程
monitor_process = multiprocessing.Process(
target=monitor_tasks,
args=(task_status, total_tasks, lock)
)
monitor_process.start()
# 等待所有任务完成
pool.close()
pool.join()
# 终止监控进程
monitor_process.terminate()
monitor_process.join()