当前位置: 首页 > news >正文

关于阿里云-云消息队列MQTT的连接和使用,以及SpringBoot的集成使用

一、目的

本文主要记录物联网设备接入MQTT以及对接服务端SpringBoot整个的交互流程和使用。

二、概念

2.1什么是MQTT?

MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。物联网平台支持设备使用MQTT协议接入。
简单来讲:为物联网弱联网设备提供可靠的消息传输,比如传感器、蓝牙、手表等。本文的应用场景是葡萄糖检测仪,实时上传接受推送消息至服务器,服务器推送到设备端通知消息等。

2.2工作原理

当然不能直接使用,下边简单图解一下
MQTT(Message Queuing Telemetry Transport)由IBM于1999年开发的一种基于发布订阅模式"的轻量级的消息传输协议
发布订阅模式是一种传统的客户端-服务器架构的替代方案,因为一般传统的客户端-服务器是客户端能够直接和服务器进行通信完成消息的传输。发布订阅模式会将发送消息的发布者publisher与接收消息的订阅者subscribers进行分离,publisher与subscribers 并不会直接通信,他们甚至都不清楚对方是否存在,他们之间的交流由第三方组件broker代理。
在这里插入图片描述

在这里插入图片描述
看到这个有没有很熟悉?像不像RabbitMQ的发布订阅?像不像RocketMQ?
没错,其实都是一个套路。
MQTT特殊的是,它只是一种通信协议,如果想要使用它,就需要基于MQTT协议的服务端实现,哎,这个服务端的实现,就类似消息中心的功能,负责消息的中转,甚至还能在客户端崩溃时,缓存接收到的消息(正因为此,它的可靠性是极高的)。
看到中间的 MQTT Broker了吗,这个就是对MQTT协议服务端的实现,负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。

2.3如何使用

如果没有购买云消息队列MQTT,就需要用第三方实现了MQTT的消息代理

2.3.1 EMQX

EMQX,是一款实现了MQTT协议的,开源的MQTT消息代理软件。MQTT定义了消息通讯的规则和流程,而EMQX则是遵循这些规则的软件,使得设备能够依据MQTT协议进行有效通信。在新版本的EMQX中同时支持MQTT3.1.1协议和5.0协议
下载地址:
官网地址
其他代理软件

2.3.2 EMQX部署

选择EMQX企业版进行部署:企业版
购买云服务器ECS(不想买的话可以用虚拟机安装,也可以去薅免费试用的),安装Docker:

# 移除旧版本docker
sudo yum remove docker \docker-client \docker-client-latest \docker-common \docker-latest \docker-latest-logrotate \docker-logrotate \docker-engine# 配置docker yum源。
sudo yum install -y yum-utils
sudo yum-config-manager \
--add-repo \
http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo# 安装 最新 docker
sudo yum install -y docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin# 启动& 开机启动docker; enable + start 二合一
systemctl enable docker --now# 配置加速
sudo mkdir -p /etc/docker
sudo tee /etc/docker/daemon.json <<-'EOF'
{"registry-mirrors": ["https://82m9ar63.mirror.aliyuncs.com"]
}
EOF
sudo systemctl daemon-reload
sudo systemctl restart docker

运行启动

docker run -d --name emqx-enterprise \-p 1883:1883 -p 8083:8083 \-p 8084:8084 -p 8883:8883 \-p 18083:18083 \-v emqx_data:/opt/emqx/data \-v emqx_log:/opt/emqx/log \-v emqx_etc:/opt/emqx/etc \emqx/emqx-enterprise:5.6.1

常见端口介绍:

端口号说明
1883TCP端口
8083WebSocket端口
8084WebSocket Secure 端口
8883SSL/TLS 端口
18083Broker的Dashboard访问端口号

下边就可以使用了,至于这个软件的功能,这里不赘述了,有兴趣的可以找我要笔记。

2.4 客户端

2.4.1 运行docker容器模拟消息接收客户端

docker run -e TOPIC="xxx"  \-e INSTANCE_ID="mqtt-cn-xx" \-e ENDPOINT="xx" \-e DEVICE_ID="xx" \-e GROUP_ID="您在 Group 管理页面中创建的 Group 的 ID" \-e AK="您访问阿里云的 AccessKey" \-e SK="您访问阿里云的 SecretKey" \registry.cn-hangzhou.aliyuncs.com/aliyun-mq/mqtt
# TOPIC 你创建的Topic
# -e TOPIC="cgm_monitor"
# 设置 MQTT 的主题(Topic)为 cgm_monitor。MQTT 客户端将订阅或发布到这个主题。# -e INSTANCE_ID="mqtt-cn-hic4av7iy01"
# 设置阿里云 MQTT 实例的 ID。这是您在阿里云上创建的 MQTT 实例的唯一标识。# -e ENDPOINT="mqtt-cn-hic4av7iy01.mqtt.aliyuncs.com"
# 设置 MQTT 服务的接入点(Endpoint),即 MQTT 服务器的地址。# -e DEVICE_ID="i3c7bfbe"
# 设置设备 ID。MQTT 客户端会以这个设备身份连接到阿里云 MQTT 服务。# -e GROUP_ID="您在 Group 管理页面中创建的 Group 的 ID"
# 设置设备所属的 Group ID。Group 是阿里云 MQTT 服务中用于管理一组设备的逻辑单元。

