当前位置: 首页 > news >正文

Celery时区设置问题源码探究

遇到的问题

项目中有使用到Celery框架,主要使用Celery来在使用Django搭建的项目中创建延时任务及周期任务。在使用过程中出现过延时任务及周期任务到预定时间未能执行的情况。Google、百度了一些网友的分析及解决方案,大多认为是Celery时区设置导致的问题。然而这些解答大多类似,而且并不能解决我心中的疑惑,因此决定研究源码一探究竟。

延时任务及周期任务

这里网上大多数解答存在问题的地方,将延时任务及周期任务混为一谈了。周期任务是存在一个周期,定时执行的任务,类似Linux系统的Crontab定时任务。而延时任务更类似一个普通的异步任务,不同的是存在一个ETA延时时间,这种任务只会执行一次。因此我们会分开讨论两种任务。

Celery时区设置的影响
对周期任务的影响

celery存在两个时区的配置 enable_utctimezone,前者表示是否使用UTC时间,后者表示celery使用的时区。celery默认使用UTC时间,若使用默认配置,则celery设置周期任务时,必须使用UTC时间,比如

    app.conf.update(CELERYBEAT_SCHEDULE = {# 每天9点30检查'check': {'task': 'tasks.check','schedule': crontab(hour=9, minute=30),},})

当系统时间是北京时间时,这样的设置会导致这个任务并不会在每天北京时间9:30执行,而是17:30,因为UTC时间和北京时间相差8小时。因此这里我们将配置修改为

enable_utc = False
timezone = 'Asia/Shanghai'

这样周期任务就能正常执行了

对延时任务的影响

大多数网上的解答止步于上述的结果,认为上述设置之后,延时任务就同样没有任何问题了,其实不然,例如

eta = datetime.datetime(2021, 8, 19, 18, 10, 30)
add.apply_async((2, 2), eta=eta)

我们准备让这个任务在2021-08-19 18:10:30去执行,结果呢

出乎我们的预料,celery把我们传入的时间当成了UTC时间。我们看下源码

        if req.eta:try:if req.utc:eta = to_timestamp(to_system_tz(req.eta))else:eta = to_timestamp(req.eta, app.timezone)except (OverflowError, ValueError) as exc:error("Couldn't convert ETA %r to timestamp: %r. Task: %r",req.eta, exc, req.info(safe=True), exc_info=True)req.reject(requeue=False)

当req.utc为True时,执行to_system_tz方法转换eta,否则直接使用Celery设置的时区转换,这个req.utc又是什么?

        if body is None and 'args' not in message.payload:body, headers, decoded, utc = (message.body, message.headers, False, app.uses_utc_timezone(),)if not body_can_be_buffer:body = bytes(body) if isinstance(body, buffer_t) else bodyelse:if 'args' in message.payload:body, headers, decoded, utc = hybrid_to_proto2(message,message.payload)else:body, headers, decoded, utc = proto1_to_proto2(message, body)

从这里可以看出,当我们设置了args参数时,由hybrid_to_proto2返回,继续

def hybrid_to_proto2(message, body):"""Create a fresh protocol 2 message from a hybrid protocol 1/2 message."""try:args, kwargs = body.get('args', ()), body.get('kwargs', {})kwargs.items  # pylint: disable=pointless-statementexcept KeyError:raise InvalidTaskError('Message does not have args/kwargs')except AttributeError:raise InvalidTaskError('Task keyword arguments must be a mapping',)···embed = {'callbacks': body.get('callbacks'),'errbacks': body.get('errbacks'),'chord': body.get('chord'),'chain': None,}return (args, kwargs, embed), headers, True, body.get('utc', True)

omg!!!原来apply_async方法还有一个参数叫utc吗,而且默认值是True,看文档的时候并没有注意到!到这里,我们大概能看出问题的所在了,延时任务也可以认为普通的异步任务,存在自己的时区配置参数,之前提到的enable_utc并不能影响到这里。我们再看看传入的eta做了哪些处理。

        eta = headers.get('eta')if eta is not None:try:eta = maybe_iso8601(eta)except (AttributeError, ValueError, TypeError) as exc:raise InvalidTaskError('invalid ETA value {0!r}: {1}'.format(eta, exc))self.eta = maybe_make_aware(eta, self.tzlocal)else:self.eta = None
def maybe_make_aware(dt, tz=None):"""Convert dt to aware datetime, do nothing if dt is already aware."""if is_naive(dt):dt = to_utc(dt)return localize(dt, timezone.utc if tz is None else timezone.tz_or_local(tz),)return dtdef to_utc(dt):"""Convert naive :class:`~datetime.datetime` to UTC."""return make_aware(dt, timezone.utc)def make_aware(dt, tz):"""Set timezone for a :class:`~datetime.datetime` object."""try:_localize = tz.localizeexcept AttributeError:return dt.replace(tzinfo=tz)else:# works on pytz timezonestry:return _localize(dt, is_dst=None)except AmbiguousTimeError:return min(_localize(dt, is_dst=True),_localize(dt, is_dst=False))

传入的native类型的时间,居然直接被转换成UTC时间了!!!

结论

讨论了这么多,我们能得出最终解决Celery时区问题的结论
1、对于周期任务,需要将celery的配置enable_utc设置成False,timezone设置成系统当前的时区
2、对于延时任务,如果需要设置eta,即精确在某一时间执行,则这个eta必须包含时区信息

补充

最终结论相当简单,但是花了大量时间去研究了源码。对于延时任务eta设置曾经也是相当困惑,其实对于官方文档确实有提到,eta必须要包含时区信息,之前并没有注意到

While countdown is an integer, eta must be a datetime object, specifying an exact date and time (including millisecond precision, and timezone information)
http://www.dtcms.com/a/422702.html

相关文章:

  • 音元分析流程
  • 懂的建设网站上海做网站优化
  • OpenLayers的OGC服务 -- 章节一:WMS服务详解
  • [信号与系统个人笔记]第三章 连续时间信号与系统的频域分析 Part 4
  • 多渠道打包gradle配置
  • 集中式架构还是分布式架构?SCADA架构选型的新趋势
  • 第八章 财务报表 2利润表(2025版)
  • 在Trae上使用Bright Data MCP采集数据,实时获取IPhone17价格信息
  • 番禺网站推广湖南网站建设有限公司
  • 刷题 | 牛客 - 前端面试手撕题 - 中等 - 1-2/20 知识点解答
  • 建立自动化SSL证书更新机制与多层监控体系
  • 岚图汽车 x Apache Doris : 海量车联网数据实时分析实践
  • chrome-devtools-mcp windows 环境安装
  • IOT_通讯控制器(IO模块)
  • 分布式计数器系统完整解决方案
  • 音频类AI工具扩展
  • PyCharm 开发 Python 项目后,将其打包并部署到 Nginx 服务器
  • 在 Trae 国际版中添加 Chrome Dev MCP Server(Windows 实战指南)
  • 个人商城网站备案互联网域名是什么意思
  • 太原微信网站商城网站建设定制
  • VR 太阳光参数与快速渲染
  • 垃圾分类魔法互动墙-垃圾分类展厅设备-VR垃圾分类软件
  • 九、Proteus817实现51单片机DHT22温湿度读取
  • 家庭录像损坏了无法播放?视频修复让回忆重现
  • 【StarRocks】-- 深入理解 StarRocks 窗口函数 LAG()
  • [C++项目组件]Elasticsearch简单介绍
  • 网站建设公司的服务15年做哪些网站致富
  • 学做软件的网站有哪些怎么制作网站后台
  • Wyn 商业智能软件:3D 可视化大屏搭建与工具使用全指南
  • 【Linux】IPC——匿名管道的使用