
Pulling and pushing RTMP video streams in Python with ffmpeg

Overall implementation

  • Pull frames from rtmp_url (the source stream address), run detection on them, and push each frame to rtmp_url_label, producing a new stream with the detection boxes drawn in (a minimal pull/push sketch follows this list)
  • Starting from the first frame in which a target is detected, record a 15-second annotated video clip together with 3 snapshot images of target frames
  • Upload the generated video, images, and JSON file to MinIO, and publish the detection result set to MQ
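Stripped of the detection logic, the pull/push loop itself is small. Below is a minimal sketch of the idea, with placeholder stream URLs and the annotation step omitted: OpenCV pulls frames from the source RTMP address, and an ffmpeg subprocess fed raw BGR frames over stdin encodes them with libx264 and pushes the result to the target RTMP address.

import subprocess
import cv2

# Hypothetical placeholder URLs; the real ones come from the task request.
rtmp_url = "rtmp://example.com/live/source"
rtmp_url_label = "rtmp://example.com/live/labeled"

cap = cv2.VideoCapture(rtmp_url, cv2.CAP_FFMPEG)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
fps = int(cap.get(cv2.CAP_PROP_FPS)) or 25  # fall back if the stream reports 0

# ffmpeg reads raw BGR frames from stdin and pushes them to the target RTMP address.
push = subprocess.Popen([
    "ffmpeg", "-y", "-f", "rawvideo", "-pix_fmt", "bgr24",
    "-s", f"{width}x{height}", "-r", str(fps), "-i", "-",
    "-c:v", "libx264", "-pix_fmt", "yuv420p", "-preset", "fast",
    "-f", "flv", rtmp_url_label,
], stdin=subprocess.PIPE)

while True:
    ok, frame = cap.read()
    if not ok:
        break
    # drawing of detection boxes would happen here; omitted in this sketch
    push.stdin.write(frame.tobytes())

cap.release()
push.stdin.close()
push.wait()

The full implementation below follows the same pattern, except that the annotated frame is written to a temporary JPEG inside a worker thread and read back before being piped to ffmpeg.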

Bugs and minor issues solved

  • Initialize the channel that MQ needs up front, to avoid Stream connection lost: IndexError('pop from an empty deque')
  • Iterate over a codec_list to obtain a working video_writer, to avoid [ERROR:0@70.755] global cap_ffmpeg_impl.hpp:3207 open Could not find encoder for codec_id=27, error: Encoder not found (see the sketch after this list)
  • Release the video_writer before deleting the local temporary file, to avoid PermissionError: [WinError 32] The process cannot access the file because it is being used by another process
  • Transcode the recorded clip to H.264 with ffmpeg, to avoid the case where a desktop video player can play the file but the front end cannot (see the sketch after this list)
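Two of these fixes are easy to show in isolation. A minimal sketch follows (the helper names are mine, not the project's): try several FOURCC codecs until cv2.VideoWriter actually opens, and re-encode the finished clip to H.264 with ffmpeg so the browser player can decode it.

import subprocess
import cv2

def open_writer(path, fps, size):
    # Try codecs in order until one yields an opened VideoWriter
    # (works around "Could not find encoder for codec_id=27").
    for codec in ['mp4v', 'avc1', 'X264', 'XVID', 'MJPG']:
        fourcc = cv2.VideoWriter.fourcc(*codec)
        vw = cv2.VideoWriter(path, fourcc, fps, size)
        if vw.isOpened():
            return vw
    return None

def convert_to_h264(src, dst):
    # Re-encode with libx264 so the front-end video player can play the clip.
    subprocess.run(['ffmpeg', '-y', '-i', src, '-vcodec', 'libx264',
                    '-preset', 'fast', '-crf', '23', '-acodec', 'aac', dst])

The full implementation follows.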
import json
import logging
import os
import subprocess
import time
import traceback
import uuid
from threading import Thread

import cv2
import numpy as np
import pika
from PIL import Image, ImageDraw, ImageFont

from detect.entity.ResponseResult import ResponseResult
from detect.entity.vo.analysis_result import AnalysisResult
from detect.mq import mq_connect
from detect.utils.file_utils import load_config
from detect.utils.minio_utils import upload_file

config = load_config()
yolo_conf = config['yolo']
CONF_THRESHOLD = yolo_conf['conf_threshold']
MINIO_CONFIG = config['minio_config']
FONT_PATH = config['font_path']
ffmpeg = config['ffmpeg']
JSON_RESULT_BASE = config['json_result_base']
animals_queues = config['mq_config']['animals_queues']
cp_queues = config['mq_config']['cp_queues']
excavator_queues = config['mq_config']['excavator_queues']
fire_queues = config['mq_config']['fire_queues']
pine_queues = config['mq_config']['pine_queues']

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)


class VideoStreamManager:
    def __init__(self):
        self.url_set = set()
        self.tasks = {}

    # Create a processing task
    def create_video_stream_task(self, rtmp_url):
        if rtmp_url in self.url_set:
            return False
        self.url_set.add(rtmp_url)
        cap = cv2.VideoCapture(rtmp_url, cv2.CAP_FFMPEG)
        self.tasks[rtmp_url] = {
            'cap': cap,
            'thread': None,  # guards against creating duplicate threads
        }
        return True

    def draw_image_label(self, source, dest, boxes, confidences, chinese_names):
        img = source  # source frame
        image_pil = Image.fromarray(cv2.cvtColor(img, cv2.COLOR_BGR2RGB))  # convert to PIL
        draw = ImageDraw.Draw(image_pil)
        font = ImageFont.truetype(FONT_PATH, 25)
        for i in range(len(boxes)):
            box = boxes[i]
            confidence = confidences[i]
            chinese_name = chinese_names[i]
            left_top = (box[0], box[1])
            right_bottom = (box[2], box[3])
            draw.rectangle([left_top, right_bottom], fill=None, outline=(0, 255, 0), width=2)  # bounding box
            text = '{} {:.2f}'.format(chinese_name, confidence)
            text_bbox = draw.textbbox((0, 0), text, font=font)
            text_width = text_bbox[2] - text_bbox[0]  # text width
            rect_width = right_bottom[0] - left_top[0]  # box width
            text_x = left_top[0] + (max(rect_width - text_width, 0)) // 2  # text position
            draw.text((text_x, max(box[1] - font.size, 0)), text, font=font, fill='red')  # class name + confidence
        image_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
        cv2.imwrite(dest, image_cv)  # write the annotated image

    # Run model inference for one frame (executed in a separate thread)
    def detect_frame(self, rtmp_url, frame, detector, conf_threshold, frame_count, frame_interval, detect_target_dict,
                     video_writer, video_stream_img_path):
        # Frame skipping: no inference, pass the original frame through
        if frame_count % frame_interval != 0:
            cv2.imwrite(video_stream_img_path, frame)
            if detect_target_dict['detected'] and detect_target_dict['frame_count'] < detect_target_dict['frame_max']:
                detect_target_dict['frame_count'] += 1
                video_writer.write(frame)
            return
        boxes, confidences, class_ids, class_names = detector.predict(frame, conf_threshold)
        try:
            chinese_names = []
            for name in class_names:
                chinese_name = detector.class_name_mapping.get(name, name)
                chinese_names.append(chinese_name)
            # Target detected
            if boxes:
                # Draw boxes on the frame pushed to the output stream
                self.draw_image_label(frame, video_stream_img_path, boxes, confidences, chinese_names)

                def save_image_and_video_frame():
                    # Save up to 3 snapshot images
                    if detect_target_dict['target_frame_count'] < 3:
                        frame_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
                        frame_name = f"{str(uuid.uuid4())}.jpg"
                        frame_path = os.path.join(frame_dir, frame_name)
                        cv2.imwrite(frame_path, frame)
                        minio_path = upload_file(MINIO_CONFIG['bucket_name'], frame_name, frame_path,
                                                 detect_target_dict['tid'])
                        result_data = {
                            "box": [[str(x) for x in box] for box in boxes],
                            "conf": confidences,
                            "name": chinese_names,
                            "total": len(boxes)
                        }
                        res = AnalysisResult(comment="图片分析成功" if boxes else "未检测到目标",
                                             endTime=int(time.time() * 1000),
                                             status=200 if boxes else 204,
                                             resultData=result_data,
                                             dest=minio_path,
                                             tid=detect_target_dict['tid'],
                                             frame_index=[detect_target_dict['frame_count']])
                        detect_target_dict['results'].append(res.to_dict())
                        detect_target_dict['target_frame_indexes'].append(detect_target_dict['frame_count'])
                    # Write the annotated frame into the 15s clip
                    image_pil = Image.fromarray(cv2.cvtColor(frame, cv2.COLOR_BGR2RGB))  # convert to PIL
                    draw = ImageDraw.Draw(image_pil)
                    font = ImageFont.truetype(FONT_PATH, 25)
                    for i in range(len(boxes)):
                        box = boxes[i]
                        confidence = confidences[i]
                        cn = chinese_names[i]
                        left_top = (box[0], box[1])
                        right_bottom = (box[2], box[3])
                        draw.rectangle([left_top, right_bottom], fill=None, outline=(0, 255, 0), width=2)  # bounding box
                        text = '{} {:.2f}'.format(cn, confidence)
                        text_bbox = draw.textbbox((0, 0), text, font=font)
                        text_width = text_bbox[2] - text_bbox[0]  # text width
                        rect_width = right_bottom[0] - left_top[0]  # box width
                        text_x = left_top[0] + (max(rect_width - text_width, 0)) // 2  # text position
                        draw.text((text_x, max(box[1] - font.size, 0)), text, font=font, fill='red')  # class name + confidence
                    image_cv = cv2.cvtColor(np.array(image_pil), cv2.COLOR_RGB2BGR)
                    video_writer.write(image_cv)  # write the video frame

                if not detect_target_dict['detected']:
                    save_image_and_video_frame()
                    detect_target_dict['detected'] = True
                    detect_target_dict['frame_count'] += 1
                    detect_target_dict['target_frame_count'] += 1
                else:
                    if detect_target_dict['frame_count'] < detect_target_dict['frame_max']:
                        save_image_and_video_frame()
                        detect_target_dict['frame_count'] += 1
                        detect_target_dict['target_frame_count'] += 1
                    else:
                        detect_target_dict['detected'] = False
                        detect_target_dict['frame_count'] = 0
            else:
                # No target: store the original frame for the output stream
                cv2.imwrite(video_stream_img_path, frame)
                if detect_target_dict['detected']:
                    if detect_target_dict['frame_count'] < detect_target_dict['frame_max']:
                        detect_target_dict['frame_count'] += 1
                    else:
                        detect_target_dict['detected'] = False
                        detect_target_dict['frame_count'] = 0
        except Exception as e:
            logger.error(f"处理图像失败: {str(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")

    # Start the processing task
    def start_video_stream_task(self, rtmp_url, rtmp_url_label, detector, task_manager, conf_threshold=CONF_THRESHOLD,
                                model=-1, taskId="", device="", deviceNum=""):
        print(f'开始处理来自 {rtmp_url} 的视频流,置信度 {conf_threshold} ,模型 {model}')
        if model == 0:
            channel = mq_connect.animals_video_stream_channel
            channel.queue_declare(queue=animals_queues[1], durable=True)
        elif model == 1:
            channel = mq_connect.fire_video_stream_channel
            channel.queue_declare(queue=fire_queues[1], durable=True)
        elif model == 2:
            channel = mq_connect.pine_video_stream_channel
            channel.queue_declare(queue=pine_queues[1], durable=True)
        elif model == 3:
            channel = mq_connect.cp_video_stream_channel
            channel.queue_declare(queue=cp_queues[1], durable=True)
        else:  # 4
            channel = mq_connect.excavator_video_stream_channel
            channel.queue_declare(queue=excavator_queues[1], durable=True)
        task = self.tasks[rtmp_url]
        cap = task['cap']
        size = (int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)), int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)))
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = int(cap.get(cv2.CAP_PROP_FPS))
        codec_list = ['mp4v', 'avc1', 'X264', 'XVID', 'MJPG']
        video_writer = None
        label_file_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        label_video_path = os.path.join(label_file_dir, f"video_{uuid.uuid4()}.mp4")  # 15s clip
        for codec in codec_list:
            fourcc = cv2.VideoWriter.fourcc(*codec)
            vw = cv2.VideoWriter(label_video_path, fourcc, fps, (width, height))
            if vw.isOpened():
                video_writer = vw
                break
        if video_writer is None or not video_writer.isOpened():  # no codec produced an opened writer
            logger.error(f"无法创建输出视频文件: {label_video_path}")
            error_frame = AnalysisResult(comment=f"无法创建输出视频文件",
                                         createTime=int(time.time() * 1000),
                                         status=500,
                                         tid=uuid.uuid4())
            res = AnalysisResult(comment=f"无法创建输出视频文件",
                                 createTime=int(time.time() * 1000),
                                 endTime=int(time.time() * 1000),
                                 status=500,
                                 tid=uuid.uuid4(),
                                 resultData={},
                                 frame_index=[])
            res = [error_frame.to_dict()] * 3 + [res.to_dict()]
            json_filename = f"result_{str(uuid.uuid4())}_batch0.json"
            json_path = os.path.join(JSON_RESULT_BASE, json_filename)
            os.makedirs(os.path.dirname(json_path), exist_ok=True)
            tid = str(uuid.uuid4())
            json_content = {
                "device": device,
                "task_id": taskId,
                "tid": tid,
                "results": [res],
                'model': model,
                'device_num': deviceNum
            }
            with open(json_path, 'w', encoding='utf-8') as f:
                json.dump(json_content, f, ensure_ascii=False, indent=2)
            upload_file(MINIO_CONFIG['bucket_name'], json_filename, json_path, tid)
            # Publish the message to MQ
            self.write_mq_msg(channel, model, json_content)
            return
        command = [ffmpeg,
                   '-hide_banner',
                   '-loglevel', 'warning',
                   '-y',
                   '-re',
                   '-f', 'rawvideo',
                   '-vcodec', 'rawvideo',
                   '-pix_fmt', 'bgr24',
                   '-s', str(size[0]) + 'x' + str(size[1]),
                   '-r', str(fps),
                   '-i', '-',
                   '-c:v', 'libx264',
                   '-pix_fmt', 'yuv420p',
                   '-preset', 'fast',
                   '-f', 'flv',
                   rtmp_url_label]
        pipe = subprocess.Popen(command, stdin=subprocess.PIPE)
        try:
            frame_count = 0  # index of the frame currently being processed
            frame_interval = 12  # run inference once every N frames
            detect_target_dict = {
                "tid": str(uuid.uuid4()),
                "detected": False,  # whether a target has been detected
                "frame_count": 0,  # counted from the first detection
                "target_frame_count": 0,  # number of snapshot images containing targets
                "frame_max": 15 * fps,  # clip length after detection: 15s
                'results': [],  # result set collected since detection
                "target_frame_indexes": []  # frame indexes collected since detection
            }

            # Transcode the recorded clip
            def convert_video(input_file, output_file):
                cmd = [ffmpeg,
                       '-hide_banner',
                       '-loglevel', 'warning',
                       '-i', input_file,
                       '-vcodec', 'libx264',
                       '-preset', 'fast',
                       '-crf', '23',
                       '-acodec', 'aac',
                       output_file]
                subprocess.run(cmd)

            while True:
                ret, frame = cap.read()
                if not ret:
                    break
                thread = self.tasks[rtmp_url]['thread']
                if not thread:
                    video_stream_img_path = os.path.join(
                        os.path.dirname(os.path.dirname(os.path.abspath(__file__))), f"{uuid.uuid4()}.jpg")
                    thread = Thread(target=self.detect_frame,
                                    args=(rtmp_url, frame, detector, conf_threshold, frame_count, frame_interval,
                                          detect_target_dict, video_writer, video_stream_img_path))
                    thread.start()
                    thread.join()
                    self.tasks[rtmp_url]['thread'] = thread
                    if os.path.exists(video_stream_img_path):
                        img = cv2.imread(video_stream_img_path)
                        pipe.stdin.write(img.tobytes())
                        os.remove(video_stream_img_path)
                    if detect_target_dict['detected']:
                        # The current clip is complete
                        if detect_target_dict['frame_count'] == detect_target_dict['frame_max']:
                            video_writer.release()
                            h264_video_file_path = os.path.join(label_file_dir, f"{uuid.uuid4()}.mp4")
                            logger.info("开始视频转码")
                            convert_video(label_video_path, h264_video_file_path)  # transcode to H.264
                            logger.info("视频转码完成")
                            new_filename = f"{uuid.uuid4()}.mp4"
                            minio_video_path = upload_file(MINIO_CONFIG['bucket_name'], new_filename,
                                                           h264_video_file_path, detect_target_dict['tid'])
                            if os.path.exists(label_video_path):
                                os.remove(label_video_path)
                            video_end_time = int(time.time() * 1000)
                            video_result = AnalysisResult(comment="视频分析成功",
                                                          endTime=video_end_time,
                                                          dest=minio_video_path,
                                                          status=200,
                                                          tid=detect_target_dict['tid'],
                                                          resultData={},
                                                          frame_index=detect_target_dict['target_frame_indexes'])
                            detect_target_dict['results'].append(video_result.to_dict())
                            json_filename = f"result_{detect_target_dict['tid']}_batch0.json"
                            json_path = os.path.join(JSON_RESULT_BASE, json_filename)
                            os.makedirs(os.path.dirname(json_path), exist_ok=True)
                            json_content = {
                                "device": device,
                                "task_id": taskId,
                                "tid": detect_target_dict['tid'],
                                "results": [detect_target_dict['results']],
                                'model': model,
                                'device_num': deviceNum
                            }
                            with open(json_path, 'w', encoding='utf-8') as f:
                                json.dump(json_content, f, ensure_ascii=False, indent=2)
                            # Publish the message to MQ
                            self.write_mq_msg(channel, model, json_content)
                            upload_file(MINIO_CONFIG['bucket_name'], json_filename, json_path,
                                        detect_target_dict['tid'])
                            print(f"识别出一个视频任务, tid={detect_target_dict['tid']}")
                            # Reset state for the next clip
                            label_video_path = os.path.join(label_file_dir, f"video_{uuid.uuid4()}.mp4")  # 15s clip
                            for codec in codec_list:
                                fourcc = cv2.VideoWriter.fourcc(*codec)
                                vw = cv2.VideoWriter(label_video_path, fourcc, fps, (width, height))
                                if vw.isOpened():
                                    video_writer = vw
                                    break
                            detect_target_dict = {
                                "tid": str(uuid.uuid4()),
                                "detected": False,  # whether a target has been detected
                                "frame_count": 0,  # counted from the first detection
                                "target_frame_count": 0,  # number of snapshot images containing targets
                                "frame_max": 15 * fps,  # clip length after detection: 15s
                                'results': [],  # result set collected since detection
                                "target_frame_indexes": []  # frame indexes collected since detection
                            }
                else:
                    if not thread.is_alive():
                        self.tasks[rtmp_url]['thread'] = None
                frame_count += 1
            if rtmp_url and rtmp_url in self.url_set:
                self.url_set.remove(rtmp_url)
                self.tasks[rtmp_url] = {
                    'cap': None,
                    'thread': None,
                    'results': []
                }
        except Exception as e:
            logger.error(f"处理视频流失败: {str(e)}")
            logger.error(f"错误堆栈: {traceback.format_exc()}")
            raise e
        finally:
            cap.release()
            pipe.stdin.close()
            pipe.wait()
            video_writer.release()
            if os.path.exists(label_video_path):
                os.remove(label_video_path)

    # Publish results to MQ
    def write_mq_msg(self, channel, model, msg):
        res = ResponseResult.success(data=[msg])
        json_data = json.dumps(res, ensure_ascii=False)  # serialize the object to a string first
        dict_data = json.loads(json_data)  # string -> dict
        json_data = json.dumps(dict_data, ensure_ascii=False)  # dict -> JSON string
        if model == 0:
            channel.basic_publish(exchange='', routing_key=animals_queues[1], body=json_data,
                                  properties=pika.BasicProperties(delivery_mode=2))
        elif model == 1:
            channel.basic_publish(exchange='', routing_key=fire_queues[1], body=json_data,
                                  properties=pika.BasicProperties(delivery_mode=2))
        elif model == 2:
            channel.basic_publish(exchange='', routing_key=pine_queues[1], body=json_data,
                                  properties=pika.BasicProperties(delivery_mode=2))
        elif model == 3:
            channel.basic_publish(exchange='', routing_key=cp_queues[1], body=json_data,
                                  properties=pika.BasicProperties(delivery_mode=2))
        elif model == 4:
            channel.basic_publish(exchange='', routing_key=excavator_queues[1], body=json_data,
                                  properties=pika.BasicProperties(delivery_mode=2))

    # Stop the processing task
    def stop_video_stream_task(self, rtmp_url, model=-1):
        if rtmp_url not in self.url_set:
            return f"{rtmp_url}视频流不存在或已处理"
        task = self.tasks[rtmp_url]
        # print(task)
        if task:
            if task['thread'] and task['thread'].is_alive():
                task['thread'].join()
            cap = task['cap']
            if cap:
                cap.release()
        if rtmp_url and rtmp_url in self.url_set:
            self.url_set.remove(rtmp_url)
            self.tasks[rtmp_url] = {
                'cap': None,
                'thread': None,
                'results': []
            }
        print(f'停止处理来自 {rtmp_url} 的视频流')
        return ""
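For completeness, here is a hypothetical usage sketch of the class above. The detector is a dummy stand-in for the project's YOLO wrapper (which, per the code, must expose predict() and class_name_mapping), and the URLs are placeholders: create_video_stream_task registers the stream and rejects duplicates, the blocking start_video_stream_task runs in its own thread, and stop_video_stream_task tears the task down.

class DummyDetector:
    # Stand-in for the project's YOLO wrapper; detect_frame() expects
    # predict(frame, conf_threshold) and a class_name_mapping dict.
    class_name_mapping = {}

    def predict(self, frame, conf_threshold):
        return [], [], [], []  # boxes, confidences, class_ids, class_names


manager = VideoStreamManager()
rtmp_url = "rtmp://example.com/live/source"         # placeholder
rtmp_url_label = "rtmp://example.com/live/labeled"  # placeholder

if manager.create_video_stream_task(rtmp_url):
    worker = Thread(target=manager.start_video_stream_task,
                    args=(rtmp_url, rtmp_url_label, DummyDetector(), None),
                    kwargs={'model': 0, 'taskId': 'task-1', 'device': 'cam-1', 'deviceNum': '001'})
    worker.start()

# ... later, when the stream should be shut down:
manager.stop_video_stream_task(rtmp_url)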

There is still plenty of room for optimization in this code.
