当前位置: 首页 > news >正文

别人不能注册我的wordpress站企业做网站哪家公司好

别人不能注册我的wordpress站,企业做网站哪家公司好,大形电商网站开发费用,娄底网站建设是什么概述 本项目是一个基于 FastAPI 的实时目标检测服务,利用 YOLOv8 模型对 RTSP 视频流进行目标检测,并通过 WebSocket 将检测结果实时传输到客户端。 安装 2. 创建虚拟环境(可选但推荐) 使用 venv 或 conda 创建虚拟环境&#x…

概述

本项目是一个基于 FastAPI 的实时目标检测服务,利用 YOLOv8 模型对 RTSP 视频流进行目标检测,并通过 WebSocket 将检测结果实时传输到客户端。

安装

2. 创建虚拟环境(可选但推荐)

使用 venv 或 conda 创建虚拟环境,以隔离项目依赖:

# 使用 venv
python -m venv venv
source venv/bin/activate  # Linux/Mac
.\venv\Scripts\activate  # Windows# 使用 conda
conda create -n yolo_api python=3.8
conda activate yolo_api

3. 安装依赖

安装项目所需的 Python 依赖包:

pip install -r requirements.txt

确保安装的依赖包括:

  • fastapi
  • uvicorn
  • opencv-python
  • ultralytics
  • pydantic

4. 下载 YOLO 模型

将 YOLOv8 模型文件(如 yolo11n.ptyolo11s.pt 等)放置在项目根目录下。可以从 Ultralytics 官方仓库下载预训练模型。

代码:

