JDBC-MySQL数据库连接与使用
JDBC-MySQL数据库连接与使用
1.虚拟机Ubuntu操作系统上
1.1 下载JDBC-MySQL数据库连接器
-
首先检查MySQL Server的版本,输入以下语句:
hadoop@ubuntuvm1:~$ mysql --version mysql Ver 8.0.40-0ubuntu0.24.04.1 for Linux on x86_64 ((Ubuntu)) -
下载与MySQL Server相兼容版本的JDBC-MySQL数据库连接器,下载网址:JDBC-MySQL数据库连接器下载官网->Archive->Operating System:Platform Independent(即Java平台)->选择JDBC的版本->选择tar.gz文件,这里下载
mysql-connector-j-8.4.0.tar.gz版本,并将其放在共享文件夹E:\Linux\VirtualBox_VMs\vmshare。 -
MySQL server8.0版本及以上需下载JDBC-MySQL8.0数据库连接器以上的版本
1.2 安装JDBC
1.以 hadoop 用户登录虚拟机,首先将操作系统更新到最新。
sudo apt update -y && sudo apt upgrade -y
2.解压缩mysql-connector-j-8.4.0.tar.gz。
# 解压缩。cd /opt/apptar zxf /media/sf_vmshare/mysql-connector-j-8.4.0.tar.gz
3.将解压缩的文件中的mysql-connector-j-8.4.0.jar放到/opt/app/spark-3.5.3-bin-hadoop3/jars/中.
4.至此PySpark中即可读写MySQL数据库。
2.通过JDBC-MySQL连接器连接数据库
2.1 在jupyter中读取文件,例如:
from pyspark.sql import SparkSession# Spark session & context
spark = SparkSession.builder.master("MySQL Example").getOrCreate()
sc = spark.sparkContext# Reading CSV file from /sparkdata folder
csv_path = "/home/hadoop/datafile/androidapps.csv" # This will read any CSV file in the /sparkdata folder
df = spark.read.csv(csv_path, header=True, inferSchema=True) # Assuming the CSV has a header# Show the DataFrame
df.show(10)
2.2 在jupyter中查看数据的模式
#查看df的模式df.printSchema()
输出:
root#(13)|-- App: string (nullable = true)|-- Category: string (nullable = true)|-- Rating: string (nullable = true)|-- Reviews: string (nullable = true)|-- Size: string (nullable = true)|-- Installs: double (nullable = true)|-- Type: string (nullable = true)|-- Price: double (nullable = true)|-- Content Rating: string (nullable = true)|-- Genres: string (nullable = true)|-- Last Updated: string (nullable = true)|-- Current Ver: string (nullable = true)|-- Android Ver: string (nullable = true)
2.3 在MySQL数据库中创建数据表
- 进入MySQL server服务器
hadoop@ubuntuvm01:~$ sudo mysql -u root[sudo] password for hadoop:Welcome to the MySQL monitor. Commands end with ; or \g.Your MySQL connection id is 8Server version: 8.0.40-0ubuntu0.24.04.1 (Ubuntu)
- 在MySQL数据库中创建数据表
#创建数据库
CREATE DATABASE IF NOT EXISTS myandroidapps DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci;use myandroidapps;
#创建数据表
drop table if exists androidapps;#如果该表存在则删除
create table androidapps
(App varchar(20) not null,Category varchar(20) not null,Rating varchar(20) not null,Reviews varchar(20) not null,Size varchar(20) not null,Installs double not null,Type varchar(20) not null,Price double not null,ContentRating varchar(20) not null,Genres varchar(20) not null,LastUpdated varchar(20) not null,likes varchar(512) not null,CurrentVer varchar(20) not null,AndroidVer varchar(20) not null,primary key (App)
)charset = utf8mb4 ;
2.4 在jupyter中往数据库中写数据
#定义一个函数,以每一个分区来存储数据
def partition_to_mysql(partition):#建立数据库连接import pymysqlconnection = pymysql.connect(#connection用于连接数据库 host='127.0.0.1',#IP地址user='root',#用户名password='123456',#密码为123456database='myandroidapps',#数据库名字charset='utf8'#编码为UTF8)cursor = connection.cursor()#cursor可以执行SQL语句#批量插入数据#replace新增或更新,如果存在就更新,不存在就新增,其中占位符%s,就算是数字也是它。insert_query = "replace INTO androidapps (App,Category,Rating,Reviews,Size,Installs,Type,Price,ContentRating,Genres,LastUpdated,CurrentVer,AndroidVer) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)"#新建,将数据全放到列表里,这是一个列表推导式,在partition的每个row写成这种形式(一个元组及其当中的属性),这样写是应为数据量不大,数据量大的话可以100条的插入data_to_insert=[(row['App'],row['Category'],row['Rating'],row['Reviews'],row['Size'],row['Installs'],row['Type'],row['Price'],row['ContentRating'],row['Genres'],row['LastUpdated'],row['CurrentVer'],row['AndroidVer']) for row in partition]#执行批量插入cursor.executemany(第一个是sql,第二个是数据)cursor.executemany(insert_query,data_to_insert)connection.commit()#提交执行#关闭连接cursor.close()connection.close()#repartition(100)分为100个分区,100个任务同时插入数据,foreachPartition(partition_to_mysql)对每个partition执行这个函数
df.repartition(100).foreachPartition(partition_to_mysql)#执行完毕即可将数据插入到数据库的数据表中
2.5 在jupyter中查询数据库的数据
jdbc_url = "jdbc:mysql://127.0.0.1:3306/myandroidapps"#注意数据库名字mytest
connection_properties = {"user":"root","password":"123456","driver":"com.mysql.cj.jdbc.Driver"#加载驱动
}#创建零时表
sql = "(select * from androidapps) as aas"
#从数据库中读取数据
df = spark.read.jdbc(url=jdbc_url,table = sql,properties=connection_properties)df.show(5)
本文参考链接:
- 共享文件夹设置参考链接:https://blog.csdn.net/2301_77987130/article/details/153210246?spm=1011.2415.3001.5331
- hadoop 用户创建参考链接:https://blog.csdn.net/2301_77987130/article/details/153742680?spm=1011.2415.3001.5331
- jupyter 安装参考链接:https://blog.csdn.net/2301_77987130/article/details/153276491?spm=1011.2415.3001.5331
