Celery在Django中的应用

发布于:2025-08-14 ⋅ 阅读:(23) ⋅ 点赞:(0)

在这里插入图片描述

一、项目配置

1.1 确认celery及django版本相对应,本文使用django3.2celery5.5
1.2 创建一个名为CeleryStudydjango项目,以及一个名为test1_app,目录结构如下:
在这里插入图片描述
1.3 配置celery的setting参数(大部分不需要全局配置,可以针对tasks单独配置)

CELERY_BROKER_URL = 'redis://127.0.0.1:6379/0'
CELERY_RESULT_BACKEND = 'django-db'  # django-db(使用 Django 数据库存储结果)
CELERY_ACCEPT_CONTENT = ['json'] # 指定 Celery 接受的任务序列化格式(避免反序列化安全问题)。
CELERY_TASK_SERIALIZER = 'json' # 指定任务的序列化方式
CELERY_RESULT_SERIALIZER = 'json' # 指定结果的序列化方式
CELERY_TIMEZONE = TIME_ZONE # 设置 Celery 的时区(影响定时任务的调度时间)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 指定定时任务调度器的后端
# 默认内存调度:celery.beat:PersistentScheduler(需配合 beat_schedule_filename)
# Redis 调度:celery.beat:RedisScheduler(需安装 celery-redis-scheduler)
CELERYD_CONCURRENCY = 4 # Worker 并发数(默认 CPU 核心数)
CELERY_BEAT_SCHEDULE = {  # 一般在celery.py文件配置
    'every-10-seconds': {
        'task': 'myapp.tasks.debug',
        'schedule': 10.0,
    },
}
CELERY_BEAT_MAX_LOOP_INTERVAL = 300  # 秒, Beat 调度器的最大循环间隔(默认 5 分钟)
CELERY_TASK_TIME_LIMIT = 300  # 硬超时 5 分钟(任务被强制终止)
CELERY_TASK_SOFT_TIME_LIMIT = 240  # 软超时 4 分钟(触发 `SoftTimeLimitExceeded`)
CELERY_TASK_DEFAULT_RETRY_DELAY = 60  # 任务重试间隔 1 分钟
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s' # 自定义 Worker 日志格式
...... # 等等等等等等, 还有一大堆配置

1.4 celery全局配置

# celery.py

import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'CeleryStudy.settings')  # 其作用是为 Django 提供配置文件的定位信息,确保框架能正确加载项目的各项设置

app = Celery('CeleryStudy')		# celery实例,一般命名为项目名称
app.config_from_object('django.conf:settings', namespace='CELERY')	# celery实例从setting中CELERY开头的配置获取

app.autodiscover_tasks()  # 自动发现并注册项目中定义的tasks,会发现 @shared_task 和 @app.task

1.5 修改项目init文件,通过给外部导入

# __init__.py

from .celery import app as celery_app

__all__ = ("celery_app",)

1.6 在app中新建tasks写入任务逻辑

# tasks.py
"""
@app.task 是“专属任务”,绑定到具体应用,适合简单场景。
@shared_task 是“共享任务”,解耦于应用,适合复杂架构。
"""

写法一:
from celery import Celery
app = Celery('proj')
 
@app.task  # 绑定到当前 `app` 实例
def add(x, y):
    return x + y
 
写法二: 
import time
from celery import shared_task

@shared_task
def test_add(x, y):
    time.sleep(2)
    return x + y


@shared_task
def pre_task_test(x):
    # 定时任务
    return x

二、异步任务

2.1 普通用法

2.1.1 通过delay

# views.py

from django.http import HttpResponse
from .tasks import test_add

# Create your views here.

def test_celery(request):
    result = test_add.delay(1, 5)
    return HttpResponse(result.task_id + ' : ' + result.status)

2.1.2 通过apply_async

# countdown: 延迟执行(秒)。
# eta: 指定具体执行时间(datetime)。
# queue: 指定任务队列。
# expires: 任务过期时间。
# retry: 是否启用重试

from datetime import datetime, timedelta
 
# 延迟 10 秒执行
test_add.apply_async(args=(1, 5), countdown=10)
# 指定具体执行时间
test_add.apply_async(args=(1, 5), eta=datetime.now() + timedelta(minutes=1))
# 指定队列和过期时间
test_add.apply_async(args=(1, 5), queue='priority', expires=3600)

