当前位置: 首页 > news >正文

mqtt封装与使用

 mqtt.js

import mqtt from 'mqtt'

const mqttTool = {
    client: null,
    messageHandlers: new Map(), // 存储主题和对应的处理函数
    isReconnecting: false, // 是否正在重连
    autoReconnect: true, // 是否自动重连
    reconnectTimer: null, // 重连定时器
    baseReconnectDelay: 1000, // 基础重连延迟(毫秒)
    maxReconnectDelay: 30000, // 最大重连延迟(毫秒)
    currentReconnectDelay: 1000, // 当前重连延迟
    connectionParams: null, // 存储连接参数
    
    // 连接方法
    connect(params) {
        return new Promise((resolve, reject) => {
            // 保存连接参数
            this.connectionParams = params
            
            // 防止重复连接
            if (this.isReconnecting) {
                console.log('正在重连中,请稍候...')
                return reject(new Error('正在重连中'))
            }

            const options = {
                port: 8083,
                host: params.url,
                protocol: 'ws',
                path: '/mqtt',
                clientId: this.generateClientId(),
                username: params.username,
                password: params.password,
                connectTimeout: 5000,
                clean: false, // 保持会话
                rejectUnauthorized: false,
                keepalive: 60, // 60秒发送一次心跳
            }

            try {
                // 创建连接
                this.client = mqtt.connect(options)
                
                // 设置连接事件处理
                this.client.on('connect', () => {
                    console.log('MQTT 连接成功')
                    this.isReconnecting = false
                    this.currentReconnectDelay = this.baseReconnectDelay // 重置重连延迟
                    // 重新订阅之前订阅的主题
                    this.resubscribeTopics()
                    resolve(this.client)
                })
                
                this.client.on('error', (error) => {
                    console.error('MQTT 连接错误:', error)
                    this.handleDisconnect()
                    reject(error)
                })
                
                this.client.on('message', (topic, message) => {
                    this.handleMessage(topic, message)
                })

                // 监听断开连接事件
                this.client.on('close', () => {
                    console.log('MQTT 连接已断开')
                    this.handleDisconnect()
                })
            } catch (error) {
                console.error('MQTT 连接失败:', error)
                reject(error)
            }
        })
    },
    // 生成客户端ID
    generateClientId() {
        const timestamp = Date.now()
        const random = Math.random().toString(36).substring(2, 8)
        return `mqtt_client_${timestamp}_${random}`
    },
    // 订阅主题
    subscribe(topic, callback) {
        if (!this.client || !this.client.connected) {
            console.error('MQTT 未连接,无法订阅主题')
            return false
        }

        // 存储主题和回调函数的映射
        this.messageHandlers.set(topic, callback)

        // 订阅主题
        this.client.subscribe(topic, (err) => {
            if (err) {
                console.error(`订阅主题 ${topic} 失败:`, err)
                this.messageHandlers.delete(topic)
                return false
            } else {
                console.log(`订阅主题 ${topic} 成功`)
                return true
            }
        })
    },

    // 处理消息
    handleMessage(topic, message) {
        try {
			console.log(message)
            const data = JSON.parse(message.toString())
            console.log('收到消息:', topic, data)
            
            // 获取对应主题的处理函数
            const handler = this.messageHandlers.get(topic)
            if (handler) {
                handler(data)
            } else {
                // 处理通配符订阅
                this.messageHandlers.forEach((callback, pattern) => {
                    if (this.matchTopic(topic, pattern)) {
                        callback(data)
                    }
                })
            }
        } catch (error) {
            console.error('消息解析错误:', error)
        }
    },

    // 主题匹配(支持通配符)
    matchTopic(topic, pattern) {
        if (topic === pattern) return true
        if (pattern.includes('#')) {
            const prefix = pattern.replace(/#$/, '')
            return topic.startsWith(prefix)
        }
        return false
    },

    // 处理断开连接
    handleDisconnect() {
        // 如果是主动断开,不进行重连
        if (!this.autoReconnect) {
            console.log('连接已主动断开,不进行重连')
            return
        }

        // 检查是否有连接参数
        if (!this.connectionParams) {
            console.error('没有连接参数,无法重连')
            return
        }

        this.isReconnecting = true
        
        // 清除之前的重连定时器
        if (this.reconnectTimer) {
            clearTimeout(this.reconnectTimer)
        }
        
        // 使用指数退避算法计算重连延迟
        this.currentReconnectDelay = Math.min(
            this.currentReconnectDelay * 1.5,
            this.maxReconnectDelay
        )
        
        console.log(`将在 ${this.currentReconnectDelay/1000} 秒后尝试重连...`)
        
        // 设置重连定时器
        this.reconnectTimer = setTimeout(() => {
            this.connect(this.connectionParams)
        }, this.currentReconnectDelay)
    },
    
    // 重新订阅所有主题
    resubscribeTopics() {
        if (this.messageHandlers.size === 0) {
            console.log('没有需要重新订阅的主题')
            return
        }

        console.log(`开始重新订阅 ${this.messageHandlers.size} 个主题...`)
        
        let successCount = 0
        let failCount = 0

        this.messageHandlers.forEach((callback, topic) => {
            if (!this.client || !this.client.connected) {
                console.error('MQTT 未连接,无法重新订阅')
                return
            }

            this.client.subscribe(topic, (err) => {
                if (err) {
                    console.error(`重新订阅主题 ${topic} 失败:`, err)
                    failCount++
                } else {
                    console.log(`重新订阅主题 ${topic} 成功`)
                    successCount++
                }

                // 当所有主题都处理完成后,输出统计信息
                if (successCount + failCount === this.messageHandlers.size) {
                    console.log(`重新订阅完成:成功 ${successCount} 个,失败 ${failCount} 个`)
                }
            })
        })
    },

    // 发布消息
    publish(topic, message) {
        if (this.client && this.client.connected) {
            this.client.publish(topic, JSON.stringify(message))
            return true
        } else {
            console.error('MQTT 未连接')
            return false
        }
    },
    
    // 设置自动重连
    setAutoReconnect(value) {
        this.autoReconnect = value
        if (!value && this.reconnectTimer) {
            clearTimeout(this.reconnectTimer)
            this.isReconnecting = false
            this.currentReconnectDelay = this.baseReconnectDelay
        }
    },
    
    // 断开连接
    disconnect() {
        this.autoReconnect = false
        
        if (this.client) {
            this.client.end()
            this.client = null
            this.messageHandlers.clear()
            
            if (this.reconnectTimer) {
                clearTimeout(this.reconnectTimer)
                this.reconnectTimer = null
            }
            
            this.currentReconnectDelay = this.baseReconnectDelay
            this.connectionParams = null
        }
    }
}

export default mqttTool

main.js  调用,设置全局

// 从封装的工具导入
import mqtt from './utils/mqtt'
// 将mqtt实例挂载到Vue.prototype上
Vue.prototype.$mqtt = mqtt

组件中调用

引入mqtt.js

import mqttTool from '../../utils/mqtt';

创建连接

async initMQTT() {
			try{
				const params = {
					port:8083,
					url:'', //需要连接的url //需要连接的url
					username:'', //需要连接配置的username,
					password:'',// 需要连接配置的password,
				}
				// 等待连接成功
				await mqttTool.connect(params)
				
				// 订阅主题
				mqttTool.subscribe('your/topic'’, (data) => {
					console.log('收到消息:', data)
					// 处理数据
					
				})
				
				        
				console.log('MQTT 初始化成功')
			}catch (error) {
                console.error('MQTT 初始化失败:', error)
            }
		},

断开连接

	beforeDestroy() {
		// 组件销毁时取消所有订阅并断开连接
		if (this.mqttTool) {
			// 主动断开连接(不会重连)
			this.mqttTool.disconnect()
		}
	}

http://www.dtcms.com/a/109808.html

相关文章:

  • 大模型面试题
  • SSL/TLS
  • 【系统架构设计师】系统架构评估中的重要概念
  • 几何法证明卡特兰数_栈混洗
  • 代码随想录|动态规划|21组合总和IV
  • 从零开始:使用Spring Boot和MyBatis实现CRUD操作全攻略
  • 25.Reactor
  • PHP代码审计-01
  • RAGFlow 知识库分段研究
  • 码界奇缘 Java 觉醒 第一章 命运的终端
  • 数据驱动防灾:AI 大模型在地质灾害应急决策中的关键作用。基于DeepSeek/ChatGPT的AI智能体开发
  • 关于inode,dentry结合软链接及硬链接的实验
  • 线性代数:分块矩阵,秩,齐次线性,非齐次线性的解相关经典例题
  • 带头结点 的单链表插入方法(头插法与尾插法)
  • 纪检委行业光盘安全隔离与数据摆渡应用方案
  • 笔记:背包问题总结
  • 【PHP】- 项目通用目录架构及示例demo
  • 【LLM之评测】AlignBench: Benchmarking Chinese Alignment of Large Language Models
  • Docker 常用指令速查
  • TDengine 重磅功能虚拟表
  • Spring Security(maven项目) 3.1.0
  • 青少年编程与数学 02-015 大学数学知识点 06课题、离散数学
  • Linux驱动开发进阶(三)- 热插拔机制
  • Scala基础知识
  • 数据结构学习
  • 混杂模式(Promiscuous Mode)与 Trunk 端口的区别详解
  • UEFI镜像结构布局
  • 蓝桥杯刷题记录【并查集001】(2024)
  • DeepSeek真的超越了OpenAI吗?
  • 黑马点评项目总结