当前位置: 首页 > news >正文

通过 MQTT 命令控制 RV1106 的 WebRTC 推流启停” 及 “30 分钟无命令自动停止”

要实现 “通过 MQTT 命令控制 RV1106 的 WebRTC 推流启停” 及 “30 分钟无命令自动停止” 的功能,需在 RV1106 上集成MQTT 客户端(如paho-mqtt-c),结合 WebRTC 推流逻辑和定时任务。以下是完整实现方案,包含核心代码和流程设计:

一、整体架构

  1. 角色分工

    • RV1106:运行 MQTT 客户端(接收命令)、WebRTC 推流模块(受控于 MQTT 命令)、定时任务(检测超时)。
    • 浏览器 / 客户端:通过 MQTT 发送控制命令(start/stop),同时作为 WebRTC 视频接收端。
    • MQTT 服务器:转发命令(如公共服务器mqtt://test.mosquitto.org或自建EMQX)。
  2. 核心逻辑

    • 收到start命令:启动 WebRTC 推流,重置 30 分钟计时器。
    • 收到stop命令:停止 WebRTC 推流,重置计时器。
    • 30 分钟内无任何命令:自动停止推流。

二、依赖库移植(RV1106 端)

需交叉编译MQTT 客户端库paho-mqtt-c,轻量级 C 库):

1. 交叉编译paho-mqtt-c

bash

# 下载源码
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c# 配置交叉编译(安装到/opt/paho-mqtt-arm)
mkdir build-arm && cd build-arm
cmake .. -DCMAKE_TOOLCHAIN_FILE=../toolchain.cmake \  # 复用之前的RV1106工具链配置-DCMAKE_INSTALL_PREFIX=/opt/paho-mqtt-arm \-DBUILD_SHARED_LIBS=OFF  # 静态库,减少依赖# 编译安装
make -j4 && sudo make install

三、核心代码实现

1. 数据结构与全局变量(控制状态)

cpp

运行

#include <rtc/rtc.hpp>
#include <paho_mqtt_c/MQTTClient.h>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>// 全局状态控制
struct StreamState {bool is_running = false;  // 推流是否运行std::chrono::steady_clock::time_point last_cmd_time;  // 最后一次命令时间std::mutex mtx;std::condition_variable cv;  // 用于线程同步
} stream_state;// WebRTC相关
std::unique_ptr<rtc::PeerConnection> peer_connection;
std::unique_ptr<rtc::Track> video_track;
std::thread webrtc_thread;  // WebRTC推流线程// MQTT相关
const char* MQTT_BROKER = "tcp://test.mosquitto.org:1883";  // MQTT服务器
const char* MQTT_CLIENT_ID = "rv1106_webrtc";
const char* MQTT_CMD_TOPIC = "rv1106/webrtc/cmd";  // 接收命令的主题
2. MQTT 客户端初始化与命令处理

cpp

运行

