当前位置: 首页 > news >正文

写spark程序数据计算( 数据库的计算,求和,汇总之类的)连接mysql数据库,写入计算结果

1. 添加依赖

在项目的 `pom.xml`(Maven)中添加以下依赖:

```xml

<!-- Spark SQL -->

<dependency>

    <groupId>org.apache.spark</groupId>

    <artifactId>spark-sql_2.12</artifactId>

    <version>3.3.0</version>

</dependency>

 

<!-- MySQL Connector -->

<dependency>

    <groupId>mysql</groupId>

    <artifactId>mysql-connector-java</artifactId>

    <version>8.0.33</version>

</dependency>

代码

import org.apache.spark.sql.{SparkSession, SaveMode}

object SparkMySQLDemo {
  def main(args: Array[String]): Unit = {
    // 创建 SparkSession
    val spark = SparkSession.builder()
      .appName("SparkMySQLDemo")
      .master("local[*]") // 生产环境需改为集群模式,如 yarn
      .config("spark.sql.shuffle.partitions", "5") // 优化分区数
      .getOrCreate()

    // 设置 MySQL 连接参数
    val jdbcUrl = "jdbc:mysql://localhost:3306/your_database"
    val jdbcUsername = "your_username"
    val jdbcPassword = "your_password"

    try {
      // 从 MySQL 读取数据
      val df = spark.read
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "source_table") // 要读取的表名
        .option("user", jdbcUsername)
        .option("password", jdbcPassword)
        .load()

      // 执行计算(示例:按 category 分组求和)
      val resultDF = df.groupBy("category")
        .agg(
          sum("amount").alias("total_amount"),
          count("*").alias("record_count")
        )

      // 打印计算结果(调试用)
      resultDF.show()

      // 将结果写入 MySQL
      resultDF.write
        .format("jdbc")
        .option("url", jdbcUrl)
        .option("dbtable", "result_table") // 目标表名
        .option("user", jdbcUsername)
        .option("password", jdbcPassword)
        .mode(SaveMode.Append) // 写入模式:覆盖/追加
        .save()

      println("数据写入 MySQL 成功!")
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      spark.stop()
    }
  }
}

相关文章:

  • 【数据结构】2-3-2 单链表的插入删除
  • JSON Schema 高效校验 JSON 数据格式
  • 翻到了一段2005年写的关于需求的文字
  • ⭐️白嫖的阿里云认证⭐️ 第二弹【课时1:提示词(Prompt)技巧】for 「大模型Clouder认证:利用大模型提升内容生产能力」
  • 软件工具:批量图片区域识别+重命名文件的方法,发票识别和区域选择方法参考,基于阿里云实现
  • HarmonyOS 与 OpenHarmony:同根而不同途
  • Kubernetes控制平面组件:Kubelet详解(六):pod sandbox(pause)容器
  • Kubernetes控制平面组件:Kubelet详解(五):切换docker运行时为containerd
  • 【提高+/省选−】洛谷P1495 —— 【模板】中国剩余定理(CRT)/ 曹冲养猪
  • 游戏引擎学习第291天:跳跃的怪物与占据的树木
  • Linux搜索
  • 【ubuntu24.04】pycharm 死机结束进程
  • 正则表达式 - 语法
  • Trae IDE和VSCode Trae插件初探
  • 第6章 实战案例:基于 STEVAL-IDB011V1 板级 CI/CD 全流程
  • PyTorch音频处理技术及应用研究:从特征提取到相似度分析
  • 中级统计师-统计学基础知识-第三章 参数估计
  • 【沉浸式求职学习day43】【Java面试题精选3】
  • Linux的进程概念
  • Java面试场景:从音视频到AI应用的技术探讨
  • 病愈出院、跳大神消灾也办酒,新华每日电讯:农村滥办酒席何时休
  • 刘小涛任江苏省委副书记
  • 讲座|消逝之钟:《红楼梦》与《布登勃洛克一家》中的时间观
  • 外交部介绍对巴西、阿根廷、智利、秘鲁、乌拉圭等5国试行免签政策
  • 中央军委决定调整组建3所军队院校
  • 上海锦江乐园摩天轮正在拆除中,预计5月底6月初拆完