当前位置: 首页 > news >正文

Flask与Celery 项目应用(shared_task使用)

目录

    • 1. 项目概述
      • 主要功能
      • 技术栈
    • 2. 项目结构
    • 3. 环境设置
      • 创建虚拟环境并安装依赖
      • 主要依赖
    • 4. 应用配置
      • Flask应用初始化 (`__init__.py`)
      • Celery应用初始化 (`make_celery.py`)
    • 5. 定义Celery任务 (`tasks.py`)
      • 任务说明
    • 6. 创建API端点 (`views.py`)
      • API端点说明
    • 7. 前端界面 (`index.html`)
      • 前端JavaScript实现
    • 8. 运行应用
    • 用docker启动redis
      • 启动Celery Worker
      • 启动Flask开发服务器
    • 9. 关键概念解析
      • Celery任务装饰器
      • 任务ID
      • 任务状态查询

1. 项目概述

本教程将指导您创建一个Flask应用,该应用使用Celery处理后台任务。我们将构建一个简单的Web界面,允许用户提交三种不同类型的任务,并通过JavaScript轮询查看任务结果。

主要功能

  • 简单计算任务:将两个数字相加
  • 阻塞任务:模拟长时间运行的任务
  • 进度报告任务:显示任务执行进度

技术栈

  • Flask: Web框架
  • Celery: 分布式任务队列
  • Redis: 消息代理和结果后端
  • JavaScript: 前端交互和轮询

2. 项目结构

flask_celery_app/
├── .venv/                  # 虚拟环境
├── README.md              # 项目说明
├── make_celery.py         # Celery应用初始化
├── pyproject.toml         # 项目配置
├── requirements.txt       # 依赖列表
└── src/└── task_app/├── __init__.py    # Flask应用初始化├── tasks.py       # Celery任务定义├── views.py       # Flask视图和API└── templates/└── index.html  # 前端界面

3. 环境设置

创建虚拟环境并安装依赖

uv venv
Using CPython 3.12.10
Creating virtual environment at: .venv
Activate with: source .venv/bin/activate
uv pip install -r requirements.txt 
uv pip install -e .

主要依赖

  • Flask
  • Celery
  • Redis

4. 应用配置

Flask应用初始化 (__init__.py)

from celery import Celery
from celery import Task
from flask import Flask
from flask import render_templatedef create_app() -> Flask:app = Flask(__name__)app.config.from_mapping(CELERY=dict(broker_url="redis://localhost",result_backend="redis://localhost",task_ignore_result=True,),)app.config.from_prefixed_env()celery_init_app(app)@app.route("/")def index() -> str:return render_template("index.html")from . import viewsapp.register_blueprint(views.bp)return appdef celery_init_app(app: Flask) -> Celery:class FlaskTask(Task):def __call__(self, *args: object, **kwargs: object) -> object:with app.app_context():return self.run(*args, **kwargs)celery_app = Celery(app.name, task_cls=FlaskTask)celery_app.config_from_object(app.config["CELERY"])celery_app.set_default()app.extensions["celery"] = celery_appreturn celery_app

Celery应用初始化 (make_celery.py)

from task_app import create_appflask_app = create_app()
celery_app = flask_app.extensions["celery"]

5. 定义Celery任务 (tasks.py)

import timefrom celery import shared_task
from celery import Task@shared_task(ignore_result=False)
def add(a: int, b: int) -> int:return a + b@shared_task()
def block() -> None:time.sleep(5)@shared_task(bind=True, ignore_result=False)
def process(self: Task, total: int) -> object:for i in range(total):self.update_state(state="PROGRESS", meta={"current": i + 1, "total": total})time.sleep(1)return {"current": total, "total": total}

任务说明

  1. add任务: 简单的加法计算,设置ignore_result=False以保存结果
  2. block任务: 模拟耗时操作,不返回结果
  3. process任务: 带进度报告的任务,使用update_state更新进度

6. 创建API端点 (views.py)

from celery.result import AsyncResult
from flask import Blueprint
from flask import requestfrom . import tasksbp = Blueprint("tasks", __name__, url_prefix="/tasks")@bp.get("/result/<id>")
def result(id: str) -> dict[str, object]:result = AsyncResult(id)ready = result.ready()return {"ready": ready,"successful": result.successful() if ready else None,"value": result.get() if ready else result.result,}@bp.post("/add")
def add() -> dict[str, object]:a = request.form.get("a", type=int)b = request.form.get("b", type=int)result = tasks.add.delay(a, b)return {"result_id": result.id}@bp.post("/block")
def block() -> dict[str, object]:result = tasks.block.delay()return {"result_id": result.id}@bp.post("/process")
def process() -> dict[str, object]:result = tasks.process.delay(total=request.form.get("total", type=int))return {"result_id": result.id}

API端点说明

  1. /tasks/result/: 获取任务结果
  2. /tasks/add: 提交加法任务
  3. /tasks/block: 提交阻塞任务
  4. /tasks/process: 提交带进度的任务

7. 前端界面 (index.html)

