
Python Logging in Practice: Console, File, and Elasticsearch Output


In Python development, logging is an indispensable tool for debugging and monitoring. Good log management not only helps developers locate problems quickly, it also provides rich data that makes an application more observable. This article walks through the Python logging module end to end: sending logs to the console, to files, and to Elasticsearch (ES), plus an overview of log format fields.

1. Logging to the Console

The simplest way to manage logs is to send them to the console (standard output). This is very convenient while debugging, because you can watch the program's state in real time. By default, Python's logging module only emits records at WARNING level and above to the console.

```python
def add_stream_handle(self):
    if LOG_TO_CONSOLE and LOG_ENABLED:
        handler = logging.StreamHandler(sys.stdout)
        handler.setLevel(level=CONSOLE_LEVEL)
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        self.logger.addHandler(handler)
```
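Before wiring up custom handlers, it helps to see the default behavior for yourself. Here is a minimal standalone sketch (independent of the Logger class in this article) showing that only WARNING and above appear until you configure a level; `force=True` (Python 3.8+) replaces the handler that the first logging call installs implicitly:

```python
import logging

# With no configuration, only WARNING and above reach the console:
logging.warning("shown")  # printed
logging.info("hidden")    # suppressed

# Opt in to more verbosity; force=True replaces the implicit handler:
logging.basicConfig(level=logging.DEBUG, force=True)
logging.debug("now visible")
```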

2. Logging to a File

In real projects, logs are not just for debugging; more importantly, they are stored long-term for later analysis. We therefore usually write logs to files so that problems can still be traced after the program has run. A FileHandler makes it easy to write logs to a given file; rotation (splitting by date or by file size) is handled by the rotating handler variants in the standard library, as sketched after the snippet below.

```python
def add_file_handle(self):
    if LOG_ENABLED and LOG_TO_FILE:
        logs_dir = os.path.join(LOG_DIR, "logs")
        os.makedirs(logs_dir, exist_ok=True)
        file = os.path.join(logs_dir, f'{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
        handler = logging.FileHandler(file, encoding="utf-8")
        handler.setLevel(level=FILE_LEVEL)
        handler.setFormatter(logging.Formatter(LOG_FORMAT))
        self.logger.addHandler(handler)
```
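The snippet above starts a fresh file per run via the timestamped name. For true rotation, the standard library's `logging.handlers` module offers RotatingFileHandler (by size) and TimedRotatingFileHandler (by date). A minimal sketch of drop-in alternatives (the file name and limits are illustrative):

```python
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler

# Size-based rotation: roll over at ~10 MB, keep 5 old files.
size_handler = RotatingFileHandler(
    "app.log", maxBytes=10 * 1024 * 1024, backupCount=5, encoding="utf-8"
)

# Date-based rotation: roll over at midnight, keep 7 days of logs.
time_handler = TimedRotatingFileHandler(
    "app.log", when="midnight", backupCount=7, encoding="utf-8"
)
```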

3. Logging to Elasticsearch (ES)

For centralized log management, you can ship logs straight into Elasticsearch and analyze them visually with Kibana. Using a custom logging Handler (or a third-party library such as elastic-apm), Python log records can be pushed directly to ES and explored with full-text search and dashboards. The custom handler below batches records and writes them through the bulk API; the base JsonSerializer and the numpy/pandas helpers it relies on appear in the complete listing in Section 5.

```python
TIME_TYPES = (date, datetime)
FLOAT_TYPES = (Decimal,)
INTEGER_TYPES = ()


class EsJsonSerializer(JsonSerializer):
    mimetype: ClassVar[str] = "application/json"

    def default(self, data: Any) -> Any:
        if isinstance(data, TIME_TYPES):
            # Little hack to avoid importing pandas but to not return
            # the 'NaT' string for pd.NaT, as that's not a valid
            # Elasticsearch date.
            formatted_data = data.isoformat()
            if formatted_data != "NaT":
                return formatted_data
        if isinstance(data, uuid.UUID):
            return str(data)
        elif isinstance(data, FLOAT_TYPES):
            return float(data)
        # Kept for backwards compatibility even if 'INTEGER_TYPES'
        # isn't used by default anymore.
        elif INTEGER_TYPES and isinstance(data, INTEGER_TYPES):
            return int(data)
        # Special cases for numpy and pandas types; these are
        # expensive to import, so we try them last.
        serialized, value = _attempt_serialize_numpy_or_pandas(data)
        if serialized:
            return value
        raise TypeError(f"Unable to serialize {data!r} (type: {type(data)})")


class EsHandler(logging.Handler):
    def __init__(self, url, index, batch_size=100, flush_interval=6, extra_fields=None):
        super().__init__()
        self.default_raise = False
        self.index = index
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.extra_fields = extra_fields if isinstance(extra_fields, dict) else {}
        self._buffer = []
        self.url = url
        self._timer = None
        self.serializer = EsJsonSerializer()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        current_date = datetime.utcfromtimestamp(timestamp)
        return "{0!s}.{1:03d}Z".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'),
                                       int(current_date.microsecond / 1000))

    def emit(self, record: logging.LogRecord) -> None:
        self.format(record)
        rec = self.extra_fields.copy()
        for key, value in record.__dict__.items():
            rec[key] = "" if value is None else value
        rec["timestamp"] = self.__get_es_datetime_str(record.created)
        self._buffer.append(rec)
        # Flush immediately once the buffer is full; otherwise schedule a delayed flush.
        if len(self._buffer) >= self.batch_size:
            self.flush()
        else:
            self.schedule_flush()

    def schedule_flush(self):
        if self._timer is None:
            self._timer = Timer(self.flush_interval, self.flush)
            self._timer.daemon = True
            self._timer.start()

    def flush(self):
        # Cancel any pending timer, then bulk-write whatever is buffered.
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None
        if self._buffer:
            try:
                logs_buffer = self._buffer
                self._buffer = []
                self.bulk_data(logs_buffer)
            except Exception as exception:
                if self.default_raise:
                    raise exception

    def bulk_data(self, logs_buffer):
        print("Bulk-writing logs to Elasticsearch")
        request_body = ""
        for log_record in logs_buffer:
            actions = {"index": {'_index': self.index, "_id": str(uuid4())}}
            request_body += json.dumps(actions)
            request_body += "\n"
            request_body += EsJsonSerializer().json_dumps(log_record)
            request_body += "\n"
        request_body += "\n"
        headers = {'content-type': 'application/json'}
        requests.post(self.url, data=request_body, headers=headers)

    def close(self):
        if self._timer is not None:
            self.flush()
        self._timer = None
```
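For reference, the body that bulk_data assembles follows the Elasticsearch bulk API's NDJSON convention: one action line followed by one document line per record, with a trailing newline. The values below are illustrative only, not actual output:

```text
{"index":{"_index":"python","_id":"7c9e6679-7425-40de-944b-e07fc1f90ae7"}}
{"levelname":"INFO","message":"INFO","app_name":"demo-fastapi","app_env":"dev","timestamp":"2025-01-01T12:00:00.000Z"}
{"index":{"_index":"python","_id":"16fd2706-8baf-433b-82eb-8c7fada847da"}}
{"levelname":"ERROR","message":"ERROR","app_name":"demo-fastapi","app_env":"dev","timestamp":"2025-01-01T12:00:01.000Z"}
```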

4. Log Format Fields

| Field | Purpose |
| --- | --- |
| %(levelname)s | Log level name (DEBUG, INFO, WARNING, ERROR, CRITICAL); marks the severity of the record. |
| %(asctime)s | Timestamp of the record; the default format is YYYY-MM-DD HH:MM:SS,mmm (customizable via datefmt). |
| %(process)d | Process ID; tells you which process emitted the record (useful in multi-process applications). |
| %(filename)s | Name of the source file that emitted the record (without the path). |
| %(name)s | Logger name, usually whatever was passed to logging.getLogger(name). |
| %(lineno)d | Source line number where the logging call was made. |
| %(module)s | Module name, i.e. filename without the .py suffix. |
| %(message)s | The actual log message, i.e. the content passed to logger.info("..."). |
| %(levelno)s | Numeric log level (DEBUG=10, INFO=20, WARNING=30, ERROR=40, CRITICAL=50). |
| %(pathname)s | Full path of the source file that emitted the record. |
| %(funcName)s | Name of the function containing the logging call. |
| %(thread)d | Thread ID (useful in multi-threaded applications). |
| %(threadName)s | Thread name. |
| %(processName)s | Process name (useful in multi-process applications). |
| %(msecs)d | Millisecond portion of the record's timestamp. |
```python
# Example
LOG_FORMAT = '%(levelname)s - %(asctime)s - [PID: %(process)d] [Thread: %(threadName)s] - %(pathname)s - %(funcName)s - Line: %(lineno)d - %(message)s'
```
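For intuition, a call such as logger.info("Order created") under this format renders roughly like the line below (the timestamp, PID, path, and line number are illustrative):

```text
INFO - 2025-01-01 12:00:00,123 - [PID: 4242] [Thread: MainThread] - /app/services/order.py - create_order - Line: 88 - Order created
```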

5. The Complete Code

```python
import logging
import sys
import os
import json
import socket
import time
import uuid
from datetime import date, datetime
from decimal import Decimal
from typing import Any, ClassVar, Tuple
from threading import Timer
from multiprocessing import Lock

import requests
from uuid import uuid4

LOG_ENABLED = True  # Master switch for logging
LOG_TO_CONSOLE = True  # Whether to log to the console
CONSOLE_LEVEL = "INFO"  # Console log level
LOG_TO_FILE = True  # Whether to log to a file
FILE_LEVEL = "INFO"  # File log level
LOG_TO_ES = True  # Whether to log to Elasticsearch
ES_LEVEL = "INFO"  # Elasticsearch log level
LOG_LEVEL = 'DEBUG'  # Logger level
LOG_FORMAT = '%(levelname)s - %(asctime)s - process: %(process)d - %(filename)s - %(name)s - %(lineno)d - %(module)s - %(message)s'  # Format of each log record
ELASTIC_SEARCH_HOST = 'http://192.168.3.200:9200'  # Elasticsearch Host
ELASTIC_SEARCH_INDEX = 'python'  # Elasticsearch Index Name
APP_NAME = "demo-fastapi"
APP_ENVIRONMENT = 'dev'  # Runtime environment, e.g. test or production
LOG_DIR = os.getcwd()


class JsonSerializer:
    """JSON serializer relying on the standard library json module."""

    mimetype: ClassVar[str] = "application/json"

    def default(self, data: Any) -> Any:
        if isinstance(data, date):
            return data.isoformat()
        elif isinstance(data, uuid.UUID):
            return str(data)
        elif isinstance(data, Decimal):
            return float(data)
        raise Exception(f"Unable to serialize to JSON: {data!r} (type: {type(data).__name__})")

    def json_dumps(self, data: dict) -> str:
        return json.dumps(data, default=self.default, ensure_ascii=False, separators=(",", ":"))

    def json_loads(self, data: str) -> dict:
        return json.loads(data)

    def loads(self, data: str) -> dict:
        return self.json_loads(data)

    def dumps(self, data: dict) -> str:
        # The body is already encoded to bytes,
        # so we forward the request body along.
        if isinstance(data, str):
            return data
        return self.json_dumps(data)


def _attempt_serialize_numpy(data: Any) -> Tuple[bool, Any]:
    try:
        import numpy as np
        if isinstance(data, (np.int_, np.intc, np.int8, np.int16, np.int32, np.int64,
                             np.uint8, np.uint16, np.uint32, np.uint64)):
            return True, int(data)
        elif isinstance(data, (np.float16, np.float32, np.float64)):
            return True, float(data)
        elif isinstance(data, np.bool_):
            return True, bool(data)
        elif isinstance(data, np.datetime64):
            return True, data.item().isoformat()
        elif isinstance(data, np.ndarray):
            return True, data.tolist()
    except ImportError:
        # Since we failed to import 'numpy' we don't want to try again.
        return False, None
    return False, None


def _attempt_serialize_pandas(data: Any) -> Tuple[bool, Any]:
    try:
        import pandas as pd
        if isinstance(data, (pd.Series, pd.Categorical)):
            return True, data.tolist()
        elif isinstance(data, pd.Timestamp) and data is not getattr(pd, "NaT", None):
            return True, data.isoformat()
        elif data is getattr(pd, "NA", None):
            return True, None
    except ImportError:
        # Since we failed to import 'pandas' we don't want to try again.
        return False, None
    return False, None


def _attempt_serialize_numpy_or_pandas(data: Any) -> Tuple[bool, Any]:
    serialized, value = _attempt_serialize_numpy(data)
    if serialized:
        return serialized, value
    serialized, value = _attempt_serialize_pandas(data)
    if serialized:
        return serialized, value
    return False, None


TIME_TYPES = (date, datetime)
FLOAT_TYPES = (Decimal,)
INTEGER_TYPES = ()


class EsJsonSerializer(JsonSerializer):
    mimetype: ClassVar[str] = "application/json"

    def default(self, data: Any) -> Any:
        if isinstance(data, TIME_TYPES):
            # Little hack to avoid importing pandas but to not return
            # the 'NaT' string for pd.NaT, as that's not a valid
            # Elasticsearch date.
            formatted_data = data.isoformat()
            if formatted_data != "NaT":
                return formatted_data
        if isinstance(data, uuid.UUID):
            return str(data)
        elif isinstance(data, FLOAT_TYPES):
            return float(data)
        # Kept for backwards compatibility even if 'INTEGER_TYPES'
        # isn't used by default anymore.
        elif INTEGER_TYPES and isinstance(data, INTEGER_TYPES):
            return int(data)
        # Special cases for numpy and pandas types; these are
        # expensive to import, so we try them last.
        serialized, value = _attempt_serialize_numpy_or_pandas(data)
        if serialized:
            return value
        raise TypeError(f"Unable to serialize {data!r} (type: {type(data)})")


class EsHandler(logging.Handler):
    def __init__(self, url, index, batch_size=100, flush_interval=6, extra_fields=None):
        super().__init__()
        self.default_raise = False
        self.index = index
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.extra_fields = extra_fields if isinstance(extra_fields, dict) else {}
        self._buffer = []
        self.url = url
        self._timer = None
        self.serializer = EsJsonSerializer()

    @staticmethod
    def __get_es_datetime_str(timestamp):
        current_date = datetime.utcfromtimestamp(timestamp)
        return "{0!s}.{1:03d}Z".format(current_date.strftime('%Y-%m-%dT%H:%M:%S'),
                                       int(current_date.microsecond / 1000))

    def emit(self, record: logging.LogRecord) -> None:
        self.format(record)
        rec = self.extra_fields.copy()
        for key, value in record.__dict__.items():
            rec[key] = "" if value is None else value
        rec["timestamp"] = self.__get_es_datetime_str(record.created)
        self._buffer.append(rec)
        # Flush immediately once the buffer is full; otherwise schedule a delayed flush.
        if len(self._buffer) >= self.batch_size:
            self.flush()
        else:
            self.schedule_flush()

    def schedule_flush(self):
        if self._timer is None:
            self._timer = Timer(self.flush_interval, self.flush)
            self._timer.daemon = True
            self._timer.start()

    def flush(self):
        # Cancel any pending timer, then bulk-write whatever is buffered.
        if self._timer is not None and self._timer.is_alive():
            self._timer.cancel()
        self._timer = None
        if self._buffer:
            try:
                logs_buffer = self._buffer
                self._buffer = []
                self.bulk_data(logs_buffer)
            except Exception as exception:
                if self.default_raise:
                    raise exception

    def bulk_data(self, logs_buffer):
        print("Bulk-writing logs to Elasticsearch")
        request_body = ""
        for log_record in logs_buffer:
            actions = {"index": {'_index': self.index, "_id": str(uuid4())}}
            request_body += json.dumps(actions)
            request_body += "\n"
            request_body += EsJsonSerializer().json_dumps(log_record)
            request_body += "\n"
        request_body += "\n"
        headers = {'content-type': 'application/json'}
        requests.post(self.url, data=request_body, headers=headers)

    def close(self):
        if self._timer is not None:
            self.flush()
        self._timer = None


class Logger:
    _instance = None
    _lock = Lock()

    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self.logger.setLevel(LOG_LEVEL)
        self.add_stream_handle()
        self.add_file_handle()
        self.add_es_handle()

    def add_stream_handle(self):
        if LOG_TO_CONSOLE and LOG_ENABLED:
            handler = logging.StreamHandler(sys.stdout)
            handler.setLevel(level=CONSOLE_LEVEL)
            handler.setFormatter(logging.Formatter(LOG_FORMAT))
            self.logger.addHandler(handler)

    def add_file_handle(self):
        if LOG_ENABLED and LOG_TO_FILE:
            logs_dir = os.path.join(LOG_DIR, "logs")
            os.makedirs(logs_dir, exist_ok=True)
            file = os.path.join(logs_dir, f'{datetime.now().strftime("%Y%m%d_%H%M%S")}.log')
            handler = logging.FileHandler(file, encoding="utf-8")
            handler.setLevel(level=FILE_LEVEL)
            handler.setFormatter(logging.Formatter(LOG_FORMAT))
            self.logger.addHandler(handler)

    def add_es_handle(self):
        if LOG_ENABLED and LOG_TO_ES:
            # Attach the custom Elasticsearch handler.
            url = f"{ELASTIC_SEARCH_HOST}/_bulk"
            extra_fields = {"app_name": APP_NAME, "app_env": APP_ENVIRONMENT,
                            'host': socket.gethostname(),
                            'host_ip': socket.gethostbyname(socket.gethostname())}
            es_handler = EsHandler(url, index=ELASTIC_SEARCH_INDEX, extra_fields=extra_fields)
            es_handler.setLevel(level=ES_LEVEL)
            es_handler.setFormatter(logging.Formatter(LOG_FORMAT))
            self.logger.addHandler(es_handler)

    @classmethod
    def get_logger(cls):
        # Create the singleton under a lock so handlers are attached only once.
        with cls._lock:
            if cls._instance is None:
                cls._instance = Logger().logger
        return cls._instance


if __name__ == '__main__':
    logger = Logger.get_logger()
    logger.info("INFO")
    logger.warning("WARNING")
    logger.error("ERROR")
    time.sleep(100)  # Keep the process alive so the flush timer can fire
```
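To confirm that records actually landed in the index, you can query Elasticsearch directly. A minimal sanity-check sketch using the standard `_search` endpoint with a URI query string, assuming the ELASTIC_SEARCH_HOST and ELASTIC_SEARCH_INDEX constants from the listing above are in scope:

```python
# Minimal sketch: fetch a few recent ERROR records from the index.
import requests

resp = requests.get(
    f"{ELASTIC_SEARCH_HOST}/{ELASTIC_SEARCH_INDEX}/_search",
    params={"q": "levelname:ERROR", "size": 5},
)
for hit in resp.json()["hits"]["hits"]:
    print(hit["_source"]["message"])
```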

Conclusion

Mastering Python's logging module makes log management far more effective. Whether it's simple console debugging, persisting logs to files, or shipping them to an ELK stack for centralized management, these are essential skills for any developer.
