【星海出品】rabbitMQ - 叁 应用篇
rabbitMQ 的基础知识这里就不阐述了,可以参看我早年写的文章 -> rabbitMQ 入门
https://blog.csdn.net/weixin_41997073/article/details/118724779
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版:http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
celery 还有定时运行,redis 缓冲使用等高级特性,可以参看 Celery的官网进行研究。
这里讲些 rabbitMQ 如何与程序嵌入,使用的是 django 的基础框架
django 使用celery 插件,在配置文件配置,并启动 celery 就可以与 rabbitMQ 进行交互
配置如下
INSTALLED_APPS = ['rest_framework',
]
from celery import Celery
from kombu import QueueCELERY_WORKER_POOL = 'prefork' # 禁用gevent池
CELERY_BROKER_URL = 'amqp://test:test_password@localhost:5672/test_vhost'
CELERY_RESULT_BACKEND = 'django-db' # 使用数据库存储结果
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = False
CELERY_TASK_TRACK_STARTED = True
CELERY_RESULT_EXPIRES = 3600 # 任务结果保存1小时CELERY_WORKER_CONCURRENCY = 4
# [ IO 密集型, 每个worker进程可以预取 并发数 × 预乘数 个任务到内存中 ]
# 设置为1000,表示每个worker进程处理1000个任务后会自动重启
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_WORKER_MAX_TASKS_PER_CHILD = 1000
CELERY_TASK_ACKS_LATE = True# Celery 队列配置
CELERY_TASK_QUEUES = (Queue('default', routing_key='task.default'),Queue('high_priority', routing_key='task.high_priority'),Queue('celery', routing_key='task.celery'),
)# Celery 路由配置
CELERY_TASK_ROUTES = {'metadata.tasks.process_rabbitmq_message': {'queue': 'high_priority','routing_key': 'task.high_priority'},'metadata.tasks.async_update_record': {'queue': 'default','routing_key': 'task.default'},
}# CELERY_TASK_QUEUES = (
# Queue('high_priority', Exchange('high_priority'), routing_key='high_priority'),
# Queue('default', Exchange('default'), routing_key='default'),
# Queue('low_priority', Exchange('low_priority'), routing_key='low_priority'),
# )MONKEY_PATCH_SETTINGS = {'subprocess': False,'thread': False, # 禁用线程补丁
}
注意:
django创建的数据模型需要手动迁移,还有rabbitMQ 的虚拟主机和队列也需要进行验证,确保可用。
在 django 的主入口 url 目录中建立celery 脚本 ,并在该目录的初始化 init 配置文件中添加 celery 信息
就可以在 django 的 manage.py 平级目录使用 celery 进行启动与 rabbitMQ 的交互 。
celery -A djangoProjectMetaDataManagement worker --loglevel=info -Q high_priority,defaultcelery -A djangoProjectMetaDataManagement.celery worker --loglevel=info为了安装考虑看情况也可以创建一个独立的用户运行celery
# 创建专用用户
useradd celeryuser# 使用非 root 用户运行
celery -A djangoProjectMetaDataManagement.celery worker --loglevel=info --uid=celeryuser --gid=celeryuser
celery.py
import os
from celery import Celery
from kombu import QueueCELERY_BROKER_URL = 'amqp://test:test_password@localhost/test_vhost'
CELERY_RESULT_BACKEND = Noneos.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangoProjectMetaDataManagement.settings')app = Celery('djangoProjectMetaDataManagement')
#app.config_from_object('django.conf:settings', namespace='CELERY')# 使用 Django 的设置文件配置 Celery
app.config_from_object('django.conf:settings', namespace='CELERY')app.conf.update(worker_prefetch_multiplier=1, # 每个Worker进程每次只预取1个任务worker_max_tasks_per_child=100, # 可选:限制每个Worker进程处理的任务数后重启
)app.conf.task_queues = (Queue('default'