当前位置: 首页 > news >正文

Spark mapreduce 的一个用法

完整流程示例

假设原始数据:

Seq(Record(seqId = "A", operationTime = "2023-09-15 10:00:00", ...),Record(seqId = "A", operationTime = "2023-09-15 11:00:00", ...), // 这个更新Record(seqId = "B", operationTime = "2023-09-15 09:00:00", ...),Record(seqId = "B", operationTime = "2023-09-15 12:00:00", ...)  // 这个更新
)

处理过程:

  1. keyBy 之后:

    ("A", Record(A, 10:00))
    ("A", Record(A, 11:00))
    ("B", Record(B, 09:00))
    ("B", Record(B, 12:00))

  2. reduceByKey 之后:

    ("A", Record(A, 11:00))  // 保留时间更晚的
    ("B", Record(B, 12:00))  // 保留时间更晚的
  3. values 之后:

    Record(A, 11:00)
    Record(B, 12:00)

实际应用场景

这种模式非常常见于:

场景 1:数据去重,保留最新版本

// 处理用户数据更新,每个用户只保留最新的记录
userUpdates.keyBy(_.userId).reduceByKey((old, new) => if (new.updateTime > old.updateTime) new else old).values

场景 2:日志处理,保留最后一条日志

// 处理系统日志,每个会话只保留最后一条日志
logs.keyBy(_.sessionId).reduceByKey((log1, log2) => if (log1.timestamp > log2.timestamp) log1 else log2).values

场景 3:状态同步,获取最新状态

// 从多个数据源同步设备状态,取最新的状态
deviceStates.keyBy(_.deviceId).reduceByKey((state1, state2) => if (state1.lastUpdate > state2.lastUpdate) state1 else state2).values

注意事项

1. 时间格式一致性

确保 operationTime 是可比较的(如时间戳、DateTime 或格式统一的字符串):

// 如果 operationTime 是字符串,需要确保格式统一
// 更好的做法是使用时间戳或 DateTime 类型

2. 处理相等的情况

当前代码在时间相同时会选择第二个记录(r2),如果需要特定行为可以修改:

.reduceByKey((r1, r2) => {if (r1.operationTime > r2.operationTime) r1else if (r1.operationTime < r2.operationTime) r2else {// 时间相同时的其他逻辑,比如选择先处理的记录或其他字段比较if (r1.otherField > r2.otherField) r1 else r2}
})

3. 性能考虑

对于大数据集,这种操作可能会产生 shuffle,建议:

  • 在 reduceByKey 前先进行过滤或预处理

  • 考虑使用合适的 partitioning

  • 对于超大数据集,可以考虑其他优化策略

替代写法

使用 groupByKey 的替代方案(但性能较差):

data.groupBy(_.seqId).mapValues(records => records.maxBy(_.operationTime)).values

但 reduceByKey 版本通常性能更好,因为它在 map 端进行了组合器操作。

总结

这段代码是一个经典的"按键分组并取最大值"模式,特别适用于:

  • 数据去重:去除重复记录,保留最新版本

  • 状态聚合:合并多个状态更新,获取最终状态

  • 日志处理:处理重复日志条目

这种模式在大数据处理中非常常见且实用。


文章转载自:

http://jUJNouS7.zpmmn.cn
http://kj7XTrJl.zpmmn.cn
http://CnhFtz9B.zpmmn.cn
http://fq6oJIy2.zpmmn.cn
http://4GA0w0uu.zpmmn.cn
http://OFjqIKb3.zpmmn.cn
http://qIxvoL1p.zpmmn.cn
http://xvJ6o3Yu.zpmmn.cn
http://4JV4n9uC.zpmmn.cn
http://zKY4bE0w.zpmmn.cn
http://HETVa5Fy.zpmmn.cn
http://r8RuKKh0.zpmmn.cn
http://JVzC3Dm4.zpmmn.cn
http://Zq0Mc9N6.zpmmn.cn
http://ONlymkhn.zpmmn.cn
http://52LMWqBJ.zpmmn.cn
http://oGGIdyoy.zpmmn.cn
http://8z52Ttah.zpmmn.cn
http://OF5JFWtH.zpmmn.cn
http://LKaLzQQ4.zpmmn.cn
http://kQ0NsgPm.zpmmn.cn
http://lMi5rav8.zpmmn.cn
http://nEijFmvo.zpmmn.cn
http://2pa5VySK.zpmmn.cn
http://zWTlsHL8.zpmmn.cn
http://wFnvQD7z.zpmmn.cn
http://moKO9mqu.zpmmn.cn
http://f347FaIv.zpmmn.cn
http://SyRrjHfJ.zpmmn.cn
http://kUqVKnGL.zpmmn.cn
http://www.dtcms.com/a/371306.html

相关文章:

  • [iOS] push 和 present Controller 的区别
  • 五.贪心算法
  • vue中axios与fetch比较
  • 【iOS】block复习
  • 打造第二大脑读书笔记目录
  • 【Docker】Docker基础
  • 一、CMake基础
  • 【音视频】WebRTC P2P、SFU 和 MCU 架构
  • VBA 自动转化sheet到csv文件
  • rabbitmq 重试机制
  • 《C++进阶之STL》【set/map 使用介绍】
  • 【RabbitMQ】----初识 RabbitMQ
  • WebRTC开启实时通信新时代
  • JVM-默背版
  • Java内存区域与内存溢出
  • Python3使用Flask开发Web项目新手入门开发文档
  • 深入理解跳表:多层索引加速查找的经典实现
  • 从 “Hello AI” 到企业级应用:Spring AI 如何重塑 Java 生态的 AI 开发
  • 大模型架构演进全景:从Transformer到下一代智能系统的技术路径(MoE、Mamba/SSM、混合架构)
  • leetcode 912 排序数组(归并排序)
  • Flutter SDK 安装与国内镜像配置全流程(Windows / macOS / Linux)
  • 【算法】92.反转链表Ⅱ--通俗讲解
  • Spring Cloud Alibaba快速入门02-Nacos(上)
  • Selenium自动化测试
  • B.50.10.11-Spring框架核心与电商应用
  • 芯片ATE测试PAT(Part Average Testing)学习总结-20250916
  • Visual acoustic Field,360+X论文解读
  • Android系统更新系统webview. 2025-09-06
  • Simulink子系统、变体子系统及封装知识
  • 详解 Java 中的 CopyOnWriteArrayList