
AI Development in Practice: End-to-End Lessons from Data Preparation to Model Deployment


Introduction: A Paradigm Shift in AI Development

Artificial intelligence development is undergoing an unprecedented transformation, moving from hand-crafted feature engineering and bespoke model design toward data-centric, end-to-end deep learning. Drawing on several years of hands-on AI development, this article shares practical lessons covering the full workflow from data preparation to model deployment, so that developers can sidestep common pitfalls and work more efficiently.

The arrival of the Transformer architecture has greatly expanded what AI models can do, but it has also brought new challenges: more complex models, heavier compute requirements, and harder deployment. This article digs into practical solutions to these challenges and provides code examples and recommendations you can apply immediately.

1. Environment Setup and Toolchain

1.1 Reproducible Development Environments

A reproducible development environment is the foundation of any successful AI project. Docker containers guarantee environment consistency and avoid the classic "it works on my machine" problem.

# Start from the official PyTorch image
FROM pytorch/pytorch:2.0.1-cuda11.7-cudnn8-runtime

# Set the working directory
WORKDIR /app

# Copy the dependency file
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt && \
    pip install torchmetrics[image] && \
    pip install transformers[torch]

# Install system libraries needed for image processing
RUN apt-get update && apt-get install -y \
    libglib2.0-0 \
    libsm6 \
    libxext6 \
    libxrender-dev \
    && rm -rf /var/lib/apt/lists/*

# Expose the port used by the API service
EXPOSE 8000

# Default command
CMD ["python", "app.py"]

The environment configuration pins the versions of all dependencies, which is key to reproducible experiments. Starting from the official base image reduces the chance of conflicts, and the system libraries installed via apt are needed for image-processing operations.
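
For reference, a minimal requirements.txt might look like the sketch below. The exact version pins are illustrative assumptions only; pin whatever your project has actually validated against the torch 2.0.1 base image.

# requirements.txt -- illustrative pins, adjust to your validated stack
torchvision==0.15.2
albumentations==1.3.1
mlflow==2.8.0
scikit-learn==1.3.0
fastapi==0.103.0
uvicorn==0.23.2
onnx==1.14.1
onnxruntime-gpu==1.15.1
onnxsim==0.4.33
prometheus-client==0.17.1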

1.2 Experiment Tracking and Management

Effective experiment tracking greatly improves development efficiency. MLflow offers a lightweight experiment-management solution:

import mlflow
import mlflow.pytorch
from datetime import datetime


class ExperimentTracker:
    def __init__(self, experiment_name):
        self.experiment_name = experiment_name
        mlflow.set_experiment(experiment_name)

    def start_run(self, run_name=None):
        if run_name is None:
            run_name = f"run_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        self.run = mlflow.start_run(run_name=run_name)
        return self.run

    def log_params(self, params):
        mlflow.log_params(params)

    def log_metrics(self, metrics, step=None):
        mlflow.log_metrics(metrics, step=step)

    def log_model(self, model, artifact_path):
        mlflow.pytorch.log_model(model, artifact_path)

    def end_run(self):
        mlflow.end_run()


# Usage example
tracker = ExperimentTracker("image_classification_project")
tracker.start_run()

# Log hyperparameters
params = {
    "learning_rate": 0.001,
    "batch_size": 32,
    "optimizer": "AdamW",
    "model_architecture": "resnet50"
}
tracker.log_params(params)

# Log metrics inside the training loop
for epoch in range(num_epochs):
    train_metrics = train_epoch(model, train_loader)
    val_metrics = validate(model, val_loader)
    tracker.log_metrics({"train_loss": train_metrics["loss"]}, step=epoch)
    tracker.log_metrics({"val_accuracy": val_metrics["accuracy"]}, step=epoch)

With systematic experiment tracking, you can easily compare model performance across hyperparameter configurations and quickly identify the best one. MLflow automatically records each run's code version, parameters, and results, building up a complete experiment history.

2. Data Processing and Augmentation Strategies

2.1 Building an Efficient Data Pipeline

PyTorch's Dataset and DataLoader classes provide a solid foundation for building efficient data pipelines:

import os

import albumentations as A
import numpy as np
import torch
from albumentations.pytorch import ToTensorV2
from PIL import Image
from torch.utils.data import Dataset, DataLoader


class CustomImageDataset(Dataset):
    def __init__(self, image_dir, label_file, transform=None, is_train=True):
        self.image_dir = image_dir
        self.transform = transform
        self.is_train = is_train
        # Load labels ("image_name,label" per line)
        self.labels = {}
        with open(label_file, 'r') as f:
            for line in f:
                image_name, label = line.strip().split(',')
                self.labels[image_name] = int(label)
        self.image_names = list(self.labels.keys())

    def __len__(self):
        return len(self.image_names)

    def __getitem__(self, idx):
        img_name = self.image_names[idx]
        img_path = os.path.join(self.image_dir, img_name)
        # Load the image
        image = Image.open(img_path).convert('RGB')
        image = np.array(image)
        label = self.labels[img_name]
        # Apply data augmentation
        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']
        return image, label


# Augmentation pipelines for training and validation
def get_train_transforms(image_size=224):
    return A.Compose([
        A.Resize(image_size, image_size),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.2),
        A.RandomRotate90(p=0.3),
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.1, rotate_limit=15, p=0.5),
        A.HueSaturationValue(hue_shift_limit=20, sat_shift_limit=30, val_shift_limit=20, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.CoarseDropout(max_holes=8, max_height=16, max_width=16, fill_value=0, p=0.3),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])


def get_val_transforms(image_size=224):
    return A.Compose([
        A.Resize(image_size, image_size),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])


# Create datasets and data loaders
train_dataset = CustomImageDataset(
    image_dir="data/train",
    label_file="data/train_labels.csv",
    transform=get_train_transforms(),
    is_train=True
)
val_dataset = CustomImageDataset(
    image_dir="data/val",
    label_file="data/val_labels.csv",
    transform=get_val_transforms(),
    is_train=False
)

train_loader = DataLoader(
    train_dataset,
    batch_size=32,
    shuffle=True,
    num_workers=4,
    pin_memory=True
)
val_loader = DataLoader(
    val_dataset,
    batch_size=32,
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

This pipeline performs parallel loading and on-the-fly augmentation, with the Albumentations library supplying a rich set of augmentation transforms. pin_memory speeds up CPU-to-GPU transfers and num_workers enables parallel data loading, which together noticeably improve training throughput.
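
Whether a given num_workers and pin_memory setting actually pays off depends on the machine, so it is worth measuring. A rough timing sketch (not a rigorous benchmark) using the train_loader defined above:

import time

def measure_loader_throughput(loader, max_batches=50):
    # Roughly measure how many samples per second the data pipeline yields
    start = time.time()
    n_samples = 0
    for i, (images, labels) in enumerate(loader):
        n_samples += images.size(0)
        if i + 1 >= max_batches:
            break
    elapsed = time.time() - start
    return n_samples / elapsed

# Compare configurations, e.g. num_workers=0 vs num_workers=4
print(f"samples/sec: {measure_loader_throughput(train_loader):.1f}")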

2.2 Smarter Data Augmentation Strategies

Well-designed augmentation strategies can significantly improve generalization, especially when data is limited:

import random

import torch
from torchvision import transforms


class SmartAugmentation:
    def __init__(self, augmentation_pools):
        self.augmentation_pools = augmentation_pools

    def __call__(self, img):
        # Randomly pick one augmentation group
        aug_pool = random.choice(self.augmentation_pools)
        # Apply each selected augmentation with its own probability
        for aug in aug_pool:
            if random.random() < aug['probability']:
                img = aug['transform'](img)
        return img


# Augmentation groups of different strengths
augmentation_pools = [
    # Strong augmentation
    [
        {'transform': transforms.RandomHorizontalFlip(p=1.0), 'probability': 0.8},
        {'transform': transforms.ColorJitter(brightness=0.4, contrast=0.4, saturation=0.4, hue=0.1), 'probability': 0.7},
        {'transform': transforms.RandomAffine(degrees=15, translate=(0.1, 0.1), scale=(0.9, 1.1)), 'probability': 0.6}
    ],
    # Medium augmentation
    [
        {'transform': transforms.RandomHorizontalFlip(p=1.0), 'probability': 0.5},
        {'transform': transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.05), 'probability': 0.5},
        {'transform': transforms.RandomAffine(degrees=10, translate=(0.05, 0.05), scale=(0.95, 1.05)), 'probability': 0.4}
    ],
    # Weak augmentation (sometimes no augmentation is applied)
    [
        {'transform': transforms.RandomHorizontalFlip(p=1.0), 'probability': 0.3},
        {'transform': transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1, hue=0.02), 'probability': 0.2}
    ]
]

smart_aug = SmartAugmentation(augmentation_pools)

# Plug it into a torchvision transform pipeline
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.Lambda(lambda x: smart_aug(x)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

The smart augmentation strategy randomly selects augmentation groups of different strengths, exposing the model to more varied training samples. This tends to improve robustness more than a fixed augmentation recipe does, particularly on out-of-distribution data.

3. Model Design and Optimization Techniques

3.1 Efficient Model Architecture Design

Modern AI models need to balance accuracy against efficiency. Below is an efficient convolutional network implemented in PyTorch:

import torch
import torch.nn as nn
import torch.nn.functional as F


class EfficientConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1,
                 groups=1, use_se=False, reduction_ratio=16):
        super(EfficientConvBlock, self).__init__()
        self.use_se = use_se
        # Depthwise separable convolution for efficiency
        self.depthwise = nn.Conv2d(
            in_channels, in_channels, kernel_size=kernel_size,
            stride=stride, padding=kernel_size // 2, groups=in_channels
        )
        self.pointwise = nn.Conv2d(in_channels, out_channels, kernel_size=1)
        self.bn1 = nn.BatchNorm2d(in_channels)
        self.bn2 = nn.BatchNorm2d(out_channels)
        # Optional Squeeze-and-Excitation attention
        if use_se:
            self.se = SELayer(out_channels, reduction_ratio)

    def forward(self, x):
        residual = x
        x = self.depthwise(x)
        x = self.bn1(x)
        x = F.relu6(x)
        x = self.pointwise(x)
        x = self.bn2(x)
        if self.use_se:
            x = self.se(x)
        # Residual connection when shapes match
        if residual.shape == x.shape:
            x = x + residual
        return F.relu6(x)


class SELayer(nn.Module):
    def __init__(self, channel, reduction_ratio=16):
        super(SELayer, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channel, channel // reduction_ratio),
            nn.ReLU(inplace=True),
            nn.Linear(channel // reduction_ratio, channel),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)


class EfficientNet(nn.Module):
    def __init__(self, num_classes=1000, width_mult=1.0, depth_mult=1.0):
        super(EfficientNet, self).__init__()
        # Scale channel counts and layer counts with the width/depth multipliers
        channels = [int(32 * width_mult), int(64 * width_mult),
                    int(128 * width_mult), int(256 * width_mult)]
        layers = [1, 2, 2, 3]
        layers = [int(l * depth_mult) for l in layers]

        self.stem = nn.Sequential(
            nn.Conv2d(3, channels[0], 3, stride=2, padding=1),
            nn.BatchNorm2d(channels[0]),
            nn.ReLU6(inplace=True)
        )

        self.blocks = nn.ModuleList()
        in_channel = channels[0]
        for i, (out_channel, num_layer) in enumerate(zip(channels, layers)):
            for j in range(num_layer):
                stride = 2 if (i > 0 and j == 0) else 1
                use_se = (i > 1)  # Use SE attention in the deeper stages
                block = EfficientConvBlock(in_channel, out_channel, stride=stride, use_se=use_se)
                self.blocks.append(block)
                in_channel = out_channel

        self.avgpool = nn.AdaptiveAvgPool2d(1)
        self.classifier = nn.Linear(channels[-1], num_classes)

    def forward(self, x):
        x = self.stem(x)
        for block in self.blocks:
            x = block(x)
        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

The model uses depthwise separable convolutions to cut computation, an optional SE attention block to reweight channels dynamically, and width/depth multipliers to scale the network to the available resources. This design keeps accuracy reasonably high while sharply reducing parameter count and compute.
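
As a quick sanity check of the multipliers, you can instantiate the class above at a few scales and count trainable parameters; a small sketch (the specific multiplier values are arbitrary examples):

def count_parameters(model):
    # Total number of trainable parameters
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

for width_mult, depth_mult in [(0.5, 1.0), (1.0, 1.0), (1.5, 1.2)]:
    model = EfficientNet(num_classes=10, width_mult=width_mult, depth_mult=depth_mult)
    print(f"width={width_mult}, depth={depth_mult}: "
          f"{count_parameters(model) / 1e6:.2f}M params")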

3.2 Advanced Optimization and Regularization

Modern optimization techniques can substantially improve training:

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR, OneCycleLR


def create_optimizer(model, learning_rate=0.001, weight_decay=0.05):
    # Separate parameter groups so weight decay is applied selectively:
    # biases and normalization parameters are usually excluded
    decay_params = []
    no_decay_params = []
    for name, param in model.named_parameters():
        if not param.requires_grad:
            continue
        if name.endswith('.bias') or any(norm in name for norm in ['bn', 'ln', 'norm']):
            no_decay_params.append(param)
        else:
            decay_params.append(param)

    optimizer = AdamW(
        [
            {'params': decay_params, 'weight_decay': weight_decay},
            {'params': no_decay_params, 'weight_decay': 0.0}
        ],
        lr=learning_rate,
        betas=(0.9, 0.999),
        eps=1e-8
    )
    return optimizer


def create_scheduler(optimizer, num_epochs, steps_per_epoch, scheduler_type='cosine', max_lr=0.01):
    if scheduler_type == 'cosine':
        scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs * steps_per_epoch)
    elif scheduler_type == 'onecycle':
        scheduler = OneCycleLR(
            optimizer,
            max_lr=max_lr,
            total_steps=num_epochs * steps_per_epoch,
            pct_start=0.3,
            div_factor=25,
            final_div_factor=10000
        )
    else:
        scheduler = None
    return scheduler


# Training loop integrating gradient accumulation and gradient clipping
class AdvancedTrainer:
    def __init__(self, model, device, grad_clip=1.0, accumulation_steps=4):
        self.model = model.to(device)
        self.device = device
        self.grad_clip = grad_clip
        self.accumulation_steps = accumulation_steps

    def train_epoch(self, train_loader, optimizer, scheduler, criterion):
        self.model.train()
        total_loss = 0
        optimizer.zero_grad()
        for i, (inputs, targets) in enumerate(train_loader):
            inputs, targets = inputs.to(self.device), targets.to(self.device)
            outputs = self.model(inputs)
            loss = criterion(outputs, targets) / self.accumulation_steps
            loss.backward()
            # Gradient accumulation: update every `accumulation_steps` batches
            if (i + 1) % self.accumulation_steps == 0:
                # Gradient clipping guards against exploding gradients
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), self.grad_clip)
                optimizer.step()
                optimizer.zero_grad()
                if scheduler is not None:
                    scheduler.step()
            total_loss += loss.item() * self.accumulation_steps
        return total_loss / len(train_loader)

This code demonstrates several key optimization techniques in modern deep learning: per-parameter-group weight decay (excluding biases and normalization parameters) to control overfitting, gradient accumulation to simulate large batches, gradient clipping to keep training stable, and learning-rate schedules that speed up convergence. Used together, these techniques noticeably improve training quality and final model performance.
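
A minimal sketch of how these pieces fit together, assuming the EfficientNet model from Section 3.1 and the train_loader from Section 2; num_epochs and the learning-rate values are placeholders:

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = EfficientNet(num_classes=10)
num_epochs = 50

optimizer = create_optimizer(model, learning_rate=1e-3, weight_decay=0.05)
# The scheduler steps once per optimizer update, i.e. once every
# accumulation_steps batches, so size total_steps accordingly
scheduler = create_scheduler(
    optimizer,
    num_epochs=num_epochs,
    steps_per_epoch=len(train_loader) // 4,
    scheduler_type='onecycle',
    max_lr=0.01
)
criterion = nn.CrossEntropyLoss()

trainer = AdvancedTrainer(model, device, grad_clip=1.0, accumulation_steps=4)
for epoch in range(num_epochs):
    train_loss = trainer.train_epoch(train_loader, optimizer, scheduler, criterion)
    print(f"epoch {epoch}: train loss {train_loss:.4f}")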

4. Training Strategies and Performance Optimization

4.1 Mixed-Precision and Distributed Training

Take advantage of modern hardware to accelerate training:

import os

import torch
import torch.distributed as dist
from torch.cuda.amp import autocast, GradScaler
from torch.nn.parallel import DistributedDataParallel as DDP


def setup_distributed():
    # Initialize the distributed environment (one process per GPU)
    dist.init_process_group(backend='nccl')
    torch.cuda.set_device(int(os.environ['LOCAL_RANK']))


def cleanup_distributed():
    dist.destroy_process_group()


class DistributedTrainer:
    def __init__(self, model, local_rank, world_size):
        self.local_rank = local_rank
        self.world_size = world_size
        # Move the model to its GPU and wrap it with DDP
        self.model = model.to(local_rank)
        self.model = DDP(self.model, device_ids=[local_rank])
        # Gradient scaler for mixed-precision training
        self.scaler = GradScaler()

    def train_step(self, inputs, targets, optimizer, criterion):
        optimizer.zero_grad()
        # Mixed-precision forward pass
        with autocast():
            outputs = self.model(inputs)
            loss = criterion(outputs, targets)
        # Scale the loss and backpropagate
        self.scaler.scale(loss).backward()
        # Unscale before clipping, then update the weights
        self.scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        self.scaler.step(optimizer)
        self.scaler.update()
        return loss.item()


# Data loader with a distributed sampler
def create_distributed_loader(dataset, batch_size, num_workers=4):
    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset, num_replicas=dist.get_world_size(), rank=dist.get_rank()
    )
    loader = torch.utils.data.DataLoader(
        dataset,
        batch_size=batch_size,
        sampler=sampler,
        num_workers=num_workers,
        pin_memory=True,
        drop_last=True
    )
    return loader


# Usage example
if __name__ == "__main__":
    setup_distributed()
    local_rank = int(os.environ['LOCAL_RANK'])
    world_size = dist.get_world_size()

    # Create the model and trainer
    model = EfficientNet(num_classes=10)
    trainer = DistributedTrainer(model, local_rank, world_size)
    # Optimizer and loss (reusing create_optimizer from the previous section)
    optimizer = create_optimizer(trainer.model)
    criterion = torch.nn.CrossEntropyLoss()
    num_epochs = 50

    # Create the distributed data loader
    train_dataset = CustomImageDataset(...)
    train_loader = create_distributed_loader(train_dataset, batch_size=32)

    # Training loop
    for epoch in range(num_epochs):
        train_loader.sampler.set_epoch(epoch)
        for batch_idx, (inputs, targets) in enumerate(train_loader):
            inputs = inputs.to(local_rank, non_blocking=True)
            targets = targets.to(local_rank, non_blocking=True)
            loss = trainer.train_step(inputs, targets, optimizer, criterion)
            if batch_idx % 100 == 0 and local_rank == 0:
                print(f'Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss:.4f}')

    cleanup_distributed()

Distributed training and mixed precision dramatically speed up large-scale training. DDP implements data parallelism: each GPU processes a slice of the data and gradients are synchronized across processes. Mixed precision runs the forward and backward passes in FP16, cutting memory use and speeding up computation, while gradient scaling preserves numerical stability. A script like the one above is typically launched with torchrun, which spawns one process per GPU and sets the LOCAL_RANK environment variable the code reads.

4.2 Robust Validation and Model Selection

A sound validation strategy ensures you select the best model:

import numpy as np
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from torch.utils.data import DataLoader


class ModelEvaluator:
    def __init__(self, model, device, metrics=None):
        self.model = model
        self.device = device
        if metrics is None:
            self.metrics = {
                'accuracy': accuracy_score,
                'precision': lambda y_true, y_pred: precision_score(y_true, y_pred, average='macro'),
                'recall': lambda y_true, y_pred: recall_score(y_true, y_pred, average='macro'),
                'f1': lambda y_true, y_pred: f1_score(y_true, y_pred, average='macro')
            }
        else:
            self.metrics = metrics

    def evaluate(self, data_loader, return_predictions=False):
        self.model.eval()
        all_preds = []
        all_targets = []
        with torch.no_grad():
            for inputs, targets in data_loader:
                inputs = inputs.to(self.device)
                targets = targets.to(self.device)
                outputs = self.model(inputs)
                preds = torch.argmax(outputs, dim=1)
                all_preds.extend(preds.cpu().numpy())
                all_targets.extend(targets.cpu().numpy())

        results = {}
        for name, metric_fn in self.metrics.items():
            results[name] = metric_fn(all_targets, all_preds)

        if return_predictions:
            return results, (all_preds, all_targets)
        return results


class EarlyStopping:
    def __init__(self, patience=10, min_delta=0.001, mode='min'):
        self.patience = patience
        self.min_delta = min_delta
        self.mode = mode
        self.counter = 0
        self.best_score = None
        self.early_stop = False

    def __call__(self, current_score):
        if self.best_score is None:
            self.best_score = current_score
            return False
        if self.mode == 'min':
            improvement = self.best_score - current_score
        else:
            improvement = current_score - self.best_score
        if improvement > self.min_delta:
            self.best_score = current_score
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True
        return self.early_stop


# K-fold cross-validation
def cross_validate(model_class, dataset, folds=5, epochs=50, device='cuda'):
    fold_size = len(dataset) // folds
    indices = list(range(len(dataset)))
    np.random.shuffle(indices)

    all_scores = []
    for i in range(folds):
        print(f"Training fold {i+1}/{folds}")
        # Split training and validation indices
        val_indices = indices[i*fold_size:(i+1)*fold_size]
        train_indices = indices[:i*fold_size] + indices[(i+1)*fold_size:]

        train_subset = torch.utils.data.Subset(dataset, train_indices)
        val_subset = torch.utils.data.Subset(dataset, val_indices)
        train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

        # Fresh model instance for each fold
        model = model_class().to(device)
        optimizer = create_optimizer(model)
        criterion = torch.nn.CrossEntropyLoss()

        # Train (train_epoch is the single-epoch training routine used earlier)
        for epoch in range(epochs):
            train_epoch(model, train_loader, optimizer, criterion)

        # Evaluate
        evaluator = ModelEvaluator(model, device)
        scores = evaluator.evaluate(val_loader)
        all_scores.append(scores)
        print(f"Fold {i+1} scores: {scores}")

    # Average scores over folds
    avg_scores = {}
    for key in all_scores[0].keys():
        avg_scores[key] = np.mean([s[key] for s in all_scores])

    return avg_scores, all_scores

A complete evaluation setup includes multiple metrics, early stopping, and cross-validation. Cross-validation gives a more robust performance estimate, early stopping guards against overfitting, and multi-metric evaluation provides a fuller picture of model behavior. Together they help you select a model that truly generalizes.
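
The EarlyStopping helper is easiest to see in context. A minimal sketch of a validation loop that combines it with ModelEvaluator, assuming the trainer, loaders, optimizer, and model set up in the previous sections:

early_stopping = EarlyStopping(patience=10, min_delta=0.001, mode='max')  # maximize accuracy
evaluator = ModelEvaluator(model, device)
best_accuracy = 0.0

for epoch in range(num_epochs):
    trainer.train_epoch(train_loader, optimizer, scheduler, criterion)
    val_scores = evaluator.evaluate(val_loader)

    # Keep the best checkpoint seen so far
    if val_scores['accuracy'] > best_accuracy:
        best_accuracy = val_scores['accuracy']
        torch.save(model.state_dict(), 'best_model.pt')

    # Stop when accuracy has not improved for `patience` epochs
    if early_stopping(val_scores['accuracy']):
        print(f"Early stopping at epoch {epoch}, best accuracy {best_accuracy:.4f}")
        break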

5. Model Deployment and Productionization

5.1 Model Optimization and Conversion

Optimizing the model before deployment is essential:

import onnx
import onnxruntime as ort
import torch
import torch.onnx
from onnxsim import simplify


class ModelOptimizer:
    def __init__(self, model, example_input):
        self.model = model
        self.example_input = example_input
        self.model.eval()

    def export_onnx(self, onnx_path, dynamic_axes=None):
        if dynamic_axes is None:
            dynamic_axes = {
                'input': {0: 'batch_size'},
                'output': {0: 'batch_size'}
            }
        torch.onnx.export(
            self.model,
            self.example_input,
            onnx_path,
            export_params=True,
            opset_version=13,
            do_constant_folding=True,
            input_names=['input'],
            output_names=['output'],
            dynamic_axes=dynamic_axes
        )
        # Simplify the exported ONNX graph
        onnx_model = onnx.load(onnx_path)
        simplified_model, check = simplify(onnx_model)
        onnx.save(simplified_model, onnx_path)
        return onnx_path

    def quantize_model(self, quantized_path):
        # Dynamic quantization of the linear layers
        quantized_model = torch.quantization.quantize_dynamic(
            self.model, {torch.nn.Linear}, dtype=torch.qint8
        )
        # Save the quantized model as TorchScript
        torch.jit.save(torch.jit.script(quantized_model), quantized_path)
        return quantized_path

    def optimize_for_inference(self, onnx_path):
        # Let ONNX Runtime perform graph-level optimizations
        sess_options = ort.SessionOptions()
        sess_options.graph_optimization_level = ort.GraphOptimizationLevel.ORT_ENABLE_ALL
        sess_options.optimized_model_filepath = onnx_path.replace('.onnx', '_optimized.onnx')
        # Creating the session triggers the optimization pass
        ort.InferenceSession(onnx_path, sess_options)
        return sess_options.optimized_model_filepath


# Usage example
def prepare_for_production(model, example_input, output_dir):
    optimizer = ModelOptimizer(model, example_input)
    # Export to ONNX
    onnx_path = optimizer.export_onnx(f"{output_dir}/model.onnx")
    # Quantize
    quantized_path = optimizer.quantize_model(f"{output_dir}/model_quantized.pt")
    # Graph optimization for inference
    optimized_onnx = optimizer.optimize_for_inference(onnx_path)
    return {
        'onnx': onnx_path,
        'quantized': quantized_path,
        'optimized_onnx': optimized_onnx
    }

Model optimization covers format conversion, quantization, and graph optimization. ONNX provides cross-platform compatibility, quantization shrinks the model and speeds up inference, and graph optimization removes redundant computation. Together these steps significantly improve inference performance in production.
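
It is worth measuring latency before and after conversion to confirm the optimizations actually help. A rough sketch comparing eager PyTorch with the exported ONNX model (the file path, batch size, and provider choice are assumptions; a real benchmark should also control warm-up and hardware state):

import time

import onnxruntime as ort
import torch

def benchmark(fn, runs=100, warmup=10):
    # Average wall-clock time per call, in milliseconds
    for _ in range(warmup):
        fn()
    start = time.time()
    for _ in range(runs):
        fn()
    return (time.time() - start) / runs * 1000

dummy = torch.randn(1, 3, 224, 224)
model.eval()
with torch.no_grad():
    pytorch_ms = benchmark(lambda: model(dummy))

session = ort.InferenceSession("models/model_optimized.onnx",
                               providers=['CPUExecutionProvider'])
onnx_input = {session.get_inputs()[0].name: dummy.numpy()}
onnx_ms = benchmark(lambda: session.run(None, onnx_input))

print(f"PyTorch eager: {pytorch_ms:.2f} ms, ONNX Runtime: {onnx_ms:.2f} ms")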

5.2 High-Performance Inference Service

Use a modern inference stack to serve an efficient API:

import io

import numpy as np
import onnxruntime as ort
import uvicorn
from fastapi import FastAPI, File, UploadFile
from PIL import Image

app = FastAPI(title="AI Model Serving API")


class ONNXModelServer:
    def __init__(self, model_path, providers=None):
        if providers is None:
            providers = ['CUDAExecutionProvider', 'CPUExecutionProvider']
        self.session = ort.InferenceSession(model_path, providers=providers)
        self.input_name = self.session.get_inputs()[0].name
        self.output_name = self.session.get_outputs()[0].name

    def preprocess(self, image_bytes):
        image = Image.open(io.BytesIO(image_bytes)).convert('RGB')
        image = image.resize((224, 224))
        image_array = np.array(image).astype(np.float32) / 255.0
        image_array = (image_array - [0.485, 0.456, 0.406]) / [0.229, 0.224, 0.225]
        image_array = np.transpose(image_array, (2, 0, 1))
        image_array = np.expand_dims(image_array, axis=0)
        # Cast back to float32: broadcasting against Python lists promotes to float64
        return image_array.astype(np.float32)

    def predict(self, input_data):
        outputs = self.session.run([self.output_name], {self.input_name: input_data})
        return outputs[0]


# Initialize the model server
model_server = ONNXModelServer("models/optimized_model.onnx")


@app.post("/predict")
async def predict_endpoint(file: UploadFile = File(...)):
    image_bytes = await file.read()
    # Preprocess
    input_data = model_server.preprocess(image_bytes)
    # Inference
    predictions = model_server.predict(input_data)
    # Postprocess
    predicted_class = int(np.argmax(predictions))
    confidence = float(np.max(predictions))
    return {
        "predicted_class": predicted_class,
        "confidence": confidence,
        "all_predictions": predictions.tolist()
    }


@app.get("/health")
async def health_check():
    return {"status": "healthy", "model_loaded": True}


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)

The inference service uses ONNX Runtime for efficient cross-platform inference and FastAPI as a modern asynchronous API framework. Preprocessing and postprocessing are built into the service, giving a complete end-to-end prediction pipeline, and the health-check endpoint makes it easy to monitor service status.
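
A quick client-side check of the running service, sketched with the requests library (the host, port, and test image path are assumptions matching the uvicorn settings above):

import requests

# Send a local test image to the /predict endpoint
with open("test_image.jpg", "rb") as f:
    response = requests.post(
        "http://localhost:8000/predict",
        files={"file": ("test_image.jpg", f, "image/jpeg")}
    )

result = response.json()
print(f"class: {result['predicted_class']}, confidence: {result['confidence']:.3f}")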

6. Monitoring and Maintenance

6.1 Performance Monitoring and Logging

Models in production need continuous monitoring:

import logging
import os
import time

from fastapi import Response
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

# Prometheus metrics
REQUEST_COUNT = Counter('request_count', 'Total request count', ['endpoint', 'status'])
REQUEST_LATENCY = Histogram('request_latency_seconds', 'Request latency', ['endpoint'])
PREDICTION_CONFIDENCE = Histogram('prediction_confidence', 'Prediction confidence distribution')


class MonitoringMiddleware:
    def __init__(self):
        self.logger = self.setup_logging()

    def setup_logging(self):
        os.makedirs('logs', exist_ok=True)
        logger = logging.getLogger('model_server')
        logger.setLevel(logging.INFO)
        # File handler
        file_handler = logging.FileHandler('logs/model_server.log')
        file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        file_handler.setFormatter(file_formatter)
        # Console handler
        console_handler = logging.StreamHandler()
        console_formatter = logging.Formatter('%(levelname)s: %(message)s')
        console_handler.setFormatter(console_formatter)
        logger.addHandler(file_handler)
        logger.addHandler(console_handler)
        return logger

    def log_request(self, endpoint, status, latency, confidence=None):
        REQUEST_COUNT.labels(endpoint=endpoint, status=status).inc()
        REQUEST_LATENCY.labels(endpoint=endpoint).observe(latency)
        if confidence is not None:
            PREDICTION_CONFIDENCE.observe(confidence)
        self.logger.info(
            f"Endpoint: {endpoint}, Status: {status}, "
            f"Latency: {latency:.3f}s, Confidence: {confidence}"
        )


# Initialize monitoring before registering the middleware
monitoring = MonitoringMiddleware()


# Hook into the FastAPI app defined in Section 5.2
@app.middleware("http")
async def monitor_requests(request, call_next):
    start_time = time.time()
    endpoint = request.url.path
    try:
        response = await call_next(request)
        latency = time.time() - start_time
        # Record the successful request
        monitoring.log_request(endpoint=endpoint, status=response.status_code, latency=latency)
        return response
    except Exception:
        latency = time.time() - start_time
        monitoring.log_request(endpoint=endpoint, status=500, latency=latency)
        raise


@app.get("/metrics")
async def metrics_endpoint():
    # Expose metrics in the Prometheus text format
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

A solid monitoring setup tracks request counts, latencies, and the distribution of prediction confidence. Prometheus metrics provide standardized monitoring data, structured logs capture detailed runtime information, and the exception handling keeps the service stable. This telemetry is essential for spotting performance problems and understanding how the model behaves in production.
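
Note that the HTTP middleware cannot see model outputs, so the PREDICTION_CONFIDENCE histogram only fills up if the prediction handler reports the value itself. A minimal sketch of that hook, written as an extended version of the /predict handler from Section 5.2:

@app.post("/predict")
async def predict_endpoint(file: UploadFile = File(...)):
    image_bytes = await file.read()
    input_data = model_server.preprocess(image_bytes)
    predictions = model_server.predict(input_data)

    predicted_class = int(np.argmax(predictions))
    confidence = float(np.max(predictions))

    # Feed the confidence into the Prometheus histogram defined above
    PREDICTION_CONFIDENCE.observe(confidence)

    return {"predicted_class": predicted_class, "confidence": confidence}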


