当前位置: 首页 > news >正文

MQTT 协议深度学习笔记(含实战示例・完整版)

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是物联网(IoT)领域的 “轻量级通信基石”,专为资源受限设备(如嵌入式传感器)和弱网络环境(如 2G、LoRa)设计,基于发布 / 订阅模式实现高效、灵活的消息传输,目前已成为 ISO 标准(ISO/IEC PRF 20922),广泛应用于智能家居、工业监控、远程医疗等场景。

一、MQTT 协议基础:定义与起源

1. 核心定义

MQTT 是工作在 TCP/IP 协议簇之上 的发布 / 订阅型消息协议,核心目标是用最小的带宽和硬件资源,实现设备间的可靠通信。其 “轻量级” 体现在两方面:

  • 协议头精简:最小固定头仅 2 字节,远超 HTTP(头信息通常数十至上百字节);

  • 逻辑简单:无需复杂的请求 / 响应交互,设备只需 “发布消息” 或 “订阅主题” 即可通信。

2. 起源与发展

  • 1999 年:由 IBM 联合 Arcom(现属 Sierra Wireless)开发,最初用于石油、天然气行业的 “远程传感器数据采集”(如油井压力监测),解决 “低带宽、高延迟、设备算力低” 的痛点;

  • 2014 年:由 OASIS(结构化信息标准促进组织)接管,正式成为开源标准,推动其在 IoT 领域普及;

  • 现状:支持 AWS IoT、Azure IoT Hub、阿里云 IoT 等主流云平台,兼容 C、Java、Python、JavaScript 等几乎所有编程语言,是 IoT 设备通信的 “事实标准”。

二、MQTT 核心角色与通信要素(附示例)

MQTT 协议通过 “3 类角色 + 3 个核心要素” 实现消息流转,结构清晰且解耦发布者与订阅者。

1. 三大核心角色

角色英文标识功能描述实战示例
发布者Publisher生产消息的客户端,将消息按 “主题” 发送给 Broker,无需知道订阅者是谁温湿度传感器(每 10 秒发布一次数据)、智能门锁(开门时发布 “开门事件”)
订阅者Subscriber消费消息的客户端,通过订阅 “主题” 从 Broker 接收感兴趣的消息,无需知道发布者手机 App(订阅 “客厅温湿度” 查看数据)、报警器(订阅 “烟雾检测” 触发报警)
经纪人(代理)Broker核心服务器,负责:①接收发布者的消息;②管理主题订阅关系;③将消息转发给所有订阅者开源 Broker(如 Mosquitto、EMQX)、云 Broker(如阿里云 IoT 网关)

关键特性:发布者与订阅者可身份重叠(例如一个智能灯,既 “订阅控制指令”(如 “开灯”),也 “发布状态”(如 “已开灯”))。

2. 三大通信要素

(1)主题(Topic):消息的 “路由地址”
  • 定义:类似 “文件夹路径”,用

    /

    分隔层级,用于区分不同类型的消息,例如:

    • home/livingroom/temp:客厅温度数据;

    • car/engine/status:汽车发动机状态;

    • device/door/123/event:编号 123 的门锁事件。

  • 通配符规则:支持两种通配符,满足 “批量订阅” 需求:

    • +:匹配单个层级,例如 home/+/temp 可匹配 home/bedroom/temp(卧室温度)、home/kitchen/temp(厨房温度);

    • #:匹配所有子层级(必须放在末尾),例如 home/# 可匹配 home/livingroom/temphome/bedroom/light/status 等所有 “home” 下的主题。

示例

订阅者订阅 home/#,则发布者发送到 home/livingroom/humidity(客厅湿度)、home/bedroom/light(卧室灯光)的消息,都会被该订阅者接收。

(2)负载(Payload):消息的 “实际内容”
  • 定义:发布者传递给订阅者的具体数据,格式无强制要求,常见格式如下:

    • JSON(最常用):{"temp": 25.3, "humidity": 58, "timestamp": 1690000000}

    • 文本:"door_opened"(门锁开门事件);

    • 二进制:传感器采集的原始二进制数据(如音频、图片片段)。

示例

