当前位置: 首页 > news >正文

基于 Celery 的分布式文件监控系统

Celery 是一款基于 Python 开发的分布式任务队列(Distributed Task Queue),核心作用是将应用中的耗时任务、定时任务或跨服务任务 “剥离” 出主流程,通过异步执行的方式提升系统响应速度、稳定性与可扩展性。它广泛应用于 Web 开发、数据分析、自动化运维等场景,例如用户注册后的邮件发送、订单支付后的状态异步更新、定时数据备份、大规模数据计算等。

一、核心定位:解决 “任务阻塞” 问题

在传统应用中,若直接在请求链路里执行耗时操作(如生成 10 万条数据的报表、调用响应慢的第三方 API),会导致用户等待时间过长,甚至引发请求超时。Celery 通过 “解耦任务发起与执行” 解决这一痛点:

  • 不阻塞主流程:Web 应用(如 Django/Flask 项目)发起任务后,无需等待任务完成,直接返回结果给用户,任务在后台独立执行;
  • 隔离任务风险:任务执行失败(如邮件服务器故障)不会影响主应用运行,可通过重试机制保障任务最终完成;
  • 支持分布式扩展:可在多台服务器部署任务执行节点,分担高并发场景下的任务压力。

二、核心组件与工作流程

Celery 的运行依赖 “中间件” 和 “自身模块” 的配合,核心组件分为 4 类,工作流程清晰可追溯:

1. 四大核心组件

  • 生产者(Producer):发起任务的应用(如 Web 项目),通过 Celery 提供的 API 将任务 “提交” 到任务队列;
  • 任务队列(Broker):暂存任务的中间件,负责任务的接收、存储与分发,是 “生产者” 与 “消费者” 的通信桥梁,常用实现有 Redis(轻量、易部署)、RabbitMQ(可靠、支持复杂路由);
  • 消费者(Worker):独立运行的进程 / 线程,持续监听任务队列,从队列中 “取出” 任务并执行(如调用函数发送邮件);
  • 结果后端(Result Backend):可选组件,用于存储任务执行结果(如成功 / 失败状态、函数返回值),供生产者后续查询(如 Web 应用查询 “报表生成是否完成”),常用实现有 Redis、MySQL、MongoDB。

2. 典型工作流程(以 “用户注册发送验证邮件” 为例)

  1. 用户在 Web 端提交注册信息,Web 应用(生产者)验证信息无误后,调用 Celery 任务的 delay() 方法,将 “用户邮箱” 作为参数,提交任务到 Broker(如 Redis);
  2. Broker 接收到任务后,将其存入指定队列(默认队列名为 celery,也可自定义队列区分任务类型);
  3. 后台运行的 Worker 进程(消费者)持续监听该队列,检测到新任务后,取出任务并执行 “发送验证邮件” 的逻辑;
  4. 若配置了 Result Backend,Worker 会将任务执行结果(如 “邮件发送成功” 或 “邮箱不存在导致失败”)存入后端;
  5. 若 Web 应用需要确认任务状态,可通过任务 ID 从 Result Backend 中查询结果。

三、核心特性:灵活、可靠、可扩展

Celery 能成为 Python 生态的主流任务队列,得益于其丰富且实用的特性:

  • 灵活的任务配置:支持为单个任务设置重试策略(如失败后重试 3 次,每次间隔 5 秒)、超时时间(如任务执行超过 10 秒则终止)、优先级(高优先级任务先执行);
  • 定时任务调度:内置 Beat 调度器,可替代 Linux Crontab 实现更灵活的定时任务(如 “每天凌晨 2 点执行数据备份”“每小时同步一次第三方数据”),支持精确到秒的调度规则;
  • 分布式与负载均衡:可在多台服务器部署 Worker,Broker 会自动将任务分发到空闲 Worker,实现负载均衡;支持按任务类型分配队列,让特定 Worker 只处理指定类型任务(如 “报表生成任务” 由高性能服务器的 Worker 处理);
  • 故障处理与监控:支持任务失败后的回调(如失败时发送告警邮件),提供 flower 等监控工具,可实时查看 Worker 状态、任务执行进度、失败任务详情;
  • 多语言兼容:虽然核心基于 Python,但通过协议扩展,支持其他语言(如 Go、Java)作为生产者或消费者与 Celery 交互;
  • 与 Python 生态无缝集成:可直接与 Django、Flask、Tornado 等主流 Web 框架结合,也支持与 SQLAlchemy、Django ORM 等数据工具协同,降低开发成本。

