当前位置: 首页 > news >正文

一个基于Python Streamlit sqlite3 的销售单管理系统,提供商品管理、客户管理、销售单管理及打印,和应收对账单等功能

图:

功能特性

核心功能

  • 商品管理 - 添加、编辑、删除商品信息,支持商品搜索

  • 客户管理 - 管理客户信息,包括客户名称、联系电话、地址等

  • 销售单管理 - 创建销售单,关联客户和商品,自动计算金额

  • 数据统计 - 销售数据统计和可视化分析

技术特性

  • 基于Streamlit框架,界面简洁易用

  • SQLite数据库存储,数据安全可靠

  • 响应式设计,支持即时数据刷新

  • 完整的错误处理和用户提示

安装和运行

环境要求

  • Python 3.8+

  • 推荐使用虚拟环境

安装步骤

  1. 克隆或下载项目文件

  2. 安装依赖包:

    pip install -r requirements.txt

    streamlit>=1.28.0

    pandas>=2.0.0

  3. 运行应用程序:

    streamlit run app.py

    run.bat文件

  4. 在浏览器中打开显示的URL(通常是 http://localhost:8501)

快速启动

Windows用户可以直接双击运行 run.bat 文件启动应用。

使用说明

商品管理

  1. 在左侧导航栏选择"商品管理"

  2. 使用表单添加新商品,包括商品名称、描述、单位和参考价格

  3. 在商品列表中可以进行编辑、删除操作

  4. 使用搜索框快速查找商品

客户管理

  1. 在左侧导航栏选择"客户管理"

  2. 添加客户信息,包括客户名称、联系电话和地址

  3. 客户列表显示所有客户信息,支持编辑和删除

  4. 客户信息会在创建销售单时自动关联

销售单管理

  1. 在左侧导航栏选择"销售单管理"

  2. 选择客户(支持搜索和新增客户)

  3. 添加商品到销售单,设置数量和单价

  4. 系统自动计算总金额

  5. 保存销售单后可以查看和打印

数据统计

  1. 在左侧导航栏选择"数据统计"

  2. 查看销售数据的统计图表

  3. 支持按时间范围筛选数据

文件结构

销售管理系统/
├── app.py              # 主应用程序文件
├── database.py         # 数据库操作模块
├── requirements.txt    # Python依赖包列表
├── run.bat            # Windows启动脚本
├── sales.db           # SQLite数据库文件(自动生成)
└── README.md          # 项目说明文档

数据库说明

系统使用SQLite数据库,主要数据表包括:

  • products - 商品信息表

  • customers - 客户信息表

  • sales - 销售单主表

  • sale_items - 销售明细表

数据库文件 sales.db 会在首次运行时自动创建。

开发说明

主要依赖包

  • streamlit - Web应用框架

  • pandas - 数据处理

  • sqlite3 - 数据库操作

自定义开发

如需扩展功能,可以修改以下文件:

  • app.py - 界面逻辑和业务处理

  • database.py - 数据库操作函数

  • 添加新的Python模块到项目目录

故障排除

常见问题

  1. 端口占用错误

    • 关闭其他占用8503端口的应用

    • 或使用 streamlit run app.py --server.port 8504 指定其他端口

  2. 导入错误

    • 确保已安装所有依赖包:pip install -r requirements.txt

    • 检查Python版本是否符合要求

  3. 数据库错误

    • 删除 sales.db 文件后重新启动应用

    • 检查文件读写权限


开始使用:运行 streamlit run app.py 启动应用!

app.py代码:

import streamlit as st
import pandas as pd
from database import Database
import datetime
from typing import Optional, Dict# 页面配置
st.set_page_config(page_title="销售单管理系统",page_icon="📋",layout="wide"
)# 初始化数据库
@st.cache_resource
def init_db():return Database()db = init_db()# 初始化session state
ITEMS_KEY = 'order_items'def get_default_state():"""返回一个新的默认状态字典"""return {ITEMS_KEY: [],'nav_page': '创建销售单','pending_nav_page': None,'pending_selected_order_id': None,'customer_name': '','customer_phone': '','customer_address': '','order_date': datetime.date.today(),'notes': '','new_product_name': '','new_quantity': 1.0,'new_unit_price': 0.0,'edit_mode': False,'available_customers': [],'selected_customer_option': '新增客户','customer_form_mode': 'create','customer_edit_id': None,'selected_order_id': None,'customer_form_name': '','customer_form_phone': '','customer_form_address': '','customer_form_reset_trigger': False,'customer_form_pending_values': None,'reset_form_trigger': False,}# 初始化缺失的状态
for state_key, default_value in get_default_state().items():if state_key not in st.session_state:st.session_state[state_key] = default_value# 如果触发了重置,在控件创建前统一恢复默认值
if st.session_state.get('reset_form_trigger'):defaults = get_default_state()for state_key, default_value in defaults.items():st.session_state[state_key] = default_valuest.session_state['reset_form_trigger'] = False# 处理客户表单重置
if st.session_state.get('customer_form_reset_trigger'):st.session_state.customer_form_mode = 'create'st.session_state.customer_edit_id = Nonest.session_state.customer_form_name = ''st.session_state.customer_form_phone = ''st.session_state.customer_form_address = ''st.session_state.customer_form_reset_trigger = False# 处理客户表单待填充值
pending_customer = st.session_state.get('customer_form_pending_values')
if pending_customer:st.session_state.customer_form_mode = pending_customer.get('mode', 'create')st.session_state.customer_edit_id = pending_customer.get('id')st.session_state.customer_form_name = pending_customer.get('name', '')st.session_state.customer_form_phone = pending_customer.get('phone', '')st.session_state.customer_form_address = pending_customer.get('address', '')st.session_state.customer_form_pending_values = None# 处理触发重新运行
if st.session_state.get('trigger_rerun'):st.session_state.trigger_rerun = Falsest.rerun()# 恢复客户信息(在重新运行后)
if st.session_state.get('preserved_customer_name') is not None:st.session_state.customer_name = st.session_state.preserved_customer_namest.session_state.customer_phone = st.session_state.preserved_customer_phonest.session_state.customer_address = st.session_state.preserved_customer_addressst.session_state.selected_customer_option = st.session_state.preserved_selected_customer_option# 清除保存的状态st.session_state.preserved_customer_name = Nonest.session_state.preserved_customer_phone = Nonest.session_state.preserved_customer_address = Nonest.session_state.preserved_selected_customer_option = Nonedef normalize_items(raw_items):"""规范化商品明细数据"""if not raw_items:return []if isinstance(raw_items, dict):raw_items = [raw_items]normalized = []for item in raw_items:if item is None:continueif not isinstance(item, dict):try:item = dict(item)except Exception:continuenormalized.append({'product_name': item.get('product_name', ''),'quantity': float(item.get('quantity', 0) or 0),'unit_price': float(item.get('unit_price', 0) or 0),'amount': float(item.get('amount', 0) or 0)})return normalizeddef reset_form():"""重置表单"""st.session_state['reset_form_trigger'] = Truedef reset_customer_form():"""重置客户表单"""st.session_state.customer_form_reset_trigger = Truedef schedule_customer_edit(customer: Dict):"""安排客户信息填充"""st.session_state.customer_form_pending_values = {'mode': 'edit','id': customer.get('id'),'name': customer.get('name', ''),'phone': customer.get('phone', ''),'address': customer.get('address', '')}def set_customer_fields(customer: Optional[Dict] = None):"""根据客户信息填充表单"""if customer:st.session_state.customer_name = customer.get('name', '')st.session_state.customer_phone = customer.get('phone', '')st.session_state.customer_address = customer.get('address', '')else:st.session_state.customer_name = ''st.session_state.customer_phone = ''st.session_state.customer_address = ''def handle_customer_selection():"""处理客户选择变化"""option = st.session_state.get('selected_customer_option', '新增客户')customers = st.session_state.get('available_customers', [])if option == '新增客户':set_customer_fields(None)returncustomer = next((c for c in customers if c.get('name') == option), None)if customer:set_customer_fields(customer)def load_customer_options():"""获取客户选项列表"""customers = db.get_customers()st.session_state.available_customers = customersoptions = ['新增客户'] + [c['name'] for c in customers]if st.session_state.selected_customer_option not in options:st.session_state.selected_customer_option = '新增客户'return optionsdef download_template(data_type: str, file_format: str):"""下载数据导入模板"""try:# 根据数据类型创建模板数据if data_type == "销售单":template_data = [{"order_number": "SO20240001","customer_name": "示例客户","customer_phone": "13800138000","customer_address": "示例地址","order_date": "2024-01-01","total_amount": 1000.00,"notes": "示例备注","created_at": "2024-01-01 10:00:00"}]file_name = "销售单导入模板"elif data_type == "客户":template_data = [{"name": "示例客户","phone": "13800138000","address": "示例地址","last_used": "2024-01-01 10:00:00"}]file_name = "客户导入模板"elif data_type == "商品":template_data = [{"name": "示例商品","description": "商品描述","unit": "个","reference_price": 100.00,"created_at": "2024-01-01 10:00:00","updated_at": "2024-01-01 10:00:00"}]file_name = "商品导入模板"else:st.error("不支持的数据类型")return# 转换为DataFramedf = pd.DataFrame(template_data)# 根据文件格式处理数据if file_format == "Excel (.xlsx)":import iobuffer = io.BytesIO()with pd.ExcelWriter(buffer, engine='openpyxl') as writer:df.to_excel(writer, sheet_name=data_type, index=False)# 提供下载st.download_button(label=f"下载{file_format}",data=buffer.getvalue(),file_name=f"{file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",use_container_width=True,key=f"download_{data_type}_{file_format}")elif file_format == "CSV (.csv)":# 提供CSV下载csv_data = df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label=f"下载{file_format}",data=csv_data,file_name=f"{file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True,key=f"download_{data_type}_{file_format}")elif file_format == "JSON (.json)":# 提供JSON下载json_data = df.to_json(orient='records', force_ascii=False, indent=2)st.download_button(label=f"下载{file_format}",data=json_data,file_name=f"{file_name}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json",mime="application/json",use_container_width=True,key=f"download_{data_type}_{file_format}")st.success(f"{data_type}模板已准备就绪,请点击下载按钮")except Exception as e:st.error(f"模板下载失败:{str(e)}")def add_item_manual():"""手动添加商品项"""if (st.session_state.new_product_name andst.session_state.new_quantity > 0 andst.session_state.new_unit_price > 0):amount = st.session_state.new_quantity * st.session_state.new_unit_pricecurrent_items = normalize_items(st.session_state.get(ITEMS_KEY))current_items.append({'product_name': st.session_state.new_product_name,'quantity': st.session_state.new_quantity,'unit_price': st.session_state.new_unit_price,'amount': amount})st.session_state[ITEMS_KEY] = current_itemsst.session_state.new_product_name = ""st.session_state.new_quantity = 1.0st.session_state.new_unit_price = 0.0st.session_state.selected_product_option = "手动输入商品"# 保存当前客户信息,避免重新运行时丢失st.session_state.preserved_customer_name = st.session_state.get('customer_name', '')st.session_state.preserved_customer_phone = st.session_state.get('customer_phone', '')st.session_state.preserved_customer_address = st.session_state.get('customer_address', '')st.session_state.preserved_selected_customer_option = st.session_state.get('selected_customer_option', '新增客户')# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Truedef add_item_from_library():"""从商品库添加商品项"""if (st.session_state.new_product_name andst.session_state.new_quantity > 0 andst.session_state.new_unit_price > 0):amount = st.session_state.new_quantity * st.session_state.new_unit_pricecurrent_items = normalize_items(st.session_state.get(ITEMS_KEY))current_items.append({'product_name': st.session_state.new_product_name,'quantity': st.session_state.new_quantity,'unit_price': st.session_state.new_unit_price,'amount': amount})st.session_state[ITEMS_KEY] = current_itemsst.session_state.new_product_name = ""st.session_state.new_quantity = 1.0st.session_state.new_unit_price = 0.0st.session_state.selected_product_option = "手动输入商品"# 保存当前客户信息,避免重新运行时丢失st.session_state.preserved_customer_name = st.session_state.get('customer_name', '')st.session_state.preserved_customer_phone = st.session_state.get('customer_phone', '')st.session_state.preserved_customer_address = st.session_state.get('customer_address', '')st.session_state.preserved_selected_customer_option = st.session_state.get('selected_customer_option', '新增客户')# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Truedef remove_item(index):"""删除商品项"""current_items = normalize_items(st.session_state.get(ITEMS_KEY))if 0 <= index < len(current_items):current_items.pop(index)st.session_state[ITEMS_KEY] = current_items# 保存当前客户信息,避免重新运行时丢失st.session_state.preserved_customer_name = st.session_state.get('customer_name', '')st.session_state.preserved_customer_phone = st.session_state.get('customer_phone', '')st.session_state.preserved_customer_address = st.session_state.get('customer_address', '')st.session_state.preserved_selected_customer_option = st.session_state.get('selected_customer_option', '新增客户')# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Truedef calculate_total():"""计算总金额"""return sum(item['amount'] for item in normalize_items(st.session_state.get(ITEMS_KEY)))def generate_print_html(order_data):"""生成打印用的HTML(贴近纸质销售单模板样式)"""# 卖方信息(可按需修改)seller = {"name": "开平市xxxx电脑行","address": "开平市三xxx铺","tel": "0750-xxx","qq": "xxxx","wechat": "","wechat_public": "xxx","website": ""}def to_rmb_upper(value: float) -> str:nums = ["零", "壹", "贰", "叁", "肆", "伍", "陆", "柒", "捌", "玖"]units = ["", "拾", "佰", "仟"]big_units = ["", "万"]negative = value < 0value = abs(round(value + 1e-8, 2))integer = int(value)fraction = int(round((value - integer) * 100))def four_digit_to_upper(num: int) -> str:result = ""zero_flag = Falsefor idx in range(4):digit = num // (10 ** (3 - idx)) % 10if digit == 0:if not zero_flag and result:result += "零"zero_flag = Trueelse:result += nums[digit] + units[3 - idx]zero_flag = Falsereturn result.rstrip("零")int_str = "零元"if integer > 0:parts = []idx = 0while integer > 0:segment = integer % 10000if segment:segment_str = four_digit_to_upper(segment)if idx > 0:segment_str += big_units[idx]parts.append(segment_str)integer //= 10000idx += 1int_str = "".join(reversed(parts))int_str = (int_str.replace("零零", "零").replace("零万", "万"))if int_str.endswith("零"):int_str = int_str[:-1]int_str += "元"frac_str = "整"if fraction:jiao = fraction // 10fen = fraction % 10parts = []if jiao:parts.append(nums[jiao] + "角")if fen:parts.append(nums[fen] + "分")frac_str = "".join(parts)result = ("负" if negative else "") + int_strif frac_str != "整":result += frac_strreturn resultdef big_amount_boxes(text: str, total_length: int = 12) -> str:cleaned = text.replace("整", "").strip()cells = list(cleaned)if len(cells) < total_length:cells = [""] * (total_length - len(cells)) + cellselse:cells = cells[-total_length:]return "".join(f'<div class="box">{c}</div>' for c in cells)total_upper = to_rmb_upper(float(order_data["total_amount"]))total_lower = f"¥{float(order_data['total_amount']):.2f}"order_no = order_data.get("order_number", "")order_date = order_data.get("order_date", "")created_at = order_data.get("created_at", "")items_rows = ""for idx, item in enumerate(order_data["items"], 1):unit = item.get("unit", "")items_rows += f"""<tr><td class="c">{idx}</td><td>{item['product_name']}</td><td class="c">{unit}</td><td class="r">{item['quantity']}</td><td class="r">{item['unit_price']:.2f}</td><td class="r">{item['amount']:.2f}</td><td></td></tr>"""html = f"""
<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"><title>销售单 - {order_no}</title><style>@media print {{@page {{size: 210mm 13.97mm; margin: 10mm;}}}}body {{font-family: "Microsoft YaHei", Arial, sans-serif;color: #000;}}.sheet {{width: 190mm;margin: 0 auto;padding: 8mm 8mm 6mm 8mm;}}.title-row {{display: flex;align-items: center;justify-content: center;position: relative;border-bottom: 0px solid #000;}}.title-row .title {{font-size: 20pt;font-weight: 700;letter-spacing: 1px;}}.title-row .no {{position: absolute;right: 0;top: 0;font-size: 12pt;}}.sub-row {{display: flex;justify-content: right;margin-top: 3mm;margin-bottom: 3mm;}}.label {{margin-right: 3mm;}}.frame {{border: 1px solid #000;padding: 2mm 3mm;height: 18mm;border-bottom: none;}}.frame1 {{border: 1px solid #000;border-left: none;border-bottom: none;padding: 2mm 3mm;height: 18mm;}}.flex {{display: flex;gap: 0mm;}}.w50 {{ width: 50%; }}.muted {{ font-size: 10pt; color: #333; }}table.items {{width: 100%;border-collapse: collapse;}}table.items th, table.items td {{border: 1px solid #000;padding: 1mm 1mm;font-size: 10pt;}}table.items th {{text-align: center;}}.c {{ text-align: center; }}.r {{ text-align: right; }}.amount-row {{display: flex;align-items: flex-end;gap: 6mm;border-left: 1px solid #000;border-right: 1px solid #000;margin-top: 0mm;padding-bottom: 2mm;}}.amount-row .label-big {{white-space: nowrap;font-weight: 700;font-size: 12pt;}}.amount-box-wrapper {{display: flex;align-items: center;gap: 2mm;}}.amount-box-wrapper .box {{width: 8mm;height: 6mm;border: 1px solid #000;display: flex;justify-content: center;align-items: center;font-size: 12pt;}}.amount-small {{display: flex;align-items: center;gap: 3mm;font-size: 10pt;}}.amount-small .amount-lower {{min-width: 30mm;border-bottom: 1px solid #000;text-align: right;font-size: 12pt;font-weight: 700;padding: 1mm 2mm;}}.note-area {{border: 1px solid #000;padding: 2mm 3mm;min-height: 12mm;}}.pay-ways {{display: flex;gap: 8mm;align-items: center;margin-top: 3mm;}}.checkbox {{border: 1px solid #000;width: 4mm; height: 4mm;display: inline-block;margin-right: 2mm;}}.sign-row {{display: flex;justify-content: space-between;align-items: center;margin-top: 6mm;}}.small {{font-size: 10pt;}}</style>
</head>
<body><div class="sheet"><div class="title-row"><div class="title">开平xxx电脑(销售单)</div><div class="no">No: {order_no}</div></div><div class="sub-row small"><div>开单日期:{order_date if order_date else created_at[:10] if created_at else ""}</div></div><div class="flex"><div class="w50 frame small"><div class="label"><strong>购买方信息</strong></div><div>名称:{order_data.get('customer_name','')}</div><div>联系信息:{order_data.get('customer_phone','')}</div><div>地址:{order_data.get('customer_address','')}</div></div><div class="w50 frame1 small"><div class="label"><strong>销售方信息</strong></div><div>名称:{seller["name"]}</div><div>地址:{seller["address"]} </div><div>电话:{seller["tel"]}</div></div></div><table class="items"><thead><tr><th style="width:8%">序号</th><th>名称规格</th><th style="width:10%">单位</th><th style="width:12%">数量</th><th style="width:12%">单价</th><th style="width:14%">总金额</th><th style="width:14%">备注</th></tr></thead><tbody>{items_rows}{"".join(f"<tr><td>&nbsp;</td><td></td><td></td><td></td><td></td><td></td><td></td></tr>" for _ in range(max(0, 6 - len(order_data['items']))))}</tbody></table><div class="amount-row"><div class="label-big">合计(大写)</div><div class="amount-box-wrapper">{big_amount_boxes(total_upper, total_length=10)}</div><div class="amount-small">(小写)<span class="amount-lower">{total_lower}</span></div></div><div class="note-area small">备注:{order_data.get('notes','')}<div class="pay-ways">付款方式:<span class="checkbox"></span>现金<span class="checkbox"></span>转账<span class="checkbox"></span>微信<span class="checkbox"></span>支付宝</div></div><div class="sign-row small"><div>销售人:__________ 收款人:__________</div><div> 客户签名:_________________日期:20____ 年 ____ 月 ____ 日</div></div></div>
</body>
</html>
"""return html# 主界面
st.title("📋 销售单管理系统")# 侧边栏导航
page_options = ["创建销售单", "销售单列表", "查看/编辑销售单", "客户管理", "商品管理", "应收对账管理", "数据管理"]
# 若存在挂起的页面跳转请求,需在渲染选择控件前应用
if st.session_state.get('pending_nav_page'):st.session_state.nav_page = st.session_state.pending_nav_pagest.session_state.pending_nav_page = None
if st.session_state.get('pending_selected_order_id') is not None:st.session_state.selected_order_id = st.session_state.pending_selected_order_idst.session_state.pending_selected_order_id = None
if st.session_state.nav_page not in page_options:st.session_state.nav_page = page_options[0]
page = st.sidebar.selectbox("选择功能", page_options, key="nav_page")if page == "创建销售单":st.header("创建新销售单")customer_options = load_customer_options()st.selectbox("选择已有客户",customer_options,key="selected_customer_option",on_change=handle_customer_selection,help="从历史客户中选择将自动填充客户信息")# 客户信息col1, col2 = st.columns(2)with col1:customer_name = st.text_input("客户名称 *", key="customer_name")customer_phone = st.text_input("联系电话", key="customer_phone")with col2:order_date = st.date_input("订单日期", key="order_date")customer_address = st.text_input("客户地址", key="customer_address")notes = st.text_area("备注", key="notes")st.divider()st.subheader("商品明细")# 添加商品 - 支持从商品库选择st.markdown("**添加商品**")# 获取商品列表products = db.get_products()product_options = ["手动输入商品"] + [f"{p['name']} ({p['unit']}) - ¥{p['reference_price']:.2f}" for p in products]col1, col2 = st.columns([2, 1])with col1:selected_product_option = st.selectbox("选择商品",product_options,key="selected_product_option",help="从商品库选择商品将自动填充名称和参考价格")# 根据选择显示不同的输入方式if selected_product_option == "手动输入商品":col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:st.text_input("商品名称", key="new_product_name", placeholder="输入商品名称")with col2:st.number_input("数量", key="new_quantity", min_value=0.01, step=0.01)with col3:st.number_input("单价", key="new_unit_price", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")st.button("添加", on_click=add_item_manual, type="primary")else:# 从商品库选择selected_product_name = selected_product_option.split(" (")[0]selected_product = next((p for p in products if p['name'] == selected_product_name), None)if selected_product:col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:st.text_input("商品名称", value=selected_product['name'], key="new_product_name", disabled=True)with col2:st.number_input("数量", key="new_quantity", min_value=0.01, step=0.01)with col3:st.number_input("单价", value=selected_product['reference_price'], key="new_unit_price", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")st.button("添加", on_click=add_item_from_library, type="primary")# 显示已添加的商品normalized_items = normalize_items(st.session_state.get(ITEMS_KEY))if normalized_items:st.write("")df_items = pd.DataFrame(normalized_items)df_items['序号'] = range(1, len(df_items) + 1)df_items = df_items[['序号', 'product_name', 'quantity', 'unit_price', 'amount']]df_items.columns = ['序号', '商品名称', '数量', '单价', '金额']# 显示表格,每行有删除按钮for idx, item in enumerate(normalized_items):col1, col2, col3, col4, col5, col6 = st.columns([1, 4, 2, 2, 2, 1])with col1:st.write(idx + 1)with col2:st.write(item['product_name'])with col3:st.write(f"{item['quantity']:.2f}")with col4:st.write(f"¥{item['unit_price']:.2f}")with col5:st.write(f"¥{item['amount']:.2f}")with col6:st.button("删除", key=f"del_{idx}", on_click=remove_item, args=(idx,))st.divider()total = calculate_total()st.markdown(f"### 合计金额:¥{total:.2f}")# 提交按钮col1, col2 = st.columns([1, 1])with col1:if st.button("保存销售单", type="primary", use_container_width=True):if not customer_name:st.error("请填写客户名称!")elif not normalized_items:st.error("请至少添加一个商品!")else:try:# 检查并保存新商品到商品库existing_products = db.get_products()existing_product_names = [p['name'] for p in existing_products]for item in normalized_items:product_name = item['product_name']if product_name and product_name not in existing_product_names:# 新商品,添加到商品库db.create_product({'name': product_name,'description': f'从销售单自动添加的商品:{product_name}','unit': '个','reference_price': item['unit_price']})st.info(f"新商品 '{product_name}' 已自动添加到商品库")order_data = {'customer_name': customer_name,'customer_phone': customer_phone,'customer_address': customer_address,'order_date': order_date.strftime('%Y-%m-%d'),'total_amount': total,'notes': notes,'items': normalized_items}db.save_customer({'name': customer_name,'phone': customer_phone,'address': customer_address})order_id = db.create_order(order_data)st.success(f"销售单创建成功!订单号:{db.get_order(order_id)['order_number']}")reset_form()# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"创建失败:{str(e)}")with col2:if st.button("重置表单", use_container_width=True):reset_form()# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:st.info("请添加商品明细")elif page == "销售单列表":st.header("销售单列表")orders = db.get_all_orders()if orders:# 搜索和筛选col1, col2 = st.columns([2, 1])with col1:search_term = st.text_input("搜索(订单号/客户名称)", "")with col2:date_filter = st.date_input("日期筛选", value=None)# 筛选订单filtered_orders = ordersif search_term:filtered_orders = [o for o in filtered_orders if search_term.lower() in o['order_number'].lower() or search_term.lower() in o['customer_name'].lower()]if date_filter:filtered_orders = [o for o in filtered_orders if o['order_date'] == date_filter.strftime('%Y-%m-%d')]if filtered_orders:# 分页设置if 'page_size' not in st.session_state:st.session_state.page_size = 10if 'current_page' not in st.session_state:st.session_state.current_page = 1# 分页控件col1, col2, col3 = st.columns([1, 2, 1])with col1:page_size = st.selectbox("每页显示数量", [5, 10, 20, 50], index=1)st.session_state.page_size = page_sizewith col2:st.write(f"共 {len(filtered_orders)} 条订单")with col3:total_pages = max(1, (len(filtered_orders) + st.session_state.page_size - 1) // st.session_state.page_size)# 确保当前页码不超过总页数if st.session_state.current_page > total_pages:st.session_state.current_page = 1current_page = st.number_input("页码", min_value=1, max_value=total_pages, value=st.session_state.current_page)st.session_state.current_page = current_page# 计算当前页的数据范围start_idx = (st.session_state.current_page - 1) * st.session_state.page_sizeend_idx = min(start_idx + st.session_state.page_size, len(filtered_orders))current_orders = filtered_orders[start_idx:end_idx]# 显示分页信息st.write(f"显示第 {start_idx + 1} - {end_idx} 条订单(共 {len(filtered_orders)} 条)")# 显示订单列表for order in current_orders:with st.expander(f"订单号:{order['order_number']} | 客户:{order['customer_name']} | 金额:¥{order['total_amount']:.2f} | 日期:{order['order_date']}"):col1, col2, col3 = st.columns(3)with col1:st.write(f"**客户名称:** {order['customer_name']}")st.write(f"**联系电话:** {order.get('customer_phone', '')}")with col2:st.write(f"**订单日期:** {order['order_date']}")st.write(f"**总金额:** ¥{order['total_amount']:.2f}")with col3:if st.button("查看详情", key=f"view_{order['id']}"):st.session_state.selected_order_id = order['id']st.session_state.pending_nav_page = "查看/编辑销售单"st.session_state.pending_selected_order_id = order['id']st.session_state.edit_mode = False# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueif st.button("删除", key=f"delete_{order['id']}"):try:db.delete_order(order['id'])st.success("删除成功!")# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"删除失败:{str(e)}")# 分页导航if total_pages > 1:st.divider()col1, col2, col3, col4, col5 = st.columns([1, 1, 2, 1, 1])with col1:if st.button("首页", disabled=st.session_state.current_page == 1):st.session_state.current_page = 1st.session_state.trigger_rerun = Truest.rerun()with col2:if st.button("上一页", disabled=st.session_state.current_page == 1):st.session_state.current_page = max(1, st.session_state.current_page - 1)st.session_state.trigger_rerun = Truest.rerun()with col3:st.write(f"第 {st.session_state.current_page} 页 / 共 {total_pages} 页")with col4:if st.button("下一页", disabled=st.session_state.current_page == total_pages):st.session_state.current_page = min(total_pages, st.session_state.current_page + 1)st.session_state.trigger_rerun = Truest.rerun()with col5:if st.button("末页", disabled=st.session_state.current_page == total_pages):st.session_state.current_page = total_pagesst.session_state.trigger_rerun = Truest.rerun()else:st.info("没有找到匹配的订单")else:st.info("暂无销售单记录")elif page == "查看/编辑销售单":st.header("查看/编辑销售单")# 选择订单orders = db.get_all_orders()if orders:order_options = {f"{o['order_number']} - {o['customer_name']} (¥{o['total_amount']:.2f})": o['id']for o in orders}label_list = list(order_options.keys())default_index = 0selected_order_id = st.session_state.get('selected_order_id')if selected_order_id:for idx, label in enumerate(label_list):if order_options[label] == selected_order_id:default_index = idxbreakselected_label = st.selectbox("选择订单", label_list, index=default_index)order_id = order_options[selected_label]if st.session_state.get('selected_order_id') != order_id:st.session_state.selected_order_id = order_idif order_id:order = db.get_order(order_id)if order:# 编辑模式切换 - 使用不同的状态管理方式if 'edit_mode_state' not in st.session_state:st.session_state.edit_mode_state = False# 处理成功和取消状态if st.session_state.get('edit_mode_success'):st.session_state.edit_mode_state = Falsest.session_state.edit_mode_success = Falseif st.session_state.get('edit_mode_cancel'):st.session_state.edit_mode_state = Falsest.session_state.edit_mode_cancel = False# 编辑模式切换按钮col1, col2 = st.columns([1, 3])with col1:if st.button("进入编辑模式" if not st.session_state.edit_mode_state else "退出编辑模式"):st.session_state.edit_mode_state = not st.session_state.edit_mode_state# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueif st.session_state.edit_mode_state:# 编辑表单col1, col2 = st.columns(2)with col1:customer_name = st.text_input("客户名称 *", value=order['customer_name'], key="edit_customer_name")customer_phone = st.text_input("联系电话", value=order.get('customer_phone', ''), key="edit_customer_phone")with col2:order_date = st.date_input("订单日期", value=datetime.datetime.strptime(order['order_date'], '%Y-%m-%d').date(),key="edit_order_date")customer_address = st.text_input("客户地址", value=order.get('customer_address', ''), key="edit_customer_address")notes = st.text_area("备注", value=order.get('notes', ''), key="edit_notes")st.subheader("商品明细")# 初始化编辑商品列表if f'edit_items_{order_id}' not in st.session_state:st.session_state[f'edit_items_{order_id}'] = normalize_items(order['items'])edit_items = st.session_state[f'edit_items_{order_id}']# 添加新商品功能st.markdown("**添加商品**")# 获取商品列表products = db.get_products()product_options = ["手动输入商品"] + [f"{p['name']} ({p['unit']}) - ¥{p['reference_price']:.2f}" for p in products]col1, col2 = st.columns([2, 1])with col1:selected_product_option = st.selectbox("选择商品",product_options,key=f"edit_selected_product_option_{order_id}",help="从商品库选择商品将自动填充名称和参考价格")# 根据选择显示不同的输入方式if selected_product_option == "手动输入商品":col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:edit_product_name = st.text_input("商品名称", key=f"edit_new_product_name_{order_id}", placeholder="输入商品名称")with col2:edit_quantity = st.number_input("数量", key=f"edit_new_quantity_{order_id}", min_value=0.01, step=0.01)with col3:edit_unit_price = st.number_input("单价", key=f"edit_new_unit_price_{order_id}", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")if st.button("添加", key=f"edit_add_item_{order_id}", type="primary"):if edit_product_name and edit_quantity > 0 and edit_unit_price > 0:amount = edit_quantity * edit_unit_priceedit_items.append({'product_name': edit_product_name,'quantity': edit_quantity,'unit_price': edit_unit_price,'amount': amount})st.session_state[f'edit_items_{order_id}'] = edit_items# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:# 从商品库选择selected_product_name = selected_product_option.split(" (")[0]selected_product = next((p for p in products if p['name'] == selected_product_name), None)if selected_product:col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:edit_product_name = st.text_input("商品名称", value=selected_product['name'], key=f"edit_new_product_name_{order_id}", disabled=True)with col2:edit_quantity = st.number_input("数量", key=f"edit_new_quantity_{order_id}", min_value=0.01, step=0.01)with col3:edit_unit_price = st.number_input("单价", value=selected_product['reference_price'], key=f"edit_new_unit_price_{order_id}", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")if st.button("添加", key=f"edit_add_item_{order_id}", type="primary"):if edit_product_name and edit_quantity > 0 and edit_unit_price > 0:amount = edit_quantity * edit_unit_priceedit_items.append({'product_name': edit_product_name,'quantity': edit_quantity,'unit_price': edit_unit_price,'amount': amount})st.session_state[f'edit_items_{order_id}'] = edit_items# 使用不同的方式触发重新运行st.session_state.trigger_rerun = True# 显示已添加的商品,支持删除if edit_items:st.write("")st.markdown("**已添加的商品**")for idx, item in enumerate(edit_items):col1, col2, col3, col4, col5, col6 = st.columns([1, 4, 2, 2, 2, 1])with col1:st.write(idx + 1)with col2:st.write(item['product_name'])with col3:st.write(f"{item['quantity']:.2f}")with col4:st.write(f"¥{item['unit_price']:.2f}")with col5:st.write(f"¥{item['amount']:.2f}")with col6:if st.button("删除", key=f"edit_del_{order_id}_{idx}"):edit_items.pop(idx)st.session_state[f'edit_items_{order_id}'] = edit_items# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:st.info("请添加商品明细")total = sum(item['amount'] for item in edit_items)st.markdown(f"### 合计金额:¥{total:.2f}")col1, col2 = st.columns(2)with col1:if st.button("保存修改", type="primary", use_container_width=True):if not customer_name:st.error("请填写客户名称!")elif not edit_items:st.error("请至少添加一个商品!")else:try:order_data = {'customer_name': customer_name,'customer_phone': customer_phone,'customer_address': customer_address,'order_date': order_date.strftime('%Y-%m-%d'),'total_amount': total,'notes': notes,'items': edit_items}db.save_customer({'name': customer_name,'phone': customer_phone,'address': customer_address})db.update_order(order_id, order_data)st.success("修改成功!")# 清除编辑状态if f'edit_items_{order_id}' in st.session_state:del st.session_state[f'edit_items_{order_id}']# 使用不同的方式设置编辑模式状态st.session_state.edit_mode_success = True# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"修改失败:{str(e)}")with col2:if st.button("取消", use_container_width=True):# 清除编辑状态if f'edit_items_{order_id}' in st.session_state:del st.session_state[f'edit_items_{order_id}']# 使用不同的方式设置编辑模式状态st.session_state.edit_mode_cancel = True# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:# 查看模式col1, col2 = st.columns([2, 1])with col1:st.subheader("订单信息")st.write(f"**订单号:** {order['order_number']}")st.write(f"**客户名称:** {order['customer_name']}")st.write(f"**联系电话:** {order.get('customer_phone', '')}")st.write(f"**客户地址:** {order.get('customer_address', '')}")st.write(f"**订单日期:** {order['order_date']}")st.write(f"**创建时间:** {order['created_at']}")if order.get('notes'):st.write(f"**备注:** {order['notes']}")with col2:st.subheader("金额信息")st.metric("总金额", f"¥{order['total_amount']:.2f}")st.subheader("商品明细")view_items = normalize_items(order['items'])df_items = pd.DataFrame(view_items)df_items['序号'] = range(1, len(df_items) + 1)df_items = df_items[['序号', 'product_name', 'quantity', 'unit_price', 'amount']]df_items.columns = ['序号', '商品名称', '数量', '单价', '金额']st.dataframe(df_items, use_container_width=True, hide_index=True)# 打印功能st.divider()st.subheader("打印")order_for_print = order.copy()order_for_print['items'] = view_itemsprint_html = generate_print_html(order_for_print)st.download_button(label="下载打印文件(HTML)",data=print_html,file_name=f"销售单_{order['order_number']}.html",mime="text/html")# 在浏览器中显示打印预览st.components.v1.html(print_html, height=800, scrolling=True)else:st.info("暂无销售单记录")elif page == "客户管理":st.header("客户管理")customers = db.get_customers()st.session_state.available_customers = customerscol_form, col_list = st.columns([1, 2])with col_form:st.subheader("客户信息")form_mode = st.session_state.customer_form_modeform_title = "编辑客户" if form_mode == 'edit' else "新增客户"st.markdown(f"**{form_title}**")with st.form("customer_form"):st.text_input("客户名称 *", key="customer_form_name")st.text_input("联系电话", key="customer_form_phone")st.text_area("客户地址", key="customer_form_address", height=80)submitted = st.form_submit_button("保存客户", type="primary", use_container_width=True)if submitted:name = st.session_state.customer_form_name.strip()phone = st.session_state.customer_form_phone.strip()address = st.session_state.customer_form_address.strip()if not name:st.error("请填写客户名称!")else:try:customer_data = {'name': name,'phone': phone,'address': address}if form_mode == 'edit' and st.session_state.customer_edit_id:db.update_customer(st.session_state.customer_edit_id, customer_data)st.success("客户更新成功!")else:db.create_customer(customer_data)st.success("客户创建成功!")reset_customer_form()st.session_state.selected_customer_option = '新增客户'# 触发客户数据刷新并重新渲染页面st.rerun()except Exception as e:st.error(f"操作失败:{str(e)}")if st.session_state.customer_form_mode == 'edit':if st.button("取消编辑", use_container_width=True):reset_customer_form()# 触发客户数据刷新并重新渲染页面st.rerun()with col_list:st.subheader("客户列表")if customers:header_cols = st.columns([2, 2, 3, 2, 1, 1])header_cols[0].markdown("**客户名称**")header_cols[1].markdown("**联系电话**")header_cols[2].markdown("**客户地址**")header_cols[3].markdown("**最近使用时间**")header_cols[4].markdown("**编辑**")header_cols[5].markdown("**删除**")for customer in customers:cols = st.columns([2, 2, 3, 2, 1, 1])with cols[0]:st.write(customer['name'])with cols[1]:st.write(customer.get('phone', ''))with cols[2]:st.write(customer.get('address', ''))with cols[3]:st.write(customer.get('last_used', ''))with cols[4]:if st.button("编辑", key=f"edit_customer_{customer['id']}"):schedule_customer_edit(customer)# 触发客户数据刷新并重新渲染页面st.rerun()with cols[5]:if st.button("删除", key=f"delete_customer_{customer['id']}"):try:db.delete_customer(customer['id'])st.success("删除成功!")if st.session_state.selected_customer_option == customer['name']:st.session_state.selected_customer_option = '新增客户'reset_customer_form()# 触发客户数据刷新并重新渲染页面st.rerun()except Exception as e:st.error(f"删除失败:{str(e)}")else:st.info("暂无客户记录")elif page == "商品管理":st.header("商品管理")# 在页面渲染前检查是否需要刷新数据if st.session_state.get('refresh_products', False):st.session_state.refresh_products = Falseproducts = db.get_products()else:products = db.get_products()col_form, col_list = st.columns([1, 2])with col_form:st.subheader("商品信息")# 商品表单状态管理if 'product_form_mode' not in st.session_state:st.session_state.product_form_mode = 'create'if 'product_edit_id' not in st.session_state:st.session_state.product_edit_id = Noneif 'product_form_name' not in st.session_state:st.session_state.product_form_name = ''if 'product_form_description' not in st.session_state:st.session_state.product_form_description = ''if 'product_form_unit' not in st.session_state:st.session_state.product_form_unit = '个'if 'product_form_reference_price' not in st.session_state:st.session_state.product_form_reference_price = 0.0form_mode = st.session_state.product_form_modeform_title = "编辑商品" if form_mode == 'edit' else "新增商品"st.markdown(f"**{form_title}**")# 使用表单外的widget来避免状态管理问题name = st.text_input("商品名称 *", value=st.session_state.product_form_name, key="product_form_name_input")description = st.text_area("商品描述", value=st.session_state.product_form_description, height=80, key="product_form_description_input")col1, col2 = st.columns(2)with col1:unit = st.selectbox("单位", ["个","项","次", "台", "件", "套", "米", "公斤", "包", "箱"], index=["个", "台", "件", "套", "米", "公斤", "包", "箱"].index(st.session_state.product_form_unit), key="product_form_unit_input")with col2:reference_price = st.number_input("参考价格", min_value=0.0, step=0.01, format="%.2f", value=st.session_state.product_form_reference_price, key="product_form_reference_price_input")if st.button("保存商品", type="primary", use_container_width=True, key="save_product_button"):name = name.strip()description = description.strip()if not name:st.error("请填写商品名称!")else:try:product_data = {'name': name,'description': description,'unit': unit,'reference_price': reference_price}if form_mode == 'edit' and st.session_state.product_edit_id:db.update_product(st.session_state.product_edit_id, product_data)st.success("商品更新成功!")else:db.create_product(product_data)st.success("商品创建成功!")# 重置表单st.session_state.product_form_mode = 'create'st.session_state.product_edit_id = Nonest.session_state.product_form_name = ''st.session_state.product_form_description = ''st.session_state.product_form_unit = '个'st.session_state.product_form_reference_price = 0.0# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()except Exception as e:st.error(f"操作失败:{str(e)}")if st.session_state.product_form_mode == 'edit':if st.button("取消编辑", use_container_width=True):st.session_state.product_form_mode = 'create'st.session_state.product_edit_id = Nonest.session_state.product_form_name = ''st.session_state.product_form_description = ''st.session_state.product_form_unit = '个'st.session_state.product_form_reference_price = 0.0# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()with col_list:st.subheader("商品列表")# 搜索功能search_term = st.text_input("搜索商品名称或描述", "")if search_term:filtered_products = db.search_products(search_term)else:filtered_products = productsif filtered_products:header_cols = st.columns([2, 2, 1, 1, 1, 1])header_cols[0].markdown("**商品名称**")header_cols[1].markdown("**商品描述**")header_cols[2].markdown("**单位**")header_cols[3].markdown("**参考价格**")header_cols[4].markdown("**编辑**")header_cols[5].markdown("**删除**")for product in filtered_products:cols = st.columns([2, 2, 1, 1, 1, 1])with cols[0]:st.write(product['name'])with cols[1]:st.write(product.get('description', '')[:50] + '...' if len(product.get('description', '')) > 50 else product.get('description', ''))with cols[2]:st.write(product.get('unit', '个'))with cols[3]:st.write(f"¥{product.get('reference_price', 0.0):.2f}")with cols[4]:if st.button("编辑", key=f"edit_product_{product['id']}"):st.session_state.product_form_mode = 'edit'st.session_state.product_edit_id = product['id']st.session_state.product_form_name = product['name']st.session_state.product_form_description = product.get('description', '')st.session_state.product_form_unit = product.get('unit', '个')st.session_state.product_form_reference_price = product.get('reference_price', 0.0)# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()with cols[5]:if st.button("删除", key=f"delete_product_{product['id']}"):try:db.delete_product(product['id'])st.success("删除成功!")# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()except Exception as e:st.error(f"删除失败:{str(e)}")else:st.info("暂无商品记录")elif page == "数据管理":st.header("📊 数据管理")# 添加数据管理相关的session stateif 'data_export_type' not in st.session_state:st.session_state.data_export_type = '销售单'if 'data_import_type' not in st.session_state:st.session_state.data_import_type = '销售单'tab1, tab2 = st.tabs(["📤 数据导出", "📥 数据导入"])with tab1:st.subheader("数据导出")col1, col2 = st.columns([1, 2])with col1:st.write("**导出设置**")export_type = st.selectbox("选择导出数据类型",["销售单", "客户", "商品"],key="data_export_type")# 映射显示名称到数据库类型type_mapping = {"销售单": "orders","客户": "customers", "商品": "products"}export_format = st.selectbox("导出格式",["Excel (.xlsx)", "CSV (.csv)", "JSON (.json)"],index=0)if st.button("开始导出", type="primary", use_container_width=True):try:# 获取数据data_type = type_mapping[export_type]data = db.export_data(data_type)if not data:st.warning(f"没有找到{export_type}数据")st.stop()# 转换为DataFramedf = pd.DataFrame(data)# 根据选择的格式处理数据if export_format == "Excel (.xlsx)":# 创建Excel文件import iobuffer = io.BytesIO()with pd.ExcelWriter(buffer, engine='openpyxl') as writer:df.to_excel(writer, sheet_name=export_type, index=False)# 提供下载st.download_button(label="下载Excel文件",data=buffer.getvalue(),file_name=f"{export_type}_导出_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",use_container_width=True)elif export_format == "CSV (.csv)":# 提供CSV下载csv_data = df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label="下载CSV文件",data=csv_data,file_name=f"{export_type}_导出_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True)elif export_format == "JSON (.json)":# 提供JSON下载json_data = df.to_json(orient='records', force_ascii=False, indent=2)st.download_button(label="下载JSON文件",data=json_data,file_name=f"{export_type}_导出_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json",mime="application/json",use_container_width=True)st.success(f"成功导出 {len(data)} 条{export_type}记录")except Exception as e:st.error(f"导出失败:{str(e)}")with col2:st.write("**数据预览**")try:# 重新定义type_mapping,确保变量作用域正确type_mapping = {"销售单": "orders","客户": "customers", "商品": "products"}data_type = type_mapping[export_type]data = db.export_data(data_type)if data:df_preview = pd.DataFrame(data)st.dataframe(df_preview.head(10), use_container_width=True)st.write(f"总计: {len(data)} 条记录")else:st.info("暂无数据")except Exception as e:st.error(f"预览失败:{str(e)}")with tab2:st.subheader("数据导入")col1, col2 = st.columns([1, 2])with col1:st.write("**导入设置**")import_type = st.selectbox("选择导入数据类型",["销售单", "客户", "商品"],key="data_import_type")# 映射显示名称到数据库类型type_mapping = {"销售单": "orders","客户": "customers", "商品": "products"}import_format = st.selectbox("导入文件格式",["Excel (.xlsx)", "CSV (.csv)", "JSON (.json)"],index=0)uploaded_file = st.file_uploader(f"选择{import_type}文件",type=["xlsx", "csv", "json"],help="请确保文件格式与选择的导入格式一致")# 模板下载功能st.write("**模板下载**")template_col1, template_col2, template_col3 = st.columns(3)with template_col1:if st.button("📥 销售单模板", use_container_width=True):download_template("销售单", import_format)with template_col2:if st.button("📥 客户模板", use_container_width=True):download_template("客户", import_format)with template_col3:if st.button("📥 商品模板", use_container_width=True):download_template("商品", import_format)if uploaded_file is not None:st.write(f"已选择文件: {uploaded_file.name}")# 导入选项import_mode = st.radio("导入模式",["新增模式(只添加新记录)", "更新模式(更新已存在记录)", "覆盖模式(清空后重新导入)"],index=0)if st.button("开始导入", type="primary", use_container_width=True):try:data_type = type_mapping[import_type]# 读取文件数据if uploaded_file.name.endswith('.xlsx'):df = pd.read_excel(uploaded_file)elif uploaded_file.name.endswith('.csv'):df = pd.read_csv(uploaded_file)elif uploaded_file.name.endswith('.json'):df = pd.read_json(uploaded_file)else:st.error("不支持的文件格式")st.stop()# 转换为字典列表data = df.to_dict('records')if not data:st.warning("文件中没有数据")st.stop()# 执行导入result = db.import_data(data_type, data)# 显示导入结果st.success(f"导入完成!成功: {result['success_count']} 条,失败: {result['error_count']} 条")if result['errors']:st.warning("部分记录导入失败:")for error in result['errors'][:5]:  # 只显示前5个错误st.write(f"- {error}")if len(result['errors']) > 5:st.write(f"... 还有 {len(result['errors']) - 5} 个错误")# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"导入失败:{str(e)}")with col2:st.write("**导入说明**")st.info("""**导入文件要求:****销售单导入字段:**- order_number: 订单号(必填)- customer_name: 客户名称(必填)- customer_phone: 联系电话- customer_address: 客户地址- order_date: 订单日期(格式:YYYY-MM-DD)- total_amount: 总金额(必填)- notes: 备注- created_at: 创建时间(格式:YYYY-MM-DD HH:MM:SS)**客户导入字段:**- name: 客户名称(必填)- phone: 联系电话- address: 客户地址- last_used: 最近使用时间(格式:YYYY-MM-DD HH:MM:SS)**商品导入字段:**- name: 商品名称(必填)- description: 商品描述- unit: 单位(默认:个)- reference_price: 参考价格- created_at: 创建时间(格式:YYYY-MM-DD HH:MM:SS)- updated_at: 更新时间(格式:YYYY-MM-DD HH:MM:SS)**注意:**- 必填字段不能为空- 日期时间格式必须正确- 重复的记录会根据导入模式进行处理""")elif page == "应收对账管理":st.header("📊 应收对账管理")# 筛选条件col1, col2, col3 = st.columns([1, 1, 1])with col1:period_type = st.selectbox("统计周期",["按月统计", "按年统计"],key="receivable_period_type")with col2:if period_type == "按月统计":selected_month = st.date_input("选择月份",value=datetime.date.today(),key="receivable_selected_month")else:selected_year = st.number_input("选择年份",min_value=2020,max_value=2030,value=datetime.date.today().year,key="receivable_selected_year")with col3:customer_filter = st.text_input("客户名称筛选", "", key="receivable_customer_filter")# 获取对账数据if period_type == "按月统计":year = selected_month.yearmonth = selected_month.monthsummary_data = db.get_receivable_summary('month', year, month)details_data = db.get_receivable_details(f"{year:04d}-{month:02d}", 'month')period_label = f"{year}年{month}月"else:year = selected_yearsummary_data = db.get_receivable_summary('year', year)details_data = db.get_receivable_details(str(year), 'year')period_label = f"{year}年"# 客户筛选if customer_filter:# 汇总数据不包含客户名称,只筛选明细数据details_data = [item for item in details_data if customer_filter.lower() in item['customer_name'].lower()]# 显示汇总统计st.subheader(f"📈 {period_label} 应收对账汇总")if summary_data:# 计算总统计total_orders = sum(item['order_count'] for item in summary_data)total_amount = sum(item['total_amount'] for item in summary_data)total_customers = sum(item['customer_count'] for item in summary_data)# 显示统计卡片col1, col2, col3 = st.columns(3)with col1:st.metric("总订单数", total_orders)with col2:st.metric("总金额", f"¥{total_amount:.2f}")with col3:st.metric("客户数", total_customers)# 显示汇总表格st.write("**汇总统计**")# 准备汇总表格数据summary_df = pd.DataFrame(summary_data)summary_df = summary_df[['period', 'order_count', 'total_amount', 'customer_count']]summary_df.columns = ['统计周期', '订单数量', '总金额', '客户数量']summary_df['总金额'] = summary_df['总金额'].apply(lambda x: f"¥{x:.2f}")# 显示表格st.dataframe(summary_df, use_container_width=True)# 导出汇总数据col1, col2 = st.columns([1, 1])with col1:if st.button("📥 导出汇总数据", use_container_width=True):csv_data = summary_df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label="下载CSV文件",data=csv_data,file_name=f"应收对账汇总_{period_label}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True)else:st.info(f"{period_label} 暂无应收对账数据")# 显示商品明细st.subheader(f"📋 {period_label} 商品明细")if details_data:# 准备明细表格数据details_df = pd.DataFrame(details_data)details_df = details_df[['order_number', 'customer_name', 'product_name', 'quantity', 'unit_price', 'amount', 'order_date']]details_df.columns = ['订单号', '客户名称', '商品名称', '数量', '单价', '金额', '订单日期']details_df['单价'] = details_df['单价'].apply(lambda x: f"¥{x:.2f}")details_df['金额'] = details_df['金额'].apply(lambda x: f"¥{x:.2f}")# 显示表格st.dataframe(details_df, use_container_width=True)# 导出明细数据col1, col2 = st.columns([1, 1])with col1:if st.button("📥 导出明细数据", use_container_width=True):csv_data = details_df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label="下载CSV文件",data=csv_data,file_name=f"应收对账明细_{period_label}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True)# 商品销售排行st.subheader("🏆 商品销售排行")# 按商品统计product_stats = details_df.groupby('商品名称').agg({'数量': 'sum','金额': lambda x: sum(float(amount.replace('¥', '').replace(',', '')) for amount in x)}).reset_index()product_stats = product_stats.sort_values('金额', ascending=False)# 显示商品排行col1, col2 = st.columns(2)with col1:st.write("**按销售额排行**")for idx, row in product_stats.head(10).iterrows():st.write(f"{idx+1}. {row['商品名称']} - ¥{row['金额']:.2f}")with col2:st.write("**按销售量排行**")product_stats_quantity = product_stats.sort_values('数量', ascending=False)for idx, row in product_stats_quantity.head(10).iterrows():st.write(f"{idx+1}. {row['商品名称']} - {row['数量']:.2f}件")else:st.info(f"{period_label} 暂无商品明细数据")

database.py 代码:

import sqlite3
import datetime
from typing import List, Dict, Optionalclass Database:def __init__(self, db_name: str = "sales.db"):self.db_name = db_nameself.init_database()def get_connection(self):"""获取数据库连接"""return sqlite3.connect(self.db_name)def init_database(self):"""初始化数据库表"""conn = self.get_connection()cursor = conn.cursor()# 创建销售单表cursor.execute('''CREATE TABLE IF NOT EXISTS sales_orders (id INTEGER PRIMARY KEY AUTOINCREMENT,order_number TEXT UNIQUE NOT NULL,customer_name TEXT NOT NULL,customer_phone TEXT,customer_address TEXT,order_date TEXT NOT NULL,total_amount REAL NOT NULL,notes TEXT,created_at TEXT NOT NULL)''')# 创建客户表cursor.execute('''CREATE TABLE IF NOT EXISTS customers (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT UNIQUE NOT NULL,phone TEXT,address TEXT,last_used TEXT NOT NULL)''')# 创建商品表cursor.execute('''CREATE TABLE IF NOT EXISTS products (id INTEGER PRIMARY KEY AUTOINCREMENT,name TEXT UNIQUE NOT NULL,description TEXT,unit TEXT DEFAULT '个',reference_price REAL DEFAULT 0.0,created_at TEXT NOT NULL,updated_at TEXT NOT NULL)''')# 创建销售单明细表cursor.execute('''CREATE TABLE IF NOT EXISTS sales_order_items (id INTEGER PRIMARY KEY AUTOINCREMENT,order_id INTEGER NOT NULL,product_name TEXT NOT NULL,quantity REAL NOT NULL,unit_price REAL NOT NULL,amount REAL NOT NULL,FOREIGN KEY (order_id) REFERENCES sales_orders(id) ON DELETE CASCADE)''')conn.commit()conn.close()def create_order(self, order_data: Dict) -> int:"""创建销售单"""conn = self.get_connection()cursor = conn.cursor()try:# 生成订单号order_number = self.generate_order_number()order_date = order_data.get('order_date', datetime.datetime.now().strftime('%Y-%m-%d'))created_at = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')# 插入主订单cursor.execute('''INSERT INTO sales_orders (order_number, customer_name, customer_phone, customer_address, order_date, total_amount, notes, created_at)VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', (order_number,order_data['customer_name'],order_data.get('customer_phone', ''),order_data.get('customer_address', ''),order_date,order_data['total_amount'],order_data.get('notes', ''),created_at))order_id = cursor.lastrowid# 插入订单明细items = order_data.get('items', [])for item in items:cursor.execute('''INSERT INTO sales_order_items (order_id, product_name, quantity, unit_price, amount)VALUES (?, ?, ?, ?, ?)''', (order_id,item['product_name'],item['quantity'],item['unit_price'],item['amount']))conn.commit()return order_idexcept Exception as e:conn.rollback()raise efinally:conn.close()def save_customer(self, customer_data: Dict) -> None:"""保存或更新客户信息"""if not customer_data.get('name'):returnconn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()try:timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')cursor.execute('SELECT id FROM customers WHERE name = ?', (customer_data['name'],))existing = cursor.fetchone()if existing:cursor.execute('''UPDATE customersSET phone = ?, address = ?, last_used = ?WHERE id = ?''', (customer_data.get('phone', ''),customer_data.get('address', ''),timestamp,existing['id']))else:cursor.execute('''INSERT INTO customers (name, phone, address, last_used)VALUES (?, ?, ?, ?)''', (customer_data['name'],customer_data.get('phone', ''),customer_data.get('address', ''),timestamp))conn.commit()except Exception as e:conn.rollback()raise efinally:conn.close()def get_order(self, order_id: int) -> Optional[Dict]:"""获取单个销售单"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('SELECT * FROM sales_orders WHERE id = ?', (order_id,))order = cursor.fetchone()if order:order_dict = dict(order)# 获取订单明细cursor.execute('SELECT * FROM sales_order_items WHERE order_id = ?', (order_id,))items = [dict(row) for row in cursor.fetchall()]order_dict['items'] = itemsconn.close()return order_dictconn.close()return Nonedef get_all_orders(self) -> List[Dict]:"""获取所有销售单"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('SELECT * FROM sales_orders ORDER BY created_at DESC')orders = [dict(row) for row in cursor.fetchall()]conn.close()return ordersdef get_customers(self) -> List[Dict]:"""获取所有客户"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('''SELECT id, name, phone, address, last_usedFROM customersORDER BY last_used DESC, name COLLATE NOCASE ASC''')customers = [dict(row) for row in cursor.fetchall()]conn.close()return customersdef get_customer(self, customer_id: int) -> Optional[Dict]:"""根据ID获取客户"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('''SELECT id, name, phone, address, last_usedFROM customersWHERE id = ?''', (customer_id,))row = cursor.fetchone()conn.close()return dict(row) if row else Nonedef create_customer(self, customer_data: Dict) -> int:"""创建客户"""conn = self.get_connection()cursor = conn.cursor()try:timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')cursor.execute('''INSERT INTO customers (name, phone, address, last_used)VALUES (?, ?, ?, ?)''', (customer_data['name'],customer_data.get('phone', ''),customer_data.get('address', ''),timestamp))conn.commit()return cursor.lastrowidexcept Exception as e:conn.rollback()raise efinally:conn.close()def update_customer(self, customer_id: int, customer_data: Dict) -> bool:"""更新客户信息"""conn = self.get_connection()cursor = conn.cursor()try:timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')cursor.execute('''UPDATE customersSET name = ?, phone = ?, address = ?, last_used = ?WHERE id = ?''', (customer_data['name'],customer_data.get('phone', ''),customer_data.get('address', ''),timestamp,customer_id))conn.commit()return cursor.rowcount > 0except Exception as e:conn.rollback()raise efinally:conn.close()def delete_customer(self, customer_id: int) -> bool:"""删除客户"""conn = self.get_connection()cursor = conn.cursor()try:cursor.execute('DELETE FROM customers WHERE id = ?', (customer_id,))conn.commit()return cursor.rowcount > 0except Exception as e:conn.rollback()raise efinally:conn.close()def update_order(self, order_id: int, order_data: Dict) -> bool:"""更新销售单"""conn = self.get_connection()cursor = conn.cursor()try:# 更新主订单cursor.execute('''UPDATE sales_orders SET customer_name = ?, customer_phone = ?, customer_address = ?,order_date = ?, total_amount = ?, notes = ?WHERE id = ?''', (order_data['customer_name'],order_data.get('customer_phone', ''),order_data.get('customer_address', ''),order_data.get('order_date', datetime.datetime.now().strftime('%Y-%m-%d')),order_data['total_amount'],order_data.get('notes', ''),order_id))# 删除旧明细cursor.execute('DELETE FROM sales_order_items WHERE order_id = ?', (order_id,))# 插入新明细items = order_data.get('items', [])for item in items:cursor.execute('''INSERT INTO sales_order_items (order_id, product_name, quantity, unit_price, amount)VALUES (?, ?, ?, ?, ?)''', (order_id,item['product_name'],item['quantity'],item['unit_price'],item['amount']))conn.commit()return Trueexcept Exception as e:conn.rollback()raise efinally:conn.close()def delete_order(self, order_id: int) -> bool:"""删除销售单"""conn = self.get_connection()cursor = conn.cursor()try:cursor.execute('DELETE FROM sales_orders WHERE id = ?', (order_id,))conn.commit()return Trueexcept Exception as e:conn.rollback()raise efinally:conn.close()def generate_order_number(self) -> str:"""生成订单号"""today = datetime.datetime.now().strftime('%Y%m%d')conn = self.get_connection()cursor = conn.cursor()cursor.execute('''SELECT COUNT(*) FROM sales_orders WHERE order_number LIKE ?''', (f'{today}%',))count = cursor.fetchone()[0]conn.close()sequence = str(count + 1).zfill(4)return f'{today}{sequence}'# 商品管理方法def get_products(self) -> List[Dict]:"""获取所有商品"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('''SELECT id, name, description, unit, reference_price, created_at, updated_atFROM productsORDER BY name COLLATE NOCASE ASC''')products = [dict(row) for row in cursor.fetchall()]conn.close()return productsdef get_product(self, product_id: int) -> Optional[Dict]:"""根据ID获取商品"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('''SELECT id, name, description, unit, reference_price, created_at, updated_atFROM productsWHERE id = ?''', (product_id,))row = cursor.fetchone()conn.close()return dict(row) if row else Nonedef create_product(self, product_data: Dict) -> int:"""创建商品"""conn = self.get_connection()cursor = conn.cursor()try:timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')cursor.execute('''INSERT INTO products (name, description, unit, reference_price, created_at, updated_at)VALUES (?, ?, ?, ?, ?, ?)''', (product_data['name'],product_data.get('description', ''),product_data.get('unit', '个'),product_data.get('reference_price', 0.0),timestamp,timestamp))conn.commit()return cursor.lastrowidexcept Exception as e:conn.rollback()raise efinally:conn.close()def update_product(self, product_id: int, product_data: Dict) -> bool:"""更新商品信息"""conn = self.get_connection()cursor = conn.cursor()try:timestamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')cursor.execute('''UPDATE productsSET name = ?, description = ?, unit = ?, reference_price = ?, updated_at = ?WHERE id = ?''', (product_data['name'],product_data.get('description', ''),product_data.get('unit', '个'),product_data.get('reference_price', 0.0),timestamp,product_id))conn.commit()return cursor.rowcount > 0except Exception as e:conn.rollback()raise efinally:conn.close()def delete_product(self, product_id: int) -> bool:"""删除商品"""conn = self.get_connection()cursor = conn.cursor()try:cursor.execute('DELETE FROM products WHERE id = ?', (product_id,))conn.commit()return cursor.rowcount > 0except Exception as e:conn.rollback()raise efinally:conn.close()def search_products(self, keyword: str) -> List[Dict]:"""搜索商品"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()cursor.execute('''SELECT id, name, description, unit, reference_price, created_at, updated_atFROM productsWHERE name LIKE ? OR description LIKE ?ORDER BY name COLLATE NOCASE ASC''', (f'%{keyword}%', f'%{keyword}%'))products = [dict(row) for row in cursor.fetchall()]conn.close()return products# 数据导出方法def export_data(self, data_type: str) -> List[Dict]:"""导出数据Args:data_type: 数据类型,可选 'orders', 'customers', 'products'Returns:数据列表"""if data_type == 'orders':return self._export_orders()elif data_type == 'customers':return self._export_customers()elif data_type == 'products':return self._export_products()else:raise ValueError(f"不支持的数据类型: {data_type}")def _export_orders(self) -> List[Dict]:"""导出销售单数据"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()# 获取所有销售单cursor.execute('''SELECT so.*, GROUP_CONCAT(soi.product_name || '|' || soi.quantity || '|' || soi.unit_price || '|' || soi.amount, ';') as itemsFROM sales_orders soLEFT JOIN sales_order_items soi ON so.id = soi.order_idGROUP BY so.idORDER BY so.created_at DESC''')orders = []for row in cursor.fetchall():order_dict = dict(row)# 处理明细项items_str = order_dict.get('items', '')order_items = []if items_str and items_str != 'None':  # 确保items_str不为None或'None'for item_str in items_str.split(';'):if item_str and item_str != 'None':  # 确保item_str不为None或'None'parts = item_str.split('|')if len(parts) == 4:order_items.append({'product_name': parts[0],'quantity': float(parts[1]),'unit_price': float(parts[2]),'amount': float(parts[3])})# 移除原始的items字段(避免SQLite的GROUP_CONCAT结果干扰)if 'items' in order_dict:del order_dict['items']# 添加处理后的明细项order_dict['items'] = order_itemsorders.append(order_dict)conn.close()return ordersdef _export_customers(self) -> List[Dict]:"""导出客户数据"""return self.get_customers()def _export_products(self) -> List[Dict]:"""导出商品数据"""return self.get_products()# 数据导入方法def import_data(self, data_type: str, data: List[Dict]) -> Dict:"""导入数据Args:data_type: 数据类型,可选 'orders', 'customers', 'products'data: 要导入的数据列表Returns:导入结果统计"""if data_type == 'orders':return self._import_orders(data)elif data_type == 'customers':return self._import_customers(data)elif data_type == 'products':return self._import_products(data)else:raise ValueError(f"不支持的数据类型: {data_type}")def _import_orders(self, data: List[Dict]) -> Dict:"""导入销售单数据"""conn = self.get_connection()cursor = conn.cursor()success_count = 0error_count = 0errors = []try:for order_data in data:try:# 检查订单号是否已存在cursor.execute('SELECT id FROM sales_orders WHERE order_number = ?', (order_data.get('order_number'),))existing = cursor.fetchone()if existing:# 更新现有订单order_id = existing[0]cursor.execute('''UPDATE sales_orders SET customer_name=?, customer_phone=?, customer_address=?,order_date=?, total_amount=?, notes=?WHERE id=?''', (order_data.get('customer_name', ''),order_data.get('customer_phone', ''),order_data.get('customer_address', ''),order_data.get('order_date', ''),order_data.get('total_amount', 0.0),order_data.get('notes', ''),order_id))# 删除旧明细cursor.execute('DELETE FROM sales_order_items WHERE order_id = ?', (order_id,))else:# 插入新订单cursor.execute('''INSERT INTO sales_orders (order_number, customer_name, customer_phone, customer_address, order_date, total_amount, notes, created_at)VALUES (?, ?, ?, ?, ?, ?, ?, ?)''', (order_data.get('order_number', ''),order_data.get('customer_name', ''),order_data.get('customer_phone', ''),order_data.get('customer_address', ''),order_data.get('order_date', ''),order_data.get('total_amount', 0.0),order_data.get('notes', ''),order_data.get('created_at', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))))order_id = cursor.lastrowid# 插入明细项items = order_data.get('items', [])# 处理items字段可能被序列化为字符串的情况if isinstance(items, str):try:# 尝试解析JSON字符串import jsonitems = json.loads(items)except Exception as json_error:# 如果JSON解析失败,尝试其他格式处理try:# 可能是Python字面量格式,使用eval(安全方式)import astitems = ast.literal_eval(items)except:# 如果所有解析都失败,设置为空列表items = []# 确保items是列表类型if not isinstance(items, list):items = []for item in items:# 确保item是字典类型if isinstance(item, dict):cursor.execute('''INSERT INTO sales_order_items (order_id, product_name, quantity, unit_price, amount)VALUES (?, ?, ?, ?, ?)''', (order_id,item.get('product_name', ''),item.get('quantity', 0.0),item.get('unit_price', 0.0),item.get('amount', 0.0)))else:# 如果item不是字典,记录警告但继续处理print(f"警告:订单 {order_data.get('order_number', '未知')} 的明细项格式不正确: {item}")success_count += 1except Exception as e:error_count += 1errors.append(f"订单 {order_data.get('order_number', '未知')}: {str(e)}")conn.commit()except Exception as e:conn.rollback()errors.append(f"导入过程错误: {str(e)}")finally:conn.close()return {'success_count': success_count,'error_count': error_count,'errors': errors}def _import_customers(self, data: List[Dict]) -> Dict:"""导入客户数据"""conn = self.get_connection()cursor = conn.cursor()success_count = 0error_count = 0errors = []try:for customer_data in data:try:# 检查客户是否已存在cursor.execute('SELECT id FROM customers WHERE name = ?', (customer_data.get('name'),))existing = cursor.fetchone()if existing:# 更新现有客户cursor.execute('''UPDATE customersSET phone=?, address=?, last_used=?WHERE id=?''', (customer_data.get('phone', ''),customer_data.get('address', ''),customer_data.get('last_used', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')),existing[0]))else:# 插入新客户cursor.execute('''INSERT INTO customers (name, phone, address, last_used)VALUES (?, ?, ?, ?)''', (customer_data.get('name', ''),customer_data.get('phone', ''),customer_data.get('address', ''),customer_data.get('last_used', datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))))success_count += 1except Exception as e:error_count += 1errors.append(f"客户 {customer_data.get('name', '未知')}: {str(e)}")conn.commit()except Exception as e:conn.rollback()errors.append(f"导入过程错误: {str(e)}")finally:conn.close()return {'success_count': success_count,'error_count': error_count,'errors': errors}def _import_products(self, data: List[Dict]) -> Dict:"""导入商品数据"""conn = self.get_connection()cursor = conn.cursor()success_count = 0error_count = 0errors = []try:for product_data in data:try:# 检查商品是否已存在cursor.execute('SELECT id FROM products WHERE name = ?', (product_data.get('name'),))existing = cursor.fetchone()if existing:# 更新现有商品cursor.execute('''UPDATE productsSET description=?, unit=?, reference_price=?, updated_at=?WHERE id=?''', (product_data.get('description', ''),product_data.get('unit', '个'),product_data.get('reference_price', 0.0),datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),existing[0]))else:# 插入新商品cursor.execute('''INSERT INTO products (name, description, unit, reference_price, created_at, updated_at)VALUES (?, ?, ?, ?, ?, ?)''', (product_data.get('name', ''),product_data.get('description', ''),product_data.get('unit', '个'),product_data.get('reference_price', 0.0),datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))success_count += 1except Exception as e:error_count += 1errors.append(f"商品 {product_data.get('name', '未知')}: {str(e)}")conn.commit()except Exception as e:conn.rollback()errors.append(f"导入过程错误: {str(e)}")finally:conn.close()return {'success_count': success_count,'error_count': error_count,'errors': errors}# 应收对账管理方法def get_receivable_summary(self, period_type: str = 'month', year: int = None, month: int = None) -> List[Dict]:"""获取应收对账汇总数据Args:period_type: 统计周期类型,'month'按月统计,'year'按年统计year: 指定年份,如果为None则使用当前年份month: 指定月份(仅当period_type='month'时有效),如果为None则统计全年Returns:应收对账汇总数据列表"""if year is None:year = datetime.datetime.now().yearconn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()if period_type == 'month':if month is None:# 按月份统计全年数据cursor.execute('''SELECT strftime('%Y-%m', order_date) as period,COUNT(*) as order_count,SUM(total_amount) as total_amount,COUNT(DISTINCT customer_name) as customer_countFROM sales_orders WHERE strftime('%Y', order_date) = ?GROUP BY strftime('%Y-%m', order_date)ORDER BY period DESC''', (str(year),))else:# 统计指定月份的数据cursor.execute('''SELECT order_date as period,COUNT(*) as order_count,SUM(total_amount) as total_amount,COUNT(DISTINCT customer_name) as customer_countFROM sales_orders WHERE strftime('%Y-%m', order_date) = ?GROUP BY order_dateORDER BY order_date DESC''', (f"{year:04d}-{month:02d}",))else:  # period_type == 'year'cursor.execute('''SELECT strftime('%Y', order_date) as period,COUNT(*) as order_count,SUM(total_amount) as total_amount,COUNT(DISTINCT customer_name) as customer_countFROM sales_orders GROUP BY strftime('%Y', order_date)ORDER BY period DESC''')summary_data = []for row in cursor.fetchall():summary_data.append(dict(row))conn.close()return summary_datadef get_receivable_details(self, period: str, period_type: str = 'month') -> List[Dict]:"""获取应收对账明细数据Args:period: 统计周期,格式为'YYYY-MM'(月份)或'YYYY'(年份)period_type: 统计周期类型,'month'按月统计,'year'按年统计Returns:应收对账明细数据列表"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()if period_type == 'month':# 获取指定月份的销售单明细cursor.execute('''SELECT so.order_number,so.customer_name,so.customer_phone,so.order_date,so.total_amount,soi.product_name,soi.quantity,soi.unit_price,soi.amountFROM sales_orders soJOIN sales_order_items soi ON so.id = soi.order_idWHERE strftime('%Y-%m', so.order_date) = ?ORDER BY so.order_date DESC, so.order_number''', (period,))else:  # period_type == 'year'# 获取指定年份的销售单明细cursor.execute('''SELECT so.order_number,so.customer_name,so.customer_phone,so.order_date,so.total_amount,soi.product_name,soi.quantity,soi.unit_price,soi.amountFROM sales_orders soJOIN sales_order_items soi ON so.id = soi.order_idWHERE strftime('%Y', so.order_date) = ?ORDER BY so.order_date DESC, so.order_number''', (period,))details_data = []for row in cursor.fetchall():details_data.append(dict(row))conn.close()return details_datadef get_customer_receivable_summary(self, period: str, period_type: str = 'month') -> List[Dict]:"""获取客户应收对账汇总Args:period: 统计周期,格式为'YYYY-MM'(月份)或'YYYY'(年份)period_type: 统计周期类型,'month'按月统计,'year'按年统计Returns:客户应收对账汇总数据列表"""conn = self.get_connection()conn.row_factory = sqlite3.Rowcursor = conn.cursor()if period_type == 'month':cursor.execute('''SELECT customer_name,COUNT(*) as order_count,SUM(total_amount) as total_amount,MIN(order_date) as first_order_date,MAX(order_date) as last_order_dateFROM sales_orders WHERE strftime('%Y-%m', order_date) = ?GROUP BY customer_nameORDER BY total_amount DESC''', (period,))else:  # period_type == 'year'cursor.execute('''SELECT customer_name,COUNT(*) as order_count,SUM(total_amount) as total_amount,MIN(order_date) as first_order_date,MAX(order_date) as last_order_dateFROM sales_orders WHERE strftime('%Y', order_date) = ?GROUP BY customer_nameORDER BY total_amount DESC''', (period,))customer_summary = []for row in cursor.fetchall():customer_summary.append(dict(row))conn.close()return customer_summary

http://www.dtcms.com/a/601131.html

相关文章:

  • 网站权重是什么华大基因背景调查
  • 华为桌面图标模糊问题解决方案
  • MotionTrans: 从人类VR数据学习机器人操作的运动级迁移
  • [Dify 实战案例] 构建一个 CV 内容优化与润色助手:让 AI 成为你的简历教练
  • 【计算思维】蓝桥杯STEMA 科技素养考试真题及解析 B
  • Kanass实战教程系列(3) - 项目经理如何使用kanass有效管理项目
  • 成都网站建设 网络公司建设工程中标查询网站
  • C语言编译程序属于应用软件 | 理解编译原理与应用场景
  • 蛋糕网站模板汕头网络营销公司
  • HOT100题打卡第37天——贪心算法
  • Python学习历程——模块
  • bin文件反编译C语言 | 深入探讨反编译技术及其应用
  • 测开学习DAY27
  • dede cms 网站模板云匠网怎么样
  • 信息学奥赛一本通 1625:【例 1】反素数 Antiprime | 洛谷 P1463 [POI 2001 R1 / HAOI2007] 反素数
  • 如何做网站长尾关键词布局工程公司取名字大全
  • 深度学习:从零开始手搓一个深层神经网络
  • Docker 多服务镜像构建完整教程
  • Docker 启动 EMQX 5.x 并配置自签名证书
  • 网站招工费怎么做会计分录小小视频免费观看高清
  • C++重点知识梳理(上)
  • 长沙市建设局官方网站wordpress 显示文章标签
  • 基于用户评论分析挖掘的旅游景点推荐系统
  • 宣传旅游网站建设的重点是什么装修公司哪家好排名
  • 【C语言学习笔记】动态内存分配:malloc/free的正确打开方式
  • HOVER:用于人形机器人的多功能全身神经控制器
  • 学会给网页穿衣服——学习 CSS 语言
  • Android11-Launcher3 定制-去除副屏幕-可以滑动效果 - 篇二
  • 在 ubuntu怎么创建一个新用户,并且设置为默认自动登录用户,最简单
  • 字符串的陷阱与艺术——std::string全解析