当前位置: 首页 > news >正文

利用python pandas库清洗病例处方清洗步骤

文章目录

    • @[toc]
      • Pandas 数据清洗代码整合
  • 假设您的患者数据DataFrame名为 df_patients
  • 假设您的病例处方数据DataFrame名为 df_records
  • df_patients = pd.read_excel('your_patient_data.xlsx')
  • df_records = pd.read_excel('your_record_data.xlsx')
        • 一、 患者数据清洗
  • 1. 首先按'就诊日期'降序排序,确保最新的记录在前
  • 2. 使用.drop_duplicates()方法,subset指定判断重复的列,keep='first'保留第一条(即最新日期)
  • 这会直接删除所有重复项,只保留每个唯一组合的最新记录
  • 3. 可以按索引或其它列重新排序(可选)
  • 定义一个函数来处理每一行
  • 应用函数,创建新的'身份证号'列
  • 可以选择删除原来的两列
  • df_patients_cleaned.drop(['身份证号1', '身份证号2'], axis=1, inplace=True)
  • 定义一个映射字典
  • 应用函数到身份证号列
  • 将结果拆分成两列
        • 二、 病例处方数据清洗
  • 方法1:如果两列都是字符串
  • 方法2(更稳健):如果两列是datetime/time类型,先转换为字符串
  • df_records['完整时间'] = df_records['日期列'].dt.strftime('%Y-%m-%d') + ' ' + df_records['时间列'].astype(str)
  • 方法3:最推荐,直接组合成DateTime对象
  • 首先,确保数据按患者ID和就诊时间排序
  • 使用shift方法判断当前行的患者ID是否与上一行相同
  • 如果相同,则是“复诊”,否则是“初诊”
  • 假设我们有一个药品信息的DataFrame: df_drugs,包含'药品ID'和'药品名称'
  • 我们想在df_records中根据'药品ID'匹配出'药品名称'
  • 方法1:merge (类似于SQL的JOIN)
  • 方法2:map (适用于匹配单个字段,更高效)
  • 首先创建一个映射字典
  • 然后应用映射
  • 应用检查
  • 处理无效日期:筛选出来或替换为NaN
  • df_records.loc[~df_records['日期有效'], '你的日期列'] = np.nan # 可选:将无效日期替换为NaN
  • 应用到需要清洗的列
        • 整合代码

Pandas 数据清洗代码整合

首先,导入必要的库并加载数据。

import pandas as pd
import numpy as np
from datetime import datetime

假设您的患者数据DataFrame名为 df_patients

假设您的病例处方数据DataFrame名为 df_records

df_patients = pd.read_excel(‘your_patient_data.xlsx’)

df_records = pd.read_excel(‘your_record_data.xlsx’)

一、 患者数据清洗

1. 数据去重(保留最新就诊记录)

这是Pandas的强项,比Excel操作更简洁。

1. 首先按’就诊日期’降序排序,确保最新的记录在前

df_patients.sort_values(by=‘就诊日期’, ascending=False, inplace=True)

2. 使用.drop_duplicates()方法,subset指定判断重复的列,keep='first’保留第一条(即最新日期)

这会直接删除所有重复项,只保留每个唯一组合的最新记录

df_patients_cleaned = df_patients.drop_duplicates(subset=[‘姓名’, ‘性别’, ‘年龄’, ‘手机号’], keep=‘first’)

3. 可以按索引或其它列重新排序(可选)

df_patients_cleaned.sort_index(inplace=True)

2. 合并身份证号列

假设两列名为 身份证号1身份证号2

定义一个函数来处理每一行

def merge_id_number(row):
id1 = str(row[‘身份证号1’]).strip().replace(‘,’, ‘’) # 清理列1
id2 = str(row[‘身份证号2’]).strip().replace(‘,’, ‘’) # 清理列2

# 如果id1不是'nan'且不为空,则使用id1;否则使用id2
if id1 != 'nan' and id1 != '':return id1
else:return id2

应用函数,创建新的’身份证号’列

df_patients_cleaned[‘身份证号’] = df_patients_cleaned.apply(merge_id_number, axis=1)

可以选择删除原来的两列

df_patients_cleaned.drop([‘身份证号1’, ‘身份证号2’], axis=1, inplace=True)

3. 职业字段替换

使用Pandas的 .replace() 方法,可以传入一个字典进行批量替换。

定义一个映射字典

