当前位置: 首页 > news >正文

在scala中sparkSQL连接masql并添加新数据

以下是 Scala 中使用 Spark SQL 连接 MySQL 并添加数据的完整代码示例(纯文本):

 

1. 准备连接参数(需替换实际信息)

 

scala

val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"  

val tableName = "users" // 目标表名  

val user = "root"  

val password = "your_password"  

val driverClass = "com.mysql.cj.jdbc.Driver" // MySQL 8+ 驱动类(5.x 用 com.mysql.jdbc.Driver)  

 

 

2. 创建 SparkSession

 

scala

import org.apache.spark.sql.SparkSession  

 

val spark = SparkSession.builder()  

  .appName("Spark SQL MySQL Insert")  

  .master("local[*]") // 单机模式,集群改为 "yarn" 等  

  .getOrCreate()  

 

 

3. 生成待插入数据(示例 DataFrame)

 

scala

import spark.implicits._  

 

// 示例数据:插入两条用户记录(假设表结构为 id INT, name STRING, age INT)  

val newData = Seq(  

  (3, "Alice", 28),  

  (4, "Bob", 30)  

).toDF("id", "name", "age")  

 

 

4. 写入数据到 MySQL(追加模式)

 

scala

newData.write.jdbc(  

  url = jdbcUrl,  

  table = tableName,  

  mode = "append", // 写入模式:append(追加)、overwrite(覆盖)等  

  properties = new java.util.Properties() {{  

    setProperty("user", user)  

    setProperty("password", password)  

    setProperty("driver", driverClass)  

  }}  

)  

 

 

关键说明

 

1. 写入模式(mode):

 

-  append :数据追加到现有表(表需存在)。

 

-  overwrite :覆盖现有表(需注意权限和数据安全)。

 

-  ignore :忽略重复数据(需表有唯一约束)。

 

-  failIfExists :表存在时抛出异常(默认模式)。

 

2. 表结构要求:

 

- 目标表需提前创建,字段类型需与 DataFrame 匹配(如  id  对应  INT , name  对应  VARCHAR )。

 

3. 驱动与版本适配:

 

- 若报  ClassNotFoundException ,检查驱动是否正确部署(通过  --jars  参数或放入  $SPARK_HOME/jars/ )。

 

- MySQL 5.x 和 8.x 驱动类名不同,需对应修改  driverClass 。

 

4. 批量写入优化:

 

- 可添加参数  ?rewriteBatchedStatements=true  到  jdbcUrl  中,提升批量插入性能:

scala

val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8&rewriteBatchedStatements=true"  

 

 

完整代码整合

 

scala

import org.apache.spark.sql.SparkSession  

import spark.implicits._  

 

object SparkMySQLInsert {  

  def main(args: Array[String]): Unit = {  

    // 连接参数  

    val jdbcUrl = "jdbc:mysql://localhost:3306/test_db?useUnicode=true&characterEncoding=utf-8"  

    val tableName = "users"  

    val user = "root"  

    val password = "your_password"  

    val driverClass = "com.mysql.cj.jdbc.Driver"  

 

    // 创建 SparkSession  

    val spark = SparkSession.builder()  

      .appName("Spark SQL MySQL Insert")  

      .master("local[*]")  

      .getOrCreate()  

 

    // 生成待插入数据  

    val newData = Seq(  

      (3, "Alice", 28),  

      (4, "Bob", 30)  

    ).toDF("id", "name", "age")  

 

    // 写入数据  

    newData.write.jdbc(  

      url = jdbcUrl,  

      table = tableName,  

      mode = "append",  

      properties = new java.util.Properties() {{  

        setProperty("user", user)  

        setProperty("password", password)  

        setProperty("driver", driverClass)  

      }}  

    )  

 

    spark.stop()  

  }  

}  

 

 

执行时需通过  spark-submit  命令提交,并指定 MySQL 驱动包:

 

bash

spark-submit --jars /path/to/mysql-connector-java.jar your_app.jar

相关文章:

  • 分割一切(SAM) 论文阅读:Segment Anything
  • c++ std库中的文件操作学习笔记
  • QEMU模拟32位ARM实现自定义系统调用
  • CodeBuddy 中国版 Cursor 实战:Redis+MySQL双引擎驱动〈王者荣耀〉战区排行榜
  • RAG之大规模解析 PDF 文档全流程实战
  • 网络协议分析 实验四 ICMPv4与ICMPv6
  • web-ui开源程序是建立在浏览器使用的基础上,旨在使 AI 代理可以访问网站
  • MySQL 学习(八)如何打开binlog日志
  • sqli-labs靶场第四关——“)闭合
  • deepseek梳理java高级开发工程师微服务面试题
  • SQL、Oracle 和 SQL Server 的比较与分析
  • 一次讲清 FP32 / FP16 / BF16 / INT8 / INT4
  • MySQL 8.0 OCP(1Z0-908)英文题库(31-40)
  • UI-TARS Desktop:用自然语言操控电脑,AI 重新定义人机交互
  • YOLO11解决方案之物体模糊探索
  • 自然语言生成在商业智能中的应用实践
  • 【工作记录】Kong Gateway 入门篇之部署及简单测试
  • 基于javaweb的SpringBoot爱游旅行平台设计和实现(源码+文档+部署讲解)
  • 【github】主页显示star和fork
  • STM32 __rt_entry
  • 上海浦东机场1号、2号航站楼均推出国内出发安检24小时服务
  • 科学家用AI寻找外星生命
  • 首映丨纪录电影《滚烫年华》:献给所有奋斗者
  • 5.19中国旅游日,上海56家景区景点限时门票半价
  • 家电维修担心遇“李鬼”?上海推动“物业+专业服务”进社区
  • 看展览|2025影像上海艺博会:市场与当代媒介中的摄影