【Easylive】定时任务-每日数据统计和临时文件清理
【Easylive】项目常见问题解答(自用&持续更新中…) 汇总版
这个定时任务系统主要包含两个核心功能:每日数据统计和临时文件清理。下面我将详细解析这两个定时任务的实现逻辑和技术要点:
@Component
@Slf4j
public class SysTask {
@Resource
private StatisticsInfoService statisticsInfoService;
@Resource
private AppConfig appConfig;
@Scheduled(cron = "0 0 0 * * ?")
public void statisticsData() {
statisticsInfoService.statisticsData();
}
@Scheduled(cron = "0 */1 * * * ?")
public void delTempFile() {
String tempFolderName = appConfig.getProjectFolder() + Constants.FILE_FOLDER + Constants.FILE_FOLDER_TEMP;
File folder = new File(tempFolderName);
File[] listFile = folder.listFiles();
if (listFile == null) {
return;
}
String twodaysAgo = DateUtil.format(DateUtil.getDayAgo(2), DateTimePatternEnum.YYYYMMDD.getPattern()).toLowerCase();
Integer dayInt = Integer.parseInt(twodaysAgo);
for (File file : listFile) {
Integer fileDate = Integer.parseInt(file.getName());
if (fileDate <= dayInt) {
try {
FileUtils.deleteDirectory(file);
} catch (IOException e) {
log.info("删除临时文件失败", e);
}
}
}
}
}
一、statisticsData()
每日数据统计任务
执行时机:每天凌晨0点(0 0 0 * * ?
)
1. 数据统计流程
2. 核心代码解析
public void statisticsData() {
// 1. 准备数据结构
List<StatisticsInfo> statisticsInfoList = new ArrayList<>();
final String statisticsDate = DateUtil.getBeforeDayDate(1); // 获取昨天的日期
// 2. 播放量统计(Redis → DB)
Map<String, Integer> videoPlayCountMap = redisComponent.getVideoPlayCount(statisticsDate);
// 处理视频ID格式
List<String> playVideoKeys = videoPlayCountMap.keySet().stream()
.map(item -> item.substring(item.lastIndexOf(":") + 1))
.collect(Collectors.toList());
// 3. 关联用户信息
VideoInfoQuery query = new VideoInfoQuery();
query.setVideoIdArray(playVideoKeys.toArray(new String[0]));
List<VideoInfo> videoInfoList = videoInfoMapper.selectList(query);
// 4. 按用户聚合播放量
Map<String, Integer> videoCountMap = videoInfoList.stream()
.collect(Collectors.groupingBy(
VideoInfo::getUserId,
Collectors.summingInt(item ->
videoPlayCountMap.getOrDefault(
Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId(),
0
)
)
));
// 5. 构建播放统计对象
videoCountMap.forEach((userId, count) -> {
StatisticsInfo info = new StatisticsInfo();
info.setStatisticsDate(statisticsDate);
info.setUserId(userId);
info.setDataType(StatisticsTypeEnum.PLAY.getType());
info.setStatisticsCount(count);
statisticsInfoList.add(info);
});
// 6. 补充其他维度数据
addFansData(statisticsDate, statisticsInfoList); // 粉丝统计
addCommentData(statisticsDate, statisticsInfoList); // 评论统计
addInteractionData(statisticsDate, statisticsInfoList); // 点赞/收藏/投币
// 7. 批量入库
statisticsInfoMapper.insertOrUpdateBatch(statisticsInfoList);
}
3. 关键技术点
-
Redis+DB混合统计:
• 播放量等高频数据先记录到Redis
• 定时任务从Redis获取昨日数据后持久化到DB -
多维度统计:
• 播放量:基于视频ID聚合后关联用户
• 粉丝量:直接查询DB关系数据
• 互动数据:统一处理点赞/收藏/投币等行为 -
批量操作优化:
• 使用insertOrUpdateBatch
实现批量upsert
• 减少数据库连接次数提升性能
二、delTempFile()
临时文件清理任务
执行时机:每分钟执行一次(0 */1 * * * ?
)
1. 文件清理逻辑
public void delTempFile() {
// 1. 构建临时文件夹路径
String tempFolder = appConfig.getProjectFolder()
+ Constants.FILE_FOLDER
+ Constants.FILE_FOLDER_TEMP;
// 2. 获取两天前的日期(基准线)
String twodaysAgo = DateUtil.format(DateUtil.getDayAgo(2), "yyyyMMdd");
Integer thresholdDate = Integer.parseInt(twodaysAgo);
// 3. 遍历临时文件夹
File folder = new File(tempFolder);
File[] files = folder.listFiles();
if (files == null) return;
for (File file : files) {
try {
// 4. 按日期命名规则清理旧文件
Integer fileDate = Integer.parseInt(file.getName());
if (fileDate <= thresholdDate) {
FileUtils.deleteDirectory(file); // 递归删除
}
} catch (Exception e) {
log.error("删除临时文件失败", e);
}
}
}
2. 设计要点
-
安全机制:
• 文件名强制使用日期格式(如20230815
)
• 只删除命名合规的文件夹 -
容错处理:
• 捕获IOException
防止单次失败影响后续操作
• 空文件夹自动跳过 -
性能考虑:
• 高频检查(每分钟)但低负载(仅处理过期文件)
• 使用FileUtils
工具类保证删除可靠性
三、架构设计亮点
-
解耦设计:
• 统计服务与业务逻辑分离
• 文件清理与业务模块隔离 -
数据一致性:
• 实时数据写入Redis保证性能
• 定时同步到DB保证持久化 -
扩展性:
• 新增统计维度只需添加对应方法
• 文件清理策略可配置化
四、潜在优化建议
-
统计任务优化:
// 可考虑分片统计(大用户量场景) @Scheduled(cron = "0 0 1 * * ?") public void statsUserShard1() { statisticsService.processByUserRange(0, 10000); }
-
文件清理增强:
// 添加文件大小监控 if (file.length() > MAX_TEMP_FILE_SIZE) { alertService.notifyOversizeFile(file); }
-
异常处理强化:
@Scheduled(...) public void safeStatistics() { try { statisticsData(); } catch (Exception e) { log.error("统计任务失败", e); retryLater(); // 延迟重试机制 } }
这套定时任务系统通过合理的职责划分和稳健的实现方式,有效解决了数据统计和资源清理这两类经典的后台任务需求。
@Override
public void statisticsData() {
// 创建统计结果容器
List<StatisticsInfo> statisticsInfoList = new ArrayList<>();
// 获取统计日期(昨天)
final String statisticsDate = DateUtil.getBeforeDayDate(1);
// ========== 播放量统计 ==========
// 从Redis获取昨日所有视频的播放量数据
// 格式:Map<"video_play_count:20230815:video123", 播放次数>
Map<String, Integer> videoPlayCountMap = redisComponent.getVideoPlayCount(statisticsDate);
// 提取视频ID列表(去掉前缀)
List<String> playVideoKeys = new ArrayList<>(videoPlayCountMap.keySet());
playVideoKeys = playVideoKeys.stream()
.map(item -> item.substring(item.lastIndexOf(":") + 1)) // 截取最后一个:后的视频ID
.collect(Collectors.toList());
// 构建视频查询条件
VideoInfoQuery videoInfoQuery = new VideoInfoQuery();
videoInfoQuery.setVideoIdArray(playVideoKeys.toArray(new String[playVideoKeys.size()]));
// 批量查询视频基本信息
List<VideoInfo> videoInfoList = videoInfoMapper.selectList(videoInfoQuery);
// 按用户ID分组统计总播放量
Map<String, Integer> videoCountMap = videoInfoList.stream()
.collect(Collectors.groupingBy(
VideoInfo::getUserId, // 按用户ID分组
Collectors.summingInt(item -> {
// 重组Redis key获取该视频播放量
String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();
Integer count = videoPlayCountMap.get(redisKey);
return count == null ? 0 : count; // 空值保护
})
));
// 转换播放统计结果对象
videoCountMap.forEach((userId, count) -> {
StatisticsInfo statisticsInfo = new StatisticsInfo();
statisticsInfo.setStatisticsDate(statisticsDate); // 统计日期
statisticsInfo.setUserId(userId); // 用户ID
statisticsInfo.setDataType(StatisticsTypeEnum.PLAY.getType()); // 数据类型=播放
statisticsInfo.setStatisticsCount(count); // 播放次数
statisticsInfoList.add(statisticsInfo); // 加入结果集
});
// ========== 粉丝量统计 ==========
// 从数据库查询昨日粉丝变化数据
List<StatisticsInfo> fansDataList = this.statisticsInfoMapper.selectStatisticsFans(statisticsDate);
// 设置统计维度和日期
for (StatisticsInfo statisticsInfo : fansDataList) {
statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式
statisticsInfo.setDataType(StatisticsTypeEnum.FANS.getType()); // 数据类型=粉丝
}
statisticsInfoList.addAll(fansDataList); // 合并结果
// ========== 评论统计 ==========
// 从数据库查询昨日评论数据
List<StatisticsInfo> commentDataList = this.statisticsInfoMapper.selectStatisticsComment(statisticsDate);
// 设置统计维度和日期
for (StatisticsInfo statisticsInfo : commentDataList) {
statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式
statisticsInfo.setDataType(StatisticsTypeEnum.COMMENT.getType()); // 数据类型=评论
}
statisticsInfoList.addAll(commentDataList); // 合并结果
// ========== 互动行为统计 ==========
// 查询点赞/收藏/投币数据(参数为行为类型数组)
List<StatisticsInfo> statisticsInfoOthers = this.statisticsInfoMapper.selectStatisticsInfo(
statisticsDate,
new Integer[]{
UserActionTypeEnum.VIDEO_LIKE.getType(), // 点赞
UserActionTypeEnum.VIDEO_COIN.getType(), // 投币
UserActionTypeEnum.VIDEO_COLLECT.getType() // 收藏
}
);
// 转换行为类型为统计类型
for (StatisticsInfo statisticsInfo : statisticsInfoOthers) {
statisticsInfo.setStatisticsDate(statisticsDate); // 统一日期格式
// 行为类型转换
if (UserActionTypeEnum.VIDEO_LIKE.getType().equals(statisticsInfo.getDataType())) {
statisticsInfo.setDataType(StatisticsTypeEnum.LIKE.getType()); // 点赞
} else if (UserActionTypeEnum.VIDEO_COLLECT.getType().equals(statisticsInfo.getDataType())) {
statisticsInfo.setDataType(StatisticsTypeEnum.COLLECTION.getType()); // 收藏
} else if (UserActionTypeEnum.VIDEO_COIN.getType().equals(statisticsInfo.getDataType())) {
statisticsInfo.setDataType(StatisticsTypeEnum.COIN.getType()); // 投币
}
}
statisticsInfoList.addAll(statisticsInfoOthers); // 合并结果
// ========== 最终入库 ==========
// 批量插入或更新统计结果
this.statisticsInfoMapper.insertOrUpdateBatch(statisticsInfoList);
}
这段代码使用Java 8的Stream API和Collectors工具类,实现了按用户ID分组统计视频总播放量的功能。下面我将从多个维度进行详细解析:
一、代码结构分解
Map<String, Integer> videoCountMap = videoInfoList.stream()
.collect(Collectors.groupingBy(
VideoInfo::getUserId,
Collectors.summingInt(item -> {
String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId();
Integer count = videoPlayCountMap.get(redisKey);
return count == null ? 0 : count;
})
));
二、逐层解析
1. 数据准备阶段
• 输入:videoInfoList
(视频信息列表,包含videoId
和userId
等字段)
• 目标:生成Map<用户ID, 该用户所有视频的总播放量>
2. Stream流水线
videoInfoList.stream()
将List转换为Stream,准备进行流式处理。
3. 核心收集器
Collectors.groupingBy(
VideoInfo::getUserId, // 分组依据:用户ID
Collectors.summingInt(...) // 聚合方式:求和
)
• groupingBy
:按用户ID分组,生成Map<String, List<VideoInfo>>
• summingInt
:对每组内的元素进行整数求和
4. 播放量计算逻辑
item -> {
// 重组Redis key:video_play_count:20230815:video123
String redisKey = Constants.REDIS_KEY_VIDEO_PLAY_COUNT
+ statisticsDate + ":" + item.getVideoId();
// 从预加载的Map获取播放量
Integer count = videoPlayCountMap.get(redisKey);
// 空值保护(没有记录则视为0次播放)
return count == null ? 0 : count;
}
三、关键技术点
1. 嵌套收集器
• 外层:groupingBy
按用户分组
• 内层:summingInt
对播放量求和
形成两级聚合操作。
2. 实时Key重组
Constants.REDIS_KEY_VIDEO_PLAY_COUNT + statisticsDate + ":" + item.getVideoId()
动态拼接Redis key,与之前从Redis加载数据时的key格式严格一致:
• 常量前缀:video_play_count:
• 日期分区:20230815
• 视频ID::video123
3. 空值防御
count == null ? 0 : count
处理可能存在的:
• Redis中无该视频记录
• 视频刚上传尚未有播放数据
四、内存数据流演示
假设原始数据:
videoInfoList = [
{videoId: "v1", userId: "u1"},
{videoId: "v2", userId: "u1"},
{videoId: "v3", userId: "u2"}
]
videoPlayCountMap = {
"video_play_count:20230815:v1": 100,
"video_play_count:20230815:v2": 50,
"video_play_count:20230815:v3": 200
}
执行过程:
- 流处理开始
- 遇到v1(用户u1)→ 计算播放量100 → u1分组累计100
- 遇到v2(用户u1)→ 计算播放量50 → u1分组累计150
- 遇到v3(用户u2)→ 计算播放量200 → u2分组累计200
最终结果:
{u1=150, u2=200}
五、设计优势
-
高效聚合
单次遍历完成分组+求和,时间复杂度O(n) -
内存友好
流式处理避免中间集合的创建 -
可维护性
清晰表达业务逻辑:
“按用户分组,求和每个用户所有视频的播放量” -
扩展性
如需修改统计逻辑(如求平均值),只需替换summingInt
为其他收集器
六、潜在优化方向
1. 并行处理(大数据量时)
videoInfoList.parallelStream()...
注意线程安全和顺序保证
2. 缓存Key构建
// 预先生成videoId到播放量的映射,避免重复拼接字符串
Map<String, Integer> videoIdToCount = ...;
3. 异常处理增强
try {
Integer count = videoPlayCountMap.get(redisKey);
return Optional.ofNullable(count).orElse(0);
} catch (Exception e) {
log.warn("播放量统计异常 videoId:{}", item.getVideoId());
return 0;
}
这段代码展示了如何优雅地结合Stream API和集合操作,实现复杂的数据聚合统计,是Java函数式编程的典型实践。