NQTT-基础知识
MQTT(Message Queuing Telemetry Transport)是一种轻量级的、基于发布/订阅模式的消息传输协议,特别适用于低带宽、高延迟或不稳定网络环境下的远程设备通信。MQTT的设计目标是提供一种简单的协议,以便 物联网(IoT)设备之间进行高效的消息交换。
一、MQTT协议特点
1.轻量级
MQTT协议的消息头简洁,占用的带宽少,适合资源受限的设备(如传感器、嵌入式设备等)。
2.长连接
MQTT协议允许客户端与服务器(Broker)保持长连接,即通过持久会话来接收消息。
3.发布/订阅模式
客户端可以发布消息到某个“主题”上,其他客户端可以订阅该主题并接收消息。
4.三种服务质量(QoS)
QoS 0:消息最多发送一次,不确认送达。
QoS 1:消息至少发送一次,确保送达。
QoS 2:消息只发送一次,确保消息不重复发送。
二、MQTT主要组件
1.Broker
MQTT代理服务器,负责接收所有客户端的消息,并根据订阅信息将消息转发给订阅者。
常见的Broker:Eclipse Mosquitto、HiveMQ、EMQ。
2.Client
发布消息或订阅主题的客户端终端设备。
客户端可以是任何支持MQTT协议的设备或软件,如:嵌入式设备、移动应用、Web应用等。
3.Message
消息是由客户端通过Broker发送的内容。
消息由 头部(Payload)和 其他控制信息(如QoS等级、主题、消息标识等)构成。
4.Topic
主题是消息的分类标识,客户端通过订阅主题来接收特定类型的消息。
主题可以分为多个层级,例如:/home/livingroom/temperature。
5.Subscription
客户端订阅主题后,Broker会在该主题上发布新消息时,将其推送到订阅了该主题的客户端。
三、MQTT工作流程
1.客户端连接到Broker
客户端首先发送 CONNECT 消息向 Broker 发送连接请求(建立一个 TCP/IP 连接) 。
Broker返回连接确认,建立连接。
2.发布消息
发布者通过 PUBLISH 消息将消息推送到 Broker ,并指定消息的主题。
Broker 接收到消息后,将其转发给所有已订阅该主题的客户端。
3.订阅主题
客户端发送 SUBSCRIBE 消息,指定需要订阅的主题。
Broker 将根据订阅要求将未来发布到该主题的消息推送给客户端。
4.客户端接收消息
客户端通过订阅的主题接收来自 Broker 的推送消息。
5.客户端断开连接
客户端发送 DISCONNECT 消息来断开与Broker的连接。
四、MQTT 安装部署
MQTT Broker(消息代理),它负责消息的发布和订阅。
常用的 Broker 有 Mosquitto、EMQ X、HiveMQ 等。
以 Mosquitto (windows版)为例来演示。
1.安装 Mosquitto Broker
(1)下载
下载路径:Download | Eclipse Mosquitto
根据操作系统选择合适的版本。
(2)安装
指定目录,默认安装即可。
(3)主要文件
mosquitto.exe:可执行文件。用于启动服务端。
mosquitto_pub.exe:可执行文件。用于启动发布者客户端。
mosquitto_sub.exe:可执行文件。用于启动订阅者客户端。
mosquitto.conf:配置文件。
pwfile.example:用户配置文件。用于存放用户的账号和密码。
(4)启动
a.窗口一:启动 mosquito 服务端。
打开命令行提示符(cmd),切换到 Mosquitto 的根目录。
运行命令:
mosquitto.exe -c mosquitto.conf -v |
参数解释:
-c
:表示以配置文件启动服务器,参数后面要跟配置文件。-v
:显示详细的日志信息。
b.窗口二:启动一个订阅者客户端,订阅名为 message 的主题。
运行命令:
mosquitto.exe -c mosquitto.conf -v |
参数解释:
-t
:表示订阅,后跟要订阅的主题。
c.窗口三:启动一个发布者客户端,向主题 message 发送消息。
运行命令:
mosquitto.exe -c mosquitto.conf -v |
参数解释:
-m
:表示主题的消息。
五、MQTT 基本使用
在Spring Boot项目中,通常会使用 Spring Integration MQTT 或 Eclipse Paho 来实现MQTT的通信。这里使用 Spring Integration MQTT,因为它与Spring集成得更好。
1.依赖
<dependencies> <!-- Spring Integration MQTT --> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-mqtt</artifactId> <version>5.5.5</version> </dependency> <!-- Paho MQTT Client --> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>paho-mqtt-client</artifactId> <version>1.2.5</version> </dependency> </dependencies> |
2.配置 MQTT 连接
在application.yml文件中配置MQTT连接。
主要需要配置MQTT Broker的地址、端口、客户端ID、主题等信息。
mqtt: url: tcp://localhost:1883 # Broker的URL clientId: spring-mqtt-client # 客户端ID defaultTopic: test/topic # 默认的主题 username: testuser # 可选:如果需要身份验证 password: testpass # 可选:如果需要身份验证 |
3.配置 MQTT 连接工厂和消息接收器
需要配置一个 MqttPahoClientFactory 来管理与 Broker 的连接,
以及配置一个 MqttMessageListener 来接收消息。
@Configuration public class MqttConfig { @Bean public MqttPahoClientFactory mqttClientFactory() { MqttPahoClientFactory factory = new MqttPahoClientFactory(); factory.setHost("tcp://localhost:1883"); // Broker地址 factory.setClientId("spring-mqtt-client"); return factory; } @Bean public MqttPahoMessageDrivenChannelAdapter mqttInbound(MqttPahoClientFactory mqttClientFactory) { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter( "spring-mqtt-client", mqttClientFactory, "test/topic"); // 订阅主题 adapter.setOutputChannel(messageChannel()); return adapter; } @Bean public MessageChannel messageChannel() { return new DirectChannel(); } } |
4.创建消息接收器
MqttPahoMessageDrivenChannelAdapter 用于监听并处理从Broker订阅到的消息。
可以通过实现 MessageHandler 接口来处理消息。
@Component public class MqttMessageHandler { @ServiceActivator(inputChannel = "mqttInboundChannel") public void handleMessage(Message<?> message) { // 打印收到的消息内容 System.out.println("Received message: " + message.getPayload()); } } |
5.发布消息
需要创建一个发布的 MqttTemplate ,用来将消息发布到 Broker 的特定主题。
@Service public class MqttPublisherService { private final MqttPahoMessageHandler messageHandler; public MqttPublisherService(MqttPahoMessageHandler messageHandler) { this.messageHandler = messageHandler; } public void publishMessage(String topic, String payload) { Message<String> message = MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build(); messageHandler.handleMessage(message); } } |
6.启动与测试
(1)启动Spring Boot应用
启动Spring Boot应用后,应用会自动连接到配置的MQTT Broker,并订阅指定的主题。
(2)发布测试消息
可以通过其他MQTT客户端(例如mosquitto_pub命令行工具)发布消息到test/topic主题。
Spring Boot应用会自动接收到该消息。
mosquitto_pub -h localhost -t "test/topic" -m "Hello from MQTT" |
结果示例:
Received message: Hello from MQTT |
六、总结
MQTT是一个轻量级、简单而高效的消息传输协议,适用于资源受限的设备和低带宽、高延迟网络环境。它采用发布/订阅模式,支持QoS等级、持久会话等特性,非常适合物联网、远程监控、智能家居等应用场景。通过理解和使用MQTT协议,你可以实现高效的设备通信和实时消息传输。