当前位置: 首页 > news >正文

【React】MQTT + useEventBus 实现MQTT长连接以及消息分发

MQTT封装以及消息分发

  • MQTT封装
    • 项目中导入mqtt
    • 封装MQTT操作类
  • 消息分发
    • 工具类的封装
    • 消息分发处理
    • 消息订阅
  • MQTT使用
    • 初始化
    • 心跳

MQTT封装

MQTT(Message Queuing Telemetry Transport)是一种基于发布/订阅(Pub/Sub)模式的轻量级消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)设备通信。。

项目中导入mqtt

使用yarn安装

yarn add mqtt

使用npm安装

npm install mqtt

封装MQTT操作类

为了保持MQTT在全局的唯一性,我们需要封装一个MQTT的单例操作类来统一管理MQTT相关的操作(连接、断开、订阅、重连等)
新建 MQTTService.js

import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo, logWarn} from "@utils/LogUtils";let _instance = null // 单例实例// 初始化 MQTT 工具的状态
function initMqtt() {if (_instance) {return _instance // 如果实例已存在,直接返回}let client = nulllet connectStatus = 'Connect'let isSubed = false // 订阅状态let initHost = "ws://xxx.xxx.xxx.xxx:xxxx/mqtt"let initOptions = {clientId: '',username: '',password: ''}_instance = { // 将实例赋值给 _instancegetConnectStatus: () => connectStatus,getIsSubscribedStatus: () => isSubed, // 获取订阅状态//全局MQTT连接handleConnect: function () {if (connectStatus !== 'Connect') {logInfo("MQTT已连接")return}_instance.handleCustomConnect(initHost, initOptions)},handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {connectStatus = 'Connecting'client = mqtt.connect(host, mqttOptions)logInfo('------> [ Connecting ] ------>', host, mqttOptions)if (client) {client.on('connect', () => {connectStatus = 'Connected'logInfo('------> [ Connected ]')if (autoSub) {//默认订阅主题_instance.handleSubscribe(['MqttTopic1','MqttTopic2','MqttTopic3',],0)}if (onConnect) onConnect()});client.on('error', (err) => {logError('------> [ Connection error ]', err)if (onError) onError(err)client.end()});client.on('reconnect', () => {connectStatus = 'Reconnecting'logError('------> [ Reconnecting ]')if (onReconnect) onReconnect()});client.on('message', (topic, message) => {logInfo('------> [ topic ] ------>', topic)logInfo('------> [ message ] ---->', message.toString())const payload = {topic, message: message.toString()}//处理mqtt消息dealMqttMsg(topic, message.toString())if (onMessage) onMessage(payload)});}},handleSubscribe: (topic, qos, callback) => {if (!client) {throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');}client.subscribe(topic, {qos}, (err) => {if (err) {logError('Subscribe failed:', err)return}isSubed = true// 更新订阅状态为已订阅logInfo('------> [ 订阅主题成功 ]')if (callback) callback()});},handleUnsub: (topic, qos, callback) => {if (client) {client.unsubscribe(topic, {qos}, (error) => {if (error) {logError('Unsubscribe error', error)return}isSubed = false // 更新订阅状态为未订阅if (callback) callback()})}},handlePublish: (pubRecord) => {if (!client) {logError('MQTT 客户端未初始化,无法发布消息')return}const {topic, qos, payload} = pubRecordclient.publish(topic, payload, {qos}, (error) => {if (error) {logError('------> [ 发布消息失败 ] ------>', error)throw new Error(`发布消息失败: ${error.message || '未知错误'}`)}})},handleDisconnect: (callback) => {if (client) {try {// 移除所有事件监听器client.removeAllListeners()//client.end(false, () => {connectStatus = 'Connect' // 确保断开连接后状态正确更新client = nulllogInfo('------> [ disconnect ]')if (callback) callback()})} catch (error) {connectStatus = 'Connect' // 确保断开连接后状态正确更新logInfo('------> [ disconnect error ] ------>', error)}}},}return _instance
}export default initMqtt;

消息分发

工具类的封装

全局MQTT连接到服务器之后,会通过订阅主题来接收推送的消息,推送的消息是 topic(主题)+ message(消息内容)组成。我们需要根据不同的topic主题来进行消息的分发,所以需要封装一个消息分发的工具类。
新建 EventEmitter.js

import {pull} from 'lodash';/*** 自定义事件触发器类,用于管理事件监听和触发*/
class EventEmitter {/*** 构造函数,初始化事件存储对象*/constructor() {this._events = {}; // 存储事件名与回调函数列表的映射}/*** 获取指定事件的回调函数列表,若不存在则创建空数组* @param {string} event - 事件名称* @returns {Function[]} 当前事件的回调函数列表*/_getFns(event) {return this._events[event] || (this._events[event] = []);}/*** 监听指定事件,添加回调函数* @param {string} event - 事件名称* @param {Function} cb - 回调函数*/on(event, cb) {const fns = this._getFns(event);fns.push(cb); // 将回调加入该事件的队列中}/*** 移除指定事件的某个回调函数或整个事件的所有回调* @param {string} event - 事件名称* @param {Function} [cb] - 要移除的回调函数,不传则清空整个事件*/off(event, cb) {if (cb) {const fns = this._getFns(event);pull(fns, cb); // 使用 lodash 的 pull 方法从数组中移除指定回调} else {delete this._events[event]; // 不传回调时删除整个事件键}}/*** 绑定只执行一次的事件监听器* @param {string} event - 事件名称* @param {Function} cb - 回调函数*/once(event, cb) {const fn2 = (e) => {this.off(event, fn2); // 执行后立即解绑自身cb(e); // 执行原始回调};this.on(event, fn2); // 绑定包装后的回调}/*** 同步触发指定事件,依次执行所有回调* @param {string} event - 事件名称* @param {*} [param] - 传递给回调函数的参数*/emit(event, param) {const fns = this._getFns(event);for (let i = 0; i < fns.length; i++) {const fn = fns[i];fn(param); // 同步执行每个回调函数}}/*** 异步触发指定事件,返回一个Promise* 注意:此方法仅触发第一个回调并返回其 Promise 结果* @param {string} event - 事件名称* @param {*} [param] - 传递给回调函数的参数* @returns {Promise} 返回第一个回调执行后的 Promise*/invoke(event, param) {const fns = this._getFns(event);for (let i = 0; i < fns.length; i++) {const fn = fns[i];return new Promise((resolve, reject) => {resolve(fn(param)); // 将第一个回调的结果封装为 Promise});}return Promise.reject(); // 如果没有回调,则返回拒绝状态}
}export default EventEmitter;

消息分发处理

新建 Events.js

import EventEmitter from './EventEmitter.js';
import {useEffect} from 'react';const events = new EventEmitter();export default events;/*** 订阅事件总线** @param  event - 事件名称* @param  callback - 回调函数** @example* useEventBus('自定义事件名', (pushInfo) => {*   console.log(pushInfo);* });*/
export function useEventBus(event, callback) {useEffect(() => {events.on(event, callback);return () => {events.off(event, callback);};});
}/*** mqtt消息处理* @param topic* @param message*/
export function dealMqttMsg(topic, message) {switch (topic) {//网关配置相关订阅case 'MqttTopic1'://发送消息events.emit('MqttTopic1', message)break;case 'MqttTopic2'://发送消息events.emit('MqttTopic2', message)break;case 'MqttTopic3'://发送消息events.emit('MqttTopic3', message)break;}
}

消息订阅

我们可以在页面上通过订阅的方式来进行消息的接收。

/*** MQTT消息订阅*/useEventBus('MqttTopic1', (message) => {console.log(message)})

MQTT使用

初始化

如果需要在全局进行MQTT连接,则可以在App.js里面进行mqtt的连接和断开,或者在登陆、登出进行同样可以。

useEffect(() => {if (initMqtt().getConnectStatus() === "Connect") {//连接MQTTinitMqtt().handleConnect()}return () => {//组件销毁时关闭连接initMqtt().handleDisconnect()}}, [])

心跳

新建 HeartBeatUtils.js

let heartTimerId = null;export const startHeartBeat = (intervalTime, intervalMethod) => {if (!intervalTime || !intervalMethod) {console.warn('Invalid parameters for startHeartBeat');return null;}// 清除已有的定时器clearHeartBeat();// 设置新的定时器heartTimerId = setInterval(async () => {intervalMethod()}, intervalTime);//intervalMethod()
};export const clearHeartBeat = () => {if (heartTimerId) {clearTimeout(heartTimerId);heartTimerId = null;}
};

可以在连接MQTT的回调中使用,MQTT连接成功后开启心跳定时器,发送心跳消息,在MQTT断开连接时,手动停止心跳定时器即可。

handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {connectStatus = 'Connecting'client = mqtt.connect(host, mqttOptions)logInfo('------> [ Connecting ] ------>', host, mqttOptions)if (client) {client.on('connect', () => {connectStatus = 'Connected'logInfo('------> [ Connected ]')// 启动心跳定时器startHeartBeat(1000 * 60, () => {//实现具体的心跳方法(发送指定的心跳数据)_instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})})//是否自动订阅主题if (autoSub) {//默认订阅主题_instance.handleSubscribe(['MqttTopic1','MqttTopic2','MqttTopic3',],0)}if (onConnect) onConnect()});client.on('error', (err) => {logError('------> [ Connection error ]', err)if (onError) onError(err)client.end()});client.on('reconnect', () => {connectStatus = 'Reconnecting'logError('------> [ Reconnecting ]')if (onReconnect) onReconnect()});client.on('message', (topic, message) => {logInfo('------> [ topic ] ------>', topic)logInfo('------> [ message ] ---->', message.toString())const payload = {topic, message: message.toString()}//处理mqtt消息dealMqttMsg(topic, message.toString())if (onMessage) onMessage(payload)});}}handleDisconnect: (callback) => {if (client) {try {// 移除所有事件监听器client.removeAllListeners()//client.end(false, () => {connectStatus = 'Connect' // 确保断开连接后状态正确更新client = null//移除心跳定时器clearHeartBeat()logInfo('------> [ disconnect ]')if (callback) callback()})} catch (error) {connectStatus = 'Connect' // 确保断开连接后状态正确更新logInfo('------> [ disconnect error ] ------>', error)}}}

完整的MQTT相关操作类如下所示:

import mqtt from 'mqtt';
import {dealMqttMsg} from "./Events";
import {logError, logInfo} from "@utils/LogUtils";
import {startHeartBeat} from "./util/HeartBeatUtils";let _instance = null // 单例实例// 初始化 MQTT 工具的状态
function initMqtt() {if (_instance) {return _instance // 如果实例已存在,直接返回}let client = nulllet connectStatus = 'Connect'let isSubed = false // 订阅状态let initHost = "ws://XXX.XXX.X.X:XXXX/aiot/mqtt"let initOptions = {clientId: '',username: '',password: ''}_instance = {getConnectStatus: () => connectStatus,getIsSubscribedStatus: () => isSubed, // 获取订阅状态//全局MQTT连接handleConnect: function () {if (connectStatus !== 'Connect') {logInfo("MQTT已连接")return}_instance.handleCustomConnect(initHost, initOptions)},handleCustomConnect: (host, mqttOptions, autoSub = true, onConnect, onError, onReconnect, onMessage) => {connectStatus = 'Connecting'client = mqtt.connect(host, mqttOptions)logInfo('------> [ Connecting ] ------>', host, mqttOptions)if (client) {client.on('connect', () => {connectStatus = 'Connected'// 启动心跳定时器startHeartBeat(1000 * 60, () => {_instance.handlePublish({topic: 'heart', qos: 0, payload: '这是一条心跳消息!!!'})})logInfo('------> [ Connected ]')if (autoSub) {//默认订阅主题_instance.handleSubscribe(['MqttTopic1','MqttTopic2','MqttTopic3',],0)}if (onConnect) onConnect()});client.on('error', (err) => {logError('------> [ Connection error ]', err)if (onError) onError(err)client.end()});client.on('reconnect', () => {connectStatus = 'Reconnecting'logError('------> [ Reconnecting ]')if (onReconnect) onReconnect()});client.on('message', (topic, message) => {logInfo('------> [ topic ] ------>', topic)logInfo('------> [ message ] ---->', message.toString())const payload = {topic, message: message.toString()}//处理mqtt消息dealMqttMsg(topic, message.toString())if (onMessage) onMessage(payload)});}},handleSubscribe: (topic, qos, callback) => {if (!client) {throw new Error('MQTT 客户端未正确初始化或缺少 subscribe 方法');}client.subscribe(topic, {qos}, (err) => {if (err) {logError('Subscribe failed:', err)return}isSubed = true// 更新订阅状态为已订阅logInfo('------> [ 订阅主题成功 ]')if (callback) callback()});},handleUnsub: (topic, qos, callback) => {if (client) {client.unsubscribe(topic, {qos}, (error) => {if (error) {logError('Unsubscribe error', error)return}isSubed = false // 更新订阅状态为未订阅if (callback) callback()})}},handlePublish: (pubRecord) => {if (!client) {logError('MQTT 客户端未初始化,无法发布消息')return}const {topic, qos, payload} = pubRecordclient.publish(topic, payload, {qos}, (error) => {if (error) {logError('------> [ 发布消息失败 ] ------>', error)throw new Error(`发布消息失败: ${error.message || '未知错误'}`)}})},handleDisconnect: (callback) => {if (client) {try {// 移除所有事件监听器client.removeAllListeners()//client.end(false, () => {connectStatus = 'Connect' // 确保断开连接后状态正确更新client = nulllogInfo('------> [ disconnect ]')if (callback) callback()})} catch (error) {connectStatus = 'Connect' // 确保断开连接后状态正确更新logInfo('------> [ disconnect error ] ------>', error)}}},}return _instance
}export default initMqtt;
http://www.dtcms.com/a/272356.html

相关文章:

  • 昇腾 k8s vnpu配置
  • 在Linux中,如何使用grep awk sed find?
  • 链式二叉树数据结构(递归)
  • 自动化——bat——批量复制所选的文件
  • 微服务架构的演进:迈向云原生——Java技术栈的实践之路
  • SpringBoot整合腾讯云新一代行为验证码
  • RabbitMQ 幂等性
  • Allegro PCB 手动添加元器件全流程解析
  • expect 安装入门手册
  • 【保姆级教程】基于anji-plus-captcha实现行为验证码(滑动拼图+点选文字),前后端完整代码奉上!
  • 人工智能-基础篇-28-模型上下文协议--MCP请求示例(JSON格式,客户端代码,服务端代码等示例)
  • 开源入侵防御系统——CrowdSec
  • Linux 服务器综合性能测试脚本(优化版)结构化分析
  • 若依框架去掉Redis
  • CORESET 0 and SIB1 Scheduling in a Nutshell
  • 论文阅读笔记:VI-Net: Boosting Category-level 6D Object Pose Estimation
  • RocketMQ安装(Windows环境)
  • 上线节点固定,项目进度紧张,如何合理压缩工期
  • NGINX系统基于PHP部署应用
  • 实验作业1+整理笔记截图
  • 实训八——路由器与交换机与网线
  • 栈题解——有效的括号【LeetCode】两种方法
  • 硬件基础------电感
  • Matplotlib-绘制训练曲线指南
  • 力扣刷题记录(c++)06
  • HTML应用指南:利用GET请求获取全国永辉超市门店位置信息
  • Unity3D iOS闪退问题解决方案
  • PyTorch仿射变换:原理与实战全解析
  • 深入理解Java虚拟机:Java内存区域与内存溢出异常
  • 【运维架构】云计算运维架构师与基础设施,技术路线,Linux证书(标准化/定制化/CNCF,公有云/混合云/私有云)