当前位置: 首页 > news >正文

pyspark大规模数据加解密优化实践

假如有1亿行数据

方法1 spark udf解密

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyDes import *
import binasciispark=SparkSession.builder.getOrCreate()def dec_fun(text):key = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, b"XXXXXXXX", padmode=PAD_PKCS5)if not text:return Nonereturn k.decrypt(base64.b64decode(text)).decode('utf-8')
spark.udf.register("dec_fun",dec_fun)df.withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
  • 密钥初始化1亿次
  • 每条数据触发一次JVM → Python进程的数据传输
  • 每次传输需序列化/反序列化数据(性能杀手)
  • 高频进程间通信(IPC)产生巨大开销

方法2 repartition+mapPartition

为了提高效率,我们可以利用mapPartitions在每个分区内部只初始化一次解密对象,避免重复初始化。

def dec_fun(text):if not text:return Noneelse:result = base64.b64decode(text)k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)d = k.decrypt(result)return d.decode("utf-8")spark.udf.register("dec_fun", dec_fun)# 分区解密
def rdd_decrypt(partitionData):for row in partitionData:try:yield [dec_fun(row.zj_no)]except:passdf.select("en_col").repartition(30).rdd.mapPartitions(rdd_decrypt).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")

密钥初始化依然是1亿次,这个代码写的不好,应该每个分区初始化1次而不是每行。但相对方法1依然有答复性能提升,

  • mapPartitions
    • 分区级批量传输:每个分区一次性从JVM发送到Python进程(例如1个分区10万条数据,仅1次传输)
    • 几个分区就调用几次函数(对比1亿次的UDF调用)

方法3 repartition+mapPartition+分区1次初始化

def dec_fun(partitionData):k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)for row in partitionData:try:if row.zj_no:result = base64.b64decode(row.zj_no)d = k.decrypt(result)yield [d.decode("utf-8")]else:continueexcept:pass# 如果要保留原始一大堆列,更麻烦
df.select("en_col").repartition(20).rdd.mapPartitions(dec_fun).toDF(["dec_col"]).write.mode("overwrite").save("OOOO")
  • 密钥初始化数=分区数
  • 通过repartition(20)合理调整分区
  • 利用RDD底层优化

方法4 scalar pandas_udf

import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series: pd.Series) -> pd.Series:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passreturn series.apply(_decrypt)df.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")

🌟 优势

  • 利用Apache Arrow高效内存传输
  • Pandas向量化操作潜力
  • 与DataFrame API无缝集成

⚠️ 局限

  • 密钥初始化数=batch数
  • 大分区内存压力大

方法5 迭代器型pandas_udf

我们可以使用迭代器类型的Pandas UDF,在每次处理一个迭代器(一个迭代器对应一个batch)时只初始化一次密钥对象:

import pyspark.sql.functions as F
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") @F.pandas_udf("string")
def dec_fun(series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:k = triple_des(b"HHHHHHHHHHHHHHHHHHHHHHHH", CBC, "XXXXXXXX", pad=None, padmode=PAD_PKCS5)def _decrypt(text):try:if text:result = base64.b64decode(text)d = k.decrypt(result)return d.decode("utf-8")except:passfor series in series_iter:de_series = series.apply(_decrypt)yield de_seriesdf.select("en_col").repartition(20).withColumn("dec_col",F.expr("dec_fun(en_col)")).write.mode("overwrite").save("OOOO")
  • 密钥初始化数=分区数
  • 内存友好的批处理流
  • 向量化
  • Arrow优化零拷贝传输
  • 完美平衡初始化开销和并行效率

综合

方法5>(方法4,方法3)>方法2>方法1

方法4,方法3这两者,我也倾向方法4

http://www.dtcms.com/a/267391.html

相关文章:

  • NVMe高速传输之摆脱XDMA设计13:PCIe初始化状态机设计
  • 2025 Centos 安装PostgreSQL
  • Java类变量(静态变量)
  • LangChain:向量存储和检索器(入门篇三)
  • 【Qt】qml组件对象怎么传递给c++
  • appnium-巨量测试
  • LVGL移植(外部SRAM)
  • ESP32-S3开发板播放wav音频
  • 应急响应靶机-linux1-知攻善防实验室
  • 介绍electron
  • 若依学习笔记1-validated
  • Qt工具栏设计
  • Tensorboard无法显示图片(已解决)
  • 编程中的英语
  • CHAIN(GAN的一种)训练自己的数据集
  • Ubuntu基础(监控重启和查找程序)
  • 【Elasticsearch】深度分页及其替代方案
  • 基于 Python Django 和 Spark 的电力能耗数据分析系统设计与实现7000字论文实现
  • .NET9 实现排序算法(MergeSortTest 和 QuickSortTest)性能测试
  • Redis--黑马点评--基于stream消息队列的秒杀优化业务详解
  • 升级到MySQL 8.4,MySQL启动报错:io_setup() failed with EAGAIN
  • 每日算法刷题Day42 7.5:leetcode前缀和3道题,用时2h
  • Node.js worker_threads:并发 vs 并行
  • 洛谷刷题9
  • 如何在idea里快速地切换Windows CMD、git bash、powershell
  • 谷物干燥的滚筒式烘干机的设计cad【11张】三维图+设计说明书+绛重
  • LinkedList剖析
  • OneCode 图表组件核心优势解析
  • Kafka消息积压全面解决方案:从应急处理到系统优化
  • <script setup>中的setup作用以及和不带的区别对比