当前位置: 首页 > news >正文

TaskIQ 是什么,怎么做异步任务

TaskIQ 是什么(一句话版)
• 一个异步分布式任务队列:声明函数 → 装饰成任务 → 通过 broker(NATS、RabbitMQ、Redis、Kafka 等)发送到 worker 执行;async/同步函数都支持。 
• 常见用法:RabbitMQ 当 broker + Redis 当 结果存储(官方文档也推荐这个组合)。 

和 FastAPI 结合(最小可用示例)

下面给出一个“请求立即返回 202,任务在后台跑”的最小示例,使用 RabbitMQ 作为 broker、Redis 作为结果后端,并通过 taskiq-fastapi 复用 FastAPI 依赖。

安装

pip install fastapi uvicorn taskiq taskiq-fastapi taskiq-aio-pika taskiq-redis

启动 RabbitMQ 与 Redis(示例用 Docker)

# RabbitMQ
docker run --rm -d -p 5672:5672 -p 15672:15672 \-e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest \rabbitmq:3.8.27-management-alpine# Redis
docker run --rm -d -p 6379:6379 redis(以上命令来自 TaskIQ 文档示例。) app.py(API + 任务定义 + 集成)# app.py
import asyncio
from fastapi import FastAPI, Response
from pydantic import BaseModel
from taskiq_fastapi import init as taskiq_init
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend# 1) 声明 broker(RabbitMQ)并挂上 Redis 结果后端
broker = AioPikaBroker("amqp://guest:guest@localhost:5672") \.with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))app = FastAPI()# 2) 让 taskiq-fastapi 初始化依赖上下文(可复用 FastAPI 的依赖)
taskiq_init(broker, "app:app")  # 这里的 "app:app" 指向本文件里的 FastAPI 实例# 3) FastAPI 生命周期里启动/关闭 broker(避免在 worker 进程里重复启动)
@app.on_event("startup")
async def _startup() -> None:if not broker.is_worker_process:await broker.startup()@app.on_event("shutdown")
async def _shutdown() -> None:if not broker.is_worker_process:await broker.shutdown()# 4) 定义异步任务
@broker.task
async def long_job(n: int) -> int:await asyncio.sleep(2)          # 模拟耗时return n * 2# 5) API:收到请求立即返回,后台执行任务```bash
class JobIn(BaseModel):n: int@app.post("/jobs", status_code=202)
async def submit_job(body: JobIn, response: Response):task = await long_job.kiq(body.n)   # 投递任务(关键是 .kiq)# 也可以把 task 的标识存起来做轮询,这里简单返回已排队return {"queued": True}

(可选)提供结果查询接口:需结合结果后端读取 task 结果,自行实现

启动

终端1:启动 TaskIQ worker(指向定义 broker 的模块与变量)

taskiq worker app:broker

终端2:启动 FastAPI

uvicorn app:app --reload

(.kiq(...) 用于入队;taskiq worker 模块路径:broker变量名 用于启动 worker,这些都来自官方文档。) 小提示•	taskiq-fastapi 会创建与真实请求无关的“worker 级别”Request/HTTPConnection 用于依赖注入,便于在任务里复用 FastAPI 依赖;注意它不是当前这次 HTTP 请求对象本身。 •	若只是“简单、短平快”的后台动作(比如发邮件、写日志),FastAPI 自带的 BackgroundTasks 就够用,但它不提供独立进程/持久化的队列能力;涉及耗时/可靠性需求时用 TaskIQ 更稳。 关键概念对照•	投递任务:await my_task.kiq(args...)(返回 TaskIQ 的任务句柄,可等待结果或结合结果后端查询)。 •	启动 worker:taskiq worker 路径:broker;可加 --fs-discover 自动发现 tasks.py。 •	推荐组合:RabbitMQ(taskiq-aio-pika)作 broker + Redis(taskiq-redis)作结果后端。 
http://www.dtcms.com/a/486172.html

相关文章:

  • 服务器CPU达到100%解决思路
  • 在 Claude Code 中设置 MCP 服务器(技术总结)
  • 网站上传根目录如何制作线上投票
  • 移动端网站建设的请示东莞科技网站建设
  • EtherCAT转CCLKIE工业通讯网关突破:三菱PLC实时调度EtherCAT伺服完成精密加工
  • 深度学习实验一之图像特征提取和深度学习训练数据标注
  • 基于Matlab的深度堆叠自编码器(SAE)实现与分类应用
  • @Scope失效问题
  • Service 网络原理
  • 数据复制问题及其解决方案
  • Java-Spring入门指南(二十五)Android 的历史,认识移动应用和Android 基础知识
  • WPF依赖属性(Dependency Property)详解
  • 深度学习进阶(三)——生成模型的崛起:从自回归到扩散
  • 三门峡网站开发ict网站建设
  • 神经网络之链式法则
  • C#设计模式源码讲解
  • 性能测试单场景测试时,是设置并发读多个文件,还是设置不同的用户读不同的文件?
  • Qt初识(对象树,乱码问题,小结)
  • 基于Home Assistant的机器人低延迟通信项目详细调研报告
  • 深圳网站做的好的公司婚庆网站开发目的
  • 中小企业网站制作是什么宁德网站建设51yunsou
  • 代理模式 vs AOP:支付服务中的日志增强实践(含执行顺序详解)
  • linux系统运维教程,linux系统运维攻略
  • string字符集
  • Linux 命令:fsck
  • 如何提升生物科技研发辅助的效率?
  • ECEF坐标转换库
  • 企业商务网被公司优化掉是什么意思
  • 网站虚拟主机购买教程专业网站设计工作室
  • 数据库管理-第376期 Oracle AI DB 23.26新特性一览(20251016)