当前位置: 首页 > news >正文

使用Python高效读取ZIP压缩文件中的UTF-8 JSON数据到Pandas和PySpark DataFrame

本文将详细介绍如何使用Python快速高效地读取ZIP压缩文件中的UTF-8编码JSON文件,并将其转换为Pandas DataFrame和PySpark DataFrame。我们将探讨多种方法,包括标准库方法、优化技巧以及处理大文件的策略。

目录

  1. 准备工作与环境设置
  2. 使用标准库方法读取ZIP中的JSON
  3. 高效读取方法
  4. 转换为Pandas DataFrame
  5. 转换为PySpark DataFrame
  6. 处理大型ZIP文件的策略
  7. 性能优化建议
  8. 完整示例代码

准备工作与环境设置

在开始之前,确保已安装必要的Python库:

pip install pandas pyspark pyarrow

使用标准库方法读取ZIP中的JSON

Python的标准库zipfile提供了处理ZIP文件的基本功能。以下是基础读取方法:

import zipfile
import json
import pandas as pd
from pyspark.sql import SparkSession# 初始化Spark会话
spark = SparkSession.builder \.appName("ZIP_JSON_Reader") \.getOrCreate()

高效读取方法

方法1:使用zipfile和io模块

import zipfile
import json
import iodef read_json_from_zip_basic(zip_path, json_filename):"""基础方法:读取ZIP中的单个JSON文件"""with zipfile.ZipFile(zip_path, 'r') as zip_ref:with zip_ref.open(json_filename, 'r') as json_file:# 读取并解析JSON内容json_data = json.loads(json_file.read().decode('utf-8'))return json_data

方法2:批量处理ZIP中的多个JSON文件

def read_multiple_json_from_zip(zip_path, file_extension='.json'):"""读取ZIP中所有JSON文件"""all_data = []with zipfile.ZipFile(zip_path, 'r') as zip_ref:# 获取所有JSON文件json_files = [f for f in zip_ref.namelist() if f.endswith(file_extension)]for json_file in json_files:with zip_ref.open(json_file, 'r') as file:try:json_data = json.loads(file.read().decode('utf-8'))all_data.append(json_data)except json.JSONDecodeError as e:print(f"Error reading {json_file}: {e}")return all_data

转换为Pandas DataFrame

方法1:直接转换

def zip_json_to_pandas_simple(zip_path, json_filename):"""将ZIP中的JSON文件转换为Pandas DataFrame(简单版)"""json_data = read_json_from_zip_basic(zip_path, json_filename)# 如果JSON是数组格式,直接转换为DataFrameif isinstance(json_data, list):return pd.DataFrame(json_data)# 如果JSON是对象格式,可能需要特殊处理else:return pd.DataFrame([json_data])

方法2:使用pandas直接读取(推荐)

def zip_json_to_pandas_efficient(zip_path, json_filename):"""高效方法:使用pandas直接读取ZIP中的JSON文件"""with zipfile.ZipFile(zip_path, 'r') as zip_ref:with zip_ref.open(json_filename, 'r') as json_file:# 使用pandas直接读取JSON流df = pd.read_json(json_file, encoding='utf-8')return df

方法3:处理大型JSON文件

import ijsondef read_large_json_from_zip(zip_path, json_filename):"""使用流式处理读取大型JSON文件"""items = []with zipfile.ZipFile(zip_path, 'r') as zip_ref:with zip_ref.open(json_filename, 'r') as json_file:# 使用ijson进行流式解析parser = ijson.parse(json_file)for prefix, event, value in parser:# 根据JSON结构进行相应处理if event == 'start_array' or event == 'end_array':continue# 这里需要根据实际JSON结构调整解析逻辑return pd.DataFrame(items)

转换为PySpark DataFrame

方法1:通过Pandas中转

def zip_json_to_pyspark_via_pandas(zip_path, json_filename):"""通过Pandas将ZIP中的JSON转换为PySpark DataFrame"""# 先读取为Pandas DataFramepandas_df = zip_json_to_pandas_efficient(zip_path, json_filename)# 转换为PySpark DataFramespark_df = spark.createDataFrame(pandas_df)return spark_df

方法2:直接使用PySpark读取(需解压)

