python celery框架结合django的使用
学习目标:
通过文章了解celery的运行机制以及如何结合django去使用
- 熟悉celery的运行原理
- 属性celery在django项目当中的配置
- 如何启动运行celery框架
学习内容:
熟悉celery的运行原理,简单来说
Celery 是一个“任务排队机+后台处理器”。帮你把一些慢的、耗时的工作,丢到后台去异步处理,不影响用户继续正常使用网站。Celery内部又分为这些组件,任务的流转图如下,通常我们需要关注的就是
定义任务,和生产任务投递到worker当中
:
组件名 | 简单理解 | 主要作用 | 实现方式 |
---|---|---|---|
Task(任务) | 要干的事情 定义要做的函数 | 比如发邮件、跑批函数 | @shared_task或者 @app.task注解 |
Broker(消息中间件) | 传话的中介人 Django丢给Broker | 比如发邮件、跑批函数 | Redis对列或RabbitMQ对列 |
Worker(工人) | 真正干活的人 后台进程 | 不停从队列拿任务,执行任务 | 通过命令创建worker,celery -A myproject worker --loglevel=info启动一个worker, 可以启用多个worker,进行并行并行处理,此时需要注意并行处理的数据安全 |
Result Backend(结果存储器) | 存储执行结果 | worker执行的结果需要存储 | 通过Redis、关系型数据库都可以实现 |
Scheduler(调度器) | 生成任务并把任务存储起来 | 将任务投递给 对应的消息中间件Broker当中 | 常用的Celery Beat,同时可以解析cron表达式进行任务投递 |
为什么要在django当中引入Celery?
- Django本身是同步阻塞的,它不适合干很慢的活。 假设我们在处理用户注册时,需要调用给用户发送邮件的功能,但是这个功能处理又很耗时,如果都放在 Django 请求里同步跑,会出现什么问题?用户界面卡住,网页转圈圈、请求超时,服务器崩溃,这样用户的体验就会很差。
- 另外一个重要的用途,我们的系统当中一般都有跑批定时任务,我们通过Celery可以很好的通过异步的方式处理这种跑批的操作。
一般我们在系统当中引入Celery基于这种情况会比较多一些
如何在django项目当中配置celery
1. 安装 Celery 和 Redis
pip install celery
pip install redis
2. 在项目根目录(和settings.py同级)新建一个celery.py
# myproject/celery.pyimport os
from celery import Celery# 设置环境变量,指向你的Django设置
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')# 创建Celery应用
app = Celery('myproject')# 从Django settings中加载Celery配置(以CELERY_开头)
app.config_from_object('django.conf:settings', namespace='CELERY')# 自动发现Django应用中的tasks.py文件
app.autodiscover_tasks()@app.task(bind=True)
def debug_task(self):print(f'Request: {self.request!r}')
如果我们使用了其他的名称,例如:customer_task.py,只要我们在内部方式使用了@shared_task(bind=True),一样是可以扫描到的,Celery启动的时候,会去你项目里所有安装的 Django App,自动查找能被发现的模块,只是默认是找tasks.py。
3. 修改__init__.py让 Django 启动时自动加载Celery,在myproject/init.py加上:
from .celery import app as celery_app__all__ = ('celery_app',)
4. 配置settings.py
# settings.py# 使用Redis作为broker
CELERY_BROKER_URL = 'redis://localhost:6379/0'# 存储任务结果(可选)
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'# 配置序列化格式
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'# 时区设置
CELERY_TIMEZONE = 'Asia/Shanghai' # 根据你的时区修改
5. 在你的 Django 应用中写tasks.py
比如你有个 app 叫myapp,在myapp/tasks.py或者custom_task.py里写任务:
# myapp/tasks.pyfrom celery import shared_task@shared_task
def test_print():print(f"now task is running")@shared_task
def send_email_task(email_address):# 假装这里发送邮件print(f"Sending email to {email_address}")
6. 启动 Celery Worker,启动完成后会打印启动日志和扫描到带@shared_task
或者 @app.task
注解的函数
celery -A myproject worker --loglevel=info # 启动一个worker
[tasks] 列表是Celery Worker扫描、注册成功的所有task 函数,当你定义的task函数在这里边就说明你的任务函数被扫描到了,但并不代理这个生产了可执行的任务
此时我们如果不通过代码调用生产任务或者使用Scheduler组件生成任务投递到Broker当中,worker是拉取不到可执行的任务的。
7. 通过代码或者Scheduler组件投递任务
- 调用方法的示例,上述我们定义了一个test_print方法,调用
test_print.delay()
就会生成一个任务投递到Broker当中 - 定义一个cron表达式任务,启动Scheduler线程,生成任务
# 不使用数据库示例 from celery.schedules import crontabapp.conf.beat_schedule = {'clear-logs-every-night': {'task': 'myapp.tasks.test_print','schedule': crontab(minute='*/1'), # 每分钟生成一个任务}, }
启动完成后就可以在启动的Worker当中查看到任务的执行了,因为我起了两个Worker,所以任务并不是被一个Worker全部消费的。# 启动Scheduler线程,生产task任务投递到Broker当中 celery -A myproject beat -l info
总结
通过上面的一系列的学习,我们就能够很清晰的了解了Celery框架的组成和使用原理