宿主机运行pyspark任务读取docker hadoop容器上的数据
熬了四个大夜才搞明白,最晚一天熬到早上十点/(ㄒoㄒ)/~~,最后发现只要加个参数就解决了。。。抱头痛哭
问题描述:
Hadoop集群部署在docker容器中,宿主机执行pyspark程序读取hive表
问题一:当master('local[*]')时,docker容器返回给driver端datanode节点的内网ip地址,修改hosts只能将域名转发到ip地址,不能将ip地址转发给ip地址。
问题二:当master('spark://localhost:7077'),因为容器做了端口映射,这里使用的时localhost。driver端为宿主机,spark会把driver端的hostname传到spark集群worker节点上,spark work容器无法识别宿主机hostname
解决方法:
在宿主机配置好hosts,格式为:127.0.0.1 容器hostname(eg:datanode)
问题一:SparkSession加参数config("dfs.client.use.datanode.hostname", "true")//客户端(如 Spark Driver)通过主机名访问 DataNode。
问题二:SparkSession加参数config("spark.driver.host", "192.168.1.5") //宿主机ip地址
就是这么简单。。。哭死(;´༎ຶД༎ຶ`)
当时试了好多种办法,nginx反向代理、DNSmasq自定义DNS、NPS内网穿透、Macvlan网络模式,SNAT、最后甚至还装了k8s集群外加k8s监控界面,真的哭死。看看现在时间吧,已经4:03了。。。😭😭😭
最后附上完整代码:
import string
import random
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType, StringType, StructType, ArrayType
if __name__ == '__main__':
# =============================== spark local模式 ===============================
# 想要读写数据必须要配置windows下的Hadoop环境(不一定,未验证)
# 想要使用Hive元数据必须添加enableHiveSupport()这行代码
# 无论spark集群配置文件中有没有配置spark元数据信息,都要在代码工程中配置元数据信息,因为本地读取不到集群中的环境变量,创建hive-site.xml文件或代码中定义config都行
# 如果不指定spark.sql.warehouse.dir信息则默认为: file:/C:/Users/yelor/spark-warehouse
# 如果不知道hive.metastore.uris的值则找不到hive元数据,但不会报错,只是无法使用hive元数据
# spark.sql.warehouse.dir和hive.metastore.uris的值可以在代码工程中配置hive-site.xml文件来指定
# 容器外需要注意ip地址互通问题,需要配置hosts
# 如果Hadoop集群部署在docker容器中,dfs.client.use.datanode.hostname=true在本地local模式下必须要加,不然spark会使用datanode的内网ip来通信
import os
# 这里可以选择本地win系统的PySpark环境执行pySpark代码,也可以使用虚拟机中PySpark环境,通过os可以配置。
# os.environ['SPARK_HOME'] = r'D:\software2\spark-3.1.2-bin-hadoop3.2' # 暂时不写也行
PYSPARK_PYTHON = r"D:\AnacondaCache\envs\mspro\python" #python.exe或者简写为python都行
# 当存在多个python版本环境时(如使用conda),不指定python环境会报错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
# 配置指定提交作业至HDFS的用户 不然上传文件到 HDFS 的时候会报权限错误 参考文章:https://zhuanlan.zhihu.com/p/538265736
# os.environ["HADOOP_USER_NAME"] = "yelor" # 暂时不写也行
# 在local模式下,如果想使用hive元数据,以下参数是必须要配置的:spark.sql.warehouse.dir、hive.metastore.uris
spark = SparkSession.builder.\
appName('udf_define').\
master('local[*]').\
config('spark.sql.shuffle.partitions', 2).\
config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\
config('hive.metastore.uris', 'thrift://localhost:9083').\
config("spark.executor.memory", "1g").\
config("spark.driver.memory", "1g").\
config("dfs.client.use.datanode.hostname", "true").\
config("spark.driver.host", "192.168.1.5").\
enableHiveSupport(). \
getOrCreate()
# =============================== spark master集群模式 ===============================
# 如果要连接spark集群,需要保证pyspark包的版本与集群的spark版本一致
# 查看spark版本:spark-submit --version
# 就算是使用集群执行作业,也必须要配置hive-site.xml文件中的信息,因为还是读取driver端的环境变量
# 必须添加enableHiveSupport()这行代码
# spark.driver.host保证代码传到spark容器中时以指定的ip地址为driver地址,不然会使用本机的hostname
# import os
# os.environ['PYSPARK_PYTHON']=r"D:\\AnacondaCache\\envs\\mspro\\python.exe"
# spark = SparkSession.builder.\
# appName('udf_define').\
# master('spark://localhost:7077').\
# config('spark.sql.warehouse.dir', 'hdfs://localhost:9000/user/hive/warehouse').\
# config('hive.metastore.uris', 'thrift://localhost:9083').\
# config("spark.executor.memory", "512m").\
# config("spark.driver.memory", "512m").\
# config("spark.driver.host", "192.168.1.5").\
# enableHiveSupport(). \
# getOrCreate() # 如果spark配置文件中没有配置spark元数据信息,就不能使用enableHiveSupport().\ 直接在代码中配置元数据信息也能脸上hive元数据
sc = spark.sparkContext
# 设置日志级别为 DEBUG
# sc.setLogLevel("DEBUG")
# 查看表的存储位置
warehouse_dir = spark.conf.get("spark.sql.warehouse.dir")
print(f"Spark SQL warehouse directory: {warehouse_dir}")
# 指定要使用的数据库
database_name = "tb"
spark.sql(f"USE {database_name}")
# 执行 SQL 查询
# query = "show databases"
# query = "SELECT * FROM orders"
# query = "SELECT * FROM students"
# df = spark.sql(query)
# # 显示查询结果
# df.show()
# 生成 10000 条模拟数据
data = []
for _ in range(1000):
# 生成随机的姓名和年龄
name = ''.join(random.choices(string.ascii_letters, k=5))
age = random.randint(18, 60)
data.append((name, age))
# 定义 DataFrame 的列名
columns = ["name", "age"]
# 创建 DataFrame
df = spark.createDataFrame(data, columns)
# 创建临时视图
# df.createOrReplaceTempView("test_table")
try:
# 创建持久化表(可选)
df.write.saveAsTable("testaa_table", mode="overwrite")
# 验证数据插入
spark.sql("SELECT * FROM testaa_table LIMIT 5").show()
# 加入循环,保持 SparkSession 一直运行,方便看 Spark UI
while True:
try:
import time
time.sleep(1)
except KeyboardInterrupt:
break
except Exception as e:
print(f"An error occurred: {e}")
finally:
spark.stop()
# 停止 SparkSession
# spark.stop()