import cv2
import json
import base64
import time
import threading
import logging
from typing import Optional, List, Dict, Any
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Query
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from ultralytics import YOLO# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)# 帧缓冲区类 - 存储最新帧
class FrameBuffer:def __init__(self):self.latest_frame = Noneself.lock = threading.Lock()self.frame_ready = threading.Event()self.is_active = True  # 标记缓冲区是否活跃def update(self, frame):with self.lock:if self.is_active:self.latest_frame = frameself.frame_ready.set()def get_latest(self):with self.lock:return self.latest_frame.copy() if self.latest_frame is not None else Nonedef deactivate(self):with self.lock:self.is_active = Falseself.latest_frame = None# 初始化FastAPI应用
app = FastAPI(title="YOLOv8 目标检测 API", description="提供实时目标检测服务")# 添加CORS中间件
app.add_middleware(CORSMiddleware,allow_origins=["*"],  # 仅用于开发环境,生产环境应限制为可信域名allow_credentials=True,allow_methods=["*"],allow_headers=["*"],
)# 加载YOLO模型
try:models = {"yolo11n.pt": YOLO("yolo11n.pt"),  # Nano版本,最快"yolo11s.pt": YOLO("yolo11n.pt"),  # Small版本,较快"yolo11m.pt": YOLO("yolo11n.pt"),  # Medium版本,平衡"yolo11l.pt": YOLO("yolo11n.pt"),  # Large版本,更准确"yolo11x.pt": YOLO("yolo11n.pt"),  # Extra版本,最准确}
except Exception as e:logger.error(f"模型加载失败: {str(e)}")models = {}# 连接管理器
class ConnectionManager:def __init__(self):self.active_connections: Dict[str, WebSocket] = {}  # 连接ID -> WebSocketself.rtsp_clients: Dict[str, cv2.VideoCapture] = {}  # RTSP URL -> VideoCaptureself.rtsp_connections: Dict[str, int] = {}  # RTSP URL -> 连接数self.rtsp_frame_buffers: Dict[str, FrameBuffer] = {}  # RTSP URL -> FrameBufferself.rtsp_threads: Dict[str, threading.Thread] = {}  # RTSP URL -> 线程self.max_connections_per_rtsp = 5  # 每个RTSP源的最大连接数self.heartbeat_interval = 30  # 心跳间隔(秒)self._lock = threading.Lock()  # 用于线程安全的锁async def connect(self, websocket: WebSocket, connection_id: str, rtsp_url: str):# 检查RTSP连接数限制with self._lock:if rtsp_url in self.rtsp_connections and self.rtsp_connections[rtsp_url] >= self.max_connections_per_rtsp:await websocket.close(code=1008, reason="连接数超过上限")raise HTTPException(status_code=429, detail="连接数超过上限")await websocket.accept()with self._lock:self.active_connections[connection_id] = websocket# 更新RTSP连接数self.rtsp_connections[rtsp_url] = self.rtsp_connections.get(rtsp_url, 0) + 1logger.info(f"新连接: {connection_id}, RTSP: {rtsp_url}, 当前连接数: {self.rtsp_connections[rtsp_url]}")# 如果是第一个连接到该RTSP的客户端,启动读取线程if self.rtsp_connections[rtsp_url] == 1:if rtsp_url not in self.rtsp_frame_buffers:self.rtsp_frame_buffers[rtsp_url] = FrameBuffer()# 启动独立线程读取RTSP流thread = threading.Thread(target=self._read_rtsp_stream, args=(rtsp_url,), daemon=True)self.rtsp_threads[rtsp_url] = threadthread.start()def disconnect(self, connection_id: str, rtsp_url: str):with self._lock:if connection_id in self.active_connections:del self.active_connections[connection_id]# 减少RTSP连接数if rtsp_url in self.rtsp_connections:self.rtsp_connections[rtsp_url] -= 1if self.rtsp_connections[rtsp_url] <= 0:logger.info(f"没有更多连接,准备关闭RTSP流: {rtsp_url}")# 释放RTSP客户端if rtsp_url in self.rtsp_clients:self.rtsp_clients[rtsp_url].release()del self.rtsp_clients[rtsp_url]logger.info(f"释放RTSP客户端: {rtsp_url}")# 停用帧缓冲区if rtsp_url in self.rtsp_frame_buffers:self.rtsp_frame_buffers[rtsp_url].deactivate()del self.rtsp_frame_buffers[rtsp_url]logger.info(f"释放帧缓冲区: {rtsp_url}")# RTSP URL从连接数统计中移除del self.rtsp_connections[rtsp_url]# 移除线程引用(线程会在检测到连接数为0时自行退出)if rtsp_url in self.rtsp_threads:del self.rtsp_threads[rtsp_url]logger.info(f"断开连接: {connection_id}, RTSP: {rtsp_url}, 当前连接数: {self.rtsp_connections.get(rtsp_url, 0)}")async def send_message(self, message: str, connection_id: str):if connection_id in self.active_connections:try:# 尝试发送消息await self.active_connections[connection_id].send_text(message)except WebSocketDisconnect:logger.info(f"WebSocket连接已关闭,终止发送数据: {connection_id}")# 获取RTSP URL并断开连接rtsp_url = next((url for url, conns in self.rtsp_connections.items() if connection_id in [k for k in self.active_connections]), None)if rtsp_url:self.disconnect(connection_id, rtsp_url)except Exception as e:logger.error(f"发送消息失败: {connection_id}, 错误: {str(e)}")# 获取RTSP URL并断开连接rtsp_url = next((url for url, conns in self.rtsp_connections.items() if connection_id in [k for k in self.active_connections]), None)if rtsp_url:self.disconnect(connection_id, rtsp_url)def get_frame_buffer(self, rtsp_url: str):return self.rtsp_frame_buffers.get(rtsp_url)def _read_rtsp_stream(self, rtsp_url: str):logger.info(f"启动RTSP读取线程: {rtsp_url}")# 创建新的RTSP客户端cap = cv2.VideoCapture(rtsp_url)# 设置参数,减少延迟cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)  # 尽量减少缓冲区cap.set(cv2.CAP_PROP_FPS, 25)  # 设置期望的帧率with self._lock:self.rtsp_clients[rtsp_url] = capframe_buffer = self.rtsp_frame_buffers.get(rtsp_url)if not frame_buffer:logger.error(f"帧缓冲区不存在: {rtsp_url}")returnwhile True:# 检查是否还有连接with self._lock:if rtsp_url not in self.rtsp_connections or self.rtsp_connections[rtsp_url] <= 0:logger.info(f"没有活动连接,停止读取RTSP流: {rtsp_url}")breaktry:ret, frame = cap.read()except cv2.error as e:logger.error(f"读取RTSP流时出错: {rtsp_url}, 错误: {str(e)}")# 尝试重新连接with self._lock:if rtsp_url in self.rtsp_clients:self.rtsp_clients[rtsp_url].release()cap = cv2.VideoCapture(rtsp_url)cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)self.rtsp_clients[rtsp_url] = captime.sleep(1)  # 等待并重试continueif not ret:logger.warning(f"无法从RTSP流读取帧: {rtsp_url}")# 尝试重新连接with self._lock:if rtsp_url in self.rtsp_clients:self.rtsp_clients[rtsp_url].release()cap = cv2.VideoCapture(rtsp_url)cap.set(cv2.CAP_PROP_BUFFERSIZE, 1)self.rtsp_clients[rtsp_url] = captime.sleep(1)  # 等待并重试continue# 更新帧缓冲区为最新帧if frame_buffer:frame_buffer.update(frame)# 不需要延时,让线程尽可能快地读取最新帧# 清理资源with self._lock:if rtsp_url in self.rtsp_clients:self.rtsp_clients[rtsp_url].release()del self.rtsp_clients[rtsp_url]logger.info(f"RTSP读取线程已停止: {rtsp_url}")# 优化的HTML页面,添加防缓存机制
@app.get("/", response_class=HTMLResponse)
async def get():return """<html><head><title>RTSP摄像头检测测试</title><style>body { font-family: Arial, sans-serif; margin: 0; padding: 20px; }h1 { color: #333; }#video-container { margin-top: 20px; }#video { max-width: 100%; border: 1px solid #ddd; }#status { margin-top: 10px; color: #666; }</style></head><body><h1>RTSP摄像头检测测试</h1><div id="video-container"><img id="video" src="" alt="RTSP视频流"></div><div id="status">连接中...</div><script>const videoElement = document.getElementById('video');const statusElement = document.getElementById('status');let lastTimestamp = 0;// 创建WebSocket连接function createWebSocket() {const ws = new WebSocket(`ws://${window.location.host}/ws/camera_detection?camera_id=rtsp://admin:zxcv1234@192.168.2.14:554/h264/ch1/main/av_stream`);ws.onopen = function() {statusElement.textContent = '已连接';console.log('WebSocket连接已建立');};ws.onmessage = function(event) {try {const data = JSON.parse(event.data);if (data.image_base64) {// 只显示比当前更新的帧if (!lastTimestamp || data.timestamp > lastTimestamp) {lastTimestamp = data.timestamp;// 添加随机查询参数,防止浏览器缓存videoElement.src = `${data.image_base64}?t=${Date.now()}`;statusElement.textContent = `已更新: ${new Date(data.timestamp * 1000).toLocaleTimeString()}`;}} else if (data.error) {statusElement.textContent = `错误: ${data.error}`;console.error('接收错误:', data.error);} else if (data.type === 'heartbeat') {// 处理心跳statusElement.textContent = '连接活跃中...';}} catch (error) {console.error('解析消息失败:', error);}};ws.onclose = function() {statusElement.textContent = '连接已关闭,尝试重新连接...';console.log('WebSocket连接已关闭');// 5秒后尝试重新连接setTimeout(createWebSocket, 5000);};ws.onerror = function(error) {statusElement.textContent = '连接错误,正在重试...';console.error('WebSocket错误:', error);};return ws;}// 初始化WebSocketconst ws = createWebSocket();// 添加页面卸载时关闭WebSocket的处理window.addEventListener('beforeunload', function() {ws.close();});</script></body></html>"""# 用于摄像头/RTSP流检测的WebSocket端点
@app.websocket("/ws/camera_detection")
async def websocket_camera_detection(websocket: WebSocket,camera_id: str = Query(...),model_path: str = Query("yolo11n.pt"),conf: float = Query(0.3),target_classes: Optional[str] = Query(None),fps_limit: int = Query(1)  # 每秒处理一帧
):# 生成唯一连接IDconnection_id = f"{camera_id}_{int(time.time() * 1000)}"last_activity_time = time.perf_counter()  # 使用更精确的计时器frame_retry_count = 0max_frame_retries = 10processing_time = 0  # 记录处理一帧的时间try:# 检查模型是否存在if model_path not in models:await manager.send_message(json.dumps({"error": f"模型 {model_path} 不存在"}), connection_id)return# 解析目标类别classes = Noneif target_classes:try:classes = [c.strip() for c in target_classes.split(',')]# 将类别名称转换为类别IDmodel = models[model_path]class_names = list(model.names.values())classes = [class_names.index(c) for c in classes if c in class_names]except Exception as e:await manager.send_message(json.dumps({"error": f"解析目标类别失败: {str(e)}"}), connection_id)return# 连接到WebSocketawait manager.connect(websocket, connection_id, camera_id)# 获取帧缓冲区frame_buffer = manager.get_frame_buffer(camera_id)if frame_buffer is None:await manager.send_message(json.dumps({"error": f"无法获取RTSP帧缓冲区: {camera_id}"}), connection_id)await manager.disconnect(connection_id, camera_id)return# 发送连接成功消息await manager.send_message(json.dumps({"status": "connected", "message": "已成功连接到RTSP流"}), connection_id)# 帧处理逻辑frame_interval = 1.0 / fps_limit  # 计算帧间隔时间next_process_time = time.perf_counter() + frame_interval  # 下一次处理的时间点model = models[model_path]while True:current_time = time.perf_counter()# 等待到下一个处理时间点if current_time < next_process_time:time.sleep(next_process_time - current_time)current_time = time.perf_counter()# 更新下一次处理时间next_process_time = current_time + frame_interval# 检查连接是否仍然存在if connection_id not in manager.active_connections:logger.info(f"连接已断开,终止处理: {connection_id}")break# 发送心跳包if current_time - last_activity_time > manager.heartbeat_interval:await manager.send_message(json.dumps({"type": "heartbeat"}), connection_id)last_activity_time = current_time# 获取最新帧frame = frame_buffer.get_latest()if frame is None:frame_retry_count += 1logger.warning(f"获取帧失败 ({frame_retry_count}/{max_frame_retries}): {camera_id}")if frame_retry_count >= max_frame_retries:await manager.send_message(json.dumps({"error": "连续多次无法获取视频帧,尝试重新连接"}), connection_id)# 等待一段时间再继续尝试time.sleep(1)# 发送空帧消息await manager.send_message(json.dumps({"type": "empty_frame", "retry_count": frame_retry_count}), connection_id)continueelse:# 成功获取帧,重置重试计数frame_retry_count = 0# 执行检测try:process_start_time = time.perf_counter()# 执行目标检测results = model(frame, conf=conf, classes=classes)inference_time = time.perf_counter() - process_start_time# 绘制检测结果annotated_frame = results[0].plot()# 转换为JPEG并编码为Base64_, buffer = cv2.imencode('.jpg', annotated_frame, [cv2.IMWRITE_JPEG_QUALITY, 70])frame_base64 = base64.b64encode(buffer).decode('utf-8')# 统计检测到的类别class_counts = {}if results[0].boxes is not None:for box in results[0].boxes:class_id = int(box.cls)class_name = model.names[class_id]class_counts[class_name] = class_counts.get(class_name, 0) + 1# 计算处理时间processing_time = time.perf_counter() - process_start_timelogger.info(f"处理时间: {processing_time:.3f}秒, 推理时间: {inference_time:.3f}秒")# 发送结果到客户端message = {"image_base64": f"data:image/jpeg;base64,{frame_base64}","inference_time": inference_time,"processing_time": processing_time,  # 添加处理时间"fps": fps_limit,  # 显示设定的FPS,而不是计算值"object_count": len(results[0].boxes) if results[0].boxes is not None else 0,"classes": class_counts,"timestamp": time.time()}await manager.send_message(json.dumps(message), connection_id)# 检查处理时间是否超过间隔时间if processing_time > frame_interval:logger.warning(f"警告: 处理时间({processing_time:.3f}秒)超过帧间隔时间({frame_interval:.3f}秒)")except WebSocketDisconnect:logger.info(f"WebSocket连接已关闭,终止处理帧: {connection_id}")breakexcept Exception as e:frame_retry_count += 1logger.error(f"处理帧时出错 ({frame_retry_count}/{max_frame_retries}): {str(e)}")if frame_retry_count >= max_frame_retries:await manager.send_message(json.dumps({"error": "连续多次处理帧失败,尝试重新连接"}), connection_id)# 等待一段时间再继续尝试time.sleep(1)# 发送错误消息await manager.send_message(json.dumps({"type": "processing_error", "error": str(e), "retry_count": frame_retry_count}), connection_id)except WebSocketDisconnect:logger.info(f"客户端断开连接: {connection_id}")# 获取RTSP URL并断开连接rtsp_url = next((url for url, conns in manager.rtsp_connections.items() if connection_id in [k for k in manager.active_connections]), None)if rtsp_url:manager.disconnect(connection_id, rtsp_url)except Exception as e:logger.error(f"WebSocket处理异常: {str(e)}")# 获取RTSP URL并断开连接rtsp_url = next((url for url, conns in manager.rtsp_connections.items() if connection_id in [k for k in manager.active_connections]), None)if rtsp_url:manager.disconnect(connection_id, rtsp_url)finally:# 确保资源被释放if connection_id in manager.active_connections:# 获取RTSP URL并断开连接rtsp_url = next((url for url, conns in manager.rtsp_connections.items() if connection_id in [k for k in manager.active_connections]), None)if rtsp_url:manager.disconnect(connection_id, rtsp_url)# 全局连接管理器实例
manager = ConnectionManager()if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000)

