PySpark: Reading Data from PostgreSQL
Because the Spark I installed is 3.5.6, the matching PySpark version needs to be installed:
pip install pyspark==3.5.6
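
Before going further, it is worth checking that the client library actually matches the cluster, since a mismatched PySpark and Spark version can fail at job submission. A minimal sanity check:

# The pyspark package version should match the Spark cluster version
import pyspark
print(pyspark.__version__)  # expect: 3.5.6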
Reading data from PostgreSQL with PySpark
import time

from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import to_json, struct

# Point the builder at the standalone cluster and ship the PostgreSQL JDBC driver jar
spark: SparkSession.Builder = SparkSession.builder
session: SparkSession = spark.appName("Python Spark SQL data source example") \
    .config("spark.jars", r"C:\Users\84977\Downloads\postgresql-42.7.6.jar") \
    .master("spark://192.168.220.132:7077") \
    .getOrCreate()

last_max_id = 0  # highest id seen in the previous read
page_size = 2

while True:
    # Wrap the paginated SELECT as a derived table so it can be passed via "dbtable"
    query = f"""(SELECT * FROM public.complexjson
                 WHERE id > {last_max_id}
                 ORDER BY id ASC
                 LIMIT {page_size}) as t"""
    df: DataFrame = session.read \
        .format("jdbc") \
        .option("url", "jdbc:postgresql://192.168.220.130:32222/postgresdb") \
        .option("dbtable", query) \
        .option("driver", "org.postgresql.Driver") \
        .option("user", "postgresadmin") \
        .option("password", "admin123") \
        .load()
    if df.count() > 0:
        df.show(truncate=False)
        # Serialize each row to a JSON string and print it
        json_df = df.select(to_json(struct("*")).alias("json"))
        for row in json_df.collect():
            print(row["json"])
        # Advance the cursor past the rows just read
        max_id = df.agg({"id": "max"}).collect()[0][0]
        last_max_id = max_id
    time.sleep(10)  # poll every 10 seconds
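
One caveat about the loop above: df.count(), df.show(), the collect() on json_df, and the final agg() are each separate Spark actions, and with the JDBC source every action re-runs the query against PostgreSQL. A minimal sketch of the usual fix, caching the fetched page for the duration of one iteration (same names as the script above):

    df = df.cache()            # keep the fetched page in memory
    if df.count() > 0:         # the first action materializes the cache
        df.show(truncate=False)
        json_df = df.select(to_json(struct("*")).alias("json"))
        for row in json_df.collect():
            print(row["json"])
        last_max_id = df.agg({"id": "max"}).collect()[0][0]
    df.unpersist()             # release the page before the next poll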
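
As an aside, the "(SELECT ...) as t" wrapping is only needed because the SQL is passed through "dbtable". Spark's JDBC source (2.4 and later) also accepts a statement directly through the "query" option; a sketch against the same connection settings, keeping in mind that "dbtable" and "query" are mutually exclusive on one read:

df = session.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://192.168.220.130:32222/postgresdb") \
    .option("query",
            f"SELECT * FROM public.complexjson "
            f"WHERE id > {last_max_id} ORDER BY id ASC LIMIT {page_size}") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "postgresadmin") \
    .option("password", "admin123") \
    .load()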