occupation_mapping = {
‘工人’: ‘务工人员’,
‘教师’: ‘教育从业者’,
# … 在此添加所有需要替换的键值对
‘农民’: ‘务农人员’,
‘医生’: ‘医疗从业者’
}

df_patients_cleaned[‘职业’] = df_patients_cleaned[‘职业’].replace(occupation_mapping)

4. & 5. 出生日期提取与身份证号合法性校验

我们将这两个功能写成一个函数,因为它需要共用身份证号长度判断逻辑。

def process_id_number(id_str):
“”"
处理身份证号码,返回一个元组:(校验结果, 出生日期)
“”"
if pd.isna(id_str):
return ‘空’, None

id_str = str(id_str).strip()
length = len(id_str)# 1. 位数检查
if length not in (15, 18):return '位数不对', None# 2. 提取出生日期 (只处理18位)
birth_date = None
if length == 18:try:year = int(id_str[6:10])month = int(id_str[10:12])day = int(id_str[12:14])# 尝试构造日期对象,如果日期不合法(如2月30日)会抛出异常birth_date = pd.Timestamp(year, month, day)birth_date_str = birth_date.strftime('%Y-%m-%d')except ValueError:# 如果提取的日期无效birth_date_str = '日期无效'
else:# 15位身份证号,如果需要可以在此提取(7-12位:YYMMDD)birth_date_str = '旧号(15位)'# 3. 18位身份证校验码检查 (可选,如果要求高精度校验)
if length == 18:# 校验码权重weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]# 校验码对应表check_code_map = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']try:# 计算前17位的加权和total = sum(int(a) * b for a, b in zip(id_str[:17], weights))# 取模得到校验码索引calculated_check_code = check_code_map[total % 11]# 获取实际的校验码actual_check_code = id_str[17].upper()if calculated_check_code != actual_check_code:return '校验码错误', birth_date_strexcept ValueError:# 前17位中包含非数字字符return '格式错误', birth_date_strreturn '正确', birth_date_str

应用函数到身份证号列

results = df_patients_cleaned[‘身份证号’].apply(process_id_number)

将结果拆分成两列

df_patients_cleaned[‘身份证校验结果’] = results.apply(lambda x: x[0])
df_patients_cleaned[‘出生日期’] = results.apply(lambda x: x[1])

6. 手机号合法性校验

def validate_phone(phone_num):
“”"
验证手机号是否合法
“”"
if pd.isna(phone_num):
return ‘空’

phone_str = str(phone_num).strip()# 检查长度是否为11位
if len(phone_str) != 11:return '长度非法'# 检查是否全为数字
if not phone_str.isdigit():return '包含非数字'# 检查是否以1开头
if not phone_str.startswith('1'):return '非1开头'# 检查第二位是否是3-9
if phone_str[1] not in '3456789':return '号段非法'return '合法'

df_patients_cleaned[‘手机号校验’] = df_patients_cleaned[‘手机号’].apply(validate_phone)


二、 病例处方数据清洗

1. 时间拼接

假设有 日期列时间列

方法1:如果两列都是字符串

df_records[‘完整时间’] = df_records[‘日期列’] + ’ ’ + df_records[‘时间列’]

方法2(更稳健):如果两列是datetime/time类型,先转换为字符串

df_records[‘完整时间’] = df_records[‘日期列’].dt.strftime(‘%Y-%m-%d’) + ’ ’ + df_records[‘时间列’].astype(str)

方法3:最推荐,直接组合成DateTime对象

df_records[‘日期列’] = pd.to_datetime(df_records[‘日期列’])
df_records[‘时间列’] = pd.to_timedelta(df_records[‘时间列’].astype(str)) # 假设时间列格式为’14:30:00’
df_records[‘完整时间’] = df_records[‘日期列’] + df_records[‘时间列’]

2. 初诊/复诊标识

首先,确保数据按患者ID和就诊时间排序

df_records.sort_values(by=[‘患者ID’, ‘就诊时间’], ascending=[True, False], inplace=True)

使用shift方法判断当前行的患者ID是否与上一行相同

如果相同,则是“复诊”,否则是“初诊”

df_records[‘就诊类型’] = np.where(df_records[‘患者ID’] == df_records[‘患者ID’].shift(1), ‘复诊’, ‘初诊’)

3. 数据匹配(VLOOKUP)

使用Pandas的 mergemap 函数。

假设我们有一个药品信息的DataFrame: df_drugs,包含’药品ID’和’药品名称’

我们想在df_records中根据’药品ID’匹配出’药品名称’

方法1:merge (类似于SQL的JOIN)

