当前位置: 首页 > news >正文

MQTT的连接配置以及重连机制和遇到的问题--------求如何修改更加好

今天遇到了一个mqtt的问题,虽然解决了,但是感觉不是很好,希望大家多指点

这是配置文件

customer:
  mqtt:
    broker: tcp://ip:1883
    clientList:
      - clientId: nays_service
        subscribeTopic: xxxxxx
      - clientId: receive_service
        subscribeTopic: xxxxxx

MqttConfig 读取配置文件的

@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {
    /**
     * mqtt broker地址
     */
    String broker;
    /**
     * 需要创建的MQTT客户端
     */
    List<MqttClient> clientList;
}

一个MqttClient类用来构造配置文件中的数据对象

@Data
public class MqttClient {
    /**
     * 客户端ID
     */
    private String clientId;
    /**
     * 监听主题
     */
    private String subscribeTopic;
    /**
     * 用户名
     */
    private String userName;
    /**
     * 密码
     */
    private String password;
}

服务运行的时候进行mqtt客户端创建,创建的数据从配置文件中读取

/**
 * MQTT客户端创建
 */
@Component
@Slf4j
public class MqttClientCreate {
    @Resource
    private MqttClientManager mqttClientManager;
    @Resource
    private MqttConfig mqttConfig;

    /**
     * 创建MQTT客户端
     */
    @PostConstruct
    public void createMqttClient() {

        // 会读取配置文件中的clientList
        List<MqttClient> mqttClientList = mqttConfig.getClientList();

        // 遍历去创建
        for (MqttClient mqttClient : mqttClientList) {
            log.info("{}", mqttClient);
            mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic());
        }
    }
}

这是创建的代码,问题很多(请看代码的注释部分)


