当前位置: 首页 > news >正文

web端rtmp推拉流测试、抽帧识别计数,一键式生成巡检报告

        本文旨在实现无人机城市交通智慧巡检中的一个模块——无人机视频实时推拉流以及识别流并在前端展示,同时,统计目标数量以及违停数量,生成结果评估,一并发送到前端展示。对于本文任何技术上的空缺,可在博主主页前面博客寻找,有任何问题欢迎私信或评论区讨论!!!

目录

涉及技术栈

基本效果

存在的问题,亟需解决

代码及粗略解释

资源


涉及技术栈:

Django5+vue3+websocket+SRS+FFmpeg+RTMP+YOLOv8+AI模型+异步+多进程+flv.js+node.js

基本效果:

web端推拉流测试、抽帧识别计数,一键式生成巡检报告

项目结构(Django):
├── DJI_yolo
│   ├── __init__.py
│   ├── __pycache__
│   ├── asgi.py
│   ├── settings.py
│   ├── templates
│   ├── urls.py
│   └── wsgi.py
├── app01
│   ├── __init__.py
│   ├── __pycache__
│   ├── admin.py
│   ├── apps.py
│   ├── car_best.pt
│   ├── consumers
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   ├── detection_consumer.py
│   │   └── report_consumer.py
│   ├── migrations
│   │   ├── __init__.py
│   │   └── __pycache__
│   ├── models.py
│   ├── pictures
│   ├── routings.py
│   ├── services
│   │   ├── __init__.py
│   │   ├── __pycache__
│   │   └── detection_engine.py
│   ├── stream_status.py
│   ├── tests.py
│   ├── utils.py
│   └── views.py
├── manage.py
├── media
│   ├── 2025-06-02
│   │   ├── 20时14分54秒-20时17分03秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频
│   │   ├── 20时26分11秒-20时29分00秒
│   │   │   ├── 关键帧图片
│   │   │   ├── 原视频
│   │   │   └── 识别视频

前端代码因保密不方便展示,可自行根据后端端口续写,后端Django完整代码在文章末尾,请查收!!!

存在的问题,亟需解决:

1、后端逐帧识别并返回前端,前端显示卡顿,考虑跳帧识别,一秒识别一帧,剩余原始帧直接返回。

2、曾尝试过使用 yolo 中的 model 类中的 track 方法进行跨帧追踪,为每帧每个目标进行编号(id),提取帧中目标特征(速度、目标面积、唯位移等等),进行特征化工程,从而判断相邻两识别帧中的某两目标是否为同一目标。由于添加追踪算法,单帧识别时间过长,单帧识别时间大于拉流帧传输时间,导致进程堵塞,程序崩溃。后续采取措施改进。

3、模型并未做违停检测,考虑重新训练模型,添加违停训练集,重新训练,顺便搭配 yolo12 环境,用 yolo12 进行训练。

4、Django 后端接受到流,并不能直接开始识别,而是先用四五秒时间加载模型,需要优化。

5、执行任务、完成任务到数据存储、前端显示巡检报告,延迟较高,需要优化。

6、为后端添加车辆运动状态算法,若连续识别的两个帧同一目标静止,并且识别为违停,则判断为违停;若非静止但识别违停,不做处理。相对静止判断?无人机在动,无法建立参考系。

7、进行车牌识别。

代码及粗略解释:

detection_consumer.py:

此处代码使用 FFmpeg 拉取 SRS 上的流,并使用 detection_engine 中的类进行帧识别和计数,然后判断计数,调用讯飞星火 API 生成 AI 答复,获取关键帧,将所有信息封装成一个字典,并传给前端,以用于违停报告生成。同时,将实时识别帧通过 websocket 的消费者传给前端显示。

