一、引入maven包
<!-- MQTT -->
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId><version>5.3.2.RELEASE</version>
</dependency>
二、添加配置文件
#mqtt设置
customer:
  mqtt:
    broker: mqtt服务器地址
    # 可以配置多个客户端用于多种任务
    clientList:
      # 发布消息客户端ID
      - clientId:
        # 监听主题 同时订阅多个主题使用 - 分割开
        subscribeTopic:
        # 用户名
        userName:
        # 密码
        password:
        # QOS
        qos:
      # 接收消息客户端ID
      - clientId:
        # 监听主题 同时订阅多个主题使用 - 分割开
        subscribeTopic:
        # 用户名
        userName:
        # 密码
        password:
        # QOS
        qos:
三、添加配置类以及初始化MQTT服务
/**
 * MQTT configuration properties, bound from the {@code customer.mqtt} prefix.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {

    /** Address of the MQTT broker. */
    String broker;

    /** The MQTT clients that need to be created. */
    List<MqttClient> clientList;
}
/*** MQTT回调抽象类*/
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {private String clientId;private final Integer MAX_RECONNECT_ATTEMPTS = 50;private MqttConnectOptions connectOptions;public String getClientId() {return clientId;}public void setClientId(String clientId) {this.clientId = clientId;}public MqttConnectOptions getConnectOptions() {return connectOptions;}public void setConnectOptions(MqttConnectOptions connectOptions) {this.connectOptions = connectOptions;}/*** 失去连接操作,进行重连** @param throwable 异常*/@Overridepublic void connectionLost(Throwable throwable) {log.error("mqtt 连接断开,5S之后尝试重连: {}", throwable.getMessage());try {if (null != clientId) {long reconnectTimes = 1;while (reconnectTimes < MAX_RECONNECT_ATTEMPTS) {try {MqttClient client = MqttClientManager.getMqttClientById(clientId);if (client.isConnected()) {//判断已经重新连接成功 需要重新订阅主题 可以在这个if里面订阅主题 或者 connectComplete(方法里面) 看你们自己选择log.info("mqtt {} 重新连接成功", clientId);if (clientId.equals("call_answer"))MQTTUtils.subscribe(xxx, "xxx", 0);return;}reconnectTimes += 1;log.warn("mqtt尝试重连 连接次数 {}", reconnectTimes);client.reconnect();} catch (MqttException e) {log.error("mqtt断连异常", e);}//等待5秒后重试try {Thread.sleep(5000);} catch (InterruptedException e1) {}}}} catch (Exception e) {log.error("{} reconnect failed!", e);}}/*** 接收订阅消息** @param topic 主题* @param message 接收消息* @throws Exception 异常*/@Overridepublic abstract void messageArrived(String topic, MqttMessage message) throws Exception;/*** 消息发送成功** @param iMqttDeliveryToken toke*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
// log.info("消息发送成功");}
}
/*** MQTT回调类*/
@Slf4j
public class CallMqttCallBack extends AbsMqttCallBack {@Overridepublic void messageArrived(String topic, MqttMessage message){log.info("收到" + topic + "主题发布的消息");try {String tmp = new String(message.getPayload());MQTTMessageHandle.addData(tmp);} catch (Exception e) {
// log.error(String.valueOf(e));e.printStackTrace();}}
}
/*** MQTT回调类*/
@Slf4j
public class CallMqttCallBack extends AbsMqttCallBack {@Overridepublic void messageArrived(String topic, MqttMessage message){log.info("收到" + topic + "主题发布的消息");try {String tmp = new String(message.getPayload());MQTTMessageHandle.addData(tmp);} catch (Exception e) {
// log.error(String.valueOf(e));e.printStackTrace();}}
}
/*** MQTT客户端管理类,如果客户端非常多后续可入redis缓存*/
@Slf4j
@Component
public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;/*** 存储MQTT客户端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public static MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 创建mqtt客户端** @param clientId 客户端ID* @param subscribeTopic 订阅主题* @param userName 用户名,可为空* @param password 密码,可为空* @param qos 消息传递质量* @return mqtt客户端*/public void createMqttClient(String clientId, String subscribeTopic, String userName, String password, Integer qos) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();if (!StringUtil.isNullOrEmpty(userName)) {connOpts.setUserName(userName);}if (!StringUtil.isNullOrEmpty(password)) {connOpts.setPassword(password.toCharArray());}// 是否清空session,设置false表示服务器会保留客户端的连接记录(订阅主题,qos),客户端重连之后能获取到服务器在客户端断开连接期间推送的消息// 设置为true表示每次连接服务器都是以新的身份connOpts.setCleanSession(false);AbsMqttCallBack callBack = new CallMqttCallBack();callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);//连接mqtt服务端brokerclient.connect(connOpts);// 订阅主题if (!StringUtil.isNullOrEmpty(subscribeTopic)) {MQTTUtils.subscribe(client, subscribeTopic, qos);}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("Create mqttClient failed!", e);}}
}
/*** MQTT客户端创建*/
@Component
@Slf4j
public class MqttClientCreate {@Resourceprivate MqttClientManager mqttClientManager;@Autowiredprivate MqttConfig mqttConfig;/*** 创建MQTT客户端*/@PostConstructpublic void createMqttClient() {List<MqttClient> mqttClientList = mqttConfig.getClientList();for (MqttClient mqttClient : mqttClientList) {log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword(), mqttClient.getQos());}}
}
四、MQTT工具类
@Slf4j
public class MQTTUtils {private static Object object = new Object();/*** 发布消息** @param pushMessage* @param topic* @param qos*/public static void publish(String pushMessage, String topic, int qos, String clientId) {MqttMessage message = new MqttMessage();message.setPayload(pushMessage.getBytes());message.setQos(qos);MqttTopic mqttTopic = MqttClientManager.getMqttClientById(clientId).getTopic(topic);if (null == mqttTopic) {log.error("mqtt连接断开");}MqttDeliveryToken token;//Delivery:配送synchronized (object) {try {token = mqttTopic.publish(message);//也是发送到执行队列中,等待执行线程执行,将消息发送到消息中间件token.waitForCompletion(1000L);} catch (MqttPersistenceException e) {e.printStackTrace();} catch (MqttException e) {e.printStackTrace();}}}/*** 订阅某个主题** @param topic* @param qos*/public static void subscribe(MqttClient client, String topic, Integer qos) {try {//订阅多主题if (topic.contains("-")) {client.subscribe(topic.split("-"));} else {//订阅单主题log.info("订阅主题:"+topic);client.subscribe(topic, qos);}} catch (MqttException e) {log.error("订阅失败!");}}/*** 取消订阅主题** @param topic 主题名称*/public static void cleanTopic(MqttClient client, String topic) {if (client != null && client.isConnected()) {try {client.unsubscribe(topic);} catch (MqttException e) {e.printStackTrace();}} else {log.error("取消订阅失败!");}}
}
五、MQTT处理
@Component
@Slf4j
public class MQTTMessageHandle implementsApplicationListener<ContextRefreshedEvent> {private static BlockingQueue<MessageMsgBase> queue = new LinkedBlockingQueue<>();private List<Thread> workerThreads;private volatile boolean running = false;//通话处理线程数量@Value("${threadNum}")private Integer threadNum;private static Gson gson = new Gson();@Autowiredprivate RedisUtil redisUtil;@Overridepublic void onApplicationEvent(ContextRefreshedEvent event) {startProcessor();}/*** 初始化处理方法*/public void startProcessor() {if (running) return;running = true;System.out.println("启动MQTT消息回复线程");workerThreads = new ArrayList<>();//后面需要几个通话处理线程就改几个for (int i = 0; i < threadNum; i++) {Thread workerThread = new Thread(this::process);workerThread.setName("通话处理线程" + i);workerThreads.add(workerThread);workerThread.start();System.out.println("线程--" + workerThread.getName() + "--启动完毕");}}public void stopProcessor() {running = false;if (workerThreads.size() > 0) {for (Thread workerThread : workerThreads) {workerThread.interrupt();}}}private void process() {while (running) {MessageMsgBase data = null;try {//当队列中有数据就获取data = queue.take();//收到消息后续处理.......} catch (InterruptedException e) {Thread.currentThread().interrupt();System.out.println("处理器被中断");} catch (Exception e) {log.error("处理数据出错: " + e.getMessage());e.printStackTrace();}}}public static void addData(String json) {MessageMsgBase data = JsonUtils.fromJson(json, new TypeToken<MessageMsgBase>() {}.getType());queue.offer(data);}@PreDestroypublic void onDestroy() {stopProcessor();}}