学习目标:
通过自定义的CronScheduler调度器在兼容标准的调度器的情况下,查询自定义任务表去生成调度任务并分配给celery worker进行执行
不了解Celery框架的小伙伴可以先看一下我的上一篇文章:Celery框架组件分析及使用
学习内容:
- 创建自定义的Scheduler,设置自定义的Scheduler实现对原有配置的定时任务的兼容
- 如何启动自定义Scheduler对任务进行调度
- 创建自定义Scheduler遇到的一些问题
如何创建自定义的Scheduler:
在创建自定义Scheduler之前,我们先了解一下Scheduler类,这个类是Celery Beat的核心类,维护了任务的创建逻辑,调度逻辑。入下图是整个celery-beat启动执行流程图
+---------------------+
| Celery Beat 启动 |
+----------+----------+
|
v
+----------+----------+
| Service.start() | --> 初始化 Scheduler(此时调用 setup_schedule)
+----------+----------+
|
v
+----------+----------+
| scheduler.tick() | 每隔 interval 触发一次,把当前任务按最近执行时间放入 _heap(调度堆)
+----------+----------+
|
v
+--------------------------+
| schedule.get_schedule() |
| 返回所有任务 ScheduleEntry | 此处会获取所有的可执行的调度任务,需要重写
+--------------------------+
|
v
+-----------------------------+
| entry.is_due() |
| 判断任务是否需要执行 |
+-----------------------------+
|
如果是 True 否则等待下次 tick
|
v
+-----------------------------+
| apply_async(entry) |
| 发出任务执行 |
+-----------------------------+
要定义一个DjangoCronScheduler继承Scheduler,需要封装自己的数据库Cron表达式数据加载逻辑
import logging
import re
from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from django.db import close_old_connections
from record.models import KeywordVideoSchedule, AccountVideoSchedule
logger = logging.getLogger(__name__)
class DjangoCronScheduler(Scheduler):
"""从Django模型动态加载cron任务的调度器"""
def __init__(self, *args, **kwargs):
self._schedule = {}
self._cron_schedule = {}
self._last_refresh = None
self._refresh_interval = kwargs.pop('refresh_interval', 3600) # 默认3600秒刷新一次
super().__init__(*args, **kwargs)
# 第一步,初始化所有的动态cron表达式的任务
def setup_schedule(self):
"""初始化调度"""
super().setup_schedule()
# 加载静态配置中的任务(即 app.conf.beat_schedule)
for name, entry in self.app.conf.beat_schedule.items():
if isinstance(entry, dict):
self._schedule[name] = self.Entry(
name=name,
task=entry['task'],
schedule=entry['schedule'],
args=entry.get('args', []),
kwargs=entry.get('kwargs', {}),
options=entry.get('options', {}),
app=self.app
)
else:
# 已经是 ScheduleEntry 的实例
self._schedule[name] = entry
self._refresh_cron_tasks(force=True)
def tick(self):
"""重写tick方法,处理动态刷新"""
if self.should_refresh():
self._refresh_cron_tasks()
return super().tick()
def should_refresh(self):
"""检查是否需要刷新任务"""
if self._last_refresh is None:
return True
now = self.app.now()
return (now - self._last_refresh).total_seconds() >= self._refresh_interval
def _refresh_cron_tasks(self, force=False):
"""从Django模型刷新cron任务"""
logger.info(f'begin refresh_cron_tasks: force={force}')
try:
close_old_connections() # 确保数据库连接有效
# 获取所有启用的任务(关键词视频任务)
db_tasks_keyword = {
f'keyword_video_{task.id}': task
for task in KeywordVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)
}
# 获取所有启用的任务(账号视频任务)
db_tasks_account = {
f'account_video_{task.id}': task
for task in AccountVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)
}
# 合并两个字典
db_tasks = {**db_tasks_keyword, **db_tasks_account}
current_names = set(self._cron_schedule.keys())
new_names = set(db_tasks.keys())
# 删除不再存在的任务
for name in current_names - new_names:
del self._cron_schedule[name]
logging.info(f"Removed cron task: {name}")
# 添加或更新任务
for name, db_task in db_tasks.items():
existing = self._cron_schedule.get(name)
# 判断任务类型,并赋值任务名
if name.startswith('keyword_video_'):
task_name = 'record.tasks.spider_keyword_video'
elif name.startswith('account_video_'):
task_name = 'record.tasks.spider_account_video'
else:
logger.error(f"Unknown cron task: {name}")
continue
# 如果是新任务或cron表达式有变化
if not existing or getattr(existing, 'cron_expr', None) != db_task.collect_cron:
try:
crontab_schedule = crontab(*self._parse_cron_expr(db_task.collect_cron))
entry = ScheduleEntry(
name=name,
task=task_name,
schedule=crontab_schedule,
args=[db_task.id],
kwargs={},
options={},
total_run_count=0,
last_run_at=existing.last_run_at if existing else None
)
self._cron_schedule[name] = entry
logging.info(f"{'Updated' if existing else 'Added'} cron task: {name} ({db_task.collect_cron})")
except ValueError as e:
logging.error(f"Invalid cron expression {db_task.collect_cron} for task {name}: {str(e)}")
elif force:
# 强制刷新时更新其他参数
self._cron_schedule[name].update({
'args': [db_task.id],
'kwargs': {}
})
self._last_refresh = self.app.now()
except Exception as e:
logging.error(f"Failed to refresh cron tasks: {str(e)}")
finally:
close_old_connections() # 清理数据库连接
def _parse_cron_expr(self, cron_expr):
"""解析cron表达式为celery crontab参数"""
# 标准的cron表达式是包含秒的,但是python的crontab对象最小的单位为分,如果是6位含s的数据,需要舍弃前面的秒的处理
cron_parts = cron_expr.strip().split()
if len(cron_parts) == 6:
# 忽略秒段(第 0 段)
cron_parts = cron_parts[1:]
if len(cron_parts) != 5:
logger.error(f"无效的 Cron 表达式: {cron_expr} (需要5或6个字段)")
raise ValueError("Cron expression must have 5 parts or 6 parts")
logger.info(f'cron_parts : {cron_parts}')
# 通用转换规则
converted = []
for part in cron_parts:
# 规则1: 将 0/x 转换为 */x。celery不支持0/5这种格式
if re.match(r'^0/\d+$', part):
part = part.replace('0/', '*/')
# 规则2: 将 1-5/x 转换为 1-5/x (保持不变)
# 规则3: 保持 *、数字、, 等标准语法不变
converted.append(part)
return (
converted[0], # minute
converted[1], # hour
converted[2], # day_of_month
converted[3], # month_of_year
converted[4] # day_of_week
)
@property
def schedule(self):
"""确保 Celery Beat 能识别自定义调度任务"""
return self.schedule()
def get_schedule(self):
"""获取当前调度任务"""
return {**self._schedule, **self._cron_schedule}
- 定义一个
_schedule
属性接收原有的通过代码编写的固定cron表达式的执行task,例如现有项目在celery_app.py
当中配置的
app.conf.beat_schedule = {
'resume_autoclip_task_5min': {
'task': 'record.comsumers.resume_autoclip_task',
'schedule': crontab(minute='*/1')
},
'flash_all_template_view_and_like_1day': {
'task': 'record.comsumers.flash_all_template_view_and_like',
'schedule': crontab(hour='1', minute='0')
}
}
定义一个
_cron_schedule
属性用来接收数据库当中配置的动态cron表达式的数据,根据这个表达式生成ScheduleEntry对象,也就是每个动态需要执行task的对象.定义一个
_refresh_interval
属性进行刷新,因为我们的任务数据是配置在数据,数据可能会删除,也可能会修改,暂时还没考虑实时去更新Scheduler的条件下,我们就需要考虑动态刷新的额情况对设置的属性进行初始化赋值,我们需要关注的函数为setup_schedule和tick setup_schedule函数是sever启动的初始化入口,我们可以在项目启动时把我们需要初始化的数据都初始化进去,我们需要额外注意的是
crontab
对象接收的cron表达式和我们传统不一样,一般我们传统的cron表达式都是6位,crontab只接收5位参数,不携带秒的那一位,而且crontab不接收0/5这种格式,会报错。所以我在此处特殊处理了cron表达式的,将其通过_parse_cron_expr
转换成crontab对象能够接收的参数。def schedule(self)函数的作用,tick() 方法将按照执行时间获取可执行的任务,会触发调用schedule(self)方法,该方法会将系统代码配置任务和cron表达式的任务合并返回给调度器。理论上在
setup_schedule
阶段也可以将所有的数据都加载到同一个_schedule属性下,但是博主测试之后发现不成功,只能使用该方式进行处理了。
至此我们通过自定义的Scheduler就算编写完成了,下面我们来配置及启动测试。
如何启动自定义Scheduler对任务进行调度
- 首先我们使用了celery beat组件,需要我们在项目当中引入celery beat,博主项目结合使用的是Django框架,所以在setting.py文件的属性
INSTALLED_APPS
当中添加组件django_celery_beat
INSTALLED_APPS = [
'django_celery_beat', # 需要引入celery_beat进行任务生成调度
]
- 启动celery beat使用自定义的Scheduler,有两种方式,一个是通过代码配置,另外一种是通过命令行参数启动
- 通过代码配置,需要在celery_app.py文件中指定我们的beat_scheduler
# 使用自定义的调度器,app为你的app名称,modules为定义的自定义Scheduler的文件名称 app.conf.beat_scheduler = 'app.modules.DjangoCronScheduler'
- 配置完成后我们通过标准的启动方式就能启动了
celery -A you_projiect_name beat -l info
- 启动完成后即可看到,启动的提示信息
- 通过代码配置,需要在celery_app.py文件中指定我们的beat_scheduler
- 启动参数指定启动的Scheduler,此时我们就不需要配置celery_app.py当中的beat_scheduler,直接在启动命令行上去配置
celery -A autoclip beat -S app.modules.DjangoCronScheduler -l info
创建自定义Scheduler遇到的一些问题
- 在编写自定义Scheduler之前,博主也不是很了解Scheduler的执行原理,只是看到网上很多使用了Scheduler自带的cron配置页去配置的cron表达式任务,它是有自己两张固定的表结构的,如果想使用自己的cron表达式和表数据,还必须将自己的数据同步到
django_celery_beat_periodictask
和django_celery_beat_crontabschedule
表当中。博主就想能不能使用自己的表去执行任务,统计结果,因为每次网上述的两个表同步数据感觉处理下来也不是很方便。 - Scheduler数据的加载的时机和机制,如果不了解这个的话,确实是无从下手,就算初始化了数据也不清楚是怎么同步到任务堆里的,需要好好看一下上述的执行流程图。
- crontab对象的参数和标准的cron表达式不一致导致的报错,包括长度的区别,对于0/4这种格式的处理区别。
- 理论上还确少,数据动态变化时实时的变更
schedule
的方法,博主这边偷懒使用了定时刷新的机制,这个使用场景对于配置实时性感知要求不高的情况下是满足要求的。