当前位置: 首页 > news >正文

MQTT 入门教程:三步从 Docker 部署到 Java 客户端实现

在物联网(IoT)与边缘计算快速发展的今天,设备间的高效通信成为核心需求。MQTT 作为一种轻量级的发布 / 订阅模式协议,凭借其低带宽占用、强稳定性和灵活的消息路由能力,已成为物联网通信的事实标准。无论是智能家居的设备联动、工业传感器的数据采集,还是车联网的实时信息交互,MQTT 都在其中扮演着关键角色。
本文将从零开始搭建:从使用 Docker 部署轻量级 MQTT 服务器(Broker),到基于 Java 语言实现完整的消息发布与订阅功能,通过清晰的步骤和可直接运行的代码,最短时间内搭建起自己的 MQTT 通信系统。

什么是 MQTT?

MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布 / 订阅模式消息传输协议,专为低带宽、不稳定网络环境设计,广泛应用于物联网(IoT)、传感器网络和移动设备通信等场景。
核心概念

  • Broker:消息服务器,负责接收和转发所有消息
  • Publisher:消息发布者,发送消息到 Broker
  • Subscriber:消息订阅者,从 Broker 接收消息
  • Topic:消息主题,用于消息分类和路由
  • QoS (Quality of Service):服务质量等级,定义消息传递的可靠性
    -在这里插入图片描述

第一步:使用 Docker 部署 MQTT Broker

我们将使用 Eclipse Mosquitto,一个流行的开源 MQTT Broker。

1. 拉取 Mosquitto 镜像

docker pull eclipse-mosquitto

2. 创建配置文件

首先创建一个目录用于存放配置文件和数据:

mkdir -p ~/mosquitto/config ~/mosquitto/data ~/mosquitto/log

创建配置文件 mosquitto.conf:

nano ~/mosquitto/config/mosquitto.conf

添加以下内容:

persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log
listener 1883
allow_anonymous true
listener 1883:MQTT 默认端口
allow_anonymous true:允许匿名连接(生产环境建议关闭)

3. 启动 Mosquitto 容器

docker run -d \--name mosquitto \-p 1883:1883 \-v ~/mosquitto/config:/mosquitto/config \-v ~/mosquitto/data:/mosquitto/data \-v ~/mosquitto/log:/mosquitto/log \eclipse-mosquitto

4. 验证 Broker 是否运行

docker ps | grep mosquitto

如果看到运行中的容器,说明 Broker 部署成功。

第二步:Java 客户端实现

我们将使用 Eclipse Paho Java 客户端库来实现 MQTT 客户端。

1. 添加依赖

如果使用 Maven,在pom.xml中添加:

<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>
</dependencies>

2. MQTT 工具类

首先创建一个工具类封装 MQTT 连接的通用功能:

//运行
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MQTTUtils {// MQTT Broker地址private static final String BROKER = "tcp://localhost:1883";/*** 创建MQTT客户端并连接到Broker* @param clientId 客户端ID,应唯一* @return 已连接的MQTT客户端* @throws MqttException 连接异常*/public static MqttClient connect(String clientId) throws MqttException {// 设置客户端持久化方式为内存MemoryPersistence persistence = new MemoryPersistence();// 创建客户端MqttClient client = new MqttClient(BROKER, clientId, persistence);// 配置连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(true); // 清除会话connOpts.setConnectionTimeout(10); // 连接超时时间connOpts.setKeepAliveInterval(20); // 心跳间隔// 连接到BrokerSystem.out.println("Connecting to broker: " + BROKER);client.connect(connOpts);System.out.println("Connected");return client;}/*** 发布消息* @param client MQTT客户端* @param topic 消息主题* @param content 消息内容* @param qos 服务质量等级 (0, 1, 2)* @throws MqttException 发布异常*/public static void publish(MqttClient client, String topic, String content, int qos) throws MqttException {MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(topic, message);System.out.println("Published message: " + content + " to topic: " + topic);}/*** 订阅主题* @param client MQTT客户端* @param topic 要订阅的主题* @param qos 服务质量等级* @throws MqttException 订阅异常*/public static void subscribe(MqttClient client, String topic, int qos) throws MqttException {System.out.println("Subscribing to topic: " + topic);client.subscribe(topic, qos);}
}

3. 订阅者客户端实现

