当前位置: 首页 > news >正文

在scala中使用sparkSQL连接MySQL并添加新数据

以下是使用 Spark SQL(Scala)连接 MySQL 并添加新数据的完整代码示例:

scala

import org.apache.spark.sql.SparkSessionobject MySQLSparkExample {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark = SparkSession.builder.appName("MySQLDataInsertExample").config("spark.master", "local[*]").config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26").getOrCreate()// 配置MySQL连接参数val jdbcUrl = "jdbc:mysql://localhost:3306/your_database?useSSL=false"val connectionProperties = new java.util.Properties()connectionProperties.setProperty("user", "your_username")connectionProperties.setProperty("password", "your_password")connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver")try {// 1. 读取现有数据示例val existingData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)println("现有数据:")existingData.show()// 2. 创建要添加的新数据import spark.implicits._val newData = Seq((1001, "John Doe", "Engineering", 5000.0),(1002, "Jane Smith", "Marketing", 6000.0)).toDF("id", "name", "department", "salary")// 3. 将新数据追加到MySQL表newData.write.mode("append").jdbc(jdbcUrl, "employees", connectionProperties)println("数据添加成功!")// 4. 验证添加后的数据val updatedData = spark.read.jdbc(jdbcUrl, "employees", connectionProperties)println("添加后的数据:")updatedData.show()} catch {case e: Exception =>println(s"操作失败: ${e.getMessage}")e.printStackTrace()} finally {// 关闭SparkSessionspark.stop()}}
}

关键配置说明:

  1. 依赖配置

    scala

    .config("spark.jars.packages", "mysql:mysql-connector-java:8.0.26")
    
  2. JDBC URL 格式

    scala

    jdbc:mysql://<host>:<port>/<database>?useSSL=false
    
  3. 写入模式

    • append:追加数据(不会删除现有数据)
    • overwrite:覆盖表(先删除再插入)
    • errorIfExists:如果表存在则报错(默认)

执行步骤:

  1. 确保 MySQL 服务已启动
  2. 创建测试表:

    sql

    CREATE TABLE employees (id INT PRIMARY KEY,name VARCHAR(50),department VARCHAR(50),salary DOUBLE
    );
    
  3. 运行 Spark 应用:

    bash

    spark-submit --class MySQLSparkExample \--master local[*] \--packages mysql:mysql-connector-java:8.0.26 \your-application.jar
    

注意事项:

  1. 替换数据库连接参数:

    • your_database
    • your_username
    • your_password
  2. 如果遇到时区问题,可在 URL 中添加:

    scala

    ?serverTimezone=UTC
    
  3. 确保 MySQL 用户有写入权限:

    sql

    GRANT INSERT ON your_database.employees TO 'your_username'@'%';
    
  4. 对于生产环境,建议:

    • 使用连接池(如 HikariCP)
    • 启用 SSL 加密
    • 配置适当的重试机制
    • 监控数据库连接状态

相关文章:

  • 需求跟踪矩阵准确性的5大策略
  • java使用 FreeMarker 模板生成包含图片的 `.doc` 文件
  • 《数据库原理》部分习题解析
  • MySQL——八、SQL优化
  • 精简大语言模型:用于定制语言模型的自适应知识蒸馏
  • 商业航天运动控制系统中的高可靠性芯片解决方案:挑战、策略与应用研究
  • 每周靶点分享:Nectin-4、CDH6及文献分享
  • Deno、Bun、Node.js 性能对比与选型指南
  • Linux进程信号处理(26)
  • Axure高级交互设计:文本框循环赋值实现新增、修改和查看
  • Codis集群搭建和集成使用的详细步骤示例
  • Chrome浏览器离线版安装包下载
  • TensorFlow之微分求导
  • spark-cache模式
  • Java基础 5.13
  • SQL 中 INSTR 函数简介及 截取地址应用
  • 125.在 Vue3 中使用 OpenLayers 实现通过 WebGLVector 的方式添加海量点
  • Deepseek+Xmind:秒速生成思维导图与流程图
  • HTML、CSS 和 JavaScript 基础知识点
  • Tomcat和Nginx的主要区别
  • 深圳拟出让3宗居住用地,共计用地面积6.77公顷
  • 外交部:反对美方人士发表不负责任谬论
  • 黄仕忠丨戏曲文献研究之回顾与展望
  • 地下5300米开辟“人造气路”,我国页岩气井垂深纪录再刷新
  • 从600名外到跻身大满贯,孙发京:走过的路成就了现在的我
  • 青海规范旅游包车行为:不得引导外省籍旅游包车违规驻地运营