Django 自定义celery-beat调度器,查询自定义表的Cron表达式进行任务调度

发布于:2025-05-01 ⋅ 阅读:(10) ⋅ 点赞:(0)

学习目标:

通过自定义的CronScheduler调度器在兼容标准的调度器的情况下,查询自定义任务表去生成调度任务并分配给celery worker进行执行

不了解Celery框架的小伙伴可以先看一下我的上一篇文章:Celery框架组件分析及使用


学习内容:

  1. 创建自定义的Scheduler,设置自定义的Scheduler实现对原有配置的定时任务的兼容
  2. 如何启动自定义Scheduler对任务进行调度
  3. 创建自定义Scheduler遇到的一些问题

如何创建自定义的Scheduler:

在创建自定义Scheduler之前,我们先了解一下Scheduler类,这个类是Celery Beat的核心类,维护了任务的创建逻辑,调度逻辑。入下图是整个celery-beat启动执行流程图

     +---------------------+
     | Celery Beat 启动    |
     +----------+----------+
                |
                v
     +----------+----------+
     | Service.start()     |  --> 初始化 Scheduler(此时调用 setup_schedule)
     +----------+----------+
                |
                v
     +----------+----------+
     | scheduler.tick()    | 每隔 interval 触发一次,把当前任务按最近执行时间放入 _heap(调度堆)
     +----------+----------+
                |
                v
     +--------------------------+
     | schedule.get_schedule()  |
     | 返回所有任务 ScheduleEntry | 此处会获取所有的可执行的调度任务,需要重写
     +--------------------------+
                |
                v
     +-----------------------------+
     | entry.is_due()              |
     | 判断任务是否需要执行		   |
     +-----------------------------+
                |
     如果是 True           否则等待下次 tick
                |
                v
     +-----------------------------+
     | apply_async(entry)          |
     | 发出任务执行                  |
     +-----------------------------+

要定义一个DjangoCronScheduler继承Scheduler,需要封装自己的数据库Cron表达式数据加载逻辑

import logging
import re

from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from django.db import close_old_connections

from record.models import KeywordVideoSchedule, AccountVideoSchedule

logger = logging.getLogger(__name__)