不懂看下图
先建立topic才能有公网访问地址,也要建Group
在这里插入图片描述
建立topic
在这里插入图片描述
建立Group
在这里插入图片描述
签名校验得到,Client ID/用户名/密码
在这里插入图片描述
这样 去云服务运行上面这段docker命令,就可以得到一个连接云MQTT的客户端,也就是消费者。

2.4.2 使用MQTTX客户端

MQTTX 简化了使用 MQTT broker 的过程,包括连接,发布与订阅消息主题。无论你使用桌面版,命令行,或是网页版,MQTTX 使每个关键步骤都更加顺滑。
官网地址:MQTTX
在这里插入图片描述
下载后傻瓜式安装就行,下一步直到完成。
可以设置下中文
在这里插入图片描述
点击新建连接
在这里插入图片描述
可以参考我的配置:
在这里插入图片描述
如果用的不是云MQTT,本地自己docker安装的 其实访问更简单,直接界面化Broker那里自己设置用户名/密码。
下边就可以连接了
在这里插入图片描述
如果你都配置正确,点击连接会出现已连接
在这里插入图片描述
下边演示下在阿里云MQTT控制台,快速体验消息收发:
在这里插入图片描述
可以看到,已经发送成功了!那么客户端MQTTX有没有收到消息呢?
在这里插入图片描述
可以看到,确实是收到了!!!如此便成功了!

2.4.3SpringBoot集成MQTT

这里把源码贴在这里,有兴趣的也可以去阿里云官网看,一样的

