基于 Celery 的分布式文件监控系统
Celery 是一款基于 Python 开发的分布式任务队列(Distributed Task Queue),核心作用是将应用中的耗时任务、定时任务或跨服务任务 “剥离” 出主流程,通过异步执行的方式提升系统响应速度、稳定性与可扩展性。它广泛应用于 Web 开发、数据分析、自动化运维等场景,例如用户注册后的邮件发送、订单支付后的状态异步更新、定时数据备份、大规模数据计算等。
一、核心定位:解决 “任务阻塞” 问题
在传统应用中,若直接在请求链路里执行耗时操作(如生成 10 万条数据的报表、调用响应慢的第三方 API),会导致用户等待时间过长,甚至引发请求超时。Celery 通过 “解耦任务发起与执行” 解决这一痛点:
- 不阻塞主流程:Web 应用(如 Django/Flask 项目)发起任务后,无需等待任务完成,直接返回结果给用户,任务在后台独立执行;
- 隔离任务风险:任务执行失败(如邮件服务器故障)不会影响主应用运行,可通过重试机制保障任务最终完成;
- 支持分布式扩展:可在多台服务器部署任务执行节点,分担高并发场景下的任务压力。
二、核心组件与工作流程
Celery 的运行依赖 “中间件” 和 “自身模块” 的配合,核心组件分为 4 类,工作流程清晰可追溯:
1. 四大核心组件
- 生产者(Producer):发起任务的应用(如 Web 项目),通过 Celery 提供的 API 将任务 “提交” 到任务队列;
- 任务队列(Broker):暂存任务的中间件,负责任务的接收、存储与分发,是 “生产者” 与 “消费者” 的通信桥梁,常用实现有 Redis(轻量、易部署)、RabbitMQ(可靠、支持复杂路由);
- 消费者(Worker):独立运行的进程 / 线程,持续监听任务队列,从队列中 “取出” 任务并执行(如调用函数发送邮件);
- 结果后端(Result Backend):可选组件,用于存储任务执行结果(如成功 / 失败状态、函数返回值),供生产者后续查询(如 Web 应用查询 “报表生成是否完成”),常用实现有 Redis、MySQL、MongoDB。
2. 典型工作流程(以 “用户注册发送验证邮件” 为例)
- 用户在 Web 端提交注册信息,Web 应用(生产者)验证信息无误后,调用 Celery 任务的
delay()
方法,将 “用户邮箱” 作为参数,提交任务到 Broker(如 Redis); - Broker 接收到任务后,将其存入指定队列(默认队列名为
celery
,也可自定义队列区分任务类型); - 后台运行的 Worker 进程(消费者)持续监听该队列,检测到新任务后,取出任务并执行 “发送验证邮件” 的逻辑;
- 若配置了 Result Backend,Worker 会将任务执行结果(如 “邮件发送成功” 或 “邮箱不存在导致失败”)存入后端;
- 若 Web 应用需要确认任务状态,可通过任务 ID 从 Result Backend 中查询结果。
三、核心特性:灵活、可靠、可扩展
Celery 能成为 Python 生态的主流任务队列,得益于其丰富且实用的特性:
- 灵活的任务配置:支持为单个任务设置重试策略(如失败后重试 3 次,每次间隔 5 秒)、超时时间(如任务执行超过 10 秒则终止)、优先级(高优先级任务先执行);
- 定时任务调度:内置
Beat
调度器,可替代 Linux Crontab 实现更灵活的定时任务(如 “每天凌晨 2 点执行数据备份”“每小时同步一次第三方数据”),支持精确到秒的调度规则; - 分布式与负载均衡:可在多台服务器部署 Worker,Broker 会自动将任务分发到空闲 Worker,实现负载均衡;支持按任务类型分配队列,让特定 Worker 只处理指定类型任务(如 “报表生成任务” 由高性能服务器的 Worker 处理);
- 故障处理与监控:支持任务失败后的回调(如失败时发送告警邮件),提供
flower
等监控工具,可实时查看 Worker 状态、任务执行进度、失败任务详情; - 多语言兼容:虽然核心基于 Python,但通过协议扩展,支持其他语言(如 Go、Java)作为生产者或消费者与 Celery 交互;
- 与 Python 生态无缝集成:可直接与 Django、Flask、Tornado 等主流 Web 框架结合,也支持与 SQLAlchemy、Django ORM 等数据工具协同,降低开发成本。
四、项目概述
1.1项目背景
随着信息化时代的到来,文件系统的监控和管理变得尤为重要。无论是个人用户还是企业,都需要实时监控文件的变化(如创建、修改、删除等),以便及时响应和处理。本项目的目标是开发一个基于 Celery 分布式任务队列的跨平台文件监控系统(FileMonitor),支持 Windows、macOS 和 Linux 系统,并通过微信、钉钉、邮件等方式通知用户文件变化。同时,系统会将监控日志存储到 MySQL、Redis 和 MongoDB 中,便于后续查询和分析
1.2 项目框架
1.3功能介绍
(1)跨平台文件监控:支持 Windows、macOS 和 Linux 系统,使用 watchdog 模块监听文件变化(watchdog 是跨平台解决方案)
(2)分布式任务处理:使用 Celery 实现分布式任务队列,将文件监控任务分发到多个 Worker 节点
(3)多通知方式:支持微信、钉钉、邮件等方式发送文件变化通知
(4)多数据库支持:文件监控日志存储到 MySQL(关系型数据)、Redis(缓存和快速查询)、MongoDB(文档型数据)
(5)模块化设计:功能分模块开发(如监控模块、通知模块、数据库模块),提高可读性和可维护性
(6)容器化部署:使用 Docker 和 Docker Compose 实现环境标准化和快速部署
1.4技术栈
Python:项目核心语言
Celery:分布式任务队列,用于异步处理文件监控任务
watchdog:跨平台文件监控模块
Redis:作为 Celery 的消息代理(Broker)和结果存储(Result Backend)
MySQL:存储文件监控日志(关系型数据)
MongoDB:存储文件监控日志(文档型数据)
Docker:容器化部署,便于环境管理和分发
2.监控模块
2.1项目环境的部署
version: '3'
services:
pythonops:
image: python-ssh:1.0
restart: always
privileged: true
volumes:
- /pythonops/:/pythonops/
- /share/:/share/
expose:
- 21
- 22
- 80
- 3306
- 8080
- 5000
- 50000
ports:
- 21
- 22
- 80
- 3306
- 8080
- 5000
- 50000
deploy:
replicas: 3
2.2安装依赖
#vscode连接pythonops-1
#配置清华加速器
pip install --upgrade pip
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
#watchdog:跨平台文件监控模块
pip install watchdog
#celery:分布式任务队列
pip install celery
#redis:Celery 的消息代理(Broker)和结果存储(Result Backend)
pip install redis
#pymysql:MySQL 数据库操作
pip install pymysql
#pymongo:MongoDB 数据库操作
pip install pymongo
#PyYAML 是 FileMonitor 项目的依赖之一,用于解析 config.yml 配置文件
pip install pyyaml
2.3文件监控模块
#file_monitor.py
from watchdog.observers import Observer
from configuration import Configuration
import time
from file_handler import FileHandler
class FileMonitor():
#创建初始化函数
#添加成员变量handler
def __init__(self):
self.handler=FileHandler()
self.watch_folder_path=Configuration().get_config()['folder_path']
def start_watch(self):
#启用监听服务
#实例观察者
dog=Observer()
#调用 要观察目录,处理器
#处理器,当文件发生书名什么事情之后的应对方案
#我们会创建一个类去管理这些方案
#这个处理器,我们定义在file_handler模块中
#在该模块中创建File_Handler来处理
dog.schedule(
self.handler,
self.watch_folder_path,
recursive=True
)
#启动监听
dog.start()
#处理键盘ctrl-c的终止
try:
while True:
time.sleep(1)
except KeyboardInterrupt as e:
#如果有键盘ctrl+c,直接停止监听
dog.stop()
#推出监听线程
dog.join()
2.4使用配置文件设置需要监听的目录
#file_handle.py
from watchdog.events import FileSystemEventHandler
from date_time_milliseconds import get_current_time_with_ms
from get_current_user import get_current_username
from msg import send_mail,send_dingding,send_wechat
from log import save_to_mongo,save_to_mysql,save_to_redis
#文件监听处理器
class FileHandler(FileSystemEventHandler):
#重写继承方法
#应对文件被人打开的处理方法
def on_opened(self, event):
print(event.src_path,'打开')
log={
'time':str(get_current_time_with_ms()),
'user':get_current_username(),
'file':event.src_path,
'ops':'open'
}
msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'
+str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:open'
#发送告警到电子邮件
send_mail.delay(msg)
#发送告警到钉钉
send_dingding.delay(msg)
#发送告警到微信
send_wechat.delay(msg)
#保存日志到Mongo
save_to_mongo.delay(log)
#保存日志到redis
save_to_redis.delay(log)
#保存日志到mysql
save_to_mysql.delay(log)
#应对文件被删除的处理方法
def on_deleted(self, event):
print(event.src_path,'删除')
#发送告警到电子邮件
log={
'time':str(get_current_time_with_ms()),
'user':get_current_username(),
'file':event.src_path,
'ops':'delete'
}
msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'
+str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:delete'
#发送告警到电子邮件
send_mail.delay(msg)
#发送告警到钉钉
send_dingding.delay(msg)
#发送告警到微信
send_wechat.delay(msg)
#保存日志到Mongo
save_to_mongo.delay(log)
#保存日志到redis
save_to_redis.delay(log)
#保存日志到mysql
save_to_mysql.delay(log)
#设置需要监控的目录
folder_path: /opt/
redis:
host: 192.168.157.11
port: 6379
msg_broker_db: 0
msg_backend_db: 1
log_broker_db: 2
log_backend_db: 3
msg_broker_url: redis://192.168.157.11:6379/0
msg_backend_url: redis://192.168.157.11:6379/1
log_broker_url: redis://192.168.157.11:6379/2
log_backend_url: redis://192.168.157.11:6379/3
2.5使用 watchdog 实现跨平台监控
#能够对win、mac、linux等文件系统监控
#get_current_user.py
import os
import sys
import getpass
import platform
from datetime import datetime
def get_current_username():
"""获取当前用户名"""
return getpass.getuser()
def get_user_fullname():
"""获取用户的完整名称(如果可用)"""
try:
if sys.platform.startswith('win'):
# Windows 系统
import ctypes
from ctypes import wintypes
# 定义 Windows API 函数
GetUserNameEx = ctypes.windll.secur32.GetUserNameExW
NameDisplay = 3 # 显示名称
# 获取缓冲区大小
size = ctypes.c_uint(0)
GetUserNameEx(NameDisplay, None, ctypes.byref(size))
# 分配缓冲区并获取显示名称
buffer = ctypes.create_unicode_buffer(size.value)
if GetUserNameEx(NameDisplay, buffer, ctypes.byref(size)):
return buffer.value
else:
return None
else:
# Linux/macOS 系统
import pwd
# 获取当前用户的 passwd 条目
user_entry = pwd.getpwuid(os.getuid())
# GECOS 字段通常包含用户的全名
gecos = user_entry.pw_gecos
if gecos:
# 分割 GECOS 字段(格式可能为 "Full Name,Room,Work Phone,Home Phone")
full_name = gecos.split(',')[0]
return full_name if full_name else None
return None
except Exception as e:
print(f"获取用户完整名称时出错: {e}")
return None
def get_user_home_dir():
"""获取用户主目录"""
return os.path.expanduser("~")
def get_user_id():
"""获取用户 ID(Unix 系统)或 SID(Windows 系统)"""
try:
if sys.platform.startswith('win'):
# Windows 系统获取用户 SID
import win32security
token = win32security.OpenProcessToken(
win32process.GetCurrentProcess(),
win32security.TOKEN_QUERY
)
user_sid, _ = win32security.GetTokenInformation(
token, win32security.TokenUser
)
return str(user_sid)
else:
# Unix 系统获取用户 UID
return os.getuid()
except Exception as e:
print(f"获取用户 ID 时出错: {e}")
return None
def get_user_groups():
"""获取用户所属的组"""
try:
if sys.platform.startswith('win'):
# Windows 系统
import win32net
import win32security
username = get_current_username()
groups = []
# 获取直接所属的组
user_groups = win32net.NetUserGetLocalGroups(None, username)
groups.extend(user_groups)
# 获取主组(通常是 Users)
token = win32security.OpenProcessToken(
win32process.GetCurrentProcess(),
win32security.TOKEN_QUERY
)
groups_info = win32security.GetTokenInformation(
token, win32security.TokenGroups
)
for group in groups_info:
sid_name = win32security.LookupAccountSid(None, group[0])[0]
if sid_name not in groups:
groups.append(sid_name)
return groups
else:
# Unix 系统
import grp
username = get_current_username()
primary_gid = os.getgid()
groups = [grp.getgrgid(primary_gid).gr_name]
# 获取所有补充组
if hasattr(os, 'getgroups'):
for gid in os.getgroups():
try:
group_name = grp.getgrgid(gid).gr_name
if group_name not in groups:
groups.append(group_name)
except KeyError:
# 忽略无法解析的组 ID
pass
return groups
except Exception as e:
print(f"获取用户组时出错: {e}")
return []
def main():
"""主函数:显示当前用户的账户信息"""
print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"操作系统: {platform.system()} {platform.version()}")
print(f"计算机名: {platform.node()}")
print("")
print("用户账户信息:")
print(f" 用户名: {get_current_username()}")
full_name = get_user_fullname()
print(f" 完整名称: {full_name if full_name else 'N/A'}")
print(f" 主目录: {get_user_home_dir()}")
user_id = get_user_id()
print(f" 用户 ID: {user_id if user_id is not None else 'N/A'}")
groups = get_user_groups()
print(f" 所属组: {', '.join(groups) if groups else 'N/A'}")
2.6正常发送日志和告警
#编写start.py
from file_monitor import FileMonitor
if __name__=='__main__':
#启动file_monitor模块中的FileMonitor类中的
#start_watch
FileMonitor().start_watch()
#操作监控的目录
#启动监听
/share/filemonitor/.fm/bin/python /share/filemonitor/watch/start.py
#启动log服务
celery -A log worker --loglevel=info
#启动msg服务
celery -A msg worker --loglevel=info
#进入监听目录,创建,删除文件
cd /opt
touch 文件
rm 文件
3.日志模块
3.1日志保存字段
#编写yml文件
#config.yml
folder_path: /opt/
mongo:
host: 192.168.157.11
port: 27017
user: admin
pass: 123456
db: filemonitor
coll: mongo_log
mysql:
host: 192.168.157.11
port: 32772
user: root
pass: root
db: filemonitor
tb: logs
redis:
host: 192.168.157.11
port: 6379
msg_broker_db: 0
msg_backend_db: 1
log_broker_db: 2
log_backend_db: 3
msg_broker_url: redis://192.168.157.11:6379/0
msg_backend_url: redis://192.168.157.11:6379/1
log_broker_url: redis://192.168.157.11:6379/2
log_backend_url: redis://192.168.157.11:6379/3
db: 4
logurl: redis://192.168.157.11:6379/4
#初始化config
import yaml
import os
class Configuration():
#在初始化函数中读取配置文件
def __init__(self):
#定义文件位置为当前目录下的config.yml
config_path=os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'config.yml'
)
#打开配置文件,读取配置文件
with open(config_path,'r',encoding='utf-8') as f:
self.config=yaml.safe_load(f)
#get_config函数为外部提供配置文件的字典
def get_config(self):
return self.config
#if __name__=='__main__':
# print(Configuration().get_config())
#日志包含字段:时间,用户,文件,操作等
#文件打开日志
def on_opened(self, event):
print(event.src_path,'打开')
log={
'time':str(get_current_time_with_ms()),
'user':get_current_username(),
'file':event.src_path,
'ops':'open'
}
msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'+
str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:open'
#文件删除日志
def on_deleted(self, event):
print(event.src_path,'删除')
#发送告警到电子邮件
log={
'time':str(get_current_time_with_ms()),
'user':get_current_username(),
'file':event.src_path,
'ops':'delete'
}
msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'+
str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:delete'
3.2保存日志到mysql
#创建MySQL容器并且创建filemonitor数据库和logs表
#创建容器
docker run -d -p3306 -e MYSQL_ROOT_USERNAME=root -e MYSQL_ROOT_PASSWORD=root mysql:5.7.44
#创建filemonitor数据库
create database if not exists filemonitor charset=’utf8mb4’;
#创建logs表
use filemonitor;
create table logs(time varchar(45),user varchar(45),file varchar(45),ops varchar(45));
#编写MySQL部分的tasks.py文件
@app.task(name='save_to_mysql')
def save_to_mysql(filelog):
config=Configuration().get_config()['mysql']
host=config['host']
port=config['port']
user=config['user']
passwd=config['pass']
db=config['db']
tb=config['tb']
conn=pymysql.connect(
host=host,
port=port,
user=user,
passwd=passwd,
db=db,
cursorclass=DictCursor
)
cursor=conn.cursor()
sql=f'insert into '+tb+' values ("'+str(filelog['time'])+'","'+filelog["user"]+'","'+filelog["file"]+'",
"'+filelog["ops"]+'")'
print(sql)
cursor.execute(sql)
conn.commit()
conn.close()
3.3保存日志到mongodb
#创建mongodb容器
version: '3'
services:
mongo01:
image: mongo:latest
container_name: mongodb
volumes:
- /opt/mongodb/data/db:/data/db
environment:
MONGO_INITDB_ROOT_USERNAME: admin # 可选:直接通过环境变量创建用户
MONGO_INITDB_ROOT_PASSWORD: 123456
expose:
- 27017
ports:
- "27017:27017"
restart: always
#编写mongodb部分的tasks.py文件
@app.task(name='save_to_mongo')
def save_to_mongo(logfile):
config=Configuration().get_config()
host=config['mongo']['host']
port=config['mongo']['port']
user=config['mongo']['user']
passwd=str(config['mongo']['pass'])
db=config['mongo']['db']
coll=config['mongo']['coll']
url='mongodb://'+user+':'+passwd+'@'+host+':'+str(port)
client=MongoClient(url)
db=client[db]
coll=db[coll]
coll.insert_one(logfile)
3.4保存日志到redis
#创建redis容器
version: '3'
services:
python0000:
image: python-ssh:1.0
volumes:
- /pythona/:/pythonb/
expose:
- 22
ports:
- 22
restart: always
privileged: True
redis0:
image: redis:latest
expose:
- 6379
ports:
- 6379
#编写redis部分的tasks.py文件
@app.task(name='save_to_redis')
def save_to_redis(filelog):
config=Configuration().get_config()['redis']
host=config['host']
port=config['port']
db=config['db']
r=redis.Redis(
host=host,
port=port,
db=db
)
r.set(
str(filelog['time']),
json.dumps(filelog)
)
r.close()
3.5日志的保存
#redis
#mysql
#mongdb
4.告警模块
4.1邮件报警
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
import yaml
import os
class ZMEmail(object):
def __init__(self):
fp=os.path.join(
os.path.dirname(
os.path.abspath(__file__)
),
'config.yml'
)
file=open(fp,'r',encoding='utf-8')
mailconfig=yaml.safe_load(file)
self.sender=mailconfig['email']['sender']
self.code=mailconfig['email']['code']
self.receer=mailconfig['email']['receer']
self.server=mailconfig['email']['server']
file.close()
def sendmail(self,msg):
# 发件人邮箱账号和授权码
sender_email = self.sender
sender_password = self.code
# 收件人邮箱账号
receiver_email = self.receer
# 创建一个带附件的邮件实例
message = MIMEMultipart()
message["From"] = Header(sender_email, "utf-8")
message["To"] = Header(receiver_email, "utf-8")
message["Subject"] = Header("邮件提醒", "utf-8")
# 邮件正文内容
mail_content = msg
message.attach(MIMEText(mail_content, "plain", "utf-8"))
# 配置 SMTP 服务器
smtp_server = self.server
smtp_port = 465
try:
# 创建 SMTP 连接对象,使用 SSL 加密
server = smtplib.SMTP_SSL(smtp_server, smtp_port)
# 登录发件人邮箱
server.login(sender_email, sender_password)
# 发送邮件
server.sendmail(sender_email, receiver_email, message.as_string())
print("邮件发送成功")
except Exception as e:
print(f"邮件发送失败: {e}")
finally:
# 关闭连接
server.quit()
print("=====邮件已经发送====")
4.2微信告警
from configuration import Configuration
import requests
class ZMWechat():
def __init__(self):
self.config=Configuration().get_config()['wechat']
def send_wechat(self,msg):
headers = {'Content-Type': 'application/json'}
data = {
"msgtype": "text",
"text": {
"content": msg
}
}
try:
response = requests.post(self.config['webhook'], headers=headers, json=data)
if response.status_code == 200:
result = response.json()
if result.get('errcode') == 0:
print("微信消息发送成功")
else:
print(f"微信消息发送失败: {result.get('errmsg')}")
else:
print(f"微信消息发送失败: {response.text}")
except Exception as e:
print(f"微信消息发送失败: {e}")
#if __name__=='__main__':
# ZMWechat().send_wechat('why? tell me')
4.3钉钉告警
from configuration import Configuration
import requests
class ZMDing():
def __init__(self):
self.config=Configuration().get_config()['dingding']
def send_dingding(self,msg):
headers = {'Content-Type': 'application/json'}
data = {
"msgtype": "text",
"text": {
"content": msg
},
"at": {
"isAtAll": False # 如果需要@所有人,设置为True
}
}
try:
response = requests.post(self.config['webhook'], headers=headers, json=data)
if response.status_code == 200:
print("钉钉消息发送成功")
else:
print(f"钉钉消息发送失败: {response.text}")
except Exception as e:
print(f"钉钉消息发送失败: {e}")
4.4详细的报警信息
#编写yml文件
#config.yml
folder_path: /opt/
mongo:
host: 192.168.157.11
port: 27017
user: admin
pass: 123456
db: filemonitor
coll: mongo_log
mysql:
host: 192.168.157.11
port: 32772
user: root
pass: root
db: filemonitor
tb: logs
redis:
host: 192.168.157.11
port: 6379
msg_broker_db: 0
msg_backend_db: 1
log_broker_db: 2
log_backend_db: 3
msg_broker_url: redis://192.168.157.11:6379/0
msg_backend_url: redis://192.168.157.11:6379/1
log_broker_url: redis://192.168.157.11:6379/2
log_backend_url: redis://192.168.157.11:6379/3
db: 4
logurl: redis://192.168.157.11:6379/4
email:
sender: '网易的邮箱'
code: 密钥
receer: QQ邮箱
server: smtp.163.com
dingding:
webhook: 钉钉机器人的webhook
wechat:
webhook: 微信机器人的webhook
#email监控的报警信息
#微信监控的报警信息
#钉钉监控的报警信息
5.celery框架
5.1 redis中间件的部署
redis:
host: 192.168.157.11
port: 6379
msg_broker_db: 0
msg_backend_db: 1
log_broker_db: 2
log_backend_db: 3
msg_broker_url: redis://192.168.157.11:6379/0
msg_backend_url: redis://192.168.157.11:6379/1
log_broker_url: redis://192.168.157.11:6379/2
log_backend_url: redis://192.168.157.11:6379/3
db: 4
logurl: redis://192.168.157.11:6379/4
5.2 任务的注册
@app.task(name='save_to_redis')
def save_to_redis(filelog):
config=Configuration().get_config()['redis']
host=config['host']
port=config['port']
db=config['db']
r=redis.Redis(
host=host,
port=port,
db=db
)
r.set(
str(filelog['time']),
json.dumps(filelog)
)
r.close()
5.3 app的创建
#msg
app = Celery(
'msg',
broker=f"redis://{config['redis']['host']}:{config['redis']['port']}/
{config['redis']['msg_broker_db']}",
backend=f"redis://{config['redis']['host']}:{config['redis']['port']}/
{config['redis']['msg_backend_db']}",
)
5.4任务的发现
app = Celery(
'log',
broker=f"redis://{config['redis']['host']}:{config['redis']['port']}/
{config['redis']['log_broker_db']}",
backend=f"redis://{config['redis']['host']}:{config['redis']['port']}/
{config['redis']['log_backend_db']}",
include=['tasks']
)