class DjangoCronScheduler(Scheduler):
    """从Django模型动态加载cron任务的调度器"""

    def __init__(self, *args, **kwargs):
        self._schedule = {}
        self._cron_schedule = {}
        self._last_refresh = None
        self._refresh_interval = kwargs.pop('refresh_interval', 3600)  # 默认3600秒刷新一次
        super().__init__(*args, **kwargs)
	
	# 第一步,初始化所有的动态cron表达式的任务
    def setup_schedule(self):
        """初始化调度"""
        super().setup_schedule()
        # 加载静态配置中的任务(即 app.conf.beat_schedule)
        for name, entry in self.app.conf.beat_schedule.items():
            if isinstance(entry, dict):
                self._schedule[name] = self.Entry(
                    name=name,
                    task=entry['task'],
                    schedule=entry['schedule'],
                    args=entry.get('args', []),
                    kwargs=entry.get('kwargs', {}),
                    options=entry.get('options', {}),
                    app=self.app
                )
            else:
                # 已经是 ScheduleEntry 的实例
                self._schedule[name] = entry
        self._refresh_cron_tasks(force=True)

    def tick(self):
        """重写tick方法,处理动态刷新"""
        if self.should_refresh():
            self._refresh_cron_tasks()

        return super().tick()

    def should_refresh(self):
        """检查是否需要刷新任务"""
        if self._last_refresh is None:
            return True

        now = self.app.now()
        return (now - self._last_refresh).total_seconds() >= self._refresh_interval

    def _refresh_cron_tasks(self, force=False):
        """从Django模型刷新cron任务"""
        logger.info(f'begin refresh_cron_tasks: force={force}')
        try:
            close_old_connections()  # 确保数据库连接有效

            # 获取所有启用的任务(关键词视频任务)
            db_tasks_keyword = {
                f'keyword_video_{task.id}': task
                for task in KeywordVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)
            }

            # 获取所有启用的任务(账号视频任务)
            db_tasks_account = {
                f'account_video_{task.id}': task
                for task in AccountVideoSchedule.objects.filter(del_flag=False).exclude(frequency=1)
            }
            # 合并两个字典
            db_tasks = {**db_tasks_keyword, **db_tasks_account}
            current_names = set(self._cron_schedule.keys())
            new_names = set(db_tasks.keys())

            # 删除不再存在的任务
            for name in current_names - new_names:
                del self._cron_schedule[name]
                logging.info(f"Removed cron task: {name}")

            # 添加或更新任务
            for name, db_task in db_tasks.items():
                existing = self._cron_schedule.get(name)
                # 判断任务类型,并赋值任务名
                if name.startswith('keyword_video_'):
                    task_name = 'record.tasks.spider_keyword_video'
                elif name.startswith('account_video_'):
                    task_name = 'record.tasks.spider_account_video'
                else:
                    logger.error(f"Unknown cron task: {name}")
                    continue
                # 如果是新任务或cron表达式有变化
                if not existing or getattr(existing, 'cron_expr', None) != db_task.collect_cron:
                    try:
                        crontab_schedule = crontab(*self._parse_cron_expr(db_task.collect_cron))
                        entry = ScheduleEntry(
                            name=name,
                            task=task_name,
                            schedule=crontab_schedule,
                            args=[db_task.id],
                            kwargs={},
                            options={},
                            total_run_count=0,
                            last_run_at=existing.last_run_at if existing else None
                        )
                        self._cron_schedule[name] = entry
                        logging.info(f"{'Updated' if existing else 'Added'} cron task: {name} ({db_task.collect_cron})")
                    except ValueError as e:
                        logging.error(f"Invalid cron expression {db_task.collect_cron} for task {name}: {str(e)}")
                elif force:
                    # 强制刷新时更新其他参数
                    self._cron_schedule[name].update({
                        'args': [db_task.id],
                        'kwargs': {}
                    })

            self._last_refresh = self.app.now()
        except Exception as e:
            logging.error(f"Failed to refresh cron tasks: {str(e)}")
        finally:
            close_old_connections()  # 清理数据库连接

    def _parse_cron_expr(self, cron_expr):
        """解析cron表达式为celery crontab参数"""

        # 标准的cron表达式是包含秒的,但是python的crontab对象最小的单位为分,如果是6位含s的数据,需要舍弃前面的秒的处理
        cron_parts = cron_expr.strip().split()
        if len(cron_parts) == 6:
            # 忽略秒段(第 0 段)
            cron_parts = cron_parts[1:]
        if len(cron_parts) != 5:
            logger.error(f"无效的 Cron 表达式: {cron_expr} (需要5或6个字段)")
            raise ValueError("Cron expression must have 5 parts or 6 parts")
        logger.info(f'cron_parts : {cron_parts}')
        # 通用转换规则
        converted = []
        for part in cron_parts:
            # 规则1: 将 0/x 转换为 */x。celery不支持0/5这种格式
            if re.match(r'^0/\d+$', part):
                part = part.replace('0/', '*/')

            # 规则2: 将 1-5/x 转换为 1-5/x (保持不变)
            # 规则3: 保持 *、数字、, 等标准语法不变
            converted.append(part)
        return (
            converted[0],  # minute
            converted[1],  # hour
            converted[2],  # day_of_month
            converted[3],  # month_of_year
            converted[4]  # day_of_week
        )

    @property
    def schedule(self):
        """确保 Celery Beat 能识别自定义调度任务"""
        return self.schedule()

    def get_schedule(self):
        """获取当前调度任务"""
        return {**self._schedule, **self._cron_schedule}

  1. 定义一个_schedule属性接收原有的通过代码编写的固定cron表达式的执行task,例如现有项目在celery_app.py当中配置的
