当前位置: 首页 > news >正文

Python_电商日报_不同数据来源清洗整理成一张表

前提要景:

公司有多个电商平台,需要自己每天从不同地方下载数据,整理成每日的销售日报。

使用PQ去 清洗+整理 会使文件过大,筛选的时候会卡顿。

使用数据库会让表简洁明了。

import numpy as np
import pandas as pd
import sqlalchemy as sa
from sqlalchemy import create_engine
from datetime import date

def main():
    # ==================== 数据库连接配置 ====================
    db_config = {
        'user': 'your_username',
        'password': 'your_password',
        'host': 'localhost',
        'port': 3306,
        'database': 'your_database'
    }
    connection_url = (
        f"mysql+pymysql://{db_config['user']}:{db_config['password']}"
        f"@{db_config['host']}:{db_config['port']}/{db_config['database']}"
    )
    engine = create_engine(connection_url)

    try:
        # 获取今日日期
        today = date.today().strftime('%Y-%m-%d')
        # 获取明天日期用于范围查询
        tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
        # ==================== 数据读取 ====================
        df_orders = pd.read_sql(
            "SELECT * FROM sale WHERE 发货时间 BETWEEN '2023-01-01' AND '2023-12-31'"
            engine
        )
        df_shops = pd.read_sql("SELECT * FROM base_shop", engine)
        df_products = pd.read_sql("SELECT * FROM base_goods", engine)

        # ==================== 数据清洗 ====================
        columns_to_drop = ["仓库", "成本", "价格"]
        existing_drop_cols = [col for col in columns_to_drop if col in df_orders.columns]
        orders_clean = df_orders.drop(columns=existing_drop_cols)

        # ==================== 合并商品信息 ====================
        product_cols = ['品名', '产品', '主计量单位', '主计量数量',
                       '计量单位2', '计量数量2']
        df_merged = pd.merge(
            orders_clean,
            df_products[product_cols],
            on='品名',
            how='left',
            validate="m:1"
        )

        # ==================== 单位换算 ====================
        conditions = [
            df_merged['单位'] == df_merged['主计量单位'],
            df_merged['单位'] == df_merged['计量单位2']
        ]
        choices = [
            df_merged['数量'] * df_merged['主计量数量'],
            df_merged['数量'] * df_merged['计量数量2']
        ]
        df_merged['Total'] = np.select(conditions, choices, default=np.nan)

        # ==================== 合并店铺信息 ====================
        shop_cols = ['店铺']
        df_merged = pd.merge(
            df_merged,
            df_shops[shop_cols],
            on='店铺',
            how='left',
            suffixes=('', '_shop')
        )

        # ==================== 日期处理 ====================
        # 重命名日期列并转换类型
        df_merged.rename(columns={'发货时间': '日期'}, inplace=True)
        df_merged['日期'] = pd.to_datetime(df_merged['日期'], errors='coerce')

        # ==================== 按日期汇总 ====================
        group_cols = ['日期', '产品']
        final_summary = df_merged.groupby(group_cols)['Total'].sum().reset_index()

        # ==================== 提取年月 ====================
        final_summary['年'] = final_summary['日期'].dt.year
        final_summary['月'] = final_summary['日期'].dt.month

        # ==================== 数据存储 ====================
        table_name = 'emoc_daily_summary'
        try:
            final_summary.to_sql(
                table_name,
                engine,
                if_exists='append',
                index=False,
                dtype={
                    '日期': sa.Date(),
                    '产品': sa.String(50),
                    'Total': sa.Numeric(12,2),
                    '年': sa.Integer(),
                    '月': sa.Integer()
                }
            )
            print(f"成功追加 {len(final_summary)} 条记录")
        except sa.exc.ProgrammingError as pe:
            if "1146" in str(pe):  # 处理表不存在的情况
                print("目标表不存在,正在创建新表...")
                final_summary.to_sql(
                    table_name,
                    engine,
                    if_exists='fail',
                    index=False,
                    dtype={
                        # 保持与上述相同的类型定义
                    }
                )
                print(f"成功创建表并写入 {len(final_summary)} 条记录")
            else:
                raise

    except Exception as e:
        print(f"处理失败:{str(e)}")
        raise

if __name__ == "__main__":
    main()

  

产品存在单位转换,需要都转换为最小单位

改进前:

# 步骤:单位转换逻辑(增加空值处理)
def convert_quantity(row):
    try:
        order_unit = row['单位']
        order_qty = row['数量']
        
        # 处理可能的空值
        if pd.isna(order_unit) or pd.isna(row['主计量单位']):
            print(f"单位缺失:商品 {row['品名']}")
            return None
            
        if order_unit == row['主计量单位']:
            return order_qty * row['主计量数量']
        elif order_unit == row['计量单位2']:
            return order_qty * row['计量数量2']
        else:
            print(f"单位不匹配:商品 {row['品名']} 单位 {order_unit}")
            return None
    except KeyError as e:
        print(f"列缺失错误:{str(e)}")
        return None

改进后:

进行性能优化

# 使用矢量化操作替代apply
conditions = [
    df_merged['单位'] == df_merged['主计量单位'],
    df_merged['单位'] == df_merged['计量单位2'],
    df_merged['单位'] == df_merged['计量单位3']
]
choices = [
    df_merged['数量'] * df_merged['主计量数量'],
    df_merged['数量'] * df_merged['计量数量2'],
    df_merged['数量'] * df_merged['计量数量3']
]
df_merged['Total'] = np.select(conditions, choices, default=np.nan)

相关文章:

  • javaweb自用笔记:Mybatis
  • 案例实践 | 招商局集团以长安链构建“基于DID的航运贸易数据资产目录链”
  • 基于大模型预测的初治菌阳肺结核诊疗方案研究报告
  • ORBITVU 欧保图,开启自动化摄影新时代
  • 【精心整理】2025 DeepSeek 精品学习资料合集-共50份(教程+原理解读+行业应用+技术实践).zip
  • 关于瑞芯微开发工具(RKDevTool)刷机下载Boot失败原因的研究
  • 2025-3-25算法打卡
  • H3C交接机初始基本配置
  • 论文评估指标
  • 敏捷需求分析之INVEST原则
  • 手机销售终端MPR+LTC项目项目总体方案P183(183页PPT)(文末有下载方式)
  • 《Python全栈开发》第14课:项目部署 - Docker与云服务实战
  • Android设计模式之工厂方法模式
  • 全面讲解python的uiautomation包
  • 数据库的操作,以及sql之DML
  • Emacs 折腾日记(十九)——配置输入法和vim操作方式
  • 64. MfgTool烧写工具详解
  • 在K8S中使用ArgoCD做持续部署
  • 模拟算法的应用
  • React 知识回顾(HOC、合成事件、Fiber)
  • 长春电商网站建设哪家专业/百度关键词推广一年多少钱
  • 佛山网站建设哪家效果好/seo网站优化技术
  • 中卫网红大型蹦床设备/seo观察网
  • 外贸网站怎么做促销/重庆seo排名
  • 网站备案号在哪里/什么是友情链接?
  • 爱网站关键词查询/网络优化