Python_电商日报_不同数据来源清洗整理成一张表
前提要景:
公司有多个电商平台,需要自己每天从不同地方下载数据,整理成每日的销售日报。
使用PQ去 清洗+整理 会使文件过大,筛选的时候会卡顿。
使用数据库会让表简洁明了。
import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import create_engine
from datetime import date
def main():
# ==================== 数据库连接配置 ====================
db_config = {
'user': 'your_username',
'password': 'your_password',
'host': 'localhost',
'port': 3306,
'database': 'your_database'
}
connection_url = (
f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
)
engine = create_engine(connection_url)
try:
# 获取今日日期
today = date.today().strftime('%Y-%m-%d')
# 获取明天日期用于范围查询
tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
# ==================== 数据读取 ====================
df_orders = pd.read_sql(
"SELECT * FROM sale WHERE 发货时间 BETWEEN '2023-01-01' AND '2023-12-31'"
engine
)
df_shops = pd.read_sql("SELECT * FROM base_shop", engine)
df_products = pd.read_sql("SELECT * FROM base_goods", engine)
# ==================== 数据清洗 ====================
columns_to_drop = ["仓库", "成本", "价格"]
existing_drop_cols = [col for col in columns_to_drop if col in df_orders.columns]
orders_clean = df_orders.drop(columns=existing_drop_cols)
# ==================== 合并商品信息 ====================
product_cols = ['品名', '产品', '主计量单位', '主计量数量',
'计量单位2', '计量数量2']
df_merged = pd.merge(
orders_clean,
df_products[product_cols],
on='品名',
how='left',
validate="m:1"
)
# ==================== 单位换算 ====================
conditions = [
df_merged['单位'] == df_merged['主计量单位'],
df_merged['单位'] == df_merged['计量单位2']
]
choices = [
df_merged['数量'] * df_merged['主计量数量'],
df_merged['数量'] * df_merged['计量数量2']
]
df_merged['Total'] = np.select(conditions, choices, default=np.nan)
# ==================== 合并店铺信息 ====================
shop_cols = ['店铺']
df_merged = pd.merge(
df_merged,
df_shops[shop_cols],
on='店铺',
how='left',
suffixes=('', '_shop')
)
# ==================== 日期处理 ====================
# 重命名日期列并转换类型
df_merged.rename(columns={'发货时间': '日期'}, inplace=True)
df_merged['日期'] = pd.to_datetime(df_merged['日期'], errors='coerce')
# ==================== 按日期汇总 ====================
group_cols = ['日期', '产品']
final_summary = df_merged.groupby(group_cols)['Total'].sum().reset_index()
# ==================== 提取年月 ====================
final_summary['年'] = final_summary['日期'].dt.year
final_summary['月'] = final_summary['日期'].dt.month
# ==================== 数据存储 ====================
table_name = 'emoc_daily_summary'
try:
final_summary.to_sql(
table_name,
engine,
if_exists='append',
index=False,
dtype={
'日期': sa.Date(),
'产品': sa.String(50),
'Total': sa.Numeric(12,2),
'年': sa.Integer(),
'月': sa.Integer()
}
)
print(f"成功追加 {len(final_summary)} 条记录")
except sa.exc.ProgrammingError as pe:
if "1146" in str(pe): # 处理表不存在的情况
print("目标表不存在,正在创建新表...")
final_summary.to_sql(
table_name,
engine,
if_exists='fail',
index=False,
dtype={
# 保持与上述相同的类型定义
}
)
print(f"成功创建表并写入 {len(final_summary)} 条记录")
else:
raise
except Exception as e:
print(f"处理失败:{str(e)}")
raise
if __name__ == "__main__":
main()
产品存在单位转换,需要都转换为最小单位
改进前:
# 步骤:单位转换逻辑(增加空值处理)
def convert_quantity(row):
try:
order_unit = row['单位']
order_qty = row['数量']
# 处理可能的空值
if pd.isna(order_unit) or pd.isna(row['主计量单位']):
print(f"单位缺失:商品 {row['品名']}")
return None
if order_unit == row['主计量单位']:
return order_qty * row['主计量数量']
elif order_unit == row['计量单位2']:
return order_qty * row['计量数量2']
else:
print(f"单位不匹配:商品 {row['品名']} 单位 {order_unit}")
return None
except KeyError as e:
print(f"列缺失错误:{str(e)}")
return None
改进后:
进行性能优化
# 使用矢量化操作替代apply
conditions = [
df_merged['单位'] == df_merged['主计量单位'],
df_merged['单位'] == df_merged['计量单位2'],
df_merged['单位'] == df_merged['计量单位3']
]
choices = [
df_merged['数量'] * df_merged['主计量数量'],
df_merged['数量'] * df_merged['计量数量2'],
df_merged['数量'] * df_merged['计量数量3']
]
df_merged['Total'] = np.select(conditions, choices, default=np.nan)