当前位置: 首页 > news >正文

7.7日 实验03-Spark批处理开发(2)

使用Spark处理数据文件

检查数据

检查$DATA_EXERCISE/activations里的数据,每个XML文件包含了客户在指定月份活跃的设备数据。

拷贝数据到HDFS的/dw目录

样本数据示例:

<activations><activation timestamp="1225499258" type="phone"><account-number>316</account-number><device-id>d61b6971-33e1-42f0-bb15-aa2ae3cd8680</device-id><phone-number>5108307062</phone-number><model>iFruit 1</model></activation>…
</activations>

处理文件

读取XML文件并抽取账户号和设备型号,把结果保存到/dw/account-models,格式为account_number:model

输出示例:

1234:iFruit 1
987:Sorrento F00L
4566:iFruit 1
…

提供了解析XML的函数如下:

// Stub code to copy into Spark Shellimport scala.xml._// Given a string containing XML, parse the string, and 
// return an iterator of activation XML records (Nodes) contained in the stringdef getActivations(xmlstring: String): Iterator[Node] = {val nodes = XML.loadString(xmlstring) \\ "activation"nodes.toIterator
}// Given an activation record (XML Node), return the model name
def getModel(activation: Node): String = {(activation \ "model").text
}// Given an activation record (XML Node), return the account number
def getAccount(activation: Node): String = {(activation \ "account-number").text
}

上传数据

# 1. 检查并创建HDFS目录
hdfs dfs -mkdir -p /dw# 2. 将本地数据上传到HDFS(替换$DATA_EXERCISE为实际路径)
hdfs dfs -put $DATA_EXERCISE/activations /dw/# 3. 检查文件是否上传成功
hdfs dfs -ls /dw/activations
定义题目提供的解析函数
def getActivations(xmlstring: String): Iterator[Node] = {(XML.loadString(xmlstring) \\ "activation").toIterator
}def getModel(activation: Node): String = (activation \ "model").text
def getAccount(activation: Node): String = (activation \ "account-number").text
读取数据(像处理日志一样)
val xmlRDD = sc.wholeTextFiles("/dw/activations/*.xml")
测试解析(查看第一条记录)
val firstRecord = getActivations(xmlRDD.first()._2).next()
println(s"测试解析结果: ${getAccount(firstRecord)}:${getModel(firstRecord)}")

处理全部数据
val resultRDD = xmlRDD.flatMap { case (_, xml) => getActivations(xml).map(act => s"${getAccount(act)}:${getModel(act)}")
}
查看结果样例(10条)
resultRDD.take(10).foreach(println)
保存结果(先清理旧数据)
import org.apache.hadoop.fs._
val outputPath = "/dw/account-models"
val fs = FileSystem.get(sc.hadoopConfiguration)
if (fs.exists(new Path(outputPath))) fs.delete(new Path(outputPath), true)resultRDD.saveAsTextFile(outputPath)
println(s"结果已保存到 hdfs://$outputPath")

验证结果(在Linux终端执行)

# 查看输出结果
hdfs dfs -cat /dw/account-models/part-* | head -n 10# 如果需要合并结果到单个文件
hdfs dfs -getmerge /dw/account-models ./account_models.txt
head account_models.txt

http://www.dtcms.com/a/269109.html

相关文章:

  • Playfun即将开启大型Web3线上活动,打造沉浸式GameFi体验生态
  • C++11标准库算法:深入理解std::none_of
  • 低代码平台的性能测试实践与挑战
  • qiankun 微前端项目中的 Token 鉴权方案
  • python dict list 去重
  • 【数据驱动视角下的流体模拟:CFD 与深度学习(GANs/PINN)在圆柱绕流及机翼分析中的应用】
  • Video Background Remover V3版 - AI视频一键抠像/视频换背景 支持50系显卡 一键整合包下载
  • 动手学深度学习13.7. 单发多框检测(SSD)-笔记练习(PyTorch)
  • Pycharm恢复默认设置,配置导致复制粘贴等不能使用
  • 气候大模型的演化路径与产业落地展望:AI重构全球气候科学的新范式
  • 在bash shell 函数传递数组的问题
  • CSS知识复习4
  • 卷积神经网络:卷积层的核心原理与机制
  • MATLAB | 绘图复刻(二十一)| 扇形热图+小提琴图
  • C++11中的std::ratio:编译时有理数运算的艺术
  • 暑假算法日记第三天
  • WebRTC与RTMP
  • iOS App抓包工具排查后台唤醒引发请求异常
  • Python编译器(Pycharm Jupyter)
  • MySql:多表查询——子查询
  • 【应急响应】Linux 自用应急响应工具(LinuxCheckShoot)
  • 腾讯地图 vue3 使用 封装 地图组件
  • 赛事开启|第三届视觉语音识别挑战赛 CNVSRC 2025 启动
  • 自动驾驶ROS2应用技术详解
  • 鸿蒙arkts使用关系型数据库,使用DB Browser for SQLite连接和查看数据库数据?使用TaskPool进行频繁数据库操作
  • Python 异步编程从基础到高级全面指南
  • 模拟数字电路基础-2
  • 初识Neo4j之Cypher(三)
  • leetcode1089.复写零
  • 代码审计-SQL注入