Windows系统下【Celery任务队列】python使用celery 详解(二)
开发阶段的自动重载
celery -A celery_tasks worker --loglevel=info -P eventlet --autoreload
--autoreload 仅适用于开发环境,不建议在生产环境中使用,因为它可能会影响性能。
配置任务跟踪启动状态
app.conf.task_track_started = True
app.conf.task_track_started = True 是 Celery 的一个配置项,用于启用任务执行状态的跟踪。具体来说,它允许 Celery 在任务开始执行时更新任务状态为 STARTED,而不仅仅是在任务完成(SUCCESS)或失败(FAILURE)时才更新状态。
1.配置任务过期时间
在 Celery 中,任务结果的过期时间由 result_expires 配置项控制。
 Celery 的默认配置是 24 小时(86400 秒)
-  全局配置 # 设置任务结果保留 1 小时(3600 秒) app.conf.result_expires = 3600 # 秒
-  2.针对单个任务设置 
 使用@app.task装饰器的expires参数为特定任务单独设置过期时间# 定义发送消息任务 @app.task(expires=1800) # 该任务结果保留 30 分钟 def send_msg(message):print(f"开始发送消息: {message}")time.sleep(3)print(f"发送消息完成: {message}")return message
-  永久保留:设置为 None或0时,结果不会自动过期(需手动清理)。app.conf.result_expires = None # 永久保留
-  立即过期:设置为负数(如 -1)时,结果会在任务完成后立即删除。app.conf.result_expires = -1 # 立即删除
2.获取异步任务的状态与结果
在 Celery 里,AsyncResult 类可用于获取异步任务的状态与结果。
2.1状态检查方法
- ready():检查任务是否已经完成(无论成功还是失败),返回布尔值。- 频繁检查 result.ready()会增加与结果后端(如 Redis)的通信开销
 
- 频繁检查 
- successful():检查任务是否成功完成,返回布尔值。如果任务仍在执行或失败,返回- False。
- failed():检查任务是否执行失败,返回布尔值。
- status:获取任务的当前状态,常见的状态包括:- PENDING:任务等待执行或状态未知。
- STARTED:任务已开始执行(需要配置- task_track_started=True才能看到此状态)。
- SUCCESS:任务成功完成。
- FAILURE:任务执行失败。
- REVOKED:任务已被撤销。
- RETRY:任务因异常正在重试。
 
2.2结果获取方法
由于任务是异步执行的,可能需要等待一段时间才能完成。可以使用 get() 方法阻塞当前线程,直到任务完成并返回结果,但要注意设置合理的超时时间,避免长时间阻塞。
- get(timeout=None, propagate=True, interval=0.5)
 等待任务完成并返回结果。- timeout:设置超时时间(秒),超过此时间将抛出- TimeoutError。
- propagate:如果为- True(默认值),任务失败时会抛出异常;如果为- False,则返回异常对象。
- interval:检查任务状态的间隔时间(秒)。
 
- result:获取任务的结果。如果任务尚未完成,返回- None(除非使用- get()方法等待)。
2.3任务控制方法
- revoke(terminate=False, signal=None, wait=False, timeout=None)
 撤销(取消)任务的执行。- terminate:如果为- True,会强制终止正在执行的任务。
- signal:指定终止任务时使用的信号(如- SIGKILL)。
- wait:如果为- True,会等待任务被撤销后再返回。
- timeout:等待撤销操作完成的超时时间。
 
- forget():从结果后端中删除任务结果,释放资源。调用此方法后,将无法再获取该任务的结果。
2.4错误信息获取方法
- traceback:获取任务失败时的详细堆栈跟踪信息。
- exception:获取任务失败时抛出的异常对象。
2.5任务元数据方法
- date_done:获取任务完成的时间(- datetime对象)。
- children:获取由该任务创建的子任务列表。
import time
from celery.result import AsyncResult
from celery_tasks import send_msg, app# 调用任务
result = send_msg.delay('hello world')# 获取任务 ID
task_id = result.id
print(f"Task ID: {task_id}")# 创建 AsyncResult 对象
async_result = AsyncResult(task_id, app=app)# 检查任务状态
print(f"Initial status: {async_result.status}")# 等待任务完成并获取结果(设置超时时间为 10 秒)
try:result_value = async_result.get(timeout=10, propagate=False)print(f"Task result: {result_value}")if async_result.successful():print("Task completed successfully.")elif async_result.failed():print(f"Task failed with exception: {async_result.exception}")print(f"Traceback: {async_result.traceback}")
except TimeoutError:print("Task timed out.")# 撤销任务async_result.revoke(terminate=True, signal='SIGKILL')print("Task revoked.")# 检查任务状态
print(f"Initial status: {async_result.status}")# 检查任务是否被撤销
if async_result.status == 'REVOKED':print("Task is revoked.")# 释放结果资源
async_result.forget()
print("Task result forgotten.")