当前位置: 首页 > news >正文

MQTT 与双工通信

目录:
rabbitmq 消息队列
Kafka详解
mqtt 与双工通信


MQTT 全称是 Message Queuing Telemetry Transport,翻译过来叫消息队列遥测传输。

本质上它是一个轻量级的通信协议,专为资源受限的设备设计,比如那些带宽有限、算力不强的物联网小设备。如温湿度传感器、Wi-Fi 灯泡、甚至智能插座,它们都需要快速、可靠地交换数据,此时就可以用 MQTT 来交换数据,如下所示:

在这里插入图片描述

其核心思想是发布/订阅模型。

MQTT 与 AMQP

从作用来看,MQTT 和 AMQP(Advanced Message Queue Protocol) 都是消息传输协议,那两者有什么不同呢?

AMQP

AMQP 的典型实现是 rabbitMq,该协议主要面向可靠、复杂路由的消息传输。

RocketMq 虽然借鉴了 AMQP 模型,但并非 AMQP 的实现,而是使用了自研协议。

Kafka 则是使用了自定义二进制协议,追求高吞吐、分布式伸缩和流数据处理。

相比于 MQTT,AMQP 是一个非常复杂的协议,最初是为金融机构和企业系统而设计的。在这种应用场景中,对消息的传输有以下要求:

  • 可靠传递
  • 支持事务
  • 支持安全认证、访问控制
  • 支持灵活的消息路由
  • 支持消息持久化和确认机制

也就是说,AMQP 不仅仅用于传消息,还要解决企业级消息传递的所有难题,这就让 AMQP 协议变得庞大和复杂。

AMQP 是一个多层协议栈,包括:Transport Layer,Messaging Layer,Session/Channel Layer,Exchange/Queue Layer。传输层负责建立 TCP 连接,协商数据帧格式、心跳、SASL认证。消息层负责消息格式(header, properties, body)的定义。会话层定义逻辑会话、流控、事务。交换层负责定义路由模型(binding、routing key)等。

在 AMQP 中,发送一条消息涉及到很多组件。生产者需要先声明 exchange、queue,并与其绑定。每条消息在传输链路上可能需要多次确认,如 Broker 确认收到,Consumer 确认处理完成,最后才提交 transaction 或者回滚。一次消息发送到最终确认可能需要多次往返交互,涉及不同组件(Channel、Exchange、Queue)之间的状态同步。

也正是由于这种复杂性,AMQP 的实现(如 rabbitMq)需要相对高的资源占用。

AMQP 有多个版本体系,最早的 AMQP 0-9-1 应用广泛,rabbitMq 正是基于此版本实现的,特点是路由模型清晰。而后来的 AMQP 1.0 完全重构了协议,支持点对点、异步流传输。中间甚至还有一个过渡的 0-10 版本。

MQTT

与 AMQP 相比,MQTT 则更轻量、高效,面向 IoT 场景,适合在低带宽、低功耗、不稳定网络的环境下进行消息通信。

MQTT 在车联网领域也有广泛的应用。

Topic

在 MQTT 中,topic 使用 / 作为层级分隔符,支持通配符匹配, + 匹配一个层级,# 匹配多个层级。

假如有以下 topic:

sensor/bedroom/temp
sendor/living_room/temp
sensor/kitchen/humidity

如果消费者订阅的 topic 为 sensor/+/temp,则会匹配到 sensor/bedroom/tempsendor/living_room/temp。如果消费者订阅的 topic 为 sensor/#, 则会订阅全部的三个 topic。

Topic 的设计原则:

  • 不要以 / 开头。虽然是合法的,但不符合常规习惯。
  • 不要包含空格。同样是合法的,但会导致 topic 难以阅读和处理。
  • 使用固定的、一致的层级顺序,如 系统/位置/设备类型/设备id/指标
  • topic 中不能包含通配符 #+
  • 许多 MQTT Broker (如EMQX、HiveMQ) 保留以 $ 开头的 topic 用于内部使用。客户端通常默认无法订阅这些 topic,除非显示配置。

QoS

MQTT 中对消息传输的可靠性有三个 Qos 等级设置:

Qos 等级含义吞吐量可靠性消息确认
0消息投递次数为:At most once无需确认,吞吐量最高不保证送达单向发送,无确认
1消息投递次数为:At least once |中等保证至少一次,可能重复Pub -> PubAck一次确认
2消息投递次数为:Exactly once最低 |仅一次,最可靠Pub -> PubRec -> PubRel -> PubComp二次确认

需要注意的是,发布流程和订阅流程的 Qos 等级设置是独立的。发布者和 broker 之间的消息可靠性由发布者 Qos 决定;Broker 和订阅着之间的消息可靠性由订阅着 Qos 决定。

