6、Python-Pandas数据处理与分析
学习目标:熟练掌握结构化数据的处理和分析方法,建立数据科学项目的核心技能,培养数据洞察能力
(纯小白建议手敲,都是这么来的,不然就是看得快忘得快)
如果说NumPy是Python数据科学的数值计算基础,那么Pandas就是数据分析的瑞士军刀。从Excel表格到数据库查询,从数据清洗到统计分析,Pandas将复杂的数据操作变得直观而高效。我们将从电子表格的思维模式过渡到程序化数据处理的专业水平。
6.1 Pandas数据结构:从表格到DataFrame的认知转换
> 为什么需要Pandas
NumPy擅长处理同质数值数据,但现实世界的数据往往是异质的:姓名是字符串、年龄是整数、成绩是浮点数、是否及格是布尔值。Pandas 专门为处理这种结构化、异质数据而设计,它在NumPy的基础上构建了更高级的数据抽象。
import pandas as pd
import numpy as np
from datetime import datetime, timedelta# Pandas vs NumPy的数据处理能力对比
print("=== Pandas vs NumPy数据处理对比 ===")# NumPy的局限性
try:numpy_array = np.array(['张三', 25, 85.5, True])print(f"NumPy混合数据:{numpy_array}")print(f"数据类型:{numpy_array.dtype}") # 全部被转换为字符串
except Exception as e:print(f"NumPy处理异质数据的问题:{e}")# Pandas的优势
pandas_data = pd.DataFrame({'姓名': ['张三', '李四', '王五'],'年龄': [25, 30, 28],'成绩': [85.5, 92.0, 78.5],'及格': [True, True, True]
})print(f"\nPandas DataFrame:")
print(pandas_data)
print(f"\n数据类型:")
print(pandas_data.dtypes)
print(f"\n内存使用:")
print(pandas_data.info())
> Series:一维标签数组
Series 是Pandas的一维数据结构,可以看作是带标签的NumPy数组,或者是字典和列表的结合体。
# Series的创建和基本操作
print("=== Series数据结构详解 ===")# 从列表创建Series
scores = pd.Series([85, 92, 78, 96, 88])
print("从列表创建的Series:")
print(scores)
print(f"索引:{scores.index.tolist()}")
print(f"值:{scores.values}")# 带标签的Series
student_scores = pd.Series([85, 92, 78, 96, 88], index=['张三', '李四', '王五', '赵六', '钱七'])
print(f"\n带标签的Series:")
print(student_scores)# 从字典创建Series
score_dict = {'张三': 85, '李四': 92, '王五': 78, '赵六': 96, '钱七': 88}
dict_series = pd.Series(score_dict)
print(f"\n从字典创建的Series:")
print(dict_series)# Series的基本属性和方法
print(f"\nSeries基本信息:")
print(f"形状:{student_scores.shape}")
print(f"大小:{student_scores.size}")
print(f"数据类型:{student_scores.dtype}")
print(f"名称:{student_scores.name}")# 设置Series名称
student_scores.name = '期末成绩'
student_scores.index.name = '学生姓名'
print(f"\n设置名称后:")
print(student_scores)# Series的索引操作
print(f"\n=== Series索引操作 ===")
print(f"张三的成绩:{student_scores['张三']}")
print(f"前三名学生:")
print(student_scores[:3])
print(f"成绩大于85的学生:")
print(student_scores[student_scores > 85])# Series的运算
print(f"\n=== Series运算 ===")
print(f"所有成绩加5分:")
print(student_scores + 5)
print(f"成绩统计:")
print(student_scores.describe())
> DataFrame:二维标签数据结构
DataFrame 是Pandas的核心数据结构,可以理解为多个Series的集合,或者是带行列标签的二维表格。
# DataFrame的创建方式
print("=== DataFrame创建方式 ===")# 方式1:从字典创建
student_data = {'姓名': ['张三', '李四', '王五', '赵六', '钱七'],'年龄': [20, 21, 19, 22, 20],'性别': ['男', '女', '男', '男', '女'],'数学': [85, 92, 78, 96, 88],'英语': [78, 89, 85, 88, 92],'物理': [82, 87, 76, 94, 85]
}df = pd.DataFrame(student_data)
print("从字典创建的DataFrame:")
print(df)# 方式2:从列表的列表创建
data_list = [['张三', 20, '男', 85, 78, 82],['李四', 21, '女', 92, 89, 87],['王五', 19, '男', 78, 85, 76],['赵六', 22, '男', 96, 88, 94],['钱七', 20, '女', 88, 92, 85]
]columns = ['姓名', '年龄', '性别', '数学', '英语', '物理']
df2 = pd.DataFrame(data_list, columns=columns)
print(f"\n从列表创建的DataFrame:")
print(df2)# 方式3:从NumPy数组创建
np_data = np.random.randint(60, 100, size=(5, 3))
df3 = pd.DataFrame(np_data, columns=['语文', '数学', '英语'],index=[f'学生{i+1}' for i in range(5)])
print(f"\n从NumPy数组创建的DataFrame:")
print(df3)# DataFrame的基本信息
print(f"\n=== DataFrame基本信息 ===")
print(f"形状:{df.shape}")
print(f"列名:{df.columns.tolist()}")
print(f"索引:{df.index.tolist()}")
print(f"数据类型:")
print(df.dtypes)# 设置索引
df_indexed = df.set_index('姓名')
print(f"\n设置姓名为索引:")
print(df_indexed)
6.2 数据导入导出:连接外部数据源
> 文件格式支持概览
Pandas支持多种数据格式的读写,这是数据分析项目的起点。
格式 | 读取函数 | 写入函数 | 适用场景 | 优势 | 劣势 |
---|---|---|---|---|---|
CSV | read_csv() | to_csv() | 数据交换、简单存储 | 通用性强、体积小 | 无数据类型信息 |
Excel | read_excel() | to_excel() | 业务报告、多表数据 | 格式丰富、易读 | 体积大、加载慢 |
JSON | read_json() | to_json() | API数据、嵌套结构 | 结构灵活、网络友好 | 非表格数据处理复杂 |
SQL | read_sql() | to_sql() | 数据库集成 | 查询灵活、数据量大 | 需要数据库连接 |
Parquet | read_parquet() | to_parquet() | 大数据处理 | 压缩率高、读取快 | 兼容性一般 |
# 创建示例数据用于演示
sample_data = {'date': pd.date_range('2024-01-01', periods=100),'product': np.random.choice(['手机', '电脑', '平板'], 100),'sales': np.random.randint(1000, 10000, 100),'profit': np.random.uniform(0.1, 0.3, 100),'region': np.random.choice(['北京', '上海', '广州', '深圳'], 100)
}sales_df = pd.DataFrame(sample_data)
print("=== 样例销售数据 ===")
print(sales_df.head())# CSV文件操作
print(f"\n=== CSV文件操作 ===")
csv_file = 'sales_data.csv'
sales_df.to_csv(csv_file, index=False, encoding='utf-8')
print(f"数据已保存到 {csv_file}")# 读取CSV并指定参数
df_from_csv = pd.read_csv(csv_file, parse_dates=['date'], # 解析日期encoding='utf-8')
print(f"从CSV读取的数据类型:")
print(df_from_csv.dtypes)# Excel文件操作
print(f"\n=== Excel文件操作 ===")
excel_file = 'sales_report.xlsx'# 写入多个工作表
with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:sales_df.to_excel(writer, sheet_name='原始数据', index=False)# 按产品分组的汇总数据summary = sales_df.groupby('product').agg({'sales': ['sum', 'mean', 'count'],'profit': 'mean'}).round(2)summary.to_excel(writer, sheet_name='产品汇总')# 按地区分组的汇总数据region_summary = sales_df.groupby('region')['sales'].sum().sort_values(ascending=False)region_summary.to_excel(writer, sheet_name='地区汇总')print(f"Excel文件已保存到 {excel_file}")# 读取Excel文件
df_from_excel = pd.read_excel(excel_file, sheet_name='原始数据')
print(f"从Excel读取的前5行:")
print(df_from_excel.head())# JSON文件操作
print(f"\n=== JSON文件操作 ===")
json_file = 'sales_data.json'# 保存为JSON(不同格式)
sales_df.to_json(json_file, orient='records', date_format='iso', force_ascii=False)
print(f"数据已保存到 {json_file}")# 读取JSON
df_from_json = pd.read_json(json_file)
print(f"从JSON读取的数据形状:{df_from_json.shape}")
> 高级导入技巧
在实际项目中,数据往往需要特殊处理才能正确导入。
# 创建有问题的示例数据用于演示
problematic_data = """date,product,sales,profit,notes
2024-01-01,手机,5000,0.15,正常销售
2024-01-02,电脑,,0.20,缺少销售数据
2024-01-03,平板,3000,0.25,
2024-01-04,手机,7000,无效数据,数据错误
2024-01-05,电脑,4500,0.18,#这是注释
"""with open('problematic_data.csv', 'w', encoding='utf-8') as f:f.write(problematic_data)print("=== 处理问题数据 ===")# 基础读取(会有问题)
try:basic_df = pd.read_csv('problematic_data.csv')print("基础读取结果:")print(basic_df)print(f"数据类型:")print(basic_df.dtypes)
except Exception as e:print(f"基础读取出错:{e}")# 高级读取参数
print(f"\n高级读取处理:")
advanced_df = pd.read_csv('problematic_data.csv',parse_dates=['date'], # 解析日期na_values=['', '无效数据'], # 自定义空值comment='#', # 忽略注释行dtype={'product': 'category'}) # 指定数据类型print(advanced_df)
print(f"\n处理后的数据类型:")
print(advanced_df.dtypes)def clean_sales(x):"""清理销售数据"""if pd.isna(x) or x == '' or str(x).strip() == '':return 0try:return int(float(x)) # 先转float再转int,处理小数点情况except (ValueError, TypeError):return 0def clean_profit(x):"""清理利润数据"""if pd.isna(x) or x == '' or str(x).strip() == '':return np.nantry:return float(x)except (ValueError, TypeError):return np.nan# 使用转换函数读取
print(f"\n=== 使用转换函数 ===")
cleaned_df = pd.read_csv('problematic_data.csv',converters={'sales': clean_sales,'profit': clean_profit},parse_dates=['date'])print(cleaned_df)
print(f"\n清理后的数据类型:")
print(cleaned_df.dtypes)# 分块读取大文件
print(f"\n=== 分块读取演示 ===")
chunk_size = 2
chunk_list = []for chunk in pd.read_csv('sales_data.csv', chunksize=chunk_size):# 对每个块进行处理processed_chunk = chunk[chunk['sales'] > 5000]chunk_list.append(processed_chunk)large_sales = pd.concat(chunk_list, ignore_index=True)
print(f"分块处理结果(销售额>5000):")
print(large_sales.head())
6.3 数据清洗:从脏数据到分析就绪
> 缺失值处理策略
缺失值是数据分析中最常见的问题,不同的处理策略会显著影响分析结果。
# 创建带缺失值的示例数据
np.random.seed(42)
data_with_missing = {'id': range(1, 21),'name': [f'客户{i}' if i % 7 != 0 else np.nan for i in range(1, 21)],'age': [np.random.randint(20, 65) if i % 5 != 0 else np.nan for i in range(1, 21)],'income': [np.random.randint(3000, 15000) if i % 4 != 0 else np.nan for i in range(1, 21)],'city': [np.random.choice(['北京', '上海', '广州']) if i % 6 != 0 else np.nan for i in range(1, 21)]
}df_missing = pd.DataFrame(data_with_missing)
print("=== 带缺失值的数据 ===")
print(df_missing)# 缺失值检测
print(f"\n=== 缺失值检测 ===")
print(f"缺失值统计:")
print(df_missing.isnull().sum())
print(f"\n缺失值比例:")
print(df_missing.isnull().mean().round(2))# 可视化缺失值模式
missing_matrix = df_missing.isnull()
print(f"\n缺失值矩阵(前10行):")
print(missing_matrix.head(10))# 完整记录统计
complete_records = df_missing.dropna()
print(f"\n完整记录数量:{len(complete_records)}")
print(f"完整率:{len(complete_records)/len(df_missing):.2%}")# 缺失值处理策略
print(f"\n=== 缺失值处理策略 ===")# 策略1:删除包含缺失值的行
df_drop_rows = df_missing.dropna()
print(f"1. 删除缺失行后:{df_drop_rows.shape}")# 策略2:删除包含缺失值的列
df_drop_cols = df_missing.dropna(axis=1)
print(f"2. 删除缺失列后:{df_drop_cols.shape}")# 策略3:条件删除(至少有3个非空值的行)
df_thresh = df_missing.dropna(thresh=3)
print(f"3. 至少3个非空值的行:{df_thresh.shape}")# 策略4:填充缺失值
df_filled = df_missing.copy()# 数值列用均值填充
numeric_cols = ['age', 'income']
for col in numeric_cols:mean_value = df_filled[col].mean()df_filled[col] = df_filled[col].fillna(mean_value)print(f"4. {col}列用均值{mean_value:.1f}填充")# 分类列用众数填充
mode_value = df_filled['city'].mode()[0] if not df_filled['city'].mode().empty else '未知城市'
df_filled['city'] = df_filled['city'].fillna(mode_value)
print(f"5. city列用众数填充")# 字符串列用自定义值填充
df_filled['name'] = df_filled['name'].fillna('未知客户')
print(f"6. name列用'未知客户'填充")print(f"\n填充后的数据:")
print(df_filled)# 高级填充方法
print(f"\n=== 高级填充方法 ===")
df_advanced = df_missing.copy()# 前向填充和后向填充(使用新的方法)
df_advanced['age'] = df_advanced['age'].ffill() # 前向填充
df_advanced['income'] = df_advanced['income'].bfill() # 后向填充# 插值填充
df_advanced['age'] = df_advanced['age'].interpolate()print(f"高级填充后age列缺失值:{df_advanced['age'].isnull().sum()}")
print(f"高级填充后income列缺失值:{df_advanced['income'].isnull().sum()}")
> 重复数据处理
重复数据会扭曲分析结果,需要系统化的检测和处理。
# 创建带重复的示例数据
duplicate_data = {'customer_id': [1, 2, 3, 3, 4, 5, 5, 6, 7, 7],'name': ['张三', '李四', '王五', '王五', '赵六', '钱七', '钱七', '孙八', '周九', '周九'],'phone': ['13111111111', '13222222222', '13333333333', '13333333333', '13444444444', '13555555555', '13555555555', '13666666666','13777777777', '13777777777'],'email': ['zhang@email.com', 'li@email.com', 'wang@email.com', 'wang@email.com','zhao@email.com', 'qian@email.com', 'qian@email.com', 'sun@email.com','zhou@email.com', 'zhou@email.com'],'purchase_amount': [1000, 1500, 2000, 2000, 800, 1200, 1200, 1800, 900, 900]
}df_dup = pd.DataFrame(duplicate_data)
print("=== 带重复数据的DataFrame ===")
print(df_dup)# 重复值检测
print(f"\n=== 重复值检测 ===")
print(f"完全重复的行数:{df_dup.duplicated().sum()}")
print(f"重复的行:")
print(df_dup[df_dup.duplicated()])# 基于特定列检测重复
print(f"\n基于customer_id的重复:{df_dup.duplicated(subset=['customer_id']).sum()}")
print(f"基于name的重复:{df_dup.duplicated(subset=['name']).sum()}")
print(f"基于多列的重复:{df_dup.duplicated(subset=['name', 'phone']).sum()}")# 查看所有重复项(包括第一次出现)
all_duplicates = df_dup.duplicated(keep=False)
print(f"\n所有重复项(包括首次出现):")
print(df_dup[all_duplicates])# 重复值处理策略
print(f"\n=== 重复值处理策略 ===")# 策略1:删除完全重复的行(保留第一个)
df_drop_first = df_dup.drop_duplicates()
print(f"1. 删除完全重复(保留首个):{df_drop_first.shape}")# 策略2:删除完全重复的行(保留最后一个)
df_drop_last = df_dup.drop_duplicates(keep='last')
print(f"2. 删除完全重复(保留末个):{df_drop_last.shape}")# 策略3:基于关键列删除重复
df_drop_key = df_dup.drop_duplicates(subset=['customer_id', 'name'])
print(f"3. 基于关键列删除重复:{df_drop_key.shape}")# 策略4:聚合重复数据
print(f"4. 聚合重复数据:")
df_aggregated = df_dup.groupby(['customer_id', 'name']).agg({'phone': 'first','email': 'first','purchase_amount': 'sum'
}).reset_index()print(df_aggregated)# 重复数据质量报告
print(f"\n=== 数据质量报告 ===")
original_rows = len(df_dup)
clean_rows = len(df_drop_key)
duplicate_rate = (original_rows - clean_rows) / original_rowsprint(f"原始数据行数:{original_rows}")
print(f"清洗后行数:{clean_rows}")
print(f"重复率:{duplicate_rate:.2%}")
print(f"数据完整性:{1-duplicate_rate:.2%}")
> 数据类型转换与标准化
正确的数据类型是高效分析的基础。
# 创建类型混乱的示例数据
messy_data = {'id': ['1', '2', '3', '4', '5'],'date': ['2024-01-01', '2024/01/02', '20240103', '2024.01.04', '2024-1-5'],'price': ['100.50', '200.75', '150', '300.25', '250'],'category': ['A', 'B', 'A', 'C', 'B'],'is_premium': ['True', 'False', '1', '0', 'yes'],'rating': ['4.5', '3.2', '5.0', '2.8', '4.1']
}df_messy = pd.DataFrame(messy_data)
print("=== 类型混乱的数据 ===")
print(df_messy)
print(f"\n原始数据类型:")
print(df_messy.dtypes)# 数据类型转换
print(f"\n=== 数据类型转换 ===")
df_clean = df_messy.copy()# 转换数值类型
df_clean['id'] = pd.to_numeric(df_clean['id'])
df_clean['price'] = pd.to_numeric(df_clean['price'])
df_clean['rating'] = pd.to_numeric(df_clean['rating'])print(f"数值转换后:")
print(df_clean.dtypes)# 转换日期类型(处理不同格式)
print(f"\n处理日期格式:")
date_formats = ['%Y-%m-%d', '%Y/%m/%d', '%Y%m%d', '%Y.%m.%d', '%Y-%m-%d']def parse_flexible_date(date_str):"""灵活解析多种日期格式"""for fmt in date_formats:try:return pd.to_datetime(date_str, format=fmt)except:continuereturn pd.to_datetime(date_str, infer_datetime_format=True)df_clean['date'] = df_clean['date'].apply(parse_flexible_date)
print(f"日期转换后类型:{df_clean['date'].dtype}")# 转换分类类型
df_clean['category'] = df_clean['category'].astype('category')
print(f"分类转换后:{df_clean['category'].dtype}")
print(f"分类取值:{df_clean['category'].cat.categories.tolist()}")# 转换布尔类型(处理多种表示)
def standardize_boolean(value):"""标准化布尔值"""if str(value).lower() in ['true', '1', 'yes', 'y']:return Trueelif str(value).lower() in ['false', '0', 'no', 'n']:return Falseelse:return np.nandf_clean['is_premium'] = df_clean['is_premium'].apply(standardize_boolean)
print(f"布尔转换后类型:{df_clean['is_premium'].dtype}")print(f"\n=== 转换后的完整数据 ===")
print(df_clean)
print(f"\n最终数据类型:")
print(df_clean.dtypes)# 数据标准化和规范化
print(f"\n=== 数据标准化 ===")# 数值标准化(Z-score)
from scipy import stats
df_clean['price_zscore'] = stats.zscore(df_clean['price'])
df_clean['rating_zscore'] = stats.zscore(df_clean['rating'])# 数值归一化(Min-Max缩放)
from sklearn.preprocessing import MinMaxScaler
scaler = MinMaxScaler()
df_clean[['price_normalized', 'rating_normalized']] = scaler.fit_transform(df_clean[['price', 'rating']])print(f"标准化后的价格:")
print(df_clean[['price', 'price_zscore', 'price_normalized']])# 文本标准化
print(f"\n=== 文本数据标准化 ===")
text_data = pd.Series([' Apple ', 'BANANA', 'orange ', ' Grape', 'APPLE'])
print(f"原始文本:{text_data.tolist()}")# 文本清理步骤
cleaned_text = (text_data.str.strip() # 去除首尾空格.str.lower() # 转换为小写.str.title()) # 首字母大写print(f"清理后文本:{cleaned_text.tolist()}")# 类别编码
print(f"\n=== 类别编码 ===")
categories = df_clean['category']# Label Encoding
label_encoded = categories.cat.codes
print(f"Label编码:{label_encoded.tolist()}")# One-Hot Encoding
one_hot = pd.get_dummies(categories, prefix='category')
print(f"One-Hot编码:")
print(one_hot)
6.4 数据筛选与查询:精确获取目标数据
> 条件筛选的多种方法
Pandas提供了多种数据筛选方法,从简单的布尔索引到复杂的查询语句。
# 创建综合示例数据
np.random.seed(42)
sales_data = {'date': pd.date_range('2024-01-01', periods=50),'salesperson': np.random.choice(['张三', '李四', '王五', '赵六'], 50),'product': np.random.choice(['手机', '电脑', '平板', '耳机'], 50),'region': np.random.choice(['北京', '上海', '广州', '深圳'], 50),'quantity': np.random.randint(1, 20, 50),'unit_price': np.random.randint(100, 5000, 50),'customer_type': np.random.choice(['个人', '企业'], 50)
}sales_data['total_amount'] = sales_data['quantity'] * sales_data['unit_price']
df_sales = pd.DataFrame(sales_data)print("=== 销售数据样本 ===")
print(df_sales.head(10))# 基础条件筛选
print(f"\n=== 基础条件筛选 ===")# 单一条件
high_value_sales = df_sales[df_sales['total_amount'] > 10000]
print(f"高价值销售(>10000):{len(high_value_sales)}条")# 多重条件(AND)
beijing_phone = df_sales[(df_sales['region'] == '北京') & (df_sales['product'] == '手机')]
print(f"北京地区手机销售:{len(beijing_phone)}条")# 多重条件(OR)
target_products = df_sales[(df_sales['product'] == '手机') | (df_sales['product'] == '电脑')]
print(f"手机或电脑销售:{len(target_products)}条")# 范围筛选
medium_sales = df_sales[(df_sales['total_amount'] >= 5000) & (df_sales['total_amount'] <= 15000)]
print(f"中等金额销售(5000-15000):{len(medium_sales)}条")# 日期范围筛选
jan_sales = df_sales[(df_sales['date'] >= '2024-01-01') & (df_sales['date'] <= '2024-01-15')]
print(f"1月上半月销售:{len(jan_sales)}条")# 使用isin()方法
target_salespeople = df_sales[df_sales['salesperson'].isin(['张三', '李四'])]
print(f"指定销售员的记录:{len(target_salespeople)}条")target_regions = df_sales[df_sales['region'].isin(['北京', '上海'])]
print(f"一线城市销售:{len(target_regions)}条")# 字符串筛选
phone_related = df_sales[df_sales['product'].str.contains('手机')]
print(f"包含'手机'的产品:{len(phone_related)}条")# 空值筛选
print(f"\n=== 空值筛选 ===")
non_null_sales = df_sales[df_sales['total_amount'].notnull()]
print(f"非空销售记录:{len(non_null_sales)}条")# 使用query()方法(更直观的SQL风格)
print(f"\n=== Query方法筛选 ===")# 简单查询
query_result1 = df_sales.query('total_amount > 8000')
print(f"Query高价值销售:{len(query_result1)}条")# 复杂查询
query_result2 = df_sales.query('region == "上海" and product in ["电脑", "平板"]')
print(f"上海地区电脑/平板销售:{len(query_result2)}条")# 使用变量的查询
min_amount = 6000
query_result3 = df_sales.query('total_amount > @min_amount')
print(f"使用变量的查询结果:{len(query_result3)}条")# 日期查询
query_result4 = df_sales.query('date >= "2024-01-10" and date <= "2024-01-20"')
print(f"特定日期范围:{len(query_result4)}条")
> 高级筛选技巧
# 高级筛选技巧
print(f"\n=== 高级筛选技巧 ===")# 1. 分位数筛选
q75 = df_sales['total_amount'].quantile(0.75)
top_25_percent = df_sales[df_sales['total_amount'] >= q75]
print(f"销售额前25%的记录:{len(top_25_percent)}条")# 2. 异常值筛选(使用IQR方法)
Q1 = df_sales['total_amount'].quantile(0.25)
Q3 = df_sales['total_amount'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQRoutliers = df_sales[(df_sales['total_amount'] < lower_bound) | (df_sales['total_amount'] > upper_bound)]
normal_data = df_sales[(df_sales['total_amount'] >= lower_bound) & (df_sales['total_amount'] <= upper_bound)]print(f"异常值:{len(outliers)}条")
print(f"正常数据:{len(normal_data)}条")# 3. 条件计数和聚合筛选
print(f"\n=== 条件统计 ===")# 每个销售员的高价值销售数量
high_value_count = (df_sales[df_sales['total_amount'] > 8000].groupby('salesperson').size().sort_values(ascending=False))
print(f"各销售员高价值销售次数:")
print(high_value_count)# 筛选高产出销售员
productive_salespeople = high_value_count[high_value_count >= 2].index
productive_records = df_sales[df_sales['salesperson'].isin(productive_salespeople)]
print(f"高产出销售员的所有记录:{len(productive_records)}条")# 4. 复合条件筛选函数
def advanced_filter(df, **conditions):"""高级筛选函数"""mask = pd.Series([True] * len(df), index=df.index)for column, condition in conditions.items():if isinstance(condition, dict):if 'min' in condition:mask &= (df[column] >= condition['min'])if 'max' in condition:mask &= (df[column] <= condition['max'])if 'in' in condition:mask &= (df[column].isin(condition['in']))if 'contains' in condition:mask &= (df[column].str.contains(condition['contains'], na=False))else:mask &= (df[column] == condition)return df[mask]# 使用复合筛选
filtered_data = advanced_filter(df_sales,region={'in': ['北京', '上海']},total_amount={'min': 5000, 'max': 20000},customer_type='企业'
)print(f"\n复合筛选结果:{len(filtered_data)}条")
print(f"筛选条件:一线城市 + 销售额5000-20000 + 企业客户")# 5. 时间序列筛选
print(f"\n=== 时间序列筛选 ===")# 最近一周的数据
recent_week = df_sales[df_sales['date'] >= df_sales['date'].max() - pd.Timedelta(days=7)]
print(f"最近一周销售:{len(recent_week)}条")# 工作日vs周末
df_sales['weekday'] = df_sales['date'].dt.dayofweek
weekday_sales = df_sales[df_sales['weekday'] < 5] # 0-4表示周一到周五
weekend_sales = df_sales[df_sales['weekday'] >= 5] # 5-6表示周六日print(f"工作日销售:{len(weekday_sales)}条")
print(f"周末销售:{len(weekend_sales)}条")# 月份筛选
df_sales['month'] = df_sales['date'].dt.month
jan_data = df_sales[df_sales['month'] == 1]
print(f"1月份销售:{len(jan_data)}条")
6.5 分组聚合:从个体数据到群体洞察
> GroupBy操作的核心概念
分组聚合(Group By) 是数据分析的核心操作,它将"分割-应用-合并"的模式用于从原始数据中提取群体层面的洞察。
# 分组聚合基础操作
print("=== 分组聚合基础操作 ===")# 基础分组统计
print("1. 按销售员分组的基础统计:")
salesperson_stats = df_sales.groupby('salesperson').agg({'total_amount': ['sum', 'mean', 'count', 'std'],'quantity': ['sum', 'mean'],'unit_price': 'mean'
})# 展平多级列名
salesperson_stats.columns = ['_'.join(col).strip() for col in salesperson_stats.columns]
salesperson_stats = salesperson_stats.round(2)
print(salesperson_stats)# 按产品分组
print(f"\n2. 按产品分组的销售统计:")
product_performance = df_sales.groupby('product').agg({'total_amount': ['sum', 'mean', 'count'],'quantity': 'sum'
}).round(2)product_performance.columns = ['_'.join(col) for col in product_performance.columns]
print(product_performance)# 多级分组
print(f"\n3. 多级分组(地区+产品):")
region_product = df_sales.groupby(['region', 'product']).agg({'total_amount': ['sum', 'count'],'quantity': 'sum'
}).round(2)print(region_product.head(10))# 自定义聚合函数
print(f"\n=== 自定义聚合函数 ===")def sales_cv(series):"""计算变异系数"""return series.std() / series.mean() if series.mean() != 0 else 0def price_range(series):"""计算价格范围"""return series.max() - series.min()custom_agg = df_sales.groupby('salesperson').agg({'total_amount': ['mean', sales_cv],'unit_price': [price_range, 'median']
}).round(3)custom_agg.columns = ['平均销售额', '销售额变异系数', '价格范围', '价格中位数']
print(custom_agg)# 条件聚合
print(f"\n=== 条件聚合 ===")# 只统计高价值销售
high_value_agg = (df_sales[df_sales['total_amount'] > 5000].groupby('region').agg({'total_amount': ['count', 'sum', 'mean']}))high_value_agg.columns = ['高价值销售次数', '高价值销售总额', '高价值销售均值']
print(high_value_agg)
> 高级分组技巧
# 时间维度分组
print(f"\n=== 时间维度分组 ===")# 按日期分组
daily_sales = df_sales.groupby(df_sales['date'].dt.date).agg({'total_amount': 'sum','quantity': 'sum'
}).reset_index()daily_sales.columns = ['日期', '日销售额', '日销售量']
print("每日销售汇总(前10天):")
print(daily_sales.head(10))# 按周分组(使用isocalendar().week)
weekly_sales = df_sales.groupby(df_sales['date'].dt.isocalendar().week).agg({'total_amount': ['sum', 'count'],'quantity': 'sum'
})weekly_sales.columns = ['周销售额', '周订单数', '周销售量']
print(f"\n每周销售汇总:")
print(weekly_sales)# 按工作日/周末分组
df_sales['is_weekend'] = df_sales['date'].dt.dayofweek >= 5
weekend_comparison = df_sales.groupby('is_weekend').agg({'total_amount': ['mean', 'sum', 'count']
})weekend_comparison.columns = ['平均订单金额', '总销售额', '订单数量']
weekend_comparison.index = ['工作日', '周末']
print(f"\n工作日vs周末对比:")
print(weekend_comparison)
> 透视表和交叉分析
# 透视表分析
print(f"\n=== 透视表分析 ===")# 基础透视表
basic_pivot = pd.pivot_table(df_sales, values='total_amount',index='region',columns='product',aggfunc='sum',fill_value=0)print("地区-产品销售额透视表:")
print(basic_pivot)# 多值透视表
multi_value_pivot = pd.pivot_table(df_sales,values=['total_amount', 'quantity'],index='region',columns='customer_type',aggfunc={'total_amount': 'sum', 'quantity': 'mean'},fill_value=0)print(f"\n地区-客户类型多指标透视表:")
print(multi_value_pivot.round(2))# 多级分组透视表
complex_pivot = pd.pivot_table(df_sales,values='total_amount',index=['region', 'salesperson'],columns='product',aggfunc=['sum', 'count'],fill_value=0)print(f"\n复杂透视表形状:{complex_pivot.shape}")
print("复杂透视表(部分数据):")
print(complex_pivot.head())# 交叉分析表
print(f"\n=== 交叉分析 ===")# 频率交叉表
frequency_crosstab = pd.crosstab(df_sales['region'], df_sales['product'], margins=True)print("地区-产品销售频次交叉表:")
print(frequency_crosstab)# 比例交叉表
proportion_crosstab = pd.crosstab(df_sales['region'], df_sales['customer_type'], normalize='index')print(f"\n地区内客户类型比例:")
print(proportion_crosstab.round(3))# 统计量交叉表
value_crosstab = pd.crosstab(df_sales['region'],df_sales['product'],values=df_sales['total_amount'],aggfunc='mean')print(f"\n地区-产品平均销售额交叉表:")
print(value_crosstab.round(2))# 多维交叉分析
print(f"\n=== 多维交叉分析 ===")# 三维交叉表
three_way_table = df_sales.groupby(['region', 'product', 'customer_type']).agg({'total_amount': ['sum', 'count', 'mean']
}).round(2)print("三维交叉分析表(前15行):")
print(three_way_table.head(15))# 重塑数据进行分析
unstacked_data = three_way_table.unstack(level='customer_type')
print(f"\n重塑后的数据形状:{unstacked_data.shape}")
6.6 数据合并与重塑:整合多源数据
> 数据连接操作
数据连接是将多个数据源整合的关键技术,类似于SQL中的JOIN操作。
# 创建多个相关数据表进行连接演示
print("=== 创建相关数据表 ===")# 客户信息表
customers = pd.DataFrame({'customer_id': [1, 2, 3, 4, 5],'customer_name': ['公司A', '公司B', '公司C', '个人D', '个人E'],'customer_type': ['企业', '企业', '企业', '个人', '个人'],'credit_rating': ['A', 'B', 'A', 'C', 'B'],'city': ['北京', '上海', '广州', '深圳', '杭州']
})# 订单信息表
orders = pd.DataFrame({'order_id': [101, 102, 103, 104, 105, 106, 107],'customer_id': [1, 2, 2, 3, 4, 4, 6], # 注意customer_id=6不存在于客户表'product_id': ['P001', 'P002', 'P001', 'P003', 'P002', 'P001', 'P003'],'order_date': pd.date_range('2024-01-01', periods=7),'amount': [5000, 8000, 4500, 12000, 3000, 7500, 9000],'status': ['完成', '完成', '进行中', '完成', '取消', '完成', '完成']
})# 产品信息表
products = pd.DataFrame({'product_id': ['P001', 'P002', 'P003', 'P004'],'product_name': ['笔记本电脑', '台式电脑', '服务器', '打印机'],'category': ['电脑', '电脑', '服务器', '外设'],'unit_price': [8000, 6000, 25000, 2000]
})print("客户信息表:")
print(customers)
print(f"\n订单信息表:")
print(orders)
print(f"\n产品信息表:")
print(products)# 内连接(INNER JOIN)
print(f"\n=== 内连接操作 ===")
inner_join = pd.merge(orders, customers, on='customer_id', how='inner')
print(f"内连接结果({len(inner_join)}行):")
print(inner_join)# 左连接(LEFT JOIN)
print(f"\n=== 左连接操作 ===")
left_join = pd.merge(orders, customers, on='customer_id', how='left')
print(f"左连接结果({len(left_join)}行):")
print(left_join)# 右连接(RIGHT JOIN)
print(f"\n=== 右连接操作 ===")
right_join = pd.merge(orders, customers, on='customer_id', how='right')
print(f"右连接结果({len(right_join)}行):")
print(right_join)# 外连接(FULL OUTER JOIN)
print(f"\n=== 外连接操作 ===")
outer_join = pd.merge(orders, customers, on='customer_id', how='outer')
print(f"外连接结果({len(outer_join)}行):")
print(outer_join)# 多表连接
print(f"\n=== 多表连接 ===")
# 先连接订单和客户
order_customer = pd.merge(orders, customers, on='customer_id', how='left')
# 再连接产品信息
complete_data = pd.merge(order_customer, products, on='product_id', how='left')print(f"三表连接结果:")
print(complete_data)# 不同列名的连接
print(f"\n=== 不同列名连接 ===")
# 创建销售员表,使用不同的列名
salespeople = pd.DataFrame({'sales_id': [1, 2, 3, 4],'sales_name': ['张三', '李四', '王五', '赵六'],'department': ['销售一部', '销售二部', '销售一部', '销售三部']
})# 为订单表添加销售员ID
orders_with_sales = orders.copy()
orders_with_sales['salesperson_id'] = [1, 2, 1, 3, 2, 1, 4]# 使用left_on和right_on进行连接
sales_orders = pd.merge(orders_with_sales, salespeople, left_on='salesperson_id', right_on='sales_id')print("不同列名连接结果:")
print(sales_orders[['order_id', 'customer_id', 'amount', 'sales_name', 'department']])
> 数据拼接与组合
# 数据拼接操作
print(f"\n=== 数据拼接操作 ===")# 垂直拼接(行方向)
q1_sales = pd.DataFrame({'month': ['2024-01', '2024-02', '2024-03'],'sales': [100000, 120000, 110000],'profit': [15000, 18000, 16500]
})q2_sales = pd.DataFrame({'month': ['2024-04', '2024-05', '2024-06'],'sales': [130000, 125000, 140000],'profit': [19500, 18750, 21000]
})print("Q1销售数据:")
print(q1_sales)
print(f"\nQ2销售数据:")
print(q2_sales)# 垂直拼接
half_year_sales = pd.concat([q1_sales, q2_sales], ignore_index=True)
print(f"\n半年销售数据:")
print(half_year_sales)# 水平拼接(列方向)
cost_data = pd.DataFrame({'month': ['2024-01', '2024-02', '2024-03', '2024-04', '2024-05', '2024-06'],'cost': [85000, 102000, 93500, 110500, 106250, 119000],'marketing': [5000, 6000, 5500, 6500, 6250, 7000]
})# 基于month列进行合并
complete_data = pd.merge(half_year_sales, cost_data, on='month')
print(f"\n完整财务数据:")
print(complete_data)# 使用concat进行水平拼接(按索引)
aligned_cost = cost_data.set_index('month')
aligned_sales = half_year_sales.set_index('month')horizontal_concat = pd.concat([aligned_sales, aligned_cost], axis=1)
print(f"\n水平拼接结果:")
print(horizontal_concat)# 处理重叠列名
print(f"\n=== 处理重叠列名 ===")
df1 = pd.DataFrame({'id': [1, 2, 3],'value': [10, 20, 30],'category': ['A', 'B', 'C']
})df2 = pd.DataFrame({'id': [1, 2, 4],'value': [15, 25, 35],'score': [85, 90, 88]
})# 合并时处理重叠列
merged_with_suffix = pd.merge(df1, df2, on='id', how='outer', suffixes=('_left', '_right'))
print("处理重叠列名的合并:")
print(merged_with_suffix)# 多键连接
print(f"\n=== 多键连接 ===")
sales_detail = pd.DataFrame({'region': ['北京', '北京', '上海', '上海', '广州'],'product': ['A', 'B', 'A', 'B', 'A'],'sales_2023': [100, 150, 120, 180, 90]
})sales_target = pd.DataFrame({'region': ['北京', '北京', '上海', '上海', '广州'],'product': ['A', 'B', 'A', 'B', 'A'],'target_2024': [110, 160, 130, 190, 95]
})multi_key_merge = pd.merge(sales_detail, sales_target, on=['region', 'product'])
print("多键连接结果:")
print(multi_key_merge)
> 数据重塑操作
# 长宽格式转换
print(f"\n=== 数据重塑:长宽格式转换 ===")# 创建宽格式数据
wide_data = pd.DataFrame({'student': ['张三', '李四', '王五'],'math': [85, 92, 78],'english': [88, 85, 92],'physics': [82, 89, 75]
})print("宽格式数据:")
print(wide_data)# 宽转长(melt)
long_data = pd.melt(wide_data, id_vars=['student'], value_vars=['math', 'english', 'physics'],var_name='subject', value_name='score')print(f"\n长格式数据:")
print(long_data)# 长转宽(pivot)
back_to_wide = long_data.pivot(index='student', columns='subject', values='score')
print(f"\n转回宽格式:")
print(back_to_wide)# 复杂的重塑操作
print(f"\n=== 复杂重塑操作 ===")# 创建多维销售数据
sales_multi = pd.DataFrame({'date': pd.date_range('2024-01-01', periods=12),'region': ['北京', '上海'] * 6,'product_A': np.random.randint(100, 500, 12),'product_B': np.random.randint(100, 500, 12),'product_C': np.random.randint(100, 500, 12)
})print("原始多维数据:")
print(sales_multi.head())# 多列融合
melted_sales = pd.melt(sales_multi,id_vars=['date', 'region'],value_vars=['product_A', 'product_B', 'product_C'],var_name='product',value_name='sales')print(f"\n融合后的数据:")
print(melted_sales.head(10))# 使用pivot_table进行重塑
pivot_sales = melted_sales.pivot_table(values='sales',index='date',columns=['region', 'product'],aggfunc='sum')print(f"\n透视重塑结果形状:{pivot_sales.shape}")
print("透视重塑结果(前5行):")
print(pivot_sales.head())# 堆叠和取消堆叠
print(f"\n=== 堆叠操作 ===")# 创建多级列索引数据
multi_col_data = pd.DataFrame({('销售额', '北京'): [100, 120, 110],('销售额', '上海'): [90, 100, 95],('利润', '北京'): [15, 18, 16],('利润', '上海'): [13, 15, 14]
}, index=['Q1', 'Q2', 'Q3'])multi_col_data.columns = pd.MultiIndex.from_tuples(multi_col_data.columns)
print("多级列索引数据:")
print(multi_col_data)# 堆叠操作(stack)
stacked_data = multi_col_data.stack(future_stack=True)
print(f"\n堆叠后的数据:")
print(stacked_data)# 取消堆叠(unstack)
unstacked_data = stacked_data.unstack()
print(f"\n取消堆叠后:")
print(unstacked_data)# 重置索引
print(f"\n=== 索引重置 ===")
reset_data = stacked_data.reset_index()
reset_data.columns = ['季度', '城市', '销售额', '利润']
print("重置索引后:")
print(reset_data)# 设置多级索引
multi_index_data = reset_data.set_index(['季度', '城市'])
print(f"\n多级索引数据:")
print(multi_index_data)
6.7 实战项目:销售数据分析平台
现在让我们将所有Pandas技能整合到一个完整的销售数据分析项目中。
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')class SalesAnalyticsPlatform:"""销售数据分析平台"""def __init__(self):self.sales_data = Noneself.customer_data = Noneself.product_data = Noneself.combined_data = Nonedef generate_sample_data(self, num_records=1000):"""生成示例销售数据"""np.random.seed(42)# 生成销售数据start_date = datetime(2023, 1, 1)end_date = datetime(2024, 3, 31)self.sales_data = pd.DataFrame({'transaction_id': range(1, num_records + 1),'date': pd.date_range(start_date, end_date, periods=num_records),'customer_id': np.random.randint(1, 201, num_records),'product_id': np.random.choice(['P001', 'P002', 'P003', 'P004', 'P005', 'P006'], num_records),'salesperson_id': np.random.randint(1, 21, num_records),'quantity': np.random.randint(1, 10, num_records),'unit_price': np.random.choice([299, 599, 899, 1299, 1899, 2499], num_records),'discount_rate': np.random.choice([0, 0.05, 0.1, 0.15, 0.2], num_records),'region': np.random.choice(['华北', '华东', '华南', '华中', '西南', '东北'], num_records)})# 计算总金额self.sales_data['gross_amount'] = self.sales_data['quantity'] * self.sales_data['unit_price']self.sales_data['discount_amount'] = self.sales_data['gross_amount'] * self.sales_data['discount_rate']self.sales_data['net_amount'] = self.sales_data['gross_amount'] - self.sales_data['discount_amount']# 生成客户数据self.customer_data = pd.DataFrame({'customer_id': range(1, 201),'customer_name': [f'客户{i:03d}' for i in range(1, 201)],'customer_type': np.random.choice(['个人', '小企业', '大企业'], 200, p=[0.6, 0.3, 0.1]),'registration_date': pd.date_range('2020-01-01', '2023-12-31', periods=200),'credit_rating': np.random.choice(['A', 'B', 'C', 'D'], 200, p=[0.3, 0.4, 0.2, 0.1])})# 生成产品数据self.product_data = pd.DataFrame({'product_id': ['P001', 'P002', 'P003', 'P004', 'P005', 'P006'],'product_name': ['智能手机', '笔记本电脑', '平板电脑', '智能手表', '无线耳机', '游戏主机'],'category': ['手机', '电脑', '平板', '可穿戴', '音频', '游戏'],'cost': [200, 400, 250, 150, 80, 350],'launch_date': pd.to_datetime(['2021-01-01', '2020-06-01', '2021-03-01', '2022-01-01', '2021-09-01', '2020-11-01'])})print("示例数据生成完成")print(f" - 销售记录:{len(self.sales_data):,} 条")print(f" - 客户记录:{len(self.customer_data):,} 条")print(f" - 产品记录:{len(self.product_data):,} 条")def merge_data(self):"""合并多表数据"""# 合并销售、客户和产品数据temp_data = pd.merge(self.sales_data, self.customer_data, on='customer_id', how='left')self.combined_data = pd.merge(temp_data, self.product_data, on='product_id', how='left')# 添加时间维度特征self.combined_data['year'] = self.combined_data['date'].dt.yearself.combined_data['month'] = self.combined_data['date'].dt.monthself.combined_data['quarter'] = self.combined_data['date'].dt.quarterself.combined_data['weekday'] = self.combined_data['date'].dt.day_name()self.combined_data['is_weekend'] = self.combined_data['date'].dt.weekday >= 5# 计算利润self.combined_data['profit'] = self.combined_data['net_amount'] - (self.combined_data['quantity'] * self.combined_data['cost'])self.combined_data['profit_margin'] = self.combined_data['profit'] / self.combined_data['net_amount']print("数据合并完成")print(f" - 合并后数据形状:{self.combined_data.shape}")def data_quality_report(self):"""生成数据质量报告"""print("=== 数据质量报告 ===")# 缺失值检查missing_report = self.combined_data.isnull().sum()missing_pct = (missing_report / len(self.combined_data) * 100).round(2)print("1. 缺失值统计:")has_missing = Falsefor col in missing_report.index:if missing_report[col] > 0:print(f" {col}: {missing_report[col]} 条 ({missing_pct[col]}%)")has_missing = Trueif not has_missing:print(" 无缺失值")# 重复值检查duplicates = self.combined_data.duplicated().sum()print(f"\n2. 重复记录:{duplicates} 条")# 数据范围检查print(f"\n3. 数值范围检查:")numeric_cols = ['quantity', 'unit_price', 'net_amount', 'profit_margin']for col in numeric_cols:min_val = self.combined_data[col].min()max_val = self.combined_data[col].max()print(f" {col}: [{min_val:.2f}, {max_val:.2f}]")# 异常值检查(IQR方法)print(f"\n4. 异常值检查(基于IQR):")for col in ['net_amount', 'profit_margin']:Q1 = self.combined_data[col].quantile(0.25)Q3 = self.combined_data[col].quantile(0.75)IQR = Q3 - Q1lower_bound = Q1 - 1.5 * IQRupper_bound = Q3 + 1.5 * IQRoutliers = ((self.combined_data[col] < lower_bound) | (self.combined_data[col] > upper_bound)).sum()print(f" {col}: {outliers} 个异常值 ({outliers/len(self.combined_data)*100:.1f}%)")def sales_overview(self):"""销售总览分析"""print("\n=== 销售总览分析 ===")# 总体指标total_sales = self.combined_data['net_amount'].sum()total_profit = self.combined_data['profit'].sum()avg_order_value = self.combined_data['net_amount'].mean()total_customers = self.combined_data['customer_id'].nunique()total_transactions = len(self.combined_data)print(f"核心指标:")print(f" 总销售额:¥{total_sales:,.0f}")print(f" 总利润:¥{total_profit:,.0f}")print(f" 平均订单金额:¥{avg_order_value:,.0f}")print(f" 客户总数:{total_customers:,} 人")print(f" 交易总数:{total_transactions:,} 笔")print(f" 客均订单数:{total_transactions/total_customers:.1f} 笔")print(f" 整体利润率:{total_profit/total_sales*100:.1f}%")# 时间趋势分析monthly_trend = self.combined_data.groupby(['year', 'month']).agg({'net_amount': 'sum','profit': 'sum','transaction_id': 'count'}).reset_index()monthly_trend['period'] = monthly_trend['year'].astype(str) + '-' + monthly_trend['month'].astype(str).str.zfill(2)print(f"\n月度趋势(最近6个月):")recent_months = monthly_trend.tail(6)for _, row in recent_months.iterrows():print(f" {row['period']}: 销售额¥{row['net_amount']:,.0f}, "f"利润¥{row['profit']:,.0f}, 订单{row['transaction_id']}笔")def customer_analysis(self):"""客户分析"""print("\n=== 客户分析 ===")# 客户价值分析customer_value = self.combined_data.groupby('customer_id').agg({'net_amount': ['sum', 'mean', 'count'],'profit': 'sum','date': ['min', 'max']}).round(2)customer_value.columns = ['总消费', '平均订单', '订单数', '总利润', '首次购买', '最近购买']customer_value = customer_value.reset_index()# 合并客户基本信息customer_analysis = pd.merge(customer_value, self.customer_data, on='customer_id')# 客户分层(RFM简化版)customer_analysis['消费水平'] = pd.qcut(customer_analysis['总消费'], q=4, labels=['低', '中', '高', '极高'])customer_analysis['活跃度'] = pd.qcut(customer_analysis['订单数'], q=4, labels=['低', '中', '高', '极高'])print("客户分层统计:")segment_stats = customer_analysis.groupby(['消费水平', '活跃度']).size().unstack(fill_value=0)print(segment_stats)# 客户类型分析print(f"\n客户类型分析:")type_analysis = self.combined_data.groupby('customer_type').agg({'net_amount': ['sum', 'mean', 'count'],'customer_id': 'nunique'}).round(2)type_analysis.columns = ['总销售额', '平均订单', '订单总数', '客户数量']type_analysis['客均消费'] = type_analysis['总销售额'] / type_analysis['客户数量']type_analysis['客均订单数'] = type_analysis['订单总数'] / type_analysis['客户数量']print(type_analysis)# 高价值客户识别print(f"\n高价值客户TOP10:")top_customers = customer_analysis.nlargest(10, '总消费')for _, customer in top_customers.iterrows():print(f" {customer['customer_name']}: ¥{customer['总消费']:,.0f} "f"({customer['订单数']}笔订单, {customer['customer_type']})")def product_analysis(self):"""产品分析"""print("\n=== 产品分析 ===")# 产品销售分析product_performance = self.combined_data.groupby(['product_id', 'product_name']).agg({'net_amount': ['sum', 'mean'],'quantity': 'sum','profit': 'sum','transaction_id': 'count'}).round(2)product_performance.columns = ['总销售额', '平均单价', '销售数量', '总利润', '订单数']product_performance = product_performance.reset_index()product_performance['利润率'] = (product_performance['总利润'] / product_performance['总销售额'] * 100).round(1)print("产品销售排行:")top_products = product_performance.sort_values('总销售额', ascending=False)for _, product in top_products.iterrows():print(f" {product['product_name']}: ¥{product['总销售额']:,.0f} "f"(数量{product['销售数量']}, 利润率{product['利润率']}%)")# 类别分析print(f"\n产品类别分析:")category_analysis = self.combined_data.groupby('category').agg({'net_amount': 'sum','quantity': 'sum','profit': 'sum'}).round(2)category_analysis['利润率'] = (category_analysis['profit'] / category_analysis['net_amount'] * 100).round(1)category_analysis = category_analysis.sort_values('net_amount', ascending=False)for category, data in category_analysis.iterrows():print(f" {category}: ¥{data['net_amount']:,.0f} "f"(数量{data['quantity']}, 利润率{data['利润率']}%)")def regional_analysis(self):"""区域分析"""print("\n=== 区域分析 ===")# 区域销售分析regional_performance = self.combined_data.groupby('region').agg({'net_amount': ['sum', 'mean'],'customer_id': 'nunique','transaction_id': 'count','profit': 'sum'}).round(2)regional_performance.columns = ['总销售额', '平均订单', '客户数', '订单数', '总利润']regional_performance['客均消费'] = (regional_performance['总销售额'] / regional_performance['客户数'])regional_performance['利润率'] = (regional_performance['总利润'] / regional_performance['总销售额'] * 100).round(1)regional_performance = regional_performance.sort_values('总销售额', ascending=False)print("区域销售排行:")for region, data in regional_performance.iterrows():print(f" {region}: ¥{data['总销售额']:,.0f} "f"({data['客户数']}客户, {data['订单数']}订单, 利润率{data['利润率']}%)")# 区域产品偏好print(f"\n各区域热销产品TOP3:")for region in regional_performance.index:region_data = self.combined_data[self.combined_data['region'] == region]top_products = region_data.groupby('product_name')['quantity'].sum().nlargest(3)products_str = ', '.join([f"{prod}({qty}件)" for prod, qty in top_products.items()])print(f" {region}: {products_str}")def time_analysis(self):"""时间维度分析"""print("\n=== 时间维度分析 ===")# 月度趋势monthly_data = self.combined_data.groupby(['year', 'month']).agg({'net_amount': 'sum','transaction_id': 'count'}).reset_index()# 计算月度增长率monthly_data['sales_growth'] = monthly_data['net_amount'].pct_change() * 100monthly_data['order_growth'] = monthly_data['transaction_id'].pct_change() * 100print("最近6个月趋势:")recent_data = monthly_data.tail(6)for _, row in recent_data.iterrows():growth_str = f"({row['sales_growth']:+.1f}%)" if not pd.isna(row['sales_growth']) else ""print(f" {int(row['year'])}-{int(row['month']):02d}: ¥{row['net_amount']:,.0f} {growth_str}")# 工作日vs周末分析print(f"\n工作日vs周末对比:")weekday_analysis = self.combined_data.groupby('is_weekend').agg({'net_amount': ['mean', 'sum'],'transaction_id': 'count'})weekday_analysis.columns = ['平均订单', '总销售额', '订单数']weekday_analysis.index = ['工作日', '周末']print(weekday_analysis)# 季度分析print(f"\n季度表现:")quarterly_data = self.combined_data.groupby(['year', 'quarter']).agg({'net_amount': 'sum','profit': 'sum'}).round(2)for (year, quarter), data in quarterly_data.iterrows():profit_rate = data['profit'] / data['net_amount'] * 100print(f" {int(year)}Q{int(quarter)}: ¥{data['net_amount']:,.0f} "f"(利润¥{data['profit']:,.0f}, 利润率{profit_rate:.1f}%)")def generate_comprehensive_report(self):"""生成综合分析报告"""print("="*60)print("销售数据分析平台 - 综合报告")print("="*60)# 执行所有分析self.data_quality_report()self.sales_overview()self.customer_analysis()self.product_analysis()self.regional_analysis()self.time_analysis()print("\n" + "="*60)print("分析报告生成完成")print("="*60)# 运行完整的销售分析项目
def run_sales_analytics_demo():"""运行销售分析演示"""# 创建分析平台实例platform = SalesAnalyticsPlatform()# 生成示例数据platform.generate_sample_data(1000)# 合并数据platform.merge_data()# 生成综合报告platform.generate_comprehensive_report()return platform# 执行演示
analytics_platform = run_sales_analytics_demo()
6.8 学习总结与进阶指导
> 核心技能掌握检查
通过本课学习,我们建立了完整的结构化数据处理体系。从基础的Series和DataFrame操作,到高级的多表连接和数据重塑,每个技能都是数据分析工作流程中的关键环节。
Pandas技能矩阵:
技能类别 | 基础操作 | 高级应用 | 实战场景 |
---|---|---|---|
数据结构 | Series/DataFrame创建 | 多级索引操作 | 复杂数据建模 |
数据导入 | CSV/Excel读写 | 批量处理/格式转换 | ETL流程构建 |
数据清洗 | 缺失值/重复值处理 | 自定义清洗规则 | 数据质量保证 |
数据筛选 | 条件筛选/布尔索引 | 复合查询/动态筛选 | 精确数据提取 |
分组聚合 | GroupBy基础操作 | 自定义聚合函数 | 多维度分析 |
数据合并 | Join/Merge操作 | 复杂连接策略 | 多源数据整合 |
> 数据分析思维的建立
Pandas不仅是工具,更重要的是培养了数据分析的系统性思维。从"表格操作"到"数据管道",从"单一视角"到"多维分析",这种思维转变将贯穿整个数据科学学习过程。
分析思维框架:
- 数据理解:结构、质量、完整性评估
- 探索分析:描述统计、分布特征、关联关系
- 深度挖掘:分组对比、趋势分析、异常检测
- 洞察提炼:业务含义、决策支持、行动建议
> 性能优化意识
在处理大规模数据时,Pandas的性能优化成为关键技能。向量化操作、内存管理、数据类型优化等都是高级数据分析师必备的技能。
> 与后续课程的连接
Pandas为后续的数据可视化和机器学习奠定了坚实基础。Matplotlib/Seaborn将直接使用Pandas数据结构进行绘图,而scikit-learn等机器学习库也以Pandas DataFrame作为标准数据输入格式。
掌握Pandas的核心价值在于建立了处理结构化数据的标准化流程,这是从数据收集到模型部署整个数据科学项目中最重要的基础能力。
附录:专业术语表
DataFrame:Pandas的二维标签数据结构,类似于Excel表格或SQL数据表,是数据分析的核心对象
Series:Pandas的一维标签数组,可视为DataFrame的单列或带索引的列表
索引(Index):Pandas数据结构中的行标签,提供快速数据访问和对齐功能
分组聚合(GroupBy):按照指定列的值将数据分组,然后对每组应用聚合函数的操作模式
透视表(Pivot Table):重新组织数据的汇总表,将行索引、列索引和值进行重排以便分析
数据连接(Join/Merge):将两个或多个数据表基于共同列进行合并的操作
数据重塑(Reshape):改变数据的组织结构,包括长宽格式转换、堆叠等操作
向量化操作(Vectorization):对整个数组或列同时进行操作,避免显式循环提高性能
布尔索引(Boolean Indexing):使用布尔数组来筛选满足条件的数据行
分位数(Quantile):将数据按大小顺序分割成相等部分的分割点,用于数据分布分析
缺失值(Missing Value):数据集中不存在或无效的数据点,通常用NaN表示
数据类型(dtype):指定列中数据的类型,如int64、float64、object、category等
ETL(Extract, Transform, Load):数据处理的标准流程,包括提取、转换和加载三个阶段
数据清洗(Data Cleaning):识别和纠正数据中的错误、不一致和不完整问题的过程