springboot 使用CompletableFuture多线程调用多个url接口,等待所有接口返回后统一处理接口返回结果
1. 线程池配置,定义一个线程池,可以根据业务需求调整线程池的配置,以达到提高性能的目的
@Configuration
@EnableAsync
public class AsyncConfig {@Bean("apiTaskExecutor")public Executor taskExecutor() {ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();executor.setCorePoolSize(8); // 核心线程数executor.setMaxPoolSize(10); // 最大线程数executor.setQueueCapacity(50); // 队列容量executor.setThreadNamePrefix("API-Thread-");executor.initialize();return executor;}
}
2. 异步调用第三方接口
接口调用返回类
@Data
@AllArgsConstructor
public class ApiResponse {private String apiName; // API 标识private Object responseData; // 原始响应数据(不同格式)private String errorMessage; // 错误信息public ApiResponse(String apiName, Object responseData) {this.apiName = apiName;this.responseData = responseData;}
}
@Async("apiTaskExecutor")public CompletableFuture<ApiResponse> callApi(String apiName, String url, String param) {try {String result = HttpUtil.post(url, param);return CompletableFuture.completedFuture(new ApiResponse(apiName,result));} catch (Exception e) {return CompletableFuture.completedFuture(new ApiResponse(apiName, null, "API call failed: " + e.getMessage()));}}
3. 异步方法调用接口,apiConfig 里面保存了是接口名称,接口地址,接口参数,我删除了一部分代码,这个集合需要大家自己去组合,allFutures.thenApply处理接口返回,我这种逻辑可以根据不同的接口名称去处理不同的返回接口,然后统一处理业务逻辑
@Async("apiTaskExecutor")public CompletableFuture<Map<String, Object>> aggregateApiResponses() {// 定义要调用的API列表Map<String, Map<String, String>> apiConfig = Map.of("riskAnalysis", Map.of("url", String.format("%s/riskAnalysis", aiHost), "param", JSON.toJSONString(risksJson, JSONWriter.Feature.WriteMapNullValue)),"bmi", Map.of("url", String.format("%s/BMI", aiHost), "param", bmiParam.toString()),"coreObjectiveAnalysis", Map.of("url", String.format("%s/coreObjectiveAnalysis", aiHost), "param", suggestionsJson.toString()),"suggestionsForCoreObjectives", Map.of("url", String.format("%s/suggestionsForCoreObjectives", aiHost), "param", suggestionsJson.toString()),"foodSuggestions", Map.of("url", String.format("%s/foodSuggestions", aiHost), "param", suggestionsJson.toString()),"sportSuggestions", Map.of("url", String.format("%s/sportSuggestions", aiHost), "param", suggestionsJson.toString()),"allSuggestions", Map.of("url", String.format("%s/allSuggestions", aiHost), "param", suggestionsJson.toString()));System.out.println(apiConfig);// 创建异步任务列表List<CompletableFuture<ApiResponse>> futures = apiConfig.entrySet().stream().map(entry -> {String apiName = entry.getKey();Map<String, String> params = entry.getValue();String url = params.get("url");String param = params.get("param");return surveysReportService.callApi(apiName, url, param);}).collect(Collectors.toList());// 组合所有异步任务CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));// 处理结果合并return allFutures.thenApply(voidd -> {futures.forEach(future -> {String apiName = "";try {ApiResponse response = future.get();apiName = response.getApiName();// 处理不同的响应结构switch (response.getApiName()) {case "bmi"://业务逻辑处理break;}} catch (Exception e) {//如果ai返回错误,则重试一次//throw new ServiceException("接口调用错误");}});//业务逻辑处理return null;});}