当前位置: 首页 > news >正文

Mediamtx+Python读取webrtc流

一、功能思路:

1、我采用ffmpeg -re -stream_loop -1 -i xcc.mp4 -c:v libx264 -profile:v baseline -x264opts "bframes=0:repeat_headers=1" -b:v 1500k -preset fast -f flv rtmp://127.0.0.1:1835/stream/111推流到mediamtx的rtmp上
2、通过mediamtx自带的转流,我将可以直接把推过来的流以webrtc方式访问http://127.0.0.1:8889/stream/111
3、由于我确认http://127.0.0.1:8889/stream/111能够播放出视频,所以我现在想采用python的方式读取http://127.0.0.1:8889/stream/111/whep的流,来实现保存流的每帧图像

涉及到的工具有:

  • 开源视频流服务器—>mediamtx
  • 本地推流rtmp工具—>ffmpeg+lal
  • 编写代码读取webrtc流工具—>PyCharm
    在这里插入图片描述

二、代码编写实现:

import asyncio
import json
from aiortc import RTCPeerConnection, RTCSessionDescription, VideoStreamTrack
import aiohttp
import cv2
import numpy as np

class DummyVideoTrack(VideoStreamTrack):
    """创建一个虚拟的视频轨道,以防止 WebRTC 报错"""
    def __init__(self):
        super().__init__()

    async def recv(self):
        # 生成一个黑色帧,防止 WebRTC 报错
        width, height = 640, 480
        frame = np.zeros((height, width, 3), dtype=np.uint8)
        return frame

class VideoReceiver:
    def __init__(self):
        self.frame = None

    async def consume_track(self, track):
        while True:
            try:
                frame = await track.recv()
                img = frame.to_ndarray(format="bgr24")
                cv2.imshow("Video", img)
                cv2.waitKey(1)
            except Exception as e:
                print("处理帧错误:", e)
                break

async def send_ice_candidate(candidate, ice_url):
    async with aiohttp.ClientSession() as session:
        async with session.post(
            ice_url,
            data=json.dumps({"candidate": candidate}),
            headers={"Content-Type": "application/json"}
        ) as response:
            if response.status != 200:
                print(f"发送 ICE candidate 错误: {response.status}")

async def run():
    pc = RTCPeerConnection()
    receiver = VideoReceiver()

    # 添加一个虚拟的视频轨道,防止 WebRTC 报错
    pc.addTrack(DummyVideoTrack())


    # 处理接收到的媒体轨道
    def on_track(track):
        print(f"接收到 {track.kind} 轨道")
        if track.kind == "video":
            asyncio.create_task(receiver.consume_track(track))

    pc.add_listener("track", on_track)

    # 监听 ICE 连接状态变化
    def on_ice_connection_state_change():
        print(f"ICE 连接状态: {pc.iceConnectionState}")

    pc.on("iceconnectionstatechange", on_ice_connection_state_change)

    # 监听 ICE Candidate
    def on_ice_candidate(candidate):
        if candidate:
            print(f"新 ICE candidate: {candidate}")
            asyncio.create_task(send_ice_candidate(candidate, ice_url))

    pc.on("icecandidate", on_ice_candidate)

    # 创建 SDP Offer
    offer = await pc.createOffer()
    await pc.setLocalDescription(offer)

    # 发送 Offer 到 WHEP 端点
    whep_url = "http://127.0.0.1:8889/stream/111/whep"
    headers = {"Content-Type": "application/sdp"}

    async with aiohttp.ClientSession() as session:
        async with session.post(
                whep_url,
                data=pc.localDescription.sdp,
                headers=headers
        ) as response:
            if response.status != 201:
                raise Exception(f"服务器返回错误: {response.status}")

            answer_sdp = await response.text()
            answer = RTCSessionDescription(sdp=answer_sdp, type="answer")
            await pc.setRemoteDescription(answer)

            if "Location" in response.headers:
                ice_url = "http://127.0.0.1:8889" + response.headers["Location"]
                print("ICE 协商 URL:", ice_url)

        try:
            while True:
                await asyncio.sleep(1)
        except KeyboardInterrupt:
            pass
        finally:
            await pc.close()

if __name__ == "__main__":
    asyncio.run(run())

三、效果展示:

  • 浏览器可以直接播放webrtc的扩展地址,然后最上层就是代码cv2读取每帧显示出来的画面
    在这里插入图片描述

相关文章:

  • 【Elasticsearch】分词器概述
  • 嵌入式LINUX驱动开发入门之hello驱动(基于IMX6ULL-MINI开发板)
  • 算法-计算字符的最短距离
  • 计算机毕业设计PySpark+hive招聘推荐系统 职位用户画像推荐系统 招聘数据分析 招聘爬虫 数据仓库 Django Vue.js Hadoop
  • 基于微型5G网关的石化厂区巡检机器人应用
  • -bash:/usr/bin/rm: Argument list too long 解决办法
  • Swagger2 Knife4jConfig 配置,父子项目swagger扫描多个子模块中的Controller生成接口文档:
  • sward简介与安装
  • 小厂面(又是依托)
  • AWK系统学习指南:从文本处理到数据分析的终极武器 实战
  • 动态DNS神器nip.io使用指南:快速实现域名与IP的动态映射--告别配置本地hosts
  • Go 语言里中的堆与栈
  • LabVIEW用户界面(UI)和用户体验(UX)设计
  • 如何本地部署DeepSeek
  • HTML之JavaScript运算符
  • macOS部署DeepSeek-r1
  • oracle使用动态sql将多层级组织展平
  • C++自研游戏引擎-碰撞检测组件-八叉树AABB检测算法实现
  • 企业文件安全:零信任架构下的文件访问控制
  • 深度学习|表示学习|Instance Normalization 全面总结|26
  • 江门外贸网站建设/无锡网站制作无锡做网站
  • 在哪个网站上做预收款报告/百度推广软件
  • 微商城网站建设教程/软文代写接单平台
  • 企业网站的建设论文/品牌营销策划方案怎么做才好
  • 西安公司代办/网站seo工具
  • 动态网站如何做登录界面/无货源网店怎么开