TaskIQ 是什么,怎么做异步任务
TaskIQ 是什么(一句话版)
• 一个异步分布式任务队列:声明函数 → 装饰成任务 → 通过 broker(NATS、RabbitMQ、Redis、Kafka 等)发送到 worker 执行;async/同步函数都支持。 
• 常见用法:RabbitMQ 当 broker + Redis 当 结果存储(官方文档也推荐这个组合)。 
和 FastAPI 结合(最小可用示例)
下面给出一个“请求立即返回 202,任务在后台跑”的最小示例,使用 RabbitMQ 作为 broker、Redis 作为结果后端,并通过 taskiq-fastapi 复用 FastAPI 依赖。
安装
pip install fastapi uvicorn taskiq taskiq-fastapi taskiq-aio-pika taskiq-redis
启动 RabbitMQ 与 Redis(示例用 Docker)
# RabbitMQ
docker run --rm -d -p 5672:5672 -p 15672:15672 \-e RABBITMQ_DEFAULT_USER=guest -e RABBITMQ_DEFAULT_PASS=guest \rabbitmq:3.8.27-management-alpine# Redis
docker run --rm -d -p 6379:6379 redis(以上命令来自 TaskIQ 文档示例。) app.py(API + 任务定义 + 集成)# app.py
import asyncio
from fastapi import FastAPI, Response
from pydantic import BaseModel
from taskiq_fastapi import init as taskiq_init
from taskiq_aio_pika import AioPikaBroker
from taskiq_redis import RedisAsyncResultBackend# 1) 声明 broker(RabbitMQ)并挂上 Redis 结果后端
broker = AioPikaBroker("amqp://guest:guest@localhost:5672") \.with_result_backend(RedisAsyncResultBackend("redis://localhost:6379/0"))app = FastAPI()# 2) 让 taskiq-fastapi 初始化依赖上下文(可复用 FastAPI 的依赖)
taskiq_init(broker, "app:app") # 这里的 "app:app" 指向本文件里的 FastAPI 实例# 3) FastAPI 生命周期里启动/关闭 broker(避免在 worker 进程里重复启动)
@app.on_event("startup")
async def _startup() -> None:if not broker.is_worker_process:await broker.startup()@app.on_event("shutdown")
async def _shutdown() -> None:if not broker.is_worker_process:await broker.shutdown()# 4) 定义异步任务
@broker.task
async def long_job(n: int) -> int:await asyncio.sleep(2) # 模拟耗时return n * 2# 5) API:收到请求立即返回,后台执行任务```bash
class JobIn(BaseModel):n: int@app.post("/jobs", status_code=202)
async def submit_job(body: JobIn, response: Response):task = await long_job.kiq(body.n) # 投递任务(关键是 .kiq)# 也可以把 task 的标识存起来做轮询,这里简单返回已排队return {"queued": True}
(可选)提供结果查询接口:需结合结果后端读取 task 结果,自行实现
启动
终端1:启动 TaskIQ worker(指向定义 broker 的模块与变量)
taskiq worker app:broker
终端2:启动 FastAPI
uvicorn app:app --reload
(.kiq(...) 用于入队;taskiq worker 模块路径:broker变量名 用于启动 worker,这些都来自官方文档。) 小提示• taskiq-fastapi 会创建与真实请求无关的“worker 级别”Request/HTTPConnection 用于依赖注入,便于在任务里复用 FastAPI 依赖;注意它不是当前这次 HTTP 请求对象本身。 • 若只是“简单、短平快”的后台动作(比如发邮件、写日志),FastAPI 自带的 BackgroundTasks 就够用,但它不提供独立进程/持久化的队列能力;涉及耗时/可靠性需求时用 TaskIQ 更稳。 关键概念对照• 投递任务:await my_task.kiq(args...)(返回 TaskIQ 的任务句柄,可等待结果或结合结果后端查询)。 • 启动 worker:taskiq worker 路径:broker;可加 --fs-discover 自动发现 tasks.py。 • 推荐组合:RabbitMQ(taskiq-aio-pika)作 broker + Redis(taskiq-redis)作结果后端。