通过 MQTT 命令控制 RV1106 的 WebRTC 推流启停” 及 “30 分钟无命令自动停止”
要实现 “通过 MQTT 命令控制 RV1106 的 WebRTC 推流启停” 及 “30 分钟无命令自动停止” 的功能,需在 RV1106 上集成MQTT 客户端(如paho-mqtt-c),结合 WebRTC 推流逻辑和定时任务。以下是完整实现方案,包含核心代码和流程设计:
一、整体架构
角色分工:
- RV1106:运行 MQTT 客户端(接收命令)、WebRTC 推流模块(受控于 MQTT 命令)、定时任务(检测超时)。
- 浏览器 / 客户端:通过 MQTT 发送控制命令(
start/stop),同时作为 WebRTC 视频接收端。 - MQTT 服务器:转发命令(如公共服务器
mqtt://test.mosquitto.org或自建EMQX)。
核心逻辑:
- 收到
start命令:启动 WebRTC 推流,重置 30 分钟计时器。 - 收到
stop命令:停止 WebRTC 推流,重置计时器。 - 30 分钟内无任何命令:自动停止推流。
- 收到
二、依赖库移植(RV1106 端)
需交叉编译MQTT 客户端库(paho-mqtt-c,轻量级 C 库):
1. 交叉编译paho-mqtt-c
bash
# 下载源码
git clone https://github.com/eclipse/paho.mqtt.c.git
cd paho.mqtt.c# 配置交叉编译(安装到/opt/paho-mqtt-arm)
mkdir build-arm && cd build-arm
cmake .. -DCMAKE_TOOLCHAIN_FILE=../toolchain.cmake \ # 复用之前的RV1106工具链配置-DCMAKE_INSTALL_PREFIX=/opt/paho-mqtt-arm \-DBUILD_SHARED_LIBS=OFF # 静态库,减少依赖# 编译安装
make -j4 && sudo make install
三、核心代码实现
1. 数据结构与全局变量(控制状态)
cpp
运行
#include <rtc/rtc.hpp>
#include <paho_mqtt_c/MQTTClient.h>
#include <thread>
#include <chrono>
#include <mutex>
#include <condition_variable>// 全局状态控制
struct StreamState {bool is_running = false; // 推流是否运行std::chrono::steady_clock::time_point last_cmd_time; // 最后一次命令时间std::mutex mtx;std::condition_variable cv; // 用于线程同步
} stream_state;// WebRTC相关
std::unique_ptr<rtc::PeerConnection> peer_connection;
std::unique_ptr<rtc::Track> video_track;
std::thread webrtc_thread; // WebRTC推流线程// MQTT相关
const char* MQTT_BROKER = "tcp://test.mosquitto.org:1883"; // MQTT服务器
const char* MQTT_CLIENT_ID = "rv1106_webrtc";
const char* MQTT_CMD_TOPIC = "rv1106/webrtc/cmd"; // 接收命令的主题
2. MQTT 客户端初始化与命令处理
cpp
运行
// MQTT消息回调(接收浏览器发送的命令)
int mqtt_message_callback(void* context, char* topicName, int topicLen, MQTTClient_message* message) {std::string cmd((char*)message->payload, message->payloadlen);printf("收到MQTT命令: %s\n", cmd.c_str());std::lock_guard<std::mutex> lock(stream_state.mtx);stream_state.last_cmd_time = std::chrono::steady_clock::now(); // 更新最后命令时间if (cmd == "start" && !stream_state.is_running) {// 启动WebRTC推流stream_state.is_running = true;webrtc_thread = std::thread(webrtc_stream_loop); // 启动推流线程stream_state.cv.notify_all();} else if (cmd == "stop" && stream_state.is_running) {// 停止WebRTC推流stream_state.is_running = false;if (webrtc_thread.joinable()) {webrtc_thread.join(); // 等待推流线程结束}// 释放WebRTC资源video_track.reset();peer_connection.reset();}MQTTClient_freeMessage(&message);MQTTClient_free(topicName);return 1;
}// 初始化MQTT客户端
bool init_mqtt() {MQTTClient client;MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;int rc;// 创建客户端if ((rc = MQTTClient_create(&client, MQTT_BROKER, MQTT_CLIENT_ID, MQTTCLIENT_PERSISTENCE_NONE, nullptr)) != MQTTCLIENT_SUCCESS) {printf("MQTT创建失败, 错误码: %d\n", rc);return false;}// 设置回调MQTTClient_setCallbacks(client, nullptr, nullptr, mqtt_message_callback, nullptr);// 连接服务器conn_opts.keepAliveInterval = 60;conn_opts.cleansession = 1;if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) {printf("MQTT连接失败, 错误码: %d\n", rc);return false;}// 订阅命令主题(QoS=1)if ((rc = MQTTClient_subscribe(client, MQTT_CMD_TOPIC, 1)) != MQTTCLIENT_SUCCESS) {printf("MQTT订阅失败, 错误码: %d\n", rc);return false;}printf("MQTT初始化成功, 等待命令...\n");return true;
}
3. WebRTC 推流逻辑(受控于状态变量)
cpp
运行
// WebRTC初始化(STUN/TURN配置)
void init_webrtc() {rtc::Configuration config;config.iceServers = {"stun:stun.aliyun.com:3478","turn:123.45.67.89:3478?username=rv1106&password=123456"};peer_connection = rtc::make_unique<rtc::PeerConnection>(config);video_track = peer_connection->addTrack(rtc::MediaKind::Video);// 生成SDP Offer并通过信令服务器发送(此处省略信令交互逻辑,参考前文)peer_connection->onLocalDescription([](const rtc::Description& desc) {send_offer_to_signaling_server(desc.sdp()); // 自定义信令发送函数});
}// WebRTC推流循环(受is_running控制)
void webrtc_stream_loop() {init_webrtc(); // 初始化WebRTCinit_camera(); // 初始化摄像头(参考前文)init_encoder(); // 初始化编码器(参考前文)while (true) {std::lock_guard<std::mutex> lock(stream_state.mtx);if (!stream_state.is_running) {break; // 收到stop命令,退出循环}// 采集->编码->推流(核心逻辑,参考前文)capture_and_encode_video(); // 自定义函数:采集YUV并编码为H.264rtc::Buffer nal = get_encoded_nal(); // 获取编码后的NAL单元video_track->send(nal, rtc::MediaPacketFlag::KeyFrame);std::this_thread::sleep_for(std::chrono::milliseconds(40)); // 25fps}// 停止后释放资源close_camera();close_encoder();
}
4. 超时检测线程(30 分钟无命令自动停止)
cpp
运行
void timeout_check_loop() {const std::chrono::minutes TIMEOUT(30); // 超时时间30分钟while (true) {std::unique_lock<std::mutex> lock(stream_state.mtx);// 等待30分钟,或被命令唤醒(收到新命令时会notify)if (stream_state.cv.wait_for(lock, TIMEOUT) == std::cv_status::timeout) {// 超时且推流正在运行:自动停止if (stream_state.is_running) {printf("30分钟无命令,自动停止推流\n");stream_state.is_running = false;if (webrtc_thread.joinable()) {webrtc_thread.join();}video_track.reset();peer_connection.reset();}}// 被唤醒(收到新命令):不做处理,继续等待下一个30分钟}
}
5. 主函数(整合所有模块)
cpp
运行
int main() {// 初始化状态(记录程序启动时间为初始命令时间)stream_state.last_cmd_time = std::chrono::steady_clock::now();// 启动MQTT客户端if (!init_mqtt()) {return -1;}// 启动超时检测线程std::thread timeout_thread(timeout_check_loop);// 主线程阻塞(或处理其他逻辑)timeout_thread.join(); // 等待超时线程(实际中可换成while(true)阻塞)return 0;
}
四、浏览器端 MQTT 命令发送(控制界面)
通过浏览器的mqtt.js库发送start/stop命令,示例 HTML:
html
预览
<!DOCTYPE html>
<html>
<body><button onclick="sendCmd('start')">开始推流</button><button onclick="sendCmd('stop')">停止推流</button><video id="remoteVideo" autoplay playsinline></video><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script><script>// 连接MQTT服务器const client = mqtt.connect('wss://test.mosquitto.org:8081'); // 浏览器用WebSocket连接client.on('connect', () => {console.log('MQTT连接成功');});// 发送命令function sendCmd(cmd) {client.publish('rv1106/webrtc/cmd', cmd);console.log('发送命令:', cmd);}// WebRTC视频接收(参考前文客户端代码)const pc = new RTCPeerConnection({/* STUN/TURN配置 */});pc.ontrack = e => {document.getElementById('remoteVideo').srcObject = e.streams[0];};// ... 信令交互逻辑 ...</script>
</body>
</html>
五、编译与运行
1. 编译命令(RV1106 端)
bash
arm-rockchip830-linux-uclibcgnueabihf-g++ main.cpp -o mqtt_webrtc \
-I/opt/libdatachannel-arm/include \
-I/opt/openssl-arm/include \
-I/opt/paho-mqtt-arm/include \
-L/opt/libdatachannel-arm/lib \
-L/opt/openssl-arm/lib \
-L/opt/paho-mqtt-arm/lib \
-ldatachannel -lssl -lcrypto -lpaho-mqtt3c -lpthread -lm
2. 运行流程
- 在 RV1106 上执行:
./mqtt_webrtc(程序启动,等待 MQTT 命令)。 - 浏览器打开 HTML 页面,点击 “开始推流”:RV1106 收到
start命令,启动 WebRTC 推流,视频在浏览器播放。 - 点击 “停止推流”:RV1106 收到
stop命令,停止推流。 - 30 分钟内无任何操作:RV1106 自动停止推流。
六、关键细节
- 线程安全:用
mutex和condition_variable保护is_running等共享变量,避免多线程冲突。 - 超时重置:每次收到
start/stop命令都会更新last_cmd_time,并通过cv.notify_all()唤醒超时线程,重置 30 分钟计时。 - MQTT 连接稳定性:建议在代码中添加 MQTT 重连逻辑(如检测连接断开后自动重连)。
- 资源释放:停止推流时需释放摄像头、编码器、WebRTC 等资源,避免内存泄漏。
通过以上方案,可实现对 RV1106 WebRTC 推流的灵活控制,满足远程启停和自动超时关闭的需求。
