一、celery依赖安装
# python 3.11版本
pip install celery redis django-celery-beat django_celery_results eventlet
二、celery 参数配置
django项目的settings.py中新增如下celery配置
##
INSTALLED_APPS = [
......
'firewall_app',
'django_celery_beat', # 添加Celery应用
'django_celery_results', # 添加Celery结果展示应用
]
# Celery Configuration Options
# 使用 Redis 作为消息代理
CELERY_BROKER_URL = 'redis://localhost:6379/0' # 或 'amqp://guest:guest@localhost:5672//' 如果使用 RabbitMQ
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai' # 设置时区
CELERY_ENABLE_UTC = True
# Celery Beat Settings (如果使用定时任务)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 如果希望在 Django Admin 中管理定时任务,需要安装 django-celery-beat
# 或者使用默认的本地调度器:
# CELERY_BEAT_SCHEDULER = 'celery.beat:PersistentScheduler'
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True
#日志输出配置
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'file': {
'level': 'INFO',
'class': 'logging.FileHandler',
'filename': 'celery.log', # 日志文件路径
},
},
'loggers': {
'firewall_app.tasks': { # 匹配您的任务模块
'handlers': ['file'],
'level': 'INFO',
'propagate': True,
},
},
}
三、celery定时任务
1、新建tasks.py
import logging
import time
from celery import shared_task
logger = logging.getLogger(__name__)
def run_crawler_logic():
print("执行爬虫任务...")
# 在这里调用 FortinetCrawler 或相关爬虫函数
# crawler = FortinetCrawler()
# crawler.run()
time.sleep(10) # 模拟任务执行
print("爬虫任务完成.")
def run_mapping_logic():
print("执行漏洞映射任务...")
# 在这里调用 map_vulnerabilities_for_all_firewalls 或相关函数
# map_vulnerabilities_for_all_firewalls()
# 推迟导入爬虫函数,避免循环引用
time.sleep(5) # 模拟任务执行
print("漏洞映射任务完成.")
@shared_task
def run_crawler_task():
"""Celery task for running the web crawler."""
# 确保 Django 环境已设置 (如果任务需要访问 Django 模型)
# 如果 Celery worker 和 Django 运行在同一环境,通常不需要手动设置
# 但为了保险起见,可以加上
# if not django.apps.apps.ready:
# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'firewall_monitor.settings')
# django.setup()
logger.info("漏洞爬虫任务开始执行")
run_crawler_logic()
logger.info("漏洞爬虫任务完成")
return "漏洞爬虫任务成功执行"
@shared_task
def run_firewall_mapping_task():
"""Celery task for running the firewall vulnerability mapping."""
# 同上,确保 Django 环境
# if not django.apps.apps.ready:
# os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'firewall_monitor.settings')
# django.setup()
logger.info("防火墙漏洞映射任务开始执行")
run_mapping_logic()
logger.info("防火墙漏洞映射任务完成")
return "防火墙漏洞映射任务成功执行"
2、新建celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "mysite.settings")
app = Celery('firewall_monitor')
# 使用 Django settings.py 中的配置
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自动发现应用中的任务(最好指定tasks路径)
app.autodiscover_tasks(['mysite.tasks'])
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
(3) __init__.py添加
from .celery import app as celery_app
__all__ = ('celery_app',)
四、celery任务启动
# 1、Redis-x64-5.0.14.1 window版本
# redis启动
redis-server.exe redis.windows.conf
# 2、启动djiango项目
python manage.py runserver 8000
# 3、celery beat启动
celery -A mysite beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler
# 4、celery 任务启动
celery -A mysite worker --pool=solo -l info -P eventlet
五、celery任务配置
启动djiango项目后,在Django-Admin后台配置定时任务。