Spark Reading a MySQL Database Table
Official documentation: JDBC To Other Databases - Spark 4.0.0 Documentation
Official example:
// Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
// Loading data from a JDBC source
val jdbcDF = spark.read.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").load()

val connectionProperties = new Properties()
connectionProperties.put("user", "username")
connectionProperties.put("password", "password")
val jdbcDF2 = spark.read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
// Specifying the custom data types of the read schema
connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
val jdbcDF3 = spark.read.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Saving data to a JDBC source
jdbcDF.write.format("jdbc").option("url", "jdbc:postgresql:dbserver").option("dbtable", "schema.tablename").option("user", "username").option("password", "password").save()

jdbcDF2.write.jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)

// Specifying create table column data types on write
jdbcDF.write.option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)").jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
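The official example targets PostgreSQL, but the same read.jdbc / connectionProperties API applies to MySQL once the URL is swapped. A minimal sketch, assuming the node11 host, wjobs database, user table, and root/root123 credentials used in the verification program below:

import java.util.Properties

// Connection arguments passed as java.util.Properties
val mysqlProps = new Properties()
mysqlProps.put("user", "root")
mysqlProps.put("password", "root123")
// Optional: explicitly name the driver class shipped with mysql-connector-java 8.x
mysqlProps.put("driver", "com.mysql.cj.jdbc.Driver")

// Read the `user` table from MySQL via the Properties-based jdbc() method
val mysqlDF = spark.read.jdbc("jdbc:mysql://node11:3306/wjobs?useSSL=false", "user", mysqlProps)
mysqlDF.show()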
Verification:
<dependencies>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.13.16</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>3.8.1</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.15</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.13</artifactId>
        <version>4.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.13</artifactId>
        <version>4.0.0</version>
    </dependency>
</dependencies>
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SparkCase04 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[1]").setAppName("WC")
    val spark = SparkSession.builder().config(conf).getOrCreate()

    // Read the `user` table from the `wjobs` database over JDBC
    val df = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node11:3306/wjobs?useSSL=false")
      .option("dbtable", "user")
      .option("user", "root")
      .option("password", "root123")
      .load()

    df.show()
  }
}
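The program above reads the whole table through a single JDBC connection. The reader also accepts the documented partitioning options (partitionColumn, lowerBound, upperBound, numPartitions) to split the scan across several queries. A minimal sketch, assuming the user table has a numeric id column; the bounds and partition count here are purely illustrative:

    // Hypothetical partitioned read: one query per partition over the assumed id column
    val partitionedDF = spark.read.format("jdbc")
      .option("url", "jdbc:mysql://node11:3306/wjobs?useSSL=false")
      .option("dbtable", "user")
      .option("user", "root")
      .option("password", "root123")
      .option("partitionColumn", "id")  // assumed numeric column
      .option("lowerBound", "1")
      .option("upperBound", "100000")
      .option("numPartitions", "4")
      .option("fetchsize", "1000")      // rows fetched per round trip
      .load()

    // The result is an ordinary DataFrame and can be queried with Spark SQL
    partitionedDF.createOrReplaceTempView("user")
    spark.sql("SELECT COUNT(*) FROM user").show()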