rabbitMQ 的基础知识这里就不阐述了,可以参看我早年写的文章 -> rabbitMQ 入门
https://blog.csdn.net/weixin_41997073/article/details/118724779
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
celery 还有定时运行,redis 缓冲使用等高级特性,可以参看 Celery的官网进行研究。
这里讲些 rabbitMQ 如何与程序嵌入,使用的是 django 的基础框架
django 使用celery 插件,在配置文件配置,并启动 celery 就可以与 rabbitMQ 进行交互
配置如下
INSTALLED_APPS = [
'rest_framework',
]
from celery import Celery
from kombu import Queue
CELERY_WORKER_POOL = 'prefork' # 禁用gevent池
CELERY_BROKER_URL = 'amqp://test:test_password@localhost:5672/test_vhost'
CELERY_RESULT_BACKEND = 'django-db' # 使用数据库存储结果
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXPIRES = 3600 # 任务结果保存1小时
CELERY_WORKER_CONCURRENCY = 4
# [ IO 密集型, 每个worker进程可以预取 并发数 × 预乘数 个任务到内存中 ]
# 设置为1000,表示每个worker进程处理1000个任务后会自动重启
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_TASK_ACKS_LATE = True
# Celery 队列配置
CELERY_TASK_QUEUES = (
Queue('default', routing_key='task.default'),
Queue('high_priority', routing_key='task.high_priority'),
Queue('celery', routing_key='task.celery'),
)
# Celery 路由配置
CELERY_TASK_ROUTES = {
'metadata.tasks.process_rabbitmq_message': {
'queue': 'high_priority',
'routing_key': 'task.high_priority'
},
'metadata.tasks.async_update_record': {
'queue': 'default',
'routing_key': 'task.default'
},
}
# CELERY_TASK_QUEUES = (
# Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
# Queue('default', Exchange('default'), routing_key='default'),
# Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
# )
MONKEY_PATCH_SETTINGS = {
'subprocess': False,
'thread': False, # 禁用线程补丁
}
注意:
django创建的数据模型需要手动迁移,还有rabbitMQ 的虚拟主机和队列也需要进行验证,确保可用。
在 django 的主入口 url 目录中建立celery 脚本 ,并在该目录的初始化 init 配置文件中添加 celery 信息
就可以在 django 的 manage.py 平级目录使用 celery 进行启动与 rabbitMQ 的交互 。
celery -A djangoProjectMetaDataManagement worker --loglevel=info -Q high_priority,default
celery -A djangoProjectMetaDataManagement.celery worker --loglevel=info
为了安装考虑看情况也可以创建一个独立的用户运行celery
# 创建专用用户
useradd celeryuser
# 使用非 root 用户运行
celery -A djangoProjectMetaDataManagement.celery worker --loglevel=info --uid=celeryuser --gid=celeryuser
celery.py
import os
from celery import Celery
from kombu import Queue
CELERY_BROKER_URL = 'amqp://test:test_password@localhost/test_vhost'
CELERY_RESULT_BACKEND = None
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProjectMetaDataManagement.settings')
app = Celery('djangoProjectMetaDataManagement')
#app.config_from_object('django.conf:settings', namespace='CELERY')
# 使用 Django 的设置文件配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')
app.conf.update(
worker_prefetch_multiplier=1, # 每个Worker进程每次只预取1个任务
worker_max_tasks_per_child=100, # 可选:限制每个Worker进程处理的任务数后重启
)
app.conf.task_queues = (
Queue('default'