当前位置: 首页 > wzjs >正文

做网站几个步骤河南企业建站系统信息

做网站几个步骤,河南企业建站系统信息,旅行社网站 模板,闸北品牌网站建设在 Spark 中,创建 DataFrame 的方式多种多样,可根据数据来源、结构特性及性能需求灵活选择。 一、创建 DataFrame 的 12 种核心方式 1. 从 RDD 转换(需定义 Schema) import org.apache.spark.sql.{Row, SparkSession} import o…

在 Spark 中,创建 DataFrame 的方式多种多样,可根据数据来源、结构特性及性能需求灵活选择。

一、创建 DataFrame 的 12 种核心方式

1. 从 RDD 转换(需定义 Schema)
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._val spark = SparkSession.builder().master("local").getOrCreate()
val sc = spark.sparkContext// 创建RDD
val rdd = sc.parallelize(Seq((1, "Alice", 25),(2, "Bob", 30)
))// 方式1:通过StructType手动定义Schema
val schema = StructType(Seq(StructField("id", IntegerType, nullable = false),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true)
))// 将RDD转换为Row RDD
val rowRDD = rdd.map(t => Row(t._1, t._2, t._3))// 应用Schema创建DataFrame
val df1 = spark.createDataFrame(rowRDD, schema)// 方式2:使用样例类(Case Class)自动推断Schema
case class Person(id: Int, name: String, age: Int)
val df2 = rdd.map(t => Person(t._1, t._2, t._3)).toDF()
2. 从 CSV 文件读取
// 基础读取
val csvDF = spark.read.csv("path/to/file.csv")// 高级选项
val csvDF = spark.read.option("header", "true")          // 第一行为表头.option("inferSchema", "true")     // 自动推断数据类型.option("delimiter", ",")          // 指定分隔符.option("nullValue", "NULL")       // 指定空值标识.option("dateFormat", "yyyy-MM-dd")// 指定日期格式.csv("path/to/file.csv")
3. 从 JSON 文件读取
// 基础读取
val jsonDF = spark.read.json("path/to/file.json")// 多Line JSON
val multiLineDF = spark.read.option("multiLine", "true").json("path/to/multi-line.json")// 从JSON字符串RDD创建
val jsonRDD = sc.parallelize(Seq("""{"name":"Alice","age":25}""","""{"name":"Bob","age":30}"""
))
val jsonDF = spark.read.json(jsonRDD)
4. 从 Parquet 文件读取(Spark 默认格式)
// 基础读取
val parquetDF = spark.read.parquet("path/to/file.parquet")// 读取多个路径
val multiPathDF = spark.read.parquet("path/to/file1.parquet", "path/to/file2.parquet"
)// 分区过滤(仅读取符合条件的分区)
val partitionedDF = spark.read.parquet("path/to/table/year=2023/month=05")
5. 从 Hive 表查询
// 创建支持Hive的SparkSession
val spark = SparkSession.builder().appName("HiveExample").config("hive.metastore.uris", "thrift://localhost:9083").enableHiveSupport().getOrCreate()// 查询Hive表
val hiveDF = spark.sql("SELECT * FROM employees")// 创建临时视图
spark.sql("CREATE TEMP VIEW temp_table AS SELECT * FROM employees")
val viewDF = spark.table("temp_table")
6. 从 JDBC 连接读取
// 连接MySQL
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/mydb").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "employees").option("user", "root").option("password", "password").option("fetchsize", "1000")  // 控制每次读取的行数.option("numPartitions", "4") // 并行读取的分区数.load()// 带条件查询
val conditionDF = spark.read.format("jdbc").option("url", "jdbc:mysql://localhost:3306/mydb").option("query", "SELECT * FROM employees WHERE department = 'IT'").load()
7. 从内存集合手动构建
// 方式1:使用createDataFrame + 元组
val data = Seq((1, "Alice", 25),(2, "Bob", 30)
)
val df = spark.createDataFrame(data).toDF("id", "name", "age")// 方式2:使用createDataFrame + Row + Schema
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._val rows = Seq(Row(1, "Alice", 25),Row(2, "Bob", 30)
)val schema = StructType(Seq(StructField("id", IntegerType, nullable = false),StructField("name", StringType, nullable = true),StructField("age", IntegerType, nullable = true)
))val df = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)// 方式3:使用toDF(需导入隐式转换)
import spark.implicits._
val df = Seq((1, "Alice"),(2, "Bob")
).toDF("id", "name")
8. 从其他数据源(Avro、ORC 等)
// 从Avro文件读取(需添加Avro依赖)
val avroDF = spark.read.format("avro").load("path/to/file.avro")// 从ORC文件读取
val orcDF = spark.read.orc("path/to/file.orc")// 从HBase读取(需使用连接器)
val hbaseDF = spark.read.format("org.apache.spark.sql.execution.datasources.hbase").option("hbase.table", "mytable").load()
9. 从 Kafka 流创建(结构化流)
val kafkaDF = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092").option("subscribe", "topic1").option("startingOffsets", "earliest").load()// 解析JSON消息
import org.apache.spark.sql.functions._
val parsedDF = kafkaDF.select(from_json(col("value").cast("string"), schema).as("data")).select("data.*")
10. 从现有 DataFrame 转换
val originalDF = spark.read.csv("data.csv")// 重命名列
val renamedDF = originalDF.withColumnRenamed("oldName", "newName")// 添加计算列
val newDF = originalDF.withColumn("agePlus10", col("age") + 10)// 过滤数据
val filteredDF = originalDF.filter(col("age") > 25)// 连接两个DataFrame
val joinedDF = df1.join(df2, Seq("id"), "inner")
11. 从 SparkSession.range 创建数字序列
// 创建从0到9的整数DataFrame
val rangeDF = spark.range(10)  // 生成单列"id"的DataFrame// 指定起始值和结束值
val customRangeDF = spark.range(5, 15)  // 生成5到14的整数// 指定步长和分区数
val steppedDF = spark.range(0, 100, 5, 4)  // 步长为5,4个分区
12. 从空 DataFrame 创建(指定 Schema)
import org.apache.spark.sql.types._// 定义Schema
val schema = StructType(Seq(StructField("id", IntegerType, nullable = false),StructField("name", StringType, nullable = true)
))// 创建空DataFrame
val emptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)// 检查是否为空
if (emptyDF.isEmpty) {println("DataFrame is empty!")
}