df_records = pd.merge(df_records, df_drugs[[‘药品ID’, ‘药品名称’]], on=‘药品ID’, how=‘left’)

方法2:map (适用于匹配单个字段,更高效)

首先创建一个映射字典

drug_name_map = df_drugs.set_index(‘药品ID’)[‘药品名称’].to_dict()

然后应用映射

df_records[‘药品名称’] = df_records[‘药品ID’].map(drug_name_map)

4. 日期合法性检查

def check_date_validity(date_col):
“”"
检查日期列的有效性
“”"
try:
# 尝试转换为datetime,errors='coerce’会将无效日期转为NaT
parsed_dates = pd.to_datetime(date_col, errors=‘coerce’)
# 判断是否为NaT(Not a Time),如果是则为无效日期
is_valid = ~parsed_dates.isna()
return is_valid
except:
return pd.Series([False] * len(date_col))

应用检查

df_records[‘日期有效’] = check_date_validity(df_records[‘你的日期列’])

处理无效日期:筛选出来或替换为NaN

invalid_dates = df_records[~df_records[‘日期有效’]]
print(f"找到 {len(invalid_dates)} 条无效日期记录")

df_records.loc[~df_records[‘日期有效’], ‘你的日期列’] = np.nan # 可选:将无效日期替换为NaN

5. 去除汉字字符(保留数字/符号)

使用正则表达式是最简单的方法。

import re

