Spark处理过程-案例数据清洗
(一)需求说明
准备十条符合包含用户信息的文本文件,每行格式为 姓名,年龄,性别,需要清洗掉年龄为空或者非数字的行。
例如:
张三,25,男
李四,,女
王五,30,男
赵六,a,女
孙七,35,男
周八,40,女
吴九,abc,男
郑十,45,女
王十,50,男
李二,55,女
(二)思路分析
- 读入文件
- 对每一行数据进行分析
- 字段拆分,拆分出年龄这个字段
- 判断
- 如果它不是数字或者缺失,则忽略这条数据
- 否则保存
(三) 代码展示
import org.apache.spark.{SparkConf, SparkContext}
object DataCleaning {
def main(args: Array[String]): Unit = {
// 创建 SparkConf 对象
val conf = new SparkConf().setAppName("DataCleaning").setMaster("local[*]")
// 创建 SparkContext 对象
val sc = new SparkContext(conf)
// 读取文本文件,创建 RDD
val inputFile = "input/file.txt"
val lines = sc.textFile(inputFile)
// 数据清洗操作
val cleanedLines = lines.filter(line => { // 使用filter算子
val fields = line.split(",")
if (fields.length == 3) {
val age = fields(1).trim
age.matches("\\d+")
} else {
false
}
})
// 输出清洗后的数据
cleanedLines.collect().foreach(println)
// 停止 SparkContext
sc.stop()
}
}
拓展:如何把清洗之后的数据保存到一个文件中。
可以使用coalesce(1)这个方法可以让结果全部保存在一个文件中。
代码如下:
val singlePartitionRDD = cleanedLines.coalesce(1)
// 保存清洗后的数据到文件
val outputPath = "path/to/your/output/file.txt"
singlePartitionRDD.saveAsTextFile(outputPath)
// 停止 SparkContext
sc.stop()