温湿度传感器(发布者)向主题 home/livingroom/env 发布消息,Payload 为 JSON 格式:

{"device_id": "sensor_001", "temp": 24.8, "humidity": 62, "update_time": "2024-05-20 14:30:00"}

(3)服务质量(QoS):消息的 “可靠性等级”

MQTT 最核心的特性之一,通过 3 个等级平衡 “可靠性” 与 “传输效率”,适配不同业务场景:

QoS 等级英文标识核心逻辑数据流向示例适用场景
QoS 0Almost Once至多一次:发布者发一次消息,不等确认;Broker 收后直接转发,可能丢失传感器 → Broker(无确认);Broker → 订阅者(无确认)非关键数据(如实时监控视频帧、临时状态上报)
QoS 1At Least Once至少一次:发布者发消息后等 Broker 确认(PUBACK),未确认则重发,可能重复传感器 → Broker(发 PUBLISH → 收 PUBACK);Broker → 订阅者(同逻辑)关键数据(如设备控制指令、报警信息)
QoS 2Exactly Once仅一次:四步确认(发→Broker 确认→Broker→订阅者确认→最终确认),无丢失 / 重复传感器→Broker(PUBLISH→PUBREC);Broker→订阅者(PUBLISH→PUBREC→PUBREL→PUBCOMP)核心数据(如金融交易、设备故障日志)

示例

  • 场景 1:实时监控摄像头(QoS 0):丢失几帧画面不影响整体监控,优先保证传输效率;

  • 场景 2:智能开关控制(QoS 1):必须确保 “开灯指令” 到达,即使重复执行(开关重复开灯无影响);

  • 场景 3:燃气表计费数据(QoS 2):必须确保 “用气量 10m³” 仅被记录一次,避免重复计费或漏计费。

三、MQTT 报文结构(数据包格式)

MQTT 客户端与 Broker 的所有通信都通过 “报文(Packet)” 完成,所有报文均由 固定头(Fixed Header)、可变头(Variable Header)、消息体(Payload) 三部分构成,后两部分是否存在取决于报文类型。

1. 固定头(Fixed Header):所有报文必含

用于标识报文类型和核心属性,最小 2 字节,结构如下:

字节位置字段名称长度(bit)功能描述
第 1 字节报文类型4共 14 种类型,如:0001=CONNECT(连接请求)、0011=PUBLISH(发布消息)、0100=SUBSCRIBE(订阅请求)
第 1 字节标志位(Flags)4补充报文属性,如 PUBLISH 报文的标志位包含 QoS 等级、是否保留消息(Retain)
第 2 + 字节剩余长度1~4 字节表示 “可变头 + 消息体” 的总字节数,采用 “可变长度编码”(最高位为续位标志,0 表示结束),最大支持 256MB

示例

PUBLISH 报文的固定头(第 1 字节):0b00110010

  • 前 4 位 0011:表示报文类型为 PUBLISH;

  • 后 4 位 0010:包含 QoS 等级(中间 2 位 01 表示 QoS 1)、Retain 标志(最低位 0 表示不保留消息)。

2. 可变头(Variable Header):部分报文含

内容由 “报文类型” 决定,用于传递该类型报文的额外信息,常见示例:

报文类型可变头包含的内容
CONNECT协议名(如 “MQTT”)、协议版本(如 5.0)、连接标志(是否需用户名密码、是否清除会话)、保活时间
PUBLISH主题名(Topic Name)、报文标识符(仅 QoS 1/2 需携带,用于重发和确认)
SUBSCRIBE报文标识符(用于接收 SUBSCRIBE 的确认报文 SUBACK)

示例

PUBLISH 报文的可变头(QoS 1):

  • 主题名长度(2 字节):0x000E(14 字节);

  • 主题名(14 字节):home/livingroom/temp

  • 报文标识符(2 字节):0x0005(用于匹配 Broker 回复的 PUBACK 报文)。

3. 消息体(Payload):部分报文含

即实际业务数据,常见示例:

