不在需要mqtt依赖 也不需要依赖外部mqtt服务,就可以直接接收设备服务进行直连服务。
package com.cqcloud.platform.config;import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import com.cqcloud.platform.handler.NettyServerFactory;
import com.cqcloud.platform.service.NettyService;
import io.netty.bootstrap.ServerBootstrap;
/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2025年9月1日 🐬🐇 💓💕*/
@Configuration
@ConditionalOnClass(ServerBootstrap.class)
@EnableConfigurationProperties(NettyServerProperties.class)
@ConditionalOnProperty(prefix = "netty.server", name = "enabled", havingValue = "true", matchIfMissing = true)
public class NettyServerAutoConfiguration {@Bean@ConditionalOnMissingBeanpublic NettyServerFactory nettyServerFactory(NettyServerProperties properties, NettyService nettyService) {return new NettyServerFactory(properties, nettyService);}
}
package com.cqcloud.platform.config;
import org.springframework.boot.context.properties.ConfigurationProperties;/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2025年9月1日 🐬🐇 💓💕*/
@ConfigurationProperties(prefix = "netty.server")
public class NettyServerProperties {private int port = 1883;private boolean enabled = true;private int bossThreads = 1;private int workerThreads = 0; // 0表示使用默认public int getPort() {return port;}public void setPort(int port) {this.port = port;}public boolean isEnabled() {return enabled;}public void setEnabled(boolean enabled) {this.enabled = enabled;}public int getBossThreads() {return bossThreads;}public void setBossThreads(int bossThreads) {this.bossThreads = bossThreads;}public int getWorkerThreads() {return workerThreads;}public void setWorkerThreads(int workerThreads) {this.workerThreads = workerThreads;}
}
package com.cqcloud.platform.handler;import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import com.cqcloud.platform.config.NettyServerProperties;
import com.cqcloud.platform.service.NettyService;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.nio.charset.StandardCharsets;/*** @author weimeilayer@gmail.com ✨* @date 💓💕 2025年9月1日 🐬🐇 💓💕*/
public class NettyServerFactory implements InitializingBean, DisposableBean {private final NettyServerProperties properties;private final NettyService nettyService;private EventLoopGroup bossGroup;private EventLoopGroup workerGroup;private ChannelFuture channelFuture;public NettyServerFactory(NettyServerProperties properties, NettyService nettyService) {this.properties = properties;this.nettyService = nettyService;}@Overridepublic void afterPropertiesSet() throws Exception {if (!properties.isEnabled()) {return;}// 配置线程组bossGroup = properties.getBossThreads() > 0 ?new NioEventLoopGroup(properties.getBossThreads()) :new NioEventLoopGroup();workerGroup = properties.getWorkerThreads() > 0 ?new NioEventLoopGroup(properties.getWorkerThreads()) :new NioEventLoopGroup();try {ServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ChannelPipeline pipeline = ch.pipeline();// 添加字符串编解码器pipeline.addLast(new StringDecoder(StandardCharsets.UTF_8));pipeline.addLast(new StringEncoder(StandardCharsets.UTF_8));// 添加自定义处理器pipeline.addLast(new TcpMqttServerHandler(nettyService));}});channelFuture = bootstrap.bind(properties.getPort()).sync();System.out.println("Netty TCP Server started on port: " + properties.getPort());// 异步关闭监听channelFuture.channel().closeFuture().addListener(future -> {System.out.println("Netty TCP Server channel closed");});} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RuntimeException("Netty server start interrupted", e);} catch (Exception e) {destroy();throw new RuntimeException("Failed to start Netty server", e);}}@Overridepublic void destroy() {if (channelFuture != null) {channelFuture.channel().close();}if (bossGroup != null) {bossGroup.shutdownGracefully();}if (workerGroup != null) {workerGroup.shutdownGracefully();}System.out.println("Netty TCP Server resources released");}public boolean isRunning() {return channelFuture != null && channelFuture.channel().isActive();}
}
package com.cqcloud.platform.handler;import java.util.List;
import com.cqcloud.platform.service.NettyService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;/*** 物联网云平台设备协议** @author weimeilayer@gmail.com ✨* @date 💓💕 2024年9月23日 🐬🐇 💓💕*/
public class TcpMqttServerHandler extends SimpleChannelInboundHandler<String> {// 接口注入private final NettyService iotPushService;public TcpMqttServerHandler(NettyService iotPushService) {this.iotPushService = iotPushService;}@Overrideprotected void channelRead0(ChannelHandlerContext ctx, String message) throws Exception {if (message == null || message.trim().isEmpty()) {return;}System.out.println("接收到物联网云平台设备协议数据: " + message);// 将消息传递给 iotPushService 进行逻辑处理// 同时传递 ChannelHandlerContext 用于后续响应List<String> responseCommands = iotPushService.pushMessageArrived(message, ctx);// 如果有返回的响应指令,则发送if (responseCommands != null && !responseCommands.isEmpty()) {responseCommands.forEach(command -> {System.out.println("下发指令: " + command);sendJsonResponse(ctx, command);});}}// 发送JSON响应的统一辅助方法private void sendJsonResponse(ChannelHandlerContext ctx, String jsonResponse) {try {// 直接将JSON字符串转换为字节数组byte[] responseBytes = jsonResponse.getBytes(java.nio.charset.StandardCharsets.UTF_8);ByteBuf responseBuffer = Unpooled.copiedBuffer(responseBytes);ctx.writeAndFlush(responseBuffer);System.out.println("JSON响应发送成功: " + jsonResponse);} catch (Exception e) {System.err.println("JSON响应发送失败: " + e.getMessage());e.printStackTrace();}}@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端连接: " + ctx.channel().remoteAddress());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {System.out.println("客户端断开: " + ctx.channel().remoteAddress());super.channelInactive(ctx);}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.err.println("通道异常: " + ctx.channel().remoteAddress());cause.printStackTrace();ctx.close();}
}
package com.cqcloud.platform.service;import io.netty.channel.ChannelHandlerContext;
import java.util.List;
/*** @author weimeilayer@gmail.com* @date 💓💕2024年9月8日🐬🐇💓💕*/
public interface NettyService {/*** 处理接收到的消息** @param message 接收到的字符串消息* @param ctx ChannelHandlerContext 用于发送响应* @return 需要下发的响应指令列表,如果没有响应则返回null或空列表*/List<String> pushMessageArrived(String message, ChannelHandlerContext ctx);
}
package com.cqcloud.platform.service.impl;import com.cqcloud.platform.service.NettyService;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import io.netty.channel.ChannelHandlerContext;
import lombok.AllArgsConstructor;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;/*** @author weimeilayer@gmail.com* @date 💓💕2024年9月8日🐬🐇💓💕*/
@Service
@AllArgsConstructor
public class NettyServiceImpl implements NettyService {@Overridepublic List<String> pushMessageArrived(String message, ChannelHandlerContext ctx) {System.out.println("Service层处理消息: " + message);List<String> responses = new ArrayList<>();try {// 尝试解析为JSON格式的MQTT消息JsonObject jsonMessage = JsonParser.parseString(message).getAsJsonObject();String cmd = jsonMessage.get("cmd").getAsString();String deviceId = jsonMessage.get("devId").getAsString();System.out.println("解析到命令: " + cmd + ", 设备ID: " + deviceId);// 根据不同的命令类型处理switch (cmd) {// 处理车牌识别结果case "plateResult":responses = handlePlateResult(jsonMessage, deviceId);break;// 处理心跳消息case "heartbeat":responses = handleHeartbeat(jsonMessage, deviceId);break;// 处理IO状态变化case "ioInStatus":responses = handleIoStatus(jsonMessage, deviceId);break;// 处理RS485数据case "rs485DataPut":responses = handleRs485Data(jsonMessage, deviceId);break;// 处理设备信息case "devVerInfo":responses = handleDeviceInfo(jsonMessage, deviceId);break;// 处理未知命令default:System.out.println("未知命令类型: " + cmd);responses = handleUnknownCommand(jsonMessage, deviceId);}} catch (Exception e) {// 如果不是JSON格式,按原始数据处理System.out.println("消息不是JSON格式,按原始数据处理");responses = handleRawMessage(message);}return responses;}/*** 处理车牌识别结果消息* @param message 包含车牌识别结果的JSON对象,应包含content字段,content中包含plateNum和isWhitelist* @param deviceId 设备ID* @return 响应命令列表,根据是否为白名单车辆返回开闸指令或收费指令*/private List<String> handlePlateResult(JsonObject message, String deviceId) {List<String> responses = new ArrayList<>();JsonObject content = message.get("content").getAsJsonObject();String plateNum = content.get("plateNum").getAsString();boolean isWhitelist = content.get("isWhitelist").getAsBoolean();System.out.println("处理车牌识别结果: " + plateNum + ", 白名单: " + isWhitelist);// 根据业务逻辑生成响应if (isWhitelist) {// 白名单车辆,发送开闸指令responses.add(generateOpenGateCommand(message.get("msgId").getAsString()));} else {// 非白名单车辆,发送收费指令responses.add(generateChargeCommand(plateNum, message.get("msgId").getAsString()));}return responses;}/*** 处理设备心跳消息** @param message 心跳消息内容,类型为JsonObject* @param deviceId 设备唯一标识符* @return 包含心跳响应消息的字符串列表*/private List<String> handleHeartbeat(JsonObject message, String deviceId) {List<String> responses = new ArrayList<>();System.out.println("处理心跳消息,设备: " + deviceId);// 调用生成心跳响应的方法String heartbeatResponse = generateHeartbeatResponse(message);responses.add(heartbeatResponse);return responses;}/*** 处理IO状态变化消息** @param message 包含IO状态信息的JSON对象* @param deviceId 设备标识符* @return 响应消息列表*/private List<String> handleIoStatus(JsonObject message, String deviceId) {List<String> responses = new ArrayList<>();System.out.println("处理IO状态变化,设备: " + deviceId);// 处理IO状态逻辑return responses;}/*** 处理RS485数据** @param message 包含RS485数据的JSON对象* @param deviceId 设备ID* @return 响应消息列表*/private List<String> handleRs485Data(JsonObject message, String deviceId) {List<String> responses = new ArrayList<>();System.out.println("处理RS485数据,设备: " + deviceId);// 处理RS485数据逻辑return responses;}/*** 处理设备信息** @param message 包含设备信息的JSON对象* @param deviceId 设备标识符* @return 处理结果的响应列表*/private List<String> handleDeviceInfo(JsonObject message, String deviceId) {List<String> responses = new ArrayList<>();System.out.println("处理设备信息,设备: " + deviceId);// 处理设备信息逻辑return responses;}/*** 处理未知命令的消息** @param message 包含命令信息的JSON对象* @param deviceId 发送命令的设备ID* @return 包含错误响应的字符串列表*/private List<String> handleUnknownCommand(JsonObject message, String deviceId) {List<String> responses = new ArrayList<>();System.out.println("处理未知命令,设备: " + deviceId);// 返回错误响应String errorResponse = generateErrorResponse(message, "unknown_command");responses.add(errorResponse);return responses;}/*** 处理原始消息并生成响应列表** @param message 待处理的原始消息字符串* @return 包含处理结果的字符串列表*/private List<String> handleRawMessage(String message) {List<String> responses = new ArrayList<>();System.out.println("处理原始消息: " + message);// 处理非JSON格式的原始消息return responses;}// ============== 响应生成方法 ==============/*** 生成心跳响应(符合文档7.4.2格式)*/private String generateHeartbeatResponse(JsonObject message) {String msgId = message.get("msgId").getAsString();JsonObject response = new JsonObject();response.addProperty("cmd", "heartbeatRsp");response.addProperty("msgId", msgId); // 使用原始消息的msgIdresponse.addProperty("status", "ok");return response.toString();}/*** 生成开闸指令(符合文档8.1.1格式)*/private String generateOpenGateCommand(String originalMsgId) {long currentTime = System.currentTimeMillis() / 1000;JsonObject request = new JsonObject();request.addProperty("cmd", "ioOutput");request.addProperty("msgId", generateMessageId());request.addProperty("utcTs", currentTime);JsonObject gpioData = new JsonObject();gpioData.addProperty("ioNum", "io1");gpioData.addProperty("action", "on");request.add("gpioData", gpioData);return request.toString();}/*** 生成LCD显示指令(符合文档8.21.1格式)*/private String generateChargeCommand(String plateNum, String originalMsgId) {long currentTime = System.currentTimeMillis() / 1000;JsonObject request = new JsonObject();request.addProperty("cmd", "lcdShowInfo");request.addProperty("msgId", generateMessageId());request.addProperty("utcTs", currentTime);JsonObject showInfo = new JsonObject();showInfo.addProperty("textType", "plateLine");// 车牌信息JsonObject plateInfo = new JsonObject();plateInfo.addProperty("plateNum", plateNum);plateInfo.addProperty("textColor", "FF0000");showInfo.add("plateInfo", plateInfo);// 行信息JsonArray lineInfo = new JsonArray();JsonObject line1 = new JsonObject();line1.addProperty("lineText", "请缴费 5 元");line1.addProperty("fontSize", "large");line1.addProperty("textColor", "FF0000");lineInfo.add(line1);showInfo.add("lineInfo", lineInfo);showInfo.addProperty("qrcodeUrl", "http://xxx.com/pay");request.add("showInfo", showInfo);// 语音信息JsonObject voiceInfo = new JsonObject();voiceInfo.addProperty("voiceText", "请缴费5元");request.add("voiceInfo", voiceInfo);return request.toString();}/*** 生成错误响应*/private String generateErrorResponse(JsonObject message, String errorType) {String msgId = message.has("msgId") ? message.get("msgId").getAsString() : generateMessageId();JsonObject response = new JsonObject();response.addProperty("cmd", "errorRsp");response.addProperty("msgId", msgId);response.addProperty("status", errorType);return response.toString();}/*** 生成符合文档格式的消息ID(20位:13位毫秒时间+7位随机数)*/private String generateMessageId() {long millis = System.currentTimeMillis();String random = String.format("%07d", new Random().nextInt(10000000));return millis + random;}
}
yml配置 application.yml
netty:server:enabled: trueport: 1883boss-threads: 2worker-threads: 8
