【React】MQTT + useEventBus 实现MQTT长连接以及消息分发
MQTT封装以及消息分发
- MQTT封装
- 项目中导入mqtt
- 封装MQTT操作类
- 消息分发
- 工具类的封装
- 消息分发处理
- 消息订阅
- MQTT使用
- 初始化
- 心跳
MQTT封装
MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅(Pub/Sub)模式的轻量级消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)设备通信。。
项目中导入mqtt
使用yarn安装
yarn add mqtt
使用npm安装
npm install mqtt
封装MQTT操作类
为了保持MQTT在全局的唯一性,我们需要封装一个MQTT的单例操作类来统一管理MQTT相关的操作(连接、断开、订阅、重连等)
新建 MQTTService.js
import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo, logWarn} from "@utils/LogUtils";let _instance = null // 单例实例// 初始化 MQTT 工具的状态
function initMqtt() {if (_instance) {return _instance // 如果实例已存在,直接返回}let client = nulllet connectStatus = 'Connect'let isSubed = false // 订阅状态let initHost = "ws://xxx.xxx.xxx.xxx:xxxx/mqtt"let initOptions = {clientId: '',username: '',password: ''}_instance = { // 将实例赋值给 _instancegetConnectStatus: () => connectStatus,getIsSubscribedStatus: () => isSubed, // 获取订阅状态//全局MQTT连接handleConnect: function () {if (connectStatus !== 'Connect') {logInfo("MQTT已连接")return}_instance.handleCustomConnect(initHost, initOptions)},handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {connectStatus = 'Connecting'client = mqtt.connect(host, mqttOptions)logInfo('------> [ Connecting ] ------>', host, mqttOptions)if (client) {client.on('connect', () => {connectStatus = 'Connected'logInfo('------> [ Connected ]')if (autoSub) {//默认订阅主题_instance.handleSubscribe(['MqttTopic1','MqttTopic2','MqttTopic3',],0)}if (onConnect) onConnect()});client.on('error', (err) => {logError('------> [ Connection error ]', err)if (onError) onError(err)client.end()});client.on('reconnect', () => {connectStatus = 'Reconnecting'logError('------> [ Reconnecting ]')if (onReconnect) onReconnect()});client.on('message', (topic, message) => {logInfo('------> [ topic ] ------>', topic)logInfo('------> [ message ] ---->', message.toString())const payload = {topic, message: message.toString()}//处理mqtt消息dealMqttMsg(topic, message.toString())if (onMessage) onMessage(payload)});}},handleSubscribe: (topic, qos, callback) => {if (!client) {throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');}client.subscribe(topic, {qos}, (err) => {if (err) {logError('Subscribe failed:', err)return}isSubed = true// 更新订阅状态为已订阅logInfo('------> [ 订阅主题成功 ]')if (callback) callback()});},handleUnsub: (topic, qos, callback) => {if (client) {client.unsubscribe(topic, {qos}, (error) => {if (error) {logError('Unsubscribe error', error)return}isSubed = false // 更新订阅状态为未订阅if (callback) callback()})}},handlePublish: (pubRecord) => {if (!client) {logError('MQTT 客户端未初始化,无法发布消息')return}const {topic, qos, payload} = pubRecordclient.publish(topic, payload, {qos}, (error) => {if (error) {logError('------> [ 发布消息失败 ] ------>', error)throw new Error(`发布消息失败: ${error.message || '未知错误'}`)}})},handleDisconnect: (callback) => {if (client) {try {// 移除所有事件监听器client.removeAllListeners()//client.end(false, () => {connectStatus = 'Connect' // 确保断开连接后状态正确更新client = nulllogInfo('------> [ disconnect ]')if (callback) callback()})} catch (error) {connectStatus = 'Connect' // 确保断开连接后状态正确更新logInfo('------> [ disconnect error ] ------>', error)}}},}return _instance
}export default initMqtt;
消息分发
工具类的封装
全局MQTT连接到服务器之后,会通过订阅主题来接收推送的消息,推送的消息是 topic(主题)+ message(消息内容)组成。我们需要根据不同的topic主题来进行消息的分发,所以需要封装一个消息分发的工具类。
新建 EventEmitter.js
import {pull} from 'lodash';/*** 自定义事件触发器类,用于管理事件监听和触发*/
class EventEmitter {/*** 构造函数,初始化事件存储对象*/constructor() {this._events = {}; // 存储事件名与回调函数列表的映射}/*** 获取指定事件的回调函数列表,若不存在则创建空数组* @param {string} event - 事件名称* @returns {Function[]} 当前事件的回调函数列表*/_getFns(event) {return this._events[event] || (this._events[event] = []);}/*** 监听指定事件,添加回调函数* @param {string} event - 事件名称* @param {Function} cb - 回调函数*/on(event, cb) {const fns = this._getFns(event);fns.push(cb); // 将回调加入该事件的队列中}/*** 移除指定事件的某个回调函数或整个事件的所有回调* @param {string} event - 事件名称* @param {Function} [cb] - 要移除的回调函数,不传则清空整个事件*/off(event, cb) {if (cb) {const fns = this._getFns(event);pull(fns, cb); // 使用 lodash 的 pull 方法从数组中移除指定回调} else {delete this._events[event]; // 不传回调时删除整个事件键}}/*** 绑定只执行一次的事件监听器* @param {string} event - 事件名称* @param {Function} cb - 回调函数*/once(event, cb) {const fn2 = (e) => {this.off(event, fn2); // 执行后立即解绑自身cb(e); // 执行原始回调};this.on(event, fn2); // 绑定包装后的回调}/*** 同步触发指定事件,依次执行所有回调* @param {string} event - 事件名称* @param {*} [param] - 传递给回调函数的参数*/emit(event, param) {const fns = this._getFns(event);for (let i = 0; i < fns.length; i++) {const fn = fns[i];fn(param); // 同步执行每个回调函数}}/*** 异步触发指定事件,返回一个Promise* 注意:此方法仅触发第一个回调并返回其 Promise 结果* @param {string} event - 事件名称* @param {*} [param] - 传递给回调函数的参数* @returns {Promise} 返回第一个回调执行后的 Promise*/invoke(event, param) {const fns = this._getFns(event);for (let i = 0; i < fns.length; i++) {const fn = fns[i];return new Promise((resolve, reject) => {resolve(fn(param)); // 将第一个回调的结果封装为 Promise});}return Promise.reject(); // 如果没有回调,则返回拒绝状态}
}export default EventEmitter;
消息分发处理
新建 Events.js
import EventEmitter from './EventEmitter.js';
import {useEffect} from 'react';const events = new EventEmitter();export default events;/*** 订阅事件总线** @param event - 事件名称* @param callback - 回调函数** @example* useEventBus('自定义事件名', (pushInfo) => {* console.log(pushInfo);* });*/
export function useEventBus(event, callback) {useEffect(() => {events.on(event, callback);return () => {events.off(event, callback);};});
}/*** mqtt消息处理* @param topic* @param message*/
export function dealMqttMsg(topic, message) {switch (topic) {//网关配置相关订阅case 'MqttTopic1'://发送消息events.emit('MqttTopic1', message)break;case 'MqttTopic2'://发送消息events.emit('MqttTopic2', message)break;case 'MqttTopic3'://发送消息events.emit('MqttTopic3', message)break;}
}
消息订阅
我们可以在页面上通过订阅的方式来进行消息的接收。
/*** MQTT消息订阅*/useEventBus('MqttTopic1', (message) => {console.log(message)})
MQTT使用
初始化
如果需要在全局进行MQTT连接,则可以在App.js里面进行mqtt的连接和断开,或者在登陆、登出进行同样可以。
useEffect(() => {if (initMqtt().getConnectStatus() === "Connect") {//连接MQTTinitMqtt().handleConnect()}return () => {//组件销毁时关闭连接initMqtt().handleDisconnect()}}, [])
心跳
新建 HeartBeatUtils.js
let heartTimerId = null;export const startHeartBeat = (intervalTime, intervalMethod) => {if (!intervalTime || !intervalMethod) {console.warn('Invalid parameters for startHeartBeat');return null;}// 清除已有的定时器clearHeartBeat();// 设置新的定时器heartTimerId = setInterval(async () => {intervalMethod()}, intervalTime);//intervalMethod()
};export const clearHeartBeat = () => {if (heartTimerId) {clearTimeout(heartTimerId);heartTimerId = null;}
};
可以在连接MQTT的回调中使用,MQTT连接成功后开启心跳定时器,发送心跳消息,在MQTT断开连接时,手动停止心跳定时器即可。
handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {connectStatus = 'Connecting'client = mqtt.connect(host, mqttOptions)logInfo('------> [ Connecting ] ------>', host, mqttOptions)if (client) {client.on('connect', () => {connectStatus = 'Connected'logInfo('------> [ Connected ]')// 启动心跳定时器startHeartBeat(1000 * 60, () => {//实现具体的心跳方法(发送指定的心跳数据)_instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})})//是否自动订阅主题if (autoSub) {//默认订阅主题_instance.handleSubscribe(['MqttTopic1','MqttTopic2','MqttTopic3',],0)}if (onConnect) onConnect()});client.on('error', (err) => {logError('------> [ Connection error ]', err)if (onError) onError(err)client.end()});client.on('reconnect', () => {connectStatus = 'Reconnecting'logError('------> [ Reconnecting ]')if (onReconnect) onReconnect()});client.on('message', (topic, message) => {logInfo('------> [ topic ] ------>', topic)logInfo('------> [ message ] ---->', message.toString())const payload = {topic, message: message.toString()}//处理mqtt消息dealMqttMsg(topic, message.toString())if (onMessage) onMessage(payload)});}}handleDisconnect: (callback) => {if (client) {try {// 移除所有事件监听器client.removeAllListeners()//client.end(false, () => {connectStatus = 'Connect' // 确保断开连接后状态正确更新client = null//移除心跳定时器clearHeartBeat()logInfo('------> [ disconnect ]')if (callback) callback()})} catch (error) {connectStatus = 'Connect' // 确保断开连接后状态正确更新logInfo('------> [ disconnect error ] ------>', error)}}}
完整的MQTT相关操作类如下所示:
import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo} from "@utils/LogUtils";
import {startHeartBeat} from "./util/HeartBeatUtils";let _instance = null // 单例实例// 初始化 MQTT 工具的状态
function initMqtt() {if (_instance) {return _instance // 如果实例已存在,直接返回}let client = nulllet connectStatus = 'Connect'let isSubed = false // 订阅状态let initHost = "ws://XXX.XXX.X.X:XXXX/aiot/mqtt"let initOptions = {clientId: '',username: '',password: ''}_instance = {getConnectStatus: () => connectStatus,getIsSubscribedStatus: () => isSubed, // 获取订阅状态//全局MQTT连接handleConnect: function () {if (connectStatus !== 'Connect') {logInfo("MQTT已连接")return}_instance.handleCustomConnect(initHost, initOptions)},handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {connectStatus = 'Connecting'client = mqtt.connect(host, mqttOptions)logInfo('------> [ Connecting ] ------>', host, mqttOptions)if (client) {client.on('connect', () => {connectStatus = 'Connected'// 启动心跳定时器startHeartBeat(1000 * 60, () => {_instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})})logInfo('------> [ Connected ]')if (autoSub) {//默认订阅主题_instance.handleSubscribe(['MqttTopic1','MqttTopic2','MqttTopic3',],0)}if (onConnect) onConnect()});client.on('error', (err) => {logError('------> [ Connection error ]', err)if (onError) onError(err)client.end()});client.on('reconnect', () => {connectStatus = 'Reconnecting'logError('------> [ Reconnecting ]')if (onReconnect) onReconnect()});client.on('message', (topic, message) => {logInfo('------> [ topic ] ------>', topic)logInfo('------> [ message ] ---->', message.toString())const payload = {topic, message: message.toString()}//处理mqtt消息dealMqttMsg(topic, message.toString())if (onMessage) onMessage(payload)});}},handleSubscribe: (topic, qos, callback) => {if (!client) {throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');}client.subscribe(topic, {qos}, (err) => {if (err) {logError('Subscribe failed:', err)return}isSubed = true// 更新订阅状态为已订阅logInfo('------> [ 订阅主题成功 ]')if (callback) callback()});},handleUnsub: (topic, qos, callback) => {if (client) {client.unsubscribe(topic, {qos}, (error) => {if (error) {logError('Unsubscribe error', error)return}isSubed = false // 更新订阅状态为未订阅if (callback) callback()})}},handlePublish: (pubRecord) => {if (!client) {logError('MQTT 客户端未初始化,无法发布消息')return}const {topic, qos, payload} = pubRecordclient.publish(topic, payload, {qos}, (error) => {if (error) {logError('------> [ 发布消息失败 ] ------>', error)throw new Error(`发布消息失败: ${error.message || '未知错误'}`)}})},handleDisconnect: (callback) => {if (client) {try {// 移除所有事件监听器client.removeAllListeners()//client.end(false, () => {connectStatus = 'Connect' // 确保断开连接后状态正确更新client = nulllogInfo('------> [ disconnect ]')if (callback) callback()})} catch (error) {connectStatus = 'Connect' // 确保断开连接后状态正确更新logInfo('------> [ disconnect error ] ------>', error)}}},}return _instance
}export default initMqtt;