python 自定义装饰器 + 框架
在 Django 中查询 MongoDB 时,如果使用 djongo 或 mongoengine(尤其是 mongoengine),文档中必须带有 _cls 属性。我现在的项目里有大量已有的 MongoDB 数据不含该属性,因此查询不出来;如果一定要用,需要一大堆查询设置,用 DeepSeek 查了很多资料,看起来都挺复杂。最终我放弃了这两种方案,改用更灵活、平时也经常使用的 pymongo。由于查询结果中的部分字段需要格式化,我自己实现了一个装饰器。
urls.py
# myapp/urls.py
from django.urls import path
from report.views import SmwtReportRetrieveView, CpetReportRetrieveView, ReportView

# Application namespace used when reversing URLs for this app.
app_name = 'report'

# One generic route: both the collection name and the document id are taken
# from the URL path and handed to ReportView as keyword arguments.
urlpatterns = [
    path(
        '<str:collection>/<str:id>/',
        ReportView.as_view(),
        name='api-report',
    ),
]
views.py
# myapp/views.py
from rest_framework.parsers import JSONParser
from report.base.BaseReportView import MongoDBCRUDMixin
from rest_framework.views import APIView


class ReportView(APIView, MongoDBCRUDMixin):
    """API view whose HTTP handlers are supplied by ``MongoDBCRUDMixin``.

    Request bodies are parsed with ``JSONParser`` only.
    """

    parser_classes = [JSONParser]
查询抽象实现
from dbservice.mongodb_service import mongodb_service
from django.http import JsonResponse
from rest_framework import status
from report.processor import ProcessorFactory


def _json_error(message, http_status):
    """Build a ``{"error": message}`` JSON response with the given HTTP status."""
    return JsonResponse({"error": message}, status=http_status)