运行

1. 启动服务

在终端中运行以下命令启动 FastAPI 服务:

python camera4.py

服务将在 http://0.0.0.0:8000 上启动。

2. 配置 RTSP 地址

在代码中的 HTML 页面(@app.get("/") 端点)里,修改 createWebSocket 函数中的 camera_id 参数为实际的 RTSP 地址:

const ws = new WebSocket(`ws://${window.location.host}/ws/camera_detection?camera_id=rtsp://admin:zxcv1234@192.168.2.14:554/h264/ch1/main/av_stream`);

测试

1. 访问测试页面

在浏览器中打开 http://localhost:8000,即可看到测试页面。页面会尝试连接到指定的 RTSP 流,并显示实时目标检测结果。

2. 使用 WebSocket 客户端测试

可以使用 WebSocket 客户端工具(如 WebSocket King)来测试 WebSocket 端点。连接到以下地址:

ws://localhost:8000/ws/camera_detection?camera_id=rtsp://your_rtsp_url&model_path=yolo11n.pt&conf=0.3&target_classes=person,car&fps_limit=1
  • camera_id:RTSP 视频流地址
  • model_path:使用的 YOLO 模型文件路径
  • conf:检测置信度阈值
  • target_classes:目标检测的类别,多个类别用逗号分隔
  • fps_limit:每秒处理的帧数