```java
@Slf4j
@Component
public class MqttClientManager {

    @Value("${customer.mqtt.broker}")
    private String mqttBroker;

    @Resource
    private MqttCallBackContext mqttCallBackContext;
    /**
     * 存储MQTT客户端
     */
    public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();

    public MqttClient getMqttClientById(String clientId) {
        return MQTT_CLIENT_MAP.get(clientId);
    }

    /**
     * 创建mqtt客户端
     * @param clientId       客户端ID
     * @param subscribeTopic 订阅主题,可为空
     */
    public void createMqttClient(String clientId, String subscribeTopic) {

        // 它将消息存储在内存中,而不是持久存储到文件或其他存储介质中
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient client = new MqttClient(mqttBroker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();

            // 客户端每次连接到 MQTT 服务器时都会被视为一个全新的会话。
            connOpts.setCleanSession(true);

            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {

                AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);

                // 这里的default就是DefaultMqttCallBack, 一开始创建的时候走的就是这个
                // 问题最大的地方在这,通过这样方式拿回来的是同一个对象,hashCode也相同
                // 现在想到的做法是深拷贝,有没有什么好的做法,比如通过构造方法
                if (null == callBack) {
					
					// 一开始这里的操作直接是, 当创建多个客户端的时候拿到的对象都是同一个
					// callback = mqttCallBackContext.getCallBack("default");

                    AbsMqttCallBack original  = mqttCallBackContext.getCallBack("default");
                    callBack = original.deepCopy();

                }
                callBack.setClientId(clientId);
                callBack.setConnectOptions(connOpts);
                client.setCallback(callBack);

            }

            //连接mqtt服务端broker
            client.connect(connOpts);
            log.info("客户端 {} 连接成功状态 {}", clientId, client.isConnected());

            // 订阅主题
            if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                if (subscribeTopic.contains("-")) {
                    client.subscribe(subscribeTopic.split("-"));
                }
                else {
                    client.subscribe(subscribeTopic);
                }
            }

            MQTT_CLIENT_MAP.putIfAbsent(clientId, client);



        } catch (MqttException e) {
            log.error("创建mqttClient失败!", e);
        }
    }
}

这是用于存储每个mqtt客户端的回调方法类

/**
 * MQTT订阅回调环境类
 */
@Component
@Slf4j
public class MqttCallBackContext {

    // 在 Spring 中,当你注入一个 Map<String, AbsMqttCallBack> 类型的字段时,
    // Spring 会自动将所有实现了 AbsMqttCallBack 接口的 Bean 收集起来,
    // 并将它们的名称作为键值。因此,DefaultMqttCallBack 会被注入到 callBackMap 中,键值为 "default"。
    private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();

    /**
     * 默认构造函数
     *
     * @param callBackMap 回调集合
     */
    public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {
        this.callBackMap.putAll(callBackMap);
    }

    /**
     * 获取MQTT回调类
     *
     * @param clientId 客户端ID
     * @return MQTT回调类
     */
    public AbsMqttCallBack getCallBack(String clientId) {
        return this.callBackMap.get(clientId);
    }
}

这里遇到的问题就是mqtt断了之后进行重新连接的机制,在MqttClientManager这个代码中之前的回调类是callback = mqttCallBackContext.getCallBack(“default”);这样拿的,通过hashCode来看,都一样,说明每次创建都会对这个对象进行修改,那么这里赋值的clientId就会变成最后一个创建的mqtt对象id,所以在重连代码中,每次进来的对象虽然是另外一个mqtt客户端,但是拿到的clientid都是同一个,没有办法进行获取和其它的操作

/**
 * MQTT回调抽象类
 */
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {

    private String clientId;
    private MqttConnectOptions connectOptions;

    @Resource
    private MqttConfig mqttConfig;

    @Resource
    private MqttClientManager mqttClientManager;


    /**
     * 失去连接操作,进行重连
     *
     * @param throwable 异常
     */
    @Override
    public void connectionLost(Throwable throwable) {

        log.info("{}失去连接,进行尝试重连", this.clientId);

        MqttClient mqttClient = MqttClientManager.MQTT_CLIENT_MAP.get(clientId);

        String subscribeTopic = mqttConfig.getClientList().stream()
                .filter(item -> item.getClientId().equals(clientId))
                .map(com.ruoyi.web.core.mottconfig.MqttClient::getSubscribeTopic)
                .findFirst()
                .orElse(null);

        if (mqttClient != null) {
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true); // 可以根据实际需求配置

            // 重连的尝试
            int retryCount = 0;
            int maxRetries = 10; // 最大重连次数
            while (retryCount < maxRetries) {
                try {

                    if (mqttClient.isConnected()) {
                        log.info("{} 重连成功", clientId);
                        return;
                    }

                    // 重新连接
                    mqttClient.connect(connOpts);

                    log.info("{} 重连成功", clientId);

                    if (null != subscribeTopic && !subscribeTopic.isEmpty()) {
                        if (subscribeTopic.contains("-")) {
                            mqttClient.subscribe(subscribeTopic.split("-"));
                        }
                        else {
                            mqttClient.subscribe(subscribeTopic);
                        }
                    }

                    break;

                } catch (MqttException e) {
                    retryCount++;
                    log.error("{} 重连失败,尝试第 {} 次重连", clientId, retryCount, e);

                    // 可设置重连间隔,比如等待2秒钟后再尝试重连
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
            }

            if (retryCount == maxRetries) {
                log.error("{} 超过最大重连次数,重连失败", clientId);
            }
        }
    }


    /**
     * 接收订阅消息
     * @param topic    主题
     * @param mqttMessage 接收消息
     * @throws Exception 异常
     */
    @Override
    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
		String content = new String(mqttMessage.getPayload(), StandardCharsets.UTF_8);
     	handleReceiveMessage(topic, content);
    }

    /**
     * 消息发送成功
     *
     * @param iMqttDeliveryToken toke
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("消息发送成功");
    }

    /**
     * 处理接收的消息
     * @param topic   主题
     * @param message 消息内容
     */
    protected abstract void handleReceiveMessage(String topic, String message);

    // 深拷贝方法
    public abstract AbsMqttCallBack deepCopy();

}


这里就是 最后的callback实现类进行业务处理

/**
 * 默认回调
 */
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {

    @Autowired
    private AlarmListService alarmListService;

    @Autowired
    private OperateService operateService;

    @Autowired
    private INrDeviceService iNrDeviceService;

    //private static final String TOPIC1 = 1;
    /**
     * @param topic   主题
     * @param message 消息内容
     */
    @Override
    protected void handleReceiveMessage(String topic, String message) {
        log.info("订阅的主题---{}", topic);
        log.info("接收到消息---{}", message);
        // 业务操作
    }

    @Override
    public AbsMqttCallBack deepCopy() {
        DefaultMqttCallBack copy = new DefaultMqttCallBack();
        copy.setClientId(this.getClientId());
        copy.setConnectOptions(this.getConnectOptions()); 
        copy.setMqttConfig(this.getMqttConfig());
        copy.setMqttClientManager(this.getMqttClientManager());
        return copy;
    }
}

我是通过深拷贝来做的,应该是可以通过构造方法来,但是对这个整体的代码还是不够熟悉,想看看应该如何优化,还请指点,最好笑的是:这段代码是公司一直使用的,用在了好几个项目上,我真是服了!!!!

相关文章:

  • Flask flash() 消息示例
  • Python大数据可视化:基于Python的王者荣耀战队的数据分析系统设计与实现_flask+hadoop+spider
  • 数据分析和数据挖掘的工作内容
  • ollama 学习笔记
  • 亚马逊企业购大客户业务拓展经理张越:跨境电商已然成为全球零售电商领域中熠熠生辉的强劲增长点
  • 本地安装 Grafana Loki
  • HTTP SSE 实现
  • RabbitMq 基础
  • 贪心算法
  • 前端面试真题 2025最新版
  • ecovadis社会企业责任认证
  • C++面试笔记(持续更新...)
  • Transformer解析——(四)Decoder
  • Modbus协议基础
  • AWS云从业者认证题库 AWS Cloud Practitioner(2.21)
  • 【练习】【回溯:组合:一个集合 元素可重复】力扣 39. 组合总和
  • 如何实现使用DeepSeek的CV模型对管道内模糊、低光照或水渍干扰的图像进行去噪、超分辨率重建。...
  • 推理模型时代:大语言模型如何从对话走向深度思考?
  • java后端开发day18--学生管理系统
  • 多门店协同管理困难重重,管理系统如何破局?
  • 河北住房与城乡建设部网站/怎么设计一个网页
  • 东莞做网站seo/微信引流主动被加软件
  • 政府网站建设要求/有哪些推广平台和渠道
  • 个人网站网页制作/下载百度网盘
  • 注册网站要百度实名认证安不安全/郑州seo优化哪家好
  • 网站主题推荐/seo包括哪些方面