RabbitMQ-MQTT即时通讯详解
前言
有时候我们的项目中会用到即时通讯功能,比如电商系统中的客服聊天、支付成功后的异步回调通知等。最近发现如果没有特殊的业务需求,甚至可以不写后端代码,用RabbitMQ可以很方便的实现即时通讯功能。
MQTT协议
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件。
IBM公司的安迪·斯坦福-克拉克及Cirrus Link公司的阿兰·尼普于1999年撰写了该协议的第一个版本。
该协议的可用性取决于该协议的使用环境。IBM公司在2013年就向结构化资讯标准促进组织提交了 MQTT 3.1 版规范,并附有相关章程,以确保只能对规范进行少量更改。MQTT-SN是针对非 TCP/IP 网络上的嵌入式设备主要协议的变种,与此类似的还有ZigBee协议。
纵观行业的发展历程,“MQTT”中的“MQ” 是来自于IBM的MQ系列消息队列产品线。然而通常队列本身不需要作为标准功能来支持。
可选协议包含了高级消息队列协议,面向文本的消息传递协议,互联网工程任务组约束应用协议,可扩展消息与存在协议,数据分发服务,OPC UA以及web 应用程序消息传递协议。
通过MQTT协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。
MQTT特点
MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:
1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;
2、对负载内容屏蔽的消息传输;
3、使用 TCP/IP 提供网络连接;
4、有三种消息发布服务质量:
5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;
6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
RabbitMQ启用MQTT功能
RabbitMQ默认不会启用启用MQTT功能,需要手动开启。
Docker环境,安装并启动RabbitMQ
docker run -p 5672:5672 -p 15672:15672 -p 1883:1883 -p 15675:15675 --name rabbitmq-mqtt \ -v /mydata/rabbitmq-mqtt/data:/var/lib/rabbitmq \ -d rabbitmq:3.9.11-management
接下来就是启用RabbitMQ的MQTT WEB插件了,进入rabbitmq容器后
# 先进入rabbitmq容器docker exec -it rabbitmq-mqtt /bin/bash# 再启用mqtt web插件,会同时启用rabbitmq_mqtt插件rabbitmq-plugins enable rabbitmq_web_mqtt
开启成功后,查看管理控制台,我们可以发现MQTT的web服务运行在15675端口上了
Mall开源项目示例本机实现
mall项目是一套基于SpringBoot3 + Vue 的电商系统(Github标星60K),后端支持多模块和最新微服务架构 ,采用Docker和K8S部署。包括前台商城项目和后台管理系统,能支持完整的订单流程!涵盖商品、订单、购物车、权限、优惠券、会员、支付等功能!
MQTT客户端
使用MQTT客户端来测试MQTT的即时通讯功能,这里使用的是MQTTX这个客户端工具。
docker run -p 80:80 --name mqttx-web -d emqx/mqttx-web
运行成功后可以访问MQTTX的控制台
点击左侧的加号按钮来创建一个MQTT连接,配置好连接信息,注意MQTT版本选择3.1.1;
再添加一个订阅,订阅testTopicA这个主题,我们会向这个主题发送消息;
发布者向主题中发布消息,订阅者可以实时接收到。
前端实现即时通讯
既然MQTTX客户端可以直接通过RabbitMQ实现即时通讯,那我们直接使用前端技术是否也能实现即时通讯?答案是肯定的!下面我们将通过html+javascript实现一个简单的聊天功能,真正不写一行后端代码实现即时通讯!
WEB端与MQTT服务进行通讯需要使用一个叫MQTT.js的库
gitHub.com/mqttjs/MQTT.js
实现的功能非常简单,一个单聊功能,需要注意的是配置好MQTT服务的访问地址为:ws://192.168.3.101:15675/ws
<!DOCTYPE html><html lang="en"><head><meta charset="UTF-8"><title>Title</title></head><body><div><label>目标Topic:<input id="targetTopicInput" type="text"></label><br><label>发送消息:<input id="messageInput" type="text"></label><br><button onclick="sendMessage()">发送</button><button onclick="clearMessage()">清空</button><div id="messageDiv"></div></div></body><script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script><script>//RabbitMQ的web-mqtt连接地址const url = 'ws://192.168.3.101:15675/ws';//获取订阅的topicconst topic = getQueryString("topic");//连接到消息队列let client = mqtt.connect(url);client.on('connect', function () {//连接成功后订阅topicclient.subscribe(topic, function (err) {if (!err) {showMessage("订阅topic:" + topic + "成功!");}});});//获取订阅topic中的消息client.on('message', function (topic, message) {showMessage("收到消息:" + message.toString());});//发送消息function sendMessage() {let targetTopic = document.getElementById("targetTopicInput").value;let message = document.getElementById("messageInput").value;//向目标topic中发送消息client.publish(targetTopic, message);showMessage("发送消息给" + targetTopic + "的消息:" + message);}//从URL中获取参数function getQueryString(name) {let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");let r = window.location.search.substr(1).match(reg);if (r != null) {return decodeURIComponent(r[2]);}return null;}//在消息列表中展示消息function showMessage(message) {let messageDiv = document.getElementById("messageDiv");let messageEle = document.createElement("div");messageEle.innerText = message;messageDiv.appendChild(messageEle);}//清空消息列表function clearMessage() {let messageDiv = document.getElementById("messageDiv");messageDiv.innerHTML = "";}</script></html>
接下来订阅不同的主题开启两个页面测试下功能(页面已经放在SpringBoot项目的resources目录下了,需要先启动应用再访问):
在SpringBoot中使用
没有特殊业务需求的时候,前端可以直接和RabbitMQ对接实现即时通讯。但有时候我们需要通过服务端去通知前端,此时就需要在应用中集成MQTT了,接下来我们来讲讲如何在SpringBoot应用中使用MQTT。
首先我们需要在项目的pom.xml中添加MQTT相关依赖
<!--Spring集成MQTT--><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency>
在application.yml中添加MQTT相关配置,主要是访问地址、用户名密码、默认主题信息
rabbitmq:mqtt:url: tcp://192.168.3.101:1883username: guestpassword: guestdefaultTopic: testTopic
编写一个Java配置类从配置文件中读取上面的配置便于使用;
@Data@EqualsAndHashCode(callSuper = false)@Component@ConfigurationProperties(prefix = "rabbitmq.mqtt")public class MqttConfig {/*** RabbitMQ连接用户名*/private String username;/*** RabbitMQ连接密码*/private String password;/*** RabbitMQ的MQTT默认topic*/private String defaultTopic;/*** RabbitMQ的MQTT连接地址*/private String url;}
添加MQTT消息订阅者相关配置,使用@ServiceActivator注解声明一个服务激活器,通过MessageHandler来处理订阅消息;
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient", mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//设置消息质量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//处理订阅消息
log.info("handleMessage : {}",message.getPayload()); }
};
}
}
添加MQTT消息发布者相关配置;
@Configurationpublic class MqttOutboundConfig {@Autowiredprivate MqttConfig mqttConfig;@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] { mqttConfig.getUrl()});options.setUserName(mqttConfig.getUsername());options.setPassword(mqttConfig.getPassword().toCharArray());factory.setConnectionOptions(options);return factory;}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("publisherClient", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());return messageHandler;}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}}
添加MQTT网关,用于向主题中发送消息
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/** * 发送消息到默认topic */
void sendToMqtt(String payload);
/** * 发送消息到指定topic */
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/** * 发送消息到指定topic并设置QOS */
void sendToMqtt(@Header(MqttHeaders.TOPIC)
String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
添加MQTT测试接口,使用MQTT网关向特定主题中发送消息
@Slf4j
@RestController
@Tag(name = "MqttController", description = "MQTT测试接口")
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
@Operation(summary = "向默认主题发送消息")
public CommonResult sendToDefaultTopic(String payload) { mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
@Operation(summary = "向指定主题发送消息")
public CommonResult sendToTopic(String payload, String topic) { mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
} }
调用接口向主题中发送消息进行测试,访问地址:http://localhost:8088/swagger-ui.html
总结
消息中间件应用越来越广泛,不仅可以实现可靠的异步通信,还可以实现即时通讯,掌握一个消息中间件还是很有必要的。如果没有特殊业务需求,客户端或者前端直接使用MQTT对接消息中间件即可实现即时通讯,有特殊需求的时候也可以使用SpringBoot集成MQTT的方式来实现,总之消息中间件是实现即时通讯的一个好选择!