当前位置: 首页 > news >正文

RDD案例数据清洗

在 Spark 中,RDD(Resilient Distributed Dataset)是分布式数据集的基本抽象。数据清洗是数据预处理中的一个重要步骤,通常包括去除重复数据、过滤无效数据、转换数据格式等操作。以下是一个使用 RDD 进行数据清洗的完整示例。

示例场景

假设我们有一个包含用户信息的文本文件 users.txt,每行是一个用户记录,格式如下:

user1,25,China
user2,30,USA
user3,invalid,Australia
user4,22,China
user5,28,USA
user6,35,invalid

我们需要对数据进行清洗,包括:

  1. 过滤掉无效的年龄数据(非数字或不在合理范围)。
  2. 过滤掉无效的国家数据(只保留指定的国家,如 ChinaUSA)。
  3. 去除重复的用户记录。

实现步骤

  1. 创建 SparkContext:初始化 Spark 环境。
  2. 读取数据:从文件中加载数据到 RDD。
  3. 数据清洗:过滤无效数据和重复数据。
  4. 保存结果:将清洗后的数据保存到文件。

以下是完整的代码实现:

import org.apache.spark.{SparkConf, SparkContext}object DataCleaning {def main(args: Array[String]): Unit = {// 初始化 Spark 环境val conf = new SparkConf().setAppName("DataCleaning").setMaster("local[*]") // 使用本地模式运行val sc = new SparkContext(conf)// 读取数据val inputPath = "path/to/users.txt"val rawData = sc.textFile(inputPath)// 数据清洗val cleanedData = rawData.map(line => line.split(",")) // 将每行数据分割为数组.filter(arr => arr.length == 3) // 确保每行有三个字段.filter(arr => {// 过滤无效年龄数据val age = try {arr(1).toInt} catch {case _: NumberFormatException => -1}age >= 18 && age <= 100 // 假设年龄范围为 18 到 100}).filter(arr => {// 过滤无效国家数据val country = arr(2)country == "China" || country == "USA"}).map(arr => (arr(0), arr(1), arr(2))) // 转换为元组.distinct() // 去除重复记录// 保存清洗后的数据val outputPath = "path/to/cleaned_users.txt"cleanedData.saveAsTextFile(outputPath)// 停止 SparkContextsc.stop()}
}

代码说明

  1. 初始化 Spark 环境

    • 使用 SparkConf 配置 Spark 应用程序的名称和运行模式(本地模式)。
    • 创建 SparkContext 实例。
  2. 读取数据

    • 使用 sc.textFile 方法从指定路径加载数据到 RDD。
  3. 数据清洗

    • 使用 map 方法将每行数据分割为数组。
    • 使用 filter 方法过滤无效的年龄数据和国家数据。
    • 使用 distinct 方法去除重复记录。
  4. 保存结果

    • 使用 saveAsTextFile 方法将清洗后的数据保存到指定路径。

示例输入和输出

输入文件 users.txt
user1,25,China
user2,30,USA
user3,invalid,Australia
user4,22,China
user5,28,USA
user6,35,invalid
user1,25,China
输出文件 cleaned_users.txt
user1,25,China
user2,30,USA
user4,22,China
user5,28,USA

运行项目

  1. 将上述代码保存为 DataCleaning.scala 文件。
  2. 在 IntelliJ IDEA 中运行该程序。
  3. 查看输出文件 cleaned_users.txt,确保数据清洗结果正确。

通过以上步骤,你可以使用 Spark 的 RDD API 完成数据清洗任务。

相关文章:

  • Maven 动态插件配置:Profile的灵活集成实践
  • PowerShell 实现 conda 懒加载
  • 新建一个reactnative 0.72.0的项目
  • 【神经网络与深度学习】局部最小值和全局最小值
  • Python中元组(Tuple)使用详解和注意事项
  • 微服务的“导航系统”:使用Spring Cloud Eureka实现服务注册与发现
  • Qt6.5.3 windows下安装教程
  • 微信小程序的开发及问题解决
  • 力扣-226.翻转二叉树
  • Linux基础 -- 用户态Generic Netlink库高性能接收与回调框架
  • 免费实用的远程办公方案​
  • 基于RT-Thread的STM32F4开发第三讲——DAC
  • flinksql实践(从kafka读数据)
  • GZip+Base64压缩字符串在ios上解压报错问题解决(安卓、PC模拟器正常)
  • 基于FPGA的视频接口之千兆网口(七GigE)
  • C++—特殊类设计设计模式
  • 【Linux学习笔记】理解一切皆文件实现原理和文件缓冲区
  • 文件同步2
  • 用 VS Code / PyCharm 编写你的第一个 Python 程序
  • aardio - 虚表 —— vlistEx.listbar2 多层菜单演示
  • 宝通科技:与宇树合作已签约,四足机器人在工业场景落地是重点商业化项目
  • 韩国总统选战打响:7人角逐李在明领跑,执政党临阵换将陷入分裂
  • 菲律宾举行中期选举
  • 中美经贸高层会谈在日内瓦结束,中国代表团将举行发布会
  • 首映丨纪录电影《滚烫年华》:献给所有奋斗者
  • 上汽享道出行完成13亿元C轮融资,已启动港股IPO计划