//运行
import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;public class MQTTSubscriber {// 订阅的主题private static final String TOPIC = "test/topic";// 客户端IDprivate static final String CLIENT_ID = "subscriber-client";// QoS等级private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 连接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 设置消息监听器client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {System.out.println("Connection lost: " + cause.getMessage());}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("Received message on topic: " + topic);System.out.println("Message content: " + new String(message.getPayload()));System.out.println("QoS: " + message.getQos());}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 对于订阅者来说,这个方法通常不需要实现}});// 订阅主题MQTTUtils.subscribe(client, TOPIC, QOS);// 保持客户端运行以接收消息System.out.println("Waiting for messages...");while (true) {Thread.sleep(1000);}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}

4. 发布者客户端实现

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;public class MQTTPublisher {// 发布的主题private static final String TOPIC = "test/topic";// 客户端IDprivate static final String CLIENT_ID = "publisher-client";// QoS等级private static final int QOS = 1;public static void main(String[] args) {MqttClient client = null;try {// 连接到Brokerclient = MQTTUtils.connect(CLIENT_ID);// 发布几条测试消息for (int i = 1; i <= 5; i++) {String message = "Hello, MQTT! This is message " + i;MQTTUtils.publish(client, TOPIC, message, QOS);Thread.sleep(2000); // 间隔2秒发送一条}} catch (MqttException | InterruptedException e) {System.err.println("Error: " + e.getMessage());} finally {if (client != null && client.isConnected()) {try {client.disconnect();System.out.println("Disconnected");} catch (MqttException e) {e.printStackTrace();}}}}
}

第三步:运行和测试

1. 启动订阅者

首先运行MQTTSubscriber类,它会连接到 Broker 并开始等待接收消息:

Connecting to broker: tcp://localhost:1883
Connected
Subscribing to topic: test/topic
Waiting for messages...

2. 启动发布者

然后运行MQTTPublisher类,它会发送 5 条消息到指定主题:

Connecting to broker: tcp://localhost:1883
Connected
Published message: Hello, MQTT! This is message 1 to topic: test/topic
Published message: Hello, MQTT! This is message 2 to topic: test/topic

3. 查看结果

在订阅者的控制台,你应该能看到接收到的消息:

Received message on topic: test/topic
Message content: Hello, MQTT! This is message 1
QoS: 1
Received message on topic: test/topic
Message content: Hello, MQTT! This is message 2
QoS: 1

总结
本文介绍了 MQTT 的基本概念,展示了如何使用 Docker 快速部署 Mosquitto Broker,并通过 Java 代码实现了 MQTT 客户端的发布和订阅功能。

http://www.dtcms.com/a/311490.html

相关文章:

  • Linux基础学习笔记二
  • MySQL PostgreSQL JDBC URL 配置允许批量操作
  • C语言输入安全10大边界漏洞解析与防御
  • 基于LSTM模型与加权链路预测的动态热门商品成长性分析
  • SpringBoot相关注解
  • 项目管理平台是什么?概念、定义、作用、主流厂商解读
  • docker:将python开发的大模型应用,打成docker容器
  • C#中的除法
  • PostGIS面试题及详细答案120道之 (081-090 )
  • cuda编程笔记(12)--学习cuFFT的简单使用
  • 【Mybatis】MyBatis分页的三种实现方式,Log4j的使用
  • Elasticsearch 混合检索一句 `retriever.rrf`,把语义召回与关键词召回融合到极致
  • 模拟激光相机工作站版本6.0 5.2.32 6.0.44 6.031 5.2.20
  • 题解:P4447 [AHOI2018初中组] 分组
  • 归并排序(简单讲解)
  • [论文阅读] 人工智能 + 软件工程 | GitHub Marketplace中CI Actions的功能冗余与演化规律研究
  • 【RK3568 看门狗驱动开发详解】
  • Kubernetes Gateway API 详解:现代流量路由管理方案
  • 【最后203篇系列】030 强化学习探索
  • 浏览器及java读取ros1的topic
  • 重生之我在暑假学习微服务第八天《OpenFeign篇》
  • 暑期算法训练.13
  • cv弹窗,退款确认弹窗
  • 数据结构(12)二叉树
  • 深入 Go 底层原理(六):垃圾回收(GC)
  • 数据资产是什么?
  • MySQL 内置函数
  • npm安装下载慢问题
  • 离线安装docker和docker-compose
  • 【人工智能agent】--服务器部署PaddleX 的 印章文本识别模型