app.conf.beat_schedule = {
    'resume_autoclip_task_5min': {
        'task': 'record.comsumers.resume_autoclip_task',
        'schedule': crontab(minute='*/1')
    },
    'flash_all_template_view_and_like_1day': {
        'task': 'record.comsumers.flash_all_template_view_and_like',
        'schedule': crontab(hour='1', minute='0')
    }
}

  1. 定义一个 _cron_schedule 属性用来接收数据库当中配置的动态cron表达式的数据,根据这个表达式生成ScheduleEntry对象,也就是每个动态需要执行task的对象.

  2. 定义一个_refresh_interval属性进行刷新,因为我们的任务数据是配置在数据,数据可能会删除,也可能会修改,暂时还没考虑实时去更新Scheduler的条件下,我们就需要考虑动态刷新的额情况

  3. 对设置的属性进行初始化赋值,我们需要关注的函数为setup_schedule和tick setup_schedule函数是sever启动的初始化入口,我们可以在项目启动时把我们需要初始化的数据都初始化进去,我们需要额外注意的是crontab对象接收的cron表达式和我们传统不一样,一般我们传统的cron表达式都是6位,crontab只接收5位参数,不携带秒的那一位,而且crontab不接收0/5这种格式,会报错。所以我在此处特殊处理了cron表达式的,将其通过_parse_cron_expr转换成crontab对象能够接收的参数。

  4. def schedule(self)函数的作用,tick() 方法将按照执行时间获取可执行的任务,会触发调用schedule(self)方法,该方法会将系统代码配置任务和cron表达式的任务合并返回给调度器。理论上在setup_schedule阶段也可以将所有的数据都加载到同一个_schedule属性下,但是博主测试之后发现不成功,只能使用该方式进行处理了。

至此我们通过自定义的Scheduler就算编写完成了,下面我们来配置及启动测试。


如何启动自定义Scheduler对任务进行调度

  • 首先我们使用了celery beat组件,需要我们在项目当中引入celery beat,博主项目结合使用的是Django框架,所以在setting.py文件的属性INSTALLED_APPS 当中添加组件django_celery_beat
INSTALLED_APPS = [
    'django_celery_beat',  # 需要引入celery_beat进行任务生成调度
]
  • 启动celery beat使用自定义的Scheduler,有两种方式,一个是通过代码配置,另外一种是通过命令行参数启动
    • 通过代码配置,需要在celery_app.py文件中指定我们的beat_scheduler
      # 使用自定义的调度器,app为你的app名称,modules为定义的自定义Scheduler的文件名称
      app.conf.beat_scheduler = 'app.modules.DjangoCronScheduler'
      
    • 配置完成后我们通过标准的启动方式就能启动了
       celery -A you_projiect_name beat  -l info
      
    • 启动完成后即可看到,启动的提示信息
      在这里插入图片描述
  • 启动参数指定启动的Scheduler,此时我们就不需要配置celery_app.py当中的beat_scheduler,直接在启动命令行上去配置
    celery -A autoclip beat -S app.modules.DjangoCronScheduler -l info
    

创建自定义Scheduler遇到的一些问题

  • 在编写自定义Scheduler之前,博主也不是很了解Scheduler的执行原理,只是看到网上很多使用了Scheduler自带的cron配置页去配置的cron表达式任务,它是有自己两张固定的表结构的,如果想使用自己的cron表达式和表数据,还必须将自己的数据同步到django_celery_beat_periodictaskdjango_celery_beat_crontabschedule表当中。博主就想能不能使用自己的表去执行任务,统计结果,因为每次网上述的两个表同步数据感觉处理下来也不是很方便。
  • Scheduler数据的加载的时机和机制,如果不了解这个的话,确实是无从下手,就算初始化了数据也不清楚是怎么同步到任务堆里的,需要好好看一下上述的执行流程图。
  • crontab对象的参数和标准的cron表达式不一致导致的报错,包括长度的区别,对于0/4这种格式的处理区别。
  • 理论上还确少,数据动态变化时实时的变更schedule的方法,博主这边偷懒使用了定时刷新的机制,这个使用场景对于配置实时性感知要求不高的情况下是满足要求的。

网站公告

今日签到

点亮在社区的每一天
去签到