import asyncio
import base64
import os
import subprocess
import time
import traceback
from datetime import datetime
from pathlib import Path
from .report_consumer import ReportConsumer
import numpy as np
import cv2
from channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
from app01.services.detection_engine import DetectionEngine, logger
from concurrent.futures import ThreadPoolExecutor
from app01.utils import rename_time_folder  # 导入重命名函数
from app01.stream_status import stream_status, status_lock  # 从新模块导入
import requestsRTMP_URL = "rtmp://127.0.0.1:1935/live/stream"
channel_layer = get_channel_layer()class DetectionStreamConsumer(AsyncWebsocketConsumer):async def connect(self):await self.channel_layer.group_add("detection_group", self.channel_name)await self.accept()print("WebSocket 连接建立成功")async def disconnect(self, code):await self.channel_layer.group_discard("detection_group", self.channel_name)print("WebSocket 连接关闭")async def start_detection(self):print("开始检测帧流...")await self.monitor_stream_status()async def monitor_stream_status(self):while True:if stream_status.is_streaming:logger.info("检测到推流,开始处理...")await self.start_detection_stream()else:logger.info("当前无推流,等待中...")await asyncio.sleep(1)async def start_detection_stream(self, all_dict=None):ffmpeg_pull_command = ["ffmpeg","-c:v", "h264","-i", RTMP_URL,"-f", "rawvideo","-pix_fmt", "bgr24","-s", "640x360","-r", "5","-vcodec", "rawvideo","-an","-flags", "+low_delay","-tune", "zerolatency","-"]try:process = subprocess.Popen(ffmpeg_pull_command,stdout=subprocess.PIPE,stderr=subprocess.DEVNULL,bufsize=10 * 8)except Exception as e:logger.error(f"FFmpeg 子进程启动失败: {e}")returnif process.stdout is None:logger.error("FFmpeg stdout 为空")returnframe_width, frame_height = 640, 360frame_size = frame_width * frame_height * 3executor = ThreadPoolExecutor(max_workers=4)  # 最多可以同时运行 4 个线程engine = DetectionEngine()frame_count = 0skip_interval = 1try:while stream_status.is_streaming:try:loop = asyncio.get_event_loop()raw_frame = await asyncio.wait_for(loop.run_in_executor(executor, process.stdout.read, frame_size),timeout=8  # 超时检测断流)except asyncio.TimeoutError:logger.warning("读取帧超时,触发流结束")with status_lock:stream_status.is_streaming = Falsebreakif len(raw_frame) != frame_size:continuetry:frame = np.frombuffer(raw_frame, dtype=np.uint8).reshape((frame_height, frame_width, 3))except Exception as e:logger.warning(f"帧解析失败: {e}")continueframe_count += 1if frame_count % skip_interval != 0:continueresult = await loop.run_in_executor(executor, engine.process_frame, frame)# 增加空值检查if result is None:logger.warning("检测结果为空,跳过处理")continueprocessed_frame, detected_classes = result# 更新车辆统计(仅按类别)with DetectionEngine.counter_lock:for class_id in detected_classes:if class_id == 0:DetectionEngine.total_count['car'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 1:DetectionEngine.total_count['bus'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 2:DetectionEngine.total_count['truck'] += 1DetectionEngine.total_count['total'] += 1elif class_id == 3:DetectionEngine.total_count['van'] += 1DetectionEngine.total_count['total'] += 1_, jpeg = cv2.imencode('.jpg', processed_frame, [int(cv2.IMWRITE_JPEG_QUALITY), 50])if channel_layer:await channel_layer.group_send("detection_group",{"type": "send_frame", "frame": jpeg.tobytes()})except Exception as e:logger.error(f"检测处理错误: {e}")logger.error(traceback.format_exc())finally:logger.warning("流结束,进入处理...")if process.returncode is None:process.kill()logger.warning("FFmpeg 进程已终止")# 延迟一小段时间,确保文件操作完全释放time.sleep(1)stream_status.is_streaming = Falsestream_status.end_datetime = datetime.now()  # 记录结束时间logger.warning("正在获取三个关键帧...")# 找3个关键帧,必须在重命名前执行folder = Path(f"{stream_status.image_path}")files = [f for f in folder.iterdir() if f.is_file()]if not files:return None, None, None# 按修改时间排序files.sort(key=lambda x: x.stat().st_mtime)first = files[0]last = files[-1]middle = files[len(files) // 2]# 转换为Base64stream_status.image_li = [self.image_to_base64(str(f))  # 传入字符串路径,避免Path对象序列化问题for f in [first, middle, last]]logger.warning("正在重命名文件夹...")# 先复制 time_path,因为后续可能被修改time_path = stream_status.time_pathstart_datetime = stream_status.start_datetimeend_datetime = stream_status.end_datetimesave_path = ""# 调用重命名函数(使用完整 datetime)if time_path and start_datetime:path = rename_time_folder(time_path,start_datetime,  # 传入开始时间end_datetime  # 传入结束时间)save_path = pathlogger.warning(f"文件夹已重命名为 {start_datetime.strftime('%H时%M分%S秒')}-{end_datetime.strftime('%H时%M分%S秒')}")try:logger.warning("正在更新数据...")# 更新识别结果stats = self.correct_overcounted_stats(DetectionEngine.total_count)print("stats:", stats)stream_status.total_stats = statsprint("stream_status.total_stats:", stream_status.total_stats)self.save_vehicle_stats(stats, save_path)except Exception as e:logger.error(f"保存统计信息失败: {e}")print("正在获取AI答复...")self.AI_response()# 定义总数据,并序列化all_dict = {'datetime': stream_status.start_time.strftime("%Y-%m-%d %H:%M:%S.%f"),  # 序列化,转为字符串'image_li': [bs for bs in stream_status.image_li],  # 关键帧转二进制'detect_car': [_ for _ in stream_status.total_stats.values()],'Al_response': stream_status.AI_talk,}print("all_dict:", all_dict)print("正在发送信息...")await ReportConsumer.send_report_data(all_dict)executor.shutdown(wait=False)DetectionEngine.reset_stats()stream_status.reset_all_dict()def save_vehicle_stats(self, stats, save_path=None):# 如果未提供路径,使用默认的 time_pathif not save_path:save_path = stream_status.time_pathif not save_path:logger.warning("无法保存统计信息:路径为空")returnstats_path = os.path.join(save_path, 'vehicle_stats.txt')try:with open(stats_path, 'w', encoding='utf-8-sig') as f:f.write("车辆统计结果\n")f.write("-------------------------\n")for k, v in stats.items():f.write(f"{k}: {v}\n")f.write("-------------------------\n")logger.info(f"统计信息已保存至 {stats_path}")except Exception as e:logger.error(f"写入统计信息失败: {e}")def AI_response(self):"""获取AI回复"""url = "https://spark-api-open.xf-yun.com/v1/chat/completions"headers = {"Authorization": "Bearer pDLDvpUbFDTEQZACQSjD:CqZewebAdgpuvxrVbbAv","Content-Type": "application/json",}content = (f'我现在要做一个无人机城市交通智慧巡检报告,现在巡检结果是:巡检时间:{stream_status.start_time}'f'小汽车有{stream_status.total_stats["car"]}辆,面包车有{stream_status.total_stats["van"]}辆,'f'公交车有{stream_status.total_stats["bus"]}辆,卡车有{stream_status.total_stats["truck"]}辆,'f'识别到的违停车辆共有{stream_status.total_stats["illegally_car"]}辆。'f'帮我生成一段结论,要求不要有废话,也不要写具体那四个类别识别到的车辆数,字数200字,语言精炼严谨')question = {"role": "user","content": content}data = {"max_tokens": 4096,"top_k": 4,"messages": [question],"temperature": 0.5,"model": "4.0Ultra","stream": False,}response = requests.post(url, headers=headers, json=data)response.raise_for_status()result = response.json()# 提取模型输出的内容reply = result.get('choices', [{}])[0].get('message', {}).get('content', '').strip()print(reply)stream_status.AI_talk = reply# 读取图片文件并转换为 Base64@staticmethod  # 静态方法不应接收self参数def image_to_base64(image_path):with open(image_path, "rb") as img_file:base64_str = base64.b64encode(img_file.read()).decode("utf-8")return f"data:image/jpeg;base64,{base64_str}"@staticmethod  # 去掉self第一个参数def correct_overcounted_stats(stats, avg_appearance=6):"""简单修正严重重复统计的结果"""corrected = {}for cls in ['car', 'van', 'bus', 'truck']:if cls in stats:corrected[cls] = max(0, int(round(stats[cls] / avg_appearance)))# 可选:重新计算 totalcorrected['total'] = sum(corrected.values())corrected['illegally_car'] = int(corrected['total'] / 100)print(corrected)return correctedasync def send_frame(self, event):frame_bytes = event["frame"]await self.send(bytes_data=frame_bytes)

