MQTT 与双工通信
目录:
rabbitmq 消息队列
Kafka详解
mqtt 与双工通信
MQTT 全称是 Message Queuing Telemetry Transport,翻译过来叫消息队列遥测传输。
本质上它是一个轻量级的通信协议,专为资源受限的设备设计,比如那些带宽有限、算力不强的物联网小设备。如温湿度传感器、Wi-Fi 灯泡、甚至智能插座,它们都需要快速、可靠地交换数据,此时就可以用 MQTT 来交换数据,如下所示:

其核心思想是发布/订阅模型。
MQTT 与 AMQP
从作用来看,MQTT 和 AMQP(Advanced Message Queue Protocol) 都是消息传输协议,那两者有什么不同呢?
AMQP
AMQP 的典型实现是 rabbitMq,该协议主要面向可靠、复杂路由的消息传输。
RocketMq 虽然借鉴了 AMQP 模型,但并非 AMQP 的实现,而是使用了自研协议。
Kafka 则是使用了自定义二进制协议,追求高吞吐、分布式伸缩和流数据处理。
相比于 MQTT,AMQP 是一个非常复杂的协议,最初是为金融机构和企业系统而设计的。在这种应用场景中,对消息的传输有以下要求:
- 可靠传递
- 支持事务
- 支持安全认证、访问控制
- 支持灵活的消息路由
- 支持消息持久化和确认机制
也就是说,AMQP 不仅仅用于传消息,还要解决企业级消息传递的所有难题,这就让 AMQP 协议变得庞大和复杂。
AMQP 是一个多层协议栈,包括:Transport Layer,Messaging Layer,Session/Channel Layer,Exchange/Queue Layer。传输层负责建立 TCP 连接,协商数据帧格式、心跳、SASL认证。消息层负责消息格式(header, properties, body)的定义。会话层定义逻辑会话、流控、事务。交换层负责定义路由模型(binding、routing key)等。
在 AMQP 中,发送一条消息涉及到很多组件。生产者需要先声明 exchange、queue,并与其绑定。每条消息在传输链路上可能需要多次确认,如 Broker 确认收到,Consumer 确认处理完成,最后才提交 transaction 或者回滚。一次消息发送到最终确认可能需要多次往返交互,涉及不同组件(Channel、Exchange、Queue)之间的状态同步。
也正是由于这种复杂性,AMQP 的实现(如 rabbitMq)需要相对高的资源占用。
AMQP 有多个版本体系,最早的 AMQP 0-9-1 应用广泛,rabbitMq 正是基于此版本实现的,特点是路由模型清晰。而后来的 AMQP 1.0 完全重构了协议,支持点对点、异步流传输。中间甚至还有一个过渡的 0-10 版本。
MQTT
与 AMQP 相比,MQTT 则更轻量、高效,面向 IoT 场景,适合在低带宽、低功耗、不稳定网络的环境下进行消息通信。
MQTT 在车联网领域也有广泛的应用。
Topic
在 MQTT 中,topic 使用 / 作为层级分隔符,支持通配符匹配, + 匹配一个层级,# 匹配多个层级。
假如有以下 topic:
sensor/bedroom/temp
sendor/living_room/temp
sensor/kitchen/humidity
如果消费者订阅的 topic 为 sensor/+/temp,则会匹配到 sensor/bedroom/temp 和 sendor/living_room/temp。如果消费者订阅的 topic 为 sensor/#, 则会订阅全部的三个 topic。
Topic 的设计原则:
- 不要以
/开头。虽然是合法的,但不符合常规习惯。 - 不要包含空格。同样是合法的,但会导致 topic 难以阅读和处理。
- 使用固定的、一致的层级顺序,如
系统/位置/设备类型/设备id/指标。 - topic 中不能包含通配符
#和+。 - 许多 MQTT Broker (如EMQX、HiveMQ) 保留以
$开头的 topic 用于内部使用。客户端通常默认无法订阅这些 topic,除非显示配置。
QoS
MQTT 中对消息传输的可靠性有三个 Qos 等级设置:
| Qos 等级 | 含义 | 吞吐量 | 可靠性 | 消息确认 |
|---|---|---|---|---|
| 0 | 消息投递次数为:At most once | 无需确认,吞吐量最高 | 不保证送达 | 单向发送,无确认 |
| 1 | 消息投递次数为:At least once |中等 | 保证至少一次,可能重复 | Pub -> PubAck | 一次确认 |
| 2 | 消息投递次数为:Exactly once | 最低 |仅一次,最可靠 | Pub -> PubRec -> PubRel -> PubComp | 二次确认 |
需要注意的是,发布流程和订阅流程的 Qos 等级设置是独立的。发布者和 broker 之间的消息可靠性由发布者 Qos 决定;Broker 和订阅着之间的消息可靠性由订阅着 Qos 决定。
举个例子:假设发布者 Qos 为 1,订阅者 Qos 为 1。消息从发布者到达 broker,可以确保是有且仅有一次的。但是消息从 broker 到达订阅着,则可能会重复。
如果从整条链路来看,Qos 是两者的较小值,也就是说 effective_Qos = min(publish_Qos, suscribe_Qos)。
MQTT 与 WebSocket
考虑一种场景:服务端需要给客户端推送数据,比如状态更新、消息通知、进度等,该如何实现?
应用广泛的 http 是不行的,因为 http 是请求/响应模式的,服务端无法主动发起通信,要实现服务端推送,需要使用长连接。
首先想到的是 WebSocket。
WebSocket
WebSocket 是一种建立在 TCP 之上的全双工的通信协议,通过 HTTP/1.1 的 Upgrade 机制进行握手。
WebSocket 是标准的、跨平台的协议,其支持非常广泛,如浏览器、移动端、各种后端框架,都有对 WebSocket 的稳定实现。应用场景也非常多:
- 实时聊天系统
- 实时数据推送,如股票行情、监控仪表盘等
- 网络游戏,服务端系统频繁给客户端同步数据
- IoT 设备通信,如智能家居、车辆网
- 协作应用,如 Google Docs、Figma
- 实时通知,如异步任务状态更新、推送中心
但使用 WebSocket 也要考虑取以下取舍点:
连接数量
当客户端数量规模较大时,比如百万设备,如果采用 webSocket 来进行通信,意味着服务端需要维护百万级别的 webSocket 连接,这对服务端是巨大的挑战。
首选是硬件资源方面。每个连接都会消耗内存(内核缓冲区、应用层缓冲区、连接状态等),假设每个连接占用 20 KB,光 100 万连接大概需要 20 GB 的内存。在网络带宽方面,海量连接上的数据传输,需要消耗巨大的网络带宽。
从软件层面来说,大量连接需要考虑各种内核参数的限制,比如文件句柄数,tcp 连接参数。
除此之外,还要考虑技术层面的问题,比如服务器故障时的连接恢复和状态迁移等等。
因此当客户端规模很大的情况下,是否使用 WebSocket 是需要慎重考虑的。
消息可靠性
WebSocket 本身没有内置的消息可靠性保证机制,如果要保证可靠性,则需要在应用层面来考虑,比如在应用层增加确认机制。
通信实时性
WebSocket 通信延迟低、实时性高,如果服务端主动发起的通信并不频繁,只是“偶尔通知”,比如异步任务完成通知,或者对服务端的推送实时性要求并不高,完全没有必要占用长连接来进行通信。
总结来说,当系统规模大,或者对消息可靠性要求高时,可以选用其他的通信协议,例如 MQTT、gRPC Stream 或者 SSE。
MQTT Over WebSocket
MQTT 原生是在 TCP/IP 上运行的,可以称之为基于 TCP 的 MQTT。而 WebSocket 上面也介绍过了,是基于 TCP 的全双工通信的协议。那 MQTT over webSocket 又是什么意思呢?
假设有这样一个场景:你想开发一个网页,在浏览器打开该网页可以显示某个传感器的湿度和温度。传感器使用 MQTT 协议发布数据。
问题是你的网页运行在浏览器中,而浏览器不允许你直接建立一个到 MQTT 服务器的原生 TCP 连接(出于安全考虑)。这种情况下就可以用 MQTT over WebSocket 的解决方案了:
- 在网页中使用 JavaScript 建立一个标准的 WebSocket 连接到服务器,这是浏览器允许的。
- 服务器端(如 EMQX、HiveMQ)需要同时支持 WebSocket 协议和 MQTT 协议。
- 服务器接收来自浏览器的 WebSocket 连接,然后将 WebSocket 数据帧内部转换成标准的 MQTT 数据包进行处理。
- 网页就可以通过 WebSocket 这个桥梁,使用完整的 MQTT 功能了。
简单点来说,MQTT over WebSocket 就是让 MQTT 协议能够通过 WebSocket 通道进行通信,主要原因是原生的 MQTT 不能在浏览器中直接使用,正常情况下也不能穿透 HTTP 防火墙,而通过 WebSocket 包装一层后就可以了。
再考虑一个场景,在使用知乎这类的 UGC 网站或应用的时候,用户在发表一个问题或评论之后,其他用户可以对其进行评论或点赞。如果单纯使用 http 协议来进行通信,用户只有在刷新网页的时候,才能看到其他用户和自己的互动。
事实上,大部分这类网站都是有实时的消息提醒的。即使用户不刷新网页,也能看到消息中心的消息或私信的提示。这意味着服务端是可以主动对客户端推送消息的。
上面说过,这种情况可以使用 WebSocket 进行通信。但是对服务端来说,维护大量的长连接来实现这种非重点功能是得不偿失的,所以知乎使用的是 MQTT,准确点说是基于 WebSocket 的 MQTT。在浏览器调试页面可以看到:

在用户访问知乎的时候,用户浏览器端和知乎服务端建立了一条 wss(WebSocket Security)通道,完整的 URL 为:wss://mqtt-web.zhihu/mqtt。通过该加密的 WebSocket 通道,服务端可以将新消息提醒(私信、评论、点赞)、网页上实时的内容更新(如回答的点赞数)和在线状态或通知推送到用户浏览器网页上。
MQTT 的实现
MQTT 本身只是一个协议标准,其实现有很多,比如 Mosquitto、EMQX,HiveMQ 等,云服务商也提供了 MQTT 的支持,如 AWS IoT,Azure IoT Hub。
| 名称 | 语言 | 类型 | 特点 | 适用场景 |
|---|---|---|---|---|
| Eclipse Mosquitto | C | 开源、轻量级 | 简单稳定、嵌入式友好 | 个人项目、小型 IoT |
| EMQX | Erlang/Elixir | 企业级分布式 | 高性能、高可用、插件丰富 | 大规模 IoT 平台、企业应用 |
| HiveMQ | Java | 商业/社区 | 可扩展、企业支持 | 车辆网、工业 IoT |
| RabbitMQ + MQTT 插件 | Erlang | 通用消息中间件 | 统一协议网关 | 需整合 MQTT 与 AMQP/Kafka 系统 |
| AWS IoT Core / Azure IoT Hub | 云平台 | 托管服务 | 云集成、免维护 | 云端 IoT 系统 |
Mosquitto
Mosquitto 是 Eclipse 基金会开源的轻量级 mqtt 实现,支持 mqtt 5.0、3.1 和 3.1.1 版本。
下面简单演示一下在mac上如何安装和使用。
首先通过 brew 下载 mosquitto,通过 mosquitto -v 开启服务。
接着打开订阅窗口来订阅消息:mosquitto_sub -t test/topic,再打开发布窗口来发布消息:mosquitto_pub -t test/topic -m "Hello MQTT"。在订阅窗口中可以看到订阅的消息。
如果先开发布窗口,再开订阅窗口,会收不到消息,因为默认的 retain 为 false(不保留消息)。
如果要让订阅者启动后还能收到之前的消息,可以发布保留消息,即 mosquitto_pub -t test/topic -m "Hello" -r,这样 broker 会保存最新一条消息。
即使先订阅再发布,也有可能会丢消息,因为默认的 QoS 为 1,即最多发送一次。要想保证消息的可靠性,需要修改 QoS 等级。
开发库
下面用一个最简单的例子来演示在 Java 中使用 mosquitto:
依赖:
<dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>${pahoVersion}</version></dependency>
</dependencies>// 发布端
public class PublishSample {public static void main(String[] args) {String topic = "mqtt/test";String content = "hello";int qos = 1;String broker = "tcp://localhost:1883";String userName = "test";String password = "test";String clientId = "pubClient";// 内存存储MemoryPersistence persistence = new MemoryPersistence();try {// 创建客户端MqttClient sampleClient = new MqttClient(broker, clientId, persistence);// 创建链接参数MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setCleanSession(false);connOpts.setUserName(userName);connOpts.setPassword(password.toCharArray());// 建立连接sampleClient.connect(connOpts);MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);sampleClient.publish(topic, message);sampleClient.disconnect();sampleClient.close();} catch (MqttException me) {System.out.println("msg " + me.getMessage());me.printStackTrace();}}
}// 订阅端
public class SubscribeSample {public static void main(String[] args) throws MqttException { String host = "tcp://localhost:1883";String topic = "mqtt/test";int qos = 1;String clientid = "subClient";String userName = "test";String passWord = "test";try {MqttClient client = new MqttClient(host, clientid, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(userName);options.setPassword(passWord.toCharArray());options.setConnectionTimeout(10);// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线options.setKeepAliveInterval(20);// 设置回调函数client.setCallback(new MqttCallback() {public void connectionLost(Throwable cause) {System.out.println("connectionLost");}public void messageArrived(String topic, MqttMessage message) throws Exception {System.out.println("topic:"+topic);System.out.println("Qos:"+message.getQos());System.out.println("message content:"+new String(message.getPayload()));}public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------"+ token.isComplete());}});client.connect(options);client.subscribe(topic, qos);} catch (Exception e) {e.printStackTrace();}}
}
需要注意的是,mosquitte 是单机的,适合用在小型系统中(几百到几千连接),当客户端数量超过 10000 时,单个 Mosquitto 通常会存在各种问题。
EMQX
EMQX 是 MQTT 的一个企业级实现,支持 mqtt 5.0、3.1 和 3.1.1 版本。EMQX 与 Mosquitto 不一样,是原生就支持集群的,可支撑上亿级连接。EMQX 功能强大,支持规则引擎、数据集成、认证授权、协议网关等企业级功能,可扩展性强。但部署和运维比较复杂,资源消耗高。
值得一提的是,EMQX 使用的是 Business Source License 1.1 许可,该许可介于传统的开源许可证和商业许可证之间。简单来说在使用的前四年内,是不允许商用的(商用需购买许可),四年之后许可转变成 Apache License 2.0,此时才可以免费商用。
下面以单节点非商用版来做一个简单演示。
首先用 Docker 来下载镜像并启动容器:
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx-enterprise:latest
1883 是 tcp 端口,8083 是 ws 端口,8084 是 wss 端口,18083 是 dashboard 端口
容器运行后访问 http://localhost:18083 来登录控制台。默认用户名和密码为 admin 和 public。
接下来我们用上节中的 Mosquitto 客户端来订阅和发布消息。
// 订阅
mosquitto_sub -h localhost -p 1883 -t testtopic/#
// 发布,host 默认 localhost,port 默认 1883
mosquitto_pub -t testtopic/my -m "hello"
查看 EMQX 的 dashboard,可以看到订阅的客户端和 topic 数量:

开发库
下面演示如何在 Springboot 中使用 EMQX。
依赖仍然是 paho:
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency>
配置 mqtt:
mqtt:broker-url: tcp://localhost:1883username: adminpassword: publicclient-id: spring-clienttopic: mqtt/test
代码如下:
// client bean:
@Configuration
public class MqttConfig {@Value("${mqtt.broker-url}")private String brokerUrl;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.client-id}")private String clientId;@Beanpublic MqttClient mqttClient() throws MqttException {MqttClient client = new MqttClient(brokerUrl, clientId, new MemoryPersistence());MqttConnectOptions options = new MqttConnectOptions();options.setCleanSession(true);options.setUserName(username);options.setPassword(password.toCharArray());options.setAutomaticReconnect(true);options.setConnectionTimeout(10);client.connect(options);System.out.println("✅ Connected to MQTT broker: " + brokerUrl);return client;}
}// 订阅
@Service
public class MqttSubscriber {private final MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;public MqttSubscriber(MqttClient mqttClient) {this.mqttClient = mqttClient;}@PostConstructpublic void subscribe() {try {mqttClient.subscribe(topic, (t, msg) -> {String payload = new String(msg.getPayload());System.out.println("📩 Received message on " + t + ": " + payload);});System.out.println("✅ Subscribed to topic: " + topic);} catch (MqttException e) {e.printStackTrace();}}
}// 发布
@Service
public class MqttPublisher {private final MqttClient mqttClient;@Value("${mqtt.topic}")private String topic;public MqttPublisher(MqttClient mqttClient) {this.mqttClient = mqttClient;}public void publish(String message) {try {MqttMessage mqttMessage = new MqttMessage(message.getBytes());mqttMessage.setQos(1);mqttClient.publish(topic, mqttMessage);System.out.println("📤📩 Published message: " + message);} catch (MqttException e) {e.printStackTrace();}}
}// Api
@RestController
@RequestMapping("/mqtt")
public class MqttController {private final MqttPublisher publisher;public MqttController(MqttPublisher publisher) {this.publisher = publisher;}@PostMapping("/publish")public String publish(@RequestParam String msg) {publisher.publish(msg);return "Message sent: " + msg;}
}
使用 curl 来调用接口发布消息:curl -X POST http://localhost:8080/mqtt/publish -d "msg=hello"
参考资料
[1]. https://docs.oasis-open.org/mqtt/mqtt/v5.0/mqtt-v5.0.html
[2]. https://zhuanlan.zhihu.com/p/66807833
[3]. https://www.jianshu.com/p/65e1748a930c
