当前位置: 首页 > news >正文

Windows系统下【Celery任务队列】python使用celery 详解(二)

开发阶段的自动重载

celery -A celery_tasks worker --loglevel=info -P eventlet --autoreload

--autoreload 仅适用于开发环境,不建议在生产环境中使用,因为它可能会影响性能。

配置任务跟踪启动状态

app.conf.task_track_started = True

app.conf.task_track_started = True 是 Celery 的一个配置项,用于启用任务执行状态的跟踪。具体来说,它允许 Celery 在任务开始执行时更新任务状态为 STARTED,而不仅仅是在任务完成(SUCCESS)或失败(FAILURE)时才更新状态。

1.配置任务过期时间

在 Celery 中,任务结果的过期时间由 result_expires 配置项控制。
Celery 的默认配置是 24 小时(86400 秒)

  1. 全局配置

    # 设置任务结果保留 1 小时(3600 秒)
    app.conf.result_expires = 3600  # 秒
    
  2. 2.针对单个任务设置
    使用 @app.task 装饰器的 expires 参数为特定任务单独设置过期时间

    # 定义发送消息任务
    @app.task(expires=1800)  # 该任务结果保留 30 分钟
    def send_msg(message):print(f"开始发送消息: {message}")time.sleep(3)print(f"发送消息完成: {message}")return message
    
  3. 永久保留:设置为 None0 时,结果不会自动过期(需手动清理)。

    app.conf.result_expires = None  # 永久保留
    
  4. 立即过期:设置为负数(如 -1)时,结果会在任务完成后立即删除。

    app.conf.result_expires = -1  # 立即删除
    

2.获取异步任务的状态与结果

在 Celery 里,AsyncResult 类可用于获取异步任务的状态与结果。

2.1状态检查方法

  • ready():检查任务是否已经完成(无论成功还是失败),返回布尔值。
    • 频繁检查 result.ready() 会增加与结果后端(如 Redis)的通信开销
  • successful():检查任务是否成功完成,返回布尔值。如果任务仍在执行或失败,返回 False
  • failed():检查任务是否执行失败,返回布尔值。
  • status:获取任务的当前状态,常见的状态包括:
    • PENDING:任务等待执行或状态未知。
    • STARTED:任务已开始执行(需要配置 task_track_started=True 才能看到此状态)。
    • SUCCESS:任务成功完成。
    • FAILURE:任务执行失败。
    • REVOKED:任务已被撤销。
    • RETRY:任务因异常正在重试。

2.2结果获取方法

由于任务是异步执行的,可能需要等待一段时间才能完成。可以使用 get() 方法阻塞当前线程,直到任务完成并返回结果,但要注意设置合理的超时时间,避免长时间阻塞。

  • get(timeout=None, propagate=True, interval=0.5)
    等待任务完成并返回结果。
    • timeout:设置超时时间(秒),超过此时间将抛出 TimeoutError
    • propagate:如果为 True(默认值),任务失败时会抛出异常;如果为 False,则返回异常对象。
    • interval:检查任务状态的间隔时间(秒)。
  • result:获取任务的结果。如果任务尚未完成,返回 None(除非使用 get() 方法等待)。

2.3任务控制方法

  • revoke(terminate=False, signal=None, wait=False, timeout=None)
    撤销(取消)任务的执行。
    • terminate:如果为 True,会强制终止正在执行的任务。
    • signal:指定终止任务时使用的信号(如 SIGKILL)。
    • wait:如果为 True,会等待任务被撤销后再返回。
    • timeout:等待撤销操作完成的超时时间。
  • forget():从结果后端中删除任务结果,释放资源。调用此方法后,将无法再获取该任务的结果。

2.4错误信息获取方法

  • traceback:获取任务失败时的详细堆栈跟踪信息。
  • exception:获取任务失败时抛出的异常对象。

2.5任务元数据方法

  • date_done:获取任务完成的时间(datetime 对象)。
  • children:获取由该任务创建的子任务列表。
import time
from celery.result import AsyncResult
from celery_tasks import send_msg, app# 调用任务
result = send_msg.delay('hello world')# 获取任务 ID
task_id = result.id
print(f"Task ID: {task_id}")# 创建 AsyncResult 对象
async_result = AsyncResult(task_id, app=app)# 检查任务状态
print(f"Initial status: {async_result.status}")# 等待任务完成并获取结果(设置超时时间为 10 秒)
try:result_value = async_result.get(timeout=10, propagate=False)print(f"Task result: {result_value}")if async_result.successful():print("Task completed successfully.")elif async_result.failed():print(f"Task failed with exception: {async_result.exception}")print(f"Traceback: {async_result.traceback}")
except TimeoutError:print("Task timed out.")# 撤销任务async_result.revoke(terminate=True, signal='SIGKILL')print("Task revoked.")# 检查任务状态
print(f"Initial status: {async_result.status}")# 检查任务是否被撤销
if async_result.status == 'REVOKED':print("Task is revoked.")# 释放结果资源
async_result.forget()
print("Task result forgotten.")

相关文章:

  • Windows 下 dll转换成lib
  • vue知识点总结 依赖注入 动态组件 异步加载
  • 【星海随笔】信息安全相关标准
  • Windows下Dify连接Ollama无效
  • 反向沙箱介绍
  • C++ —— 类的嵌套和循环依赖问题
  • KERNEL32!NlsServerInitialize函数分析创建了一个目录对象和目录对象下面的5个对象
  • 【MySQL】-- 事务
  • 华为5.7机考第一题充电桩问题Java代码实现
  • 嵌入式系统架构验证工具:AADL Inspector v1.10 全新升级
  • 优雅草星云智控系统产品发布会前瞻:SNMP协议全设备开启指南-优雅草卓伊凡
  • 代码随想论图论part06冗余连接
  • 【MySQL】存储引擎 - ARCHIVE、BLACKHOLE、MERGE详解
  • 多模型协同预测在风机故障预测的应用(demo)
  • Java设计模式之抽象工厂模式:从入门到精通
  • 服务器配置错误导致SSL/TLS出现安全漏洞,如何进行排查?
  • 在自然语言处理任务中,像 BERT 这样的模型会在输入前自动加上一些特殊token
  • 从概念表达到安全验证:智能驾驶功能迎来系统性规范
  • 金仓数据库永久增量备份技术原理与操作
  • 如何清除windows 远程桌面连接的IP记录
  • 巴基斯坦首都及邻近城市听到巨大爆炸声
  • 2025年上海市模范集体、劳动模范和先进工作者名单揭晓
  • 深圳两家会所涉卖淫嫖娼各被罚7万元逾期未缴,警方发催告书
  • 新买宝马竟是“维修车”,男子发视频维权被4S店索赔100万
  • 正荣地产:前4个月销售14.96亿元,控股股东已获委任联合清盘人
  • 乌克兰议会批准美乌矿产协议