java通过RESTful API实现两个项目之间相互传输数据
项目场景:
一些特殊场景中,两个项目部署在不同的服务器上,且由于服务器限制导致 IP 无法互通,此时需要通过开放接口的方式进行数据传输
问题描述
两个服务器之间 IP 无法相互访问,数据无法直接传输
解决方案:
通过RESTful API方式定义开放接口实现数据传输
1.开放接口定义
@RequestMapping(value = "/getss", produces = "application/json;charset=UTF-8") public void getTestChunked(HttpServletResponse response) {log.info("getTest分块传输接口调用");long startTime = System.currentTimeMillis();OutputStream outputStream = null;JsonGenerator generator = null;// 调用 GetTime 类的 getFSKTime 方法GetTime getTime = new GetTime();HashMap<Object, Object> timeData = getTime.getFSKTime();try {// 获取查询参数String formattedDate = (String) timeData.get("formattedDate");String formattedEnd = (String) timeData.get("formattedEnd");log.info("查询时间范围:{} ~ {}", formattedDate, formattedEnd);// 获取所有数据DataSourceUtil.setDB("db2");List<HbCcsPolicyDataRc> dataList = synchronizationService.getSynchronization(formattedDate, formattedEnd);if (dataList == null) {dataList = Collections.emptyList();}log.info("查询完成:共{}条数据,准备分块传输", dataList.size());// 设置响应头response.setContentType("application/json;charset=UTF-8");response.setHeader("Connection", "keep-alive");response.setHeader("Transfer-Encoding", "chunked");response.setHeader("X-Total-Count", String.valueOf(dataList.size()));response.setHeader("Cache-Control", "no-cache");response.setHeader("Pragma", "no-cache");// 使用Jackson流式API分块写入响应ObjectMapper objectMapper = new ObjectMapper();outputStream = response.getOutputStream();generator = objectMapper.getFactory().createGenerator(outputStream);generator.writeStartArray(); // 开始数组int chunkSize = 500; // 减小每块的大小,避免缓冲区溢出int totalSize = dataList.size();for (int i = 0; i < totalSize; i++) {// 检查客户端是否仍然连接try {response.getOutputStream(); // 这将抛出异常如果客户端断开} catch (IOException e) {log.warn("客户端已断开连接,终止传输");break;}// 写入单个对象objectMapper.writeValue(generator, dataList.get(i));// 每chunkSize条数据刷新一次缓冲区if ((i + 1) % chunkSize == 0) {generator.flush();log.info("已传输{}条数据,进度: {}%", i + 1, (i + 1) * 100 / totalSize);// 添加小延迟,避免 overwhelming 客户端try {Thread.sleep(10);} catch (InterruptedException ie) {Thread.currentThread().interrupt();break;}}}generator.writeEndArray(); // 
结束数组generator.flush();log.info("分块传输完成:共{}条数据,耗时{}ms",dataList.size(), System.currentTimeMillis() - startTime);} catch (ClientAbortException e) {log.warn("客户端中止了连接: {}", e.getMessage());} catch (Exception e) {log.error("getTest分块传输接口执行失败", e);if (!response.isCommitted()) {try {response.sendError(HttpStatus.INTERNAL_SERVER_ERROR.value(), "数据获取失败");} catch (IOException ex) {log.error("发送错误响应失败", ex);}}} finally {// 确保资源被正确关闭try {if (generator != null) {generator.close();}} catch (IOException e) {log.warn("关闭JsonGenerator时发生错误", e);}try {if (outputStream != null) {outputStream.close();}} catch (IOException e) {log.warn("关闭OutputStream时发生错误", e);}// 帮助GC回收大对象data1 = null;System.gc();} }
2.开放接口调用
public void fetchDataFromServer1() {log.info("===== 开始执行 fetchDataFromServer1(分块传输模式)=====");RestTemplate chunkedRestTemplate = createChunkedRestTemplate();String url = server1BaseUrl + "/getReceive/getsss";log.info("调用 getTestChunked 分块传输接口,URL: {}", url);try {// 使用自定义的ResponseExtractor来处理流式响应List<HbCcsPolicyDataRc> dataList = chunkedRestTemplate.execute(url,HttpMethod.GET,null,new ResponseExtractor<List<HbCcsPolicyDataRc>>() {@Overridepublic List<HbCcsPolicyDataRc> extractData(ClientHttpResponse response) throws IOException {return processStreamingResponse(response, HbCcsPolicyDataRc.class);}});log.info("===== fetchDataFromServer1 完成,共处理{}条数据 =====", dataList != null ? dataList.size() : 0);getCCSCroData();} catch (Exception e) {log.error("fetchDataFromServer1 整体失败", e);throw new RuntimeException("fetchDataFromServer1 失败:" + e.getMessage(), e);} finally {// 帮助GC回收资源System.gc();} }// 修改流式响应处理方法 private <T> List<T> processStreamingResponse(ClientHttpResponse response, Class<T> valueType) throws IOException {List<T> dataList = new ArrayList<>();ObjectMapper objectMapper = getConfiguredObjectMapper();InputStream inputStream = response.getBody();try (JsonParser parser = objectMapper.getFactory().createParser(inputStream)) {if (parser.nextToken() != JsonToken.START_ARRAY) {throw new IOException("Expected data to start with an Array");}int count = 0;long lastLogTime = System.currentTimeMillis();while (parser.nextToken() != JsonToken.END_ARRAY) {T record = objectMapper.readValue(parser, valueType);dataList.add(record);count++;// 每1000条或每30秒日志输出一次long currentTime = System.currentTimeMillis();if (count % 1000 == 0 || currentTime - lastLogTime > 30000) {log.info("已解析{}条数据", count);lastLogTime = currentTime;}// 定期批量处理,避免内存占用过高if (count % 5000 == 0) {processBatchData(dataList, valueType);dataList.clear(); // 清空列表,避免内存占用过高System.gc(); // 建议垃圾回收}}// 处理最后一批数据if (!dataList.isEmpty()) {processBatchData(dataList, valueType);}}log.info("共解析{}条数据", dataList.size());return dataList; 
}// 批量处理数据的方法 private <T> void processBatchData(List<T> dataList, Class<T> valueType) {try {DataSourceUtil.setDB("db3");if (valueType == HbCcsPolicyDataRc.class) {dmService.setHbCcsPolicyDataRc((List<HbCcsPolicyDataRc>) dataList);} else if (valueType == HbCcsCrosssellPEMIUM.class) {dmService.setHbCcsCrosssellPEMIUM((List<HbCcsCrosssellPEMIUM>) dataList);} else if (valueType == HbCcsLpDataRc.class) {dmService.HbCcsLpDataRc((List<HbCcsLpDataRc>) dataList);}log.info("成功处理一批数据,数量: {}", dataList.size());} catch (Exception e) {log.error("处理批量数据时发生错误", e);// 这里可以添加重试逻辑或错误记录} }// 6. 工具方法:复用 JSON 解析器配置(避免重复设置容错属性) private ObjectMapper getConfiguredObjectMapper() {ObjectMapper objectMapper = new ObjectMapper();// 容错配置:忽略未知字段、允许单值数组objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);objectMapper.configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);return objectMapper; }// 创建专用的分块传输RestTemplate private RestTemplate createChunkedRestTemplate() {SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();// 设置超时时间(单位:毫秒)factory.setConnectTimeout(120000); // 2分钟连接超时factory.setReadTimeout(3600000); // 1小时读取超时(大数据传输需要更长时间)// 禁用缓冲factory.setBufferRequestBody(false);RestTemplate restTemplate = new RestTemplate(factory);// 配置消息转换器List<HttpMessageConverter<?>> converters = new ArrayList<>();// 字符串转换器StringHttpMessageConverter stringConverter = new StringHttpMessageConverter(StandardCharsets.UTF_8);stringConverter.setWriteAcceptCharset(false);converters.add(stringConverter);// JSON转换器MappingJackson2HttpMessageConverter jsonConverter = new MappingJackson2HttpMessageConverter();jsonConverter.setSupportedMediaTypes(Collections.singletonList(MediaType.APPLICATION_JSON));converters.add(jsonConverter);restTemplate.setMessageConverters(converters);return restTemplate; }
3.核心配置
这个配置两个项目的yml都需要加
server1:
  base-url: http://127.0.0.1:8060 # 根据情况而变
  auth:
    username: admin
    password: admin
server:
  max-http-header-size: 1000000
  servlet:
    multipart:
      max-file-size: 1000MB
      max-request-size: 1000MB
  tomcat:
    max-swallow-size: 1000MB
    max-http-form-post-size: 1000MB
    threads:
      max: 200
      min-spare: 20
    keep-alive-timeout: 300000
----------------------------------------------------------------------
将 @Value("${server1.base-url}") private String server1BaseUrl; 这段代码加到 Controller 类中,即可把 yml 配置文件中的地址注入到 Controller 里使用。
这种方式适合小数据量传输;大数据量传输还需要调整服务器的 Nginx 配置,并且接口尽量统一加上前缀,否则可能会报错,具体配置这里不再展开。