当前位置: 首页 > news >正文

Django 使用 Celery 完成异步任务或定时任务

1 介绍

Celery是一个分布式任务队列,由三个主要组件组成:Celery worker、Celery beat 和消息代理(例如 Redis 或 RabbitMQ)。这些组件一起协作,让开发者能够轻松地执行异步任务和定时任务。

Celery worker:负责接收任务请求并执行任务。当您在 Django 应用程序中调用 apply_async 方法时,任务将被发送到 Celery worker,然后由 worker 执行。

Celery beat:负责调度定时任务。它会根据定义的规则定期触发任务,并将其发送到 Celery worker 处理。

所以,对于需要运行定时任务的情况,我们需要同时启动 Celery worker 和 Celery beat 进程来确保所有任务都可以被正确地处理和执行。

如果只需要使用 Celery 来执行异步任务,那么只需启动 Celery worker 即可。但如果需要周期性地执行任务,那么需要启动 Celery beat 来帮助完成调度这些任务。

运行 worker 与 beat

celery -A proj worker --loglevel=info --detach --pidfile=worker.pid --logfile=./logs/worker.log
celery -A proj beat --loglevel=info --detach --pidfile=beat.pid --logfile=./logs/beat.log
-A proj:指定 Celery 应用程序所在的模块或包,这里假设其名为 proj。
worker 或 beat:启动的进程名称,分别对应 worker 和 beat 两种类型的 Celery 进程。
–loglevel=info:设置日志级别为 info,即只记录 info 级别及以上的日志信息。
–detach:以守护进程(daemonized)方式启动 Celery 进程,使其在后台运行。
–pidfile=worker.pid 或 --pidfile=beat.pid:将进程 ID(PID)写入指定的 PID 文件,方便后续管理和监控。
–logfile=./logs/worker.log 或 --logfile=./logs/beat.log:指定日志文件路径,所有日志信息都会输出到该文件中。

2使用

这里使用celery调用两个脚本,两个脚本位于
custom/firewallvulnsmapping.py
custom/fortinet_crawler.py
1)安装依赖
确保你安装了以下包:

#进入虚拟环境
firewallenv\Scripts\activate
python -m pip install celery redis django-celery-beat
python -m pip install celery redis django_celery_results #查看任务执行结果

2)修改 settings.py
在 settings.py 中添加Celery的配置: myproject/settings.py

##
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'firewall_app',  
    'django_celery_beat',# 添加Celery应用
    'django_celery_results',# 添加Celery结果展示应用
]

# Celery Configuration Options
# 使用 Redis 作为消息代理
CELERY_BROKER_URL = 'redis://localhost:6379/0'  # 或 'amqp://guest:guest@localhost:5672//' 如果使用 RabbitMQ
CELERY_RESULT_BACKEND = 'django-db'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai' # 设置时区
CELERY_ENABLE_UTC = True
# Celery Beat Settings (如果使用定时任务)
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler' # 如果希望在 Django Admin 中管理定时任务,需要安装 django-celery-beat
# 或者使用默认的本地调度器:
# CELERY_BEAT_SCHEDULER = 'celery.beat:PersistentScheduler'
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True

LOGGING = {
    'version': 1,
    'disable_existing_loggers': False,
    'handlers': {
        'file': {
            'level': 'INFO',
            'class': 'logging.FileHandler',
            'filename': 'celery.log',  # 日志文件路径
        },
    },
    'loggers': {
        'firewall_app.tasks': {  # 匹配您的任务模块
            'handlers': ['file'],
            'level': 'INFO',
            'propagate': True,
        },
    },
}

3)创建任务
在你的应用中(例如 myapp)创建 tasks.py 文件,定义一个任务:myapp/tasks.py

from django.core.management.base import BaseCommand
import logging
import time
from django.utils import timezone
from django.conf import settings
from celery import shared_task

logger = logging.getLogger(__name__)

def run_crawler_logic():
    print("执行爬虫任务...")
    # 在这里调用 FortinetCrawler 或相关爬虫函数
    # crawler = FortinetCrawler()
    # crawler.run()
    from firewall_app.custom.fortinet_crawler import crawl_fortinet_vulnerabilities
    crawl_fortinet_vulnerabilities()
    time.sleep(10) # 模拟任务执行
    print("爬虫任务完成.")

def run_mapping_logic():
    print("执行漏洞映射任务...")
    # 在这里调用 map_vulnerabilities_for_all_firewalls 或相关函数
    # map_vulnerabilities_for_all_firewalls()
    # 推迟导入爬虫函数,避免循环引用
    from firewall_app.custom.firewallvulnsmapping import run_mapping
    run_mapping()
    time.sleep(5) # 模拟任务执行
    print("漏洞映射任务完成.")

