当前位置: 首页 > news >正文

【Easylive】定时任务-每日数据统计和临时文件清理

【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版

这个定时任务系统主要包含两个核心功能:每日数据统计和临时文件清理。下面我将详细解析这两个定时任务的实现逻辑和技术要点:


@Component
@Slf4j
public class SysTask {

    @Resource
    private StatisticsInfoService statisticsInfoService;

    @Resource
    private AppConfig appConfig;

    @Scheduled(cron = "0 0 0 * * ?")
    public void statisticsData() {
        statisticsInfoService.statisticsData();
    }

    @Scheduled(cron = "0 */1 * * * ?")
    public void delTempFile() {
        String tempFolderName = appConfig.getProjectFolder() + Constants.FILE_FOLDER + Constants.FILE_FOLDER_TEMP;
        File folder = new File(tempFolderName);
        File[] listFile = folder.listFiles();
        if (listFile == null) {
            return;
        }
        String twodaysAgo = DateUtil.format(DateUtil.getDayAgo(2), DateTimePatternEnum.YYYYMMDD.getPattern()).toLowerCase();
        Integer dayInt = Integer.parseInt(twodaysAgo);
        for (File file : listFile) {
            Integer fileDate = Integer.parseInt(file.getName());
            if (fileDate <= dayInt) {
                try {
                    FileUtils.deleteDirectory(file);
                } catch (IOException e) {
                    log.info("删除临时文件失败", e);
                }
            }
        }
    }
}

一、statisticsData() 每日数据统计任务