2.2 高级用法

通过 signature 对象调用,预生成任务签名(task.s()),用于创建一个可序列化的任务调用对象。它允许你预定义任务及其参数,而无需立即执行,从而支持更灵活的任务组合(如链式调用、组调用等),签名对象是可序列化的,可以存储到数据库通过网络传递

sig = test_add.s(1, 5)  # 创建签名对象
sig.apply_async()       # 异步执行
sig.delay()             # 等价于 apply_async

2.2.1 任务回调(Callback)

在任务成功后触发另一个任务(通过 link 参数)

test_add.apply_async(args=(1, 5), link=send_notification.s("Task completed!"))

2.2.2 任务链(Chaining)

通过 | 符号或 chain() 将多个任务串联,前一个任务的结果作为后一个任务的输入

from celery import chain
 
# 方法1:使用 | 符号
result = (task1.s(1, 2) | task2.s() | task3.s())()
# 方法2:使用 chain()
result = chain(task1.s(1, 2), task2.s(), task3.s())()

2.2.3 任务组(Group)

并行执行多个任务,等待所有任务完成

from celery import group
 
result = group(task1.s(i) for i in range(10))()  # 并发执行 10 个 task1

2.2.4 任务和弦(Chord)

先并行执行一组任务(group),全部完成后执行一个汇总任务

from celery import chord
 
result = chord((task1.s(i) for i in range(10)), task2.s())()  # 10 个 task1 完成后执行 task2

三、定时任务

在celery文件中添加定时任务路由表

# celery.py

app.conf.beat_schedule = {
    'task-name': {  # 任务名称(自定义)
        'task': 'myapp.tasks.my_task',  # 任务函数路径(需可导入)
        'schedule': 30,  # 执行时间规则(固定间隔)
        # 或 'schedule': crontab(minute='*/5'),  # Cron 表达式
        'args': (16, 16),  # 传递给任务的参数(可选)
        'options': {'queue': 'priority'},  # 其他选项(如指定队列)
    },
    # 可定义多个任务
}

通过安装pip install django-celery-beat可以实现在admin后台动态修改定时任务配置

INSTALLED_APPS = [
    ...,
    'django_celery_beat',
]
 
# 替换 Celery 的调度器
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

配置完记得迁移数据库 python manage.py migrate

在这里插入图片描述


四、启动celery

worker = 干活的(执行任务)。
beat = 发任务的(定时生成任务)。
协作关系:beat 是“计划部门”,worker是“执行部门”,两者通过 Broker(消息队列)解耦。
生产建议:分开启动,Worker 可横向扩展,Beat 保持单例
在这里插入图片描述
在这里插入图片描述

4.1 命令行方式

# 启动 Worker(处理任务)
celery -A CeleryStudy worker -l info

celery -A CeleryStudy worker -l info -P eventlet # windows环境下命令
"""
prefork 是 Celery 在 Linux 上的默认并发模型,它使用多进程(Multiprocessing)处理任务,
适合 CPU 密集型场景。但 Windows 系统不支持 fork() 系统调用,因此无法使用 prefork 池。
在 Windows 上尝试使用 prefork 会直接报错,导致 Worker 无法启动。

eventlet 是一个基于协程(Coroutine)的并发库,通过绿色线程(Green Thread)实现高并发,
适合 I/O 密集型任务(如 Celery 的异步任务场景)。
在 Windows 上,eventlet 是少数可用的高性能并发池之一。
它通过非阻塞 I/O 和协程调度,避免了线程切换的开销,同时绕过了 GIL 的限制,
能显著提升任务处理效率
"""

# 启动 Beat(调度任务)
celery -A CeleryStudy beat -l info

# 合并启动
celery -A CeleryStudy worker --beat -l info

4.2 脚本方式(容易出问题,建议用命令行方式,很多默认配置内置好)

注:涉及django自定义管理命令,自己创建一个commands_app

4.2.1 启动worker(脚本方式博主暂时没找到好方法能捕获任务结果)

# CeleryStudy/commands_app/management/commands/run_celery_worker.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_app