class MongoDBCRUDMixin:
    """MongoDB CRUD handlers for a class-based view.

    The target collection is read from the ``collection`` URL keyword argument
    (falling back to a ``collection_name`` attribute on the view) and the
    document id from the ``id`` URL keyword argument. All persistence goes
    through ``mongodb_service``.

    NOTE(review): DRF's ``APIView`` dispatches on the lower-cased HTTP method
    name, so ``list`` and ``update`` are only reachable if the concrete view
    routes to them explicitly -- confirm against the views using this mixin.
    """

    def get_collection_name(self, kwargs):
        """Return the collection name from the URL kwargs.

        Falls back to ``self.collection_name`` (or ``None`` when the view does
        not define it) if the URL did not supply a ``collection`` value.
        """
        collection = kwargs.get('collection')
        if not collection:
            return getattr(self, 'collection_name', None)
        return collection

    def get(self, request, *args, **kwargs):
        """Handle GET for a single document.

        Returns 400 when the collection or id is missing, 404 when no document
        matches, and 500 when the lookup or processing raises. On success the
        document is passed through the processor registered for the
        collection; if no processor is registered the document is returned
        unformatted.
        """
        collection = self.get_collection_name(kwargs)
        doc_id = kwargs.get('id')

        if not collection:
            return _json_error("Collection name is required", status.HTTP_400_BAD_REQUEST)
        if not doc_id:
            return _json_error("Document ID is required", status.HTTP_400_BAD_REQUEST)

        try:
            document = mongodb_service.find_document(
                collection_name=collection,
                query={"_id": doc_id},
            )
            if not document:
                return _json_error("Document not found", status.HTTP_404_NOT_FOUND)

            try:
                processor = ProcessorFactory.get_processor(collection)
            except ValueError:
                # The route accepts any collection, but only some have a
                # processor; an unregistered one is not a server error.
                return JsonResponse(document)
            return JsonResponse(processor.process(document))
        except Exception as e:
            return _json_error(
                f"Failed to retrieve document: {e}",
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def list(self, request, *args, **kwargs):
        """Handle GET for a list of documents (optional).

        The query-string parameters are used as the MongoDB filter. Keys that
        start with ``$`` are rejected with 400 so callers cannot inject query
        operators.
        """
        collection = self.get_collection_name(kwargs)

        if not collection:
            return _json_error("Collection name is required", status.HTTP_400_BAD_REQUEST)

        try:
            query = request.GET.dict()
            # Query-string keys are untrusted input: a leading "$" would be
            # interpreted by MongoDB as a top-level operator (e.g. $where).
            if any(key.startswith('$') for key in query):
                return _json_error(
                    "Query operators are not allowed",
                    status.HTTP_400_BAD_REQUEST,
                )
            documents = mongodb_service.find_documents(
                collection_name=collection,
                query=query,
            )
            return JsonResponse(list(documents), safe=False)
        except Exception as e:
            return _json_error(
                f"Failed to retrieve documents: {e}",
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def post(self, request, *args, **kwargs):
        """Handle POST: insert the request body as a new document.

        Returns 201 with the value returned by ``insert_document`` under
        ``id``, 400 when the collection is missing or the body cannot be
        parsed, and 500 when the insert raises.
        """
        collection = self.get_collection_name(kwargs)

        if not collection:
            return _json_error("Collection name is required", status.HTTP_400_BAD_REQUEST)

        # Accessing request.data triggers body parsing, which may raise.
        try:
            document_data = request.data
        except Exception:
            return _json_error("Invalid JSON data", status.HTTP_400_BAD_REQUEST)

        try:
            result = mongodb_service.insert_document(
                collection_name=collection,
                document=document_data,
            )
            return JsonResponse(
                {
                    "message": "Document inserted successfully",
                    "id": result,
                    "collection": collection,
                },
                status=status.HTTP_201_CREATED,
            )
        except Exception as e:
            return _json_error(
                f"Failed to insert document: {e}",
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def update(self, request, *args, **kwargs):
        """Handle an update of one document (optional; intended for PUT).

        Returns 200 with ``modified_count`` taken from the service result,
        400 when the collection/id is missing or the body cannot be parsed,
        and 500 when the update raises.
        """
        collection = self.get_collection_name(kwargs)
        doc_id = kwargs.get('id')

        if not collection or not doc_id:
            return _json_error(
                "Collection name and document ID are required",
                status.HTTP_400_BAD_REQUEST,
            )

        try:
            update_data = request.data
        except Exception:
            return _json_error("Invalid JSON data", status.HTTP_400_BAD_REQUEST)

        try:
            result = mongodb_service.update_document(
                collection_name=collection,
                query={"_id": doc_id},
                update_data=update_data,
            )
            return JsonResponse(
                {
                    "message": "Document updated successfully",
                    "modified_count": result.modified_count,
                    "collection": collection,
                    "id": doc_id,
                },
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            return _json_error(
                f"Failed to update document: {e}",
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )

    def delete(self, request, *args, **kwargs):
        """Handle DELETE for one document (optional).

        Returns 200 with the service's return value under ``deleted_count``,
        400 when the collection or id is missing, and 500 when the delete
        raises.
        """
        collection = self.get_collection_name(kwargs)
        doc_id = kwargs.get('id')

        if not collection or not doc_id:
            return _json_error(
                "Collection name and document ID are required",
                status.HTTP_400_BAD_REQUEST,
            )

        try:
            result = mongodb_service.delete_document(
                collection_name=collection,
                query={"_id": doc_id},
            )
            return JsonResponse(
                {
                    "message": "Document deleted successfully",
                    "deleted_count": result,
                    "collection": collection,
                    "id": doc_id,
                },
                status=status.HTTP_200_OK,
            )
        except Exception as e:
            return _json_error(
                f"Failed to delete document: {e}",
                status.HTTP_500_INTERNAL_SERVER_ERROR,
            )
formatters
from abc import ABC, abstractmethod


class BaseFormatter(ABC):
    """Abstract interface implemented by every field formatter."""

    @abstractmethod
    def format(self, value, **kwargs):
        """Return the formatted form of ``value``.

        Keyword arguments are formatter-specific options. The base
        implementation does nothing and returns ``None``.
        """
        return None
from .base_formatter import BaseFormatter
from datetime import datetime
import re


class DateFormatter(BaseFormatter):
    """Render ``datetime`` values as text; pass every other value through."""

    def format(self, value, format_str="%Y-%m-%d %H:%M:%S"):
        """Return ``value.strftime(format_str)`` for datetimes, else ``value``."""
        if not isinstance(value, datetime):
            return value
        return value.strftime(format_str)


class NumberFormatter(BaseFormatter):
    """Round numeric-looking values to a fixed number of decimals."""

    def format(self, value, precision=2):
        """Return ``value`` as a float rounded to ``precision`` digits.

        Values that cannot be converted or rounded are returned unchanged.
        """
        try:
            as_float = float(value)
            return round(as_float, precision)
        except (ValueError, TypeError):
            return value


class BooleanFormatter(BaseFormatter):
    """Map booleans to display strings; pass every other value through."""

    def format(self, value, true_str="是", false_str="否"):
        """Return ``true_str``/``false_str`` for bools, else ``value``."""
        if not isinstance(value, bool):
            return value
        if value:
            return true_str
        return false_str


class StringFormatter(BaseFormatter):
    """Coerce to ``str``, optionally strip and truncate."""

    def format(self, value, max_length=None, strip=True):
        """Return ``value`` as text.

        Non-strings are converted with ``str()``. When ``strip`` is true the
        text is stripped; when ``max_length`` is set and exceeded, the text is
        cut to ``max_length`` characters and ``"..."`` is appended.
        """
        text = value if isinstance(value, str) else str(value)
        if strip:
            text = text.strip()
        too_long = max_length and len(text) > max_length
        if too_long:
            return text[:max_length] + "..."
        return text
class TimeFormatter(BaseFormatter):
    """Render a duration given in milliseconds as ``HH:MM:SS``."""

    def format(self, value, **kwargs):
        """Convert a millisecond count to an ``HH:MM:SS`` string.

        The value is floor-divided by 1000 to obtain whole seconds. Values
        that are not numbers, are negative, or are not finite are returned
        unchanged, matching how the other formatters treat unsupported input.
        Extra keyword arguments are accepted and ignored.
        """
        if not isinstance(value, (int, float)):
            return value
        try:
            # int() is needed because the "d" format spec rejects floats.
            total_seconds = int(value // 1000)
        except (ValueError, OverflowError):
            # NaN and infinity have no integer representation.
            return value
        if total_seconds < 0:
            return value

        minutes, seconds = divmod(total_seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d}"


# Register the commonly used formatters
# One shared instance per formatter, keyed by the short name used to look it up.
COMMON_FORMATTERS = {
    'date': DateFormatter(),
    'number': NumberFormatter(),
    'boolean': BooleanFormatter(),
    'string': StringFormatter(),
    'time': TimeFormatter(),
}
from .common_formatters import COMMON_FORMATTERS


class FormatterFactory:
    """Class-level registry mapping formatter names to formatter objects."""

    # Shared by the whole process; filled through register().
    _formatters = {}

    @classmethod
    def register(cls, name, formatter):
        """Store ``formatter`` under ``name``, replacing any existing entry."""
        cls._formatters.update({name: formatter})

    @classmethod
    def get_formatter(cls, name):
        """Return the formatter registered as ``name``, or ``None``."""
        return cls._formatters.get(name)

    @classmethod
    def format_value(cls, value, formatter_name, **kwargs):
        """Format ``value`` with the named formatter.

        Keyword arguments are forwarded to the formatter's ``format``. When no
        formatter is registered under ``formatter_name`` the value is returned
        unchanged.
        """
        chosen = cls.get_formatter(formatter_name)
        if not chosen:
            return value
        return chosen.format(value, **kwargs)


# Register the default formatters
# Runs at import time; a later register() call with the same name replaces the entry.
for name, formatter in COMMON_FORMATTERS.items():FormatterFactory.register(name, formatter)
processor
from .base import BaseProcessor
import importlib
import pkgutil
import inspect


class ProcessorFactory:
    """Processor registry that discovers processor classes by scanning a package."""

    # processor type (lower-cased class-name prefix) -> processor class
    _processors = {}

    @classmethod
    def discover_processors(cls, package_name='report.processor'):
        """Import every module of ``package_name`` and register its processors.

        A class is registered when it is a subclass of ``BaseProcessor`` (other
        than ``BaseProcessor`` itself) and its name ends with ``_processor``;
        the registry key is the name without that suffix, lower-cased. Modules
        that fail to import are reported with a printed warning and skipped.
        """
        package = importlib.import_module(package_name)
        for _, module_name, _ in pkgutil.iter_modules(package.__path__):
            qualified_name = f'{package_name}.{module_name}'
            try:
                module = importlib.import_module(qualified_name)
                for name, obj in inspect.getmembers(module, inspect.isclass):
                    if not issubclass(obj, BaseProcessor):
                        continue
                    if obj == BaseProcessor:
                        continue
                    if not name.endswith('_processor'):
                        continue
                    # Strip the 10-character "_processor" suffix to get the type.
                    processor_type = name[:-10].lower()
                    cls._processors[processor_type] = obj
                    print(f"Discovered processor: {processor_type} -> {name}")
            except ImportError as e:
                print(f"Warning: Could not import module {module_name}: {e}")

    @classmethod
    def get_processor(cls, processor_type) -> BaseProcessor:
        """Return a new instance of the processor registered for ``processor_type``.

        The lookup is case-insensitive.

        Raises:
            ValueError: if no processor is registered under that type.
        """
        processor_class = cls._processors.get(processor_type.lower())
        if processor_class:
            return processor_class()
        raise ValueError(f"Processor '{processor_type}' not found. Available: {list(cls._processors.keys())}")

    @classmethod
    def list_processors(cls):
        """Return the registered processor types as a list."""
        return list(cls._processors.keys())


# Discover processors automatically
# Populate the registry when this module is imported, before any get_processor() call.
ProcessorFactory.discover_processors()
# processors/base.py
from abc import ABC, abstractmethod
from .formatters import FormatterFactory
from functools import wraps


class BaseProcessor(ABC):
    """Base class for document processors driven by ``@format_field`` markers.

    A subclass declares one method per document field it wants formatted and
    decorates it with ``@format_field(...)``. The method *name* is the key
    looked up in the document; ``process`` does not call the method body.
    """

    def __init__(self):
        # field name -> rule dict ({'formatter': name, **formatter options})
        self._formatting_rules = {}
        self._collect_formatting_rules()

    def _collect_formatting_rules(self):
        """Record the rule of every method decorated with ``@format_field``."""
        owner = type(self)
        for attr_name in dir(owner):
            # Resolve the name on the class rather than the instance so that
            # properties are not evaluated (and cannot raise) during __init__.
            attr = getattr(owner, attr_name, None)
            rule = getattr(attr, '_format_rule', None)
            if rule is not None:
                self._formatting_rules[attr_name] = rule

    def format_field(self, value, formatter_name, **kwargs):
        """Format ``value`` with the formatter registered as ``formatter_name``."""
        return FormatterFactory.format_value(value, formatter_name, **kwargs)

    def process(self, data):
        """Return a shallow copy of ``data`` with the declared fields formatted.

        Only fields present in ``data`` are touched; ``data`` itself is not
        modified.
        """
        result = data.copy()
        for field_name, rules in self._formatting_rules.items():
            if field_name not in result:
                continue
            formatter_name = rules['formatter']
            formatter_kwargs = {k: v for k, v in rules.items() if k != 'formatter'}
            result[field_name] = self.format_field(
                result[field_name], formatter_name, **formatter_kwargs
            )
        return result


def format_field(formatter_name, **kwargs):
    """Decorator marking a processor method's field for formatting.

    Args:
        formatter_name: name of the formatter to apply to the field.
        **kwargs: options stored in the rule and later passed to the formatter.

    Returns:
        A decorator that wraps the method unchanged and attaches the rule as
        ``_format_rule`` on the wrapper.
    """
    def decorator(method):
        @wraps(method)
        def wrapper(self, *args, **method_kwargs):
            # Named method_kwargs so the decorator's own kwargs are not shadowed.
            return method(self, *args, **method_kwargs)

        # Rule metadata read by BaseProcessor._collect_formatting_rules.
        wrapper._format_rule = {'formatter': formatter_name, **kwargs}
        return wrapper

    return decorator
# processors/smwt_processor.py
from .base import BaseProcessor, format_field


class Smwt_processor(BaseProcessor):
    """Processor declaring which duration fields use the ``time`` formatter.

    Each decorated method marks the document key of the same name for
    formatting.
    """

    @format_field('time')
    def actualDuration(self, data):
        """Marker for the ``actualDuration`` field."""
        raw = data.get("actualDuration")
        return raw

    @format_field('time')
    def test_duration(self, data):
        """Marker for the ``test_duration`` field."""
        raw = data.get("test_duration")
        return raw
结语
最终,我只需要根据业务创建一个 XXX_processor 类,并在其中声明需要格式化的字段即可。例如 @format_field('time') 这个装饰器会调用 TimeFormatter,把 actualDuration(毫秒数)格式化为 00:06:00 这样的形式。这种做法仅用于我个人学习,实际项目中有现成的轮子可以直接使用,比如 pydantic。继续学习。