celery基本配置:
from .celery_signals import task_failure_handler,task_postrun_handler
# --- Broker connection -------------------------------------------------
broker_connection_retry_on_startup = True

# --- Serialization: JSON only ------------------------------------------
task_serializer = "json"
result_serializer = "json"
accept_content = ["json"]

# --- Time handling -----------------------------------------------------
enable_utc = True
timezone = "Asia/Shanghai"

# --- Task modules to import --------------------------------------------
include = ["app.tasks.test_tasks"]

# --- Beat --------------------------------------------------------------
# Left empty here: the schedule is fetched dynamically.
beat_schedule = dict()
# Use the custom scheduler.
beat_scheduler = "app.scheduler.database_scheduler.FlaskDBScheduler"
beat_scheduler_reload_interval = 30  # may override the default value
celery信号机制:
import datetime
from celery.signals import task_postrun, task_failure
from app.models import TaskExecutionLog, db
@task_postrun.connect
def task_postrun_handler(task_id, task, args, kwargs, retval, state, **other):
    """Store a ``TaskExecutionLog`` row for a task that finished with state SUCCESS.

    Connected to Celery's ``task_postrun`` signal. Runs in any other state
    are ignored here.

    Args:
        task_id: Id of the executed task.
        task: The task object; its ``name`` is stored as ``task_name``.
        args: Positional arguments the task was called with.
        kwargs: Keyword arguments the task was called with.
        retval: The task's return value.
        state: Final state name reported by the signal.
        **other: Remaining signal keywords; only ``runtime`` is looked up.
    """
    if state != 'SUCCESS':
        return

    import json

    # Imported inside the function, as in the original code — presumably to
    # avoid a circular import between ``app`` and the Celery configuration.
    from app import app

    # NOTE(review): ``task_postrun`` is not documented to pass ``runtime``, so
    # this most likely always falls back to 0 — confirm, and consider
    # recording the start time in a ``task_prerun`` handler instead.
    duration = other.get('runtime') or 0

    # Take one timezone-aware UTC timestamp and derive the start from it, so
    # start_time and end_time share a clock (previously the start was naive
    # local time while the end was UTC).
    finished_at = datetime.datetime.now(datetime.timezone.utc)
    started_at = finished_at - datetime.timedelta(seconds=duration)

    # ``result`` is a Text column: strings are stored as they are, every
    # other return value (lists, dicts, numbers, ...) is serialized first.
    if retval is None or isinstance(retval, str):
        result_text = retval
    else:
        try:
            result_text = json.dumps(retval, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            result_text = repr(retval)

    with app.app_context():
        db.session.add(TaskExecutionLog(
            task_id=task_id,
            task_name=task.name,
            status='SUCCESS',
            result=result_text,
            args=args,
            kwargs=kwargs,
            start_time=started_at,
            end_time=finished_at,
            duration=duration,
        ))
        db.session.commit()
@task_failure.connect
def task_failure_handler(task_id, exception, traceback, task=None, args=None,
                         kwargs=None, einfo=None, sender=None, **other):
    """Store a ``TaskExecutionLog`` row for a task that raised an exception.

    Connected to Celery's ``task_failure`` signal. That signal delivers the
    task object as ``sender`` and does not supply a ``task`` keyword, so
    ``task`` is optional here and ``sender`` is used when it is absent.

    Args:
        task_id: Id of the failed task.
        exception: The exception instance that was raised.
        traceback: Traceback object supplied by the signal (not stored; the
            text form is taken from ``einfo``).
        task: Task object, if a caller passes one explicitly.
        args: Positional arguments the task was called with.
        kwargs: Keyword arguments the task was called with.
        einfo: Exception info object; ``str(einfo)`` is stored as the traceback.
        sender: The task object as delivered by the signal dispatcher.
        **other: Remaining signal keywords; only ``runtime`` is looked up.
    """
    # Imported inside the function, as in the original code — presumably to
    # avoid a circular import between ``app`` and the Celery configuration.
    from app import app

    failed_task = task if task is not None else sender

    # NOTE(review): ``task_failure`` is not documented to pass ``runtime``, so
    # this most likely always falls back to 0 — confirm.
    duration = other.get('runtime') or 0

    # One timezone-aware UTC timestamp for both ends of the interval.
    finished_at = datetime.datetime.now(datetime.timezone.utc)
    started_at = finished_at - datetime.timedelta(seconds=duration)

    with app.app_context():
        db.session.add(TaskExecutionLog(
            task_id=task_id,
            task_name=getattr(failed_task, 'name', None),
            status='FAILURE',
            # ``result`` is a Text column; an exception object cannot be
            # bound to it directly, so store its repr.
            result=repr(exception),
            traceback=str(einfo) if einfo is not None else None,
            args=args,
            kwargs=kwargs,
            start_time=started_at,
            end_time=finished_at,
            duration=duration,
        ))
        db.session.commit()
flask基本配置:
from flask import Flask
from celery import Celery
from flask_sqlalchemy import SQLAlchemy
from config import Config
# Extension objects created unbound at import time; init_celery() and
# init_db() below configure them for a concrete Flask application.
celery = Celery()
db = SQLAlchemy()
def create_app(config_class=Config):
    """Application factory: build a Flask app and wire up its extensions.

    Args:
        config_class: Object the Flask configuration is loaded from.

    Returns:
        The configured ``Flask`` instance with the API blueprint registered.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object(config_class)

    # Celery first, then the database — same order as before.
    for initialise in (init_celery, init_db):
        initialise(flask_app)

    # Routes are imported only here, inside the factory.
    from app.views import bp as api_blueprint
    flask_app.register_blueprint(api_blueprint)

    return flask_app
def init_celery(app: Flask):
    """Configure the module-level ``celery`` instance for ``app``.

    Loads ``config.celery_config``, overrides the broker URL and result
    backend from the Flask config, exposes the Flask app on the Celery
    instance, and installs a task base class that runs every task body
    inside a Flask application context.
    """
    from config import celery_config

    celery.config_from_object(celery_config)

    connection_settings = {
        'broker_url': app.config['CELERY_BROKER_URL'],
        'result_backend': app.config['CELERY_RESULT_BACKEND'],
    }
    celery.conf.update(**connection_settings)

    # Lets the scheduler class reach the Flask app directly.
    celery.flask_app = app

    base_task = celery.Task

    class ContextTask(base_task):
        def __call__(self, *call_args, **call_kwargs):
            with app.app_context():
                return self.run(*call_args, **call_kwargs)

    celery.Task = ContextTask
def init_db(app: Flask):
    """Bind the module-level SQLAlchemy extension to ``app``."""
    db.init_app(app=app)
# Module-level application instance, built with the default config class.
app=create_app()
scheduler类重写:
import time
from typing import Dict
from celery.beat import Scheduler, ScheduleEntry
from celery.schedules import crontab
from app.models import ScheduledTask,db
class FlaskDBScheduler(Scheduler):
    """Celery beat scheduler whose entries are read from the ``ScheduledTask`` table.

    The schedule is loaded when the scheduler is set up and re-read from the
    database whenever ``reload_interval`` seconds have passed since the last
    reload (checked on every ``tick``).

    The reload period is taken, in order of precedence, from the
    ``reload_interval`` constructor keyword, the ``beat_scheduler_reload_interval``
    entry of the Celery configuration, or ``DEFAULT_RELOAD_INTERVAL``.
    """

    # Fallback reload period in seconds.
    DEFAULT_RELOAD_INTERVAL = 30

    def __init__(self, *args, **kwargs):
        # These must exist before the base initializer runs, because it may
        # call setup_schedule() right away.
        self._schedule = {}
        self._last_reload = 0
        self._tables_ready = False
        self.reload_interval = kwargs.get('reload_interval', self.DEFAULT_RELOAD_INTERVAL)
        super().__init__(*args, **kwargs)

        # Honour the configuration value unless the caller passed an explicit
        # keyword (previously the configured value was never read).
        if 'reload_interval' not in kwargs:
            configured = self.app.conf.get('beat_scheduler_reload_interval')
            if configured is not None:
                self.reload_interval = float(configured)

        # Covers the case where the base initializer did not run
        # setup_schedule() (lazy construction).
        self._ensure_tables()

    def _ensure_tables(self):
        """Create the database tables once per scheduler instance."""
        if self._tables_ready:
            return
        with self.app.flask_app.app_context():
            db.create_all()
        self._tables_ready = True

    def setup_schedule(self):
        """Load the initial schedule from the database."""
        # Tables must exist before the first query; this runs from inside the
        # base initializer, i.e. before the rest of __init__.
        self._ensure_tables()
        self._schedule = self.load_schedule()

    def load_schedule(self) -> Dict[str, ScheduleEntry]:
        """Return a name -> ScheduleEntry mapping of all enabled ``ScheduledTask`` rows."""
        with self.app.flask_app.app_context():
            enabled_tasks = ScheduledTask.query.filter_by(enabled=True).all()
            return {row.name: self.create_entry(row) for row in enabled_tasks}

    def create_entry(self, db_task) -> ScheduleEntry:
        """Convert a ``ScheduledTask`` row into a ``ScheduleEntry``.

        ``db_task.schedule`` is either a number of seconds (e.g. ``'10.0'``) or
        a five-field crontab expression ``'minute hour day month week_day'``.

        Raises:
            ValueError: If the schedule is neither a number nor five
                whitespace-separated crontab fields.
        """
        raw_schedule = db_task.schedule
        try:
            run_every = float(raw_schedule)
        except (TypeError, ValueError):
            # split() without an argument tolerates repeated spaces and tabs.
            fields = str(raw_schedule).split()
            if len(fields) != 5:
                raise ValueError(
                    'Invalid schedule %r for task %r: expected seconds or a '
                    '5-field crontab expression' % (raw_schedule, db_task.name)
                ) from None
            minute, hour, day, month, week_day = fields
            run_every = crontab(minute=minute, hour=hour, day_of_week=week_day,
                                day_of_month=day, month_of_year=month)

        # NOTE(review): entry options are forwarded when the task is
        # published; confirm that an ``enabled`` key is harmless there.
        return ScheduleEntry(
            name=db_task.name,
            task=db_task.task,
            schedule=run_every,
            args=db_task.args or [],
            kwargs=db_task.kwargs or {},
            options={'enabled': db_task.enabled},
            app=self.app,
        )

    def tick(self, event_t=None, min=None, max=None):
        """Reload the schedule if it is stale, then run one base-class tick.

        The parameters are accepted for signature compatibility only; the
        base implementation is always called with its own defaults.
        """
        now = time.time()
        if now - self._last_reload > self.reload_interval:
            self._schedule = self.load_schedule()
            self._last_reload = now
            self.logger.debug('Reloaded schedule from Flask DB')
        return super().tick()

    @property
    def schedule(self) -> Dict[str, ScheduleEntry]:
        """The most recently loaded schedule."""
        return self._schedule
表基本配置:
from app import db
from datetime import datetime,timezone
class ScheduledTask(db.Model):
    """A periodic task definition stored in the database."""

    id = db.Column(db.Integer, primary_key=True, comment='id')
    # Unique alias; should be self-explanatory.
    name = db.Column(db.String(128), unique=True, nullable=False)
    # Dotted task path, e.g. 'app.tasks.sample_task'.
    task = db.Column(db.String(256), nullable=False)
    # Interval in seconds or a crontab expression, e.g. '10.0' or '0 8 * * *'.
    schedule = db.Column(db.String(128), nullable=False)
    args = db.Column(db.JSON, default=list)
    kwargs = db.Column(db.JSON, default=dict)
    enabled = db.Column(db.Boolean, default=True)
    # Set on insert and refreshed on every ORM update, so the value tracks
    # the name (previously it was only ever set on insert).
    last_updated = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    def __repr__(self):
        # The tuple is required: without the parentheses ``%`` was applied to
        # ``self.name`` alone and raised TypeError.
        return '<ScheduledTask %r %s>' % (self.name, self.task)
class TaskExecutionLog(db.Model):
    """One recorded execution of a task."""

    id = db.Column(db.Integer, primary_key=True)

    # Which task ran.
    task_id = db.Column(db.String(128), index=True)
    task_name = db.Column(db.String(256))

    # Outcome: SUCCESS, FAILURE, RETRY.
    status = db.Column(db.String(50))
    result = db.Column(db.Text)
    traceback = db.Column(db.Text)

    # Call arguments.
    args = db.Column(db.JSON)
    kwargs = db.Column(db.JSON)

    # Timing; duration is in seconds.
    start_time = db.Column(db.DateTime)
    end_time = db.Column(db.DateTime)
    duration = db.Column(db.Float)

    def __repr__(self):
        return '<TaskExecution {} {}>'.format(self.task_name, self.status)
接口展示及添加任务:
import importlib
from flask import Blueprint, jsonify, request
from app.models import ScheduledTask, TaskExecutionLog
from app import db
# Blueprint holding the routes defined below; all of them are served under /api.
bp = Blueprint('api', __name__, url_prefix='/api')
@bp.route('/tasks', methods=['GET'])
def list_tasks():
    """Return every ``ScheduledTask`` row as a JSON array of objects."""
    exposed_fields = ('id', 'name', 'task', 'schedule', 'enabled')
    payload = []
    for scheduled in ScheduledTask.query.all():
        payload.append({field: getattr(scheduled, field) for field in exposed_fields})
    return jsonify(payload)
@bp.route('/tasks', methods=['POST'])
def create_task():
    """Create a ``ScheduledTask`` from the JSON request body.

    Required fields: ``name``, ``task``, ``schedule``. Optional fields:
    ``args`` (list), ``kwargs`` (object), ``enabled`` (defaults to true).

    Returns:
        201 with a confirmation message on success;
        400 if the body is not a JSON object, a required field is missing,
        or ``schedule`` is neither a number nor a five-field crontab string;
        409 if a task with the same name already exists.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({'message': 'Request body must be a JSON object'}), 400

    missing = [field for field in ('name', 'task', 'schedule')
               if data.get(field) in (None, '')]
    if missing:
        return jsonify({'message': 'Missing required field(s): ' + ', '.join(missing)}), 400

    # The schedule column is a string; accept JSON numbers and normalize.
    schedule = str(data['schedule']).strip()
    try:
        float(schedule)
    except ValueError:
        # Not an interval: it must then look like 'minute hour day month week_day'.
        if len(schedule.split()) != 5:
            return jsonify({'message': 'Invalid schedule: expected seconds or a '
                                       '5-field crontab expression'}), 400

    # ``name`` is unique; answer with a clear conflict instead of letting the
    # commit fail.
    if ScheduledTask.query.filter_by(name=data['name']).first() is not None:
        return jsonify({'message': 'Task name already exists'}), 409

    task = ScheduledTask(
        name=data['name'],
        task=data['task'],
        schedule=schedule,
        args=data.get('args') or [],
        kwargs=data.get('kwargs') or {},
        enabled=data.get('enabled', True),
    )
    db.session.add(task)
    db.session.commit()
    return jsonify({'message': 'Task created'}), 201
@bp.route('/run', methods=['POST'])
def run_task():
    """Queue an immediate run of an enabled ``ScheduledTask``.

    Expects a JSON body ``{"id": <task id>}``.

    Returns:
        200 with the queued ``task_id`` on success;
        400 if the body carries no ``id``;
        404 if no enabled task with that id exists or its dotted path cannot
        be resolved to an object.
    """
    payload = request.get_json(silent=True)
    task_pk = payload.get('id') if isinstance(payload, dict) else None
    if task_pk is None:
        return jsonify({'message': 'Missing task id'}), 400

    record = ScheduledTask.query.filter_by(id=task_pk, enabled=True).first()
    if record is None or not record.task:
        return jsonify({'message': 'Task not found'}), 404

    # NOTE(review): the dotted path comes from the database, which is writable
    # through POST /api/tasks, so this imports a caller-chosen module.
    # Consider restricting it to an allow-list of known task modules.
    module_path, _, attr_name = record.task.rpartition('.')
    try:
        task = getattr(importlib.import_module(module_path), attr_name)
    except (ImportError, AttributeError, ValueError):
        return jsonify({'message': 'Task callable not found'}), 404

    # args/kwargs are nullable columns; treat NULL as "no arguments".
    result = task.delay(*(record.args or []), **(record.kwargs or {}))
    return jsonify({'task_id': result.task_id})
@bp.route('/task-logs', methods=['GET'])
def list_task_logs():
    """Return the 50 most recently started task executions as JSON.

    ``start_time`` is rendered in ISO 8601 form, or ``null`` when the row has
    no start time (the column is nullable).
    """
    logs = (TaskExecutionLog.query
            .order_by(TaskExecutionLog.start_time.desc())
            .limit(50)
            .all())
    return jsonify([{
        'task_name': log.task_name,
        'status': log.status,
        'start_time': log.start_time.isoformat() if log.start_time is not None else None,
        'duration': log.duration,
    } for log in logs])
项目根目录导出celery便于celery命令执行:
from app import celery
tasks任务编写:
from app import celery
from app.models import ScheduledTask
@celery.task
def add(x, y):
    """Celery task: return ``x + y``."""
    total = x + y
    return total
@celery.task
def multiply(x, y):
    """Celery task: return ``x * y``."""
    product = x * y
    return product
@celery.task
def hello_world():
    """Celery task: return a fixed greeting string."""
    greeting = "Hello World!"
    return greeting
@celery.task
def monitor_task_list():
    """Celery task: return every ``ScheduledTask`` row as a list of plain dicts."""
    exposed_fields = ('id', 'name', 'task', 'schedule', 'enabled')
    snapshot = []
    for scheduled in ScheduledTask.query.all():
        snapshot.append({field: getattr(scheduled, field) for field in exposed_fields})
    return snapshot
版本依赖:
celery==5.2.7
Flask==3.1.1
flask_sqlalchemy==3.1.1
tasks简单示例(便于演示上述配置):
from app import celery
from app.models import ScheduledTask
@celery.task
def add(x, y):
    """Celery task: return ``x + y``."""
    total = x + y
    return total
@celery.task
def multiply(x, y):
    """Celery task: return ``x * y``."""
    product = x * y
    return product
@celery.task
def hello_world():
    """Celery task: return a fixed greeting string."""
    greeting = "Hello World!"
    return greeting
@celery.task
def monitor_task_list():
    """Celery task: return every ``ScheduledTask`` row as a list of plain dicts."""
    exposed_fields = ('id', 'name', 'task', 'schedule', 'enabled')
    snapshot = []
    for scheduled in ScheduledTask.query.all():
        snapshot.append({field: getattr(scheduled, field) for field in exposed_fields})
    return snapshot
celery启动:
celery -A make_celery worker --pool=solo --loglevel=info  # worker启动命令
celery -A make_celery beat --loglevel=info  # beat启动命令
数据库配置:
按照上述步骤配置即可生效