当前位置: 首页 > news >正文

python 实现 Celery 任务队列系统

这是一个基于 Celery 和 Redis 的分布式任务队列系统,用于处理异步任务和定时任务。

系统要求

  • Python 3.x
  • Redis 服务器
  • 依赖包:
    • celery==5.3.6
    • redis==5.0.1

系统架构

系统主要由以下组件构成:

  1. 任务定义模块 (tasks.py):包含所有可执行的任务定义
  2. 主程序模块 (main.py):负责任务的调度和监控
  3. Redis 服务器:作为消息代理(Broker)和结果后端(Result Backend)

配置说明

Celery 配置

broker_url = 'redis://:123456@127.0.0.1:6379/1'
result_backend = 'redis://:123456@127.0.0.1:6379/1'

主要配置项:

  • 任务序列化:JSON
  • 时区:Asia/Shanghai
  • 工作进程数:1

功能模块

1. 基础运算任务

  • add(x, y): 加法运算
  • multiply(x, y): 乘法运算
  • chain_calculation(numbers): 链式计算(求和、平均值、最大值、最小值)

2. 文本处理任务

  • process_text(text): 文本处理(大写转换、长度统计、单词计数)

3. 系统监控任务

  • system_monitor(): 每5秒执行一次,监控系统状态
    • CPU使用率
    • 内存使用率
    • 系统状态

4. 报告生成任务

  • generate_report(): 生成实时报告
  • daily_report(): 每天早上9点生成日报
  • workday_task(): 工作日每小时执行的任务

定时任务配置

系统包含以下定时任务:

  1. 系统监控:每5秒执行一次
  2. 日报生成:每天早上9点执行
  3. 工作日任务:工作日(周一至周五)9:00-18:00每小时执行

使用说明

1. 启动系统

  1. 确保Redis服务器已启动
  2. 启动Celery工作进程:
celery -A tasks worker --loglevel=info
  1. 启动Celery Beat进程(用于定时任务):
celery -A tasks beat
  1. 运行主程序:
python main.py

2. 系统监控

主程序运行后会自动执行以下操作:

  • 实时显示系统监控数据
  • 执行常规任务示例
  • 通过按下 Ctrl+C 可以优雅退出程序

错误处理

系统实现了完整的错误处理机制:

  • 任务执行错误捕获和日志记录
  • 优雅的程序退出处理
  • 自动重试机制

注意事项

  1. Redis连接配置需要根据实际环境修改
  2. 确保系统时区设置正确
  3. 建议在生产环境中调整工作进程数
  4. 监控数据目前为模拟数据,实际使用时需要替换为真实的系统监控指标

代码示例

任务执行示例:

# 执行加法任务
result = add.delay(4, 6)
print(f"任务ID: {result.id}")
if result.ready():
    print(f"结果: {result.get()}")

系统监控示例:

main.py

# 执行系统监控
result = system_monitor.delay()
data = result.get()
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"内存使用率: {data['memory_usage']:.1f}%")
### 所有代码:

```python
from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor
import time
import json
from datetime import datetime
import threading
import signal
import sys
from celery.result import AsyncResult

# 全局变量控制程序运行
running = True

def signal_handler(signum, frame):
    """处理退出信号"""
    global running
    print("\n收到退出信号,正在关闭程序...")
    running = False

def monitor_system_task():
    """监控系统任务的执行结果"""
    while running:
        try:
            # 执行系统监控任务
            result = system_monitor.delay()
            
            # 等待结果(最多等待4秒)
            for _ in range(4):
                if result.ready():
                    data = result.get()
                    if data:
                        print(f"\n系统监控结果:")
                        print(f"CPU使用率: {data['cpu_usage']:.1f}%")
                        print(f"内存使用率: {data['memory_usage']:.1f}%")
                        print(f"系统状态: {data['status']}")
                        print("-" * 50)
                    break
                time.sleep(1)
            
            # 等待剩余时间,确保大约每5秒执行一次
            time.sleep(1)
            
        except Exception as e:
            print(f"监控任务出错: {e}")
            time.sleep(5)

def run_regular_task():
    """运行普通任务的示例"""
    while running:
        try:
            # 执行一些常规任务
            print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 执行常规任务...")
            
            # 1. 执行加法任务
            result = add.delay(4, 6)
            print(f"加法任务ID: {result.id}")
            if result.ready():
                print(f"4 + 6 = {result.get()}")
            
            # 2. 执行文本处理
            text = f"这是一条测试消息 - {datetime.now()}"
            result = process_text.delay(text)
            print(f"文本处理任务ID: {result.id}")
            if result.ready():
                print(json.dumps(result.get(), indent=2, ensure_ascii=False))
            
            # 休眠5秒后继续下一轮
            for _ in range(5):
                if not running:
                    break
                time.sleep(1)
                
        except Exception as e:
            print(f"执行任务时出错: {e}")
            time.sleep(5)

def main():
    """主函数"""
    # 注册信号处理器(用于优雅退出)
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
    
    print("程序启动...")
    print("提示:按 Ctrl+C 可以优雅退出程序")
    print("\n=== 主程序开始运行 ===")
    print("- 系统监控每5秒执行一次")
    print("- 常规任务每5秒执行一次")
    print("- 所有任务的执行结果会实时显示")
    
    try:
        # 创建并启动监控线程
        monitor_thread = threading.Thread(target=monitor_system_task)
        monitor_thread.daemon = True
        monitor_thread.start()
        
        # 创建并启动常规任务线程
        task_thread = threading.Thread(target=run_regular_task)
        task_thread.daemon = True
        task_thread.start()
        
        # 主线程保持运行
        while running:
            time.sleep(1)
            
    except KeyboardInterrupt:
        print("\n程序正在关闭...")
    finally:
        print("程序已退出。")

if __name__ == "__main__":
    main()

tasks.py

from celery import Celery
from celery.schedules import crontab
import time
from datetime import datetime
import random

# 创建 Celery 实例
app = Celery('tasks')

# 配置 Celery
app.conf.update(
    broker_url='redis://:123456@127.0.0.1:6379/1',
    result_backend='redis://:123456@127.0.0.1:6379/1',
    task_serializer='json',
    result_serializer='json',
    accept_content=['json'],
    timezone='Asia/Shanghai',
    enable_utc=True,
    worker_pool_restarts=True,
    worker_concurrency=1,
)

# 配置定时任务
app.conf.beat_schedule = {
    # 每5秒执行一次系统监控
    'monitor-every-5-seconds': {
        'task': 'tasks.system_monitor',
        'schedule': 5.0,  # 每5秒执行一次
    },
    # 每天早上9点执行
    'daily-morning-report': {
        'task': 'tasks.daily_report',
        'schedule': crontab(hour=9, minute=0),
    },
    # 工作日每小时执行
    'workday-hourly-task': {
        'task': 'tasks.workday_task',
        'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
    }
}

@app.task
def add(x, y):
    """简单的加法任务"""
    time.sleep(1)
    return x + y

@app.task
def multiply(x, y):
    """乘法运算任务"""
    time.sleep(2)
    return x * y

@app.task
def process_text(text):
    """文本处理任务"""
    time.sleep(1)
    result = {
        'original': text,
        'upper': text.upper(),
        'length': len(text),
        'words': len(text.split())
    }
    return result

@app.task
def generate_report():
    """生成报告任务"""
    time.sleep(3)
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    data = {
        'timestamp': current_time,
        'temperature': random.uniform(20, 30),
        'humidity': random.uniform(40, 80),
        'status': random.choice(['正常', '警告', '错误'])
    }
    return data

@app.task
def chain_calculation(numbers):
    """链式计算任务"""
    time.sleep(2)
    result = sum(numbers)
    average = result / len(numbers)
    maximum = max(numbers)
    minimum = min(numbers)
    return {
        'sum': result,
        'average': average,
        'max': maximum,
        'min': minimum,
        'count': len(numbers)
    }

@app.task
def system_monitor():
    """每5秒执行一次的系统监控任务"""

    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] 每5秒执行一次的系统监控任务 执行系统监控...")
    
    # 模拟获取系统信息
    data = {
        'timestamp': current_time,
        'cpu_usage': random.uniform(0, 100),
        'memory_usage': random.uniform(0, 100),
        'status': 'running'
    }
    
    # 打印监控信息
    print(f"CPU使用率: {data['cpu_usage']:.1f}%")
    print(f"内存使用率: {data['memory_usage']:.1f}%")
    print(f"系统状态: {data['status']}")
    print("-" * 50)
    
    return data

@app.task
def daily_report():
    """每天早上9点执行的日报任务"""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] 生成每日报告...")
    
    report = {
        'report_type': '日报',
        'generated_at': current_time,
        'summary': '这是一个自动生成的日报示例',
        'metrics': {
            'total_tasks': random.randint(100, 1000),
            'completed_tasks': random.randint(50, 500),
            'success_rate': random.uniform(0.8, 1.0)
        }
    }
    
    print(f"报告类型: {report['report_type']}")
    print(f"生成时间: {report['generated_at']}")
    print(f"任务完成率: {report['metrics']['success_rate']:.1%}")
    print("-" * 50)
    
    return report

@app.task
def workday_task():
    """工作日每小时执行的任务"""
    current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"\n[{current_time}] 执行工作时间任务...")
    
    data = {
        'task_type': '工作时间任务',
        'executed_at': current_time,
        'status': random.choice(['完成', '进行中', '计划中']),
        'workload': random.randint(1, 100)
    }
    
    print(f"任务状态: {data['status']}")
    print(f"工作负载: {data['workload']}%")
    print("-" * 50)
    
    return data

相关文章:

  • LXC 导入(Rockylinux,almalinux,oraclelunx,debian,ubuntu,openEuler,kail,opensuse)
  • 从全球首发到独家量产,远峰科技持续领跑数字钥匙赛道
  • 如何使用cpp操作香橙派GPIO --使用<wiringPi.h>
  • 数据治理的主题库是做什么的
  • pip安装timm依赖失败
  • C++进阶知识复习 1~15
  • Sentinel[超详细讲解]-5
  • 【ROS实战】04-自定义消息并实现ROS服务
  • Java 锁机制详解:用“厕所门”和“防盗门”轻松理解多线程同步
  • delphi intraweb 警告框
  • bluecode-数字增殖问题
  • CPU 4核8个逻辑处理器
  • 微服务集成测试 -华为OD机试真题(A卷、JavaScript)
  • 洛谷题单2-P5717 【深基3.习8】三角形分类-python-流程图重构
  • 页面加载过多图片导致卡顿——解决方案详解
  • 【蓝桥杯】单片机设计与开发,速成备赛
  • idea打包Plugin ‘org.springframework.boot:spring-boot-maven-plugin:’ not found
  • 【奇点时刻】GPT-4o新生图特性深度洞察报告
  • QT之QML(简单示例)
  • Three.js 实现 3D 数学欧拉角
  • 殷墟出土鸮尊时隔50年首次聚首,北京新展“看·见殷商”
  • 山东茌平民企巨头实控人省外再出手:斥资16亿拿下山西一宗探矿权
  • 外交部驻港公署正告美政客:威胁恫吓撼动不了中方维护国家安全的决心
  • 美国新泽西客运公司遭遇罢工:40年来首次,35万人受影响
  • 爬坡难下坡险,居民出行难题如何解?
  • “大型翻车现场”科技满满,黄骅打造现代化港口和沿海新城典范