import tempfile
import osdef zip_json_to_pyspark_direct(zip_path, json_filename):"""将ZIP文件解压后使用PySpark直接读取"""with tempfile.TemporaryDirectory() as temp_dir:# 解压ZIP文件with zipfile.ZipFile(zip_path, 'r') as zip_ref:zip_ref.extract(json_filename, temp_dir)# 使用PySpark读取解压后的JSON文件json_path = os.path.join(temp_dir, json_filename)spark_df = spark.read \.option("encoding", "UTF-8") \.json(json_path)return spark_df

方法3:处理ZIP中的多个JSON文件

def multiple_zip_json_to_pyspark(zip_path):"""读取ZIP中所有JSON文件到PySpark DataFrame"""all_dfs = []with tempfile.TemporaryDirectory() as temp_dir:with zipfile.ZipFile(zip_path, 'r') as zip_ref:# 解压所有JSON文件json_files = [f for f in zip_ref.namelist() if f.endswith('.json')]zip_ref.extractall(temp_dir, json_files)# 读取所有JSON文件for json_file in json_files:json_path = os.path.join(temp_dir, json_file)df = spark.read.option("encoding", "UTF-8").json(json_path)all_dfs.append(df)# 合并所有DataFrameif all_dfs:result_df = all_dfs[0]for df in all_dfs[1:]:result_df = result_df.union(df)return result_dfelse:return spark.createDataFrame([], schema=None)

处理大型ZIP文件的策略

方法1:分块读取

def read_large_zip_json_chunked(zip_path, json_filename, chunk_size=1000):"""分块读取大型ZIP中的JSON文件"""chunks = []with zipfile.ZipFile(zip_path, 'r') as zip_ref:with zip_ref.open(json_filename, 'r') as json_file:# 使用pandas的分块读取功能for chunk in pd.read_json(json_file, encoding='utf-8', lines=True, chunksize=chunk_size):chunks.append(chunk)# 合并所有块if chunks:return pd.concat(chunks, ignore_index=True)else:return pd.DataFrame()

方法2:使用内存映射

def read_zip_json_with_mmap(zip_path, json_filename):"""使用内存映射处理大型ZIP文件"""import mmapwith zipfile.ZipFile(zip_path, 'r') as zip_ref:# 获取文件信息file_info = zip_ref.getinfo(json_filename)with zip_ref.open(json_filename, 'r') as json_file:# 创建内存映射with mmap.mmap(json_file.fileno(), 0, access=mmap.ACCESS_READ) as mmapped_file:df = pd.read_json(mmapped_file, encoding='utf-8')return df

性能优化建议

1. 使用适当的数据类型

def optimize_pandas_dataframe(df):"""优化Pandas DataFrame的内存使用"""# 转换数据类型以减少内存使用for col in df.columns:if df[col].dtype == 'object':# 尝试转换为分类类型if df[col].nunique() / len(df) < 0.5:df[col] = df[col].astype('category')# 转换数值类型elif df[col].dtype in ['int64', 'float64']:df[col] = pd.to_numeric(df[col], downcast='integer')return df

2. 并行处理

from concurrent.futures import ThreadPoolExecutordef parallel_zip_processing(zip_paths, processing_function, max_workers=4):"""并行处理多个ZIP文件"""with ThreadPoolExecutor(max_workers=max_workers) as executor:results = list(executor.map(processing_function, zip_paths))return results

完整示例代码

