mqtt封装与使用
mqtt.js
import mqtt from 'mqtt'
const mqttTool = {
client: null,
messageHandlers: new Map(), // 存储主题和对应的处理函数
isReconnecting: false, // 是否正在重连
autoReconnect: true, // 是否自动重连
reconnectTimer: null, // 重连定时器
baseReconnectDelay: 1000, // 基础重连延迟(毫秒)
maxReconnectDelay: 30000, // 最大重连延迟(毫秒)
currentReconnectDelay: 1000, // 当前重连延迟
connectionParams: null, // 存储连接参数
// 连接方法
connect(params) {
return new Promise((resolve, reject) => {
// 保存连接参数
this.connectionParams = params
// 防止重复连接
if (this.isReconnecting) {
console.log('正在重连中,请稍候...')
return reject(new Error('正在重连中'))
}
const options = {
port: 8083,
host: params.url,
protocol: 'ws',
path: '/mqtt',
clientId: this.generateClientId(),
username: params.username,
password: params.password,
connectTimeout: 5000,
clean: false, // 保持会话
rejectUnauthorized: false,
keepalive: 60, // 60秒发送一次心跳
}
try {
// 创建连接
this.client = mqtt.connect(options)
// 设置连接事件处理
this.client.on('connect', () => {
console.log('MQTT 连接成功')
this.isReconnecting = false
this.currentReconnectDelay = this.baseReconnectDelay // 重置重连延迟
// 重新订阅之前订阅的主题
this.resubscribeTopics()
resolve(this.client)
})
this.client.on('error', (error) => {
console.error('MQTT 连接错误:', error)
this.handleDisconnect()
reject(error)
})
this.client.on('message', (topic, message) => {
this.handleMessage(topic, message)
})
// 监听断开连接事件
this.client.on('close', () => {
console.log('MQTT 连接已断开')
this.handleDisconnect()
})
} catch (error) {
console.error('MQTT 连接失败:', error)
reject(error)
}
})
},
// 生成客户端ID
generateClientId() {
const timestamp = Date.now()
const random = Math.random().toString(36).substring(2, 8)
return `mqtt_client_${timestamp}_${random}`
},
// 订阅主题
subscribe(topic, callback) {
if (!this.client || !this.client.connected) {
console.error('MQTT 未连接,无法订阅主题')
return false
}
// 存储主题和回调函数的映射
this.messageHandlers.set(topic, callback)
// 订阅主题
this.client.subscribe(topic, (err) => {
if (err) {
console.error(`订阅主题 ${topic} 失败:`, err)
this.messageHandlers.delete(topic)
return false
} else {
console.log(`订阅主题 ${topic} 成功`)
return true
}
})
},
// 处理消息
handleMessage(topic, message) {
try {
console.log(message)
const data = JSON.parse(message.toString())
console.log('收到消息:', topic, data)
// 获取对应主题的处理函数
const handler = this.messageHandlers.get(topic)
if (handler) {
handler(data)
} else {
// 处理通配符订阅
this.messageHandlers.forEach((callback, pattern) => {
if (this.matchTopic(topic, pattern)) {
callback(data)
}
})
}
} catch (error) {
console.error('消息解析错误:', error)
}
},
// 主题匹配(支持通配符)
matchTopic(topic, pattern) {
if (topic === pattern) return true
if (pattern.includes('#')) {
const prefix = pattern.replace(/#$/, '')
return topic.startsWith(prefix)
}
return false
},
// 处理断开连接
handleDisconnect() {
// 如果是主动断开,不进行重连
if (!this.autoReconnect) {
console.log('连接已主动断开,不进行重连')
return
}
// 检查是否有连接参数
if (!this.connectionParams) {
console.error('没有连接参数,无法重连')
return
}
this.isReconnecting = true
// 清除之前的重连定时器
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
}
// 使用指数退避算法计算重连延迟
this.currentReconnectDelay = Math.min(
this.currentReconnectDelay * 1.5,
this.maxReconnectDelay
)
console.log(`将在 ${this.currentReconnectDelay/1000} 秒后尝试重连...`)
// 设置重连定时器
this.reconnectTimer = setTimeout(() => {
this.connect(this.connectionParams)
}, this.currentReconnectDelay)
},
// 重新订阅所有主题
resubscribeTopics() {
if (this.messageHandlers.size === 0) {
console.log('没有需要重新订阅的主题')
return
}
console.log(`开始重新订阅 ${this.messageHandlers.size} 个主题...`)
let successCount = 0
let failCount = 0
this.messageHandlers.forEach((callback, topic) => {
if (!this.client || !this.client.connected) {
console.error('MQTT 未连接,无法重新订阅')
return
}
this.client.subscribe(topic, (err) => {
if (err) {
console.error(`重新订阅主题 ${topic} 失败:`, err)
failCount++
} else {
console.log(`重新订阅主题 ${topic} 成功`)
successCount++
}
// 当所有主题都处理完成后,输出统计信息
if (successCount + failCount === this.messageHandlers.size) {
console.log(`重新订阅完成:成功 ${successCount} 个,失败 ${failCount} 个`)
}
})
})
},
// 发布消息
publish(topic, message) {
if (this.client && this.client.connected) {
this.client.publish(topic, JSON.stringify(message))
return true
} else {
console.error('MQTT 未连接')
return false
}
},
// 设置自动重连
setAutoReconnect(value) {
this.autoReconnect = value
if (!value && this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.isReconnecting = false
this.currentReconnectDelay = this.baseReconnectDelay
}
},
// 断开连接
disconnect() {
this.autoReconnect = false
if (this.client) {
this.client.end()
this.client = null
this.messageHandlers.clear()
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer)
this.reconnectTimer = null
}
this.currentReconnectDelay = this.baseReconnectDelay
this.connectionParams = null
}
}
}
export default mqttTool
main.js 调用,设置全局
// 从封装的工具导入
import mqtt from './utils/mqtt'
// 将mqtt实例挂载到Vue.prototype上
Vue.prototype.$mqtt = mqtt
组件中调用
引入mqtt.js
import mqttTool from '../../utils/mqtt';
创建连接
async initMQTT() {
try{
const params = {
port:8083,
url:'', //需要连接的url //需要连接的url
username:'', //需要连接配置的username,
password:'',// 需要连接配置的password,
}
// 等待连接成功
await mqttTool.connect(params)
// 订阅主题
mqttTool.subscribe('your/topic'’, (data) => {
console.log('收到消息:', data)
// 处理数据
})
console.log('MQTT 初始化成功')
}catch (error) {
console.error('MQTT 初始化失败:', error)
}
},
断开连接
beforeDestroy() {
// 组件销毁时取消所有订阅并断开连接
if (this.mqttTool) {
// 主动断开连接(不会重连)
this.mqttTool.disconnect()
}
}