class Command(BaseCommand):
    def handle(self, *args, **options):
        worker = celery_app.Worker(
            hostname='worker1@%h',  # Worker 名称
            pool='eventlet',        # 进程池类型(prefork/solo/gevent)
            concurrency=4,         # 并发数
            loglevel='INFO',       # 日志级别
            logfile='/var/log/celery/worker.log',  # 日志文件(可选)
        )
        worker.start()
python manage.py run_celery_worker

4.2.2 启动beat

# CeleryStudy/commands_app/management/commands/run_celery_beat.py
from django.core.management.base import BaseCommand
from CeleryStudy.celery import app as celery_app

class Command(BaseCommand):
    def handle(self, *args, **options):
        beat = celery_app.Beat(
            loglevel='INFO',
            logfile='/var/log/celery/beat.log',  # 日志文件(可选)
            scheduler='django_celery_beat.schedulers:DatabaseScheduler',  # 使用数据库调度
        )
        beat.run()
python manage.py run_celery_beat

4.2.3 合并启动worker和beat

# CeleryStudy/commands_app/management/commands/run_celery.py
from django.core.management.base import BaseCommand
from threading import Thread
from CeleryStudy.celery import app as celery_app

class Command(BaseCommand):
    def handle(self, *args, **options):
        # 配置定时任务(可选)
        celery_app.conf.beat_schedule = {
            'add-every-10-seconds': {
                'task': 'proj.celery.debug_task',
                'schedule': 10.0,
                'args': (16, 16),
            },
        }

        # 启动 Worker
        worker = celery_app.Worker(
            hostname='worker1@%h',
            pool='prefork',
            concurrency=4,
            loglevel='INFO',
        )

        # 启动 Beat
        beat = celery_app.Beat(
            loglevel='INFO',
            scheduler='django_celery_beat.schedulers:DatabaseScheduler',
        )
		
		"""
		在后台线程中运行 Beat
		如果直接调用 beat.run(),它会阻塞主线程,导致 Worker 无法启动
		因此,需要通过线程(Thread)将 Beat 放在后台运行,避免阻塞主线程
		"""
        beat_thread = Thread(target=beat.run)
        beat_thread.daemon = True
        beat_thread.start()

        # 启动 Worker
        worker.start()
python manage.py run_celery

五、监控管理

5.1 celery inspect

作用:检查 Worker 状态、任务信息等(无需停止服务)。

celery -A proj inspect active          # 查看正在执行的任务
celery -A proj inspect registered      # 查看已注册的任务列表
celery -A proj inspect scheduled       # 查看待执行的定时任务(需 Beat 运行)
celery -A proj inspect reserved        # 查看 Worker 已获取但未执行的任务
celery -A proj inspect stats           # 查看 Worker 统计信息(如任务处理数)

5.2 celery control

作用:动态控制 Worker 行为(如关闭、重启、调整并发数)。

celery -A proj control shutdown        # 优雅关闭所有 Worker
celery -A proj control add_consumer Q1 # 动态添加监听队列 Q1
celery -A proj control cancel_consumer Q1 # 动态移除监听队列 Q1
celery -A proj control pool_grow 10    # 增加 Worker 并发数到 10
celery -A proj control pool_shrink 5   # 减少 Worker 并发数到 5

5.3 celery event

作用:监控 Celery 事件(如任务开始、成功、失败),可用于自定义仪表盘。

celery -A proj events                  # 启动事件监控(输出到终端)
celery -A proj events -d dump          # 以 JSON 格式输出事件
celery -A proj events -f events.log    # 将事件记录到文件

5.4 celery multi

作用:同时启动多个 Worker 或 Beat 实例(适用于分布式部署)。

celery multi start w1 w2 -A proj -l info -Q high,low  # 启动两个 Worker,分别监听不同队列
celery multi stop w1 w2                               # 停止指定 Worker

5.5 celery purge

作用:清空消息队列中的所有任务

celery -A proj purge -Q celery       # 清空默认队列
celery -A proj purge -Q high,low    # 清空多个队列

5.6 celery flower

作用:启动基于 Web 的监控仪表盘(需单独安装 flower 包)。

pip install flower
celery -A CeleryStudy flower --port=5555     # 访问 http://localhost:5555

网站公告

今日签到

点亮在社区的每一天
去签到