当前位置: 首页 > news >正文

Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库

在 Spark 中,可以使用 Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库。以下是一个完整的示例,展示如何实现这一过程。

环境准备

  1. 安装 MySQL:确保 MySQL 数据库已安装并运行。
  2. 创建 MySQL 数据库和表
    CREATE DATABASE sparkdb;
    USE sparkdb;CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50),age INT,country VARCHAR(50)
    );
    
  3. 下载 MySQL JDBC 驱动
    • 从 MySQL 官方网站 下载 MySQL JDBC 驱动(mysql-connector-java-x.x.xx.jar)。
    • 将下载的 JAR 文件放置在 Spark 的 jars 目录下(例如 spark-3.3.0/jars/)。

示例代码

以下是一个完整的 Scala 示例代码,展示如何读取 CSV 文件并将其写入 MySQL 数据库:

import org.apache.spark.sql.{SparkSession, DataFrame}object CsvToMySQL {def main(args: Array[String]): Unit = {// 初始化 SparkSessionval spark = SparkSession.builder.appName("CsvToMySQL").master("local[*]").getOrCreate()// 读取 CSV 文件val csvFilePath = "path/to/users.csv" // 替换为你的 CSV 文件路径val df: DataFrame = spark.read.option("header", "true") // 第一行是表头.option("inferSchema", "true") // 自动推断数据类型.csv(csvFilePath)// 查看读取的数据df.show()// 配置 MySQL 数据库连接信息val jdbcUrl = "jdbc:mysql://localhost:3306/sparkdb"val jdbcUser = "root" // 替换为你的 MySQL 用户名val jdbcPassword = "password" // 替换为你的 MySQL 密码val jdbcTable = "users"// 将数据写入 MySQL 数据库df.write.format("jdbc").option("url", jdbcUrl).option("dbtable", jdbcTable).option("user", jdbcUser).option("password", jdbcPassword).mode("append") // 如果表已存在,追加数据.save()// 停止 SparkSessionspark.stop()}
}

示例 CSV 文件

假设你的 CSV 文件 users.csv 内容如下:

name,age,country
Alice,25,China
Bob,30,USA
Charlie,35,Japan
David,40,Germany

运行步骤

  1. 保存代码:将上述代码保存为 CsvToMySQL.scala 文件。
  2. 编译和运行
    • 使用 SBT 或 Maven 构建项目。
    • 在 IntelliJ IDEA 中运行程序。
  3. 验证结果
    • 登录到 MySQL 数据库,检查 sparkdb 数据库中的 users 表,确保数据已正确插入。

注意事项

  1. CSV 文件路径:确保 csvFilePath 指向正确的 CSV 文件路径。
  2. MySQL 用户名和密码:替换为你的实际 MySQL 用户名和密码。
  3. JDBC 驱动:确保 MySQL JDBC 驱动已正确放置在 Spark 的 jars 目录下。
  4. 数据模式:在写入数据库时,mode("append") 表示追加数据。如果需要覆盖表,可以使用 mode("overwrite")
  5. 性能优化:对于大规模数据写入,可以考虑使用批量插入(batchsize)等优化选项。

通过以上步骤,你可以使用 Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库。

相关文章:

  • 【认知思维】过度自信效应:高估自我能力的认知偏差
  • 【Pandas】pandas DataFrame cumprod
  • PostgreSQL 服务器信号函数
  • ZYNQ实战:可编程差分晶振Si570的配置与应用指南
  • 安卓刷机模式详解:Fastboot、Fastbootd、9008与MTK深刷
  • 项目:博客系统——基于SSM框架Mybatis-plus
  • 基于 Spring Boot 瑞吉外卖系统开发(十三)
  • Vxworks 系统详解
  • 装饰器在Python中的作用及在PyTorchMMDetection中的实战应用
  • 我国城市轨道交通行业人工智能大模型发布,迈向智慧化新征程​
  • 本地的ip实现https访问-OpenSSL安装+ssl正式的生成(Windows 系统)
  • Java【10_1】用户注册登录(面向过程与面向对象)
  • tomcat搭建内网论坛
  • 【论信息系统项目的资源管理】
  • docker大镜像优化实战
  • 专题三:穷举vs暴搜vs深搜vs回溯vs剪枝(全排列)决策树与递归实现详解
  • 企业如何构建安全高效的数据合规体系?
  • python使用OpenCV 库将视频拆解为帧并保存为图片
  • 问题及解决02-处理后的图像在坐标轴外显示
  • 用自写的jQuery库+Ajax实现了省市联动
  • 宝通科技:与宇树合作已签约,四足机器人在工业场景落地是重点商业化项目
  • 人民日报仲音:大力纠治违规吃喝顽瘴痼疾
  • 科普|揭秘女性压力性尿失禁的真相
  • 人大新闻教育70年丨16759门课程里的时代密码
  • 中方发布会:中美经贸高层会谈氛围是坦诚的、深入的、具有建设性的
  • 乘联分会:上半年车市价格竞争温和,下半年价格战或再开启