<!doctype html>
<html>
<head><meta charset=UTF-8><title>Celery Example</title>
</head>
<body>
<h2>Celery Example</h2>
Execute background tasks with Celery. Submits tasks and shows results using JavaScript.<hr>
<h4>Add</h4>
<p>Start a task to add two numbers, then poll for the result.
<form id=add method=post action="{{ url_for("tasks.add") }}"><label>A <input type=number name=a value=4></label><br><label>B <input type=number name=b value=2></label><br><input type=submit>
</form>
<p>Result: <span id=add-result></span></p><hr>
<h4>Block</h4>
<p>Start a task that takes 5 seconds. However, the response will return immediately.
<form id=block method=post action="{{ url_for("tasks.block") }}"><input type=submit>
</form>
<p id=block-result></p><hr>
<h4>Process</h4>
<p>Start a task that counts, waiting one second each time, showing progress.
<form id=process method=post action="{{ url_for("tasks.process") }}"><label>Total <input type=number name=total value="10"></label><br><input type=submit>
</form>
<p id=process-result></p><script>const taskForm = (formName, doPoll, report) => {document.forms[formName].addEventListener("submit", (event) => {event.preventDefault()fetch(event.target.action, {method: "POST",body: new FormData(event.target)}).then(response => response.json()).then(data => {report(null)const poll = () => {fetch(`/tasks/result/${data["result_id"]}`).then(response => response.json()).then(data => {report(data)if (!data["ready"]) {setTimeout(poll, 500)} else if (!data["successful"]) {console.error(formName, data)}})}if (doPoll) {poll()}})})}taskForm("add", true, data => {const el = document.getElementById("add-result")if (data === null) {el.innerText = "submitted"} else if (!data["ready"]) {el.innerText = "waiting"} else if (!data["successful"]) {el.innerText = "error, check console"} else {el.innerText = data["value"]}})taskForm("block", false, data => {document.getElementById("block-result").innerText = ("request finished, check celery log to see task finish in 5 seconds")})taskForm("process", true, data => {const el = document.getElementById("process-result")if (data === null) {el.innerText = "submitted"} else if (!data["ready"]) {el.innerText = `${data["value"]["current"]} / ${data["value"]["total"]}`} else if (!data["successful"]) {el.innerText = "error, check console"} else {el.innerText = "✅ done"}console.log(data)})</script>
</body>
</html>

前端JavaScript实现

前端使用JavaScript发送任务请求并轮询结果:

  1. 提交表单时阻止默认行为,使用fetch API发送POST请求
  2. 获取任务ID后,定期轮询/tasks/result/<id>端点
  3. 根据任务状态更新UI

8. 运行应用

用docker启动redis

docker run --name redis-server -p 6379:6379 -d redis

启动Celery Worker

# 在第一个终端窗口
celery -A make_celery worker --loglevel INFO

在这里插入图片描述

启动Flask开发服务器

# 在第二个终端窗口
flask -A task_app run --debug

在这里插入图片描述

访问 http://localhost:5000/ 使用应用。
在这里插入图片描述

9. 关键概念解析

Celery任务装饰器

  • @shared_task(ignore_result=False): 创建可共享的任务,并保存结果
  • bind=True: 将任务实例作为第一个参数传递给任务函数

任务ID

  • 每个任务都有一个自动生成的唯一ID
  • 通过result.id获取,用于后续查询任务状态

任务状态查询

  • AsyncResult(id): 通过ID获取任务结果对象
  • result.ready(): 检查任务是否完成
  • result.successful(): 检查任务是否成功完成
  • result.get(): 获取任务结果

项目链接:https://github.com/pallets/flask/tree/main/examples/celery
可用downgit单独下载项目中某个文件夹:https://minhaskamal.github.io/DownGit/#/home?url=https:%2F%2Fgithub.com%2Fpallets%2Fflask%2Ftree%2Fmain%2Fexamples%2Fcelery

相关文章:

  • Android资源ID冲突解决方案
  • 对比学习
  • 添加禁用状态
  • 黑马点评【基于redis实现共享session登录】
  • 6️⃣Go 语言中的哈希、加密与序列化:通往区块链世界的钥匙
  • 前端开发者常用网站
  • Cursor Rules 使用
  • AI Agent 架构设计:ReAct 与 Self-Ask 模式对比与分析
  • vue · 插槽 | $slots:访问所有命名插槽内容 | 插槽的使用:子组件和父组件如何书写?
  • JavaWeb基础入门 — SpringBoot Web 案例详解
  • Vue Fragment vs React Fragment
  • Redis主从复制原理二 之 主从复制工作流程
  • Redis专题-基础篇
  • 安卓基础(编译.Class)
  • 【题解】[UTPC2024] C.Card Deck
  • altium designer2024绘制stm32过程笔记x`
  • 多区域协同的异地多活AI推理服务架构
  • qt使用笔记二:main.cpp详解
  • Linux系统 - 线程 -6- 线程安全函数和可重入函数
  • LangChain4j 学习教程项目
  • 双语网站建设定制开发/2345网址导航官网
  • 重庆公司网站建设价格/网络营销理论包括哪些
  • wordpress页面位置/成都做整站优化
  • 哪家专门做特卖的网站?/谷歌网址
  • 南川网站制作/b站推广入口
  • 网站建设设计报价/网推团队