【面试题】- 使用CompletableFuture实现多线程统计策略工厂模式
【面试题】- 使用CompletableFuture实现多线程统计策略工厂模式
摘要:
本文介绍了一种结合CompletableFuture多线程和策略工厂模式实现的统计查询方案。通过自定义线程池执行异步任务,利用工厂模式创建不同时间维度(时/周/月)的统计策略,并行查询后汇总结果。方案包含:1)接口层接收查询时间参数;2)业务层使用CompletableFuture异步执行三种统计策略;3)策略工厂根据类型返回具体实现类;4)各策略类实现统一接口完成具体统计逻辑。该设计实现了线程安全的高效查询,同时通过策略模式保证代码可扩展性,适用于需要多维度并行统计的业务场景。
前言:
常见面试题:说下你项目哪里用到的多线程?用到设计模式吗?哪几种设计模式?使用设计模式的优点有哪些?..
业务场景:
1,三方对接 统计天、周、月的自动和手动的个数
2,数据汇总统一返回
废话少说直接上代码。
1、查询指定时间的工单信息
@ApiOperation("根据时间查询指定时间工单的统计信息")@GetMapping("/queryStatisticAnalysisByTime")public Map<String,List<OrderStatisticsVo>> queryStatisticAnalysisByTime(@RequestParam(value = "time", required = false) @DateTimeFormat(pattern = "yyyy-MM-dd HH:mm:ss") Date time) {return adjustableService.queryStatisticAnalysisByTime(time);}
功能实现:
@Resourceprivate ThreadPoolExecutor threadCustomPoolExecutor;@AutowiredDigitalStatisticsStrategyFactory strategyFactory;@Overridepublic Map<String, List<OrderStatisticsVo>> queryStatisticAnalysisByTime(Date queryTime) {Map<String, List<OrderStatisticsVo>> resultMap = new HashMap<>(3);Date queryDateTime = Optional.ofNullable(queryTime).orElse(new Date());CompletableFuture<List<OrderStatisticsVo>> hourlyFuture = CompletableFuture.supplyAsync(() -> {DigitalStatisticsStrategy hourlyStrategy = strategyFactory.createDigitalStatisticsStrategy("HOURLY");return hourlyStrategy.executeStatistics(queryDateTime);}, threadCustomPoolExecutor);CompletableFuture<List<OrderStatisticsVo>> weeklyFuture = CompletableFuture.supplyAsync(() -> {DigitalStatisticsStrategy weeklyStrategy = strategyFactory.createDigitalStatisticsStrategy("WEEKLY");return weeklyStrategy.executeStatistics(queryDateTime);}, threadCustomPoolExecutor);CompletableFuture<List<OrderStatisticsVo>> monthlyFuture = CompletableFuture.supplyAsync(() -> {DigitalStatisticsStrategy monthlyStrategy = strategyFactory.createDigitalStatisticsStrategy("MONTHLY");return monthlyStrategy.executeStatistics(queryDateTime);}, threadCustomPoolExecutor);// 等待所有的任务完成放行CompletableFuture.allOf(hourlyFuture, weeklyFuture, monthlyFuture).join();try {// 收集结果resultMap.put("dayRecord", hourlyFuture.get());resultMap.put("weekRecord", weeklyFuture.get());resultMap.put("monthRecord", monthlyFuture.get());} catch (InterruptedException | ExecutionException e) {Thread.currentThread().interrupt();throw new RuntimeException("获取xxxx-统计分析失败", e);}return resultMap;}
2、策略工厂
import com.xx.xxx.xxx.enums.ResultCodeEnum;
import com.xx.xxx.xxx.exception.CloudException;
import com.xx.xxx.xxx.service.factory.DigitalStatisticsStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;/*** @author psd xxx-统计分析策略工厂*/
@Component
public class DigitalStatisticsStrategyFactory {@Autowiredprivate HourlyDigitalStatisticsStrategy hourlyStrategy;@Autowiredprivate WeeklyDigitalStatisticsStrategy weeklyStrategy;@Autowiredprivate MonthlyDigitalStatisticsStrategy monthlyStrategy;/*** 自动化流程-统计分析策略工厂** @param strategyType* 统计策略类型* @return 策略对象*/public DigitalStatisticsStrategy createDigitalStatisticsStrategy(String strategyType) {switch (strategyType) {case "HOURLY":return hourlyStrategy;case "WEEKLY":return weeklyStrategy;case "MONTHLY":return monthlyStrategy;default:throw new XXXException(ResultCodeEnum.PARAM_ERROR.getCode(), "未知的统计策略类型:" + strategyType);}}
3、 查询策略模式
import xxx.xxx.xxx.xxx.entity.vo.OrderStatisticsVo;import java.util.Date;
import java.util.List;/*** @author psd 自动化流程-统计分析*/
public interface DigitalStatisticsStrategy {/*** 根据查询时间查询指定时间工单的统计信息* * @param queryTime* 查询时间* @return 统计信息集合*/List<OrderStatisticsVo> executeStatistics(Date queryTime);}
1.按小时统计
import com.xxx.xxx.common.utils.DateUtils;
import com.xx.xxx.xxx.entity.vo.OrderStatisticsItemVo;
import com.xx.xx.xx.mapper.DigitalEmployeeMapper;
import com.xx.xx.xx.service.factory.DigitalStatisticsStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;/*** @author psd 按小时统计*/
@Service
public class HourlyDigitalStatisticsStrategy implements DigitalStatisticsStrategy {@AutowiredDigitalEmployeeMapper digitalEmployeeMapper;@Overridepublic List<OrderStatisticsVo> executeStatistics(Date queryTime) {// 1.根据查询时间 获取当天的开始时间和结束时间Date startDate = ...;Date endDate = ...;List<OrderStatisticsVo> hourlyResult = digitalEmployeeMapper.queryHourxxxDigitalByDateRange(startDate, endDate);// 2.补全不存在的数据return completeHourlyData(hourlyResult);}/*** 补全所有小时段数据,确保每个小时都有记录* * @param dbHourlyData* 数据库小时段数据* @return 补全后的数据*/private List<OrderStatisticsVo> completeHourlyData(List<OrderStatisticsVo> dbHourlyData) {// 创建24小时的完整时间列表List<String> allHours = IntStream.range(0, 24).mapToObj(hour -> String.format("%02d:00", hour)).collect(Collectors.toList());// 将数据查询的数据转换成MapMap<String, OrderStatisticsVo> dataMap = dbHourlyData.stream().collect(Collectors.toMap(OrderStatisticsVo::getTime, Function.identity()));return allHours.stream().map(hour -> {if (dataMap.containsKey(hour)) {return dataMap.get(hour);} else {return new OrderStatisticsItemVo(hour, 0, 0);}}).collect(Collectors.toList());}
}
mapper:
<select id="queryHourxxxDigitalByDateRange"resultType="xxx.xxx.xxx.xxx.entity.vo.OrderStatisticsVo">SELECT DATE_FORMAT(create_time, '%H:00') as `time`,SUM(CASE WHEN send_type = 1 THEN 1 ELSE 0 END) as auto,SUM(CASE WHEN send_type = 2 THEN 1 ELSE 0 END) as interactionFROM digital_employeeWHERE create_time >= #{startDate}AND create_time <= #{endDate}AND is_delete = 0GROUP BY DATE_FORMAT(create_time, '%H:00')ORDER BY `time`</select>
2.周统计策略模式
import com.xxx.xxx.xxx.entity.vo.OrderStatisticsVo;
import com.xxx.xxx.xxx.mapper.DigitalEmployeeMapper;
import com.xxx.xx.xxx.service.factory.DigitalStatisticsStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;/*** @author psd 按照周进行统计*/
@Service
public class WeeklyDigitalStatisticsStrategy implements DigitalStatisticsStrategy {@AutowiredDigitalEmployeeMapper digitalEmployeeMapper;@Overridepublic List<OrderStatisticsVo> executeStatistics(Date queryTime) {// 1.根据查询时间 获取当周的开始时间和结束时间Date weekEndDate = ...;Date weekStartDate = ...;List<OrderStatisticsVo> hourlyResult = digitalEmployeeMapper.queryxxxxDigitalByDateRange(weekStartDate, weekEndDate);// 2.补全不存在的数据return completeDailyData(hourlyResult, weekStartDate, weekEndDate);}private List<OrderStatisticsVo> completeDailyData(List<OrderStatisticsVo> dbHourlyResult, Date weekStartDate, Date weekEndDate) {// 1.创建日期范围的列表List<String> allDates = new ArrayList<>();Calendar calendar = Calendar.getInstance();calendar.setTime(weekStartDate);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");while (!calendar.getTime().after(weekEndDate)) {allDates.add(sdf.format(calendar.getTime()));calendar.add(Calendar.DAY_OF_YEAR, 1);}// 将数据库数据转换为MapMap<String, OrderStatisticsVo> dataHourMap = dbHourlyResult.stream().collect(Collectors.toMap(OrderStatisticsVo::getTime, Function.identity()));return allDates.stream().map(date -> {if (dataHourMap.containsKey(date)) {return dataHourMap.get(date);} else {return new OrderStatisticsVo(date, 0, 0);}}).collect(Collectors.toList());}
}
3.月统计策略模式
import com.xxx.xxx.utils.DateUtils;
import com.xxx.xxx.xxx.entity.vo.OrderStatisticsItemVo;
import com.xx.xxx.xxx.mapper.DigitalEmployeeMapper;
import com.xxx.xxx.xxx.service.factory.DigitalStatisticsStrategy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;import java.text.SimpleDateFormat;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;/*** @author psd 按月统计策略*/
@Service
public class MonthlyDigitalStatisticsStrategy implements DigitalStatisticsStrategy {@AutowiredDigitalEmployeeMapper digitalEmployeeMapper;@Overridepublic List<OrderStatisticsVo> executeStatistics(Date queryTime) {// 1.根据查询时间 获取当月的开始时间和结束时间Date monthEndDate = ....;Date monthStartDate = ...;monthStartDate = DateUtils.addDateDays(monthStartDate, 1);List<OrderStatisticsVo> hourlyResult = digitalEmployeeMapper.queryxxxxDigitalByDateRange(monthStartDate, monthEndDate);// 2.补全不存在的数据return completeDailyData(hourlyResult, monthStartDate, monthEndDate);}private List<OrderStatisticsVo> completeDailyData(List<OrderStatisticsVo> dbHourlyResult, Date weekStartDate, Date weekEndDate) {// 1.创建日期范围的列表List<String> allDates = new ArrayList<>();Calendar calendar = Calendar.getInstance();calendar.setTime(weekStartDate);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");while (!calendar.getTime().after(weekEndDate)) {allDates.add(sdf.format(calendar.getTime()));calendar.add(Calendar.DAY_OF_YEAR, 1);}// 将数据库数据转换为MapMap<String, OrderStatisticsItemVo> dataHourMap = dbHourlyResult.stream().collect(Collectors.toMap(OrderStatisticsItemVo::getTime, Function.identity()));return allDates.stream().map(date -> {if (dataHourMap.containsKey(date)) {return dataHourMap.get(date);} else {return new OrderStatisticsVo(date, 0, 0);}}).collect(Collectors.toList());}
}
mapper
<select id="queryxxxxDigitalByDateRange" resultType="com.dx.major.platform.entity.vo.OrderStatisticsItemVo">SELECTDATE(create_time) as `time`,SUM(CASE WHEN send_type = 1 THEN 1 ELSE 0 END) as auto,SUM(CASE WHEN send_type = 2 THEN 1 ELSE 0 END) as interactionFROM cm_cloud_fault_digital_employeeWHERE create_time >= #{startDate}AND create_time <= #{endDate}AND is_delete = 0GROUP BY DATE(create_time)ORDER BY `time`</select>
数据返回类型
{"dayRecord": [{"time": "00:00","auto": 12,"interaction": 5},{"time": "01:00","auto": 10,"interaction": 4}],"weekRecord": [{"time": "2023-07-14","auto": 120,"interaction": 50},{"time": "2023-07-15","auto": 100,"interaction": 40}],"monthRecord": [{"time": "2023-06-20","auto": 1200,"interaction": 500},{"time": "2023-06-21","auto": 1000,"interaction": 400}]
}
设计的优点:
-
CompletableFuture的使用
使用CompletableFuture.supplyAsync()方法创建三个异步任务,每个任务执行一种统计策略。通过指定自定义的线程池(executorService),我们可以控制并发线程的数量。 -
等待所有任务完成
使用CompletableFuture.allOf(hourlyFuture, weeklyFuture, monthlyFuture).join()等待所有任务完成。allOf()方法会返回一个新的CompletableFuture,它在所有给定的CompletableFuture都完成时完成。join()方法会阻塞当前线程,直到所有任务完成。 -
获取结果
使用hourlyFuture.get()、weeklyFuture.get()和monthlyFuture.get()方法获取每个任务的执行结果。这些方法会阻塞当前线程,直到对应的任务完成并返回结果。 -
异常处理
使用try-catch块捕获可能抛出的InterruptedException和ExecutionException异常,确保程序的健壮性。
3、总结设计模式优势
策略模式:将不同的统计算法封装在各自的策略类中,使它们可以互相替换,提高了代码的灵活性和可维护性。
工厂模式:通过工厂类统一创建策略对象,降低了客户端与具体策略类的耦合度。
模板方法模式:在策略接口中定义了统一的执行方法,确保了所有策略类都有相同的接口。
多线程执行:使用CompletableFuture实现了真正的异步并行处理,大大提高了系统响应速度。
喜欢我的文章记得点个在看,或者点赞,持续更新中ing…