当前位置: 首页 > wzjs >正文

创建团购网站网站页面构架

创建团购网站,网站页面构架,上海有哪些外贸进出口公司,十堰微网站建设价格需求阐述 工作中使用fastStream模块消费kafka事件的时候,遇到以下问题。1. 由于main.py模块要用到许多其他模块 ,main模块,和其他模块都有日志输出。2. 要实现异步消费事件,但是又不希望日志打印的很乱。3. 想着在每次消费事件的…

需求阐述

工作中使用fastStream模块消费kafka事件的时候,遇到以下问题。1. 由于main.py模块要用到许多其他模块 ,main模块,和其他模块都有日志输出。2. 要实现异步消费事件,但是又不希望日志打印的很乱。3. 想着在每次消费事件的时候,生成一个唯一的id,然后根据id生成一个log处理器。4. 这个独一无二的log处理器可以透传到其他模块。但是我又不想通过参数的方式传递,那样太麻烦了。5. 尝试借助上下文变量的方式。

询问deepseek

我的描述如下

我有一个需求帮我设计以日志框架 1.我使用fastStream模块,消费kafka事件。首先框架里至少有多个自定义模块,log.py, main.py, git.py, change.py等。 2. 这四个模块,分别对应日志处理,主进程,git命令,change信息的处理。每个模块都需要打印日志信息。 3. 我希望当每个事件进入到主进程后,能生成一id,这个id将生成一log处理器。这个log处理在主进程调度相应的git,change.py模块的时候,能通过全局的方式对应使用这个事件对应id生成的log对象。有肯能是需要使用到上下文的方式来传递,但我不希望每次调度一个git或者change的函数或者方法的时候都将这个log处理器作为参数。我希望在主进程设置,在子进程直接使用。 4. 这个框架能够适用于异步框架。5. log使用loguru模块,并且需要实现每个id的日志写到id.log文件中,并且20天自动清理一次。

# log.py
from loguru import logger
from contextvars import ContextVar
import os
from pathlib import Path
from datetime import timedeltaLOG_DIR = Path("logs")
RETENTION_DAYS = 20
current_event_id = ContextVar('event_id', default=None)def configure_logger():"""全局日志配置"""logger.remove()# 添加控制台输出logger.add(sink=lambda msg: print(msg, end=""),format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",level="INFO",colorize=True,enqueue=True  # 确保异步安全)def add_event_logger(event_id: str):"""为事件添加文件日志"""log_file = LOG_DIR / f"{event_id}.log"sink_id = logger.add(sink=log_file,format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",retention=timedelta(days=RETENTION_DAYS),filter=lambda record: record["extra"].get("event_id") == event_id,enqueue=True)return sink_idclass EventLoggerContext:"""事件日志上下文管理器"""def __init__(self, event_id: str):self.event_id = event_idself.sink_id = Nonedef __enter__(self):self.sink_id = add_event_logger(self.event_id)token = current_event_id.set(self.event_id)logger.configure(extra={"event_id": self.event_id})return selfdef __exit__(self, exc_type, exc_val, exc_tb):logger.remove(self.sink_id)current_event_id.set(None)logger.configure(extra={"event_id": None})# git.py
from loguru import logger
from .log import current_event_idasync def git_operation(repo_url: str):event_id = current_event_id.get()logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")# ... git操作逻辑logger.bind(event_id=event_id).success("Git operation completed")# change.py
from loguru import logger
from .log import current_event_idasync def process_changes(changes: list):event_id = current_event_id.get()logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")# ... change处理逻辑logger.bind(event_id=event_id).warning("Found potential conflicts")# main.py
from faststream import Context
from loguru import logger
from .log import EventLoggerContext, configure_logger, LOG_DIR
from .git import git_operation
from .change import process_changes
import uuidasync def handle_event(event):event_id = str(uuid.uuid4())  # 生成唯一事件IDwith EventLoggerContext(event_id):logger.info(f"Start processing event {event_id}")# 主处理流程await git_operation(event["repo_url"])await process_changes(event["changes"])logger.success(f"Event {event_id} processed")def setup():"""初始化配置"""LOG_DIR.mkdir(exist_ok=True)configure_logger()# 启动FastStream消费者
from faststream.kafka import KafkaBrokerbroker = KafkaBroker("localhost:9092")
@broker.subscriber("events-topic")
async def on_event(event):await handle_event(event)

调试代码

  • 项目结构
    在这里插入图片描述

  • main.py中的代码