报文类型消息体包含的内容
CONNECT客户端标识符(Client ID,Broker 唯一标识客户端)、用户名、密码
PUBLISH负载(Payload):如温湿度数据、控制指令
SUBSCRIBE订阅的主题列表及对应的 QoS 等级(如 home/# QoS 1、device/door/123/event QoS 0)

完整报文示例

以 “传感器发布温湿度数据(QoS 1)” 为例,完整 PUBLISH 报文结构如下:

plaintext

[固定头](3 字节)
- 第 1 字节:0b00110010(PUBLISH 类型 + QoS 1 + 不保留)
- 第 2 字节:0x18(剩余长度:可变头 16 字节 + 消息体 8 字节 = 24 字节,0x18 即 24)
​
[可变头](16 字节)
- 主题名长度(2 字节):0x000E(14 字节)
- 主题名(14 字节):home/livingroom/temp
- 报文标识符(2 字节):0x0005
​
[消息体](8 字节)
- 负载数据:{"t":25.5}(JSON 格式,温度 25.5℃)

四、MQTT 关键机制(附配置示例)

除基础角色和报文外,MQTT 还通过以下机制保障通信稳定性和灵活性。

1. 会话保持(Session)

客户端与 Broker 建立连接时,通过 “清除会话(Clean Session)” 标志控制会话状态是否保留:

  • Clean Session = 1:客户端断开连接后,Broker 清除其订阅关系、未完成的 QoS 1/2 消息,下次连接视为新会话;

  • Clean Session = 0:客户端断开连接后,Broker 保留其订阅关系和未转发的 QoS 1/2 消息,下次重连后继续交付。

示例

智能门锁(客户端)连接 Broker 时设置 Clean Session = 0

  • 若门锁断电(异常断开),Broker 会保留其订阅的 “远程开锁指令” 主题;

  • 门锁重启后重连 Broker,Broker 会将断电期间收到的 “开锁指令” 转发给门锁。

2. 保活机制(Keep Alive)

防止 TCP 连接 “假死”(如网络中断但未触发 TCP 断开),客户端与 Broker 协商 “保活时间(单位:秒)”:

  • 客户端需在 “保活时间” 内发送 PINGREQ 报文(心跳);

  • Broker 收到 PINGREQ 后,回复 PINGRESP 报文

  • 若 Broker 超过 “1.5 倍保活时间” 未收到 PINGREQ,判定客户端离线,断开连接。

示例

传感器(客户端)连接 Broker 时设置 Keep Alive = 60(60 秒):

  • 传感器需每 60 秒内发送一次 PINGREQ;

  • 若传感器网络中断,Broker 等待 90 秒(1.5×60)后仍未收到 PINGREQ,判定传感器离线,触发 “遗嘱消息”。

3. 遗嘱消息(LWT)

客户端连接时预设 “遗嘱主题” 和 “遗嘱负载”,若客户端异常断开(如断电、网络中断),Broker 会自动将 “遗嘱消息” 发布到预设主题,通知其他订阅者该设备离线。

配置示例

智能灯(客户端)连接 Broker 时配置 LWT:

  • 遗嘱主题:device/light/123/status

  • 遗嘱负载:{"status": "offline", "time": "2024-05-20 15:00:00"}

  • 若智能灯断电,Broker 会向 device/light/123/status 发布上述遗嘱消息,手机 App(订阅该主题)会收到 “智能灯离线” 的通知。

4. 保留消息(Retain)

发布消息时可设置 “Retain 标志”,若为 1,Broker 会 “保留” 该主题的最新一条消息:

  • 新订阅该主题的客户端,无需等待发布者再次发送,即可直接收到 Broker 保留的最新消息;

  • 若发布者发送新的 Retain 消息,Broker 会用新消息覆盖旧消息;

  • 若发布 “空负载” 的 Retain 消息,Broker 会删除该主题的保留消息。

示例

气象站(发布者)向主题 weather/city/beijing 发布 Retain 消息(负载 {"temp": 26, "weather": "sunny"}):

  • 第二天新订阅该主题的客户端(如手机天气 App),会直接收到 Broker 保留的 “北京温度 26℃、晴天” 消息,无需等待气象站再次发布。

五、MQTT 实战:基于 Python 的完整发布 / 订阅案例

以 “温湿度传感器发布数据 + 手机 App 订阅数据” 为例,使用开源 Broker(Mosquitto)和 Python 的 paho-mqtt 库实现端到端通信。

1. 环境准备

  1. 安装 Mosquitto Broker(Windows/Linux/macOS 均可):

    • Linux:sudo apt-get install mosquitto,安装后自动启动,默认端口 1883;

    • Windows:从 Mosquitto 官网 下载安装包,安装后手动启动服务(在安装目录执行 mosquitto.exe);

    • 验证:执行 mosquitto -v,若显示 “Opening ipv4 listen socket on port 1883”,表示 Broker 启动成功。

  2. 安装 Python 依赖库

    执行 pip install paho-mqtt,安装 MQTT 客户端库。

2. 发布者代码(温湿度传感器模拟)

功能:每 5 秒生成一次模拟温湿度数据,以 QoS 1 发布到主题 home/livingroom/env,并配置 LWT 和保活机制。

import paho.mqtt.client as mqtt
import json
import time
import random
​
# 1. Broker 基础配置
BROKER_HOST = "localhost"  # 本地Broker地址,远程Broker填IP(如"192.168.1.100")
BROKER_PORT = 1883         # MQTT默认端口(未加密),加密端口为8883
PUB_TOPIC = "home/livingroom/env"  # 发布的主题
CLIENT_ID = "sensor_001"   # 客户端唯一标识(不可重复)
​
# 2. 连接回调函数(连接成功/失败时触发)
def on_connect(client, userdata, flags, rc):# rc=0表示连接成功,其他值为错误码if rc == 0:print(f"✅ 传感器 {CLIENT_ID} 已连接到Broker")else:print(f"❌ 连接失败,错误码:{rc}")
​
# 3. 发布消息回调函数(消息发布成功时触发)
def on_publish(client, userdata, mid):print(f"📤 消息发布成功,消息ID:{mid}")
​
# 4. 创建客户端实例
client = mqtt.Client(client_id=CLIENT_ID, clean_session=False)
​
# 5. 设置回调函数
client.on_connect = on_connect
client.on_publish = on_publish
​
# 6. 配置遗嘱消息(LWT)
client.will_set(topic="device/status",payload=json.dumps({"device_id": CLIENT_ID,"status": "offline","timestamp": time.strftime("%Y-%m-%d %H:%M:%S")}),qos=1,retain=True
)
​
# 7. 连接到Broker(设置保活时间60秒)
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
​
# 8. 启动后台网络循环
client.loop_start()
​
try:# 9. 循环发布模拟数据while True:# 生成模拟温湿度数据temperature = round(random.uniform(20.0, 30.0), 1)humidity = round(random.uniform(30.0, 80.0), 1)# 构造消息负载payload = {"device_id": CLIENT_ID,"temperature": temperature,"humidity": humidity,"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")}# 发布消息(QoS=1,不保留消息)result = client.publish(topic=PUB_TOPIC,payload=json.dumps(payload),qos=1,retain=False)# 等待发布确认(QoS>0时有效)result.wait_for_publish()print(f"发布数据:{payload}")# 每5秒发布一次time.sleep(5)
​
except KeyboardInterrupt:# 10. 捕获Ctrl+C中断,优雅退出print("\n用户中断程序")
finally:# 11. 停止网络循环并断开连接client.loop_stop()client.disconnect()print("已断开与Broker的连接")import paho.mqtt.client as mqtt
import json
import time
import random
​
# 1. Broker 基础配置
BROKER_HOST = "localhost"  # 本地 Broker 地址,远程 Broker 填 IP(如 "192.168.1.100")
BROKER_PORT = 1883         # MQTT 默认端口(未加密),加密端口为 8883
PUB_TOPIC = "home/livingroom/env"  # 发布的主题
CLIENT_ID = "sensor_001"   # 客户端唯一标识(不可重复)
​
# 2. 连接回调函数(连接成功/失败时触发)
def on_connect(client, userdata, flags, rc):# rc=0 表示连接成功,其他值为错误码(如 rc=1:协议版本不支持)if rc == 0:print(f"✅ 传感器 {CLIENT_ID} 已连接到 

3.订阅者代码

import paho.mqtt.client as mqtt
import json
import time
​
# 1. Broker 基础配置
BROKER_HOST = "localhost"  # 与发布者相同的Broker地址
BROKER_PORT = 1883
SUB_TOPICS = [              # 订阅的主题列表(可多个)("home/livingroom/env", 1),  # (主题, QoS)("device/status", 1)
]
CLIENT_ID = "monitor_client"  # 订阅者客户端ID
​
# 2. 连接回调函数
def on_connect(client, userdata, flags, rc):if rc == 0:print(f"✅ 监控客户端 {CLIENT_ID} 已连接到Broker")# 连接成功后订阅主题client.subscribe(SUB_TOPICS)print(f"📥 已订阅主题:{[topic[0] for topic in SUB_TOPICS]}")else:print(f"❌ 连接失败,错误码:{rc}")
​
# 3. 接收消息回调函数
def on_message(client, userdata, msg):try:# 尝试解析JSON格式消息payload = json.loads(msg.payload.decode('utf-8'))message = json.dumps(payload, indent=2)  # 格式化显示except json.JSONDecodeError:# 非JSON格式消息直接显示message = msg.payload.decode('utf-8')# 打印接收的消息print(f"\n📩 收到消息:")print(f"   主题:{msg.topic}")print(f"   QoS:{msg.qos}")print(f"   内容:{message}")
​
# 4. 订阅确认回调函数
def on_subscribe(client, userdata, mid, granted_qos):print(f"✅ 订阅确认,消息ID:{mid},授予的QoS:{granted_qos}")
​
# 5. 创建客户端实例
client = mqtt.Client(client_id=CLIENT_ID, clean_session=False)
​
# 6. 设置回调函数
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe
​
# 7. 连接到Broker
client.connect(BROKER_HOST, BROKER_PORT, keepalive=60)
​
try:# 8. 阻塞式网络循环(持续处理消息)client.loop_forever()
​
except KeyboardInterrupt:# 9. 捕获Ctrl+C中断,优雅退出print("\n用户中断程序")
finally:# 10. 断开连接client.disconnect()print("已断开与Broker的连接")
http://www.dtcms.com/a/500116.html

相关文章:

  • 工程建设网站导航图珠海建网站价格
  • 做外贸是什么网站网络宣传网站建设制作
  • 网站关键词制作《电子商务网站开发与管理》书籍
  • 成绩查询系统网站开发怎么做网站的图片跳转
  • 基于 GEE MODIS 数据实现 7 大遥感指数计算与可视化
  • 【计算机算法设计与分析】分治算法
  • CSS核心概念全解析:从入门到精通
  • 公司品牌网站建设常州语言网站建设
  • 北京做商铺的网站网站建设及域名申请 厦门
  • 微网站制作软件无版权视频素材网站
  • 深圳外贸建站模版那些市区做网站群
  • 【Linux】路劲解析-简析inode和dentry关系
  • AI Agent概念 原理 实践
  • 微信公众号的网站开发四川建设厅官方网站文件下载
  • 提供五屏网站建设深圳外贸建站网络推广价格
  • 电脑的 wifi 图标不见了该怎么处理
  • 深入浅出:SQL注入中的逗号绕过技巧剖析
  • (Kotlin高级特性四)kotlin属性委托(如 by lazy) 的原理?
  • 网站美术视觉效果布局设计在线服务平台的跨境电商有哪些
  • k8s(七)pod的配置资源管理
  • 做软件跟网站哪个难沭阳找做网站合伙
  • 智元灵犀X1开源分析-通讯架构
  • 5.1元挂逼VPSW
  • 旅游电子商务网站建设长春做网站wang
  • 智能语义搜索核心算法:全链路技术解析与工程实践,将rag向量检索准确率提升到98%以上……
  • 2025基于springboot的校车预定全流程管理系统
  • 学网站建设需要下载什么太平保险网站
  • 封面型网站首页怎么做做吃穿住行网站
  • macos安装、更新、使用homebrew
  • Vue3+Three.js:第06期 实现立方体旋转动画