stream_status.py:

本文件用于 Django 项目全局状态管理,相当于 vue3 中的 pinia , is_streaming  是前端推本地视频、后端上传 SRS 后,变为 True ,后端因为 is_streaming  = True 而开始拉流,当超过 6 秒未拉到流,判断流结束,is_streaming  = False。还定义了全局状态锁、状态重置方法、时间获取方法、文件夹名获取方法、状态实例。全局状态锁是为了防止多线程修改同一个共享变量,只有with status_lock:with 块内,只有当前线程持有锁时才能运行代码,其他线程必须等待,相当于在此处把锁”锁“起来了。

import threading# 全局状态锁和状态对象
status_lock = threading.Lock()
class StreamStatus:def __init__(self):self.is_streaming = False  # 是否正在推流/检测self.start_time = None      # 推流开始时间(datetime对象)self.end_time = None        # 推流结束时间(datetime对象)self.time_path = None       # 当前时间文件夹路径(如 Media/2025-06-01/12点)self.total_stats = {        # 车辆统计"car": 0,"van": 0,"bus": 0,"truck": 0,"total": 0,"illegally_car": 0,}self.last_frame_time = None # 最后收到有效帧的时间(用于断流检测)self.image_path = Noneself.image_li = []self.AI_talk = Noneself.all_dict = {}@propertydef date_folder(self):"""获取日期文件夹名称(如 2025-06-01)"""if self.start_time:return self.start_time.strftime("%Y-%m-%d")return None@propertydef start_hour(self):"""获取开始小时(如 12)"""if self.start_time:return self.start_time.hourreturn Nonedef reset(self):"""重置状态(用于新任务开始)"""self.is_streaming = Falseself.start_time = Noneself.end_time = Noneself.time_path = Noneself.total_stats = {k: 0 for k in self.total_stats}self.last_frame_time = Nonedef reset_all_dict(self):self.last_frame_time = None  # 最后收到有效帧的时间(用于断流检测)self.image_path = Noneself.image_li = []self.AI_talk = Noneself.all_dict = {}self.all_dict = {}
# 全局唯一的状态实例
stream_status = StreamStatus()

