当前位置: 首页 > news >正文

PySpark查询Dataframe中包含乱码的数据记录的方法

首先,用PySpark获取Dataframe中所有非ASCII字符,找到其中的非乱码字符。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, explode, split, coalesce, lit
from pyspark.sql.types import StringType

spark = SparkSession.builder.appName("InvalidCharacterFinder").getOrCreate()

# 假设已存在DataFrame df
# df = ...

# 获取所有字符串类型列名
string_columns = [f.name for f in df.schema.fields if isinstance(f.dataType, StringType)]
result = []

if string_columns:
    # 处理空值并合并字符串列
    non_null_cols = [coalesce(col(c), lit("")).alias(c) for c in string_columns]
    combined_df = df.select(non_null_cols).select(concat_ws("", *string_columns).alias("merged_str"))
    
    # 拆分字符并过滤空字符串
    chars_df = combined_df.withColumn("char", explode(split(col("merged_str"), "")))\
                          .filter(col("char") != "")
    
    # 定义合法字符集合
    allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!'
                        '"#$%&\'()*+,-./:;<=>?@[]^_`{|}~')
    
    # 收集非法字符并去重
    unique_invalid = chars_df.rdd.map(lambda x: x.char)\
                             .filter(lambda c: c not in allowed_chars)\
                             .distinct()\
                             .collect()
    
    # 按首次出现顺序保留字符(分布式环境无法保证绝对顺序)
    seen = set()
    ordered_result = []
    for char in unique_invalid:
        if char not in seen:
            ordered_result.append(char)
            seen.add(char)
    result = ordered_result

print("非法字符集合:", ''.join(result))

代码说明:

  1. 数据准备:通过DataFrame Schema识别所有字符串类型的列
  2. 空值处理:使用coalesce函数将NULL转换为空字符串,确保后续字符串合并有效
  3. 列合并:使用concat_ws将多个字符串列的值合并为单个字符串
  4. 字符拆分:通过split+explode将字符串拆分为单个字符,并过滤空字符
  5. 非法字符过滤:使用RDD操作过滤不在白名单中的字符,并通过distinct去重
  6. 结果处理:使用有序集合保持字符首次出现的顺序(注意:分布式环境下无法保证绝对顺序)

注意事项:

  • 最终结果字符顺序可能与实际数据中的出现顺序不完全一致
  • 白名单包含94个可打印ASCII字符(排除空格和控制字符)
  • 使用RDD操作提升分布式处理性能
  • 最终结果字符串可能包含各类特殊符号、中文、表情符号等非标准ASCII字符

然后,将非乱码字符加入排除的字符中,用PySpark检查Dataframe中包含乱码字符的记录并导出Excel文件。

from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
import pandas as pd

# 初始化Spark会话
spark = SparkSession.builder.appName("StringFilter").getOrCreate()

# 假设原始DataFrame为df(这里需要替换为实际的数据源读取逻辑)
# df = spark.read.csv("input.csv", header=True)

# 获取所有字符串类型的列名
string_columns = [field.name for field in df.schema.fields if isinstance(field.dataType, StringType)]

# 定义允许的字符集合
allowed_chars = set('abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!"#$%&\'()*+,-./:;<=>?@[]^_`{|}~<非乱码字符>')

# 定义检查非法字符的UDF
def has_invalid_chars(s):
    if s is None:
        return False
    return any(c not in allowed_chars for c in s)

has_invalid_udf = udf(has_invalid_chars, BooleanType())

# 构建过滤条件
if not string_columns:
    # 如果没有字符串列,直接创建空DataFrame
    result_df = spark.createDataFrame([], df.schema)
else:
    # 组合所有字符串列的检查条件
    condition = None
    for col_name in string_columns:
        col_condition = has_invalid_udf(col(col_name))
        if condition is None:
            condition = col_condition
        else:
            condition = condition | col_condition
    
    # 过滤出包含非法字符的行
    filtered_df = df.filter(condition)
    
    # 创建结构相同的空DataFrame并合并结果
    empty_df = spark.createDataFrame([], df.schema)
    result_df = empty_df.union(filtered_df)

# 导出为Excel文件(注意:此操作会将数据收集到Driver节点)
pd_df = result_df.toPandas()
pd_df.to_excel("output.xlsx", index=False)

# 停止Spark会话(根据实际需要决定是否保留会话)
spark.stop()

代码说明:

  1. 初始化与数据读取:需要根据实际数据源替换读取逻辑(示例中被注释掉的spark.read.csv部分)
  2. 获取字符串列:通过分析Schema获取所有字符串类型的字段
  3. 定义字符白名单:使用集合类型提升查询效率
  4. UDF定义:用于检查字符串是否包含非法字符
  5. 条件构建:使用逻辑或组合所有字符串列的检查条件
  6. 结果处理
    • 直接处理空字符串列的边界情况
    • 使用union保持与原DataFrame结构一致
  7. Excel导出
    • 通过转换为Pandas DataFrame实现
    • 注意大数据量时可能存在的内存问题

注意事项:

  1. 大数据量场景下建议分批次处理或使用分布式写入方式
  2. Excel导出操作会触发数据收集到Driver节点,需确保资源充足
  3. 实际应用中建议添加异常处理机制
  4. 空值处理逻辑可根据业务需求调整(当前版本忽略NULL值)

相关文章:

  • React Native之React整理(一)
  • K8s组件
  • 「软件设计模式」建造者模式(Builder)
  • Java--IO流详解(下)--相互转换(含Properties详解)
  • 强化 CSS 样式优先级的多种方法
  • Linux基础20-C语言篇之流程控制Ⅰ【入门级】
  • 利用Python和SQLite进行数据处理与优化——从数据库操作到高级数据压缩
  • CMake技术细节:递归搜索目录添加源文件
  • 【C语言】C语言 停车场管理系统的设计与实现(源码)【独一无二】
  • 微信小程序日程预约
  • 第一章:认识Tailwind CSS - 第二节 - Utility First CSS 的优势与挑战
  • 深入剖析 Burp Suite:Web 应用安全测试利器
  • 哈希:LeetCode49. 字母异位词分组 128.最长连续序列
  • wps配置deepseek
  • IP属地:是否等同于当前登录位置?
  • 深度解析2025最新微服务版本特性
  • 二十九、vite项目集成webpack+vue2项目
  • C++ Primer 简单语句
  • clickhouse集群搭建
  • kotlin-kapt
  • 毗邻三市人均GDP全部超过20万元,苏锡常是怎样做到的?
  • 趣看 | 五一黄金周:你拍风景,拍风景的人在拍你
  • 美联储宣布维持基准利率不变
  • AI聊天机器人涉多起骚扰行为,专家呼吁加强伦理设计与监管
  • 过半中国上市公司去年都在“扩编”,哪些公司人效最高
  • 中国电信财务部总经理周响华调任华润集团总会计师