作业03-SparkSQL开发
作业03-SparkSQL开发
一、数据读取与转换操作
- 从HDFS路径
/dw/accounts
读取accounts数据文件,并选取id、address(第6列)字段,创建DataFrame,名为acctdf(字段名分别为userid、address)。
val acct = sc.textFile("/dw/accounts/*")val acctdf = acctRDD.map(line => {val fields = line.split(",")(fields(0), fields(5)) // 提取id和address
}).toDF("userid", "address")acctdf.printSchema()
acctdf.show(2)
从HDFS路径/dw/accounts/读取所有文件,创建RDD[String]
每行是一个字符串,代表一条账户记录对每条记录进行映射转换:
使用,分割字符串
提取第1个字段(id)和第6个字段(address)首先输入的是line,经过split被分割程数组fields
再将fields(0)当做key,fields(5)当做value输出为一个pairRdd将RDD转换为DataFrame,并命名列名为"userid"和"address"
在调用 toDF() 之后,它就变成了一个普通的DataFrame,不再保留键值对的特性。
转换过程:第一行: "1,2008-10-23...,2275 Washburn Street,..."
split(",") → Array("1", "2008-10-23...", ..., "2275 Washburn Street", ...)
取第0个和第5个元素 → ("1", "2275 Washburn Street")
- 上传日志数据
$DATA_EXERCISE/weblogs
到HDFS的/dw/weblogs
目录,从HDFS路径/dw/weblogs
读取日志数据,提取用户id(第3个字段),统计每个用户的请求次数,生成(userid, reqs)的RDD。 - 将上述RDD转换为DataFrame,名为userdf,字段名为userid、reqs。
// 2. 处理weblogs数据,创建userdf DataFrame
val logsRDD = sc.textFile("/dw/weblogs")
// 提取用户id(第3个字段)并统计每个用户的请求次数
val userReqRDD = logsRDD.map(line => {val fields = line.split(" ")(fields(2), 1) // (userid, 1)
}).reduceByKey(_ + _) // 按userid聚合计算总请求数// 将RDD转换为DataFrame
val userdf = spark.createDataFrame(userReqRDD.map{case (userid, reqs) => Row(userid, reqs)}, StructType(Seq(StructField("userid", StringType, false),StructField("reqs", IntegerType, false)))// 显示userdf结构
userdf.printSchema()
userdf.show(2)
对每条日志记录进行映射转换:
使用空格分割字符串
提取第3个字段(userid)作为键,值为1
使用reduceByKey对相同userid的记录进行求和,得到每个用户的总请求数map 操作:
输入:logsRDD 的每一行 line(假设是日志字符串)
对每行按空格分割成数组 fields
提取 fields(2) 作为 key(假设是 userid),1 作为 value
→ 生成一个 PairRDD[(String, Int)],形式为 (userid, 1)
reduceByKey 操作:
对相同 userid(key)的所有 1(value)进行求和(_ + _)
→ 最终得到每个 userid 的总请求次数,形式为 (userid, total_requests)
结果:
userReqRDD 是一个不可变的 PairRDD,内容为 (userid, 总请求数)。将RDD转换为DataFrame:
使用Row对象封装每行数据
定义Schema:userid(字符串类型,非空)和reqs(整数类型,非空)map{case (userid, reqs) => Row(userid, reqs)}
对 userReqRDD 的每个键值对进行映射,将 (userid, reqs) 转换为 Spark 的 Row 对象。
Row 是 DataFrame 中一行数据的容器,类似于数据库中的一行记录。
例如:("user1", 5) → Row("user1", 5)
代码的等价简化写法
如果觉得 Row
和 StructType
的写法较复杂,可以用更简洁的方式实现相同功能:
val userdf = userReqRDD.toDF("userid", "reqs")
转换过程:
第一行: "34.28.1.122 - 65255 [01/Mar/2014..."
split(" ") → Array("34.28.1.122", "-", "65255", "[01/Mar/2014...", ...)
取第2个元素 → ("65255", 1)
对相同userid的值求和 → ("65255", 2)
提示:
使用spark.createDataFrame实现将RDD转换为DataFrame
使用toDF来更名
二、数据关联与分析操作
- 将userdf与acctdf按用户id进行关联,生成包含userid、reqs、address字段的新DataFrame。
- 对关联结果进行过滤,仅保留请求次数大于5次的用户。
- 按照reqs字段降序排列并显示出前10行数据
// 1. 关联两个 DataFrame
val joinedDF = userdf.join(acctdf, "userid")// 2. 过滤请求次数 > 5 的用户
val filteredDF = joinedDF.filter("reqs > 5")// 3. 按 reqs 降序排序,并显示前 10 行
val resultDF = filteredDF.orderBy($"reqs".desc).limit(10)
resultDF.show()