四、项目概述

1.1项目背景

随着信息化时代的到来,文件系统的监控和管理变得尤为重要。无论是个人用户还是企业,都需要实时监控文件的变化(如创建、修改、删除等),以便及时响应和处理。本项目的目标是开发一个基于 Celery 分布式任务队列的跨平台文件监控系统(FileMonitor),支持 Windows、macOS 和 Linux 系统,并通过微信、钉钉、邮件等方式通知用户文件变化。同时,系统会将监控日志存储到 MySQL、Redis 和 MongoDB 中,便于后续查询和分析

1.2 项目框架

1.3功能介绍

(1)跨平台文件监控:支持 Windows、macOS 和 Linux 系统,使用 watchdog 模块监听文件变化(watchdog 是跨平台解决方案)

(2)分布式任务处理:使用 Celery 实现分布式任务队列,将文件监控任务分发到多个 Worker 节点

(3)多通知方式:支持微信、钉钉、邮件等方式发送文件变化通知

(4)多数据库支持:文件监控日志存储到 MySQL(关系型数据)、Redis(缓存和快速查询)、MongoDB(文档型数据)

(5)模块化设计:功能分模块开发(如监控模块、通知模块、数据库模块),提高可读性和可维护性

(6)容器化部署:使用 Docker 和 Docker Compose 实现环境标准化和快速部署

1.4技术栈

Python:项目核心语言

Celery:分布式任务队列,用于异步处理文件监控任务

watchdog:跨平台文件监控模块

Redis:作为 Celery 的消息代理(Broker)和结果存储(Result Backend)

MySQL:存储文件监控日志(关系型数据)

MongoDB:存储文件监控日志(文档型数据)

Docker:容器化部署,便于环境管理和分发

 2.监控模块

2.1项目环境的部署

version: '3'

services:

  pythonops:

    image: python-ssh:1.0

    restart: always

    privileged: true

    volumes:

      - /pythonops/:/pythonops/

      - /share/:/share/

    expose:

      - 21

      - 22

      - 80

      - 3306

      - 8080

      - 5000

      - 50000

    ports:

      - 21

      - 22

      - 80

      - 3306

      - 8080

      - 5000

      - 50000

    deploy:

      replicas: 3

2.2安装依赖

#vscode连接pythonops-1

#配置清华加速器

pip install --upgrade pip

pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

#watchdog:跨平台文件监控模块

pip install watchdog

#celery:分布式任务队列

pip install celery

#redis:Celery 的消息代理(Broker)和结果存储(Result Backend)

pip install redis

#pymysql:MySQL 数据库操作

pip install pymysql

#pymongo:MongoDB 数据库操作

pip install pymongo

#PyYAML 是 FileMonitor 项目的依赖之一,用于解析 config.yml 配置文件

pip install pyyaml

2.3文件监控模块

#file_monitor.py

from watchdog.observers import Observer

from configuration import Configuration

import time

from file_handler import FileHandler

class FileMonitor():

    #创建初始化函数

    #添加成员变量handler

    def __init__(self):

        self.handler=FileHandler()

        self.watch_folder_path=Configuration().get_config()['folder_path']

    def start_watch(self):

        #启用监听服务

        #实例观察者

        dog=Observer()

        #调用 要观察目录,处理器

        #处理器,当文件发生书名什么事情之后的应对方案

        #我们会创建一个类去管理这些方案

        #这个处理器,我们定义在file_handler模块中

        #在该模块中创建File_Handler来处理

        dog.schedule(

            self.handler,

            self.watch_folder_path,

            recursive=True

        )

        #启动监听

        dog.start()

        #处理键盘ctrl-c的终止

        try:

            while True:

                time.sleep(1)

        except KeyboardInterrupt as e:

            #如果有键盘ctrl+c,直接停止监听

            dog.stop()

        #推出监听线程

        dog.join()

