当前位置: 首页 > news >正文

Java优雅使用Spring Boot+MQTT推送与订阅

在物联网(IoT)和智能设备横行的今天,你有没有遇到这样的问题:

  • 服务端需要实时把报警、状态更新、控制指令推送给客户端;

  • 安卓 App、嵌入式设备、网页等终端,需要轻量且稳定的连接方式;

  • HTTP 太“重”、WebSocket 配置又麻烦?

这时,轻量级消息传输协议 MQTT(Message Queuing Telemetry Transport)登场!

一句话理解 MQTT:专为低带宽、高并发、实时通信设计的发布-订阅协议。

那么问题来了 —— 在 Spring Boot 项目中,如何快速、优雅、高可控地落地 MQTT?

-01-

MQTT 接入方案选择 

MQTT 本身只是一种通信协议,并不指定你用哪个消息中间件。而目前支持 MQTT 的主流 Broker 包括:

Broker

特点简述

Mosquitto

轻量级,C语言实现,非常稳定

RabbitMQ

插件支持 MQTT,易与现有系统整合

EMQX

高性能 MQTT Broker,专为 IoT 优化

HiveMQ

商用支持强,价格偏贵

本次我们采用的是:RabbitMQ + MQTT 插件,实现服务端到安卓客户端的推送通知,配合 Spring Boot 框架,集成简便,生产可用!

-02-

MQTT 三大角色 

MQTT,就像微信一样:

  • Publisher(发布者):你发朋友圈

  • Broker(中间人):微信服务器

  • Subscriber(订阅者):看到你朋友圈的朋友

也就是说,消息不是点对点的,而是“你说一句,谁订阅了就能听到”。


-03-

实战解析

Spring Boot + RabbitMQ MQTT 实现推送系统

整体架构:

[Spring Boot服务] --发布消息--> [RabbitMQ MQTT插件] --> [MQTT客户端订阅接收消息]

RabbitMQ 开启 MQTT 插件

rabbitmq-plugins enable rabbitmq_mqtt        # 服务端 MQTT 协议,端口1883rabbitmq-plugins enable rabbitmq_web_mqtt    # Web前端用 MQTT 协议,端口15675

引入依赖

<dependency>  <groupId>org.springframework.integration</groupId>  <artifactId>spring-integration-mqtt</artifactId></dependency>

配置 application.yml

mqtt-push:  clientId: mqtt_client_  serverClientId: mqtt_server_  servers: tcp://127.0.0.1:1883  username: guest  password: guest  defaultTopic: sensor/+/temperature

配置连接工厂

@Beanpublic MqttPahoClientFactory mqttClientFactory() {    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();    MqttConnectOptions options = new MqttConnectOptions();    options.setServerURIs(servers.split(","));    options.setCleanSession(false);    options.setUserName(username);    options.setPassword(password.toCharArray());    options.setKeepAliveInterval(20);    factory.setConnectionOptions(options);    return factory;}

服务端推送消息

@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {    MqttPahoMessageHandler handler = new MqttPahoMessageHandler(serverClientId + "producer_" + RandomUtil.getRandomStr(), mqttClientFactory());    handler.setAsync(true);    handler.setDefaultQos(1);    handler.setDefaultTopic(defaultTopic);    return handler;}

使用接口发送消息:

@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")public interface MqttGateway {    void sendMessage2Mqtt(String data, @Header(MqttHeaders.TOPIC) String topic);}

服务端监听客户端消息

@Beanpublic MessageProducer inbound() {    MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(        clientId + "consumer_" + RandomUtil.getRandomStr(), mqttClientFactory(), defaultTopic);    adapter.setQos(2);    adapter.setOutputChannel(mqttInputChannel());    return adapter;}

处理消息回调:

@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler mqttInMessageHandler() {    return message -> {        String topic = message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC).toString();        String payload = message.getPayload().toString();        log.info("收到消息:主题 [{}] 内容 [{}]", topic, payload);    };}

MQTTBox 测试

MQTTBox 是一款强大的 MQTT 测试工具,可以模拟发送消息,也能订阅查看接收到的消息:

1. 发布测试

使用 MQTTBox 向 sensor/s123/temperature 发布消息
服务端通过通配符 sensor/+/temperature 成功收到消息!

2. 控制器测试

@PostMapping("/sendMessage")public String sendMqtt(@RequestBody ReqSendMsgDTO dto) {    mqttGateway.sendMessage2Mqtt(dto.getTopic(), dto.getPayload());    return "SUCCESS";}

-04-

总结

实践建议

  • clientId 必须唯一,推荐使用 UUID 或服务实例标识;

  • QoS 建议使用 1(至少一次),避免消息丢失;

  • 若用 RabbitMQ,也可以使用 Exchange + Topic Binding 方式做高级路由;

  • 对于高并发或长连接推送,推荐结合 Netty 或 Gateway 层限流处理。

技术方案

能力点

技术实现

协议支持

MQTT(通过 rabbitmq_mqtt 插件)

服务端推送

Spring Integration + MqttGateway

客户端订阅

MqttPahoMessageDrivenChannelAdapter

工具联调

MQTTBox / Postman / 模拟器

安全与稳定性

唯一 clientId、QoS 保证、自动重连

http://www.dtcms.com/a/303937.html

相关文章:

  • vue请求golang后端CORS跨域问题深度踩坑
  • 【STM32】FreeRTOS 任务消息队列 和 中断消息队列的区别(六)
  • 14 - 大语言模型 — 抽取式问答系统 “成长记”:靠 BERT 学本事,从文本里精准 “揪” 答案的全过程(呆瓜版-1号)
  • “非参数化”大语言模型与RAG的关系?
  • 云原生MySQL Operator开发实战(五):扩展与生态系统集成
  • python使用ffmpeg录制rtmp/m3u8推流视频并按ctrl+c实现优雅退出
  • DateTime::ToString 日期时间文本格式化深度解析(C++)
  • Mysql InnoDB存储引擎
  • 2.快速开始
  • Windows下基于 SenseVoice模型的本地语音转文字工具
  • 【Linux我做主】探秘进程状态
  • 聚铭安全管家平台2.0实战解码 | 安服篇(三):配置保障 自动核查
  • 从单机架构到分布式:Redis为何成为架构升级的关键一环?
  • OpenLayers 综合案例-底图换肤(变色)
  • DevOps 详解
  • Linux -- 文件【中】
  • CVE-2022-46169漏洞复现
  • DNS污染与劫持
  • 《林景媚与命运协议》
  • 服务器数据恢复—RAID上层部署的oracle数据库数据恢复案例
  • logtrick 按位或最大的最小子数组长度
  • JavaWeb(苍穹外卖)--学习笔记15(分页查询PageHelper)
  • Unity_UI_NGUI_DrawCall
  • Mac安装Navicat步骤Navicat Premium for Mac v17.1.9【亲测】
  • 【腾讯云】EdgeOne网站安全防护的配置方法 防范盗刷流量 附恶意IP和UA黑名单
  • 学习网址备份(二)
  • Linux 启动流程、密码破解、引导修复完全手册
  • 【智能协同云图库】智能协同云图库第八弹:基于阿里云百炼大模型—实现 AI 扩图功能
  • haproxy应用详解
  • 创建型设计模式-工厂方法模式和抽象工厂方法模式