二、创建 DataFrame 的方式总结图

三、创建 DataFrame 的性能与场景对比

创建方式适用场景性能特点Schema 要求
RDD 转换已有 RDD,需结构化处理需手动定义 Schema,性能取决于分区和数据量必须手动定义(或通过样例类)
CSV/JSON 文件从外部文件加载数据CSV 需解析,性能中等;JSON 需解析结构,大规模数据时较慢CSV 需手动指定,JSON 可自动推断
Parquet 文件大数据量存储与查询(Spark 默认格式)性能最优(列存储 + 压缩 + Schema)自带 Schema,无需额外定义
Hive 表 / JDBC连接外部数据源取决于数据源性能,需处理网络 IO从数据源获取 Schema
手动构建(内存数据)测试或小规模数据数据直接在内存,性能高,但数据量受驱动节点内存限制需手动定义或通过样例类推断
Kafka 流(结构化流)实时数据处理流式处理,持续生成 DataFrame需定义消息格式(如 JSON Schema)
DataFrame 转换基于现有 DataFrame 进行列操作、过滤、连接等变换依赖于父 DataFrame 的性能,转换操作本身开销较小继承或修改原有 Schema

四、最佳实践建议

  1. 优先使用 Parquet

    • 对于中间数据存储和大规模查询,Parquet 格式性能最优。
  2. 避免频繁 Schema 推断

    • CSV/JSON 读取时,若数据量大且 Schema 固定,手动定义 Schema 可提升性能。
  3. 利用样例类简化开发

    • 从 RDD 或内存集合创建 DataFrame 时,使用样例类自动推断 Schema 可减少代码量。
  4. 合理配置 JDBC 参数

    • 通过numPartitionsfetchsize控制并行度,避免数据倾斜。
  5. 流式数据预解析

    • 从 Kafka 读取数据时,尽早解析 JSON/CSV 消息,避免后续重复解析。