2.4使用配置文件设置需要监听的目录

#file_handle.py

from watchdog.events import FileSystemEventHandler

from date_time_milliseconds import get_current_time_with_ms

from get_current_user import get_current_username

from msg import send_mail,send_dingding,send_wechat

from log import save_to_mongo,save_to_mysql,save_to_redis

#文件监听处理器

class FileHandler(FileSystemEventHandler):

    #重写继承方法

    #应对文件被人打开的处理方法

    def on_opened(self, event):

        print(event.src_path,'打开')

        log={

            'time':str(get_current_time_with_ms()),

            'user':get_current_username(),

            'file':event.src_path,

            'ops':'open'

        }

        msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'

+str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:open'

        #发送告警到电子邮件

        send_mail.delay(msg)

        #发送告警到钉钉

        send_dingding.delay(msg)

        #发送告警到微信

        send_wechat.delay(msg)

        #保存日志到Mongo

        save_to_mongo.delay(log)

        #保存日志到redis

        save_to_redis.delay(log)

        #保存日志到mysql

        save_to_mysql.delay(log)

    #应对文件被删除的处理方法

    def on_deleted(self, event):

        print(event.src_path,'删除')

        #发送告警到电子邮件

        log={

            'time':str(get_current_time_with_ms()),

            'user':get_current_username(),

            'file':event.src_path,

            'ops':'delete'

        }

        msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'

+str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:delete'

        #发送告警到电子邮件

        send_mail.delay(msg)

        #发送告警到钉钉

        send_dingding.delay(msg)

        #发送告警到微信

        send_wechat.delay(msg)

        #保存日志到Mongo

        save_to_mongo.delay(log)

        #保存日志到redis

        save_to_redis.delay(log)

        #保存日志到mysql

        save_to_mysql.delay(log)

#设置需要监控的目录

folder_path: /opt/

redis:

  host: 192.168.157.11

  port: 6379

  msg_broker_db: 0

  msg_backend_db: 1

  log_broker_db: 2

  log_backend_db: 3

  msg_broker_url: redis://192.168.157.11:6379/0

  msg_backend_url: redis://192.168.157.11:6379/1

  log_broker_url: redis://192.168.157.11:6379/2

  log_backend_url: redis://192.168.157.11:6379/3

2.5使用 watchdog 实现跨平台监控

#能够对win、mac、linux等文件系统监控

#get_current_user.py

import os

import sys

import getpass

import platform

from datetime import datetime

def get_current_username():

    """获取当前用户名"""

    return getpass.getuser()

def get_user_fullname():

    """获取用户的完整名称(如果可用)"""

    try:

        if sys.platform.startswith('win'):

            # Windows 系统

            import ctypes

            from ctypes import wintypes

            

            # 定义 Windows API 函数

            GetUserNameEx = ctypes.windll.secur32.GetUserNameExW

            NameDisplay = 3  # 显示名称

            

            # 获取缓冲区大小

            size = ctypes.c_uint(0)

            GetUserNameEx(NameDisplay, None, ctypes.byref(size))

            

            # 分配缓冲区并获取显示名称

            buffer = ctypes.create_unicode_buffer(size.value)

            if GetUserNameEx(NameDisplay, buffer, ctypes.byref(size)):

                return buffer.value

            else:

                return None

        else:

            # Linux/macOS 系统

            import pwd

            

            # 获取当前用户的 passwd 条目

            user_entry = pwd.getpwuid(os.getuid())

            

            # GECOS 字段通常包含用户的全名

            gecos = user_entry.pw_gecos

            if gecos:

                # 分割 GECOS 字段(格式可能为 "Full Name,Room,Work Phone,Home Phone")

                full_name = gecos.split(',')[0]

                return full_name if full_name else None

            return None

    except Exception as e:

        print(f"获取用户完整名称时出错: {e}")

        return None

def get_user_home_dir():

    """获取用户主目录"""

    return os.path.expanduser("~")