detection_engine.py:

此处是 detection_consumer.py 使用的服务代码块,用于为其提供帧识别服务,consumer 拉到的每一个流都会调用该类中的 process_frame 类方法,并同时进行计数和关键帧保存。

import threading
import time
import cv2
from ultralytics import YOLO
import logging
import os
from app01.stream_status import stream_status, status_lock  # 从新模块导入logger = logging.getLogger(__name__)MODEL_PATH = "F:\\FullStack\\Django\\DJI_yolo\\app01\\car_best.pt"
model = YOLO(MODEL_PATH)# 关键帧存储位置
pictures_dir = "pictures"
if not os.path.exists(pictures_dir):os.makedirs(pictures_dir)class DetectionEngine:frame_counter = 1counter_lock = threading.Lock()  # 线程锁,保证计数器安全# 新增:全局计数器和已统计 IDtotal_count = {'car': 0,'van': 0,'bus': 0,'truck': 0,"total": 0,"illegally_car": 0}# 下次任务重置@classmethoddef reset_stats(cls):with cls.counter_lock:for k in cls.total_count:cls.total_count[k] = 0# 下次任务重置@classmethoddef update_stats(cls, vehicle_type):with cls.counter_lock:if vehicle_type in cls.total_count:cls.total_count[vehicle_type] += 1cls.total_count["total"] += 1def __init__(self):self.model = model# 调整检测参数,减少计算量self.detect_params = {'conf': 0.3,       # 提高置信度阈值,减少无效检测'iou': 0.5,        # 调整 NMS 阈值'imgsz': [384, 640],  # 输入尺寸,降低分辨率'classes': [0, 1, 2, 3],  # 仅检测车辆类别(2: car, 5: bus, 7: truck)'device': 0,  # 使用 GPU'half': True,  # 使用 FP16 精度'show': False  # 关闭实时显示,减少开销}def process_frame(self, frame):"""处理单帧图像:YOLO推理 + 保存结果到关键帧图片文件夹"""results = model(frame)detected_objects = []annotated_frame = frame.copy()  # 复制原始帧以防止修改原图# 防御性检查:确保 results 非空且包含有效数据if not results or len(results) == 0:logger.warning("未获取到检测结果,跳过处理")return frame, []boxes = results[0].boxesif boxes is None or len(boxes) == 0:logger.warning("检测结果为空,跳过处理")return frame, []try:# 假设类别信息存储在 cls 属性中classes = boxes.cls.int().cpu().tolist() if hasattr(boxes, 'cls') and boxes.cls is not None else []for class_id in classes:detected_objects.append(class_id)annotated_frame = results[0].plot()  # 绘制检测框except Exception as e:logger.error(f"解析检测数据失败: {e}")return annotated_frame, []with status_lock:car_count = sum(1 for cls in detected_objects if cls == 0)if car_count > 18:try:timestamp = int(time.time())save_path = os.path.join(stream_status.image_path, f"keyframe_{timestamp}.jpg")cv2.imwrite(save_path, annotated_frame)logger.info(f"关键帧保存成功: {save_path}")except Exception as e:logger.error(f"保存关键帧失败: {e}")return annotated_frame, detected_objects