执行时机:每天凌晨0点(0 0 0 * * ?

1. 数据统计流程
开始
获取统计日期
统计播放量
统计粉丝量
统计评论量
统计互动数据
批量入库
结束
2. 核心代码解析
public void statisticsData() {
    // 1. 准备数据结构
    List<StatisticsInfo> statisticsInfoList = new ArrayList<>();
    final String statisticsDate = DateUtil.getBeforeDayDate(1); // 获取昨天的日期
    
    // 2. 播放量统计(Redis → DB)
    Map<String, Integer> videoPlayCountMap = redisComponent.getVideoPlayCount(statisticsDate);
    // 处理视频ID格式
    List<String> playVideoKeys = videoPlayCountMap.keySet().stream()
        .map(item -> item.substring(item.lastIndexOf(":") + 1))
        .collect(Collectors.toList());
    
    // 3. 关联用户信息
    VideoInfoQuery query = new VideoInfoQuery();
    query.setVideoIdArray(playVideoKeys.toArray(new String[0]));
    List<VideoInfo> videoInfoList = videoInfoMapper.selectList(query);
    
    // 4. 按用户聚合播放量
    Map<String, Integer> videoCountMap = videoInfoList.stream()
        .collect(Collectors.groupingBy(
            VideoInfo::getUserId,
            Collectors.summingInt(item -> 
                videoPlayCountMap.getOrDefault(
                    Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId(), 
                    0
                )
            )
        ));
    
    // 5. 构建播放统计对象
    videoCountMap.forEach((userId, count) -> {
        StatisticsInfo info = new StatisticsInfo();
        info.setStatisticsDate(statisticsDate);
        info.setUserId(userId);
        info.setDataType(StatisticsTypeEnum.PLAY.getType());
        info.setStatisticsCount(count);
        statisticsInfoList.add(info);
    });
    
    // 6. 补充其他维度数据
    addFansData(statisticsDate, statisticsInfoList);    // 粉丝统计
    addCommentData(statisticsDate, statisticsInfoList); // 评论统计
    addInteractionData(statisticsDate, statisticsInfoList); // 点赞/收藏/投币
    
    // 7. 批量入库
    statisticsInfoMapper.insertOrUpdateBatch(statisticsInfoList);
}
3. 关键技术点
  1. Redis+DB混合统计
    • 播放量等高频数据先记录到Redis
    • 定时任务从Redis获取昨日数据后持久化到DB

  2. 多维度统计
    • 播放量:基于视频ID聚合后关联用户
    • 粉丝量:直接查询DB关系数据
    • 互动数据:统一处理点赞/收藏/投币等行为

  3. 批量操作优化
    • 使用insertOrUpdateBatch实现批量upsert
    • 减少数据库连接次数提升性能


二、delTempFile() 临时文件清理任务

执行时机:每分钟执行一次(0 */1 * * * ?

1. 文件清理逻辑
public void delTempFile() {
    // 1. 构建临时文件夹路径
    String tempFolder = appConfig.getProjectFolder() 
        + Constants.FILE_FOLDER 
        + Constants.FILE_FOLDER_TEMP;
    
    // 2. 获取两天前的日期(基准线)
    String twodaysAgo = DateUtil.format(DateUtil.getDayAgo(2), "yyyyMMdd");
    Integer thresholdDate = Integer.parseInt(twodaysAgo);
    
    // 3. 遍历临时文件夹
    File folder = new File(tempFolder);
    File[] files = folder.listFiles();
    if (files == null) return;
    
    for (File file : files) {
        try {
            // 4. 按日期命名规则清理旧文件
            Integer fileDate = Integer.parseInt(file.getName());
            if (fileDate <= thresholdDate) {
                FileUtils.deleteDirectory(file); // 递归删除
            }
        } catch (Exception e) {
            log.error("删除临时文件失败", e);
        }
    }
}
2. 设计要点
  1. 安全机制
    • 文件名强制使用日期格式(如20230815
    • 只删除命名合规的文件夹

  2. 容错处理
    • 捕获IOException防止单次失败影响后续操作
    • 空文件夹自动跳过

  3. 性能考虑
    • 高频检查(每分钟)但低负载(仅处理过期文件)
    • 使用FileUtils工具类保证删除可靠性


三、架构设计亮点

  1. 解耦设计
    • 统计服务与业务逻辑分离
    • 文件清理与业务模块隔离

  2. 数据一致性

    业务操作
    Redis
    定时统计任务
    DB

    • 实时数据写入Redis保证性能
    • 定时同步到DB保证持久化

  3. 扩展性
    • 新增统计维度只需添加对应方法
    • 文件清理策略可配置化


四、潜在优化建议

  1. 统计任务优化

    // 可考虑分片统计(大用户量场景)
    @Scheduled(cron = "0 0 1 * * ?") 
    public void statsUserShard1() {
        statisticsService.processByUserRange(0, 10000);
    }
    
  2. 文件清理增强

    // 添加文件大小监控
    if (file.length() > MAX_TEMP_FILE_SIZE) {
        alertService.notifyOversizeFile(file);
    }
    
  3. 异常处理强化

    @Scheduled(...)
    public void safeStatistics() {
        try {
            statisticsData();
        } catch (Exception e) {
            log.error("统计任务失败", e);
            retryLater(); // 延迟重试机制
        }
    }
    

这套定时任务系统通过合理的职责划分和稳健的实现方式,有效解决了数据统计和资源清理这两类经典的后台任务需求。

@Override
public void statisticsData() {
    // 创建统计结果容器
    List<StatisticsInfo> statisticsInfoList = new ArrayList<>();
    
    // 获取统计日期(昨天)
    final String statisticsDate = DateUtil.getBeforeDayDate(1);
    
    // ========== 播放量统计 ==========
    // 从Redis获取昨日所有视频的播放量数据
    // 格式:Map<"video_play_count:20230815:video123", 播放次数>
    Map<String, Integer> videoPlayCountMap = redisComponent.getVideoPlayCount(statisticsDate);
    
    // 提取视频ID列表(去掉前缀)
    List<String> playVideoKeys = new ArrayList<>(videoPlayCountMap.keySet());
    playVideoKeys = playVideoKeys.stream()
        .map(item -> item.substring(item.lastIndexOf(":") + 1)) // 截取最后一个:后的视频ID
        .collect(Collectors.toList());
    
    // 构建视频查询条件
    VideoInfoQuery videoInfoQuery = new VideoInfoQuery();
    videoInfoQuery.setVideoIdArray(playVideoKeys.toArray(new String[playVideoKeys.size()]));
    
    // 批量查询视频基本信息
    List<VideoInfo> videoInfoList = videoInfoMapper.selectList(videoInfoQuery);
    
    // 按用户ID分组统计总播放量
    Map<String, Integer> videoCountMap = videoInfoList.stream()
        .collect(Collectors.groupingBy(
            VideoInfo::getUserId, // 按用户ID分组
            Collectors.summingInt(item -> {
                // 重组Redis key获取该视频播放量
                String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();
                Integer count = videoPlayCountMap.get(redisKey);
                return count == null ? 0 : count; // 空值保护
            })
        ));
    
    // 转换播放统计结果对象
    videoCountMap.forEach((userId, count) -> {
        StatisticsInfo statisticsInfo = new StatisticsInfo();
        statisticsInfo.setStatisticsDate(statisticsDate); // 统计日期
        statisticsInfo.setUserId(userId);                // 用户ID
        statisticsInfo.setDataType(StatisticsTypeEnum.PLAY.getType()); // 数据类型=播放
        statisticsInfo.setStatisticsCount(count);       // 播放次数
        statisticsInfoList.add(statisticsInfo);          // 加入结果集
    });
    
    // ========== 粉丝量统计 ==========
    // 从数据库查询昨日粉丝变化数据
    List<StatisticsInfo> fansDataList = this.statisticsInfoMapper.selectStatisticsFans(statisticsDate);
    
    // 设置统计维度和日期
    for (StatisticsInfo statisticsInfo : fansDataList) {
        statisticsInfo.setStatisticsDate(statisticsDate);    // 统一日期格式
        statisticsInfo.setDataType(StatisticsTypeEnum.FANS.getType()); // 数据类型=粉丝
    }
    statisticsInfoList.addAll(fansDataList); // 合并结果
    
    // ========== 评论统计 ==========
    // 从数据库查询昨日评论数据
    List<StatisticsInfo> commentDataList = this.statisticsInfoMapper.selectStatisticsComment(statisticsDate);
    
    // 设置统计维度和日期
    for (StatisticsInfo statisticsInfo : commentDataList) {
        statisticsInfo.setStatisticsDate(statisticsDate);     // 统一日期格式
        statisticsInfo.setDataType(StatisticsTypeEnum.COMMENT.getType()); // 数据类型=评论
    }
    statisticsInfoList.addAll(commentDataList); // 合并结果
    
    // ========== 互动行为统计 ==========
    // 查询点赞/收藏/投币数据(参数为行为类型数组)
    List<StatisticsInfo> statisticsInfoOthers = this.statisticsInfoMapper.selectStatisticsInfo(
        statisticsDate,
        new Integer[]{
            UserActionTypeEnum.VIDEO_LIKE.getType(),     // 点赞
            UserActionTypeEnum.VIDEO_COIN.getType(),     // 投币
            UserActionTypeEnum.VIDEO_COLLECT.getType()  // 收藏
        }
    );
    
    // 转换行为类型为统计类型
    for (StatisticsInfo statisticsInfo : statisticsInfoOthers) {
        statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式
        
        // 行为类型转换
        if (UserActionTypeEnum.VIDEO_LIKE.getType().equals(statisticsInfo.getDataType())) {
            statisticsInfo.setDataType(StatisticsTypeEnum.LIKE.getType());       // 点赞
        } else if (UserActionTypeEnum.VIDEO_COLLECT.getType().equals(statisticsInfo.getDataType())) {
            statisticsInfo.setDataType(StatisticsTypeEnum.COLLECTION.getType());  // 收藏
        } else if (UserActionTypeEnum.VIDEO_COIN.getType().equals(statisticsInfo.getDataType())) {
            statisticsInfo.setDataType(StatisticsTypeEnum.COIN.getType());       // 投币
        }
    }
    statisticsInfoList.addAll(statisticsInfoOthers); // 合并结果
    
    // ========== 最终入库 ==========
    // 批量插入或更新统计结果
    this.statisticsInfoMapper.insertOrUpdateBatch(statisticsInfoList);
}

这段代码使用Java 8的Stream API和Collectors工具类,实现了按用户ID分组统计视频总播放量的功能。下面我将从多个维度进行详细解析:


一、代码结构分解

Map<String, Integer> videoCountMap = videoInfoList.stream()
    .collect(Collectors.groupingBy(
        VideoInfo::getUserId, 
        Collectors.summingInt(item -> {
            String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();
            Integer count = videoPlayCountMap.get(redisKey);
            return count == null ? 0 : count;
        })
    ));

二、逐层解析

1. 数据准备阶段

输入videoInfoList(视频信息列表,包含videoIduserId等字段)
目标:生成Map<用户ID, 该用户所有视频的总播放量>

2. Stream流水线
videoInfoList.stream() 

将List转换为Stream,准备进行流式处理。

3. 核心收集器
Collectors.groupingBy(
    VideoInfo::getUserId,          // 分组依据:用户ID
    Collectors.summingInt(...)     // 聚合方式:求和
)

groupingBy:按用户ID分组,生成Map<String, List<VideoInfo>>
summingInt:对每组内的元素进行整数求和

4. 播放量计算逻辑
item -> {
    // 重组Redis key:video_play_count:20230815:video123
    String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT 
        + statisticsDate + ":" + item.getVideoId();
    
    // 从预加载的Map获取播放量
    Integer count = videoPlayCountMap.get(redisKey);
    
    // 空值保护(没有记录则视为0次播放)
    return count == null ? 0 : count;
}

三、关键技术点

1. 嵌套收集器

外层groupingBy按用户分组
内层summingInt对播放量求和
形成两级聚合操作。

2. 实时Key重组
Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId()

动态拼接Redis key,与之前从Redis加载数据时的key格式严格一致
• 常量前缀:video_play_count:
• 日期分区:20230815
• 视频ID::video123

3. 空值防御
count == null ? 0 : count

处理可能存在的:
• Redis中无该视频记录
• 视频刚上传尚未有播放数据


四、内存数据流演示

假设原始数据:

videoInfoList = [
    {videoId: "v1", userId: "u1"}, 
    {videoId: "v2", userId: "u1"},
    {videoId: "v3", userId: "u2"}
]

videoPlayCountMap = {
    "video_play_count:20230815:v1": 100,
    "video_play_count:20230815:v2": 50,
    "video_play_count:20230815:v3": 200
}

执行过程:

  1. 流处理开始
  2. 遇到v1(用户u1)→ 计算播放量100 → u1分组累计100
  3. 遇到v2(用户u1)→ 计算播放量50 → u1分组累计150
  4. 遇到v3(用户u2)→ 计算播放量200 → u2分组累计200

最终结果:

{u1=150, u2=200}

五、设计优势

  1. 高效聚合
    单次遍历完成分组+求和,时间复杂度O(n)

  2. 内存友好
    流式处理避免中间集合的创建

  3. 可维护性
    清晰表达业务逻辑:
    “按用户分组,求和每个用户所有视频的播放量”

  4. 扩展性
    如需修改统计逻辑(如求平均值),只需替换summingInt为其他收集器


六、潜在优化方向

1. 并行处理(大数据量时)
videoInfoList.parallelStream()...

注意线程安全和顺序保证

2. 缓存Key构建
// 预先生成videoId到播放量的映射,避免重复拼接字符串
Map<String, Integer> videoIdToCount = ...;
3. 异常处理增强
try {
    Integer count = videoPlayCountMap.get(redisKey);
    return Optional.ofNullable(count).orElse(0);
} catch (Exception e) {
    log.warn("播放量统计异常 videoId:{}", item.getVideoId());
    return 0;
}

这段代码展示了如何优雅地结合Stream API和集合操作,实现复杂的数据聚合统计,是Java函数式编程的典型实践。

相关文章:

  • JavaWeb 课堂笔记 —— 04 Ajax
  • 我提了一个 Androidx IssueTracker
  • [QMT量化交易小白入门]-四十二、五年年化收益率26%,当日未成交的下单,取消后重新委托
  • PHP开发效率提升利器:通义灵码在VSCode中的应用与技巧
  • Model Context Protocol(MCP)介绍
  • CPP杂项
  • 下载firefox.tar.xz后如何将其加入到Gnome启动器
  • 《Spring Boot+策略模式:企业级度假订单Excel导入系统的架构演进与技术实现》
  • vue3的一些新特性
  • vcs中的looprepprt
  • kafka存储原理
  • 定积分__
  • C应用常见的编程错误
  • java入门
  • LeetCode:有效的括号
  • mysql镜像创建docker容器,及其可能遇到的问题
  • 远程监控系统项目里练习
  • 分享一个可以跨平台进行等保核查的脚本
  • 记录一次家里宽带 被修改带宽维权的事情
  • Design Compiler:语法检查工具dcprocheck
  • 著名蒙古族音乐学者马•斯尔古愣逝世,享年86岁
  • 心相印回应官方旗舰店客服辱骂消费者:正排查
  • 近4小时会谈、3项联合声明、20多份双边合作文本,中俄元首今年首次面对面会晤成果颇丰
  • 国家税务总局泰安市税务局:山东泰山啤酒公司欠税超536万元
  • 王耀庆化身“罗朱”说书人,一人挑战15个角色
  • 暴雨蓝色预警:南方开启较强降雨过程