def get_user_id():

    """获取用户 ID(Unix 系统)或 SID(Windows 系统)"""

    try:

        if sys.platform.startswith('win'):

            # Windows 系统获取用户 SID

            import win32security

            token = win32security.OpenProcessToken(

                win32process.GetCurrentProcess(),

                win32security.TOKEN_QUERY

            )

            user_sid, _ = win32security.GetTokenInformation(

                token, win32security.TokenUser

            )

            return str(user_sid)

        else:

            # Unix 系统获取用户 UID

            return os.getuid()

    except Exception as e:

        print(f"获取用户 ID 时出错: {e}")

        return None

def get_user_groups():

    """获取用户所属的组"""

    try:

        if sys.platform.startswith('win'):

            # Windows 系统

            import win32net

            import win32security

            

            username = get_current_username()

            groups = []

            

            # 获取直接所属的组

            user_groups = win32net.NetUserGetLocalGroups(None, username)

            groups.extend(user_groups)

            

            # 获取主组(通常是 Users)

            token = win32security.OpenProcessToken(

                win32process.GetCurrentProcess(),

                win32security.TOKEN_QUERY

            )

            groups_info = win32security.GetTokenInformation(

                token, win32security.TokenGroups

            )

            for group in groups_info:

                sid_name = win32security.LookupAccountSid(None, group[0])[0]

                if sid_name not in groups:

                    groups.append(sid_name)

            

            return groups

        else:

            # Unix 系统

            import grp

            

            username = get_current_username()

            primary_gid = os.getgid()

            groups = [grp.getgrgid(primary_gid).gr_name]

            

            # 获取所有补充组

            if hasattr(os, 'getgroups'):

                for gid in os.getgroups():

                    try:

                        group_name = grp.getgrgid(gid).gr_name

                        if group_name not in groups:

                            groups.append(group_name)

                    except KeyError:

                        # 忽略无法解析的组 ID

                        pass

            

            return groups

    except Exception as e:

        print(f"获取用户组时出错: {e}")

        return []