举个例子:假设发布者 Qos 为 1,订阅者 Qos 为 1。消息从发布者到达 broker,可以确保是有且仅有一次的。但是消息从 broker 到达订阅着,则可能会重复。

如果从整条链路来看,Qos 是两者的较小值,也就是说 effective_Qos = min(publish_Qos, suscribe_Qos)。

MQTT 与 WebSocket

考虑一种场景:服务端需要给客户端推送数据,比如状态更新、消息通知、进度等,该如何实现?

应用广泛的 http 是不行的,因为 http 是请求/响应模式的,服务端无法主动发起通信,要实现服务端推送,需要使用长连接。

首先想到的是 WebSocket。

WebSocket

WebSocket 是一种建立在 TCP 之上的全双工的通信协议,通过 HTTP/1.1 的 Upgrade 机制进行握手。

WebSocket 是标准的、跨平台的协议,其支持非常广泛,如浏览器、移动端、各种后端框架,都有对 WebSocket 的稳定实现。应用场景也非常多:

  • 实时聊天系统
  • 实时数据推送,如股票行情、监控仪表盘等
  • 网络游戏,服务端系统频繁给客户端同步数据
  • IoT 设备通信,如智能家居、车辆网
  • 协作应用,如 Google Docs、Figma
  • 实时通知,如异步任务状态更新、推送中心

但使用 WebSocket 也要考虑取以下取舍点:

连接数量

当客户端数量规模较大时,比如百万设备,如果采用 webSocket 来进行通信,意味着服务端需要维护百万级别的 webSocket 连接,这对服务端是巨大的挑战。

首选是硬件资源方面。每个连接都会消耗内存(内核缓冲区、应用层缓冲区、连接状态等),假设每个连接占用 20 KB,光 100 万连接大概需要 20 GB 的内存。在网络带宽方面,海量连接上的数据传输,需要消耗巨大的网络带宽。

从软件层面来说,大量连接需要考虑各种内核参数的限制,比如文件句柄数,tcp 连接参数。

除此之外,还要考虑技术层面的问题,比如服务器故障时的连接恢复和状态迁移等等。

因此当客户端规模很大的情况下,是否使用 WebSocket 是需要慎重考虑的。

消息可靠性

WebSocket 本身没有内置的消息可靠性保证机制,如果要保证可靠性,则需要在应用层面来考虑,比如在应用层增加确认机制。

通信实时性

WebSocket 通信延迟低、实时性高,如果服务端主动发起的通信并不频繁,只是“偶尔通知”,比如异步任务完成通知,或者对服务端的推送实时性要求并不高,完全没有必要占用长连接来进行通信。

总结来说,当系统规模大,或者对消息可靠性要求高时,可以选用其他的通信协议,例如 MQTT、gRPC Stream 或者 SSE。

MQTT Over WebSocket

MQTT 原生是在 TCP/IP 上运行的,可以称之为基于 TCP 的 MQTT。而 WebSocket 上面也介绍过了,是基于 TCP 的全双工通信的协议。那 MQTT over webSocket 又是什么意思呢?

假设有这样一个场景:你想开发一个网页,在浏览器打开该网页可以显示某个传感器的湿度和温度。传感器使用 MQTT 协议发布数据。

问题是你的网页运行在浏览器中,而浏览器不允许你直接建立一个到 MQTT 服务器的原生 TCP 连接(出于安全考虑)。这种情况下就可以用 MQTT over WebSocket 的解决方案了:

  1. 在网页中使用 JavaScript 建立一个标准的 WebSocket 连接到服务器,这是浏览器允许的。
  2. 服务器端(如 EMQX、HiveMQ)需要同时支持 WebSocket 协议和 MQTT 协议。
  3. 服务器接收来自浏览器的 WebSocket 连接,然后将 WebSocket 数据帧内部转换成标准的 MQTT 数据包进行处理。
  4. 网页就可以通过 WebSocket 这个桥梁,使用完整的 MQTT 功能了。

简单点来说,MQTT over WebSocket 就是让 MQTT 协议能够通过 WebSocket 通道进行通信,主要原因是原生的 MQTT 不能在浏览器中直接使用,正常情况下也不能穿透 HTTP 防火墙,而通过 WebSocket 包装一层后就可以了。

再考虑一个场景,在使用知乎这类的 UGC 网站或应用的时候,用户在发表一个问题或评论之后,其他用户可以对其进行评论或点赞。如果单纯使用 http 协议来进行通信,用户只有在刷新网页的时候,才能看到其他用户和自己的互动。

事实上,大部分这类网站都是有实时的消息提醒的。即使用户不刷新网页,也能看到消息中心的消息或私信的提示。这意味着服务端是可以主动对客户端推送消息的。

