Flask 使用 Celery 通过数据库动态管理定时任务

发布于:2025-08-03 ⋅ 阅读:(13) ⋅ 点赞:(0)

celery 基本配置(config/celery_config.py):

from .celery_signals import task_failure_handler,task_postrun_handler

# --- Broker connection -------------------------------------------------------
# Keep retrying the broker connection while worker/beat is starting up.
broker_connection_retry_on_startup = True

# --- Serialisation: JSON only ------------------------------------------------
accept_content = ['json']
task_serializer = 'json'
result_serializer = 'json'

# --- Time handling -----------------------------------------------------------
enable_utc = True
timezone = 'Asia/Shanghai'

# --- Task modules imported by the worker -------------------------------------
include = ['app.tasks.test_tasks']

# --- Beat --------------------------------------------------------------------
# No static entries: the schedule is loaded dynamically from the database.
beat_schedule = {}
# Dotted path of the custom DB-backed scheduler class.
beat_scheduler = 'app.scheduler.database_scheduler.FlaskDBScheduler'
# Custom key (not a built-in Celery setting): intended DB reload period,
# in seconds, for FlaskDBScheduler.
beat_scheduler_reload_interval = 30

celery 信号处理(config/celery_signals.py,用于记录任务执行日志):

import datetime
import json

from celery.signals import task_failure, task_postrun, task_prerun

from app.models import TaskExecutionLog, db


# UTC start time of each running task, keyed by Celery task id. It is filled by
# task_prerun_handler and read by the postrun/failure handlers. Celery's
# task_postrun / task_failure signals do not provide a run time, so the former
# ``other.get('runtime', 0)`` always produced a duration of 0.
_task_start_times = {}


def _utcnow():
    """Return the current time as a timezone-aware UTC datetime."""
    return datetime.datetime.now(datetime.timezone.utc)


def _timing(start_time, end_time):
    """Return ``(start_time, duration_seconds)`` for a finished run.

    Falls back to ``(end_time, 0.0)`` when no start time was recorded,
    e.g. if the failure was reported by a different process.
    """
    if start_time is None:
        return end_time, 0.0
    return start_time, (end_time - start_time).total_seconds()


def _to_text(value):
    """Serialise a task return value for the Text ``result`` column.

    Strings and None pass through unchanged. Anything else is JSON-encoded.
    ``default=str`` is deliberate: this is a logging path and must never fail
    on an unusual result type.
    """
    if value is None or isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False, default=str)


@task_prerun.connect
def task_prerun_handler(task_id=None, **other):
    """Record when *task_id* started so its real duration can be computed."""
    _task_start_times[task_id] = _utcnow()


@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):
    """Persist a TaskExecutionLog row for a task that finished with state SUCCESS.

    Every state is still used to drop the task's start-time entry, so the
    in-memory map cannot grow without bound (e.g. on RETRY or FAILURE).
    All timestamps are stored as UTC.
    """
    from app import app  # local import: avoids a circular import at module load
    started = _task_start_times.pop(task_id, None)
    if state != 'SUCCESS':
        return
    end_time = _utcnow()
    start_time, duration = _timing(started, end_time)
    with app.app_context():
        db.session.add(TaskExecutionLog(task_id=task_id,
                                        task_name=task.name,
                                        status='SUCCESS',
                                        result=_to_text(retval),
                                        args=args,
                                        kwargs=kwargs,
                                        start_time=start_time,
                                        end_time=end_time,
                                        duration=duration))
        db.session.commit()


@task_failure.connect
def task_failure_handler(task_id=None, exception=None, traceback=None, task=None,
                         args=None, kwargs=None, einfo=None, **other):
    """Persist a TaskExecutionLog row with status FAILURE for a task that raised.

    Celery sends task_failure with ``sender=<task>`` and no ``task``
    argument. The previous required ``task`` parameter therefore made every
    call fail with TypeError, and Celery's dispatcher swallowed the error.
    The task is now taken from ``sender`` when ``task`` is absent.

    The exception is stored as its ``repr`` in ``result``, and the formatted
    traceback (``einfo``) is stored in ``traceback``.
    """
    from app import app  # local import: avoids a circular import at module load
    if task is None:
        task = other.get('sender')
    end_time = _utcnow()
    # get(), not pop(): task_postrun is still sent after task_failure and
    # removes the entry there.
    start_time, duration = _timing(_task_start_times.get(task_id), end_time)
    with app.app_context():
        db.session.add(TaskExecutionLog(task_id=task_id,
                                        task_name=getattr(task, 'name', None),
                                        status='FAILURE',
                                        result=repr(exception) if exception is not None else None,
                                        traceback=str(einfo) if einfo is not None else None,
                                        args=args,
                                        kwargs=kwargs,
                                        start_time=start_time,
                                        end_time=end_time,
                                        duration=duration))
        db.session.commit()

Flask 应用工厂(app/__init__.py):

from flask import Flask
from celery import Celery
from flask_sqlalchemy import SQLAlchemy

from config import Config

# Extension singletons created without an app and bound later in create_app()
# (via init_celery / init_db), so other modules can `from app import celery, db`.
celery: Celery = Celery()
db: SQLAlchemy = SQLAlchemy()


def create_app(config_class=Config):
    """Build and configure the Flask application.

    Loads settings from *config_class*, binds the module-level Celery and
    SQLAlchemy instances to the app, and registers the API blueprint.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config_class)

    # Extensions, in order: Celery first, then the database.
    for initialise in (init_celery, init_db):
        initialise(flask_app)

    # Imported here because app.views itself imports from this package.
    from app.views import bp as api_blueprint
    flask_app.register_blueprint(api_blueprint)
    return flask_app


def init_celery(app: Flask):
    """Configure the module-level Celery instance for *app*.

    Loads ``config.celery_config``, then overrides the broker and result
    backend with the Flask settings ``CELERY_BROKER_URL`` and
    ``CELERY_RESULT_BACKEND``. It exposes the Flask app as
    ``celery.flask_app`` for the DB scheduler, and makes every task body run
    inside ``app.app_context()``.
    """
    from config import celery_config

    celery.config_from_object(celery_config)
    connection_settings = {
        'broker_url': app.config['CELERY_BROKER_URL'],
        'result_backend': app.config['CELERY_RESULT_BACKEND'],
    }
    celery.conf.update(**connection_settings)
    # The custom beat scheduler reaches the Flask app through this attribute.
    celery.flask_app = app

    base_task = celery.Task

    class ContextTask(base_task):
        """Task base class that wraps each call in the Flask app context."""

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask


def init_db(app: Flask) -> None:
    """Bind the module-level SQLAlchemy instance ``db`` to *app*."""
    db.init_app(app)

# Module-level application instance, built at import time. Other modules use
# it via ``from app import app`` (e.g. the Celery signal handlers).
app=create_app()

自定义 beat 调度器类(app/scheduler/database_scheduler.py,对应配置项 beat_scheduler):

import time
from typing import Dict

from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab

from app.models import ScheduledTask,db


class FlaskDBScheduler(Scheduler):
    """Celery beat scheduler whose entries come from the ``ScheduledTask`` table.

    Enabled rows are loaded at start-up and re-read from inside :meth:`tick`
    every ``reload_interval`` seconds. Tasks added, edited or disabled in the
    database therefore take effect without restarting beat. Database access
    uses the Flask app stored on the Celery app as ``flask_app``.

    The reload period comes from the first of these that is set:
    a ``reload_interval`` keyword argument, the
    ``beat_scheduler_reload_interval`` Celery setting, or 30 seconds.
    """

    # Fallback reload period in seconds.
    DEFAULT_RELOAD_INTERVAL = 30

    def __init__(self, *args, **kwargs):
        self._schedule = {}
        # -inf guarantees the first tick reloads if setup_schedule() never ran.
        self._last_reload = float('-inf')
        self._tables_created = False
        explicit_interval = kwargs.get('reload_interval')
        super().__init__(*args, **kwargs)

        if explicit_interval is None:
            # Previously the configured value was ignored; only kwargs were read.
            explicit_interval = self.app.conf.get(
                'beat_scheduler_reload_interval', self.DEFAULT_RELOAD_INTERVAL)
        self.reload_interval = explicit_interval

        # Ensure tables exist at init time even when setup_schedule() was
        # skipped (lazy scheduler).
        self._ensure_tables()

    def _ensure_tables(self):
        """Create the Flask-SQLAlchemy tables once per scheduler instance."""
        if self._tables_created:
            return
        with self.app.flask_app.app_context():
            db.create_all()
        self._tables_created = True

    def setup_schedule(self):
        """Load the initial schedule (Celery's Scheduler.__init__ calls this unless lazy)."""
        # Tables must exist before the first query. Formerly create_all() only
        # ran after super().__init__() had already queried ScheduledTask.
        self._ensure_tables()
        self._schedule = self.load_schedule()
        self._last_reload = time.monotonic()

    def load_schedule(self) -> Dict[str, ScheduleEntry]:
        """Build ``{name: ScheduleEntry}`` from all enabled ScheduledTask rows.

        Rows whose ``schedule`` cannot be parsed are logged and skipped, so a
        single bad row cannot stop beat.
        """
        from celery.schedules import ParseException

        schedule = {}
        with self.app.flask_app.app_context():
            for task in ScheduledTask.query.filter_by(enabled=True).all():
                try:
                    schedule[task.name] = self.create_entry(task)
                except (ValueError, ParseException):
                    self.logger.exception(
                        'Skipping scheduled task %r: invalid schedule %r',
                        task.name, task.schedule)
        return schedule

    def create_entry(self, db_task) -> ScheduleEntry:
        """Convert a ScheduledTask row into a ScheduleEntry.

        ``db_task.schedule`` is either a number of seconds (e.g. ``'10.0'``)
        or a 5-field crontab string:
        ``'minute hour day_of_month month day_of_week'``.

        Raises:
            ValueError: the schedule is neither a number nor five fields, or
                crontab rejects a field value.
            celery.schedules.ParseException: crontab cannot parse a field.
        """
        raw = db_task.schedule.strip()
        try:
            db_schedule = float(raw)
        except ValueError:
            fields = raw.split()  # tolerate repeated whitespace between fields
            if len(fields) != 5:
                raise ValueError(
                    f'Invalid schedule {db_task.schedule!r} for task {db_task.name!r}: '
                    'expected seconds or a 5-field crontab expression'
                ) from None
            minute, hour, day, month, week_day = fields
            db_schedule = crontab(minute=minute, hour=hour, day_of_week=week_day,
                                  day_of_month=day, month_of_year=month)
        return ScheduleEntry(
            name=db_task.name,
            task=db_task.task,
            schedule=db_schedule,
            args=db_task.args,
            kwargs=db_task.kwargs,
            # Entry options are forwarded to apply_async(). The former
            # {'enabled': ...} was sent along with every message without
            # controlling anything, since only enabled rows are loaded.
            options={},
            # Bind schedules to this Celery app (and its timezone) rather than
            # whatever app happens to be "current".
            app=self.app,
        )

    def tick(self, event_t=None, min=None, max=None):
        """Reload the schedule from the DB when due, then delegate to Scheduler.tick().

        The parameters exist only for signature compatibility and are not
        forwarded.
        """
        now = time.monotonic()  # monotonic: immune to wall-clock jumps
        if now - self._last_reload > self.reload_interval:
            # Advance the timer even on failure, so a DB outage is retried once
            # per interval instead of on every tick.
            self._last_reload = now
            try:
                self._schedule = self.load_schedule()
            except Exception:
                # Boundary of the beat loop: keep running on the last good
                # schedule rather than crashing beat.
                self.logger.exception(
                    'Failed to reload schedule from Flask DB; keeping previous schedule')
            else:
                self.logger.debug('Reloaded schedule from Flask DB')

        return super().tick()

    @property
    def schedule(self) -> Dict[str, ScheduleEntry]:
        """Current ``{name: ScheduleEntry}`` mapping used by Scheduler.tick()."""
        return self._schedule

数据表模型(app/models.py):

from app import db
from datetime import datetime,timezone


class ScheduledTask(db.Model):
    """A periodic task definition consumed by the DB-backed beat scheduler.

    ``schedule`` holds either a number of seconds (e.g. ``'10.0'``) or a
    5-field crontab string (e.g. ``'0 8 * * *'``).
    """
    id = db.Column(db.Integer, primary_key=True,comment='id')
    # Unique human-readable alias; also used as the beat entry name.
    name = db.Column(db.String(128), unique=True, nullable=False)
    # Dotted path of the Celery task, e.g. 'app.tasks.sample_task'.
    task = db.Column(db.String(256), nullable=False)
    # Seconds ('10.0') or crontab ('0 8 * * *').
    schedule = db.Column(db.String(128), nullable=False)
    args = db.Column(db.JSON, default=list)
    kwargs = db.Column(db.JSON, default=dict)
    enabled = db.Column(db.Boolean, default=True)
    # UTC timestamp set on insert and refreshed on every ORM update
    # (previously it was only set on insert despite its name).
    last_updated = db.Column(db.DateTime,
                             default=lambda: datetime.now(timezone.utc),
                             onupdate=lambda: datetime.now(timezone.utc))

    def __repr__(self):
        # The old `'...%r %s>' % self.name, self.task` formatted with a single
        # argument (the comma built a tuple afterwards) and raised TypeError.
        return f'<ScheduledTask {self.name!r} {self.task}>'


class TaskExecutionLog(db.Model):
    """One row per finished task run, written by the Celery signal handlers."""

    # Surrogate primary key.
    id = db.Column(db.Integer, primary_key=True)
    # Celery task id, indexed for lookups.
    task_id = db.Column(db.String(128), index=True)
    # Registered Celery task name.
    task_name = db.Column(db.String(256))
    # Final state label, e.g. 'SUCCESS' or 'FAILURE'.
    status = db.Column(db.String(50))
    # Outcome text (return value or exception) and traceback, if any.
    result = db.Column(db.Text)
    traceback = db.Column(db.Text)
    # Positional and keyword arguments the task was called with.
    args = db.Column(db.JSON)
    kwargs = db.Column(db.JSON)
    # Run window; duration is measured in seconds.
    start_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    duration = db.Column(db.Float)

    def __repr__(self):
        return '<TaskExecution {} {}>'.format(self.task_name, self.status)

任务查询、添加与手动触发接口(app/views.py):

import importlib

from flask import Blueprint, jsonify, request
from app.models import ScheduledTask, TaskExecutionLog
from app import db

# Blueprint for the JSON API; every route below is served under /api.
bp = Blueprint('api', __name__, url_prefix='/api')

@bp.route('/tasks', methods=['GET'])
def list_tasks():
    """Return every ScheduledTask (enabled or not) as a JSON array."""
    def serialise(row):
        return {
            'id': row.id,
            'name': row.name,
            'task': row.task,
            'schedule': row.schedule,
            'enabled': row.enabled,
        }

    rows = ScheduledTask.query.all()
    return jsonify(list(map(serialise, rows)))


def _is_valid_schedule(schedule):
    """Return True if *schedule* is a positive number of seconds or has 5 crontab fields.

    This approximates the formats the beat scheduler parses, so malformed
    rows are rejected here instead of being stored for beat to trip over.
    """
    try:
        return float(schedule) > 0
    except ValueError:
        return len(schedule.split()) == 5


@bp.route('/tasks', methods=['POST'])
def create_task():
    """Create a ScheduledTask from a JSON body.

    Required keys are ``name``, ``task`` (dotted task path) and ``schedule``
    (seconds, or a 5-field crontab string). Optional keys are ``args`` (list),
    ``kwargs`` (object) and ``enabled`` (bool or 0/1, default True).

    Returns:
        201 with the new id on success; 400 for a malformed body or field;
        409 if ``name`` is already used.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    missing = [key for key in ('name', 'task', 'schedule') if data.get(key) in (None, '')]
    if missing:
        return jsonify({'message': f'Missing required field(s): {", ".join(missing)}'}), 400

    schedule = str(data['schedule']).strip()
    if not _is_valid_schedule(schedule):
        return jsonify({'message': 'schedule must be seconds (e.g. "10") '
                                   'or a 5-field crontab (e.g. "0 8 * * *")'}), 400

    args = data.get('args') or []
    kwargs = data.get('kwargs') or {}
    enabled = data.get('enabled', True)
    # `in (True, False)` also accepts 0/1, which the Boolean column accepted before.
    if not isinstance(args, list) or not isinstance(kwargs, dict) or enabled not in (True, False):
        return jsonify({'message': 'args must be a list, kwargs an object and enabled a boolean'}), 400

    # Checked up front so a duplicate returns 409 instead of an IntegrityError 500.
    if ScheduledTask.query.filter_by(name=data['name']).first() is not None:
        return jsonify({'message': 'Task name already exists'}), 409

    task = ScheduledTask(
        name=data['name'],
        task=data['task'],
        schedule=schedule,
        args=args,
        kwargs=kwargs,
        enabled=bool(enabled),
    )
    db.session.add(task)
    db.session.commit()
    return jsonify({'message': 'Task created', 'id': task.id}), 201


@bp.route('/run', methods=['POST'])
def run_task():
    """Enqueue an enabled ScheduledTask immediately via ``.delay()``.

    The body is ``{"id": <ScheduledTask.id>}``.

    Returns:
        ``{"task_id": ...}`` on success; 400 if ``id`` is missing; 404 if the
        row does not exist, is disabled, or its ``task`` path does not
        resolve to an object with ``.delay``.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict) or payload.get('id') is None:
        return jsonify({'message': 'Missing "id" in JSON body'}), 400

    data = ScheduledTask.query.filter_by(id=payload['id'], enabled=True).first()
    if not (data and data.task):
        return jsonify({'message': 'Task not found'}), 404

    # NOTE(security): data.task is a dotted path written through POST /tasks,
    # which has no auth check in this module. Importing it lets callers load
    # any importable module, so restrict who can write ScheduledTask rows.
    module_name, _, func_name = data.task.rpartition('.')
    try:
        # import_module('') (a path with no dot) raises ValueError.
        task = getattr(importlib.import_module(module_name), func_name)
    except (ImportError, AttributeError, ValueError):
        task = None
    if not callable(getattr(task, 'delay', None)):
        return jsonify({'message': f'Task {data.task!r} is not an importable Celery task'}), 404

    # args/kwargs columns are nullable JSON; treat NULL as "no arguments".
    result = task.delay(*(data.args or []), **(data.kwargs or {}))
    return jsonify({'task_id': result.task_id})


@bp.route('/task-logs', methods=['GET'])
def list_task_logs():
    """Return the 50 most recent TaskExecutionLog rows, ordered by start_time descending.

    ``start_time`` is serialised as ISO-8601, or ``null`` when the row has
    none. The column is nullable, and calling ``.isoformat()`` on None used to
    turn the whole response into a 500.
    """
    logs = (TaskExecutionLog.query
            .order_by(TaskExecutionLog.start_time.desc())
            .limit(50)
            .all())
    return jsonify([{
        'task_name': log.task_name,
        'status': log.status,
        'start_time': log.start_time.isoformat() if log.start_time is not None else None,
        'duration': log.duration,
    } for log in logs])

在项目根目录创建 make_celery.py 导出 celery 实例,供 `celery -A make_celery` 命令加载:

from app import celery

任务定义(app/tasks/test_tasks.py,对应配置项 include):

from app import celery
from app.models import ScheduledTask

@celery.task
def add(x, y):
    """Return the sum of *x* and *y*."""
    total = x + y
    return total

@celery.task
def multiply(x, y):
    """Return the product of *x* and *y*."""
    product = x * y
    return product

@celery.task
def hello_world():
    """Return a fixed greeting string (handy for smoke-testing the worker)."""
    greeting = "Hello World!"
    return greeting

@celery.task
def monitor_task_list():
    """Return every ScheduledTask row as a list of plain dicts."""
    snapshot = []
    for row in ScheduledTask.query.all():
        snapshot.append({
            'id': row.id,
            'name': row.name,
            'task': row.task,
            'schedule': row.schedule,
            'enabled': row.enabled,
        })
    return snapshot

版本依赖:

celery==5.2.7
Flask==3.1.1
flask_sqlalchemy==3.1.1

任务示例(与上文任务定义相同,便于对照配置):

from app import celery
from app.models import ScheduledTask

@celery.task
def add(x, y):
    """Add *y* to *x* and return the result."""
    result = x + y
    return result

@celery.task
def multiply(x, y):
    """Multiply *x* by *y* and return the result."""
    value = x * y
    return value

@celery.task
def hello_world():
    """Return the constant greeting used as a worker smoke test."""
    message = "Hello World!"
    return message

@celery.task
def monitor_task_list():
    """Snapshot all ScheduledTask rows as dicts of their public fields."""
    exported_fields = ('id', 'name', 'task', 'schedule', 'enabled')
    return [
        {field: getattr(row, field) for field in exported_fields}
        for row in ScheduledTask.query.all()
    ]

celery启动:

celery --app=make_celery worker --loglevel=info --pool=solo  # start the worker (solo pool: tasks run in the worker process itself)
celery --app=make_celery beat --loglevel=info  # start beat (uses the scheduler set by beat_scheduler)

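数据库配置:

原文未给出 `config.Config` 的内容;需在其中至少设置 `SQLALCHEMY_DATABASE_URI`(供 Flask-SQLAlchemy 使用),以及 `init_celery` 读取的 `CELERY_BROKER_URL`、`CELERY_RESULT_BACKEND`。按照上述步骤配置后即可生效。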


网站公告

今日签到

点亮在社区的每一天
去签到