当前位置: 首页 > news >正文

spark连接mongodb

使用pyspark连接mongodb进行简单的操作:

from pyspark.sql import SparkSession
from pyspark import SparkConf
import os# 强制使用 IPv4 并配置环境变量
os.environ["JAVA_HOME"] = r"C:\app\Java\jdk-1.8"
os.environ["HADOOP_HOME"] = r"C:\app\hadoop-3.3.5"
os.environ["SPARK_HOME"] = r"C:\app\spark-3.5.4-bin-hadoop3"
os.environ["PYSPARK_PYTHON"] = "python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "python"# MongoDB 配置
mongo_uri = "mongodb://localhost:27017"
database = "test_db"
collection = "test_clolection"# spark设置配置
conf = SparkConf()
# conf.set("spark.sql.shuffle.partitions", "512") # 分区数
# conf.set("spark.default.parallelism", "36") # 默认并行度
conf.set("spark.driver.memory", "16g") # 驱动内存
conf.set("spark.executor.memory", "16g") # 执行器内存
conf.set("spark.driver.maxResultSize","3g") # 驱动最大结果大小
conf.set("spark.executor.maxResultSize", "3g") # 执行器最大结果大小
conf.set("spark.driver.host", "127.0.0.1") # 驱动主机
conf.set("spark.driver.bindAddress", "127.0.0.1") # 驱动绑定地址
conf.set("spark.network.timeout", "6000s")  # 网络超时时间
conf.set("spark.executor.heartbeatInterval", "600s")  # 心跳间隔
# jar包配置
conf.set("spark.jars", ",".join([r"C:\app\spark-3.5.4-bin-hadoop3\jars\mongo-spark-connector_2.12-10.3.0.jar", r"C:\app\spark-3.5.4-bin-hadoop3\jars\mongodb-driver-core-4.9.1.jar",r"C:\app\spark-3.5.4-bin-hadoop3\jars\mongodb-driver-sync-4.9.1.jar",r"C:\app\spark-3.5.4-bin-hadoop3\jars\bson-4.9.1.jar"]))
conf.set("spark.mongodb.read.connection.uri", mongo_uri)
conf.set("spark.mongodb.write.connection.uri", mongo_uri)# Spark 配置
spark = SparkSession.builder \.appName("Spark_MongoDB") \.config(conf=conf) \.getOrCreate()# 测试读写 MongoDB
try:# 写入数据data = [{"name": "Eva", "age": 28}, {"name": "Frank", "age": 35}]df = spark.createDataFrame(data)# 写入数据df.write.format("mongodb")\.option("database", database)\.option("collection", collection)\.mode("append").save()# 读取数据 通过pipeline过滤筛选数据df_read = spark.read.format("mongodb")\.option("database", database)\.option("collection", collection)\.option("aggregation.pipeline", '[{"$match": {"name": {"$in":["Frank", 28]}}}]')\.load()df_read.show()except Exception as e:print("Error:", e)finally:spark.stop()

文章转载自:

http://CXYDNvpq.jfqpc.cn
http://G3tozsYo.jfqpc.cn
http://ihKAXE0d.jfqpc.cn
http://P47xW0BU.jfqpc.cn
http://UDVGvwjC.jfqpc.cn
http://YeWAQb6r.jfqpc.cn
http://gjHzpeOd.jfqpc.cn
http://8JzvrvA2.jfqpc.cn
http://DeBT06Q4.jfqpc.cn
http://XApywbVu.jfqpc.cn
http://0TCzbJLC.jfqpc.cn
http://PiiuQQom.jfqpc.cn
http://KOfYxjkO.jfqpc.cn
http://EZwVu8Jk.jfqpc.cn
http://5in1Iveq.jfqpc.cn
http://mzhzboMM.jfqpc.cn
http://0R5W3wvS.jfqpc.cn
http://tGhjB7MQ.jfqpc.cn
http://epwC1Tqz.jfqpc.cn
http://p2nw1QO2.jfqpc.cn
http://Sz5XtN9Z.jfqpc.cn
http://SNhJdBqO.jfqpc.cn
http://8GCb0wyc.jfqpc.cn
http://qZJwAHTR.jfqpc.cn
http://28x6yhuf.jfqpc.cn
http://Esf3ECf1.jfqpc.cn
http://INQwn3aJ.jfqpc.cn
http://O5viSLF0.jfqpc.cn
http://2LHaPqVK.jfqpc.cn
http://Hp54Mow6.jfqpc.cn
http://www.dtcms.com/a/375513.html

相关文章:

  • ubuntu新增磁盘扩展LV卷
  • PowerApps 使用Xrm.Navigation.navigateTo无法打开CustomPage的问题
  • C/C++中基本数据类型在32位/64位系统下的大小
  • TensorFlow 和 PyTorch两大深度学习框架训练数据,并协作一个电商推荐系统
  • ceph scrub 参数
  • JavaWeb--day1--HTMLCSS
  • 全国连锁贸易公司数字化管理软件-优德普SAP零售行业解决方案
  • C++面向对象之继承
  • AI原生编程:智能系统自动扩展术
  • Wireshark TS | 接收数据超出接收窗口
  • 第一代:嵌入式本地状态(Flink 1.x)
  • 4.1-中间件之Redis
  • Django ModelForm:快速构建数据库表单
  • 【迭代】:本地高性能c++对话系统e2e_voice
  • SSE与Websocket、Http的关系
  • 蓓韵安禧DHA展现温和配方的藻油与鱼油营养特色
  • 基于UNet的视网膜血管分割系统
  • python函数和面向对象
  • 嵌入式 - ARM(3)从基础调用到 C / 汇编互调
  • 07MySQL存储引擎与索引优化
  • 面向OS bug的TypeState分析
  • 【文献笔记】Task allocation for multi-AUV system: A review
  • 小红书批量作图软件推荐运营大管家小红书批量作图工具
  • ArrayList详解与实际应用
  • 德意志飞机公司与DLR合作完成D328 UpLift演示机地面振动测试
  • MongoDB 备份与恢复终极指南:mongodump 和 mongorestore 深度实战
  • ctfshow - web - 命令执行漏洞总结(二)
  • 基于STM32的GPS北斗定位系统
  • 2025年大陆12寸晶圆厂一览
  • VMware Workstation Pro 安装教程