def main():

    """主函数:显示当前用户的账户信息"""

    print(f"运行时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    print(f"操作系统: {platform.system()} {platform.version()}")

    print(f"计算机名: {platform.node()}")

    print("")

    

    print("用户账户信息:")

    print(f"  用户名: {get_current_username()}")

    

    full_name = get_user_fullname()

    print(f"  完整名称: {full_name if full_name else 'N/A'}")

    

    print(f"  主目录: {get_user_home_dir()}")

    

    user_id = get_user_id()

    print(f"  用户 ID: {user_id if user_id is not None else 'N/A'}")

    

    groups = get_user_groups()

    print(f"  所属组: {', '.join(groups) if groups else 'N/A'}")

2.6正常发送日志和告警

#编写start.py

from file_monitor import FileMonitor

if __name__=='__main__':

    #启动file_monitor模块中的FileMonitor类中的

    #start_watch

    FileMonitor().start_watch()

#操作监控的目录

#启动监听

/share/filemonitor/.fm/bin/python /share/filemonitor/watch/start.py

#启动log服务

celery -A log worker --loglevel=info

#启动msg服务

celery -A msg worker --loglevel=info

#进入监听目录,创建,删除文件

cd /opt

touch 文件

rm 文件

3.日志模块

3.1日志保存字段

#编写yml文件

#config.yml

folder_path: /opt/

mongo:

  host: 192.168.157.11

  port: 27017

  user: admin

  pass: 123456

  db: filemonitor

  coll: mongo_log

mysql:

  host: 192.168.157.11

  port: 32772

  user: root

  pass: root

  db: filemonitor

  tb: logs

redis:

  host: 192.168.157.11

  port: 6379

  msg_broker_db: 0

  msg_backend_db: 1

  log_broker_db: 2

  log_backend_db: 3

  msg_broker_url: redis://192.168.157.11:6379/0

  msg_backend_url: redis://192.168.157.11:6379/1

  log_broker_url: redis://192.168.157.11:6379/2

  log_backend_url: redis://192.168.157.11:6379/3

  db: 4

  logurl: redis://192.168.157.11:6379/4

#初始化config

import yaml

import os

class Configuration():

    #在初始化函数中读取配置文件

    def __init__(self):

        #定义文件位置为当前目录下的config.yml

        config_path=os.path.join(

            os.path.dirname(os.path.abspath(__file__)),

            'config.yml'

        )

        #打开配置文件,读取配置文件

        with open(config_path,'r',encoding='utf-8') as f:

            self.config=yaml.safe_load(f)

    #get_config函数为外部提供配置文件的字典

    def get_config(self):

        return self.config

    

#if __name__=='__main__':

#    print(Configuration().get_config())

#日志包含字段:时间,用户,文件,操作等

#文件打开日志   

def on_opened(self, event):

        print(event.src_path,'打开')

        log={

            'time':str(get_current_time_with_ms()),

            'user':get_current_username(),

            'file':event.src_path,

            'ops':'open'

        }

        msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'+

str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:open'

#文件删除日志

    def on_deleted(self, event):

        print(event.src_path,'删除')

        #发送告警到电子邮件

        log={

            'time':str(get_current_time_with_ms()),

            'user':get_current_username(),

            'file':event.src_path,

            'ops':'delete'

        }

        msg='消息\n'+"time:"+str(get_current_time_with_ms())+'\n'+'user:'+

str(get_current_username())+'\n'+'file:'+event.src_path+'\n'+'ops:delete'

3.2保存日志到mysql

#创建MySQL容器并且创建filemonitor数据库和logs表

#创建容器

docker run -d -p3306 -e MYSQL_ROOT_USERNAME=root -e MYSQL_ROOT_PASSWORD=root mysql:5.7.44

#创建filemonitor数据库

create database if not exists filemonitor charset=’utf8mb4’;

#创建logs表

use filemonitor;

create table logs(time varchar(45),user varchar(45),file varchar(45),ops varchar(45));

#编写MySQL部分的tasks.py文件

@app.task(name='save_to_mysql')

def save_to_mysql(filelog):

    config=Configuration().get_config()['mysql']

    host=config['host']

    port=config['port']

    user=config['user']

    passwd=config['pass']

    db=config['db']

    tb=config['tb']

    conn=pymysql.connect(

        host=host,

        port=port,

        user=user,

        passwd=passwd,

        db=db,

        cursorclass=DictCursor

    )

    cursor=conn.cursor()

sql=f'insert into '+tb+' values ("'+str(filelog['time'])+'","'+filelog["user"]+'","'+filelog["file"]+'",

"'+filelog["ops"]+'")'

    print(sql)

    cursor.execute(sql)

    conn.commit()

    conn.close()

3.3保存日志到mongodb

#创建mongodb容器

version: '3'

services:

  mongo01:

    image: mongo:latest

    container_name: mongodb

    volumes:

      - /opt/mongodb/data/db:/data/db

    environment:

      MONGO_INITDB_ROOT_USERNAME: admin  # 可选:直接通过环境变量创建用户

      MONGO_INITDB_ROOT_PASSWORD: 123456

    expose:

      - 27017

    ports:

      - "27017:27017"

    restart: always

#编写mongodb部分的tasks.py文件

@app.task(name='save_to_mongo')

def save_to_mongo(logfile):

    config=Configuration().get_config()

    host=config['mongo']['host']

    port=config['mongo']['port']

    user=config['mongo']['user']

    passwd=str(config['mongo']['pass'])

    db=config['mongo']['db']

    coll=config['mongo']['coll']

    url='mongodb://'+user+':'+passwd+'@'+host+':'+str(port)

    client=MongoClient(url)

    db=client[db]

    coll=db[coll]

    coll.insert_one(logfile)

3.4保存日志到redis

#创建redis容器

version: '3'

services:

  python0000:

    image: python-ssh:1.0

    volumes:

      - /pythona/:/pythonb/

    expose:

      - 22

    ports:

      - 22

    restart: always

    privileged: True

  redis0:

    image: redis:latest

    expose:

      - 6379

    ports:

      - 6379

#编写redis部分的tasks.py文件

@app.task(name='save_to_redis')