@shared_task
def run_crawler_task():
    """Celery task for running the web crawler."""
    # 确保 Django 环境已设置 (如果任务需要访问 Django 模型)
    # 如果 Celery worker 和 Django 运行在同一环境,通常不需要手动设置
    # 但为了保险起见,可以加上
    # if not django.apps.apps.ready:
    #     os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'firewall_monitor.settings')
    #     django.setup()
    logger.info("漏洞爬虫任务开始执行")
    run_crawler_logic()
    logger.info("漏洞爬虫任务完成")
    return "漏洞爬虫任务成功执行"

@shared_task
def run_firewall_mapping_task():
    """Celery task for running the firewall vulnerability mapping."""
    # 同上,确保 Django 环境
    # if not django.apps.apps.ready:
    #     os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'firewall_monitor.settings')
    #     django.setup()
    logger.info("防火墙漏洞映射任务开始执行")
    run_mapping_logic() 
    logger.info("防火墙漏洞映射任务完成")
    return "防火墙漏洞映射任务成功执行" 

4)创建 celery.py
在你的Django项目目录中创建一个名为 celery.py 的文件:myproject/celery.py
使用celery调用定时任务有两种方式,
(1)第一种是不需要django admin界面操作,采用命令方式设置schedule定时运行脚本实例如下:

# 创建 Celery 应用实例
# 'firewall_monitor' 应替换为您的项目名称
app = Celery('firewall_monitor')

# 使用 Django settings.py 中的配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现应用中的任务
# 它会自动查找每个 app 目录下的 tasks.py 文件
app.autodiscover_tasks()

# 可选:定义定时任务 (Celery Beat)
app.conf.beat_schedule = {
    'run-crawler-every-hour': {
        'task': 'firewall_app.tasks.run_crawler_task', # 指向任务函数
        'schedule': crontab(minute=0, hour='*/1'), # 每小时执行一次
        # 'schedule': 3600.0, # 或者每 3600 秒执行一次
    },
    'run-mapping-every-hour': {
        'task': 'firewall_app.tasks.run_firewall_mapping_task', # 指向任务函数
        'schedule': crontab(minute=5, hour='*/1'), # 每小时的第 5 分钟执行一次
        # 'schedule': 3600.0, # 或者每 3600 秒执行一次
    },
}

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

(2)第二种,直接使用django admin中 来配置定时任务,操作如下:celery.py不需要额外配置定时任务


# 创建 Celery 应用实例
# 'firewall_monitor' 应替换为您的项目名称
app = Celery('firewall_monitor')

# 使用 Django settings.py 中的配置
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自动发现应用中的任务
# 它会自动查找每个 app 目录下的 tasks.py 文件
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print(f'Request: {self.request!r}')

在django admin中配置定时任务,首先添加定时如下:
在这里插入图片描述
然后配置tasks
在这里插入图片描述

5)先启动redis

./redis-server.exe

6)启动Celery Beat

#运行 Celery Beat: 即使您在 Admin 中配置了任务,您仍然需要运行 Celery Beat 进程来读取数据库中的计划并按时发送任务给 Worker。确保使用数据库调度器启动 Beat:
    python -m celery -A firewall_monitor beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

在这里插入图片描述
6)启动Celery Worker
在终端中启动Celery Worker,使用 --loglevel=info 参数以便查看日志:
–pool=solo 参数强制 Celery 使用单线程模式,这通常可以避免 Windows 上的权限问题。


#运行 Celery Worker: 同样,您需要运行 Worker 进程来实际执行任务:
    python -m celery -A firewall_monitor worker --pool=solo -l info

启动后Celery Worker就可以看到日志,一旦到了定时的时间 就会执行任务,日志如下所示:
在这里插入图片描述
7)查看调度结果:
django admin页面中可以看到任务调用结果
在这里插入图片描述

8)代码中手动调用任务并查看结果
在视图中触发任务并查看结果:myapp/views.py

from django.http import JsonResponse
from .tasks import my_task
def trigger_task(request):
    result = my_task.delay()  # 异步调用任务
    return JsonResponse({"task_id": result.id})  # 返回任务ID以便跟踪
  1. 查看任务状态和结果
    要查看任务的状态和结果,可以使用任务ID。你可以在Django Shell中查询:
    python manage.py shell
    然后执行以下代码:
from myapp.tasks import my_task
from celery.result import AsyncResult
#替换为你的任务ID
task_id = 'your_task_id_here'
result = AsyncResult(task_id)

