当前位置: 首页 > news >正文

宿主机运行pyspark任务读取docker hadoop容器上的数据

熬了四个大夜才搞明白,最晚一天熬到早上十点/(ㄒoㄒ)/~~,最后发现只要加个参数就解决了。。。抱头痛哭

问题描述:

Hadoop集群部署在docker容器中,宿主机执行pyspark程序读取hive表

问题一:当master('local[*]')时,docker容器返回给driver端datanode节点的内网ip地址,修改hosts只能将域名转发到ip地址,不能将ip地址转发给ip地址。

问题二:当master('spark://localhost:7077'),因为容器做了端口映射,这里使用的时localhost。driver端为宿主机,spark会把driver端的hostname传到spark集群worker节点上,spark work容器无法识别宿主机hostname

解决方法:

在宿主机配置好hosts,格式为:127.0.0.1 容器hostname(eg:datanode)

问题一:SparkSession加参数config("dfs.client.use.datanode.hostname", "true")//客户端(如 Spark Driver)通过主机名访问 DataNode。

问题二:SparkSession加参数config("spark.driver.host", "192.168.1.5") //宿主机ip地址

就是这么简单。。。哭死(;´༎ຶД༎ຶ`) 

当时试了好多种办法,nginx反向代理、DNSmasq自定义DNS、NPS内网穿透、Macvlan网络模式,SNAT、最后甚至还装了k8s集群外加k8s监控界面,真的哭死。看看现在时间吧,已经4:03了。。。😭😭😭

最后附上完整代码:

import string
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
    # =============================== spark local模式 ===============================
    # 想要读写数据必须要配置windows下的Hadoop环境(不一定,未验证)
    # 想要使用Hive元数据必须添加enableHiveSupport()这行代码
    # 无论spark集群配置文件中有没有配置spark元数据信息,都要在代码工程中配置元数据信息,因为本地读取不到集群中的环境变量,创建hive-site.xml文件或代码中定义config都行
    # 如果不指定spark.sql.warehouse.dir信息则默认为: file:/C:/Users/yelor/spark-warehouse
    # 如果不知道hive.metastore.uris的值则找不到hive元数据,但不会报错,只是无法使用hive元数据
    # spark.sql.warehouse.dir和hive.metastore.uris的值可以在代码工程中配置hive-site.xml文件来指定
    # 容器外需要注意ip地址互通问题,需要配置hosts
    # 如果Hadoop集群部署在docker容器中,dfs.client.use.datanode.hostname=true在本地local模式下必须要加,不然spark会使用datanode的内网ip来通信
    import os
    # 这里可以选择本地win系统的PySpark环境执行pySpark代码,也可以使用虚拟机中PySpark环境,通过os可以配置。
    # os.environ['SPARK_HOME'] = r'D:\software2\spark-3.1.2-bin-hadoop3.2' # 暂时不写也行
    PYSPARK_PYTHON = r"D:\AnacondaCache\envs\mspro\python" #python.exe或者简写为python都行
    # 当存在多个python版本环境时(如使用conda),不指定python环境会报错
    os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
    # 配置指定提交作业至HDFS的用户 不然上传文件到 HDFS 的时候会报权限错误 参考文章:https://zhuanlan.zhihu.com/p/538265736
    # os.environ["HADOOP_USER_NAME"] = "yelor" # 暂时不写也行
    # 在local模式下,如果想使用hive元数据,以下参数是必须要配置的:spark.sql.warehouse.dir、hive.metastore.uris
    spark = SparkSession.builder.\
        appName('udf_define').\
        master('local[*]').\
        config('spark.sql.shuffle.partitions', 2).\
        config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\
        config('hive.metastore.uris', 'thrift://localhost:9083').\
        config("spark.executor.memory", "1g").\
        config("spark.driver.memory", "1g").\
        config("dfs.client.use.datanode.hostname", "true").\
        config("spark.driver.host", "192.168.1.5").\
        enableHiveSupport(). \
        getOrCreate() 
    
    
    # =============================== spark master集群模式 ===============================
    # 如果要连接spark集群,需要保证pyspark包的版本与集群的spark版本一致
    # 查看spark版本:spark-submit --version
    # 就算是使用集群执行作业,也必须要配置hive-site.xml文件中的信息,因为还是读取driver端的环境变量
    # 必须添加enableHiveSupport()这行代码
    # spark.driver.host保证代码传到spark容器中时以指定的ip地址为driver地址,不然会使用本机的hostname
    # import os
    # os.environ['PYSPARK_PYTHON']=r"D:\\AnacondaCache\\envs\\mspro\\python.exe"
    # spark = SparkSession.builder.\
    #     appName('udf_define').\
    #     master('spark://localhost:7077').\
    #     config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\
    #     config('hive.metastore.uris', 'thrift://localhost:9083').\
    #     config("spark.executor.memory", "512m").\
    #     config("spark.driver.memory", "512m").\
    #     config("spark.driver.host", "192.168.1.5").\
    #     enableHiveSupport(). \
    #     getOrCreate() # 如果spark配置文件中没有配置spark元数据信息,就不能使用enableHiveSupport().\ 直接在代码中配置元数据信息也能脸上hive元数据
        
    sc = spark.sparkContext

    # 设置日志级别为 DEBUG
    # sc.setLogLevel("DEBUG")
    
    # 查看表的存储位置
    warehouse_dir = spark.conf.get("spark.sql.warehouse.dir")
    print(f"Spark SQL warehouse directory: {warehouse_dir}")
    
    # 指定要使用的数据库
    database_name = "tb"
    spark.sql(f"USE {database_name}")

    # 执行 SQL 查询
    # query = "show databases"
    # query = "SELECT * FROM orders"
    # query = "SELECT * FROM students"
    # df = spark.sql(query)

    # # 显示查询结果
    # df.show()
    
    # 生成 10000 条模拟数据
    data = []
    for _ in range(1000):
        # 生成随机的姓名和年龄
        name = ''.join(random.choices(string.ascii_letters, k=5))
        age = random.randint(18, 60)
        data.append((name, age))

    # 定义 DataFrame 的列名
    columns = ["name", "age"]

    # 创建 DataFrame
    df = spark.createDataFrame(data, columns)

    # 创建临时视图
    # df.createOrReplaceTempView("test_table")
    try:
        # 创建持久化表(可选)
        df.write.saveAsTable("testaa_table", mode="overwrite")
    
        # 验证数据插入
        spark.sql("SELECT * FROM testaa_table LIMIT 5").show()
        
        # 加入循环,保持 SparkSession 一直运行,方便看 Spark UI
        while True:
            try:
                import time
                time.sleep(1)
            except KeyboardInterrupt:
                break
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        spark.stop()

    # 停止 SparkSession
    # spark.stop()

相关文章:

  • 0基础 | 看懂原理图Datasheet 系列1
  • 前端状态管理 pinia和vuex高频面试题
  • 通过动态获取后端数据判断输入的值打小
  • 【C语言】 一维数组传参的本质
  • 车间图纸安全传输需要注意什么?
  • iTextSharp-PDF批量导出
  • 深入浅出Bearer Token:解析工作原理及其在Vue、Uni-app与Java中的实现Demo
  • OpenCV中文路径图片读写终极指南(Python实现)
  • [C++面试] 对通透比较器了解多少?(较少涉及,可跳过)
  • Kylin麒麟操作系统服务部署 | Ansible基础
  • 使用 Python 爬取 item_get_video 接口获取某书笔记详情
  • IDEA 创建SpringCloud 工程(图文)
  • Python----计算机视觉处理(Opencv:二值化,阈值法,反阈值法,截断阈值法,OTSU阈值法)
  • 小学数学原理1:14 - 为什么是减法?
  • 云创智城 ×YunParking停车源码+YunCharge充电源码+DeepSeek:AI 驱动城市级停车平台升级,构建安全智慧出行新生态
  • 搜索 之 组合问题
  • Kubernetes Pod 生命周期详解 之 探针
  • 函数(函数的概念、库函数、自定义函数、形参和实参、return语句、数组做函数参数、嵌套调用和链式访问、函数的声明和定义、static和extern)
  • 【面试题系列】Redis 常见面试题答案
  • 蓝桥杯备考:数据结构堆之第k小
  • 国际博物馆日|航海博物馆:穿梭于海洋神话与造船工艺间
  • 上海公办小学验证今起开始,下周一和周二分区进行民办摇号
  • 上海一保租房社区亮相,首批546套房源可拎包入住
  • 新版城市规划体检评估解读:把城市安全韧性摆在更加突出位置
  • 国防部:中方愿与俄方不断增强两军关系良好发展势头
  • 今年有望投产里程已近3000公里,高铁冲刺谁在“狂飙”?