【星海出品】rabbitMQ - 叁 应用篇

发布于:2025-09-13 ⋅ 阅读:(20) ⋅ 点赞:(0)

rabbitMQ 的基础知识这里就不阐述了,可以参看我早年写的文章 -> rabbitMQ 入门
https://blog.csdn.net/weixin_41997073/article/details/118724779

Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

celery 还有定时运行,redis 缓冲使用等高级特性,可以参看 Celery的官网进行研究。

这里讲些 rabbitMQ 如何与程序嵌入,使用的是 django 的基础框架

django 使用celery 插件,在配置文件配置,并启动 celery 就可以与 rabbitMQ 进行交互
配置如下

INSTALLED_APPS = [
    'rest_framework',
]
from celery import Celery
from kombu import Queue

CELERY_WORKER_POOL = 'prefork'  # 禁用gevent池
CELERY_BROKER_URL = 'amqp://test:test_password@localhost:5672/test_vhost'
CELERY_RESULT_BACKEND = 'django-db'  # 使用数据库存储结果
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXPIRES = 3600  # 任务结果保存1小时


CELERY_WORKER_CONCURRENCY = 4
# [ IO 密集型, 每个worker进程可以预取 并发数 × 预乘数 个任务到内存中 ]
# 设置为1000,表示每个worker进程处理1000个任务后会自动重启
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_TASK_ACKS_LATE = True

# Celery 队列配置
CELERY_TASK_QUEUES = (
    Queue('default', routing_key='task.default'),
    Queue('high_priority', routing_key='task.high_priority'),
    Queue('celery', routing_key='task.celery'),
)

# Celery 路由配置
CELERY_TASK_ROUTES = {
   
   
    'metadata.tasks.process_rabbitmq_message': {
   
   
        'queue': 'high_priority',
        'routing_key': 'task.high_priority'
    },
    'metadata.tasks.async_update_record': {
   
   
        'queue': 'default',
        'routing_key': 'task.default'
    },
}

# CELERY_TASK_QUEUES = (
#     Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
#     Queue('default', Exchange('default'), routing_key='default'),
#     Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
# )

MONKEY_PATCH_SETTINGS = {
   
   
    'subprocess': False,
    'thread': False,  # 禁用线程补丁
}

注意:
django创建的数据模型需要手动迁移,还有rabbitMQ 的虚拟主机和队列也需要进行验证,确保可用。

在 django 的主入口 url 目录中建立celery 脚本 ,并在该目录的初始化 init 配置文件中添加 celery 信息
就可以在 django 的 manage.py 平级目录使用 celery 进行启动与 rabbitMQ 的交互 。

celery -A djangoProjectMetaDataManagement worker --loglevel=info -Q high_priority,default

celery -A djangoProjectMetaDataManagement.celery worker --loglevel=info

为了安装考虑看情况也可以创建一个独立的用户运行celery
# 创建专用用户
useradd celeryuser

# 使用非 root 用户运行
celery -A djangoProjectMetaDataManagement.celery worker --loglevel=info --uid=celeryuser --gid=celeryuser

celery.py

import os
from celery import Celery
from kombu import Queue

CELERY_BROKER_URL = 'amqp://test:test_password@localhost/test_vhost'
CELERY_RESULT_BACKEND = None

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProjectMetaDataManagement.settings')

app = Celery('djangoProjectMetaDataManagement')
#app.config_from_object('django.conf:settings', namespace='CELERY')

# 使用 Django 的设置文件配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')

app.conf.update(
    worker_prefetch_multiplier=1,  # 每个Worker进程每次只预取1个任务
    worker_max_tasks_per_child=100, # 可选:限制每个Worker进程处理的任务数后重启
)

app.conf.task_queues = (
    Queue('default'

网站公告

今日签到

点亮在社区的每一天
去签到