python 实现 Celery 任务队列系统
这是一个基于 Celery 和 Redis 的分布式任务队列系统,用于处理异步任务和定时任务。
系统要求
- Python 3.x
- Redis 服务器
- 依赖包:
- celery==5.3.6
- redis==5.0.1
系统架构
系统主要由以下组件构成:
- 任务定义模块 (
tasks.py
):包含所有可执行的任务定义 - 主程序模块 (
main.py
):负责任务的调度和监控 - Redis 服务器:作为消息代理(Broker)和结果后端(Result Backend)
配置说明
Celery 配置
broker_url = 'redis://:123456@127.0.0.1:6379/1'
result_backend = 'redis://:123456@127.0.0.1:6379/1'
主要配置项:
- 任务序列化:JSON
- 时区:Asia/Shanghai
- 工作进程数:1
功能模块
1. 基础运算任务
add(x, y)
: 加法运算multiply(x, y)
: 乘法运算chain_calculation(numbers)
: 链式计算(求和、平均值、最大值、最小值)
2. 文本处理任务
process_text(text)
: 文本处理(大写转换、长度统计、单词计数)
3. 系统监控任务
system_monitor()
: 每5秒执行一次,监控系统状态- CPU使用率
- 内存使用率
- 系统状态
4. 报告生成任务
generate_report()
: 生成实时报告daily_report()
: 每天早上9点生成日报workday_task()
: 工作日每小时执行的任务
定时任务配置
系统包含以下定时任务:
- 系统监控:每5秒执行一次
- 日报生成:每天早上9点执行
- 工作日任务:工作日(周一至周五)9:00-18:00每小时执行
使用说明
1. 启动系统
- 确保Redis服务器已启动
- 启动Celery工作进程:
celery -A tasks worker --loglevel=info
- 启动Celery Beat进程(用于定时任务):
celery -A tasks beat
- 运行主程序:
python main.py
2. 系统监控
主程序运行后会自动执行以下操作:
- 实时显示系统监控数据
- 执行常规任务示例
- 通过按下 Ctrl+C 可以优雅退出程序
错误处理
系统实现了完整的错误处理机制:
- 任务执行错误捕获和日志记录
- 优雅的程序退出处理
- 自动重试机制
注意事项
- Redis连接配置需要根据实际环境修改
- 确保系统时区设置正确
- 建议在生产环境中调整工作进程数
- 监控数据目前为模拟数据,实际使用时需要替换为真实的系统监控指标
代码示例
任务执行示例:
# 执行加法任务
result = add.delay(4, 6)
print(f"任务ID: {result.id}")
if result.ready():
print(f"结果: {result.get()}")
系统监控示例:
main.py
# 执行系统监控
result = system_monitor.delay()
data = result.get()
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"内存使用率: {data['memory_usage']:.1f}%")
### 所有代码:
```python
from tasks import add, multiply, process_text, generate_report, chain_calculation, system_monitor
import time
import json
from datetime import datetime
import threading
import signal
import sys
from celery.result import AsyncResult
# 全局变量控制程序运行
running = True
def signal_handler(signum, frame):
"""处理退出信号"""
global running
print("\n收到退出信号,正在关闭程序...")
running = False
def monitor_system_task():
"""监控系统任务的执行结果"""
while running:
try:
# 执行系统监控任务
result = system_monitor.delay()
# 等待结果(最多等待4秒)
for _ in range(4):
if result.ready():
data = result.get()
if data:
print(f"\n系统监控结果:")
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"内存使用率: {data['memory_usage']:.1f}%")
print(f"系统状态: {data['status']}")
print("-" * 50)
break
time.sleep(1)
# 等待剩余时间,确保大约每5秒执行一次
time.sleep(1)
except Exception as e:
print(f"监控任务出错: {e}")
time.sleep(5)
def run_regular_task():
"""运行普通任务的示例"""
while running:
try:
# 执行一些常规任务
print(f"\n[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] 执行常规任务...")
# 1. 执行加法任务
result = add.delay(4, 6)
print(f"加法任务ID: {result.id}")
if result.ready():
print(f"4 + 6 = {result.get()}")
# 2. 执行文本处理
text = f"这是一条测试消息 - {datetime.now()}"
result = process_text.delay(text)
print(f"文本处理任务ID: {result.id}")
if result.ready():
print(json.dumps(result.get(), indent=2, ensure_ascii=False))
# 休眠5秒后继续下一轮
for _ in range(5):
if not running:
break
time.sleep(1)
except Exception as e:
print(f"执行任务时出错: {e}")
time.sleep(5)
def main():
"""主函数"""
# 注册信号处理器(用于优雅退出)
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
print("程序启动...")
print("提示:按 Ctrl+C 可以优雅退出程序")
print("\n=== 主程序开始运行 ===")
print("- 系统监控每5秒执行一次")
print("- 常规任务每5秒执行一次")
print("- 所有任务的执行结果会实时显示")
try:
# 创建并启动监控线程
monitor_thread = threading.Thread(target=monitor_system_task)
monitor_thread.daemon = True
monitor_thread.start()
# 创建并启动常规任务线程
task_thread = threading.Thread(target=run_regular_task)
task_thread.daemon = True
task_thread.start()
# 主线程保持运行
while running:
time.sleep(1)
except KeyboardInterrupt:
print("\n程序正在关闭...")
finally:
print("程序已退出。")
if __name__ == "__main__":
main()
tasks.py
from celery import Celery
from celery.schedules import crontab
import time
from datetime import datetime
import random
# 创建 Celery 实例
app = Celery('tasks')
# 配置 Celery
app.conf.update(
broker_url='redis://:123456@127.0.0.1:6379/1',
result_backend='redis://:123456@127.0.0.1:6379/1',
task_serializer='json',
result_serializer='json',
accept_content=['json'],
timezone='Asia/Shanghai',
enable_utc=True,
worker_pool_restarts=True,
worker_concurrency=1,
)
# 配置定时任务
app.conf.beat_schedule = {
# 每5秒执行一次系统监控
'monitor-every-5-seconds': {
'task': 'tasks.system_monitor',
'schedule': 5.0, # 每5秒执行一次
},
# 每天早上9点执行
'daily-morning-report': {
'task': 'tasks.daily_report',
'schedule': crontab(hour=9, minute=0),
},
# 工作日每小时执行
'workday-hourly-task': {
'task': 'tasks.workday_task',
'schedule': crontab(hour='9-18', minute=0, day_of_week='1-5'),
}
}
@app.task
def add(x, y):
"""简单的加法任务"""
time.sleep(1)
return x + y
@app.task
def multiply(x, y):
"""乘法运算任务"""
time.sleep(2)
return x * y
@app.task
def process_text(text):
"""文本处理任务"""
time.sleep(1)
result = {
'original': text,
'upper': text.upper(),
'length': len(text),
'words': len(text.split())
}
return result
@app.task
def generate_report():
"""生成报告任务"""
time.sleep(3)
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
data = {
'timestamp': current_time,
'temperature': random.uniform(20, 30),
'humidity': random.uniform(40, 80),
'status': random.choice(['正常', '警告', '错误'])
}
return data
@app.task
def chain_calculation(numbers):
"""链式计算任务"""
time.sleep(2)
result = sum(numbers)
average = result / len(numbers)
maximum = max(numbers)
minimum = min(numbers)
return {
'sum': result,
'average': average,
'max': maximum,
'min': minimum,
'count': len(numbers)
}
@app.task
def system_monitor():
"""每5秒执行一次的系统监控任务"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{current_time}] 每5秒执行一次的系统监控任务 执行系统监控...")
# 模拟获取系统信息
data = {
'timestamp': current_time,
'cpu_usage': random.uniform(0, 100),
'memory_usage': random.uniform(0, 100),
'status': 'running'
}
# 打印监控信息
print(f"CPU使用率: {data['cpu_usage']:.1f}%")
print(f"内存使用率: {data['memory_usage']:.1f}%")
print(f"系统状态: {data['status']}")
print("-" * 50)
return data
@app.task
def daily_report():
"""每天早上9点执行的日报任务"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{current_time}] 生成每日报告...")
report = {
'report_type': '日报',
'generated_at': current_time,
'summary': '这是一个自动生成的日报示例',
'metrics': {
'total_tasks': random.randint(100, 1000),
'completed_tasks': random.randint(50, 500),
'success_rate': random.uniform(0.8, 1.0)
}
}
print(f"报告类型: {report['report_type']}")
print(f"生成时间: {report['generated_at']}")
print(f"任务完成率: {report['metrics']['success_rate']:.1%}")
print("-" * 50)
return report
@app.task
def workday_task():
"""工作日每小时执行的任务"""
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"\n[{current_time}] 执行工作时间任务...")
data = {
'task_type': '工作时间任务',
'executed_at': current_time,
'status': random.choice(['完成', '进行中', '计划中']),
'workload': random.randint(1, 100)
}
print(f"任务状态: {data['status']}")
print(f"工作负载: {data['workload']}%")
print("-" * 50)
return data