MQTT 入门教程:三步从 Docker 部署到 Java 客户端实现
在物联网(IoT)与边缘计算快速发展的今天,设备间的高效通信成为核心需求。MQTT 作为一种轻量级的发布 / 订阅模式协议,凭借其低带宽占用、强稳定性和灵活的消息路由能力,已成为物联网通信的事实标准。无论是智能家居的设备联动、工业传感器的数据采集,还是车联网的实时信息交互,MQTT 都在其中扮演着关键角色。
本文将从零开始搭建:从使用 Docker 部署轻量级 MQTT 服务器(Broker),到基于 Java 语言实现完整的消息发布与订阅功能,通过清晰的步骤和可直接运行的代码,最短时间内搭建起自己的 MQTT 通信系统。
什么是 MQTT?
MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布 / 订阅模式消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)、传感器网络和移动设备通信等场景。
核心概念
- Broker:消息服务器,负责接收和转发所有消息
- Publisher:消息发布者,发送消息到 Broker
- Subscriber:消息订阅者,从 Broker 接收消息
- Topic:消息主题,用于消息分类和路由
- QoS (Quality of Service):服务质量等级,定义消息传递的可靠性
-
第一步:使用 Docker 部署 MQTT Broker
我们将使用 Eclipse Mosquitto,一个流行的开源 MQTT Broker。
1. 拉取 Mosquitto 镜像
docker pull eclipse-mosquitto
2. 创建配置文件
首先创建一个目录用于存放配置文件和数据:
mkdir -p ~/mosquitto/config ~/mosquitto/data ~/mosquitto/log
创建配置文件 mosquitto.conf:
nano ~/mosquitto/config/mosquitto.conf
添加以下内容:
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
listener 1883
allow_anonymous true
listener 1883:MQTT 默认端口
allow_anonymous true:允许匿名连接(生产环境建议关闭)
3. 启动 Mosquitto 容器
docker run -d \--name mosquitto \-p 1883:1883 \-v ~/mosquitto/config:/mosquitto/config \-v ~/mosquitto/data:/mosquitto/data \-v ~/mosquitto/log:/mosquitto/log \eclipse-mosquitto
4. 验证 Broker 是否运行
docker ps | grep mosquitto
如果看到运行中的容器,说明 Broker 部署成功。
第二步:Java 客户端实现
我们将使用 Eclipse Paho Java 客户端库来实现 MQTT 客户端。
1. 添加依赖
如果使用 Maven,在pom.xml中添加:
<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>
2. MQTT 工具类
首先创建一个工具类封装 MQTT 连接的通用功能:
//运行
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MQTTUtils {// MQTT Broker地址private static final String BROKER = "tcp://localhost:1883";/*** 创建MQTT客户端并连接到Broker* @param clientId 客户端ID,应唯一* @return 已连接的MQTT客户端* @throws MqttException 连接异常*/public static MqttClient connect(String clientId) throws MqttException {// 设置客户端持久化方式为内存MemoryPersistence persistence = new MemoryPersistence();// 创建客户端MqttClient client = new MqttClient(BROKER, clientId, persistence);// 配置连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true); // 清除会话connOpts.setConnectionTimeout(10); // 连接超时时间connOpts.setKeepAliveInterval(20); // 心跳间隔// 连接到BrokerSystem.out.println("Connecting to broker: " + BROKER);client.connect(connOpts);System.out.println("Connected");return client;}/*** 发布消息* @param client MQTT客户端* @param topic 消息主题* @param content 消息内容* @param qos 服务质量等级 (0, 1, 2)* @throws MqttException 发布异常*/public static void publish(MqttClient client, String topic, String content, int qos) throws MqttException {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(topic, message);System.out.println("Published message: " + content + " to topic: " + topic);}/*** 订阅主题* @param client MQTT客户端* @param topic 要订阅的主题* @param qos 服务质量等级* @throws MqttException 订阅异常*/public static void subscribe(MqttClient client, String topic, int qos) throws MqttException {System.out.println("Subscribing to topic: " + topic);client.subscribe(topic, qos);}
}
3. 订阅者客户端实现
//运行
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MQTTSubscriber {// 订阅的主题private static final String TOPIC = "test/topic";// 客户端IDprivate static final String CLIENT_ID = "subscriber-client";// QoS等级private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 连接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 设置消息监听器client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message on topic: " + topic);System.out.println("Message content: " + new String(message.getPayload()));System.out.println("QoS: " + message.getQos());}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 对于订阅者来说,这个方法通常不需要实现}});// 订阅主题MQTTUtils.subscribe(client, TOPIC, QOS);// 保持客户端运行以接收消息System.out.println("Waiting for messages...");while (true) {Thread.sleep(1000);}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}
4. 发布者客户端实现
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;public class MQTTPublisher {// 发布的主题private static final String TOPIC = "test/topic";// 客户端IDprivate static final String CLIENT_ID = "publisher-client";// QoS等级private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 连接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 发布几条测试消息for (int i = 1; i <= 5; i++) {String message = "Hello, MQTT! This is message " + i;MQTTUtils.publish(client, TOPIC, message, QOS);Thread.sleep(2000); // 间隔2秒发送一条}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}
第三步:运行和测试
1. 启动订阅者
首先运行MQTTSubscriber类,它会连接到 Broker 并开始等待接收消息:
Connecting to broker: tcp://localhost:1883
Connected
Subscribing to topic: test/topic
Waiting for messages...
2. 启动发布者
然后运行MQTTPublisher类,它会发送 5 条消息到指定主题:
Connecting to broker: tcp://localhost:1883
Connected
Published message: Hello, MQTT! This is message 1 to topic: test/topic
Published message: Hello, MQTT! This is message 2 to topic: test/topic
3. 查看结果
在订阅者的控制台,你应该能看到接收到的消息:
Received message on topic: test/topic
Message content: Hello, MQTT! This is message 1
QoS: 1
Received message on topic: test/topic
Message content: Hello, MQTT! This is message 2
QoS: 1
总结
本文介绍了 MQTT 的基本概念,展示了如何使用 Docker 快速部署 Mosquitto Broker,并通过 Java 代码实现了 MQTT 客户端的发布和订阅功能。