3. 查看日志

在终端中可以查看服务的日志信息,包括连接状态、帧处理时间、检测结果等。

注意事项

  • 确保 RTSP 地址正确,并且网络可以正常访问。
  • 在生产环境中,需要限制 CORSMiddleware 的 allow_origins 参数,以提高安全性。
  • 如果遇到模型加载失败的问题,检查模型文件路径和文件是否存在。
http://www.dtcms.com/a/476348.html

相关文章:

  • 网站都可以做哪些主题网站icp备案地
  • 最便宜的购物网站排名广告牌设计
  • 那可以做网站淘宝官网首页网站
  • 学做家常菜的网站 知乎sem扫描电子显微镜
  • 东莞网络优化推广网站专业术语中 seo意思是
  • 建设网站比较好公司吗滁州网站seo
  • 私人做网站收费京东电器商城网上购物
  • 网站建设软件开发网站上怎么做推广比较好呢
  • app下载量查询北京优化seo
  • 建行网站用户名是什么wordpress运行php文件下载
  • 网站建设培训的心得画册设计免费模板
  • 网站平台怎么做wordpress 流量统计
  • 抓取网站访问量c 做网站开发
  • 长沙城乡建设网站郑州网约车从业资格证
  • 佛山三水建设局网站科技创新可以被分成三种类型
  • 红桥网站建设查看别人网站的访问量
  • 温州制作企业网站网站控制面板 地址
  • 网站开篇动画个人能接广告联盟吗
  • 长沙网站关键词排名推广公司德阳seo优化
  • 做html5网站在线logo生成器标智客
  • 公司怎样制作网站桃浦做网站
  • 品牌网站建设权威海外网络服务商
  • 做网站鼎盛网站改版方案模板
  • 网站建设 资质要求青岛网站定制
  • 网站建设与推广推荐手机wap浏览器
  • 做机械有什么兼职网站网站网站注册
  • 网站屏幕自适应代码襄阳住房城乡建设厅官方网站
  • 荆门网站开发公司电话红色风格网站
  • 品牌网站建站托管网站是什么意思
  • asp资源下载网站wordpress怎么禁止更新