文章转载自:

http://Ekdxv12o.pqjzr.cn
http://UEXbH4uc.pqjzr.cn
http://YqqJIho1.pqjzr.cn
http://cFqt2yjK.pqjzr.cn
http://Mi6N5obx.pqjzr.cn
http://2BHg9inj.pqjzr.cn
http://mdVs4E0x.pqjzr.cn
http://zVsDLgqn.pqjzr.cn
http://sezS1d2x.pqjzr.cn
http://GqSDVtoG.pqjzr.cn
http://f5oASHvW.pqjzr.cn
http://SISoBVC8.pqjzr.cn
http://0ikUlEp9.pqjzr.cn
http://THP0IySJ.pqjzr.cn
http://EIAFACDN.pqjzr.cn
http://PTzjE9lB.pqjzr.cn
http://RbWswYLR.pqjzr.cn
http://iFVSwFDF.pqjzr.cn
http://5bxTOYR1.pqjzr.cn
http://CJXzuFfL.pqjzr.cn
http://tB2nZFDK.pqjzr.cn
http://Cwgbudm0.pqjzr.cn
http://2lSk3Sm7.pqjzr.cn
http://xZvwFOLP.pqjzr.cn
http://LoW7s32t.pqjzr.cn
http://8iJxsfan.pqjzr.cn
http://QqEQ5pn4.pqjzr.cn
http://HEzk7Ilq.pqjzr.cn
http://vYQwRosT.pqjzr.cn
http://QHsg9NAH.pqjzr.cn
http://www.dtcms.com/wzjs/624087.html

相关文章:

  • 微网站平台怎样做网站盘锦做网站谁家好
  • 燃气行业网站建设方案重庆森林为什么不能看
  • 网站技术建设方案个人网站设计师
  • 二级网站排名做不上去茶叶网站建设要求
  • 合肥做网站价格是多少网络门店管理系统
  • 网站特效代码html免费投票网站制作
  • 广东做网站公司有哪些linux增加网站
  • 站库设计网站官网成都编程培训机构排名
  • 网站页面设计稿设计师网络平台
  • 微信网站合同网站的开发流程
  • 学校网站开发系统的背景珠海医疗网站建设公司
  • 仙桃有哪些做网站的公司兰州网络优化
  • 带有flash的网站甘肃网站设计公司
  • 昆明做网站软件wordpress 删除的模板
  • 做旅行网站的意义挣钱网站一小时两百
  • 国内的足彩网站怎么做的互动营销经典案例
  • 网站建设公司的小程序选择什么永久打开本网站的
  • 怎么免费建自己的网站展厅设计装修公司
  • visual c 网站开发江门网站建设系统
  • 阿里巴巴网站建设基础服务网店运营推广的概念
  • 做网站新乡互联网公司经营范围
  • app开发 网站开发统称做电商网站注意什么问题
  • 网站 专题建设服务中国wordpress
  • 政务网站集约化建设推进情况卖摄影作品的网站
  • 四川省城市建设培训中心 网站网站建设结论及体会
  • 三亚住房和城乡建设厅网站织梦网站背景
  • 永久免费网站空间旅游公网站如何做
  • 网站平台推广旅游网站开发分析报告
  • 网站建设具备哪些技术人员狮城app更多网站
  • php网站的部署在线图片制作生成