pyspark大规模数据加解密优化实践
假如有1亿行数据
方法1 spark udf解密
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyDes import *
import binasciispark=SparkSession.builder.getOrCreate()def dec_fun(text):key = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, b"XXXXXXXX", padmode=PAD_PKCS5)if not text:return Nonereturn k.decrypt(base64.b64decode(text)).decode('utf-8')
spark.udf.register("dec_fun",dec_fun)df.withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
- 密钥初始化1亿次
- 每条数据触发一次JVM → Python进程的数据传输
- 每次传输需序列化/反序列化数据(性能杀手)
- 高频进程间通信(IPC)产生巨大开销
方法2 repartition+mapPartition
为了提高效率,我们可以利用mapPartitions
在每个分区内部只初始化一次解密对象,避免重复初始化。
def dec_fun(text):if not text:return Noneelse:result = base64.b64decode(text)k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)d = k.decrypt(result)return d.decode("utf-8")spark.udf.register("dec_fun", dec_fun)# 分区解密
def rdd_decrypt(partitionData):for row in partitionData:try:yield [dec_fun(row.zj_no)]except:passdf.select("en_col").repartition(30).rdd.mapPartitions(rdd_decrypt).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
密钥初始化依然是1亿次,这个代码写的不好,应该每个分区初始化1次而不是每行。但相对方法1依然有答复性能提升,
- mapPartitions:
- 分区级批量传输:每个分区一次性从JVM发送到Python进程(例如1个分区10万条数据,仅1次传输)
- 几个分区就调用几次函数(对比1亿次的UDF调用)
方法3 repartition+mapPartition+分区1次初始化
def dec_fun(partitionData):k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)for row in partitionData:try:if row.zj_no:result = base64.b64decode(row.zj_no)d = k.decrypt(result)yield [d.decode("utf-8")]else:continueexcept:pass# 如果要保留原始一大堆列,更麻烦
df.select("en_col").repartition(20).rdd.mapPartitions(dec_fun).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
- 密钥初始化数=分区数
- 通过
repartition(20)
合理调整分区 - 利用RDD底层优化
方法4 scalar pandas_udf
import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series: pd.Series) -> pd.Series:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passreturn series.apply(_decrypt)df.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
🌟 优势:
- 利用Apache Arrow高效内存传输
- Pandas向量化操作潜力
- 与DataFrame API无缝集成
⚠️ 局限:
- 密钥初始化数=batch数
- 大分区内存压力大
方法5 迭代器型pandas_udf
我们可以使用迭代器类型的Pandas UDF,在每次处理一个迭代器(一个迭代器对应一个batch)时只初始化一次密钥对象:
import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passfor series in series_iter:de_series = series.apply(_decrypt)yield de_seriesdf.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
- 密钥初始化数=分区数
- 内存友好的批处理流
- 向量化
- Arrow优化零拷贝传输
- 完美平衡初始化开销和并行效率
综合
方法5>(方法4,方法3)>方法2>方法1
方法4,方法3这两者,我也倾向方法4