当前位置: 首页 > news >正文

充电宝项目中的MQTT(轻量高效的物联网通信协议)

文章目录

  • 补充:HTTP协议
  • MQTT协议
    • MQTT的核心特性
    • MQTT vs HTTP:关键对比
  • EMQX
  • 项目集成EMQX
    • 集成配置
    • 客户端和回调方法
    • 具体接口和方法处理
    • 处理类

补充:HTTP协议

  • HTTP是一种应用层协议,使用TCP作为传输层协议,默认端口是80,基于请求和响应的方式,即客户端发起请求,服务器响应请求并返回数据(HTML,JSON)。在HTTP/1.1中,使用了长连接技术,允许一个连接复用多个请求和响应,减少了TCP三次握手的消耗。
  • HTTP的基本结构
    • **请求行:**包含请求方法(GET, POST等)、请求URL、协议版本。
    • **请求头:**包括各种元数据,如Connection、Host、Content-Type等。
    • **空行:**标识头部与载荷的分界线
    • **请求体:**通常在POST请求中出现,包含请求的具体数据。

  • HTTP的**无状态性:**HTTP是无状态协议,每次请求都是独立的,不会记录上一次请求的任何信息,如果需要记录用户状态,需要额外机制,如:**Cookies:**浏览器在发送请求时,可以携带上次访问时服务器存储的Cookies(小型文本数据),服务器通过这些Cookies来识别用户的身份或维持会话状态。
  • **高开销:**每次请求都需要建立TCP连接,导致网络开销较大,尤其在频繁请求的场景下。
  • 实时性差:HTTP通常是客户端主动发起请求,服务器无法主动推送数据。

MQTT协议

  • MQTT(Message Queuing Telemetry Transport)是一种轻量级的发布/订阅式消息传输协议,专为低带宽、高延迟或不稳定的网络环境设计。使用TCP协议进行传输,端口为1883(非加密)和8883(加密),客户端通过发布(Publish)消息到某个主题(Topic),而其他订阅(Subscribe)该主题的客户端会接收到消息。现已成为物联网(IoT)领域最流行的通信协议之一。

  • **主题(Topic):**消息的标签,决定消息的去向,订阅者根据主题来接收消息。
  • **QoS(Quality of Service)级别:**决定消息传输的可靠性。MQTT支持三个级别的QoS:
    • QoS 0:最多一次发送,不保证消息送达。
    • QoS 1:至少一次发送,确保消息至少送达一次。
    • QoS 2:只有一次发送,确保消息只送达一次。
  • **保留标志:**用于确保客户端在订阅时能接收到最后一条消息。

MQTT基于客户端-服务器架构,其中:

  • 发布者(Publisher):发送消息的客户端
  • 订阅者(Subscriber):接收消息的客户端
  • 代理(Broker):接收所有消息并过滤后分发给相关订阅者的服务器