def save_to_redis(filelog):

    config=Configuration().get_config()['redis']

    host=config['host']

    port=config['port']

    db=config['db']

    r=redis.Redis(

        host=host,

        port=port,

        db=db

    )

    r.set(

        str(filelog['time']),

        json.dumps(filelog)

    )

    r.close()

3.5日志的保存

#redis

#mysql

#mongdb

4.告警模块

4.1邮件报警

import smtplib

from email.mime.text import MIMEText

from email.mime.multipart import MIMEMultipart

from email.header import Header

import yaml

import os

class ZMEmail(object):

    def __init__(self):

        fp=os.path.join(

            os.path.dirname(

                os.path.abspath(__file__)

            ),

            'config.yml'

        )

        file=open(fp,'r',encoding='utf-8')

        mailconfig=yaml.safe_load(file)

        self.sender=mailconfig['email']['sender']

        self.code=mailconfig['email']['code']

        self.receer=mailconfig['email']['receer']

        self.server=mailconfig['email']['server']

        file.close()

    def sendmail(self,msg):

        # 发件人邮箱账号和授权码

        sender_email = self.sender

        sender_password = self.code

        # 收件人邮箱账号

        receiver_email = self.receer

        # 创建一个带附件的邮件实例

        message = MIMEMultipart()

        message["From"] = Header(sender_email, "utf-8")

        message["To"] = Header(receiver_email, "utf-8")

        message["Subject"] = Header("邮件提醒", "utf-8")

        # 邮件正文内容

        mail_content = msg

        message.attach(MIMEText(mail_content, "plain", "utf-8"))

        # 配置 SMTP 服务器

        smtp_server = self.server

        smtp_port = 465

        try:

            # 创建 SMTP 连接对象,使用 SSL 加密

            server = smtplib.SMTP_SSL(smtp_server, smtp_port)

            # 登录发件人邮箱

            server.login(sender_email, sender_password)

            # 发送邮件

            server.sendmail(sender_email, receiver_email, message.as_string())

            print("邮件发送成功")

        except Exception as e:

            print(f"邮件发送失败: {e}")

        finally:

            # 关闭连接

            server.quit()

        print("=====邮件已经发送====")

4.2微信告警

from configuration import Configuration

import requests

class ZMWechat():

    def __init__(self):

        self.config=Configuration().get_config()['wechat']

    def send_wechat(self,msg):

        headers = {'Content-Type': 'application/json'}

        data = {

            "msgtype": "text",

            "text": {

                "content": msg

            }

        }

        try:

            response = requests.post(self.config['webhook'], headers=headers, json=data)

            if response.status_code == 200:

                result = response.json()

                if result.get('errcode') == 0:

                    print("微信消息发送成功")

                else:

                    print(f"微信消息发送失败: {result.get('errmsg')}")

            else:

                print(f"微信消息发送失败: {response.text}")

        except Exception as e:

            print(f"微信消息发送失败: {e}")

#if __name__=='__main__':

#    ZMWechat().send_wechat('why? tell me')

4.3钉钉告警

from configuration import Configuration

import requests

class ZMDing():

    def __init__(self):

        self.config=Configuration().get_config()['dingding']

    def send_dingding(self,msg):

        headers = {'Content-Type': 'application/json'}

        data = {

            "msgtype": "text",

            "text": {

                "content": msg

            },

            "at": {

                "isAtAll": False  # 如果需要@所有人,设置为True

            }

        }

        try:

            response = requests.post(self.config['webhook'], headers=headers, json=data)

            if response.status_code == 200:

                print("钉钉消息发送成功")

            else:

                print(f"钉钉消息发送失败: {response.text}")

        except Exception as e:

            print(f"钉钉消息发送失败: {e}")

4.4详细的报警信息

#编写yml文件

#config.yml

folder_path: /opt/

mongo:

  host: 192.168.157.11

  port: 27017

  user: admin

  pass: 123456

  db: filemonitor

  coll: mongo_log

mysql:

  host: 192.168.157.11

  port: 32772

  user: root

  pass: root

  db: filemonitor

  tb: logs

