当前位置: 首页 > news >正文

作业03-SparkSQL开发

作业03-SparkSQL开发


一、数据读取与转换操作

  1. 从HDFS路径/dw/accounts读取accounts数据文件,并选取id、address(第6列)字段,创建DataFrame,名为acctdf(字段名分别为userid、address)。
val acct = sc.textFile("/dw/accounts/*")val acctdf = acctRDD.map(line => {val fields = line.split(",")(fields(0), fields(5)) // 提取id和address
}).toDF("userid", "address")acctdf.printSchema()
acctdf.show(2)

从HDFS路径/dw/accounts/读取所有文件,创建RDD[String]
每行是一个字符串,代表一条账户记录对每条记录进行映射转换:
使用,分割字符串
提取第1个字段(id)和第6个字段(address)首先输入的是line,经过split被分割程数组fields
再将fields(0)当做key,fields(5)当做value输出为一个pairRdd将RDD转换为DataFrame,并命名列名为"userid"和"address"
在调用 toDF() 之后,它就变成了一个普通的DataFrame,不再保留键值对的特性。

    转换过程:第一行: "1,2008-10-23...,2275 Washburn Street,..."
    split(",") → Array("1", "2008-10-23...", ..., "2275 Washburn Street", ...)
    取第0个和第5个元素 → ("1", "2275 Washburn Street")
    1. 上传日志数据$DATA_EXERCISE/weblogs到HDFS的/dw/weblogs目录,从HDFS路径/dw/weblogs读取日志数据,提取用户id(第3个字段),统计每个用户的请求次数,生成(userid, reqs)的RDD。
    2. 将上述RDD转换为DataFrame,名为userdf,字段名为userid、reqs。
    // 2. 处理weblogs数据,创建userdf DataFrame
    val logsRDD = sc.textFile("/dw/weblogs")
    // 提取用户id(第3个字段)并统计每个用户的请求次数
    val userReqRDD = logsRDD.map(line => {val fields = line.split(" ")(fields(2), 1) // (userid, 1)
    }).reduceByKey(_ + _) // 按userid聚合计算总请求数// 将RDD转换为DataFrame
    val userdf = spark.createDataFrame(userReqRDD.map{case (userid, reqs) => Row(userid, reqs)}, StructType(Seq(StructField("userid", StringType, false),StructField("reqs", IntegerType, false)))// 显示userdf结构
    userdf.printSchema()
    userdf.show(2)
    对每条日志记录进行映射转换:
    使用空格分割字符串
    提取第3个字段(userid)作为键,值为1
    使用reduceByKey对相同userid的记录进行求和,得到每个用户的总请求数map 操作:
    输入:logsRDD 的每一行 line(假设是日志字符串)
    对每行按空格分割成数组 fields
    提取 fields(2) 作为 key(假设是 userid),1 作为 value
    → 生成一个 PairRDD[(String, Int)],形式为 (userid, 1)
    reduceByKey 操作:
    对相同 userid(key)的所有 1(value)进行求和(_ + _)
    → 最终得到每个 userid 的总请求次数,形式为 (userid, total_requests)
    结果:
    userReqRDD 是一个不可变的 PairRDD,内容为 (userid, 总请求数)。将RDD转换为DataFrame:
    使用Row对象封装每行数据
    定义Schema:userid(字符串类型,非空)和reqs(整数类型,非空)map{case (userid, reqs) => Row(userid, reqs)}
    对 userReqRDD 的每个键值对进行映射,将 (userid, reqs) 转换为 Spark 的 Row 对象。
    Row 是 DataFrame 中一行数据的容器,类似于数据库中的一行记录。
    例如:("user1", 5) → Row("user1", 5)

    代码的等价简化写法

    如果觉得 Row 和 StructType 的写法较复杂,可以用更简洁的方式实现相同功能:

    val userdf = userReqRDD.toDF("userid", "reqs")

    转换过程:
    第一行: "34.28.1.122 - 65255 [01/Mar/2014..."
    split(" ") → Array("34.28.1.122", "-", "65255", "[01/Mar/2014...", ...)
    取第2个元素 → ("65255", 1)
    对相同userid的值求和 → ("65255", 2)

    提示:

    使用spark.createDataFrame实现将RDD转换为DataFrame

    使用toDF来更名


    二、数据关联与分析操作

    1. 将userdf与acctdf按用户id进行关联,生成包含userid、reqs、address字段的新DataFrame。
    2. 对关联结果进行过滤,仅保留请求次数大于5次的用户。
    3. 按照reqs字段降序排列并显示出前10行数据
    // 1. 关联两个 DataFrame
    val joinedDF = userdf.join(acctdf, "userid")// 2. 过滤请求次数 > 5 的用户
    val filteredDF = joinedDF.filter("reqs > 5")// 3. 按 reqs 降序排序,并显示前 10 行
    val resultDF = filteredDF.orderBy($"reqs".desc).limit(10)
    resultDF.show()


    http://www.dtcms.com/a/270097.html

    相关文章:

  1. 数字化校园升级:传统网络架构与SD-WAN智能方案对比详解
  2. 汽车功能安全-软件单元验证 (Software Unit Verification)【定义、目的、要求建议】6
  3. 【数据分析】基于 HRS 数据的多变量相关性分析与可视化
  4. uniapp b树
  5. C++笔记之使用bitset对uint32_t类型变量对位状态判断
  6. 2025年深圳杉川机器人性格测评和Verify测评SHL题库高分攻略
  7. 论文略读:Parameter-efficient transfer learning for NLP
  8. InstructBLIP:迈向具备指令微调能力的通用视觉语言模型
  9. Go语言标识符命名规则详解:工程化实践
  10. Spring的依赖注入(xml)
  11. RISC-V:开源芯浪潮下的技术突围与职业新赛道 (一)为什么RISC-V是颠覆性创新?
  12. 安装 asciidoctor-vscode 最新版
  13. 针对 SSD 固态硬盘的安全擦除 Secure Erase
  14. Kotlin协程中的Job详解
  15. 如何用Python编程计算权重?
  16. Anolis OS 23 架构支持家族新成员:Anolis OS 23.3 版本及 RISC-V 预览版发布
  17. 数据库设计精要:完整性和范式理论
  18. 去掉长按遥控器power键后提示关机、飞行模式的弹窗
  19. 数据提取之lxml模块与xpath工具
  20. 基于Java+SpringBoot 协同过滤算法私人诊所管理系统
  21. 系统架构设计师论文分享-论系统安全设计
  22. IoTDB:专为物联网场景设计的高性能时序数据库
  23. 把word中表格转成excle文件
  24. 基于GeoTools的根据Shp文件生成完全包围格网实战
  25. Oracle 存储过程、函数与触发器
  26. AI标注平台label-studio之二添加机器学习后端模型辅助标注
  27. vue3官方文档学习心得
  28. SpringCloud系列 - Gateway 网关功能(五)
  29. 人体坐姿检测系统开发实战(YOLOv8+PyTorch+可视化)
  30. 本地部署 R 语言环境运行软件 RStudio Server 并实现外部访问