读入csv文件写入MySQL
### 使用 Spark RDD 读取 CSV 文件并写入 MySQL 的实现方法
#### 1. 环境准备
在使用 Spark 读取 CSV 文件并写入 MySQL 数据库之前,需要确保以下环境已配置完成:
- 添加 Maven 依赖项以支持 JDBC 连接。
- 配置 MySQL 数据库连接参数,包括 URL、用户名和密码。
示例 `pom.xml` 中的依赖项:
```xml
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.30</version>
</dependency>
```
---
#### 2. 读取 CSV 文件
使用 Spark RDD 读取 CSV 文件时,可以通过 `textFile` 方法加载文件内容,并将其解析为结构化的数据。以下是具体步骤:
##### (1)加载 CSV 文件
通过 `sc.textFile` 方法加载 CSV 文件的内容。每行数据将以字符串的形式存在 RDD 中。
示例代码:
```scala
val csvRDD = sc.textFile("path/to/csvfile.csv")
```
##### (2)解析 CSV 行
由于 CSV 文件通常是以逗号分隔的多列数据,因此需要对每一行进行分割处理。可以使用 `split` 方法将字符串拆分为数组,并进一步映射为元组或自定义对象。
示例代码:
```scala
case class Record(col1: String, col2: String, col3: String)
val parsedRDD = csvRDD.map(line => {
val columns = line.split(",")
Record(columns(0), columns(1), columns(2))
})
```
---
#### 3. 写入 MySQL 数据库
将解析后的 RDD 数据写入 MySQL 数据库时,需借助 JDBC 接口完成操作。以下是具体的实现步骤:
##### (1)建立数据库连接属性
设置 MySQL 数据库的连接参数,包括 URL、用户名和密码。
示例代码:
```scala
import java.util.Properties
val mysqlUrl = "jdbc:mysql://localhost:3306/database_name"
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "password")
```
##### (2)将 RDD 转换为 DataFrame
为了更方便地与 MySQL 数据库交互,建议先将 RDD 转换为 DataFrame。这一步可通过隐式转换完成。
示例代码:
```scala
import spark.implicits._
val df = parsedRDD.map(record => (record.col1, record.col2, record.col3)).toDF("col1", "col2", "col3")
```
##### (3)写入数据到 MySQL
使用 DataFrame 的 `write.jdbc` 方法将数据写入指定的 MySQL 表中。
示例代码:
```scala
df.write.mode("append").jdbc(mysqlUrl, "table_name", props)
```
---
#### 4. 完整代码示例
以下是完整的代码示例,展示了如何从 CSV 文件读取数据并写入 MySQL 数据库。
```scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.{SQLContext, SaveMode}
import java.util.Properties
object CsvToMysqlExample {
case class Record(col1: String, col2: String, col3: String)
def main(args: Array[String]): Unit = {
// 初始化 Spark Context
val conf = new SparkConf().setAppName("CsvToMysql").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// 读取 CSV 文件
val csvRDD = sc.textFile("path/to/csvfile.csv")
// 解析 CSV 数据
val parsedRDD = csvRDD.map(line => {
val columns = line.split(",")
Record(columns(0), columns(1), columns(2))
})
// 将 RDD 转换为 DataFrame
val df = parsedRDD.toDF("col1", "col2", "col3")
// 设置 MySQL 数据库连接参数
val mysqlUrl = "jdbc:mysql://localhost:3306/database_name"
val props = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "password")
// 写入数据到 MySQL
df.write.mode(SaveMode.Append).jdbc(mysqlUrl, "table_name", props)
// 关闭 Spark Context
sc.stop()
}
}
```
---
### 总结
上述流程涵盖了从 CSV 文件读取数据并写入 MySQL 数据库的主要步骤:
1. **加载 CSV 文件**:通过 `textFile` 方法获取原始数据。
2. **解析 CSV 数据**:使用 `split` 和 `map` 方法将数据转换为结构化格式。
3. **转换为 DataFrame**:利用隐式转换功能将 RDD 映射为 DataFrame。
4. **写入 MySQL 数据库**:调用 `write.jdbc` 方法完成数据存储。
此方案充分利用了 Spark 的分布式计算能力和 JDBC 支持,适用于大规模数据场景下的 ETL 处理任务。
---