当前位置: 首页 > news >正文

Celery分布式任务队列

文章目录

    • Celery
    • 核心组件
    • 工作原理
    • 处理异步任务
      • Docker部署运行Kafka
    • 处理定时任务

Celery

Celery 是一个分布式任务队列,主要用于处理异步任务、定时任务和大规模并发任务。它可以将耗时操作(如数据处理、邮件发送、文件转换等)从主程序中剥离,通过异步执行提高系统响应速度和资源利用率。
在这里插入图片描述

核心组件

消息中间件(Broker):作为任务的调度中心,接收生产者发送的任务消息,再将消息分发给消费者(Worker)。

执行单元(Worker):实际执行任务的进程 / 线程,负责从 Broker 中获取任务、执行任务,并将结果发送给Backend, 通过命令: celery -A 项目名 worker --loglevel=info 启动。
结果存储(Backend):存储 Worker 执行任务后的结果,供生产者查询。
Celery Beat:定时任务调度器,用于周期性生成任务,其原理是定期向 Broker 发送任务消息。启动命令:celery -A 项目名 beat --loglevel=info

工作原理

  • 生产者调用 Celery 任务,任务被序列化后发送到 Broker。

  • Worker 从 Broker 中获取任务并执行。

  • 任务执行完成后,Worker 将执行结果存入 Backend。

  • 生产者通过 Backend 查询任务结果。

如果是定时任务,则由 Celery Beat 定期向 Broker 发送任务,后续流程一样。

处理异步任务

异步处理订单信息,Broker采用Kafka,Backend采用Redis。

Docker部署运行Kafka

创建目录配置文件

mkdir -p ~/kafka-docker && cd ~/kafka-docker

创建docker-compose.yml文件

vim docker-compose.yml
services:zookeeper:image: confluentinc/cp-zookeeper:latestcontainer_name: zookeeperenvironment:ZOOKEEPER_CLIENT_PORT: 2181ZOOKEEPER_TICK_TIME: 2000ports:- "2181:2181"volumes:- zookeeper-data:/var/lib/zookeeper/datanetworks:- kafka-networkhealthcheck:test: ["CMD", "echo", "ruok", "|", "nc", "localhost", "2181"]interval: 5stimeout: 5sretries: 5kafka:image: confluentinc/cp-kafka:latestcontainer_name: kafkadepends_on:zookeeper:condition: service_healthyports:- "9092:9092"- "9093:9093"environment:KAFKA_PROCESS_ROLES: broker,controllerKAFKA_BROKER_ID: 1KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:9093KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXTKAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://xxxxxxxx:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"CLUSTER_ID: "MkU3OEVBNTcwNTJENDM2Qk"KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLERKAFKA_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093volumes:- kafka-data:/var/lib/kafka/datanetworks:- kafka-networknetworks:kafka-network:driver: bridgevolumes:zookeeper-data:kafka-data:

运行kafka

docker compose up -d

查看是否运行正常

docker ps

在这里插入图片描述
celery_app.py配置信息及定义任务

from celery import Celery
import timeapp = Celery('kafka_redis_demo', broker='confluentkafka://xxxxxxxx:9092',  broker_transport='confluentkafka',  broker_transport_options={'queue_name_prefix': 'demo-',  'bootstrap_servers': ['xxxxxxxx:9092'], 'socket_timeout_ms': 30000, },backend='redis://localhost:6379/0',task_serializer='json',result_serializer='json',accept_content=['json'],timezone='Asia/Shanghai',enable_utc=True,task_routes={'celery_app.process_order': {'queue': 'demo-celery'}}
)# 定义任务
@app.task(bind=True,max_retries=3,retry_delay=2,queue='demo-celery'  
)
def process_order(self, order_id: str, amount: float, user_id: str):try:print(f"开始处理订单 {order_id}(用户 {user_id},金额 {amount} 元)")time.sleep(2)  return {"order_id": order_id,"status": "success","amount": amount,"user_id": user_id,"processed_at": time.strftime("%Y-%m-%d %H:%M:%S")}except Exception as e:print(f"订单 {order_id} 处理失败,重试中...")self.retry(exc=e)

启动worker
需要在celery当前目录执行

celery -A celery_app worker --loglevel=info --queues=demo-celery --pool=solo

在这里插入图片描述
producer.py生产者发送消息

from celery_app import process_orderdef submit_tasks():task_ids = []for i in range(1, 4):order_id = f"ORD{202408:06d}" task = process_order.delay(order_id=order_id,amount=99.9 * i,user_id=f"USER{i:03d}")task_ids.append({"order_id": order_id, "task_id": task.id})print(f"提交订单 {order_id},任务ID:{task.id}")return task_idsif __name__ == "__main__":submitted_tasks = submit_tasks()with open("task_ids.txt", "w") as f:import jsonjson.dump(submitted_tasks, f)

查看kafka队列已经收到消息
在这里插入图片描述
worker执行任务在这里插入图片描述
result_checker.py查询结果

