SpringBoot连接MQTT客户端
引入依赖
<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.2</version> </dependency>
启动类
@SpringBootApplication
public class AxiosDemoApplication implements CommandLineRunner {
@Autowired
private MqttService mqttService;
public static void main(String[] args) {
SpringApplication.run(AxiosDemoApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
// 在新线程中启动 MQTT 客户端
new Thread(() -> mqttService.startMqttClient()).start();
}
}
MqttServer类
这是一个类,要修改你的主题,ip,用户名密码等
package com.leo.springboothd.service;
import com.leo.springboothd.OnMessageCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MqttService {
@Autowired
private OnMessageCallback onMessageCallback;
public void startMqttClient() {
String subTopic = "topic"; // 订阅的主题
String pubTopic = "topic";
String content = "Hello World66+6";
int qos = 2;
String broker = "tcp://ip"; // 你自己的 IP
String clientId = "java";
MemoryPersistence persistence = new MemoryPersistence();
try {
MqttClient client = new MqttClient(broker, clientId, persistence);
// MQTT 连接选项
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("java");
connOpts.setPassword("1234".toCharArray());
// 保留会话
connOpts.setCleanSession(true);
// 建立连接
System.out.println("Connecting to broker: " + broker);
client.connect(connOpts);
// 连接成功后设置回调
client.setCallback(onMessageCallback);
System.out.println("Connected");
System.out.println("Publishing message: " + content);
// 订阅
client.subscribe(subTopic);
// 消息发布所需参数
MqttMessage message = new MqttMessage(content.getBytes());
message.setQos(qos);
client.publish(pubTopic, message);
System.out.println("Message published");
// 主循环,保持程序运行
while (true) {
try {
Thread.sleep(1000); // 每秒检查一次
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} catch (MqttException me) {
System.out.println("reason " + me.getReasonCode());
System.out.println("msg " + me.getMessage());
System.out.println("loc " + me.getLocalizedMessage());
System.out.println("cause " + me.getCause());
System.out.println("excep " + me);
me.printStackTrace();
}
}
}
OnMessageCallback类
package com.leo.springboothd;
import com.leo.springboothd.pojo.Mes;
import com.leo.springboothd.service.MesService;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component
public class OnMessageCallback implements MqttCallback {
private final MesService mesService;
@Autowired
public OnMessageCallback(MesService mesService) {
this.mesService = mesService;
}
@Override
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
System.out.println("连接断开,可以做重连");
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
// subscribe后得到的消息会执行到这里面
System.out.println("接收消息主题: " + topic);
System.out.println("接收消息Qos: " + message.getQos());
System.out.println("接收消息内容: " + new String(message.getPayload()));
// 将消息封装到 Mes 对象中
Mes mes = new Mes();
mes.setId((long) message.getId());
mes.setTopic(topic);
mes.setPayload(new String(message.getPayload()));
mes.setQos(message.getQos());
mes.setTimestamp(LocalDateTime.now());
// 保存消息到数据库
mesService.save(mes);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
MesService接口
package com.leo.springboothd.service;
import com.leo.springboothd.pojo.Mes;
public interface MesService {
void save(Mes mes);
}
MesService接口实现类
MesServiceImpl1
package com.leo.springboothd.service.impl;
import com.leo.springboothd.mapper.MesMapper;
import com.leo.springboothd.pojo.Mes;
import com.leo.springboothd.service.MesService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MesServiceImpl1 implements MesService {
@Autowired
private MesMapper mesMapper;
@Override
public void save(Mes mes) {
mesMapper.save(mes);
System.out.println(mes);
}
}
MesMapper
package com.leo.springboothd.mapper;
import com.leo.springboothd.pojo.Mes;
import org.apache.ibatis.annotations.Insert;
import org.apache.ibatis.annotations.Mapper;
@Mapper
public interface MesMapper {
@Insert("INSERT INTO Message (id, topic, payload, qos, timestamp) VALUES (#{id}, #{topic}, #{payload}, #{qos}, #{timestamp})")
void save(Mes mes);
}
数据库
CREATE TABLE Message (
num BIGINT AUTO_INCREMENT PRIMARY KEY,
id BIGINT NOT NULL,
qos INT NOT NULL,
topic VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
timestamp DATETIME NOT NULL
)COMMENT='订阅信息表';