第十四章 MQTT订阅
系列文章目录
系列文章目录
第一章 总体概述
第二章 在实体机上安装ubuntu
第三章 Windows远程连接ubuntu
第四章 使用Docker安装和运行EMQX
第五章 Docker卸载EMQX
第六章 EMQX客户端MQTTX Desktop的安装与使用
第七章 EMQX客户端MQTTX CLI的安装与使用
第八章 Wireshark工具的安装与使用
第九章 MQTT报文
第十章 MQTT消息质量等级QoS
第十一章 MQTT主题
第十二章 MQTT会话
第十三章 MQTT消息
第十四章 MQTT订阅
文章目录
- 系列文章目录
- 前言
- 1 订阅选项
- 1.1 订阅选项简介
- 1.2 QoS
- 1.2.1 QoS订阅选项简介
- 1.2.2 QoS订阅选项演示
- 1.3 No Local
- 1.3.1 No Local订阅选项简介
- 1.3.2 No Local订阅选项演示
- 允许转发
- 不允许转发
- 1.4 Retain As Published
- 1.4.1 Retain As Published订阅选项简介
- 1.4.2 Retain As Published订阅选项演示
- 1.5 Retain Handling
- 1.5.1 Retain Handling订阅选项简介
- 1.5.2 Retain Handling订阅选项演示
- 2 共享订阅
- 2.1 共享订阅简介
- 2.2 共享订阅分类
- 2.2.1 带群组的共享订阅
- 2.2.2 不带群组的共享订阅
- 2.3 共享订阅演示
- 2.4 负载均衡算法
- 3 排它订阅
- 3.1 排它订阅简介
- 3.2 排它订阅演示
- 4 自动订阅
- 4.1 配置自动订阅规则
- 4.2 演示自动订阅使用
- 总结
前言
1 订阅选项
1.1 订阅选项简介
订阅的组成:
1、主题过滤器:决定了服务端将向我们转发哪些主题下的消息
2、订阅选项:是允许我们进一步定制服务端的转发行为
MQTT 5.0提供了4个订阅选项:QoS、No Local、Retain As Published、Retain Handling
1.2 QoS
1.2.1 QoS订阅选项简介
QoS 是最常用的一个订阅选项,它表示服务端在向订阅端发送消息时可以使用的最大QoS等级。
情况1:服务端支持的最大 QoS < 客户端订阅时请求的最大QoS
服务端将无法满足客户端的要求,这时服务端就会通过订阅的响应报文(SUBACK)告知订阅端最终授予的最大 QoS 等级,订阅端可以自行评估是否接受并继续通信。
情况2:订阅时请求的最大QoS < 消息发布时的QoS
为了尽可能地投递消息,服务端不会忽略这些消息,而是会在转发时对这些消息的 QoS 进行降级处理。
1.2.2 QoS订阅选项演示
具体步骤如下所示:
1、创建sub客户端连接,并且订阅test/n
主题, 并指定QOS为1
2、订阅成功以后,通过pub客户端连接向test/n
主题发布消息,并且指定QOS的登记为2
3、结果:sub客户端收到的消息的QOS值为1
1.3 No Local
1.3.1 No Local订阅选项简介
No Local取值:
1、0(默认值):服务端可以
将消息转发给发布这个消息的客户端
2、1:服务端不可以
将消息转发给发布这个消息的客户端
这个订阅选项尝尝被用在桥接场景
中,桥接本质上是两个 MQTT Server 建立了一个 MQTT 连接,然后相互订阅一些主题,Server 将客户端的消息转发给另一个 Server,而另一个 Server 则可以将消息继续转发给它的客户端。
在桥接的场景中,如果没有将No Local订阅选项的值设置为1,那么此时会形成转发风暴
。
举例:假设两个 MQTT Server 分别是 Server A 和 Server B,它们分别向对方订阅了#
主题。现在,Server A 将一些来自客户端的消息转发给了 Server B,而当 Server B 查找匹配的订阅时,Server A 也会位于其中。如果 Server B 将消息转发给了 Server A,那么同样 Server A 在收到消息后又会把它们再次转发给 Server B,这样就陷入了无休止的转发风暴。
而如果 Server A 和 Server B 在订阅 #
主题的同时,将 No Local 选项设置为 1,就可以完美地避免这个问题。
1.3.2 No Local订阅选项演示
允许转发
具体步骤:
1、创建sub客户端连接,并订阅test/k
主题,并且设置No Local订阅选项为0
2、使用sub客户端连接向test/k
主题发布消息
结果:当前的客户端连接收到该主题中的消息。
不允许转发
具体步骤:
1、创建sub客户端连接,并订阅test/u
主题,并且设置No Local订阅选项为1
2、使用sub客户端连接向test/u
主题发布消息
结果:当前的客户端没有接收到该主题中的消息。
1.4 Retain As Published
1.4.1 Retain As Published订阅选项简介
Retain As Published取值:
1、0(默认值):服务端在向此订阅转发应用消息时需要清除
消息中的 Retain 标识
2、1:服务端在向此订阅转发应用消息时需要保持
消息中的 Retain 标识
应用场景:桥接场景
桥接场景下带来了一些问题。我们继续沿用前面的设定,当 Server A 将保留消息转发给 Server B 时,由于消息中的 Retain 标识已经被清除,Server B 将不会知道这原本是一条保留消息,自然不会再存储它。这就导致了保留消息无法跨桥接使用。
那么在 MQTT 5.0 中,我们可以让桥接的服务端在订阅时将 Retain As Published 选项设置为 1,来解决这个问题。
1.4.2 Retain As Published订阅选项演示
具体步骤如下:
1、创建sub客户端连接,分别订阅主题test/demo01
和test/demo02
, 并且将Retain As Published设置为 0 和 1
2、通过sub客户端连接分别向sub/rap/demo01
和sub/rap/demo02
主题发布保留消息
3、结果看出,只有demo02
保留了retain
标志。
1.5 Retain Handling
1.5.1 Retain Handling订阅选项简介
作用:Retain Handling 这个订阅选项被用来向服务端指示当订阅建立时,是否需要发送保留消息
。
Retain Handling常见取值:
1、0(默认值):表示只要订阅建立,就发送保留消息;
2、1:表示只有建立全新的订阅而不是重复订阅时,才发送保留消息;
3、2:表示订阅建立时不要发送保留消息;
1.5.2 Retain Handling订阅选项演示
1、演示之前,先对MQTTX进行设置。
2、查看EMQX中有哪些保留消息。
具体步骤如下所示:
情况1:Retain Handling值设置为0
1、开启客户端的自动重订阅功能
2、创建sub客户端连接(Clean Start值设置为1,并且将Session Expiry Interval设置为300)
3、在sub客户端连接中,订阅test/demo01
主题,并且将Retain Handling的值设置为0
结果:只要订阅成功了,那么此时立马会收到保留消息
4、关闭客户端连接,设置客户端的Clean Start值设置为0
表示需要复用之间的会话
注意:只要是重新订阅成功了,那么此时就会收到保留消息
情况2:Retain Handling设置为1
删除sub客户端连接中的订阅,重新订阅test/demo01
主题,并且将Retain Handling的值设置为1
,新建立的订阅是可以获取到保留消息的。
关闭当前连接,重新建立连接【会自动复用之前的订阅】,此时无法获取到保留消息。
情况3: Retain Handling设置为2
删除sub客户端连接中的订阅,重新订阅test/demo01
主题,并且将Retain Handling的值设置为2
结果:即使订阅成功了,那么此时也不会收到保留消息
2 共享订阅
2.1 共享订阅简介
在普通的订阅中,我们每发布一条消息,所有匹配的订阅端都会收到该消息的副本。当某个订阅端的消费速度无法跟上消息的生产速度时,我们没有办法将其中一部分消息分流到其他订阅端中来分担压力。这使订阅端容易成为整个消息系统的性能瓶颈。
MQTT 5.0 引入了共享订阅特性,它使得 MQTT 服务端可以在使用特定订阅的客户端之间均衡地分配消息负载。
这表示,当我们有两个客户端共享一个订阅时,那么每个匹配该订阅的消息都只会有一个副本投递给其中一个客户端。
共享订阅不仅为消费端带来了极佳的水平扩展能力,使我们可以应对更高的吞吐量
,还为其带来了高可用性
,即使共享订阅组中的一个客户端断开连接或发生故障,其他客户端仍然可以继续处理消息,在必要时还可以接管原先流向该客户端的消息流。
2.2 共享订阅分类
启用共享订阅:为一组订阅者的原始主题添加指定前缀
共享订阅分类:
前缀格式 | 示例 | 前缀 | 真实主题名 |
---|---|---|---|
带群组格式 | $share/abc/t/1 | $share/abc | t/1 |
不带群组格式 | $queue/t/1 | $queue/ | t/1 |
2.2.1 带群组的共享订阅
您可以通过在原始主题前添加 $share/<group-name>
前缀为分组的订阅者启用共享订阅。组名可以是任意字符串。EMQX 同时将消息转发给不同的组,属于同一组的订阅者可以使用负载均衡接收消息。
例如,如果订阅者 s1
、s2
和 s3
是组 g1
的成员,订阅者 s4
和 s5
是组 g2
的成员,而所有订阅者都订阅了原始主题 t1
。共享订阅的主题是 $share/g1/t1
和 $share/g2/t1
。当 EMQX 发布消息 msg1
到原始主题 t1
时:
- EMQX 将
msg1
发送给g1
和g2
两个组。 s1
、s2
、s3
中的一个订阅者将接收msg1
。s4
和s5
中的一个订阅者将接收msg1
。
2.2.2 不带群组的共享订阅
以 $queue/
为前缀的共享订阅是不带群组的共享订阅。它是 $share
订阅的一种特例。您可以将其理解为所有订阅者都在一个订阅组中:
2.3 共享订阅演示
1、创建4个客户端连接分别是sub1
、sub2
、sub3
、sub4
,其中s1和s2属于同一个共享订阅组g1, s3和s4属于同一个共享订阅组g2
订阅的主题如下所示:
sub1: $share/g1/t/1
sub2: $share/g1/t/1
sub3: $share/g2/t/1
sub4: $share/g2/t/1
2、创建pub客户端连接,并且向t/1主题发布两条消息观测结果
默认的负载均衡算法:轮询
3、删除sub1、sub2、sub3、sub4的订阅信息,重新添加$queue/t/1
订阅
4、通过pub客户端向t/1主题发布消息观测结果
2.4 负载均衡算法
可以通过Dashboard进行负载均衡算法的配置【管理====>MATT配置】:
大致可以分为:
1、随机(Random),在共享订阅组内随机选择一个会话发送消息。
2、轮询(Round Robin),在共享订阅组内按顺序选择一个会话发送消息,循环往复。
3、哈希(Hash),基于某个字段的哈希结果来分配。
4、粘性(Sticky),在共享订阅组内随机选择一个会话发送消息,此后保持这一选择,直到该会话结束再重复这一过程。
6、本地优先(Local),随机选择,但优先选择与消息的发布者处于同一节点的会话,如果不存在这样的会话,则退化为普通的随机策略。
3 排它订阅
3.1 排它订阅简介
排它订阅允许对主题进行互斥订阅,一个主题同一时刻仅被允许存在一个订阅者,在当前订阅者未取消订阅前,其他订阅者都将无法订阅对应主题。
要进行排它订阅,您需要为主题名称添加$exclusive/
前缀,如以下表格中的示例:
示例 | 前缀 | 真实主题名 |
---|---|---|
$exclusive/t/1 | $exclusive/ | t/1 |
当某个客户端 A 订阅 $exclusive/t/1
后,其他客户端再订阅 $exclusive/t/1
时都会失败,直到 A 取消了对 $exclusive/t/1
的订阅为止。
注意: 排它订阅必须使用 $exclusive/
前缀,在上面的示例中,其他客户端依然可以通过 t/1
成功进行订阅。
订阅失败的常见错误码:
3.2 排它订阅演示
默认情况下排它订阅是关闭的。
具体步骤:
1、创建sub1客户端连接,并且添加$exclusive/t/1
订阅
2、在Dashboard开启排它订阅配置【管理====>MATT配置】
3、在sub2客户端连接中重新添加$exclusive/t/1
订阅
4、创建sub2客户端连接,并且添加$exclusive/t/1
订阅
5、创建sub2客户端连接,并且添加t/1
订阅,此时订阅成功
4 自动订阅
自动订阅能够给 EMQX 设置多个规则,在设备成功连接后按照规则为其订阅指定主题,不需要额外发起订阅。
4.1 配置自动订阅规则
通过 Dashboard 配置自动订阅规则:【管理 ====> MQTT高级特性 =====> 自动订阅 ====> 添加】
4.2 演示自动订阅使用
具体步骤:
1、创建pub客户端连接,作为发布者
2、创建sub客户端连接,作为订阅者
3、在pub客户端连接中向a/1主题发布消息
总结
以上,就是MQTT订阅介绍。