AI学习笔记三十:基于yolov8的web显示
若该文为原创文章,转载请注明原文出处。
一、目的
最近想做基于web端的YOLO识别检测系统,由于个人知识量有限,有很多不懂,特别是VUE这块,所以使用了AI的方式来实现自己的目的。
网上有很多程序是融合了Python、Flask、Vue、SQLite等技术;
检测方式:图片,视频,摄像头,在此基础上,通过AI工具,又增加了RTSP方式。
这里记录下过程及方式,虽然框架不是很好,但实现了功能。
二、准备工作
1、rtsp服务器
下载页面 : https://github.com/aler9/rtsp-simple-server/releases
这里,我们下载 rtsp-simple-server_v0.19.1_windows_amd64.zip
2、ffmpeg推rtsp流
ffmpeg -re -stream_loop -1 -i test.mp4 -c:v h264 -rtsp_transport tcp -f rtsp rtsp://192.168.50.83:8554/video
三、思路流程
1、系统架构
采用 Flask + Vue 前后端分离架构:
- 后端:Flask 提供 RESTful API,处理核心逻辑(YOLO 检测、RTSP 流管理、数据库操作)。
- 前端:Vue 3 负责交互界面(用户登录、文件上传、检测结果展示)。
- 数据库:SQLite 存储用户信息和检测记录。
2、后端
1)RTSP 实时流检测
-
流程:
- 前端调用
/api/rtsp_start
启动 RTSP 流(传递 RTSP URL)。 - 后端通过 RTSPStreamManager 管理多个 RTSP 线程:
- 每个 RTSP URL 对应一个 RTSPStreamThread 线程。
- 线程持续读取视频帧,调用 YOLO 模型检测,保存结果到共享变量。
- 前端定时调用
/api/rtsp_frame
获取最新帧和检测结果。 - 调用
/api/rtsp_stop
停止流。
- 前端调用
-
关键类:
- RTSPStreamManager:管理 RTSP 线程的生命周期(启动/停止/清理)。
- RTSPStreamThread:独立线程处理 RTSP 流的帧读取和 YOLO 检测。
2、图片/视频检测
- 图片检测(
/api/detect_image
):- 上传图片 → 调用 YOLO 检测 → 绘制检测框 → 保存结果图片和数据库记录。
- 视频检测(
/api/detect_video
):- 上传视频 → 逐帧检测 → 生成带检测框的视频 → 保存结果和数据库记录。
3、前端
图片检测流程
用户->>前端: 拖拽/选择图片
前端->>前端: 生成Blob URL预览 (beforeImageUpload)
前端->>后端: 上传图片到 /api/detect_image
后端-->>前端: 返回检测结果
前端->>前端: 显示结果图片和检测框数据
视频检测流程
用户->>前端: 拖拽/选择视频
前端->>前端: 生成Blob URL预览 (beforeVideoUpload)
前端->>后端: 上传视频到 /api/detect_video
后端-->>前端: 返回带检测框的视频
前端->>前端: 播放结果视频并显示检测统计
摄像头实时检测
1. 调用 navigator.mediaDevices.getUserMedia 获取摄像头权限
2. 绑定视频流到 <video> 元素
3. 启动定时器 (detectionInterval):- 捕获帧 -> 发送到 /api/process_frame- 接收检测结果 -> 通过CSS绝对定位绘制检测框
4. 显示实时FPS计数器
RTSP流检测
1. 验证RTSP地址格式
2. 调用 /api/rtsp_start 启动后端RTSP线程
3. 定时轮询 /api/rtsp_frame:- 接收JPEG二进制帧 -> Blob URL显示- 解析响应头中的检测结果(X-Detections)
4. 绘制检测框并显示FPS
四、核心代码
这是只附RTSP部分代码
后端解析RTSP流
# RTSP流管理
class RTSPStreamManager:def __init__(self):self.streams = {} # {rtsp_url: {'thread': thread, 'last_active': timestamp}}self.lock = threading.Lock()def add_stream(self, rtsp_url, fps=10):with self.lock:if rtsp_url in self.streams:return Falsethread = RTSPStreamThread(rtsp_url, fps)thread.start()self.streams[rtsp_url] = {'thread': thread,'last_active': time.time(),'fps': fps}return Truedef get_frame(self, rtsp_url):with self.lock:if rtsp_url not in self.streams:return None, []print(f"get_frame: {rtsp_url}") self.streams[rtsp_url]['last_active'] = time.time()thread = self.streams[rtsp_url]['thread']# 获取帧数据frame, detections = thread.thread_get_frame()print(f"thread.thread_get_frame") # 保存测试帧用于调试if frame is not None:try:print('保存测试帧')cv2.imwrite('static/debug_frame.jpg', frame)print(f"get_frame: 成功保存测试帧到 static/debug_frame.jpg")except Exception as e:print(f"get_frame: 保存测试帧失败 - {str(e)}")print(f"get_frame: 返回帧和检测结果")return frame, detectionsdef stop_stream(self, rtsp_url):with self.lock:if rtsp_url not in self.streams:return Falsethread = self.streams[rtsp_url]['thread']thread.stop()thread.join()del self.streams[rtsp_url]return Truedef cleanup_inactive(self, timeout=300):with self.lock:current_time = time.time()to_remove = []for rtsp_url, stream_info in self.streams.items():if current_time - stream_info['last_active'] > timeout:to_remove.append(rtsp_url)for rtsp_url in to_remove:self.stop_stream(rtsp_url)return len(to_remove)# RTSP处理线程
class RTSPStreamThread(threading.Thread):def __init__(self, rtsp_url, fps=10):super().__init__()self.rtsp_url = rtsp_urlself.fps = fpsself.cap = Noneself.frame = Noneself.detections = []self.lock = threading.Lock()self.running = Trueself.frame_interval = 1.0 / fpsself.last_frame_time = 0def run(self):# 尝试使用FFMPEG后端(如果可用)backends = [cv2.CAP_FFMPEG,cv2.CAP_GSTREAMER,cv2.CAP_ANY]for backend in backends:try:self.cap = cv2.VideoCapture(self.rtsp_url, backend)if self.cap.isOpened():print(f"✅ 成功打开RTSP流: {self.rtsp_url} (后端: {backend})")breakexcept:passprint(f"RTSP流: {self.rtsp_url}")if not self.cap or not self.cap.isOpened():print(f"❌ 无法打开RTSP流: {self.rtsp_url}")returnprint(f"开始RTSP处理")while self.running:current_time = time.time()if current_time - self.last_frame_time >= self.frame_interval:ret, frame = self.cap.read()if not ret:print(f"RTSP流读取失败: {self.rtsp_url}")time.sleep(1)continueprint(f"RTSP流读取成功")# 保存成图片 cv2.imwrite(f'static/01_test.jpg', frame) # 检测对象try:results = model(frame)detections = []for r in results:boxes = r.boxesif boxes is not None:for box in boxes:x1, y1, x2, y2 = box.xyxy[0].cpu().numpy()conf = box.conf[0].cpu().numpy()cls = box.cls[0].cpu().numpy()detections.append({'class': model.names[int(cls)],'confidence': float(conf),'bbox': [float(x1), float(y1), float(x2), float(y2)]})# 更新帧和检测结果with self.lock:print('更新帧和检测结果')self.frame = frameself.detections = detectionsexcept Exception as e:print(f"RTSP检测失败: {e}")with self.lock:print('RTSP检测失败')self.frame = frameself.detections = []self.last_frame_time = current_timecv2.imwrite(f'static/result_test.jpg', frame)time.sleep(0.5) # 新增延时,控制检测频率self.cap.release()def thread_get_frame(self):with self.lock:# 输出获取帧的信息print(f"thread_get_frame")if self.frame is None:print(f"thread_get_frame: 没有帧") return None, []frame_with_detections = self.frame.copy()for detection in self.detections:x1, y1, x2, y2 = detection['bbox']cv2.rectangle(frame_with_detections, (int(x1), int(y1)), (int(x2), int(y2)), (0, 255, 0), 2)label = f"{detection['class']}: {detection['confidence']:.2f}"cv2.putText(frame_with_detections, label, (int(x1), int(y1)-10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0), 2)cv2.imwrite(f'static/thread_get_frame.jpg', frame_with_detections)print('thread_get_frame 返回帧')return frame_with_detections, self.detectionsdef stop(self):# 停止运行标志设置,将running属性设为Falseself.running = False
后端API路由
# API路由
@app.route('/api/rtsp_start', methods=['POST'])
def rtsp_start():print('rtsp_start')data = request.get_json()rtsp_url = data.get('rtsp_url')fps = data.get('fps', 10)user_id = data.get('user_id', 1)print(f"rtsp_start: {rtsp_url}")print(f"fps: {fps}")print(f"user_id: {user_id}")if not rtsp_url:print('rtsp_start: 缺少RTSP URL')return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400try:# 启动RTSP流success = rtsp_manager.add_stream(rtsp_url, fps)if success:# 记录到数据库detection = DetectionResult(user_id=user_id,detection_type='rtsp',original_file=rtsp_url,result_file='', # RTSP没有结果文件detections='[]',confidence=0)db.session.add(detection)db.session.commit()print('RTSP流处理已启动')return jsonify({'success': True,'message': 'RTSP流处理已启动','detection_id': detection.id})else:return jsonify({'success': False, 'message': 'RTSP流已在处理中'}), 400except Exception as e:print(f"启动RTSP流失败: {str(e)}")return jsonify({'success': False, 'message': f'启动RTSP流失败: {str(e)}'}), 500#"GET /api/rtsp_frame?rtsp_url=rtsp://192.168.50.83:8554/video HTTP/1.1" 404 -
@app.route('/api/rtsp_frame', methods=['POST'])
def rtsp_get_frame():print('rtsp_frame')data = request.get_json()rtsp_url = data.get('rtsp_url')detection_id = data.get('user_id', 1)print(f"rtsp_get_frame: {rtsp_url}")print(f'detection_id: {detection_id}')if not rtsp_url:print('rtsp_get_frame: 缺少RTSP URL')return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400print('获取rtsp_manager的rtsp帧')try:print('rtsp_get_frame: 获取RTSP帧1')frame, detections = rtsp_manager.get_frame(rtsp_url)save_frame = frame.copy()print(f'rtsp_get_frame: 获取到的帧={frame is not None}')print(f'rtsp_get_frame: 获取到的检测结果数量={len(detections)}')if save_frame is not None:print('rtsp_get_frame: 获取帧成功')cv2.imwrite('static/02_test.jpg', save_frame)print(f'调试: 帧已保存到 02_test.jpg')with open('static/02_test.jpg', 'rb') as f:print('本地文件头:', f.read(4).hex(' '))else: print('rtsp_get_frame: 获取到的帧为空') print(f'rtsp_get_frame: 获取到的帧={frame is not None}')if frame is not None: print('rtsp_get_frame: 更新数据库记录')# 更新数据库记录(如果有detection_id)if detection_id:detection = DetectionResult.query.get(detection_id)if detection:detection.detections = json.dumps(detections)detection.confidence = max([d['confidence'] for d in detections]) if detections else 0db.session.commit()print('rtsp_get_frame: 更新数据库记录成功')# 将帧转换为JPEG 转换为JPEG二进制ret, jpeg = cv2.imencode('.jpg', frame)if not ret:print('rtsp_get_frame: 无法编码帧 500')return jsonify({'success': False, 'message': '无法编码帧'}), 500print("编码完成")print(f'JPEG二进制数据长度: {len(jpeg.tobytes())} 字节')print(f'检测结果: {detections}')# 比较编码前后数据print('编码后数据头:', jpeg.tobytes()[:4].hex(' '))# 返回响应response = make_response(jpeg.tobytes())response.headers['Content-Type'] = 'image/jpeg'response.headers['Content-Length'] = str(len(jpeg.tobytes()))response.headers['X-Detections'] = json.dumps(detections)response.headers['Access-Control-Expose-Headers'] = 'X-Detections'return responseelse:print('rtsp_get_frame: 无法获取RTSP帧 500')return jsonify({'success': False, 'message': '无法获取RTSP帧'}), 500except Exception as e:print('rtsp_get_frame: 获取RTSP帧失败 500')return jsonify({'success': False, 'message': f'获取RTSP帧失败: {str(e)}'}), 500@app.route('/api/rtsp_stop', methods=['POST'])
def rtsp_stop():print('rtsp_stop')data = request.get_json()rtsp_url = data.get('rtsp_url')print(rtsp_url)detection_id = data.get('user_id', 1)print(detection_id)if not rtsp_url:return jsonify({'success': False, 'message': '缺少RTSP URL'}), 400try:print('rtsp_stop')success = rtsp_manager.stop_stream(rtsp_url)print('rtsp_manager.stop_stream')if success:print('rtsp_stop success')# 更新数据库记录if detection_id:detection = DetectionResult.query.get(detection_id)if detection:detection.result_file = 'RTSP流已停止'db.session.commit()return jsonify({'success': True, 'message': 'RTSP流已停止'})else:return jsonify({'success': False, 'message': 'RTSP流未运行'}), 404except Exception as e:return jsonify({'success': False, 'message': f'停止RTSP流失败: {str(e)}'}), 500
功能基本实现
如有侵权,或需要完整代码,请及时联系博主。