概述
AsyncIOScheduler 是 APScheduler(Advanced Python Scheduler)库中的一员,专为 Python 异步编程环境设计。它充分利用 asyncio 库,能够在异步环境中轻松调度定时任务,而不会阻塞程序的其他操作。与传统的同步调度器不同,AsyncIOScheduler 通过与事件循环无缝配合,确保任务在指定时间执行时保持程序的高效运行。
在现代编程中,时间管理直接关联到程序的效率与灵活性。如果你还在使用 time.sleep()
来控制延时任务,可能会遇到程序卡住、线程阻塞、性能急剧下滑等问题。AsyncIOScheduler 提供了一个更智能、更高效的解决方案,让你摆脱传统的阻塞式等待,实现真正的异步任务调度。
安装与基础配置
安装依赖
要使用 AsyncIOScheduler,首先需要安装 APScheduler 库。可以通过 pip 命令安装:
pip install apscheduler[asyncio]
对于需要更完整功能的用户,可以安装所有可选依赖:
pip install apscheduler[all]
基本导入
在 Python 脚本中,首先需要导入必要的模块:
import asyncio
from apscheduler.schedulers.asyncio import AsyncIOScheduler
核心概念
调度器 (Scheduler)
AsyncIOScheduler 是 APScheduler 中的异步调度器实现,专为 asyncio 设计。它负责管理所有定时任务的调度和执行。
任务 (Job)
在调度器中,Job 表示一个需要在指定时间执行的操作。每个 Job 都有自己的触发条件和执行函数。
触发器 (Trigger)
触发器决定任务何时执行。APScheduler 支持多种触发器类型:
date
: 在特定时间点执行一次interval
: 固定时间间隔执行cron
: 类似 Linux 的 cron 系统,可以设置复杂的定时规则
事件循环 (Event Loop)
asyncio 的核心组件,负责调度并运行异步任务。AsyncIOScheduler 依赖于事件循环来执行任务。
基本使用方法
创建调度器实例
scheduler = AsyncIOScheduler()
如果需要指定特定的事件循环:
loop = asyncio.get_event_loop()
scheduler = AsyncIOScheduler(event_loop=loop)
定义异步任务函数
任务函数必须是一个异步函数(使用 async def
定义):
async def my_task():
print("任务正在执行...")
# 这里可以添加你的异步操作
await asyncio.sleep(1)
添加任务到调度器
使用 add_job
方法添加任务:
# 每隔5秒执行一次
scheduler.add_job(my_task, 'interval', seconds=5)
# 使用cron表达式,每天早上8点执行
scheduler.add_job(my_task, 'cron', hour=8)
# 在特定时间点执行一次
from datetime import datetime
scheduler.add_job(my_task, 'date', run_date=datetime(2025, 7, 28, 12, 0, 0))
启动调度器
scheduler.start()
# 保持程序运行
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
停止调度器
scheduler.shutdown()
高级功能
任务管理
AsyncIOScheduler 提供了丰富的任务管理功能:
# 暂停任务
job = scheduler.add_job(my_task, 'interval', seconds=5)
job.pause()
# 恢复任务
job.resume()
# 修改任务
job.modify(seconds=10) # 修改执行间隔为10秒
# 移除任务
job.remove()
# 获取所有任务
jobs = scheduler.get_jobs()
错误处理
可以为任务添加错误处理函数:
async def error_handler(job, exc):
print(f"任务 {job.id} 执行出错: {str(exc)}")
scheduler.add_job(my_task, 'interval', seconds=5)
scheduler.add_listener(error_handler, EVENT_JOB_ERROR)
任务优先级
当多个任务需要同时执行时,可以设置优先级:
scheduler.add_job(important_task, 'interval', minutes=30, priority=2)
scheduler.add_job(normal_task, 'interval', minutes=30, priority=1)
数字越大表示优先级越高。
任务标签
使用标签对任务进行分类管理:
scheduler.add_job(backup_data, 'interval', hours=1, tags=['backup', 'important'])
scheduler.add_job(check_status, 'interval', minutes=30, tags=['monitor'])
# 根据标签批量操作
backup_jobs = scheduler.get_jobs(tags=['backup'])
for job in backup_jobs:
job.pause()
动态调整任务
可以在运行时动态调整任务的执行时间和间隔:
job = scheduler.add_job(flexible_task, 'interval', hours=2)
# 修改下次执行时间
from datetime import datetime, timedelta
next_run = datetime.now() + timedelta(minutes=30)
job.next_run_time = next_run
# 修改执行间隔
job.interval = timedelta(hours=3)
配置管理
从配置文件加载任务
可以通过 YAML 或 JSON 配置文件管理任务:
import yaml
def load_config(filename):
with open(filename, 'r') as stream:
return yaml.safe_load(stream)
def setup_scheduler(config):
scheduler = AsyncIOScheduler()
for job in config['jobs']:
job_id = job.get('id')
trigger = job.get('trigger')
scheduler.add_job(job_function, trigger['type'], args=[job_id], **trigger['args'])
return scheduler
if __name__ == '__main__':
config = load_config('config.yaml')
scheduler = setup_scheduler(config)
scheduler.start()
asyncio.get_event_loop().run_forever()
示例配置文件 config.yaml
:
jobs:
- id: 1
trigger:
type: 'interval'
args:
seconds: 10
- id: 2
trigger:
type: 'cron'
args:
day_of_week: 'mon-fri'
hour: 8
minute: 0
最佳实践
性能优化
- 合理设置任务间隔:避免过于频繁的任务调度,合理设置时间间隔,防止系统过载。
- 使用异步操作:确保任务是异步执行的,避免阻塞事件循环。
- 限制并发任务数:对于资源密集型任务,可以设置最大并发数:
scheduler.max_concurrent_jobs = 5
- 批量添加任务:当有多个任务时,可以批量添加以提高效率。
错误处理与日志
添加错误处理:为重要任务添加错误处理逻辑。
记录任务历史:可以记录任务的执行历史用于调试和监控:
class JobHistory: def __init__(self): self.history = [] async def record(self, job): self.history.append({ "job_name": job.id, "run_time": datetime.now(), "status": "success" }) history = JobHistory() scheduler.on_job_success = history.record
定期检查任务状态:通过日志或调试工具检查任务的执行情况,确保任务按时执行。
条件触发任务
有时候任务不是单纯按时间执行,还需要满足特定条件:
async def check_condition():
# 假设检查某个条件
return True
async def conditional_task():
if await check_condition():
print("条件满足,执行任务...")
else:
print("条件不满足,跳过执行...")
scheduler.add_job(conditional_task, 'interval', minutes=5)
常见问题与解决方案
调度器不启动
- 确保事件循环正在运行:调用
scheduler.start()
后,需要保持事件循环运行。 - 检查任务是否添加正确:确认任务函数是异步的,且触发器设置正确。
CronTrigger 失效问题
当使用 asyncio.get_event_loop()
时,可能会遇到 CronTrigger
无法正常工作的情况。这是因为事件循环冲突导致的。
解决方案:
统一事件循环:
async def main(): loop = asyncio.get_event_loop() scheduler = AsyncIOScheduler(event_loop=loop) scheduler.add_job(my_task, 'cron', hour=12, minute=0) scheduler.start() await asyncio.sleep(3600) asyncio.run(main())
使用
run_in_executor
:async def main(): loop = asyncio.get_event_loop() with ThreadPoolExecutor() as executor: await loop.run_in_executor(executor, my_task) asyncio.run(main())
任务阻塞事件循环
如果在任务中执行了阻塞操作,会影响其他任务的执行。解决方案:
将阻塞操作放到线程池中执行:
async def my_task(): await asyncio.get_event_loop().run_in_executor(None, blocking_function)
使用专门的异步库:如数据库操作使用
aiomysql
或asyncpg
等异步驱动。
实际应用场景
定时数据备份
async def backup_database():
print(f"开始备份数据库... {datetime.now()}")
# 这里添加实际的备份逻辑
await asyncio.sleep(1)
scheduler.add_job(backup_database, 'cron', day_of_week='mon-fri', hour=2, minute=30)
定期API请求
import aiohttp
async def fetch_api_data():
async with aiohttp.ClientSession() as session:
async with session.get('https://api.example.com/data') as resp:
data = await resp.json()
print(f"获取到数据: {data}")
scheduler.add_job(fetch_api_data, 'interval', hours=1)
系统健康检查
async def system_health_check():
print(f"执行系统健康检查... {datetime.now()}")
# 检查系统资源使用情况
# 检查服务可用性
# 记录检查结果
scheduler.add_job(system_health_check, 'interval', minutes=5)
异步文件处理
结合 aiofiles
进行异步文件操作:
import aiofiles
async def process_logs():
async with aiofiles.open('app.log', mode='r') as f:
async for line in f:
# 处理每一行日志
pass
scheduler.add_job(process_logs, 'interval', hours=6)
结合FastApi使用
定时清理白名单过期IP
@asynccontextmanager async def lifespan( app: FastAPI, ): # 1. 初始化 Redis(单例检查) if not hasattr(app.state, "redis_client"): log.info("初始化 Redis 客户端...") app.state.redis_client = AsyncRedisClient() await app.state.redis_client.connect() # 2. 初始化ip_whitelist_dao if not hasattr(app.state, "ip_whitelist_dao"): log.info("初始化 IP 白名单 DAO...") app.state.ip_whitelist_dao = IpWhitelistDao() # 3. 使用 Redis 分布式锁控制定时任务初始化 async with app.state.redis_client.client.lock("scheduler_init_lock", timeout=5): if not hasattr(app.state, "scheduler"): scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") scheduler.add_job( clean_expired_whitelist, # trigger=CronTrigger(second="*"), trigger=CronTrigger(minute=0, hour="*"), args=[app], id="whitelist_cleanup_job", ) scheduler.start() app.state.scheduler = scheduler log.info("定时任务已启动") yield # 应用运行 # 资源清理 if hasattr(app.state, "scheduler") and app.state.scheduler.running: app.state.scheduler.shutdown() await app.state.redis_client.close() # 定时任务函数 async def clean_expired_whitelist(app: FastAPI): """每小时清理一次过期的白名单IP""" key = "whitelist_cleanup_running" if await app.state.redis_client.client.set(key, "1", nx=True, ex=3600): try: await app.state.redis_client.cleanup_expired_proxies() finally: await app.state.redis_client.client.delete(key) def create_application() -> FastAPI: application = FastAPI(lifespan=lifespan) application.include_router(api_router, prefix="") # 添加CORS中间件 application.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有源,也可以指定具体的源 allow_credentials=True, allow_methods=["*"], # 允许所有方法 allow_headers=["*"], # 允许所有头 ) # 将静态资源文件夹挂载到 '/static' 路径上 application.mount("/", StaticFiles(directory=Config.frontend_path, html=True)) return application
定时清理临时文件夹
@asynccontextmanager async def lifespan( app: FastAPI, ): # 1. 初始化 Redis(单例检查) if not hasattr(app.state, "redis_client"): log.info("初始化 Redis 客户端...") app.state.redis_client = AsyncRedisClient() await app.state.redis_client.connect() # 2. 使用 Redis 分布式锁控制定时任务初始化 async with app.state.redis_client.client.lock("scheduler_init_lock", timeout=5): if not hasattr(app.state, "scheduler"): scheduler = AsyncIOScheduler(timezone="Asia/Shanghai") scheduler.add_job( clean_expired_whitelist, # trigger=CronTrigger(second="*"), trigger=CronTrigger(minute=0, hour="*"), args=[app], id="clean_temp_dir", ) scheduler.start() app.state.scheduler = scheduler log.info("定时任务已启动") yield # 应用运行 # 资源清理 if hasattr(app.state, "scheduler") and app.state.scheduler.running: app.state.scheduler.shutdown() await app.state.redis_client.close() # 定时任务函数 async def clean_temp_dir(app: FastAPI): """清理临时文件夹""" key = "clean_temp_dir" if await app.state.redis_client.client.set(key, "1", nx=True, ex=3600): cleanup_tmp_dir() # 每小时清理临时文件夹 def cleanup_tmp_dir(): """每小时清理一次临时文件夹,删除超过 24 小时未修改的文件和空文件夹""" tmp_dir = temp_file_path current_time = time.time() # 24 小时的秒数 expiration_time = 24 * 60 * 60 if os.path.exists(tmp_dir): for item in os.listdir(tmp_dir): item_path = os.path.join(tmp_dir, item) try: # 获取文件或文件夹的最后修改时间 last_modified_time = os.path.getmtime(item_path) # 判断是否超过 24 小时 if (current_time - last_modified_time) > expiration_time: if os.path.isdir(item_path): # 仅删除空文件夹 if not os.listdir(item_path): os.rmdir(item_path) else: os.remove(item_path) except Exception as e: print(f"清理 {item_path} 时出错: {e}") def create_application() -> FastAPI: application = FastAPI(lifespan=lifespan) application.include_router(api_router, prefix="") # 添加CORS中间件 application.add_middleware( CORSMiddleware, allow_origins=["*"], # 允许所有源,也可以指定具体的源 allow_credentials=True, allow_methods=["*"], # 允许所有方法 allow_headers=["*"], # 允许所有头 ) # 将静态资源文件夹挂载到 '/static' 路径上 application.mount("/", StaticFiles(directory=Config.frontend_path, html=True)) return application
总结
AsyncIOScheduler 是一个功能强大且灵活的异步任务调度工具,它能够帮助开发者在不阻塞程序的情况下,轻松调度定时任务,提高代码的优雅性与执行力。无论是简单的定时任务,还是复杂的周期性调度,它都能游刃有余地处理,甚至可以应对多任务并发运行的挑战。
通过 AsyncIOScheduler,可以:
- 告别传统的阻塞式编程方式(如
time.sleep()
) - 实现精确的任务时间控制
- 轻松管理多个并发任务
- 构建高效、可靠的异步应用程序