Pandas 分组聚合进阶:过滤与 apply
Pandas 分组聚合进阶:过滤与 apply (含完整代码)更新于2025年8月26日
本文通过一组可直接运行的示例,系统讲解 Pandas 在分组聚合中的进阶用法:过滤过滤条件、混合聚合、多级列命名、分组后 apply 自定义函数、时间序列分组等。复制代码即可在本地运行(建议 pandas>=1.5
,python>=3.9
)。
准备数据
import pandas as pd
import numpy as npnp.random.seed(42)df = pd.DataFrame({"user_id": np.random.choice(list("AABBBCCCDD"), size=20),"city": np.random.choice(["北京", "上海", "深圳"], size=20),"date": pd.date_range("2024-01-01", periods=20, freq="D"),"amount": np.random.randint(10, 200, size=20).astype(float),"qty": np.random.randint(1, 8, size=20)
})# 注入缺失/异常值
df.loc[3, "amount"] = np.nan
df.loc[7, "qty"] = 0
df.head(10)
快速回顾:基础 groupby
# 单键分组聚合
df.groupby("city")["amount"].sum().reset_index(name="amount_sum")# 多键分组聚合
df.groupby(["city", "user_id"], as_index=False).agg(amount_sum=("amount", "sum"),qty_mean=("qty", "mean")
)
进阶一:分组过滤 filter(保留满足整体条件的组)
- 语义是“按组做判断,整组保留或丢弃”,而不是逐行筛选。
# 只保留组内交易次数>=3的用户
filtered = df.groupby("user_id").filter(lambda g: len(g) >= 3)# 只保留组内消费总额>300的用户组
filtered2 = df.groupby("user_id").filter(lambda g: g["amount"].sum(skipna=True) > 300)# 只保留城市-用户组合中订单均价>80的组
filtered3 = df.groupby(["city", "user_id"]).filter(lambda g: g["amount"].mean(skipna=True) > 80
)
对比逐行筛选的差异:
# 逐行筛选是“行级条件”,不是“组级条件”
row_filtered = df[df["amount"] > 80]
进阶二:分组聚合的多函数与自定义命名
- 同一列多函数、不同列不同函数,且结果列清晰命名。
agg_df = df.groupby(["city", "user_id"]).agg(amount_sum=("amount", "sum"),amount_avg=("amount", "mean"),amount_p90=("amount", lambda s: s.quantile(0.9)),qty_sum=("qty", "sum"),order_cnt=("qty", "size") # 行数
).reset_index()agg_df
如果你喜欢多重列索引(MultiIndex)风格:
multi_agg = df.groupby("city").agg({"amount": ["count", "mean", "sum", lambda s: s.quantile(0.75)],"qty": ["min", "max"]
})
multi_agg.columns = ["_".join([c for c in map(str, col) if c and c != "<lambda>"] or ["amount_p75"]
) # 扁平化列名
multi_agg
进阶三:transform vs apply
- transform:保持与原表等长,适合“对齐回填(广播)”
- apply:返回任意形状结果,灵活但速度可能慢
# 每个用户的消费均值,回填到行
df["user_amount_mean"] = df.groupby("user_id")["amount"].transform("mean")# 标准化 z-score(组内)
def zscore(s):return (s - s.mean()) / s.std(ddof=0)df["amount_z_user"] = df.groupby("user_id")["amount"].transform(zscore)
apply 自定义返回任意结构:
# 组内 Top-N 记录
def topn_amount(g, n=2):return g.sort_values("amount", ascending=False).head(n)top2 = df.groupby("user_id", group_keys=False).apply(topn_amount, n=2)# 组内派生指标(返回单行 DataFrame)
def ratio_summary(g):amt = g["amount"].fillna(0)return pd.DataFrame({"sum": [amt.sum()],"mean": [amt.mean()],"p90": [amt.quantile(0.9)],"gt100_ratio": [(amt > 100).mean()]})summary = df.groupby("city").apply(ratio_summary).reset_index(level=1, drop=True).reset_index()
summary
进阶四:分组后构造衍生列与排序
# 组内排名(amount 从高到低)
df["rank_in_city"] = df.groupby("city")["amount"].rank(method="dense", ascending=False)# 组内累计和、累计数
df["cum_amount_user"] = df.groupby("user_id")["amount"].cumsum()
df["cum_cnt_city_user"] = df.groupby(["city", "user_id"]).cumcount() + 1# 组内前向/后向填充缺失
df["amount_ffill_user"] = df.sort_values("date").groupby("user_id")["amount"].ffill()
进阶五:时间序列分组与窗口
# 按月分组汇总
monthly = (df.set_index("date").groupby([pd.Grouper(freq="M"), "city"])["amount"].sum().reset_index(name="amount_sum")
)# 滚动窗口(按时间排序后组内 rolling)
df_sorted = df.sort_values(["user_id", "date"])
df_sorted["rolling_3_sum"] = (df_sorted.groupby("user_id")["amount"].rolling(window=3, min_periods=1).sum().reset_index(level=0, drop=True)
)# expanding(从起点到当前的扩展窗口)
df_sorted["expanding_mean"] = (df_sorted.groupby("user_id")["amount"].expanding(min_periods=1).mean().reset_index(level=0, drop=True)
)
进阶六:透视表 pivot_table(分组聚合的便捷语法)
# 以 city 为行,以 user_id 为列,值为 amount 的均值
pv = pd.pivot_table(df,index="city",columns="user_id",values="amount",aggfunc="mean",fill_value=0,margins=True, # 总计margins_name="总计"
)
pv# 多值多函数聚合,且扁平化列名
pv2 = pd.pivot_table(df,index=["city"],columns=["user_id"],values=["amount", "qty"],aggfunc={"amount": ["sum", "mean"], "qty": "sum"},fill_value=0
)
pv2.columns = [f"{v}_{f}_{c}" for v, f, c in pv2.columns] # 扁平化
pv2.reset_index()
进阶七:分组后的缺失与异常处理
# 分组前先清洗(如负数、为0等异常)
df_clean = df.copy()
df_clean.loc[df_clean["qty"] <= 0, "qty"] = np.nan# 分组聚合时忽略缺失
clean_agg = df_clean.groupby("user_id").agg(amount_sum=("amount", lambda s: s.fillna(0).sum()),qty_mean=("qty", "mean")
).reset_index()
进阶八:组合技实战
目标:在“城市-用户”维度,构建用户价值看板,包含交易次数、总额、均额、近7日交易额、组内排名等。
df2 = df.sort_values(["city", "user_id", "date"]).copy()# 基础聚合
base = df2.groupby(["city", "user_id"]).agg(order_cnt=("qty", "size"),amount_sum=("amount", lambda s: s.fillna(0).sum()),amount_mean=("amount", "mean")
).reset_index()# 近7日交易额(基于每组 rolling 计算)
df2["amount_fill"] = df2["amount"].fillna(0)
df2["amount_7d"] = (df2.groupby(["city", "user_id"]).apply(lambda g: g.set_index("date")["amount_fill"].rolling("7D").sum()).reset_index(level=[0,1], drop=True)
)# 取每个城市-用户最后一行作为当前快照
latest_snap = (df2.sort_values("date").groupby(["city", "user_id"], as_index=False).tail(1)[["city", "user_id", "amount_7d"]]
)# 合并
dash = base.merge(latest_snap, on=["city", "user_id"], how="left")# 城市内用户价值排名(按总额)
dash["rank_amount_city"] = dash.groupby("city")["amount_sum"].rank(ascending=False, method="dense")dash.sort_values(["city", "rank_amount_city"]).head(10)
常见坑与优化建议
- 缺失值:聚合前先
fillna
,或聚合函数内部处理,避免均值/分位数出现偏差。 - 分组键类型:确保类别字段类型一致(如字符串 vs 分类类型),可用
astype("category")
提升内存与性能。 - transform 与 apply:优先使用内置聚合/转换函数(如
"sum"
,"mean"
,transform("mean")
),自定义apply
灵活但慢。 - 排序影响:时间窗口计算前需要按时间排序;
groupby(..., sort=False)
可略提速。 - 多重索引:聚合/透视后若得到 MultiIndex 列,建议在产线前扁平化,便于后续处理。