// MQTT消息回调(接收浏览器发送的命令)
int mqtt_message_callback(void* context, char* topicName, int topicLen, MQTTClient_message* message) {std::string cmd((char*)message->payload, message->payloadlen);printf("收到MQTT命令: %s\n", cmd.c_str());std::lock_guard<std::mutex> lock(stream_state.mtx);stream_state.last_cmd_time = std::chrono::steady_clock::now();  // 更新最后命令时间if (cmd == "start" && !stream_state.is_running) {// 启动WebRTC推流stream_state.is_running = true;webrtc_thread = std::thread(webrtc_stream_loop);  // 启动推流线程stream_state.cv.notify_all();} else if (cmd == "stop" && stream_state.is_running) {// 停止WebRTC推流stream_state.is_running = false;if (webrtc_thread.joinable()) {webrtc_thread.join();  // 等待推流线程结束}// 释放WebRTC资源video_track.reset();peer_connection.reset();}MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}// 初始化MQTT客户端
bool init_mqtt() {MQTTClient client;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;int rc;// 创建客户端if ((rc = MQTTClient_create(&client, MQTT_BROKER, MQTT_CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, nullptr)) != MQTTCLIENT_SUCCESS) {printf("MQTT创建失败, 错误码: %d\n", rc);return false;}// 设置回调MQTTClient_setCallbacks(client, nullptr, nullptr, mqtt_message_callback, nullptr);// 连接服务器conn_opts.keepAliveInterval = 60;conn_opts.cleansession = 1;if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {printf("MQTT连接失败, 错误码: %d\n", rc);return false;}// 订阅命令主题(QoS=1)if ((rc = MQTTClient_subscribe(client, MQTT_CMD_TOPIC, 1)) != MQTTCLIENT_SUCCESS) {printf("MQTT订阅失败, 错误码: %d\n", rc);return false;}printf("MQTT初始化成功, 等待命令...\n");return true;
}
3. WebRTC 推流逻辑(受控于状态变量)

cpp

运行

// WebRTC初始化(STUN/TURN配置)
void init_webrtc() {rtc::Configuration config;config.iceServers = {"stun:stun.aliyun.com:3478","turn:123.45.67.89:3478?username=rv1106&password=123456"};peer_connection = rtc::make_unique<rtc::PeerConnection>(config);video_track = peer_connection->addTrack(rtc::MediaKind::Video);// 生成SDP Offer并通过信令服务器发送(此处省略信令交互逻辑,参考前文)peer_connection->onLocalDescription([](const rtc::Description& desc) {send_offer_to_signaling_server(desc.sdp());  // 自定义信令发送函数});
}// WebRTC推流循环(受is_running控制)
void webrtc_stream_loop() {init_webrtc();  // 初始化WebRTCinit_camera();  // 初始化摄像头(参考前文)init_encoder();  // 初始化编码器(参考前文)while (true) {std::lock_guard<std::mutex> lock(stream_state.mtx);if (!stream_state.is_running) {break;  // 收到stop命令,退出循环}// 采集->编码->推流(核心逻辑,参考前文)capture_and_encode_video();  // 自定义函数:采集YUV并编码为H.264rtc::Buffer nal = get_encoded_nal();  // 获取编码后的NAL单元video_track->send(nal, rtc::MediaPacketFlag::KeyFrame);std::this_thread::sleep_for(std::chrono::milliseconds(40));  // 25fps}// 停止后释放资源close_camera();close_encoder();
}
4. 超时检测线程(30 分钟无命令自动停止)

cpp

运行

void timeout_check_loop() {const std::chrono::minutes TIMEOUT(30);  // 超时时间30分钟while (true) {std::unique_lock<std::mutex> lock(stream_state.mtx);// 等待30分钟,或被命令唤醒(收到新命令时会notify)if (stream_state.cv.wait_for(lock, TIMEOUT) == std::cv_status::timeout) {// 超时且推流正在运行:自动停止if (stream_state.is_running) {printf("30分钟无命令,自动停止推流\n");stream_state.is_running = false;if (webrtc_thread.joinable()) {webrtc_thread.join();}video_track.reset();peer_connection.reset();}}// 被唤醒(收到新命令):不做处理,继续等待下一个30分钟}
}
5. 主函数(整合所有模块)

cpp

运行

int main() {// 初始化状态(记录程序启动时间为初始命令时间)stream_state.last_cmd_time = std::chrono::steady_clock::now();// 启动MQTT客户端if (!init_mqtt()) {return -1;}// 启动超时检测线程std::thread timeout_thread(timeout_check_loop);// 主线程阻塞(或处理其他逻辑)timeout_thread.join();  // 等待超时线程(实际中可换成while(true)阻塞)return 0;
}

四、浏览器端 MQTT 命令发送(控制界面)

通过浏览器的mqtt.js库发送start/stop命令,示例 HTML:

html

预览

<!DOCTYPE html>
<html>
<body><button onclick="sendCmd('start')">开始推流</button><button onclick="sendCmd('stop')">停止推流</button><video id="remoteVideo" autoplay playsinline></video><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script><script>// 连接MQTT服务器const client = mqtt.connect('wss://test.mosquitto.org:8081');  // 浏览器用WebSocket连接client.on('connect', () => {console.log('MQTT连接成功');});// 发送命令function sendCmd(cmd) {client.publish('rv1106/webrtc/cmd', cmd);console.log('发送命令:', cmd);}// WebRTC视频接收(参考前文客户端代码)const pc = new RTCPeerConnection({/* STUN/TURN配置 */});pc.ontrack = e => {document.getElementById('remoteVideo').srcObject = e.streams[0];};// ... 信令交互逻辑 ...</script>
</body>
</html>

五、编译与运行

1. 编译命令(RV1106 端)

bash

arm-rockchip830-linux-uclibcgnueabihf-g++ main.cpp -o mqtt_webrtc \
-I/opt/libdatachannel-arm/include \
-I/opt/openssl-arm/include \
-I/opt/paho-mqtt-arm/include \
-L/opt/libdatachannel-arm/lib \
-L/opt/openssl-arm/lib \
-L/opt/paho-mqtt-arm/lib \
-ldatachannel -lssl -lcrypto -lpaho-mqtt3c -lpthread -lm
2. 运行流程
  • 在 RV1106 上执行:./mqtt_webrtc(程序启动,等待 MQTT 命令)。
  • 浏览器打开 HTML 页面,点击 “开始推流”:RV1106 收到start命令,启动 WebRTC 推流,视频在浏览器播放。
  • 点击 “停止推流”:RV1106 收到stop命令,停止推流。
  • 30 分钟内无任何操作:RV1106 自动停止推流。

六、关键细节

  1. 线程安全:用mutexcondition_variable保护is_running等共享变量,避免多线程冲突。
  2. 超时重置:每次收到start/stop命令都会更新last_cmd_time,并通过cv.notify_all()唤醒超时线程,重置 30 分钟计时。
  3. MQTT 连接稳定性:建议在代码中添加 MQTT 重连逻辑(如检测连接断开后自动重连)。
  4. 资源释放:停止推流时需释放摄像头、编码器、WebRTC 等资源,避免内存泄漏。

通过以上方案,可实现对 RV1106 WebRTC 推流的灵活控制,满足远程启停和自动超时关闭的需求。

http://www.dtcms.com/a/610694.html

相关文章:

  • C++中将FlatBuffers序列化为JSON
  • 营销网站制作平台有哪些企业网站特色建设
  • pyinstaller 打包报错hook-matplotlib.backends.py
  • 盐城网站建设建站羽毛球最新赛事
  • 如何用dw做网站wordpress自动上传图片
  • 楼宇间网络拓扑测绘 从原理到精准部署
  • 汇编语言编译器存在哪 | 探讨编译器的设计与优化挑战
  • Torch核心数据结构Tensor(张量)
  • 什么是AI?AI新手终极指南(2025)
  • 22.与人类对齐的背景与标准
  • 周口城乡建设局网站外包公司工伤找谁赔偿
  • 【XR开发系列】理解游戏世界的基石 - 场景、物体与组件
  • MySQL 是怎么存储 NULL 的
  • 磁共振成像原理(理论)35:快速梯度回波成像 (Fast Gradient-Echo Imaging)
  • 【前传交换机 PTP】FibroLAN Falcon RX
  • Cursor 使用记录:C/C++ 开发者
  • 建设彩票网站一站式网站建设行业
  • fomo3d网站开发app软件开发培训班
  • 动态住宅IP和静态住宅IP哪个更好
  • 营销型网站 策划运营网站免认证域名
  • 网页设计个人网站建设工程交易中心是什么机构
  • Docker 核心命令速查表(精细分类版)
  • leetcode 2536 子矩阵元素加1
  • 如何做企业网站建设怎么清空wordpress媒体库
  • 做网站流程内容美食网站建设总结
  • 面对网络攻击告警 IP地址如何实现自动化封禁
  • 专栏介绍:AMD KFD BO设计深度剖析——解锁GPU存储核心技术
  • Kimi K2 Thinking:兼顾Agent和推理的六边形战士
  • 打字游戏——测一测你的反应速度
  • SpringBoot17-addresourcehandler()方法