当前位置: 首页 > news >正文

Java-代码段-http接口调用自身服务中的其他http接口(mock)-并建立socket连接发送和接收报文实例

最新版本更新
https://code.jiangjiesheng.cn/article/367?from=csdn

推荐 《高并发 & 微服务 & 性能调优实战案例100讲 源码下载》

1. controller入口

    @ApiOperation("模拟平台端+现场机socket交互过程,需要Authorization")@PostMapping(path = "/testSocketBusiness")public ResponseBean<HashMap<Object, Object>> testSocketBusiness(@RequestBody SocketTestClient.SocketAllParams socketAllParams) {// 建立socket连接  "\n")HashMap<Object, Object> res = socketTestClient.doTestAll(socketAllParams.getFirstUrlRequest(), socketAllParams.getSocketMsgList(), socketAllParams.getAutoUseNowDataTime());return new ResponseBean<>(200, "请求结束", res);}

1.1 接口入参示例及说明

{"firstUrlRequestxxxxx": {"url": "/your http api/remoteControl","method": "PUT","urlParams": {"monitorId": 3319,"cn": "3020","polId": "md0501","infoId": "i42002"}},"socketMsgList": ["ST=32;CN=3020;PW=123456;MN=887799;Flag=5;CP=&&DataTime=20250529105000;PolId=md0501;DT=201;VaseNo=4;i33022-Info=0;i33028-Info=1&&"],"autoUseNowDataTime": true,"【示例非必要不改动】反控入参示例(前不要长度、不要QN,后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {"firstUrlRequest": {"url": "/your http api/remoteControl","method": "PUT","urlParams": {"monitorId": 3319,"cn": "3020","polId": "md0501","infoId": "i42002"}},"socketMsgList": ["ST=91;CN=9011;PW=123456;MN=7899871;Flag=4;CP=&&QnRtn=1&&","ST=32;CN=3020;PW=123456;MN=7899871;Flag=4;CP=&&DataTime=20250528111758;PolId=md0501;i42002-Info=2&&","ST=91;CN=9012;PW=123456;MN=7899871;Flag=4;CP=&&ExeRtn=1&&"],"autoUseNowDataTime": true},"【示例非必要不改动】监测数据入参示例(前不要长度、不要QN,后不要结尾,DataTime是否自动更新取决于autoUseNowDataTime)": {"socketMsgList": ["ST=32;CN=2011;PW=123456;MN=7899871;Flag=5;CP=&&DataTime=20250527105200;w01018-Rtd=8.8,w00000-Flag=N;w01018-Rtd=444.7,w01018-SampleTime=20250527105100,w01018-Flag=D&&"],"autoUseNowDataTime": false},"一次采集日志:": ["tail -200f /home/logs/yourProject/collect/connect.log","tail -200f /home/logs/yourProject/collect/receive.log [重点]","tail -200f /home/logs/yourProject/collect/send.log","tail -200f /home/logs/yourProject/debug.log [重点]","tail -200f /home/logs/yourProject/error.log"]
}

1.2 接口出参示例

{"code": 200,"msg": "请求结束","data": {"请求结束,当期是监测数据上传模式,返回qn列表:": ["20250529132932726"]},"timestamp": "2025-05-29 13:29:36","traceId": "cb9557eaed1543aa8749bf2a545816ef"
}

2. mock test方式调用自身controller下的http接口并建立socket连接发送和接收报文实例

SocketTestClient.java

package cn.jiangjiesheng.code.service.common;import cn.hutool.core.date.DateUtil;
import cn.hutool.json.JSONUtil;
import cn.jiangjiesheng.code.core.utils.ControllerInvoker;
import cn.jiangjiesheng.code.core.utils.HttpServletUtil;
import cn.jiangjiesheng.code.core.utils.StringUtils;
import cn.jiangjiesheng.code.exception.GnException;
import com.google.api.client.util.Lists;
import com.google.common.collect.Maps;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;import java.io.*;
import java.net.Socket;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;@Component
@Slf4j
public class SocketTestClient {// 配置参数:socket服务器IP/端口@Value("${ecp-collector.url}")private String socketIP;private static final int SOCKET_PORT = 16010;// 复用socketprivate static volatile Socket socket;private static final Object lock = new Object();// 建立连接使用,核心是PW=123456;MN=7899871private static final String BUILD_MN_CONNECT_FIRST_TIME = "QN=20250523094003516;ST=32;CN=2011;PW=%s;MN=%s;Flag=5;CP=&&DataTime=20250525093100;w00000-Rtd=0.00;&&4540"; // 固定内容// 发送间隔(毫秒)private static final long SEND_INTERVAL = 500;@Autowiredprivate ControllerInvoker controllerInvoker;/*** 模拟云平台和现场机交互的接口* (整个交互的多条报文在一个接口入参中写完,就不用通过socket工具来测试了,* 一次采集一套流程好像还有10秒内的限制,通过接口调用就没这些问题了。)** @param urlObj             模拟第一次业务触发* @param socketMsgList      其他交互报文* @param autoUseNowDataTime 是否自动更新DataTime,默认是* @return 返回qn*/public HashMap<Object, Object> doTestAll(FirstUrlRequest urlObj, List<String> socketMsgList, Boolean autoUseNowDataTime) {//反控的qnString qn = null;boolean isReverseControlMode = false;//记录下qn,返回便于查验List<String> qnList = Lists.newArrayList();try {//单例模式复用socketconnectToServer();if (CollectionUtils.isNotEmpty(socketMsgList)) {// 获取输出流(用于发送数据)OutputStream out = socket.getOutputStream();// 字符集String CHARSET = "UTF-8";BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, CHARSET));//触发第1个接口请求if (urlObj != null && StringUtils.isNotBlank(urlObj.getUrl())) {//发送建立连接使用String msgOne = socketMsgList.get(0);// 正则匹配 PW 和 MN 的值String pw = extractValue(msgOne, "PW=([^;]+);");String mn = extractValue(msgOne, "MN=([^;]+);");log.info("doTestAll,PW = {}", mn);log.info("doTestAll,MN = {}", mn);sendMessage(writer, handleFinalMsg(String.format(BUILD_MN_CONNECT_FIRST_TIME, pw, mn)));//还要有首选String authorization = getRequest().getHeader("Authorization");if (StringUtils.isBlank(authorization)) {throw new GnException("请添加Authorization头");}String result = null;try {try {//这里要等待建立好mn连接,必须,否则会报"链接不可用"Thread.sleep(SEND_INTERVAL);} catch (InterruptedException e) {Thread.currentThread().interrupt();throw new GnException("发送线程被中断");}// 直接post get 方式调用会阻塞//  result = HttpConnectionsUtils.requestWithBody(urlObj.getUrl(), urlObj.getMethod(), null, JSONUtil.toJsonStr(urlObj.getUrlParams()), authorization);result = controllerInvoker.invokeController(urlObj.getUrl(), urlObj.getMethod(), JSONUtil.toJsonStr(urlObj.getUrlParams()), authorization);} catch (Exception e) {throw new GnException("当前反控接口调用失败:" + e.getMessage());}qn = JSONUtil.parseObj(result).getStr("data", "");if (StringUtils.isBlank(qn)) {throw new GnException("当前反控接口调用失败");}}//替换qn 和 DataTimeString datetime = null;if (autoUseNowDataTime == null) {autoUseNowDataTime = true;}if (autoUseNowDataTime) {datetime = DateUtil.format(new Date(), "yyyyMMddHHmmss");}//是否是反控模式isReverseControlMode = qn != null;//执行其他报文请求for (String msg : socketMsgList) {if (isReverseControlMode) {addQnList(qnList, qn);} else {//监测数据上传,这里重新生成qnqn = DateUtil.format(new Date(), "yyyyMMddHHmmssSSS");addQnList(qnList, qn);}if (msg.startsWith("QN=")) {msg = msg.replaceFirst("QN=[^;]+;", String.format("QN=%s;", qn));} else {msg = String.format("QN=%s;", qn) + msg;}if (datetime != null && msg.contains("DataTime=")) {msg = msg.replaceFirst("DataTime=[^;]+;", String.format("DataTime=%s;", datetime));}msg = handleFinalMsg(msg);try {Thread.sleep(SEND_INTERVAL);} catch (InterruptedException e) {Thread.currentThread().interrupt();log.info("发送线程被中断");break;}sendMessage(writer, msg);}// writer.close(); socket也会关闭}HashMap<Object, Object> qnMap = Maps.newHashMap();String key = String.format("请求结束,%s,返回qn列表:", isReverseControlMode ? "当前是反控模式(1个qn)" : "当期是监测数据上传模式");qnMap.put(key, qnList);return qnMap;} catch (IOException e) {log.info("连接或通信异常: " + e.getMessage());throw new GnException("连接或通信异常: " + e.getMessage());} catch (Exception e) {throw new GnException("当前反控接口调用失败:" + e.getMessage());} finally {//disconnect();}}private static void addQnList(List<String> qnList, String qn) {if (!qnList.contains(qn)) {qnList.add(qn);}}/*** 处理成最终的报文格式** @param msg* @return*/private static String handleFinalMsg(String msg) {//判断是否需要组装成完整的报文if (msg.startsWith("QN")) {// 找到最后一个 "&&" 的位置int index = msg.lastIndexOf("&&");if (index > 0 && !msg.endsWith("&&")) {//先截断msg = msg.substring(0, index + 2); // 保留 "&&}int length = msg.length();String msgStart = String.format("##%04d", length);msg = msgStart + msg;}//这个应该紧跟上面的逻辑,拼接的内容好像不重要if (msg.endsWith("&&")) {msg += "4540";}return msg;}/*** 获取当前请求的request对象** @author xuyuxiang* @date 2020/3/30 15:10*/public static HttpServletRequest getRequest() {ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();if (requestAttributes == null) {throw new GnException("请求的参数异常");} else {return requestAttributes.getRequest();}}// 提取方法private static String extractValue(String input, String regex) {Pattern pattern = Pattern.compile(regex);Matcher matcher = pattern.matcher(input);if (matcher.find()) {return matcher.group(1);}return null;}// 封装发送方法:添加 \r\n 并刷新private void sendMessage(BufferedWriter writer, String message) {try {// 添加回车换行 ,应该只要 \nwriter.write(message + "\r\n");writer.flush();log.info("doTestAll,已发送报文 {}", message);} catch (IOException e) {log.info("doTestAll,发送报文失败 {}", message);}}// 启动一个线程接收服务器响应(可选)private void startServerResponseThread(Socket socket) {new Thread(() -> {try (InputStream in = socket.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {String responseLine;while ((responseLine = reader.readLine()) != null) {log.info(" 收到来自服务器的报文: " + responseLine);}} catch (IOException e) {log.info(" 接收服务器报文失败: " + e.getMessage());}}).start();}/*** 单例模式复用socket*/public void connectToServer() {if (socket == null || socket.isClosed()) {//|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。synchronized (lock) {if (socket == null || socket.isClosed()) {//|| socket.isConnected()只表示这个 Socket 对象是否曾经成功连接过一次。try {socket = new Socket(socketIP, SOCKET_PORT);// 可以在这里配置 socket 参数,如设置超时时间log.info("成功连接到socket服务器:{}:{}", socketIP, SOCKET_PORT);SocketMessageReceiver receiver = new SocketMessageReceiver(socket, new MessageHandler() {@Overridepublic void onMessageReceived(String message) {log.info("收到socket回复消息:{}", message);}@Overridepublic void onConnectionClosed() {log.info("socket断开连接");}@Overridepublic void onError(Exception e) {log.info("socket连接发生错误", e);}});receiver.start();} catch (IOException e) {throw new GnException("socket连接异常:" + e.getMessage());}} else {log.info("socket已连接:{}:{}", socketIP, SOCKET_PORT);}}} else {log.info("socket已连接:{}:{}", socketIP, SOCKET_PORT);}}public void disconnect() {try {if (socket != null && !socket.isClosed()) {socket.close();socket = null;}} catch (IOException e) {throw new GnException("socket关闭异常:" + e.getMessage());}}/*** 模拟第一次业务触发*/@Datapublic static class FirstUrlRequest {private String url;private String method;private Object urlParams;}@Datapublic static class SocketAllParams {//模拟第一次业务触发private FirstUrlRequest firstUrlRequest;//其他交互报文private List<String> socketMsgList;//是否自动更新DataTime,默认是private Boolean autoUseNowDataTime;}interface MessageHandler {void onMessageReceived(String message);void onConnectionClosed();void onError(Exception e);}static class SocketMessageReceiver {private final Socket socket;private final MessageHandler handler;private volatile boolean running = true;public SocketMessageReceiver(Socket socket, MessageHandler handler) {this.socket = socket;this.handler = handler;}public void start() {new Thread(this::runLoop).start();}public void stop() {running = false;try {if (!socket.isClosed()) {socket.close();}} catch (IOException e) {handler.onError(e);}}private void runLoop() {try (InputStream in = socket.getInputStream();BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {String line;while (running && (line = reader.readLine()) != null) {handler.onMessageReceived(line);}handler.onConnectionClosed();} catch (IOException e) {handler.onError(e);}}}
}

3. ControllerInvoker mock调用http接口代码

ControllerInvoker.java

package cn.jiangjiesheng.code.core.utils;import cn.jiangjiesheng.code.exception.GnException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.http.MediaType;
import org.springframework.mock.web.MockHttpServletResponse;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.HandlerAdapter;
import org.springframework.web.servlet.HandlerExecutionChain;
import org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerMapping;
import org.springframework.mock.web.MockHttpServletRequest;import java.nio.charset.StandardCharsets;/*** http调用Controller http代码接口* 依赖* <dependency>*  <groupId>org.springframework.boot</groupId>*   <artifactId>spring-boot-starter-test</artifactId>*  <!--  <version>2.0.6.RELEASE</version>-->* </dependency>*/
@Service
public class ControllerInvoker {@Autowiredprivate RequestMappingHandlerMapping handlerMapping;@Autowired//有3个@Qualifier("requestMappingHandlerAdapter")private HandlerAdapter handlerAdapter;/*** http调用Controller http代码接口,http不能直接调用自身服务的http接口,会阻塞* @param uri 不要server.servlet.context-path,示例:/your http api/remoteControl* @param method* @param jsonBody* @param authorization* @return* @throws Exception*/public String invokeController(String uri, String method,  String jsonBody, String authorization) throws Exception {MockHttpServletRequest request = new MockHttpServletRequest();request.setRequestURI(uri);request.setMethod(method.toUpperCase());request.addHeader("Authorization", authorization);request.setContentType(MediaType.APPLICATION_JSON_VALUE);request.setContent(jsonBody.getBytes(StandardCharsets.UTF_8));HandlerExecutionChain chain = handlerMapping.getHandler(request);if (chain == null) {throw new GnException("没找到对应url: " + uri);}// 执行 Controller 方法MockHttpServletResponse response = new MockHttpServletResponse();handlerAdapter.handle(request, response, chain.getHandler());return new String(response.getContentAsByteArray(), StandardCharsets.UTF_8);}
}

最新版本更新
https://code.jiangjiesheng.cn/article/367?from=csdn

推荐 《高并发 & 微服务 & 性能调优实战案例100讲 源码下载》

相关文章:

  • 练习小项目9:打字效果文字展示(多段文字循环+删除+光标闪烁)
  • 嵌入式学习笔记 - freeRTOS同优先级任务时间片抢占的实现
  • 酒店管理破局:AI 引领智能化转型
  • hf-mirror断点续传下载权重
  • (6)-Fiddler抓包-Fiddler状态面板详解
  • Java面试:从Spring Boot到分布式系统的技术探讨
  • SCL语言两台电机正反转控制程序从选型、安装到调试全过程的详细步骤指南(下)
  • 目标检测预测框置信度(Confidence Score)计算方式
  • 一文清晰理解目标检测指标计算
  • 深入分析SD-WAN成本效益和ROI(投资回报率):真的能降低WAN成本吗?
  • Spring Boot+Activiti7入坑指南初阶版
  • HbuilderX设置禁止import 引入模块换行
  • HOW - 简历和求职面试宝典(七)
  • spring的多语言怎么实现?
  • [解决]在 Vue 3 使用 Vite 开发的项目中,放在 public 文件夹里的文件,在打包部署后出现 404 的问题
  • Linux `|` 管道符与 `grep` 命令深度解析与高阶应用指南
  • 国产化redis 替代产品tendis 安装
  • siglip2(1) 设置模型返回所需变量
  • Redis--缓存击穿详解及解决方案
  • 动态规划法在解决实际问题中的应用
  • 那个网站的是做vb题目的/微信软文范例100字
  • 宝山网站建设公司/百度竞价被点击软件盯上
  • 热点 做网站和营销 我只服他/产品的推广及宣传思路
  • wordpress首页访问密码/专业seo站长工具
  • cms系统网站/站长推广网
  • 哈尔滨座做网站的/seo基础