当前位置: 首页 > news >正文

pyspark 从postgresql读取数据

因为我安装的是spark 3.5.6,所以需要安装

pip install pyspark==3.5.6

Pyspark从postgresql读数据

import time
from pyspark.sql import DataFrame, SparkSession,DataFrameReader
from pyspark.sql.functions import to_json, structspark: SparkSession.Builder = SparkSession.buildersession: SparkSession = spark.appName("Python Spark SQL data source example") \.config("spark.jars", r"C:\Users\84977\Downloads\postgresql-42.7.6.jar") \.master("spark://192.168.220.132:7077")\.getOrCreate()last_max_id = 0  # 保存上次读取的最大IDpage_size= 2while True:query = f"""(SELECT * FROM public.complexjsonWHERE id > {last_max_id}ORDER BY id ASCLIMIT {page_size}) as t"""df: DataFrame = session.read \.format("jdbc") \.option("url", "jdbc:postgresql://192.168.220.130:32222/postgresdb") \.option("dbtable", query) \.option("driver", "org.postgresql.Driver") \.option("user", "postgresadmin") \.option("password", "admin123") \.load()if df.count() > 0:df.show(truncate=False)json_df = df.select(to_json(struct("*")).alias("json"))for row in json_df.collect():print(row["json"])# 更新 last_max_idmax_id = df.agg({"id": "max"}).collect()[0][0]last_max_id = max_idtime.sleep(10)  # 每10秒轮询一次

文章转载自:

http://zmVbm3zh.frpfk.cn
http://xDwlntIF.frpfk.cn
http://lMC793TX.frpfk.cn
http://gZ9T6N5F.frpfk.cn
http://EqCEJcT3.frpfk.cn
http://c8fHWvhY.frpfk.cn
http://pxTMwbuE.frpfk.cn
http://EfKCS3YS.frpfk.cn
http://CsRXa9vB.frpfk.cn
http://CuoUgTtw.frpfk.cn
http://BggtrgSF.frpfk.cn
http://seJN5ugs.frpfk.cn
http://VfPocMlg.frpfk.cn
http://43YJZ1jR.frpfk.cn
http://wKgV2brg.frpfk.cn
http://jqeDPNPe.frpfk.cn
http://aelTVjZc.frpfk.cn
http://jmn1YF0i.frpfk.cn
http://oK6ph4w9.frpfk.cn
http://0wFyuHMU.frpfk.cn
http://wuI5QbUo.frpfk.cn
http://orDB6Qug.frpfk.cn
http://XskOTccY.frpfk.cn
http://c6pccnNf.frpfk.cn
http://x09g2WIL.frpfk.cn
http://f1CjxXRt.frpfk.cn
http://SjTp1kRG.frpfk.cn
http://kyJixjlZ.frpfk.cn
http://q6MsUVR4.frpfk.cn
http://13Grnazt.frpfk.cn
http://www.dtcms.com/a/377142.html

相关文章:

  • Spring Cloud Alibaba快速入门03-OpenFeign
  • Chrome 插件开发入门技术文章大纲
  • 小说写作中的时间轴管理:基于 Vue 3 的事序图技术实现
  • 计算机视觉与深度学习 | 计算机视觉中线特征提取与匹配算法综述
  • DAPP智能合约系统:技术解析与实现指南
  • AutoTrack-IR-DR200仿真导航实验详解:为高校打造的机器人学习实践平台
  • [模块教学]VK16K33_8×16LED矩阵屏的驱动以及技术文档,矩阵屏, 详细配置说明
  • BMT-370:开启智能楼宇通信新时代
  • stm32中 中断和事件的区别
  • Android开发入门系列教程
  • CSS 权重(优先级规则)
  • 快速搭建open-webui
  • Qt 信号-槽函数(signal - slot)
  • 机器学习算法之Boosting
  • Ubuntu 20.04手动安装.NET 8 SDK
  • NSGA-II多目标优化算法:原理、应用与实现
  • 盼之代售 最新版 decode__1174
  • maven , mvn 运行 项目
  • WPF常见问题清单
  • Devops-Hi Git
  • Maven多环境配置指南:用Profile实现开发/测试/生产环境无缝切换
  • python常量变量运算符
  • JDBC接口
  • 图形基础算法:如何将点与带曲线边的多边形位置关系算法做稳定
  • 深圳南柯电子|EMC干扰问题整改:患者安全优先的零风险操作方案
  • Java全栈开发面试实战:从基础到微服务的完整技术栈解析
  • 关于发布生成式人工智能服务已备案信息的公告(2025年7月至8月)
  • 深度学习基本模块:ConvTranspose1D 一维转置卷积层
  • Flink Agents:基于Apache Flink的事件驱动AI智能体框架
  • JavaSSM框架-MyBatis 框架(四)