MQTT的核心特性

  1. 轻量高效:最小化协议开销,报文头仅2字节
  2. 发布/订阅模式:解耦消息生产者和消费者
  3. 三种服务质量(QoS)等级
    • QoS 0:最多一次(可能丢失)
    • QoS 1:至少一次(可能重复)
    • QoS 2:恰好一次(确保可靠)
  4. 持久会话:可恢复中断的连接
  5. 遗嘱消息:客户端异常断开时发送预设消息
  6. 主题过滤:支持多级通配符(#和+)

MQTT vs HTTP:关键对比

特性MQTTHTTP
通信模式发布/订阅请求/响应
连接开销保持长连接(Keep-Alive)通常短连接(可配置Keep-Alive)
消息方向双向通信客户端发起请求
协议开销极小(最小2字节头)较大(包含大量头信息)
实时性高(消息即时推送)低(依赖轮询或WebSocket)
适用场景IoT、实时消息、低带宽环境Web服务、API交互
消息推送服务器可主动推送传统HTTP需客户端轮询
功耗相对较高
安全性支持TLS加密支持HTTPS加密

EMQX

  • EMQX 是一款大规模可弹性伸缩的云原生分布式物联网 MQTT 消息服务器。作为全球最具扩展性的 MQTT 消息服务器,EMQX 提供了高效可靠海量物联网设备连接,能够高性能实时移动与处理消息和事件流数据,帮助您快速构建关键业务的物联网平台与应用。

  • EMQX文档

  • EMQX的docker安装:开始在linux上安装1Panel,然后再应用商店中进行一键安装。
    在这里插入图片描述

  • EMQX特性:

    • 开放源码:基于 Apache 2.0 许可证完全开源,自 2013 年起 200+ 开源版本迭代。
    • MQTT 5.0:100% 支持 MQTT 5.0 和 3.x 协议标准,更好的伸缩性、安全性和可靠性。
    • 海量连接:单节点支持 500 万 MQTT 设备连接,集群可扩展至 1 亿并发 MQTT 连接。
    • 高性能:单节点支持每秒实时接收、移动、处理与分发数百万条的 MQTT 消息。
    • 低时延:基于 Erlang/OTP 软实时的运行时系统设计,消息分发与投递时延低于 1 毫秒。
    • 高可用:采用 Masterless 的大规模分布式集群架构,实现系统高可用和水平扩展。

  • 根据业务流程图可以看出,系统与柜机交互是通过MQTT协议进行
    在这里插入图片描述

项目集成EMQX

集成配置

  1. 引入依赖
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId>
</dependency>
  1. MqttTest
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;public class MqttTest {public static void main(String[] args) {String subTopic = "testtopic/#";String pubTopic = "testtopic/1";String content = "Hello World";int qos = 2;String broker = "tcp://ip:1883";String clientId = "emqx_test";MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(broker, clientId, persistence);// MQTT 连接选项MqttConnectOptions connOpts = new MqttConnectOptions();connOpts.setUserName("emqx_test");connOpts.setPassword("emqx_test_password".toCharArray());// 保留会话connOpts.setCleanSession(true);// 设置回调client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());}});// 建立连接System.out.println("Connecting to broker: " + broker);client.connect(connOpts);System.out.println("Connected");System.out.println("Publishing message: " + content);// 订阅client.subscribe(subTopic);// 消息发布所需参数MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);client.publish(pubTopic, message);System.out.println("Message published");client.disconnect();System.out.println("Disconnected");client.close();System.exit(0);} catch (MqttException me) {System.out.println("reason " + me.getReasonCode());System.out.println("msg " + me.getMessage());System.out.println("loc " + me.getLocalizedMessage());System.out.println("cause " + me.getCause());System.out.println("excep " + me);me.printStackTrace();}}
}
  1. 配置yaml文件
emqx:client:clientId: xt001username: xxxpassword: xxxserverURI: tcp://ip:1883keepAliveInterval: 10connectionTimeout: 30
  1. Emqx配置对象类(EmqxProperties)
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;@Data
@Component
@ConfigurationProperties(prefix = "emqx.client")
public class EmqxProperties {private String clientId;private String username;private String password;private String serverURI;private int keepAliveInterval;private int connectionTimeout;
}
  1. Emqx常量(EmqxConstants)
/*** Emqx常量信息**/
public class EmqxConstants {/** 充电宝插入,柜机发布Topic消息, 服务器监听消息 */public final static String TOPIC_POWERBANK_CONNECTED = "/sys/powerBank/connected";/** 用户扫码,服务器发布Topic消息 柜机监听消息  */public final static String TOPIC_SCAN_SUBMIT = "/sys/scan/submit/%s";/** 充电宝弹出,柜机发布Topic消息,服务器监听消息  */public final static String TOPIC_POWERBANK_UNLOCK = "/sys/powerBank/unlock";/** 柜机属性上报,服务器监听消息  */public final static String TOPIC_PROPERTY_POST = "/sys/property/post";
}

客户端和回调方法

  1. EmqxClientWrapper
import com.share.device.emqx.callback.OnMessageCallback;
import com.share.device.emqx.config.EmqxProperties;
import com.share.device.emqx.constant.EmqxConstants;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class EmqxClientWrapper {@Autowiredprivate EmqxProperties emqxProperties;@Autowiredprivate MqttClient client;@Autowiredprivate OnMessageCallback onMessageCallback;@PostConstructprivate void init() {MqttClientPersistence mqttClientPersistence = new MemoryPersistence();try {//新建客户端 参数:MQTT服务的地址,客户端名称,持久化client = new MqttClient(emqxProperties.getServerURI(), emqxProperties.getClientId(), mqttClientPersistence);// 设置回调client.setCallback(onMessageCallback);// 建立连接connect();} catch (MqttException e) {log.info("MqttClient创建失败");throw new RuntimeException(e);}}public Boolean connect() {// 设置连接的配置try {client.connect(mqttConnectOptions());log.info("连接成功");// 订阅String[] topics = {EmqxConstants.TOPIC_POWERBANK_CONNECTED, EmqxConstants.TOPIC_POWERBANK_UNLOCK, EmqxConstants.TOPIC_PROPERTY_POST};client.subscribe(topics);return true;} catch (MqttException e) {log.info("连接失败");e.printStackTrace();}return false;}/*创建MQTT配置类*/private MqttConnectOptions mqttConnectOptions() {MqttConnectOptions options = new MqttConnectOptions();options.setUserName(emqxProperties.getUsername());options.setPassword(emqxProperties.getPassword().toCharArray());options.setAutomaticReconnect(true);//是否自动重新连接options.setCleanSession(true);//是否清除之前的连接信息options.setConnectionTimeout(emqxProperties.getConnectionTimeout());//连接超时时间options.setKeepAliveInterval(emqxProperties.getKeepAliveInterval());//心跳return options;}/*** 发布消息* @param topic* @param data*/public void publish(String topic, String data) {try {MqttMessage message = new MqttMessage(data.getBytes());message.setQos(2);client.publish(topic, message);} catch (MqttException e) {log.info("消息发布失败");e.printStackTrace();}}}
  1. 回调消息处理类 :OnMessageCallback
    import lombok.extern.slf4j.Slf4j;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.springframework.stereotype.Component;@Slf4j
    @Component
    public class OnMessageCallback implements MqttCallback {@Overridepublic void connectionLost(Throwable cause) {// 连接丢失后,一般在这里面进行重连System.out.println("连接断开,可以做重连");}@Override
    public void messageArrived(String topic, MqttMessage message) {// subscribe后得到的消息会执行到这里面System.out.println("接收消息主题:" + topic);System.out.println("接收消息Qos:" + message.getQos());System.out.println("接收消息内容:" + new String(message.getPayload()));try {// 根据主题选择不同的处理逻辑MassageHandler massageHandler = messageHandlerFactory.getMassageHandler(topic);if(null != massageHandler) {String content = new String(message.getPayload());massageHandler.handleMessage(JSONObject.parseObject(content));}} catch (Exception e) {e.printStackTrace();log.error("mqtt消息异常:{}", new String(message.getPayload()));}
    }@Override
    public void deliveryComplete(IMqttDeliveryToken token) {System.out.println("deliveryComplete---------" + token.isComplete());
    }
    }
    

具体接口和方法处理

  1. 定义策略接口:MassageHandler
public interface MassageHandler {/*** 策略接口* @param message*/void handleMessage(JSONObject message);
}
  1. 具体Handler处理
import java.lang.annotation.*;
// 自定义注解
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface GuiguEmqx {String topic();
}
  1. 充电宝插入处理类:PowerBankConnectedHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_CONNECTED)
public class PowerBankConnectedHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
  1. 充电宝弹出处理类:PowerBankUnlockHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_POWERBANK_UNLOCK)
public class PowerBankUnlockHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}
  1. 属性上报:PropertyPostHandler
@Slf4j
@Component
@GuiguEmqx(topic = EmqxConstants.TOPIC_PROPERTY_POST)
public class PropertyPostHandler implements MassageHandler {@Overridepublic void handleMessage(JSONObject message) {log.info("handleMessage: {}", message.toJSONString());}
}

处理类

  1. MessageHandlerFactory
public interface MessageHandlerFactory {MassageHandler getMassageHandler(String topic);
}
  1. MessageHandlerFactoryImpl
@Service
public class MessageHandlerFactoryImpl implements MessageHandlerFactory, ApplicationContextAware {private Map<String, MassageHandler> handlerMap = new HashMap<>();/*** 初始化bean对象* @param ioc*/@Overridepublic void setApplicationContext(ApplicationContext ioc) {// 获取对象Map<String, MassageHandler> beanMap = ioc.getBeansOfType(MassageHandler.class);for (MassageHandler massageHandler : beanMap.values()) {GuiguEmqx guiguEmqx = AnnotatedElementUtils.findAllMergedAnnotations(massageHandler.getClass(), GuiguEmqx.class).iterator().next();if (null != guiguEmqx) {String topic = guiguEmqx.topic();// 初始化到maphandlerMap.put(topic, massageHandler);}}}@Overridepublic MassageHandler getMassageHandler(String topic) {return handlerMap.get(topic);}
}

相关文章:

  • Sherpa简介
  • 公务员体检肌酐临界值处理指南
  • 比特率、码元速率(波特率)的定义、关系及相关计算公式
  • 代码随想录算法训练营day5(哈希表)
  • 【Python进阶】字典:高效键值存储的十大核心应用
  • Web开发-JavaEE应用原生和FastJson反序列化URLDNS链JDBC链Gadget手搓
  • 构件技术(高软58)
  • 永磁同步电机控制中,滑模观测器是基于反电动势观测转子速度和角度的?扩展卡尔曼滤波观测器是基于什么观测的?扩展卡尔曼滤波观测器也是基于反电动势吗?
  • 高防CDN、高防IP vs 高防服务器:核心优势与选型指南
  • spring:注解@Component、@Controller、@Service、@Reponsitory
  • 【实施运维】在谷歌浏览器离线安装360浏览器插件
  • C++指针和引用之区别(The Difference between C++Pointers and References)
  • mcp和API区别
  • 【时时三省】(C语言基础)循环结构程序设计
  • 好用的链接
  • frp frp_0.62.0
  • 上门送水小程序区域代理模块框架设计
  • 电脑知识 | TCP通俗易懂详解 <三>tcp首部中ACK、SYN、FIN等信息填写案例_握手时
  • 前端VUE框架理论与应用(10)
  • 【Ragflow】18.更好的推理框架:vLLM的docker部署方式
  • 潍坊营销型网站制作/深圳排名seo
  • 宁夏网站建设优化/百度一下首页网址
  • 做网站怎么弄/谷歌商店下载官网
  • 网站建设职业怎么样/南宁百度关键词优化
  • a站下载安装/seo没什么作用了
  • 政府网站html模板/百度网站客服