redis:

  host: 192.168.157.11

  port: 6379

  msg_broker_db: 0

  msg_backend_db: 1

  log_broker_db: 2

  log_backend_db: 3

  msg_broker_url: redis://192.168.157.11:6379/0

  msg_backend_url: redis://192.168.157.11:6379/1

  log_broker_url: redis://192.168.157.11:6379/2

  log_backend_url: redis://192.168.157.11:6379/3

  db: 4

  logurl: redis://192.168.157.11:6379/4

  

email:

  sender: '网易的邮箱'

  code: 密钥

  receer: QQ邮箱

  server: smtp.163.com

dingding:

  webhook: 钉钉机器人的webhook

wechat:

  webhook: 微信机器人的webhook

#email监控的报警信息

#微信监控的报警信息

#钉钉监控的报警信息

5.celery框架

5.1 redis中间件的部署

redis:

  host: 192.168.157.11

  port: 6379

  msg_broker_db: 0

  msg_backend_db: 1

  log_broker_db: 2

  log_backend_db: 3

  msg_broker_url: redis://192.168.157.11:6379/0

  msg_backend_url: redis://192.168.157.11:6379/1

  log_broker_url: redis://192.168.157.11:6379/2

  log_backend_url: redis://192.168.157.11:6379/3

  db: 4

  logurl: redis://192.168.157.11:6379/4

5.2 任务的注册

@app.task(name='save_to_redis')

def save_to_redis(filelog):

    config=Configuration().get_config()['redis']

    host=config['host']

    port=config['port']

    db=config['db']

    r=redis.Redis(

        host=host,

        port=port,

        db=db

    )

    r.set(

        str(filelog['time']),

        json.dumps(filelog)

    )

    r.close()

5.3 app的创建

#msg

app = Celery(

    'msg',

broker=f"redis://{config['redis']['host']}:{config['redis']['port']}/

{config['redis']['msg_broker_db']}",

backend=f"redis://{config['redis']['host']}:{config['redis']['port']}/

{config['redis']['msg_backend_db']}",

)

5.4任务的发现

app = Celery(

    'log',

broker=f"redis://{config['redis']['host']}:{config['redis']['port']}/

{config['redis']['log_broker_db']}",

backend=f"redis://{config['redis']['host']}:{config['redis']['port']}/

{config['redis']['log_backend_db']}",

include=['tasks']

http://www.dtcms.com/a/426779.html

相关文章:

  • CATIA二次开发(2)C#启用AOT
  • Linux 驱动开发与内核通信机制——超详细教程
  • 【langgraph】本地部署方法及实例分析
  • Linux入门指南:从零掌握基础指令
  • 做笔记的网站源码江永网站建设
  • 是时候重启了:AIGC将如何重构UI设计师的学习路径与知识体系?
  • uniapp 请求接口封装和使用
  • AIGC重构数据可视化:你是进化中的“驯兽师”还是被替代的“画图工”?
  • Apache Doris 内部数据裁剪与过滤机制的实现原理
  • 专业做网站流程小程序开发步骤大全
  • C语言基础之指针2
  • 淘客网站怎么做 知乎wordpress淘宝联盟插件
  • flink工作流程
  • openHarmony之storage_daemon:分区挂载与设备节点管理机制讲解
  • 建站怎么赚钱个人官方网站怎么建设
  • 学习笔记093——Windows系统如何定时备份远程服务器的mysql文件到本地?
  • 操作系统内核架构深度解析:从单内核、微内核到鸿蒙分布式设计
  • MySQL 架构全景解析
  • .NET MVC中实现后台商品列表功能
  • oracle logwr,ckpt,dbwn 如何协同工作的
  • C# 网络通讯核心知识点笔记
  • Ubuntu之apt安装ClickHouse数据库
  • 在线音乐网站开发现状网站全屏弹出窗口
  • 泛型在Java集合框架中的应用有哪些?
  • 服务器中使用Docker部署前端项目
  • mysql之二进制日志
  • 【完整源码+数据集+部署教程】染色体图像分割系统: yolov8-seg-KernelWarehouse
  • Docker MySQL 使用全流程
  • Visual Studio主题、字体、快捷键、开发环境设置,自用
  • 火山 19 混音伴音 接口