import zipfile
import json
import pandas as pd
from pyspark.sql import SparkSession
import tempfile
import osclass ZIPJSONReader:"""ZIP文件中的JSON读取器"""def __init__(self):self.spark = SparkSession.builder \.appName("ZIP_JSON_Reader") \.getOrCreate()def read_to_pandas(self, zip_path, json_filename=None, optimize=True):"""读取ZIP中的JSON文件到Pandas DataFrame"""# 如果未指定文件名,自动查找第一个JSON文件if json_filename is None:with zipfile.ZipFile(zip_path, 'r') as zip_ref:json_files = [f for f in zip_ref.namelist() if f.endswith('.json')]if not json_files:raise ValueError("No JSON files found in ZIP")json_filename = json_files[0]# 读取JSON文件with zipfile.ZipFile(zip_path, 'r') as zip_ref:with zip_ref.open(json_filename, 'r') as json_file:df = pd.read_json(json_file, encoding='utf-8')# 优化内存使用if optimize:df = self._optimize_dataframe(df)return dfdef read_to_pyspark(self, zip_path, json_filename=None):"""读取ZIP中的JSON文件到PySpark DataFrame"""# 使用临时目录解压文件with tempfile.TemporaryDirectory() as temp_dir:with zipfile.ZipFile(zip_path, 'r') as zip_ref:if json_filename:# 解压指定文件zip_ref.extract(json_filename, temp_dir)json_path = os.path.join(temp_dir, json_filename)else:# 解压所有JSON文件json_files = [f for f in zip_ref.namelist() if f.endswith('.json')]if not json_files:raise ValueError("No JSON files found in ZIP")zip_ref.extractall(temp_dir, json_files)json_path = temp_dir# 使用PySpark读取df = self.spark.read \.option("encoding", "UTF-8") \.json(json_path)return dfdef _optimize_dataframe(self, df):"""优化DataFrame内存使用"""for col in df.columns:col_type = df[col].dtypeif col_type == 'object':# 转换为分类类型num_unique_values = len(df[col].unique())num_total_values = len(df[col])if num_unique_values / num_total_values < 0.5:df[col] = df[col].astype('category')elif col_type in ['int64']:# 下转换整数类型df[col] = pd.to_numeric(df[col], downcast='integer')elif col_type in ['float64']:# 下转换浮点类型df[col] = pd.to_numeric(df[col], downcast='float')return dfdef close(self):"""关闭Spark会话"""self.spark.stop()# 使用示例
if __name__ == "__main__":reader = ZIPJSONReader()try:# 读取到Pandaspandas_df = reader.read_to_pandas('data.zip', 'example.json')print("Pandas DataFrame:")print(pandas_df.head())print(f"Pandas DataFrame shape: {pandas_df.shape}")# 读取到PySparkspark_df = reader.read_to_pyspark('data.zip', 'example.json')print("\nPySpark DataFrame:")spark_df.show(5)print(f"PySpark DataFrame count: {spark_df.count()}")finally:reader.close()

总结

本文介绍了多种高效读取ZIP压缩文件中UTF-8编码JSON数据的方法:

  1. 对于Pandas DataFrame

    • 使用zipfilepandas.read_json直接读取
    • 处理大型文件时使用分块读取
    • 优化数据类型以减少内存使用
  2. 对于PySpark DataFrame

    • 通过Pandas中转(适合中小型数据)
    • 解压后直接读取(适合大型数据)
    • 支持处理ZIP中的多个JSON文件
  3. 性能优化

    • 使用适当的数据类型
    • 并行处理多个文件
    • 流式处理大型文件

根据数据大小和处理需求选择合适的方法,可以在保证性能的同时高效地处理ZIP压缩文件中的JSON数据。

http://www.dtcms.com/a/474117.html

相关文章:

  • 基于Spring Boot + Vue 3的乡村振兴综合服务平台性能优化与扩展实践
  • 基于单片机的声光控制楼道灯(论文+源码)
  • 网站运营分析云平台网站建设方案
  • 【Linux】进程间同步与互斥(下)
  • 现成的手机网站做APP手机网站开发教程pdf
  • 【栈】5. 验证栈序列(medium)
  • Leetcode之 Hot 100
  • 建立能网上交易的网站多少钱wordpress调取多个分类文章
  • MySQL 索引:原理、分类与操作指南
  • Blender机箱盒体门窗铰链生成器资产预设 Hingegenious
  • 网站托管就业做美食有哪些网站
  • 神经符号AI的深度探索:从原理到实践的全景指南
  • 零食网站建设规划书建行输了三次密码卡锁怎么解
  • Python代码示例
  • 济南市历下区建设局官方网站wordpress高级套餐
  • ALLEGRO X APD版图单独显示某一网络的方法
  • 计算机网络基础篇——如何学习计算机网络?
  • 电子商务网站建设的总体设计wordpress dux主题5.0版本
  • 《jEasyUI 创建简单的菜单》
  • AI【前端浅学】
  • 怎么设置网站名称失眠先生 wordpress
  • 低空物流+韧性供应链:技术架构与企业级落地的开发实践指南
  • Quartus II软件安装步骤(附安装包)Quartus II 18超详细下载安装教程
  • 动规——棋圣匹配
  • 侵入别人的网站怎么做我的家乡网页制作步骤
  • Thonny(Python IDE)下载和安装教程(附安装包)
  • Fastdfs_MinIO_腾讯COS_具体逻辑解析
  • SDCC下载和安装图文教程(附安装包,C语言编译器)
  • 用python做的电商网站常德网站建设费用
  • LSTM新架构论文分享5