当前位置: 首页 > news >正文

SpringBoot连接MQTT客户端

引入依赖

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>

启动类

@SpringBootApplication
public class AxiosDemoApplication implements CommandLineRunner {
    @Autowired
    private MqttService mqttService;
    public static void main(String[] args) {
        SpringApplication.run(AxiosDemoApplication.class, args);
    }
    @Override
    public void run(String... args) throws Exception {
        // 在新线程中启动 MQTT 客户端
        new Thread(() -> mqttService.startMqttClient()).start();
    }
}

MqttServer类

这是一个类,要修改你的主题,ip,用户名密码等

package com.leo.springboothd.service;

import com.leo.springboothd.OnMessageCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MqttService {

    @Autowired
    private OnMessageCallback onMessageCallback;

    public void startMqttClient() {
        String subTopic = "topic";  // 订阅的主题
        String pubTopic = "topic";
        String content = "Hello World66+6";
        int qos = 2;
        String broker = "tcp://ip";  // 你自己的 IP
        String clientId = "java";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("java");
            connOpts.setPassword("1234".toCharArray());
            // 保留会话
            connOpts.setCleanSession(true);

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            // 连接成功后设置回调
            client.setCallback(onMessageCallback);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            client.publish(pubTopic, message);
            System.out.println("Message published");

            // 主循环,保持程序运行
            while (true) {
                try {
                    Thread.sleep(1000); // 每秒检查一次
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

OnMessageCallback类

package com.leo.springboothd;


import com.leo.springboothd.pojo.Mes;
import com.leo.springboothd.service.MesService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
public class OnMessageCallback implements MqttCallback {

    private final MesService mesService;

    @Autowired
    public OnMessageCallback(MesService mesService) {
        this.mesService = mesService;
    }

    @Override
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题: " + topic);
        System.out.println("接收消息Qos: " + message.getQos());
        System.out.println("接收消息内容: " + new String(message.getPayload()));

        // 将消息封装到 Mes 对象中
        Mes mes = new Mes();
        mes.setId((long) message.getId());
        mes.setTopic(topic);
        mes.setPayload(new String(message.getPayload()));
        mes.setQos(message.getQos());
        mes.setTimestamp(LocalDateTime.now());

        // 保存消息到数据库
        mesService.save(mes);
    }

    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

MesService接口

package com.leo.springboothd.service;

import com.leo.springboothd.pojo.Mes;

public interface MesService {
    void save(Mes mes);
}

MesService接口实现类

MesServiceImpl1
package com.leo.springboothd.service.impl;

import com.leo.springboothd.mapper.MesMapper;
import com.leo.springboothd.pojo.Mes;
import com.leo.springboothd.service.MesService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MesServiceImpl1 implements MesService {
    @Autowired
    private MesMapper mesMapper;
    @Override
    public void save(Mes mes) {
        mesMapper.save(mes);
        System.out.println(mes);
    }
}

 MesMapper

package com.leo.springboothd.mapper;

import com.leo.springboothd.pojo.Mes;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;

@Mapper
public interface MesMapper {
    @Insert("INSERT INTO Message (id, topic, payload, qos, timestamp) VALUES (#{id}, #{topic}, #{payload}, #{qos}, #{timestamp})")
    void save(Mes mes);
}

数据库

CREATE TABLE Message (
                         num BIGINT AUTO_INCREMENT PRIMARY KEY,
                         id BIGINT NOT NULL,
                         qos INT NOT NULL,
                         topic VARCHAR(255) NOT NULL,
                         payload TEXT NOT NULL,
                         timestamp DATETIME NOT NULL
)COMMENT='订阅信息表';

相关文章:

  • PODS_ROOT、BUILT_PRODUCTS_DIR和SRCROOT有什么区别
  • js鼠标拖拽 修改el-table表格顺序 vue2 + element-ui js
  • Python 装饰器(Decorator)
  • 解锁 HTML5 表单新力量:<datalist>、<keygen>、<output>元素深度解析
  • Redis 集群(Cluster)
  • python基础:变量-数据类型(整数类型、浮点类型、布尔类型、字符串类型)
  • tree-sitter的grammar.js解惑
  • Java学习手册:Java基本语法与数据类型
  • 众趣科技助力商家“以真示人”,让消费场景更真实透明
  • 深入理解Apache Kafka
  • [特殊字符] Spring Boot 日志系统入门博客大纲(适合初学者)
  • 网络4 OSI7层
  • Vccaux_IO在DDR3接口中的作用
  • K8S-证书过期更新
  • 医药行业的数据安全与加密软件
  • 阿里云服务迁移实战: 01-大纲
  • 禅道MCP Server开发实践与功能全解析
  • [leetcode]stack的基本操作的回顾
  • 大模型本地部署系列(3) Ollama部署QwQ[阿里云通义千问]
  • 前端-Vue3
  • 专做洗衣柜的网站/qq推广引流网站
  • 设计网站做多大合适/免费网站免费
  • 站长工具seo推广/无锡网络推广外包
  • 做外贸必应网站产品曝光/怎么制作一个简单的网页
  • 做技术网站赚钱吗/北京优化互联网公司
  • 长春网站建设厂家/厦门seo测试