print("Task Status:", result.status)  # 查看任务状态
print("Task Result:", result.result)   # 查看任务结果
  1. 设置定时任务
    如果需要设置定时任务,可以在 tasks.py 中添加如下代码:myapp/tasks.py
from celery import shared_task
from celery import Celery
from celery.schedules import crontab
app = Celery('myproject')
@shared_task
def my_periodic_task():
    print("This task runs periodically!")
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # 每天11点50分执行
    sender.add_periodic_task(crontab(minute=50, hour=11), my_periodic_task.s())

项目目录如下

项目根目录/
├── manage.py                     # Django项目管理脚本
├── db.sqlite3                    # SQLite数据库文件
├── celery.log                    # Celery日志文件
├── firewall_vulns_mapping.log    # 防火墙漏洞映射日志
├── fortinet_crawler.log          # Fortinet爬虫日志
├── 操作手册.txt                  # 项目操作手册
├── firewallenv/                  # Python虚拟环境目录
│
├── firewall_monitor/             # Django项目主应用
│   ├── __init__.py               # 包初始化文件
│   ├── asgi.py                   # ASGI配置
│   ├── celery.py                 # Celery配置和任务调度
│   ├── settings.py               # Django项目设置
│   ├── urls.py                   # 项目URL配置
│   ├── wsgi.py                   # WSGI配置
│   └── .vscode/                  # VS Code配置目录
│
├── firewall_app/                 # 防火墙应用
│   ├── __init__.py               # 包初始化文件
│   ├── admin.py                  # Django管理界面配置
│   ├── apps.py                   # 应用配置
│   ├── models.py                 # 数据模型定义
│   ├── tasks.py                  # Celery任务定义
│   ├── urls.py                   # 应用URL配置
│   ├── views.py                  # 视图函数
│   ├── migrations/               # 数据库迁移文件目录
│   ├── custom/                   # 自定义脚本目录
│   │   ├── firewallvulnsmapping.py  # 防火墙漏洞映射脚本
│   │   ├── fortinet_crawler.py     # Fortinet漏洞爬虫脚本
│   │   └── update_official_links.py # 官方链接更新脚本
│   └── templates/                # 模板目录
│       └── firewall_app/         # 应用模板
│           ├── base.html         # 基础模板
│           ├── firewall_list.html      # 防火墙列表
│           ├── firewall_vulnerabilities.html # 防火墙漏洞页面
│           ├── vulnerability_detail.html    # 漏洞详情页
│           └── vulnerability_list.html      # 漏洞列表页
│
├── static/                       # 静态文件目录
├── staticfiles/                  # 收集的静态文件目录
└── logs/                         # 日志目录

相关文章:

  • 【Linux】进程创建、进程终止、进程等待
  • Lua 中的 table 类型详解
  • 深入解析 HTML 中 `<script>` 标签的 async 和 defer 属性
  • 高并发短信系统设计:基于SharingJDBC的分库分表、大数据同步与实时计算方案
  • autogenstudio设置
  • Redisson的红锁,分段锁,公平锁,联锁。。。。。。
  • 信息安全管理与评估2021年国赛正式卷答案截图以及十套国赛卷
  • 高负载WEB服务器--Tomcat
  • 深入理解 v-show 指令及其使用方法
  • 【本地图床搭建】宝塔+Docker+MinIO+PicGo+cpolar:打造本地化“黑科技”图床方案
  • github进阶使用教程
  • .net执行脚本:通过字符串的形式来执行按钮的点击操作
  • 【Python实时数据处理】流式计算与异步编程实战
  • 微服务之protobuf:下载、语法和使用一站式教程
  • Linux文件传输:让数据飞起来!
  • vue2项目集成Tailwindcss
  • 6.1 GitHub亿级数据采集实战:双通道架构+三级容灾设计,破解API限制与反爬难题
  • 青少年编程与数学 02-016 Python数据结构与算法 18课题、组合数学算法
  • Ubuntu 安装 Cursor AppImage 到应用程序中
  • n8n 本地部署及实践应用,实现零成本自动化运营 Telegram 频道(保证好使)
  • 两部门发布外汇领域行刑反向衔接案例,织密金融安全“防护网”
  • 九部门:对机动车特别是货车排放问题的监管将更加严格
  • 个人住房公积金贷款利率下调,100万元30年期贷款总利息将减少近5万元
  • 央行:5月15日起下调金融机构存款准备金率0.5个百分点
  • 证监会主席吴清:我们资本市场最重要的特征是“靠谱”
  • 纪念|“补白大王”郑逸梅,从藏扇看其眼光品味