repoet_consumer.py:

用于将前面 detection_consumer.py 中的字典数据发送给前端,首先会将数据缓存到后端,等到前端消费者建立成功,立马发送出去。

import asynciofrom channels.generic.websocket import AsyncWebsocketConsumer
from channels.layers import get_channel_layer
import json# 缓存队列:用于保存尚未发送的数据
queue = []
# 记录是否有消费者连接
has_consumer = Falseclass ReportConsumer(AsyncWebsocketConsumer):async def connect(self):global has_consumer# 加入指定组await self.channel_layer.group_add("report_group", self.channel_name)await self.accept()print("巡检报告WebSocket 已连接")# 设置有消费者连接的标志has_consumer = True# 尝试发送缓存数据await flush_queue(self.channel_layer)async def disconnect(self, close_code):global has_consumer# 移除组成员await self.channel_layer.group_discard("report_group", self.channel_name)print("巡检报告WebSocket 断开连接")# 重置消费者连接标志has_consumer = Falseasync def receive(self, text_data=None, bytes_data=None):passasync def send_report(self, event):"""处理 send_report 类型的消息并发送给客户端"""data = event['data']print("【发送到客户端】", data)# 转换为JSON字符串发送try:# 处理可能包含的非JSON可序列化对象# 这里假设 image_li 包含 Path 对象,需要转换为字符串if 'image_li' in data:data['image_li'] = [str(path) for path in data['image_li']]await self.send(text_data=json.dumps(data))except Exception as e:print(f"【发送到客户端失败】{e}")@classmethodasync def send_report_data(cls, data):"""供其他模块调用的方法"""global queue, has_consumer# 不再检查是否有人连接,直接发送或缓存,生产模式用Redischannel_layer = get_channel_layer()# 如果没有消费者连接,将数据加入队列if not has_consumer:queue.append(data)print(f"【数据已缓存】等待消费者连接: {data}")else:# 有消费者连接,直接发送await send_report(channel_layer, data)# ========== 全局函数 ==========
# 定期检查并发送缓存数据的任务
async def queue_check_task():channel_layer = get_channel_layer()while True:await asyncio.sleep(1)  # 每秒检查一次if queue and has_consumer:await flush_queue(channel_layer)
# 在应用启动时启动队列检查任务
async def start_queue_check():asyncio.create_task(queue_check_task())
async def send_report(channel_layer, data):try:await channel_layer.group_send("report_group",{"type": "send_report","data": data,},)print("【发送成功】", data)except Exception as e:print(f"【发送失败】{e}")async def flush_queue(channel_layer):global queueif not queue:returnitems = list(queue)queue.clear()for data in items:await send_report(channel_layer, data)

routings.py:

用于定义 websocket 的后端接收路径,并连接消费者

from django.urls import re_path
from app01.consumers import detection_consumer, report_consumerwebsocket_urlpatterns = [re_path(r'^ws/detection/$', detection_consumer.DetectionStreamConsumer.as_asgi()),re_path(r'^ws/report/$', report_consumer.ReportConsumer.as_asgi()),
]

urls.py:

用于接收前端发送的 http 请求,并与 view.py 中的视图函数建立联系

