spring通过Spring Integration实现udp通信
1:Spring Integration UDP 的优势
Spring Integration 对 UDP 的封装,让你能通过简单的配置和注解就实现消息的接收、转换、过滤、路由和处理,无需过多关心底层 Socket 的复杂性。它支持单播(Unicast) 和多播(Multicast),并且提供了消息长度检查和确认机制来一定程度提升 UDP 的可靠性。
2:消息处理流程
3:具体实现
1):导入依赖
<!-- spring-integration -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-integration</artifactId><version>2.5.15</version>
</dependency>
<dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-ip</artifactId><version>5.5.18</version>
</dependency>
2)在配置文件中配置udp端口
# 测试udp
cs:udp:# 接收端口receive-port: 1234# 发送端口send-port: 1235
3)创建消息处理流程
package **.**;import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;import java.nio.charset.StandardCharsets;
import java.util.Map;@Configuration
public class UdpIntegrationConfig {@Value("${cs.udp.receive-port}")private Integer recivePort;@Value("${cs.udp.send-port}")private Integer sendPort;/*** 定义接收流程*/@Beanpublic IntegrationFlow udpInboundFlow() {return IntegrationFlows.from(Udp.inboundAdapter(recivePort)) // 创建UDP入站适配器,监听指定端口.channel("udpChannel") // 指定接收通道.get();}/*** 转换器 (Transformer)* 将接收到的 byte[] payload 转换为 String* 输出到 udpFilterChannel*/@Transformer(inputChannel = "udpChannel", outputChannel = "udpFilterChannel")public String transformer(@Payload byte[] payload, @Headers Map<String, Object> headers) {String message = new String(payload, StandardCharsets.UTF_8); // 指定编码,避免乱码// 这里可以进行必要的转换,例如JSON解析等System.out.println("----------------------------" + message);return message;}/*** 过滤器 (Filter)* 决定哪些消息需要继续处理* 输出到 udpRouterChannel*/@Filter(inputChannel = "udpFilterChannel", outputChannel = "udpRouterChannel")public boolean filter(String message, @Headers Map<String, Object> headers) {// 获取来源信息,可用于过滤String sourceIp = headers.get("ip_address").toString();String sourcePort = headers.get("ip_port").toString();// 示例:简单过滤掉空消息或包含特定关键字的消息if (message == null || message.trim().isEmpty() || message.contains("DROP")) {return false; // 消息将被丢弃}return true; // 消息继续向下流转}/*** 路由器 (Router)* 根据消息内容或头信息决定将消息发送到哪个处理器*/@Router(inputChannel = "udpRouterChannel")public String router(String message, @Headers Map<String, Object> headers) {if (message.startsWith("ORDER:")) {return "udpOrderHandleChannel"; // 路由到订单处理器} else if (message.startsWith("LOG:")) {return "udpLogHandleChannel"; // 路由到日志处理器} else {return "udpDefaultHandleChannel"; // 路由到默认处理器}}/*** 最终处理器 (Service Activator) - 处理订单消息*/@ServiceActivator(inputChannel = "udpOrderHandleChannel")public void handleOrder(String message) {// 这里处理订单逻辑System.out.println("Processing order: " + message);}/*** 最终处理器 (Service Activator) - 处理日志消息*/@ServiceActivator(inputChannel = "udpLogHandleChannel")public void handleLog(String message) {// 这里处理日志逻辑System.out.println("Processing log: " + message);}/*** 最终处理器 (Service Activator) - 处理默认消息:cite[3]* 通常这是你最常用的处理器*/@ServiceActivator(inputChannel = "udpDefaultHandleChannel")public void handleDefaultMessage(String message, @Headers Map<String, Object> headers) {// 这里是你的核心业务逻辑String sourceIp = headers.get("ip_address").toString();String sourcePort = headers.get("ip_port").toString();System.out.println("Received UDP message from " + sourceIp + ":" + sourcePort + " - " + message);// TODO: 调用你的业务Service处理消息}/*** 配置 UDP 发送处理器 (出站适配器)*/@Beanpublic UnicastSendingMessageHandler udpSendingMessageHandler() {// 参数:目标主机、目标端口、是否启用长度检查(建议true)// TODO 正式使用时,此处需修改为发送端口return new UnicastSendingMessageHandler("localhost", recivePort, true);}}
4)创建发送udp消息的接口
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
@RequestMapping("/udp")
public class UdpSendController {@Autowiredprivate UnicastSendingMessageHandler udpSendingMessageHandler;@PostMapping("/send")public String sendUdpMessage(@RequestParam String message) {// 构建消息并发送Message<byte[]> messageToSend = MessageBuilder.withPayload(message.getBytes()).build();udpSendingMessageHandler.handleMessage(messageToSend);return "Message sent: " + message;}}
5)测试
启动项目,调用发送接口,看控制台是否能正确输出内容