from celery_app import app
import jsondef check_results():with open("task_ids.txt", "r") as f:tasks = json.load(f)for task in tasks:task_id = task["task_id"]order_id = task["order_id"]result = app.AsyncResult(task_id)print(f"\n订单 {order_id}(任务ID:{task_id})")print(f"状态:{result.status}") print(f"任务信息:{result.info}")  if result.ready():if result.successful():print("结果:", result.get())  else:print("错误:", str(result.result))  else:print("任务尚未完成")if __name__ == "__main__":check_results()

查询结果
在这里插入图片描述
redis中的最终存储结果
在这里插入图片描述

处理定时任务

celery_config.py

from celery import Celery
from celery.schedules import crontab
import time
import os
import logging
app = Celery('kafka_redis_schedule_demo',broker='confluentkafka://xxxxxxxx:9092',broker_transport='confluentkafka',broker_transport_options={'bootstrap_servers': ['xxxxxxxx:9092'],'socket_timeout_ms': 30000,'api_version': (2, 8, 1),'queue_name_prefix': 'schedule-'},backend='redis://localhost:6379/0',task_serializer='json',result_serializer='json',accept_content=['json'],timezone='Asia/Shanghai',enable_utc=False,task_routes={'celery_config.print_current_time': {'queue': 'schedule-tasks2'}}
)app.conf.beat_schedule = {# 任务1:每30秒执行一次'every-30-seconds-print-time': {'task': 'celery_config.print_current_time','schedule': 30.0,'args': (),},# 任务2:每天凌晨3点执行'daily-2am-cleanup-logs': {'task': 'celery_config.cleanup_logs','schedule': crontab(hour=3, minute=0),'args': ('./var/log/app',),},# 任务3:每周一上午8点执行'weekly-monday-10am-statistics': {'task': 'celery_config.generate_weekly_report','schedule': crontab(hour=8, minute=0, day_of_week=1),'args': ('weekly_report.csv',),}
}logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)# 任务1:打印当前时间
@app.task(queue='schedule-tasks2')
def print_current_time():current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())print(f"当前时间: {current_time}")return {"status": "success", "current_time": current_time}# 任务2:清理日志文件
@app.task(queue='schedule-tasks2')
def cleanup_logs(log_dir):try:if not os.path.exists(log_dir):return {"status": "failed", "message": f"日志目录不存在: {log_dir}"}print(f"开始清理日志目录: {log_dir}")time.sleep(2)print(f"日志清理完成: {log_dir}")return {"status": "success", "cleaned_dir": log_dir}except Exception as e:print(f"日志清理失败: {str(e)}")return {"status": "failed", "error": str(e)}# 任务3:生成每周报告
@app.task(queue='schedule-tasks2')
def generate_weekly_report(output_file):try:print(f"开始生成每周报告: {output_file}")time.sleep(3) print(f"每周报告生成完成: {output_file}")return {"status": "success","report_file": output_file,"period": "2025-08-11至2025-08-17"}except Exception as e:return {"status": "failed", "error": str(e)}

启动任务调度器,Celery Beat每隔30s往Broker发送消息

celery -A celery_config beat --loglevel=info

在这里插入图片描述
启动worker执行任务

celery -A celery_config worker --loglevel=info --pool=solo --queues=schedule-tasks2

在这里插入图片描述

http://www.dtcms.com/a/324124.html

相关文章:

  • vscode uv 发布一个python包:编辑、调试与相对路径导包
  • Rust进阶-part6-宏
  • WPFC#超市管理系统(5)商品布局、顾客登录、商品下单
  • redis集群-docker环境
  • JAVA+AI简化开发操作
  • `sk_buff` 结构体详解(包含全生命周期解析)
  • How Websites Work 网站如何运作
  • 社交与职场中的墨菲定律
  • 2025年6月电子学会全国青少年软件编程等级考试(Python三级)真题及答案
  • DBAPI 实现不同角色控制查看表的不同列
  • 量子计算机实用化:从理论到现实的艰难跨越
  • 数据中心可视化:算力中枢智能运维
  • 编程基础之字符串——统计数字字符个数
  • JDBC的连接过程(超详细)
  • 【Docker-Day 12】揭秘容器网络:深入理解 Docker Bridge 模式与端口映射
  • Python爬虫-爬取政务网站的文档正文内容和附件数据
  • 【Linux | 网络】数据链路层
  • 电脑清理3步速成法
  • 三相LLC拓扑分析
  • CompletableFuture实现Excel sheet页导出
  • 产品经理入门 - 产品解决方案(需求分析、 功能优先级划分、功能价值、用户体验)
  • RabbitMQ面试精讲 Day 19:网络调优与连接池管理
  • NLP---IF-IDF案例分析
  • AI编程插件对比分析:CodeRider、GitHub Copilot及其他
  • 构建企业级Odoo 18 WMS——功能、架构与拓展蓝图
  • 宝塔面板部署sentinel
  • 【传奇开心果系列】基于Flet框架开发的增强版“Hello World“应用程序学习flet框架的拔高起点
  • 【后端】Java 8 特性 `User::getId` 语法(方法引用)介绍
  • Linux常用命令(后端开发版)
  • SQL(结构化查询语言)的四大核心分类