Python 如何优雅处理 100GB 数据集——实战案例
背景
假设我们有一个 100GB 的用户行为日志数据集,存放在 CSV 文件中,字段包括:
user_id
(用户 ID)timestamp
(时间戳)action
(用户行为,如 click、view、buy)amount
(金额)
目标是:
按 用户 ID 聚合用户行为数
统计 每个用户的消费总额
提取 消费金额排名前 10 的用户
Step 1:Pandas 分块加载(Chunking)
第一步,避免一次性读入内存,采用分块读取和聚合。
import pandas as pdfilename = "user_logs_100GB.csv"chunksize = 10**6 # 每次读取 100 万行
user_stats = {}for i, chunk in enumerate(pd.read_csv(filename, chunksize=chunksize)):# 按用户 ID 聚合grouped = chunk.groupby("user_id").agg(actions=("action", "count"),total_amount=("amount", "sum"))# 累加结果for user_id, row in grouped.iterrows():if user_id not in user_stats:user_stats[user_id] = {"actions": 0, "total_amount": 0}user_stats[user_id]["actions"] += row["actions"]user_stats[user_id]["total_amount"] += row["total_amount"]print(f"已完成 {i+1} 个分块处理,共 {len(user_stats)} 个用户")
📌 这样处理时,内存峰值始终受控,只需存放聚合结果,而不是整份 100GB 文件。
Step 2:用 Dask 提升性能
如果需要 并行计算,可以直接用 Dask 替换。
import dask.dataframe as ddfilename = "user_logs_100GB.csv"# 延迟加载,不会一次性读入内存
df = dd.read_csv(filename)# 聚合统计
result = df.groupby("user_id").agg({"action": "count","amount": "sum"
}).compute() # 触发计算# 排名前 10
top10 = result.sort_values("amount", ascending=False).head(10)
print(top10)
📌 Dask 的优势是:
自动并行计算,速度更快
API 接近 Pandas,学习成本低
Step 3:尝试 Polars 提升单机性能
如果机器内存够(比如 64GB+),Polars 可以进一步优化单机性能。
import polars as plfilename = "user_logs_100GB.csv"# 懒加载 + 查询优化
df = pl.scan_csv(filename)top10 = (df.groupby("user_id").agg([pl.count("action").alias("actions"),pl.sum("amount").alias("total_amount")]).sort("total_amount", descending=True).limit(10).collect()
)print(top10)
📌 Polars 的延迟模式 (scan_csv
) 会自动优化查询,不会一次性把数据塞进内存。
Step 4:用 SQLite 存储并查询
如果需要反复查询不同条件,可以把数据写入 SQLite:
import pandas as pd
import sqlite3filename = "user_logs_100GB.csv"
conn = sqlite3.connect("user_logs.db")# 分块写入 SQLite
for chunk in pd.read_csv(filename, chunksize=10**6):chunk.to_sql("logs", conn, if_exists="append", index=False)# SQL 查询:消费金额前 10
query = """
SELECT user_id, COUNT(action) as actions, SUM(amount) as total_amount
FROM logs
GROUP BY user_id
ORDER BY total_amount DESC
LIMIT 10;
"""
result = pd.read_sql_query(query, conn)
print(result)
📌 SQLite 的优势是:
避免重复加载大文件
SQL 语法直观,适合探索性分析
Step 5:方法对比
方法 | 内存消耗 | 速度 | 适用场景 |
---|---|---|---|
Pandas 分块 | 最低 | 较慢 | 一次性线性处理 |
Dask | 适中 | 快 | 分布式/并行计算 |
Polars | 高效 | 非常快 | 单机大内存 |
SQLite | 磁盘为主 | 较慢 | 重复查询 |
总结
在 100GB 级别的数据处理中,选择合适工具至关重要:
一次性批处理 → Pandas 分块最简单
多机/并行加速 → Dask 更适合
单机高性能 → Polars 体验最佳
探索性查询 → SQLite 最友好
👉 技巧就是:用磁盘替代内存,用并行替代单线程,用延迟计算替代立即计算。
这样,即使面对 TB 级别的数据,也能在 Python 生态里稳稳处理。