flask Celery入门:轻松实现异步任务处理
Celery 分布式任务队列入门
Celery介绍和基本使用
现实生活中的同步就是代码中异步,反之亦然
Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。
Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。 celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。
celery的特点是:
简单,易于使用和维护,有丰富的文档。 高效,单个celery进程每分钟可以处理数百万个任务。 灵活,celery中几乎每个部分都可以自定义扩展。 celery非常易于集成到一些web开发框架中.
任务队列
任务队列是一种跨线程、跨机器工作的一种机制.
任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.
celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。
一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。
安装必须要的依赖
pip install flask celery redis flower eventlet
在Windows安装Redis
-
下载链接,建议下载.msi后缀名文件,点击就可以安装了
-
安装一直下一步即可
https://github.com/tporadowski/redis/releases
项目结构
/flask-celery-demo├── app.py # 主应用文件├── celery_config.py # Celery配置├── tasks.py # 异步任务定义└── templates/└── index.html
配置Celery
-
redis默认端口号为:6379
-
redis有0-15个库,可以随意选择
# 创建一个celery_config.py 文件
class CeleryConfig:# 使用 Redis 作为消息代理 :6379 是redis默认端口号broker_url = 'redis://localhost:6379/0'# 任务结果存储(禁用可提升性能)result_backend = 'redis://localhost:6379/1'# 定时任务配置(可选)beat_schedule = {'every-30-seconds': {'task': 'tasks.periodic_task','schedule': 30.0, # 每30秒执行},}
异步任务定义
# 创建tasks.py文件
from celery import Celery
# 初始化Celery实例
celery = Celery(__name__)
# 导入配置文件
celery.config_from_object('celery_config.CeleryConfig')
@celery.task(name='tasks.send_email')
def send_email(receiver, message):"""模拟邮件发送任务"""import timeprint(f"[邮件开始] 发送给 {receiver}")time.sleep(5) # 模拟耗时操作print(f"[邮件成功] {receiver}: {message}")return f"邮件已发送至 {receiver}"
@celery.task
def periodic_task():print("定时任务执行: 系统状态检查完成")
Flask主应用
from flask import Flask, render_template, request, jsonify
from tasks import send_email, celery
app = Flask(__name__)
# 可选:动态加载Celery配置
app.config['CELERY_CONFIG'] = {'broker_url': 'redis://localhost:6379/0','result_backend': 'redis://localhost:6379/0'
}
celery.conf.update(app.config['CELERY_CONFIG'])
@app.route('/')
def index():return render_template('index.html')
@app.route('/send-mail', methods=['POST'])
def trigger_email():# 从表单获取数据email = request.form.get('email')message = request.form.get('message')# 异步调用任务(非阻塞)task = send_email.delay(email, message)return jsonify({"message": "邮件任务已提交","task_id": task.id})
@app.route('/check-task/<task_id>')
def check_task(task_id):# 获取任务状态task_result = celery.AsyncResult(task_id)return jsonify({"status": task_result.status,"result": task_result.result if task_result.ready() else None})
if __name__ == "__main__":app.run(debug=True)
创建模板文件,用于发送邮件内容进行测试
<!DOCTYPE html>
<html>
<head><title>Celery邮件演示</title>
</head>
<body><h1>发送测试邮件</h1><form action="/send-mail" method="POST"><input type="email" name="email" placeholder="接收邮箱" required><textarea name="message" placeholder="邮件内容"></textarea><button type="submit">发送邮件</button></form>
<script></script>
</body>
</html>
启动应用并测试
-
下述内容在终端中运行
-
启动 Redis 服务
-
redis-server 可能会存在端口号被占用的问题,可以临时修改启动端口号:redis-server --port 6380 ,或者安装时修改默认端口号
-
启动 Celery Worker
-
celery -A app.celery worker --loglevel=info -P solo
-
启动Flask应用(也可以直接启动,不需要在终端中执行命令)
-
python app.py