# main.py
import asynciofrom faststream import Context
from loguru import logger
from log import EventLoggerContext, configure_logger, LOG_DIR
from git import git_operation
from change import process_changes
import uuidasync def handle_event(event):event_id = str(uuid.uuid4())  # 生成唯一事件IDwith EventLoggerContext(event_id):logger.info(f"Start processing event {event_id}")# 主处理流程await git_operation(event["repo_url"])await process_changes(event["changes"])logger.success(f"Event {event_id} processed")def setup():"""初始化配置"""LOG_DIR.mkdir(exist_ok=True)configure_logger()# 启动FastStream消费者
# from faststream.kafka import KafkaBroker
#
# broker = KafkaBroker("localhost:9092")
#
#
# @broker.subscriber("events-topic")
# async def on_event(event):
#     await handle_event(event)async def stress_test():events = [{"repo_url": f"https://github.com/user/repo_{i}.git","changes": "#"*i} for i in range(100)]await asyncio.gather(*[handle_event(e) for e in events])# 测试运行(在入口文件添加)
if __name__ == "__main__":setup()asyncio.run(stress_test())
  • log.py中的代码
# log.py
from loguru import logger
from contextvars import ContextVar
import os
from pathlib import Path
from datetime import timedeltaLOG_DIR = Path("logs")
RETENTION_DAYS = 20
current_event_id = ContextVar('event_id', default=None)def configure_logger():"""全局日志配置"""logger.remove()# 添加控制台输出logger.add(sink=lambda msg: print(msg, end=""),format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | {level.icon} | {message}",level="INFO",colorize=True,enqueue=True  # 确保异步安全)def add_event_logger(event_id: str):"""为事件添加文件日志"""log_file = LOG_DIR / f"{event_id}.log"sink_id = logger.add(sink=log_file,format="{time:YYYY-MM-DD HH:mm:ss} | {level} | {message}",retention=timedelta(days=RETENTION_DAYS),filter=lambda record: record["extra"].get("event_id") == event_id,enqueue=True)return sink_idclass EventLoggerContext:"""事件日志上下文管理器"""def __init__(self, event_id: str):self.event_id = event_idself.sink_id = Nonedef __enter__(self):self.sink_id = add_event_logger(self.event_id)token = current_event_id.set(self.event_id)logger.configure(extra={"event_id": self.event_id})return selfdef __exit__(self, exc_type, exc_val, exc_tb):logger.remove(self.sink_id)current_event_id.set(None)logger.configure(extra={"event_id": None})
  • git.py中的代码
# git.py
from loguru import logger
from log import current_event_idasync def git_operation(repo_url: str):event_id = current_event_id.get()logger.bind(event_id=event_id).info(f"Cloning repository: {repo_url}")# ... git操作逻辑logger.bind(event_id=event_id).success("Git operation completed")

chang.py中的代码

# change.py
from loguru import logger
from log import current_event_idasync def process_changes(changes: list):event_id = current_event_id.get()logger.bind(event_id=event_id).debug(f"Processing {len(changes)} changes")# ... change处理逻辑logger.bind(event_id=event_id).warning("Found potential conflicts")

验证结果

在这里插入图片描述

http://www.dtcms.com/wzjs/562097.html

相关文章:

  • 阿里巴巴网站优化怎么做wordpress主题响应式
  • 帝国网站制作广告wordpress 邮件通知
  • 邀约网站怎么做请柬详情页设计思路怎么写
  • 做网站记者好吗模板免费下载官网
  • 晋江做网站的公司哪家好品牌查询网站
  • 网站ui案例自助建站信息网
  • 花卉网站建设规划书温州网站建设专业的公司
  • 靖江网站建设制作附近旧模板出售市场
  • 泰州网站快速排名优化太仓网站开发建设服务
  • 购物网站建设包括哪些上海比较好的seo公司
  • 免费的网站建设一般多少钱徐州市小程序制作
  • 网站推广公司就去柚米上海 网站建设 500强
  • 网站建设考察试卷唐山做网站建设公司
  • 男女做污污的网站设计公司室内设计
  • 织梦网站后台如何做百度优化电子商务网站的建设论文
  • 网页设计的三大基本技术搜索网站建设推广优化
  • 长沙便宜做网站深圳网站优化怎么做
  • 设计师常备设计网站大全郑州软件开发培训
  • 金华网站如何制作飞机选做网站
  • 都匀网站简速做网站工作室
  • 专门查企业信息的网站微信里的小程序怎么开店
  • 做百度网站优化多少钱网站建设公司的组织架构
  • 网站备案信息变更如何在本地搭建wordpress
  • 手机网站js电话悬浮如何开通免费网站
  • 音乐资源网站开发网站我优化
  • 国外游戏网站欣赏百度权重站长工具
  • 如何制作公司网站品牌推广网站策划设计
  • 做网站必须租服务器吗网站开发地图
  • 素材天下网站安徽城乡建设 厅网站
  • 微网站站点名称muse转换wordpress