"""
URL configuration for DJI_yolo project.The `urlpatterns` list routes URLs to views. For more information please see:https://docs.djangoproject.com/en/5.1/topics/http/urls/
Examples:
Function views1. Add an import:  from my_app import views2. Add a URL to urlpatterns:  path('', views.home, name='home')
Class-based views1. Add an import:  from other_app.views import Home2. Add a URL to urlpatterns:  path('', Home.as_view(), name='home')
Including another URLconf1. Import the include() function: from django.urls import include, path2. Add a URL to urlpatqterns:  path('blog/', include('blog.urls'))
"""
from django.conf.urls.static import static
from django.contrib import admin
from django.urls import pathfrom DJI_yolo import settings
from app01 import views
urlpatterns = [path("admin/", admin.site.urls),path('csrf/', views.get_csrf_token, name='get_csrf_token'),# 推流测试path('push_stream/', views.push_stream, name='push_stream'),# 媒体库管理path('api4/date-folders/', views.get_date_folders, name='date-folders'),path('api4/time-folders/<str:date>/', views.get_time_folders, name='time-folders'),path('api4/files/<str:date>/<str:time_range>/<str:category>/count/', views.get_file_count, name='file-count'),path('api4/files/<str:date>/<str:time_range>/<str:category>/', views.get_files, name='files'),path('api4/delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>/', views.delete_file,name='delete-file'),path('delete-file/<str:date>/<str:time_range>/<str:category>/<str:filename>', views.delete_file,name='delete-file'),  # 添加一个用于删除文件的 URL 路径] + static(settings.MEDIA_URL, document_root=settings.MEDIA_ROOT)

views.py:

用于处理前端请求,这里报告推流、媒体管理系统、原始视频存储逻辑等等。

# views.py
from datetime import datetime
from pathlib import Path
import urllib.parse
from django.middleware.csrf import get_token
import os
import subprocess
import threading
from django.http import JsonResponse
from django.conf import settings
import asyncio
from .consumers.detection_consumer import DetectionStreamConsumer
from .utils import create_date_folder, create_time_folder
from .stream_status import stream_statusMEDIA_ROOT = Path(settings.MEDIA_ROOT)def get_csrf_token(request):token = get_token(request)print(token)return JsonResponse({'csrfToken': token})def push_stream(request):print("前端返回本地视频,准备推流")video_file = request.FILES.get('video')if not video_file:return JsonResponse({'error': '未上传视频文件'}, status=400)# ------------------- 创建精确时间文件夹 -------------------start_datetime = datetime.now()  # 记录精确到秒的开始时间date_str = start_datetime.strftime('%Y-%m-%d')date_path = create_date_folder(date_str)stream_status.start_time = datetime.now()# 创建带时分秒的文件夹(如 15:30:45)time_path, time_folder = create_time_folder(date_path, start_datetime)# 创建三类子文件夹original_path = os.path.join(time_path, '原视频')recognized_path = os.path.join(time_path, '识别视频')image_path = os.path.join(time_path, '关键帧图片')stream_status.image_path = image_path  # 保存到全局状态for folder in [original_path, recognized_path, image_path]:os.makedirs(folder, exist_ok=True)# ------------------- 保存上传视频 -------------------original_video_name = f"original_{start_datetime.strftime('%H%M%S')}.mp4"original_video_path = os.path.join(original_path, original_video_name)with open(original_video_path, 'wb') as f:for chunk in video_file.chunks():f.write(chunk)# ------------------- 推流配置 -------------------push_url = "rtmp://127.0.0.1:1935/live/stream"recognized_video_name = f"recognized_{start_datetime.strftime('%H%M%S')}.mp4"recognized_video_path = os.path.join(recognized_path, recognized_video_name)command = ["ffmpeg","-re","-i", original_video_path,"-c:v", "libx264","-preset", "ultrafast","-tune", "zerolatency","-g", "50","-r", "30","-an","-f", "flv",push_url,"-f", "mp4",recognized_video_path]try:process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)# 保存完整时间状态stream_status.is_streaming = Truestream_status.start_datetime = start_datetime  # 保存完整 datetimestream_status.time_path = time_pathstream_status.recognized_path = recognized_pathstream_status.image_path = image_pathdetection_thread = threading.Thread(target=run_detection_in_background,daemon=True)detection_thread.start()return JsonResponse({'status': 'success','date_folder': date_str,'time_folder': time_folder  # 格式如 15:30:45})except Exception as e:return JsonResponse({'error': str(e)}, status=500)def run_detection_in_background():try:loop = asyncio.new_event_loop()asyncio.set_event_loop(loop)loop.run_until_complete(DetectionStreamConsumer().start_detection())except Exception as e:print(f"后台检测线程异常: {e}")def get_date_folders(request):page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))MEDIA_ROOT_PATH = Path(r'F:\FullStack\Django\DJI_yolo\media')date_folders = []for name in os.listdir(MEDIA_ROOT_PATH):folder_path = MEDIA_ROOT_PATH / nameif folder_path.is_dir() and len(name.split('-')) == 3:date_folders.append({'date': name})date_folders.sort(key=lambda x: x['date'], reverse=True)start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': date_folders[start:end],'total': len(date_folders)})def get_time_folders(request, date):date_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / dateif not date_path.is_dir():return JsonResponse({'error': '日期文件夹不存在'}, status=404)page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))time_folders = []for name in os.listdir(date_path):time_path = date_path / nameif time_path.is_dir():original_count = len(os.listdir(time_path / '原视频')) if (time_path / '原视频').is_dir() else 0recognized_count = len(os.listdir(time_path / '识别视频')) if (time_path / '识别视频').is_dir() else 0image_count = len(os.listdir(time_path / '关键帧图片')) if (time_path / '关键帧图片').is_dir() else 0time_folders.append({'time_range': name,  # 直接使用时间区间名称'original_count': original_count,'recognized_count': recognized_count,'image_count': image_count})time_folders.sort(key=lambda x: x['time_range'], reverse=True)start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': time_folders[start:end],'total': len(time_folders)})def get_file_count(request, date, time_range, category):category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / time_range / categoryif not category_path.is_dir():return JsonResponse({'count': 0})count = len(os.listdir(category_path))return JsonResponse({'count': count})def get_files(request, date, time_range, category):try:decoded_time_range = urllib.parse.unquote(time_range)category_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / categoryif not category_path.is_dir():return JsonResponse({'error': '文件类别不存在'}, status=404)page = int(request.GET.get('page', 1))page_size = int(request.GET.get('page_size', 10))files = []for filename in os.listdir(category_path):file_path = category_path / filenameif file_path.is_file():encoded_time_range = urllib.parse.quote(decoded_time_range)files.append({'name': filename,'url': f'http://{request.get_host()}/media/{date}/{encoded_time_range}/{category}/{filename}'})start = (page - 1) * page_sizeend = start + page_sizereturn JsonResponse({'data': files[start:end],'total': len(files),'status': 'success'})except Exception as e:print(f"文件列表错误: {str(e)}")return JsonResponse({'error': '服务器内部错误','detail': str(e)}, status=500)def delete_file(request, date, time_range, category, filename):try:decoded_time_range = urllib.parse.unquote(time_range)file_path = Path(r'F:\FullStack\Django\DJI_yolo\media') / date / decoded_time_range / category / filenameif not file_path.exists():return JsonResponse({'error': '文件不存在'}, status=404)os.remove(file_path)return JsonResponse({'status': 'success'})except Exception as e:return JsonResponse({'error': str(e)}, status=500)

感谢您的观看!

资源如下:

https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501https://download.csdn.net/download/2403_83182682/90961392?spm=1001.2014.3001.5501

相关文章:

  • 曲面的存在性定理
  • ServerTrust 并非唯一
  • Spring AI中使用ChatMemory实现会话记忆功能
  • Java【基础篇0】
  • 【时序预测】-Transformer系列
  • 【差分】详解二维前缀和和差分问题
  • F(x, y, z) = 0 隐函数微分 确定自变量
  • 【异常】极端事件的概率衰减方式(指数幂律衰减)
  • 【CUDA 】第5章 共享内存和常量内存——5.3减少全局内存访问(2)展开+动态共享内存
  • AI智能体|扣子(Coze)搭建【公众号对标文章采集拆解】工作流
  • 【量化】策略交易类型
  • 互联网协议IPv6
  • 解决Vscode JDK插件源码缺失问题
  • Opnelayers:封装Popup
  • HNSW - 分层可导航小世界
  • 使用idea开发工具创建javaweb项目工程
  • 《最短路(Bellman-ford)》题集
  • 振动力学:无阻尼多自由度系统(受迫振动)
  • agent基础概念
  • 在数字工厂实施过程中,如何学会通过梳理流程的思想来分析解决问题
  • ciid室内设计协会/windows7优化大师官方下载
  • 西安微商城网站建设/优化王
  • 盐湖网站制作/高端网站优化公司
  • 湖北潜江疫情最新消息/处理器优化软件
  • 做网站 备案/竞价排名点击器
  • 郑州优之客网站建设/企业网站的推广形式有