当前位置: 首页 > news >正文

flask Celery入门:轻松实现异步任务处理

Celery 分布式任务队列入门

Celery介绍和基本使用

现实生活中的同步就是代码中异步,反之亦然

Celery 是一个 基于python开发的分布式异步消息任务队列,通过它可以轻松的实现任务的异步处理, 如果你的业务场景中需要用到异步任务,就可以考虑使用celery。

Celery是一个功能完备即插即用的任务队列。它使得我们不需要考虑复杂的问题,使用非常简单。 celery适用异步处理问题,当发送邮件、或者文件上传, 图像处理等等一些比较耗时的操作,我们可将其异步执行,这样用户不需要等待很久,提高用户体验。

celery的特点是:

简单,易于使用和维护,有丰富的文档。 高效,单个celery进程每分钟可以处理数百万个任务。 灵活,celery中几乎每个部分都可以自定义扩展。 celery非常易于集成到一些web开发框架中.

任务队列

任务队列是一种跨线程、跨机器工作的一种机制.

任务队列中包含称作任务的工作单元。有专门的工作进程持续不断的监视任务队列,并从中获得新的任务并处理.

celery通过消息进行通信,通常使用一个叫Broker(中间人)来协client(任务的发出者)和worker(任务的处理者). clients发出消息到队列中,broker将队列中的信息派发给worker来处理。

 一个celery系统可以包含很多的worker和broker,可增强横向扩展性和高可用性能。

安装必须要的依赖
pip install flask celery redis flower eventlet

在Windows安装Redis
  • 下载链接,建议下载.msi后缀名文件,点击就可以安装了

  • 安装一直下一步即可

https://github.com/tporadowski/redis/releases

项目结构
/flask-celery-demo├── app.py          # 主应用文件├── celery_config.py # Celery配置├── tasks.py        # 异步任务定义└── templates/└── index.html

配置Celery
  • redis默认端口号为:6379

  • redis有0-15个库,可以随意选择

# 创建一个celery_config.py 文件
class CeleryConfig:# 使用 Redis 作为消息代理  :6379  是redis默认端口号broker_url = 'redis://localhost:6379/0'# 任务结果存储(禁用可提升性能)result_backend = 'redis://localhost:6379/1'# 定时任务配置(可选)beat_schedule = {'every-30-seconds': {'task': 'tasks.periodic_task','schedule': 30.0,  # 每30秒执行},}

异步任务定义
# 创建tasks.py文件
from celery import Celery
​
# 初始化Celery实例
celery = Celery(__name__)
# 导入配置文件
celery.config_from_object('celery_config.CeleryConfig')
​
@celery.task(name='tasks.send_email')
def send_email(receiver, message):"""模拟邮件发送任务"""import timeprint(f"[邮件开始] 发送给 {receiver}")time.sleep(5)  # 模拟耗时操作print(f"[邮件成功] {receiver}: {message}")return f"邮件已发送至 {receiver}"
​
@celery.task
def periodic_task():print("定时任务执行: 系统状态检查完成")
Flask主应用
from flask import Flask, render_template, request, jsonify
from tasks import send_email, celery
​
app = Flask(__name__)
​
# 可选:动态加载Celery配置
app.config['CELERY_CONFIG'] = {'broker_url': 'redis://localhost:6379/0','result_backend': 'redis://localhost:6379/0'
}
celery.conf.update(app.config['CELERY_CONFIG'])
​
@app.route('/')
def index():return render_template('index.html')
​
@app.route('/send-mail', methods=['POST'])
def trigger_email():# 从表单获取数据email = request.form.get('email')message = request.form.get('message')# 异步调用任务(非阻塞)task = send_email.delay(email, message)return jsonify({"message": "邮件任务已提交","task_id": task.id})
​
@app.route('/check-task/<task_id>')
def check_task(task_id):# 获取任务状态task_result = celery.AsyncResult(task_id)return jsonify({"status": task_result.status,"result": task_result.result if task_result.ready() else None})
​
if __name__ == "__main__":app.run(debug=True)
创建模板文件,用于发送邮件内容进行测试
<!DOCTYPE html>
<html>
<head><title>Celery邮件演示</title>
</head>
<body><h1>发送测试邮件</h1><form action="/send-mail" method="POST"><input type="email" name="email" placeholder="接收邮箱" required><textarea name="message" placeholder="邮件内容"></textarea><button type="submit">发送邮件</button></form>
​<script></script>
</body>
</html>
启动应用并测试
  • 下述内容在终端中运行

  • 启动 Redis 服务

  • redis-server  可能会存在端口号被占用的问题,可以临时修改启动端口号:redis-server --port 6380 ,或者安装时修改默认端口号
  • 启动 Celery Worker

  • celery -A app.celery worker --loglevel=info -P solo
  • 启动Flask应用(也可以直接启动,不需要在终端中执行命令)

  • python app.py

http://www.dtcms.com/a/349668.html

相关文章:

  • 前端通过node本地转译rtsp流,配合hls实现浏览
  • 【SQL】深入理解MySQL存储过程:从入门到实战
  • CUDA 工具包 13.0 正式发布:开启新一代 GPU 计算的基石!
  • 使用EasyExcel根据模板导出文件
  • QtExcel/QXlsx
  • 深入浅出 Java 多态:从原理到实践的全面解析
  • 【RAGFlow代码详解-5】配置系统
  • 基于深度学习的翻拍照片去摩尔纹在线系统设计与实现
  • UE5 HoudiniPivotPainter1.0使用
  • NFC 滤波网络设计考虑
  • 车载通信架构---通过CANoe做 SOME/IP 模拟的配置例如
  • 库存指标怎么算?一文讲清3大库存分析指标
  • 大数据治理域——离线数据开发
  • 小白成长之路-k8s部署项目(二)
  • Legion Y7000P IRX9 DriveList
  • 【数据可视化-100】使用 Pyecharts 绘制人口迁徙图:步骤与数据组织形式
  • 程序设计---状态机
  • KVM 虚拟化技术与部署
  • ZKmall开源商城多端兼容实践:鸿蒙、iOS、安卓全平台适配的技术路径
  • 朴素贝叶斯学习笔记
  • Selenium框架Java实践截图服务
  • 面向过程与面向对象
  • 了解检验和
  • 四,设计模式-原型模式
  • 设计模式5-代理模式
  • 无锁队列的设计与实现
  • jdbc相关内容
  • 基于TimeMixer的帕金森语音分类:WAV音频输入与训练全流程
  • 基于开源 AI 智能名片链动 2+1 模式 S2B2C 商城小程序的新开非连锁品牌店开业引流策略研究
  • 云计算之中间件与数据库