数据清洗:基于python抽取jsonl文件数据字段
使用 Python 遍历指定目录下所有 “jsonl” 格式的文件,逐行抽取其中某个字段(下例为 content)的内容,并合并写入同一个文本文件。
import os
import json
import time
from tqdm import tqdm # 需要先安装:pip install tqdm
def _count_lines(path):
    """Return the number of lines in ``path``, or 0 if the file cannot be read."""
    try:
        # Binary mode so that counting never fails on bytes that are not valid
        # UTF-8; the context manager guarantees the handle is closed.
        with open(path, "rb") as handle:
            return sum(1 for _ in handle)
    except OSError:
        return 0


def _extract_field(raw_line, field):
    """Parse one JSON line and return the text of ``field`` on a single line.

    A missing key yields an empty string. Raises ``json.JSONDecodeError`` for
    invalid JSON and ``TypeError`` when the line is not a JSON object or the
    field value is not a string.
    """
    record = json.loads(raw_line)
    if not isinstance(record, dict):
        raise TypeError(f"expected a JSON object, got {type(record).__name__}")
    value = record.get(field, "")
    if not isinstance(value, str):
        raise TypeError(f"field {field!r} is {type(value).__name__}, expected str")
    # Flatten every line-break flavour so one record never spans several lines.
    return value.replace("\r\n", " ").replace("\r", " ").replace("\n", " ")


def process_files(dir_path=r"D:\daku\关键词识别\1623-0000001\zh",
                  field="content",
                  output_name="combined_contents.txt"):
    """Merge one field from every ``.jsonl`` file of a directory into a text file.

    Files are handled largest first. Each non-blank line is parsed as JSON, the
    value of ``field`` is flattened to a single line and written to the output
    file followed by an empty line. Lines that cannot be processed are reported
    on stdout with their line number inside the file and then skipped. A
    summary is printed at the end.

    Args:
        dir_path: Directory that holds the ``.jsonl`` files; the output file is
            created in the same directory.
        field: Key to extract from each JSON object.
        output_name: File name of the merged output.

    Raises:
        OSError: If ``dir_path`` cannot be listed or the output file cannot be
            opened for writing.
    """
    # tqdm only drives the progress display; when it cannot be imported the
    # merge still runs, just without a progress bar.
    try:
        from tqdm import tqdm as make_bar
    except ImportError:
        make_bar = None

    # Collect the input files, largest first. The output file is excluded in
    # case the caller gives it a ``.jsonl`` name.
    file_list = sorted(
        (f for f in os.listdir(dir_path)
         if f.lower().endswith(".jsonl") and f != output_name),
        key=lambda name: os.path.getsize(os.path.join(dir_path, name)),
        reverse=True,
    )

    total_files = len(file_list)
    processed_files = 0
    processed_lines = 0  # records successfully written
    seen_lines = 0       # every line read, good or bad; drives the progress display

    # The pre-count is only needed to size the progress bar.
    total_lines = 0
    if make_bar is not None:
        total_lines = sum(_count_lines(os.path.join(dir_path, f)) for f in file_list)
    start_time = time.time()

    output_file = os.path.join(dir_path, output_name)
    pbar = None
    if make_bar is not None:
        pbar = make_bar(total=total_lines, desc="合并进度", unit="line")
    try:
        with open(output_file, "w", encoding="utf-8") as outfile:
            for filename in file_list:
                file_path = os.path.join(dir_path, filename)
                try:
                    # utf-8-sig reads plain UTF-8 as well and drops a leading
                    # BOM, which would otherwise break parsing of line 1.
                    with open(file_path, "r", encoding="utf-8-sig") as infile:
                        for line_num, raw_line in enumerate(infile, 1):
                            seen_lines += 1
                            if pbar is not None:
                                # Count blank and broken lines too, so the bar
                                # can actually reach its total.
                                pbar.update(1)
                                if seen_lines % 1000 == 0:
                                    elapsed = time.time() - start_time
                                    speed = seen_lines / (elapsed + 1e-5)
                                    remaining = max(total_lines - seen_lines, 0) / (speed + 1e-5)
                                    pbar.set_postfix({
                                        '速度': f"{speed:.1f} lines/s",
                                        '剩余时间': f"{remaining // 3600:.0f}h {remaining % 3600 // 60:.0f}m"
                                    })

                            line = raw_line.strip()
                            if not line:
                                continue
                            try:
                                content = _extract_field(line, field)
                                outfile.write(content + "\n\n")  # blank line separates records
                                processed_lines += 1
                            except json.JSONDecodeError:
                                print(f"\nJSON解析失败: {filename} 第{line_num}行")
                            except Exception as e:
                                # Deliberate best effort: one bad record must
                                # not abort the whole merge.
                                print(f"\n处理异常: {filename} 第{line_num}行 - {str(e)}")
                    processed_files += 1
                except (OSError, UnicodeError) as e:
                    # Records written before the failure stay in the output;
                    # the file is simply not counted as processed.
                    print(f"\n无法读取文件 {filename}: {str(e)}")
    finally:
        if pbar is not None:
            pbar.close()

    # Final summary
    end_time = time.time()
    print(f"\n合并完成!共处理 {processed_files}/{total_files} 个文件")
    print(f"总记录数: {processed_lines:,} 条")
    print(f"耗时: {end_time - start_time:.2f} 秒")
    print(f"输出文件路径: {output_file}")
# Script entry point: run the merge only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    process_files()