对celery的,路径,任务路径问题。
用if_main__的celery启动模式:必须修改为threads
if __name__ == '__main__':
celery_app.worker_main(argv=['worker', '--loglevel=info',"--pool=threads",'-c',2])
将--pool为threads、gevent、solo,分别代表多线程、协程、单机模式。Celery默认为多进程模式prefork。
否则任务会被阻塞。
定义任务的问题:
如果你给task/share_task了bind=True
必须要给第一个参数为self。但是你在使用时不需要传输self,
否则报错。
你不需要用bind即可,但是一定不要有self。否则报错
在win上用命令启动celery的问题:
celery -A celerywork.mycelery worker --loglevel=info --pool=eventlet
如果使用python一定要安装eventlet/threads
而且在启动redis时一定要加上后缀eventlet/threads
正确的配置流程:最关键
主要就是导入的问题:
在celery中在这个main非常关键:这个就是所有需要配置和使用app里的task(不报错自定义task)导入的开头。否则一定会报错。
即注意如果你在app所在里修饰了任务。那么必须要将main修改到当前app所在。(因为main是app里配置文件所需的第一个相对位置,一般将从所有需要配置导入的相对上一级开始到当前app所在py/配置文件所在)
这样所有配置文件都能通过main向下寻找。也就是app文件里main是最高查询路径。
还有一点就是py的路径是在即,被需要用位置的相对使用的。
(被使用的函数所在py文件相对位置)
示例:
import json
# from celerywork.VerificationCode import generate_verification_code
from celery import Celery,shared_task
celery_app= Celery(main='celerywork')
# celery_app.conf.update(broker_url = 'redis://:123456@192.168.117.158:6379/0',
# result_backend = 'redis://:123456@192.168.117.158:6379/1',
# timezone ="Asia/shanghai",
# task_serializer='json',
# accept_content=['json'],
# enable_utc = True
# )
celery_app.config_from_object('celerywork.celeryconfig')
@shared_task(bind=True)
def add(self, x, y):
print(1)
return x+y
if __name__ == '__main__':
celery_app.worker_main(argv=['worker', '--loglevel=info',"--pool=eventlet",'-c',2])
broker_url = 'redis://:123456@192.168.117.158:6379/0'
result_backend = 'redis://:123456@192.168.117.158:6379/1'
timezone ="Asia/shanghai"
# 指定导入任务模块,这个一般用于是否需要的。一般是share_task修饰
autodiscover_tasks=(['celerywork.VerificationCode','celerywork.mycelery'])
# 指定自动导入的任务模块
imports = ('celerywork.VerificationCode',)
task_serializer='json'
accept_content=['json']
enable_utc = True
task_acks_late=False
worker_prefetch_multiplier = 1
from celerywork.VerificationCode import generate_verification_code
re=generate_verification_code.delay()
print(re.get())