Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库
在 Spark 中,可以使用 Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库。以下是一个完整的示例,展示如何实现这一过程。
环境准备
- 安装 MySQL:确保 MySQL 数据库已安装并运行。
- 创建 MySQL 数据库和表:
CREATE DATABASE sparkdb; USE sparkdb;CREATE TABLE users (id INT AUTO_INCREMENT PRIMARY KEY,name VARCHAR(50),age INT,country VARCHAR(50) );
- 下载 MySQL JDBC 驱动:
- 从 MySQL 官方网站 下载 MySQL JDBC 驱动(
mysql-connector-java-x.x.xx.jar
)。 - 将下载的 JAR 文件放置在 Spark 的
jars
目录下(例如spark-3.3.0/jars/
)。
- 从 MySQL 官方网站 下载 MySQL JDBC 驱动(
示例代码
以下是一个完整的 Scala 示例代码,展示如何读取 CSV 文件并将其写入 MySQL 数据库:
import org.apache.spark.sql.{SparkSession, DataFrame}object CsvToMySQL {def main(args: Array[String]): Unit = {// 初始化 SparkSessionval spark = SparkSession.builder.appName("CsvToMySQL").master("local[*]").getOrCreate()// 读取 CSV 文件val csvFilePath = "path/to/users.csv" // 替换为你的 CSV 文件路径val df: DataFrame = spark.read.option("header", "true") // 第一行是表头.option("inferSchema", "true") // 自动推断数据类型.csv(csvFilePath)// 查看读取的数据df.show()// 配置 MySQL 数据库连接信息val jdbcUrl = "jdbc:mysql://localhost:3306/sparkdb"val jdbcUser = "root" // 替换为你的 MySQL 用户名val jdbcPassword = "password" // 替换为你的 MySQL 密码val jdbcTable = "users"// 将数据写入 MySQL 数据库df.write.format("jdbc").option("url", jdbcUrl).option("dbtable", jdbcTable).option("user", jdbcUser).option("password", jdbcPassword).mode("append") // 如果表已存在,追加数据.save()// 停止 SparkSessionspark.stop()}
}
示例 CSV 文件
假设你的 CSV 文件 users.csv
内容如下:
name,age,country
Alice,25,China
Bob,30,USA
Charlie,35,Japan
David,40,Germany
运行步骤
- 保存代码:将上述代码保存为
CsvToMySQL.scala
文件。 - 编译和运行:
- 使用 SBT 或 Maven 构建项目。
- 在 IntelliJ IDEA 中运行程序。
- 验证结果:
- 登录到 MySQL 数据库,检查
sparkdb
数据库中的users
表,确保数据已正确插入。
- 登录到 MySQL 数据库,检查
注意事项
- CSV 文件路径:确保
csvFilePath
指向正确的 CSV 文件路径。 - MySQL 用户名和密码:替换为你的实际 MySQL 用户名和密码。
- JDBC 驱动:确保 MySQL JDBC 驱动已正确放置在 Spark 的
jars
目录下。 - 数据模式:在写入数据库时,
mode("append")
表示追加数据。如果需要覆盖表,可以使用mode("overwrite")
。 - 性能优化:对于大规模数据写入,可以考虑使用批量插入(
batchsize
)等优化选项。
通过以上步骤,你可以使用 Spark SQL 读取 CSV 文件,并将数据写入 MySQL 数据库。