def remove_chinese(text):
if pd.isna(text):
return text
# 匹配所有非ASCII字符(包括中文)并替换为空字符串
return re.sub(r’[^\x00-\x7F]', ‘’, str(text))

应用到需要清洗的列

df_records[‘清洗后列’] = df_records[‘需要清洗的列’].apply(remove_chinese)


整合代码
# -*- coding: utf-8 -*-
"""
患者与病例数据清洗自动化脚本
功能:实现Excel数据清洗方案的Pandas代码整合
"""import pandas as pd
import numpy as np
import re
from datetime import datetimedef main():"""主函数:执行完整的数据清洗流程"""# ==================== 1. 数据加载 ====================print("正在加载数据...")# 请根据实际文件路径修改这里的文件名try:df_patients = pd.read_excel('患者数据.xlsx')df_records = pd.read_excel('病例处方数据.xlsx')except FileNotFoundError as e:print(f"文件加载失败: {e}")print("请确保文件存在且路径正确")returnprint(f"患者数据形状: {df_patients.shape}")print(f"病例数据形状: {df_records.shape}")# ==================== 2. 患者数据清洗 ====================print("\n开始清洗患者数据...")# 2.1 数据去重(保留最新就诊记录)def clean_patient_duplicates(df):"""数据去重:保留每组重复数据中的最新记录"""df_sorted = df.sort_values(by='就诊日期', ascending=False)df_cleaned = df_sorted.drop_duplicates(subset=['姓名', '性别', '年龄', '手机号'], keep='first')return df_cleaned.sort_index()df_patients_cleaned = clean_patient_duplicates(df_patients)print(f"去重后患者数据形状: {df_patients_cleaned.shape}")# 2.2 合并身份证号列(假设列名为'身份证号1'和'身份证号2')def merge_id_columns(df):"""合并身份证号列并清理数据"""def process_id(row):id1 = str(row.get('身份证号1', '')).strip().replace(',', '').replace(',', '')id2 = str(row.get('身份证号2', '')).strip().replace(',', '').replace(',', '')if id1 != 'nan' and id1 != '':return id1else:return id2df['身份证号'] = df.apply(process_id, axis=1)return dfdf_patients_cleaned = merge_id_columns(df_patients_cleaned)# 2.3 职业字段替换def replace_occupation(df):"""职业字段批量替换"""occupation_mapping = {'工人': '务工人员','教师': '教育从业者','农民': '务农人员','医生': '医疗从业者','护士': '医疗从业者','职员': '办公室职员','员工': '企业员工'# 可根据需要继续添加更多映射}if '职业' in df.columns:df['职业'] = df['职业'].replace(occupation_mapping)return dfdf_patients_cleaned = replace_occupation(df_patients_cleaned)# 2.4 身份证号处理(提取出生日期+合法性校验)def process_id_number(id_str):"""处理身份证号码:校验合法性并提取出生日期"""if pd.isna(id_str) or str(id_str).strip() in ['', 'nan']:return '空', Noneid_str = str(id_str).strip()length = len(id_str)# 位数检查if length not in (15, 18):return '位数不对', None# 提取出生日期(只处理18位)birth_date_str = Noneif length == 18:try:year = int(id_str[6:10])month = int(id_str[10:12])day = int(id_str[12:14])birth_date = pd.Timestamp(year, month, day)birth_date_str = birth_date.strftime('%Y-%m-%d')except ValueError:birth_date_str = '日期无效'else:birth_date_str = '旧号(15位)'# 18位身份证校验码检查if length == 18:weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]check_code_map = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']try:total = sum(int(a) * b for a, b in zip(id_str[:17], weights))calculated_check_code = check_code_map[total % 11]actual_check_code = id_str[17].upper()if calculated_check_code != actual_check_code:return '校验码错误', birth_date_strexcept ValueError:return '格式错误', birth_date_strreturn '正确', birth_date_str# 应用身份证处理id_results = df_patients_cleaned['身份证号'].apply(process_id_number)df_patients_cleaned['身份证校验结果'] = id_results.apply(lambda x: x[0])df_patients_cleaned['出生日期'] = id_results.apply(lambda x: x[1])# 2.5 手机号合法性校验def validate_phone(phone_num):"""验证手机号合法性"""if pd.isna(phone_num):return '空'phone_str = str(phone_num).strip()if len(phone_str) != 11:return '长度非法'if not phone_str.isdigit():return '包含非数字'if not phone_str.startswith('1'):return '非1开头'if phone_str[1] not in '3456789':return '号段非法'return '合法'df_patients_cleaned['手机号校验'] = df_patients_cleaned['手机号'].apply(validate_phone)# ==================== 3. 病例处方数据清洗 ====================print("\n开始清洗病例处方数据...")# 3.1 时间拼接(假设有'日期列'和'时间列')def combine_datetime(df):"""拼接日期和时间列"""if '日期列' in df.columns and '时间列' in df.columns:# 转换为datetime类型df['日期列'] = pd.to_datetime(df['日期列'], errors='coerce')# 处理时间列(假设格式为HH:MM:SS)def parse_time(time_val):if pd.isna(time_val):return pd.NaTtime_str = str(time_val)if ':' in time_str:return pd.to_timedelta(time_str)else:return pd.NaTdf['时间列'] = df['时间列'].apply(parse_time)df['完整时间'] = df['日期列'] + df['时间列']return dfdf_records_cleaned = combine_datetime(df_records)# 3.2 初诊/复诊标识def mark_visit_type(df):"""标记初诊和复诊"""if '患者ID' in df.columns and '就诊时间' in df.columns:df_sorted = df.sort_values(by=['患者ID', '就诊时间'], ascending=[True, False])df_sorted['就诊类型'] = np.where(df_sorted['患者ID'] == df_sorted['患者ID'].shift(1), '复诊', '初诊')return df_sortedreturn dfdf_records_cleaned = mark_visit_type(df_records_cleaned)# 3.3 数据匹配示例(需要额外的参考数据)def match_data(df):"""数据匹配示例(需要根据实际情况实现)"""# 这里只是一个示例,实际使用时需要加载参考数据print("数据匹配功能需要额外的参考数据文件")return dfdf_records_cleaned = match_data(df_records_cleaned)# 3.4 日期合法性检查def validate_dates(df):"""检查日期列的有效性"""date_columns = [col for col in df.columns if '日期' in col or '时间' in col]for col in date_columns:if col in df.columns:parsed_dates = pd.to_datetime(df[col], errors='coerce')invalid_mask = parsed_dates.isna() & df[col].notna()if invalid_mask.any():invalid_count = invalid_mask.sum()print(f"列 '{col}' 中发现 {invalid_count} 个无效日期")# 可以选择将无效日期设为NaN# df.loc[invalid_mask, col] = np.nanreturn dfdf_records_cleaned = validate_dates(df_records_cleaned)# 3.5 去除汉字字符(保留数字/符号)def remove_chinese_characters(df, columns_to_clean):"""去除指定列中的汉字字符"""def clean_text(text):if pd.isna(text):return text# 移除非ASCII字符(包括中文)return re.sub(r'[^\x00-\x7F]', '', str(text))for col in columns_to_clean:if col in df.columns:df[f'{col}_清洗后'] = df[col].apply(clean_text)return df# 指定需要清洗的列(请根据实际列名修改)columns_to_clean = ['诊断说明', '处方内容', '备注']df_records_cleaned = remove_chinese_characters(df_records_cleaned, columns_to_clean)# ==================== 4. 数据保存 ====================print("\n正在保存清洗后的数据...")# 保存清洗后的数据try:df_patients_cleaned.to_excel('清洗后_患者数据.xlsx', index=False)df_records_cleaned.to_excel('清洗后_病例处方数据.xlsx', index=False)print("数据保存成功!")print(f"清洗后患者数据保存至: 清洗后_患者数据.xlsx")print(f"清洗后病例数据保存至: 清洗后_病例处方数据.xlsx")# 显示数据质量报告print("\n=== 数据质量报告 ===")print(f"患者数据: 原始 {df_patients.shape} → 清洗后 {df_patients_cleaned.shape}")print(f"病例数据: 原始 {df_records.shape} → 清洗后 {df_records_cleaned.shape}")# 显示身份证校验结果统计if '身份证校验结果' in df_patients_cleaned.columns:print("\n身份证校验结果统计:")print(df_patients_cleaned['身份证校验结果'].value_counts())# 显示手机号校验结果统计if '手机号校验' in df_patients_cleaned.columns:print("\n手机号校验结果统计:")print(df_patients_cleaned['手机号校验'].value_counts())except Exception as e:print(f"数据保存失败: {e}")print("\n数据清洗流程完成!")if __name__ == "__main__":main()

文章转载自:

http://eEo9b0r2.nmhpq.cn
http://gIhaWEiL.nmhpq.cn
http://tZdWqJ8x.nmhpq.cn
http://NIUxgsDL.nmhpq.cn
http://QQhJFqA1.nmhpq.cn
http://hNbGS6If.nmhpq.cn
http://e9WuIAtu.nmhpq.cn
http://61Y2OTLs.nmhpq.cn
http://icyHi5Mg.nmhpq.cn
http://ny0d9QUA.nmhpq.cn
http://ZUfIgklA.nmhpq.cn
http://Y1egkj3S.nmhpq.cn
http://6eRfsYZD.nmhpq.cn
http://kTUvqTn0.nmhpq.cn
http://8VgGNW49.nmhpq.cn
http://ZbepJIem.nmhpq.cn
http://yoJXVS2O.nmhpq.cn
http://otSlBWBz.nmhpq.cn
http://Zm6FmK4H.nmhpq.cn
http://Ua0pZZpb.nmhpq.cn
http://uizX7ABd.nmhpq.cn
http://QTb0i5VA.nmhpq.cn
http://TOERZP9T.nmhpq.cn
http://T5MsoGcy.nmhpq.cn
http://Fib1JtMA.nmhpq.cn
http://JC9tQWyN.nmhpq.cn
http://hfIyddQj.nmhpq.cn
http://NXTgtGFs.nmhpq.cn
http://Wwni8xjq.nmhpq.cn
http://e9VcMP6a.nmhpq.cn
http://www.dtcms.com/a/382663.html

相关文章:

  • 数据库在并发访问时,不同隔离级别下脏读幻读问题
  • Python核心技术开发指南(065)——with语句
  • Python核心技术开发指南(064)——析构方法
  • 20250913-01: Langchain概念:Runnable可运行接口
  • 记一次谷歌语法获取路径 针对空白页面
  • Java GC:从GC Roots到分代设计的哲学
  • 一款4000℃高温材料设计方案及性能预测
  • 【leetcode】64. 最小路径和
  • 2.10组件间的通信
  • MinerU学习
  • 网络安全学习
  • 如何用 Rust 重写 SQLite 数据库(一):项目探索
  • Qwen3-80B-A3B混合注意力机制
  • OBS使用教程:OBS多路推流插件如何下载?如何安装使用?
  • 禁用 vscode 的终端的粘滞滚动
  • 人工智能通识与实践 - 人工智能概述
  • Symantec卸载
  • 第34章 AI在文娱与内容创作领域的应用
  • 学生信息管理系统(面向对象初步接触)
  • LangChain 中 Output Parsers 是什么?
  • Wolfspeed重组计划已确认
  • 【C++】继承机制深度解析:多继承与菱形继承
  • 如何用Maxscript在选择样条线顶点放置球体?
  • (LeetCode 面试经典 150 题) 190. 颠倒二进制位(位运算)
  • P1043题解
  • 如何用 Rust 重写 SQLite 数据库(二):项目探索
  • SQLI-labs[Part 2]
  • 如何安装 Prometheus 2.20.0 for Windows(amd64 版本详细步骤)​
  • 1004:字符三角形
  • Python 生成乘法练习题:一位数乘以两位数(乘积小于100)