MQTT 协议深度学习笔记(含实战示例・完整版)
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是物联网(IoT)领域的 “轻量级通信基石”,专为资源受限设备(如嵌入式传感器)和弱网络环境(如 2G、LoRa)设计,基于发布 / 订阅模式实现高效、灵活的消息传输,目前已成为 ISO 标准(ISO/IEC PRF 20922),广泛应用于智能家居、工业监控、远程医疗等场景。
一、MQTT 协议基础:定义与起源
1. 核心定义
MQTT 是工作在 TCP/IP 协议簇之上 的发布 / 订阅型消息协议,核心目标是用最小的带宽和硬件资源,实现设备间的可靠通信。其 “轻量级” 体现在两方面:
-
协议头精简:最小固定头仅 2 字节,远超 HTTP(头信息通常数十至上百字节);
-
逻辑简单:无需复杂的请求 / 响应交互,设备只需 “发布消息” 或 “订阅主题” 即可通信。
2. 起源与发展
-
1999 年:由 IBM 联合 Arcom(现属 Sierra Wireless)开发,最初用于石油、天然气行业的 “远程传感器数据采集”(如油井压力监测),解决 “低带宽、高延迟、设备算力低” 的痛点;
-
2014 年:由 OASIS(结构化信息标准促进组织)接管,正式成为开源标准,推动其在 IoT 领域普及;
-
现状:支持 AWS IoT、Azure IoT Hub、阿里云 IoT 等主流云平台,兼容 C、Java、Python、JavaScript 等几乎所有编程语言,是 IoT 设备通信的 “事实标准”。
二、MQTT 核心角色与通信要素(附示例)
MQTT 协议通过 “3 类角色 + 3 个核心要素” 实现消息流转,结构清晰且解耦发布者与订阅者。
1. 三大核心角色
角色 | 英文标识 | 功能描述 | 实战示例 |
---|---|---|---|
发布者 | Publisher | 生产消息的客户端,将消息按 “主题” 发送给 Broker,无需知道订阅者是谁 | 温湿度传感器(每 10 秒发布一次数据)、智能门锁(开门时发布 “开门事件”) |
订阅者 | Subscriber | 消费消息的客户端,通过订阅 “主题” 从 Broker 接收感兴趣的消息,无需知道发布者 | 手机 App(订阅 “客厅温湿度” 查看数据)、报警器(订阅 “烟雾检测” 触发报警) |
经纪人(代理) | Broker | 核心服务器,负责:①接收发布者的消息;②管理主题订阅关系;③将消息转发给所有订阅者 | 开源 Broker(如 Mosquitto、EMQX)、云 Broker(如阿里云 IoT 网关) |
关键特性:发布者与订阅者可身份重叠(例如一个智能灯,既 “订阅控制指令”(如 “开灯”),也 “发布状态”(如 “已开灯”))。
2. 三大通信要素
(1)主题(Topic):消息的 “路由地址”
-
定义:类似 “文件夹路径”,用
/
分隔层级,用于区分不同类型的消息,例如:
-
home/livingroom/temp
:客厅温度数据; -
car/engine/status
:汽车发动机状态; -
device/door/123/event
:编号 123 的门锁事件。
-
-
通配符规则:支持两种通配符,满足 “批量订阅” 需求:
-
+
:匹配单个层级,例如home/+/temp
可匹配home/bedroom/temp
(卧室温度)、home/kitchen/temp
(厨房温度); -
#
:匹配所有子层级(必须放在末尾),例如home/#
可匹配home/livingroom/temp
、home/bedroom/light/status
等所有 “home” 下的主题。
-
示例:
订阅者订阅 home/#
,则发布者发送到 home/livingroom/humidity
(客厅湿度)、home/bedroom/light
(卧室灯光)的消息,都会被该订阅者接收。
(2)负载(Payload):消息的 “实际内容”
-
定义:发布者传递给订阅者的具体数据,格式无强制要求,常见格式如下:
-
JSON(最常用):
{"temp": 25.3, "humidity": 58, "timestamp": 1690000000}
; -
文本:
"door_opened"
(门锁开门事件); -
二进制:传感器采集的原始二进制数据(如音频、图片片段)。
-
示例:
温湿度传感器(发布者)向主题 home/livingroom/env
发布消息,Payload 为 JSON 格式:
{"device_id": "sensor_001", "temp": 24.8, "humidity": 62, "update_time": "2024-05-20 14:30:00"}
。
(3)服务质量(QoS):消息的 “可靠性等级”
MQTT 最核心的特性之一,通过 3 个等级平衡 “可靠性” 与 “传输效率”,适配不同业务场景:
QoS 等级 | 英文标识 | 核心逻辑 | 数据流向示例 | 适用场景 |
---|---|---|---|---|
QoS 0 | Almost Once | 至多一次:发布者发一次消息,不等确认;Broker 收后直接转发,可能丢失 | 传感器 → Broker(无确认);Broker → 订阅者(无确认) | 非关键数据(如实时监控视频帧、临时状态上报) |
QoS 1 | At Least Once | 至少一次:发布者发消息后等 Broker 确认(PUBACK),未确认则重发,可能重复 | 传感器 → Broker(发 PUBLISH → 收 PUBACK);Broker → 订阅者(同逻辑) | 关键数据(如设备控制指令、报警信息) |
QoS 2 | Exactly Once | 仅一次:四步确认(发→Broker 确认→Broker→订阅者确认→最终确认),无丢失 / 重复 | 传感器→Broker(PUBLISH→PUBREC);Broker→订阅者(PUBLISH→PUBREC→PUBREL→PUBCOMP) | 核心数据(如金融交易、设备故障日志) |
示例:
-
场景 1:实时监控摄像头(QoS 0):丢失几帧画面不影响整体监控,优先保证传输效率;
-
场景 2:智能开关控制(QoS 1):必须确保 “开灯指令” 到达,即使重复执行(开关重复开灯无影响);
-
场景 3:燃气表计费数据(QoS 2):必须确保 “用气量 10m³” 仅被记录一次,避免重复计费或漏计费。
三、MQTT 报文结构(数据包格式)
MQTT 客户端与 Broker 的所有通信都通过 “报文(Packet)” 完成,所有报文均由 固定头(Fixed Header)、可变头(Variable Header)、消息体(Payload) 三部分构成,后两部分是否存在取决于报文类型。
1. 固定头(Fixed Header):所有报文必含
用于标识报文类型和核心属性,最小 2 字节,结构如下:
字节位置 | 字段名称 | 长度(bit) | 功能描述 |
---|---|---|---|
第 1 字节 | 报文类型 | 4 | 共 14 种类型,如:0001=CONNECT(连接请求)、0011=PUBLISH(发布消息)、0100=SUBSCRIBE(订阅请求) |
第 1 字节 | 标志位(Flags) | 4 | 补充报文属性,如 PUBLISH 报文的标志位包含 QoS 等级、是否保留消息(Retain) |
第 2 + 字节 | 剩余长度 | 1~4 字节 | 表示 “可变头 + 消息体” 的总字节数,采用 “可变长度编码”(最高位为续位标志,0 表示结束),最大支持 256MB |
示例:
PUBLISH 报文的固定头(第 1 字节):0b00110010
-
前 4 位
0011
:表示报文类型为 PUBLISH; -
后 4 位
0010
:包含 QoS 等级(中间 2 位01
表示 QoS 1)、Retain 标志(最低位0
表示不保留消息)。
2. 可变头(Variable Header):部分报文含
内容由 “报文类型” 决定,用于传递该类型报文的额外信息,常见示例:
报文类型 | 可变头包含的内容 |
---|---|
CONNECT | 协议名(如 “MQTT”)、协议版本(如 5.0)、连接标志(是否需用户名密码、是否清除会话)、保活时间 |
PUBLISH | 主题名(Topic Name)、报文标识符(仅 QoS 1/2 需携带,用于重发和确认) |
SUBSCRIBE | 报文标识符(用于接收 SUBSCRIBE 的确认报文 SUBACK) |
示例:
PUBLISH 报文的可变头(QoS 1):
-
主题名长度(2 字节):
0x000E
(14 字节); -
主题名(14 字节):
home/livingroom/temp
; -
报文标识符(2 字节):
0x0005
(用于匹配 Broker 回复的 PUBACK 报文)。
3. 消息体(Payload):部分报文含
即实际业务数据,常见示例:
报文类型 | 消息体包含的内容 |
---|---|
CONNECT | 客户端标识符(Client ID,Broker 唯一标识客户端)、用户名、密码 |
PUBLISH | 负载(Payload):如温湿度数据、控制指令 |
SUBSCRIBE | 订阅的主题列表及对应的 QoS 等级(如 home/# QoS 1、device/door/123/event QoS 0) |
完整报文示例:
以 “传感器发布温湿度数据(QoS 1)” 为例,完整 PUBLISH 报文结构如下:
plaintext
[固定头](3 字节) - 第 1 字节:0b00110010(PUBLISH 类型 + QoS 1 + 不保留) - 第 2 字节:0x18(剩余长度:可变头 16 字节 + 消息体 8 字节 = 24 字节,0x18 即 24) [可变头](16 字节) - 主题名长度(2 字节):0x000E(14 字节) - 主题名(14 字节):home/livingroom/temp - 报文标识符(2 字节):0x0005 [消息体](8 字节) - 负载数据:{"t":25.5}(JSON 格式,温度 25.5℃)
四、MQTT 关键机制(附配置示例)
除基础角色和报文外,MQTT 还通过以下机制保障通信稳定性和灵活性。
1. 会话保持(Session)
客户端与 Broker 建立连接时,通过 “清除会话(Clean Session)” 标志控制会话状态是否保留:
-
Clean Session = 1:客户端断开连接后,Broker 清除其订阅关系、未完成的 QoS 1/2 消息,下次连接视为新会话;
-
Clean Session = 0:客户端断开连接后,Broker 保留其订阅关系和未转发的 QoS 1/2 消息,下次重连后继续交付。
示例:
智能门锁(客户端)连接 Broker 时设置 Clean Session = 0
:
-
若门锁断电(异常断开),Broker 会保留其订阅的 “远程开锁指令” 主题;
-
门锁重启后重连 Broker,Broker 会将断电期间收到的 “开锁指令” 转发给门锁。
2. 保活机制(Keep Alive)
防止 TCP 连接 “假死”(如网络中断但未触发 TCP 断开),客户端与 Broker 协商 “保活时间(单位:秒)”:
-
客户端需在 “保活时间” 内发送 PINGREQ 报文(心跳);
-
Broker 收到 PINGREQ 后,回复 PINGRESP 报文;
-
若 Broker 超过 “1.5 倍保活时间” 未收到 PINGREQ,判定客户端离线,断开连接。
示例:
传感器(客户端)连接 Broker 时设置 Keep Alive = 60
(60 秒):
-
传感器需每 60 秒内发送一次 PINGREQ;
-
若传感器网络中断,Broker 等待 90 秒(1.5×60)后仍未收到 PINGREQ,判定传感器离线,触发 “遗嘱消息”。
3. 遗嘱消息(LWT)
客户端连接时预设 “遗嘱主题” 和 “遗嘱负载”,若客户端异常断开(如断电、网络中断),Broker 会自动将 “遗嘱消息” 发布到预设主题,通知其他订阅者该设备离线。
配置示例:
智能灯(客户端)连接 Broker 时配置 LWT:
-
遗嘱主题:
device/light/123/status
; -
遗嘱负载:
{"status": "offline", "time": "2024-05-20 15:00:00"}
; -
若智能灯断电,Broker 会向
device/light/123/status
发布上述遗嘱消息,手机 App(订阅该主题)会收到 “智能灯离线” 的通知。
4. 保留消息(Retain)
发布消息时可设置 “Retain 标志”,若为 1
,Broker 会 “保留” 该主题的最新一条消息:
-
新订阅该主题的客户端,无需等待发布者再次发送,即可直接收到 Broker 保留的最新消息;
-
若发布者发送新的 Retain 消息,Broker 会用新消息覆盖旧消息;
-
若发布 “空负载” 的 Retain 消息,Broker 会删除该主题的保留消息。
示例:
气象站(发布者)向主题 weather/city/beijing
发布 Retain 消息(负载 {"temp": 26, "weather": "sunny"}
):
-
第二天新订阅该主题的客户端(如手机天气 App),会直接收到 Broker 保留的 “北京温度 26℃、晴天” 消息,无需等待气象站再次发布。
五、MQTT 实战:基于 Python 的完整发布 / 订阅案例
以 “温湿度传感器发布数据 + 手机 App 订阅数据” 为例,使用开源 Broker(Mosquitto)和 Python 的 paho-mqtt
库实现端到端通信。
1. 环境准备
-
安装 Mosquitto Broker(Windows/Linux/macOS 均可):
-
Linux:
sudo apt-get install mosquitto
,安装后自动启动,默认端口 1883; -
Windows:从 Mosquitto 官网 下载安装包,安装后手动启动服务(在安装目录执行
mosquitto.exe
); -
验证:执行
mosquitto -v
,若显示 “Opening ipv4 listen socket on port 1883”,表示 Broker 启动成功。
-
-
安装 Python 依赖库:
执行
pip install paho-mqtt
,安装 MQTT 客户端库。
2. 发布者代码(温湿度传感器模拟)
功能:每 5 秒生成一次模拟温湿度数据,以 QoS 1 发布到主题 home/livingroom/env
,并配置 LWT 和保活机制。
import paho.mqtt.client as mqtt import json import time import random # 1. Broker 基础配置 BROKER_HOST = "localhost" # 本地Broker地址,远程Broker填IP(如"192.168.1.100") BROKER_PORT = 1883 # MQTT默认端口(未加密),加密端口为8883 PUB_TOPIC = "home/livingroom/env" # 发布的主题 CLIENT_ID = "sensor_001" # 客户端唯一标识(不可重复) # 2. 连接回调函数(连接成功/失败时触发) def on_connect(client, userdata, flags, rc):# rc=0表示连接成功,其他值为错误码if rc == 0:print(f"✅ 传感器 {CLIENT_ID} 已连接到Broker")else:print(f"❌ 连接失败,错误码:{rc}") # 3. 发布消息回调函数(消息发布成功时触发) def on_publish(client, userdata, mid):print(f"📤 消息发布成功,消息ID:{mid}") # 4. 创建客户端实例 client = mqtt.Client(client_id=CLIENT_ID, clean_session=False) # 5. 设置回调函数 client.on_connect = on_connect client.on_publish = on_publish # 6. 配置遗嘱消息(LWT) client.will_set(topic="device/status",payload=json.dumps({"device_id": CLIENT_ID,"status": "offline","timestamp": time.strftime("%Y-%m-%d %H:%M:%S")}),qos=1,retain=True ) # 7. 连接到Broker(设置保活时间60秒) client.connect(BROKER_HOST, BROKER_PORT, keepalive=60) # 8. 启动后台网络循环 client.loop_start() try:# 9. 循环发布模拟数据while True:# 生成模拟温湿度数据temperature = round(random.uniform(20.0, 30.0), 1)humidity = round(random.uniform(30.0, 80.0), 1)# 构造消息负载payload = {"device_id": CLIENT_ID,"temperature": temperature,"humidity": humidity,"timestamp": time.strftime("%Y-%m-%d %H:%M:%S")}# 发布消息(QoS=1,不保留消息)result = client.publish(topic=PUB_TOPIC,payload=json.dumps(payload),qos=1,retain=False)# 等待发布确认(QoS>0时有效)result.wait_for_publish()print(f"发布数据:{payload}")# 每5秒发布一次time.sleep(5) except KeyboardInterrupt:# 10. 捕获Ctrl+C中断,优雅退出print("\n用户中断程序") finally:# 11. 停止网络循环并断开连接client.loop_stop()client.disconnect()print("已断开与Broker的连接")import paho.mqtt.client as mqtt import json import time import random # 1. Broker 基础配置 BROKER_HOST = "localhost" # 本地 Broker 地址,远程 Broker 填 IP(如 "192.168.1.100") BROKER_PORT = 1883 # MQTT 默认端口(未加密),加密端口为 8883 PUB_TOPIC = "home/livingroom/env" # 发布的主题 CLIENT_ID = "sensor_001" # 客户端唯一标识(不可重复) # 2. 连接回调函数(连接成功/失败时触发) def on_connect(client, userdata, flags, rc):# rc=0 表示连接成功,其他值为错误码(如 rc=1:协议版本不支持)if rc == 0:print(f"✅ 传感器 {CLIENT_ID} 已连接到
3.订阅者代码
import paho.mqtt.client as mqtt import json import time # 1. Broker 基础配置 BROKER_HOST = "localhost" # 与发布者相同的Broker地址 BROKER_PORT = 1883 SUB_TOPICS = [ # 订阅的主题列表(可多个)("home/livingroom/env", 1), # (主题, QoS)("device/status", 1) ] CLIENT_ID = "monitor_client" # 订阅者客户端ID # 2. 连接回调函数 def on_connect(client, userdata, flags, rc):if rc == 0:print(f"✅ 监控客户端 {CLIENT_ID} 已连接到Broker")# 连接成功后订阅主题client.subscribe(SUB_TOPICS)print(f"📥 已订阅主题:{[topic[0] for topic in SUB_TOPICS]}")else:print(f"❌ 连接失败,错误码:{rc}") # 3. 接收消息回调函数 def on_message(client, userdata, msg):try:# 尝试解析JSON格式消息payload = json.loads(msg.payload.decode('utf-8'))message = json.dumps(payload, indent=2) # 格式化显示except json.JSONDecodeError:# 非JSON格式消息直接显示message = msg.payload.decode('utf-8')# 打印接收的消息print(f"\n📩 收到消息:")print(f" 主题:{msg.topic}")print(f" QoS:{msg.qos}")print(f" 内容:{message}") # 4. 订阅确认回调函数 def on_subscribe(client, userdata, mid, granted_qos):print(f"✅ 订阅确认,消息ID:{mid},授予的QoS:{granted_qos}") # 5. 创建客户端实例 client = mqtt.Client(client_id=CLIENT_ID, clean_session=False) # 6. 设置回调函数 client.on_connect = on_connect client.on_message = on_message client.on_subscribe = on_subscribe # 7. 连接到Broker client.connect(BROKER_HOST, BROKER_PORT, keepalive=60) try:# 8. 阻塞式网络循环(持续处理消息)client.loop_forever() except KeyboardInterrupt:# 9. 捕获Ctrl+C中断,优雅退出print("\n用户中断程序") finally:# 10. 断开连接client.disconnect()print("已断开与Broker的连接")