package com.kiki.app.mqtt.demo;import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;import com.kiki.app.util.ConnectionOptionWrapper;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallbackExtended;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;/*** 本代码提供签名鉴权模式下 MQ4IOT 客户端发送消息到 MQ4IOT 客户端的示例,其中初始化参数请根据实际情况修改* 签名模式即使用阿里云账号系统提供的 AccessKey 和 SecretKey 对每个客户端计算出一个独立的签名供客户端识别使用。* 对于实际业务场景使用过程中,考虑到私钥 SecretKey 的隐私性,可以将签名过程放在受信任的环境完成。** 完整 demo 工程,参考https://github.com/AliwareMQ/lmq-demo*/
public class MQ4IoTSendMessageToMQ4IoTUseSignatureMode {public static void main(String[] args) throws Exception {/*** MQ4IOT 实例 ID,购买后控制台获取*/String instanceId = "test001";/*** 接入点地址,购买 MQ4IOT 实例,且配置完成后即可获取,接入点地址必须填写分配的域名,不得使用 IP 地址直接连接,否则可能会导致客户端异常。* */String endPoint = "xxx";/*** 账号 accesskey,从账号系统控制台获取* 阿里云账号AccessKey拥有所有API的访问权限,建议您使用RAM用户进行API访问或日常运维。* 强烈建议不要把AccessKey ID和AccessKey Secret保存到工程代码里,否则可能导致AccessKey泄露,威胁您账号下所有资源的安全。* 本示例以把AccessKey ID和AccessKey Secret保存在环境变量为例说明。运行本代码示例之前,请先配置环境变量MQTT_AK_ENV和MQTT_SK_ENV* 例如:export MQTT_AK_ENV=<access_key_id>*      export MQTT_SK_ENV=<access_key_secret>* 需要将<access_key_id>替换为已准备好的AccessKey ID,<access_key_secret>替换为AccessKey Secret。*/String accessKey = "xxx";/*** 账号 secretKey,从账号系统控制台获取,仅在Signature鉴权模式下需要设置*/String secretKey = "xxx";/*** MQ4IOT clientId,由业务系统分配,需要保证每个 tcp 连接都不一样,保证全局唯一,如果不同的客户端对象(tcp 连接)使用了相同的 clientId 会导致连接异常断开。* clientId 由两部分组成,格式为 GroupID@@@DeviceId,其中 groupId 在 MQ4IOT 控制台申请,DeviceId 由业务方自己设置,clientId 总长度不得超过64个字符。*/String clientId = "GID_test01@@@pub001";/*** MQ4IOT 消息的一级 topic,需要在控制台申请才能使用。* 如果使用了没有申请或者没有被授权的 topic 会导致鉴权失败,服务端会断开客户端连接。*/final String parentTopic = "xxx";/*** MQ4IOT支持子级 topic,用来做自定义的过滤,此处为示意,可以填写任何字符串,具体参考https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3* 需要注意的是,完整的 topic 参考 https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.554.21a37f05ynxokW。*/final String mq4IotTopic = parentTopic + "/" + "testMq4Iot";/*** QoS参数代表传输质量,可选0,1,2,根据实际需求合理设置,具体参考 https://help.aliyun.com/document_detail/42420.html?spm=a2c4g.11186623.6.544.1ea529cfAO5zV3*/final int qosLevel = 0;ConnectionOptionWrapper connectionOptionWrapper = new ConnectionOptionWrapper(instanceId, accessKey, secretKey, clientId);final MemoryPersistence memoryPersistence = new MemoryPersistence();/*** 客户端使用的协议和端口必须匹配,具体参考文档 https://help.aliyun.com/document_detail/44866.html?spm=a2c4g.11186623.6.552.25302386RcuYFB* 如果是 SSL 加密则设置ssl://endpoint:8883*/final MqttClient mqttClient = new MqttClient("tcp://" + endPoint + ":1883", clientId, memoryPersistence);/*** 客户端设置好发送超时时间,防止无限阻塞*/mqttClient.setTimeToWait(5000);final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());mqttClient.setCallback(new MqttCallbackExtended() {@Overridepublic void connectComplete(boolean reconnect, String serverURI) {/*** 客户端连接成功后就需要尽快订阅需要的 topic*/System.out.println("connect success---------------------------------------------------");executorService.submit(new Runnable() {@Overridepublic void run() {try {final String topicFilter[] = {mq4IotTopic};final int[] qos = {qosLevel};mqttClient.subscribe(topicFilter, qos);} catch (MqttException e) {e.printStackTrace();}}});}@Overridepublic void connectionLost(Throwable throwable) {throwable.printStackTrace();}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {/*** 消费消息的回调接口,需要确保该接口不抛异常,该接口运行返回即代表消息消费成功。* 消费消息需要保证在规定时间内完成,如果消费耗时超过服务端约定的超时时间,对于可靠传输的模式,服务端可能会重试推送,业务需要做好幂等去重处理。超时时间约定参考限制* https://help.aliyun.com/document_detail/63620.html?spm=a2c4g.11186623.6.546.229f1f6ago55Fj*/System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {System.out.println("send msg succeed topic is : " + iMqttDeliveryToken.getTopics()[0]);}});mqttClient.connect(connectionOptionWrapper.getMqttConnectOptions());for (int i = 0; i < 10; i++) {MqttMessage message = new MqttMessage("hello mq4Iot pub sub msg".getBytes());message.setQos(qosLevel);/***  发送普通消息时,topic 必须和接收方订阅的 topic 一致,或者符合通配符匹配规则*/mqttClient.publish(mq4IotTopic, message);/*** MQ4IoT支持点对点消息,即如果发送方明确知道该消息只需要给特定的一个设备接收,且知道对端的 clientId,则可以直接发送点对点消息。* 点对点消息不需要经过订阅关系匹配,可以简化订阅方的逻辑。点对点消息的 topic 格式规范是  {{parentTopic}}/p2p/{{targetClientId}}*/final String p2pSendTopic = parentTopic + "/p2p/" + clientId;message = new MqttMessage("hello mq4Iot p2p msg".getBytes());message.setQos(qosLevel);mqttClient.publish(p2pSendTopic, message);}Thread.sleep(Long.MAX_VALUE);}
}

有需要的同学,可以私聊找我要源码!!!

相关文章:

  • Docker 下备份 Mariadb 数据库文件
  • 进程和线程的相关命令
  • git checkout 详解
  • 内接圆和外接矩形
  • 1.2、SDH的复用结构
  • Amazon Linux 2023 配置定时任务完全指南:cronie安装与使用
  • SpringBoot的Web应用开发——Web缓存利器Redis的应用!
  • 半导体标准协议 E94 ControlJob学习
  • 目前流行Agent框架对比表
  • 手搓一个记录复制记录的软件,方便快速找到之前复制内容
  • 【教程】Windows安全中心扫描设置排除文件
  • 「从实验室到工程现场:机器学习赋能智能水泥基复合材料研发全流程解析」
  • HarmonyOS5 运动健康app(三):健康睡眠(附代码)
  • springboot项目中整合高德地图
  • Java中extends与implements深度解析:继承与接口实现的本质区别
  • SpringBoot 日志管理
  • 什么是探索式测试,应该怎么做?
  • 视觉语言模型的“视而不见“
  • 初认Flask框架
  • 基于深度学习的智能语音合成系统:技术与实践
  • 郑州网站建设哪家有/seo具体是什么
  • 网站详情页怎么做/社群推广平台
  • 做酒的网站有哪些/营销平台是什么意思
  • 厦门网站改版/网络广告代理
  • 自己做网站卖货多少钱/快速排名seo
  • 红色色系网站/百度投诉中心入口