上面说过,这种情况可以使用 WebSocket 进行通信。但是对服务端来说,维护大量的长连接来实现这种非重点功能是得不偿失的,所以知乎使用的是 MQTT,准确点说是基于 WebSocket 的 MQTT。在浏览器调试页面可以看到:

在这里插入图片描述

在用户访问知乎的时候,用户浏览器端和知乎服务端建立了一条 wss(WebSocket Security)通道,完整的 URL 为:wss://mqtt-web.zhihu/mqtt。通过该加密的 WebSocket 通道,服务端可以将新消息提醒(私信、评论、点赞)、网页上实时的内容更新(如回答的点赞数)和在线状态或通知推送到用户浏览器网页上。

MQTT 的实现

MQTT 本身只是一个协议标准,其实现有很多,比如 Mosquitto、EMQX,HiveMQ 等,云服务商也提供了 MQTT 的支持,如 AWS IoT,Azure IoT Hub。

名称语言类型特点适用场景
Eclipse MosquittoC开源、轻量级简单稳定、嵌入式友好个人项目、小型 IoT
EMQXErlang/Elixir企业级分布式高性能、高可用、插件丰富大规模 IoT 平台、企业应用
HiveMQJava商业/社区可扩展、企业支持车辆网、工业 IoT
RabbitMQ + MQTT 插件Erlang通用消息中间件统一协议网关需整合 MQTT 与 AMQP/Kafka 系统
AWS IoT Core / Azure IoT Hub云平台托管服务云集成、免维护云端 IoT 系统

Mosquitto

Mosquitto 是 Eclipse 基金会开源的轻量级 mqtt 实现,支持 mqtt 5.0、3.1 和 3.1.1 版本。

下面简单演示一下在mac上如何安装和使用。

首先通过 brew 下载 mosquitto,通过 mosquitto -v 开启服务。

接着打开订阅窗口来订阅消息:mosquitto_sub -t test/topic,再打开发布窗口来发布消息:mosquitto_pub -t test/topic -m "Hello MQTT"。在订阅窗口中可以看到订阅的消息。

如果先开发布窗口,再开订阅窗口,会收不到消息,因为默认的 retain 为 false(不保留消息)。

如果要让订阅者启动后还能收到之前的消息,可以发布保留消息,即 mosquitto_pub -t test/topic -m "Hello" -r,这样 broker 会保存最新一条消息。

即使先订阅再发布,也有可能会丢消息,因为默认的 QoS 为 1,即最多发送一次。要想保证消息的可靠性,需要修改 QoS 等级。

开发库

下面用一个最简单的例子来演示在 Java 中使用 mosquitto:

依赖:
<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>${pahoVersion}</version></dependency>
</dependencies>// 发布端
public class PublishSample {public static void main(String[] args) {String topic = "mqtt/test";String content = "hello";int qos = 1;String broker = "tcp://localhost:1883";String userName = "test";String password = "test";String clientId = "pubClient";// 内存存储MemoryPersistence persistence = new MemoryPersistence();try {// 创建客户端MqttClient sampleClient = new MqttClient(broker, clientId, persistence);// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(false);connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立连接sampleClient.connect(connOpts);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);sampleClient.publish(topic, message);sampleClient.disconnect();sampleClient.close();} catch (MqttException me) {System.out.println("msg " + me.getMessage());me.printStackTrace();}}
}// 订阅端
public class SubscribeSample {public static void main(String[] args) throws MqttException {   String host = "tcp://localhost:1883";String topic = "mqtt/test";int qos = 1;String clientid = "subClient";String userName = "test";String passWord = "test";try {MqttClient client = new MqttClient(host, clientid, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(userName);options.setPassword(passWord.toCharArray());options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线options.setKeepAliveInterval(20);// 设置回调函数client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("connectionLost");}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("topic:"+topic);System.out.println("Qos:"+message.getQos());System.out.println("message content:"+new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------"+ token.isComplete());}});client.connect(options);client.subscribe(topic, qos);} catch (Exception e) {e.printStackTrace();}}
}

需要注意的是,mosquitte 是单机的,适合用在小型系统中(几百到几千连接),当客户端数量超过 10000 时,单个 Mosquitto 通常会存在各种问题。

EMQX

EMQX 是 MQTT 的一个企业级实现,支持 mqtt 5.0、3.1 和 3.1.1 版本。EMQX 与 Mosquitto 不一样,是原生就支持集群的,可支撑上亿级连接。EMQX 功能强大,支持规则引擎、数据集成、认证授权、协议网关等企业级功能,可扩展性强。但部署和运维比较复杂,资源消耗高。

值得一提的是,EMQX 使用的是 Business Source License 1.1 许可,该许可介于传统的开源许可证和商业许可证之间。简单来说在使用的前四年内,是不允许商用的(商用需购买许可),四年之后许可转变成 Apache License 2.0,此时才可以免费商用。

下面以单节点非商用版来做一个简单演示。

首先用 Docker 来下载镜像并启动容器:

docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx-enterprise:latest

1883 是 tcp 端口,8083 是 ws 端口,8084 是 wss 端口,18083 是 dashboard 端口

容器运行后访问 http://localhost:18083 来登录控制台。默认用户名和密码为 adminpublic

接下来我们用上节中的 Mosquitto 客户端来订阅和发布消息。

// 订阅
mosquitto_sub -h localhost -p 1883 -t testtopic/#
// 发布,host 默认 localhost,port 默认 1883
mosquitto_pub -t testtopic/my -m "hello"

查看 EMQX 的 dashboard,可以看到订阅的客户端和 topic 数量:

在这里插入图片描述

开发库

下面演示如何在 Springboot 中使用 EMQX。

依赖仍然是 paho:

<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>

配置 mqtt:

mqtt:broker-url: tcp://localhost:1883username: adminpassword: publicclient-id: spring-clienttopic: mqtt/test

代码如下:

// client bean:
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.client-id}")private String clientId;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setAutomaticReconnect(true);options.setConnectionTimeout(10);client.connect(options);System.out.println("✅ Connected to MQTT broker: " + brokerUrl);return client;}
}// 订阅
@Service
public class MqttSubscriber {private final MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;public MqttSubscriber(MqttClient mqttClient) {this.mqttClient = mqttClient;}@PostConstructpublic void subscribe() {try {mqttClient.subscribe(topic, (t, msg) -> {String payload = new String(msg.getPayload());System.out.println("📩 Received message on " + t + ": " + payload);});System.out.println("✅ Subscribed to topic: " + topic);} catch (MqttException e) {e.printStackTrace();}}
}// 发布
@Service
public class MqttPublisher {private final MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;public MqttPublisher(MqttClient mqttClient) {this.mqttClient = mqttClient;}public void publish(String message) {try {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(1);mqttClient.publish(topic, mqttMessage);System.out.println("📤📩 Published message: " + message);} catch (MqttException e) {e.printStackTrace();}}
}// Api
@RestController
@RequestMapping("/mqtt")
public class MqttController {private final MqttPublisher publisher;public MqttController(MqttPublisher publisher) {this.publisher = publisher;}@PostMapping("/publish")public String publish(@RequestParam String msg) {publisher.publish(msg);return "Message sent: " + msg;}
}

使用 curl 来调用接口发布消息:curl -X POST http://localhost:8080/mqtt/publish -d "msg=hello"

参考资料

[1]. https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
[2]. https://zhuanlan.zhihu.com/p/66807833
[3]. https://www.jianshu.com/p/65e1748a930c

http://www.dtcms.com/a/601031.html

相关文章:

  • 【.NET10】正式发布!微软开启智能开发生态新纪元
  • Linux 魔法:多种空块填充技术详解与实践
  • 深入浅出 SQLSugar:快速掌握高效 .NET ORM 框架
  • 广东哪家网站建网站搜索不到公司网站
  • 做网站开发需要学什么app开发自学教程
  • 【Linux】网络编程入门:从一个小型回声服务器开始
  • 【统一功能处理】从入门到源码:拦截器学习指南(含适配器模式深度解读)
  • linux 解析并生成一个platform_device设备具体过程
  • 编译器使用的开发语言 | 解析编译器的实现原理及其开发语言的选择
  • 佛山企业网站建设流程织梦营销型网站模板
  • 洛谷 P11965:[GESP202503 七级] 等价消除 ← 位运算(异或) + STL map
  • 智慧团建网登录入口移动网站如何优化排名
  • linux drm子系统专栏介绍
  • termux编译opencv给python用
  • 4.子任务四:Hive 安装配置
  • Lua学习记录(3) --- Lua中的复杂数据类型_table
  • 郑州做定制网站的公司南宁有名的seo费用
  • 华为SRv6技术:引领IP网络进入新时代的智能导航系统
  • 视频汇聚平台EasyCVR:构建通信基站“可视、可管、可控”的智慧安防体系
  • 在云手机中云计算的作用都有哪些?
  • 绿盟防火墙机制
  • 查询数据库上所有表用到图片和视频的数据,并记录到excel表
  • MUVERA:让RAG系统中的多向量检索像单向量一样高效
  • 数据分析笔记02:数值方法
  • 没有网站可以做cpa广告么自己怎么做网站优化
  • Spring Boot实现多数据源连接和切换
  • 【架构设计方法论】概念架构:系统设计的指路明灯
  • 将标签格式为xml的数据集按照8:2的比例划分为训练集和验证集
  • 实战派 JMeter 指南:核心功能、并发压测实操与常见问题解决方案
  • 宁晋网站建设地址信息采集平台