一个基于Python Streamlit sqlite3 的销售单管理系统,提供商品管理、客户管理、销售单管理及打印,和应收对账单等功能
图:

功能特性
核心功能
-
商品管理 - 添加、编辑、删除商品信息,支持商品搜索
-
客户管理 - 管理客户信息,包括客户名称、联系电话、地址等
-
销售单管理 - 创建销售单,关联客户和商品,自动计算金额
-
数据统计 - 销售数据统计和可视化分析
技术特性
-
基于Streamlit框架,界面简洁易用
-
SQLite数据库存储,数据安全可靠
-
响应式设计,支持即时数据刷新
-
完整的错误处理和用户提示
安装和运行
环境要求
-
Python 3.8+
-
推荐使用虚拟环境
安装步骤
-
克隆或下载项目文件
-
安装依赖包:
pip install -r requirements.txt
requirements.txt 中包含以下依赖:
streamlit>=1.28.0
pandas>=2.0.0
注:导出 Excel 模板功能使用 openpyxl 引擎,如未安装请另行执行 pip install openpyxl
-
运行应用程序:
streamlit run app.py
Windows 下也可以直接运行 run.bat 文件来启动
-
在浏览器中打开显示的URL(通常是 http://localhost:8501)
快速启动
Windows用户可以直接双击运行 run.bat 文件启动应用。
使用说明
商品管理
-
在左侧导航栏选择"商品管理"
-
使用表单添加新商品,包括商品名称、描述、单位和参考价格
-
在商品列表中可以进行编辑、删除操作
-
使用搜索框快速查找商品
客户管理
-
在左侧导航栏选择"客户管理"
-
添加客户信息,包括客户名称、联系电话和地址
-
客户列表显示所有客户信息,支持编辑和删除
-
客户信息会在创建销售单时自动关联
销售单管理
-
在左侧导航栏选择"销售单管理"
-
选择客户(支持搜索和新增客户)
-
添加商品到销售单,设置数量和单价
-
系统自动计算总金额
-
保存销售单后可以查看和打印
数据统计
-
在左侧导航栏选择"数据统计"
-
查看销售数据的统计图表
-
支持按时间范围筛选数据
文件结构
销售管理系统/
├── app.py              # 主应用程序文件
├── database.py         # 数据库操作模块
├── requirements.txt    # Python依赖包列表
├── run.bat             # Windows启动脚本
├── sales.db            # SQLite数据库文件(自动生成)
└── README.md           # 项目说明文档
数据库说明
系统使用SQLite数据库,主要数据表包括:
- products - 商品信息表
- customers - 客户信息表
- sales - 销售单主表
- sale_items - 销售明细表
数据库文件 sales.db 会在首次运行时自动创建。
开发说明
主要依赖包
-
streamlit - Web应用框架
-
pandas - 数据处理
-
sqlite3 - 数据库操作
自定义开发
如需扩展功能,可以修改以下文件:
- app.py - 界面逻辑和业务处理
- database.py - 数据库操作函数
- 添加新的Python模块到项目目录
故障排除
常见问题
- 端口占用错误
  - 关闭其他占用应用端口的程序(Streamlit 默认使用 8501 端口)
  - 或使用 `streamlit run app.py --server.port 8504` 指定其他端口
- 导入错误
  - 确保已安装所有依赖包:`pip install -r requirements.txt`
  - 检查Python版本是否符合要求
- 数据库错误
  - 删除 sales.db 文件后重新启动应用(注意:这会清空已有数据,请先备份)
  - 检查文件读写权限
开始使用:运行 streamlit run app.py 启动应用!
app.py代码:
import streamlit as st
import pandas as pd
from database import Database
import datetime
from typing import Optional, Dict# 页面配置
# Page configuration: browser-tab title, icon and wide layout.
st.set_page_config(
    page_title="销售单管理系统",
    page_icon="📋",
    layout="wide",
)
# Initialize the database
@st.cache_resource
def init_db():
    """Create the application's Database handle.

    Decorated with ``st.cache_resource`` so the returned object is cached by
    Streamlit rather than rebuilt each time the script body executes.

    Returns:
        A ``Database`` instance (imported from the project's ``database``
        module).
    """
    return Database()


# Module-level handle used by every page and helper below.
db = init_db()

# Initialize session state
# Session-state key under which the line items of the order being created
# are stored.
ITEMS_KEY = 'order_items'


def get_default_state():
    """Return a fresh dictionary of default session-state values.

    A new dict (with new list objects) is built on every call, so callers
    never share mutable defaults with each other.

    Returns:
        dict mapping session-state key -> default value.
    """
    return {
        # Order being created
        ITEMS_KEY: [],
        # Navigation
        'nav_page': '创建销售单',
        'pending_nav_page': None,
        'pending_selected_order_id': None,
        # Customer / order header fields on the "create order" page
        'customer_name': '',
        'customer_phone': '',
        'customer_address': '',
        'order_date': datetime.date.today(),
        'notes': '',
        # "Add item" inputs
        'new_product_name': '',
        'new_quantity': 1.0,
        'new_unit_price': 0.0,
        'edit_mode': False,
        # Customer picker
        'available_customers': [],
        'selected_customer_option': '新增客户',
        # Customer management form
        'customer_form_mode': 'create',
        'customer_edit_id': None,
        'selected_order_id': None,
        'customer_form_name': '',
        'customer_form_phone': '',
        'customer_form_address': '',
        'customer_form_reset_trigger': False,
        'customer_form_pending_values': None,
        # Set by reset_form(); consumed at the top of the next script run
        'reset_form_trigger': False,
    }


# Initialize missing state
# Seed every key that is not in session state yet with its default value.
for state_key, default_value in get_default_state().items():
    if state_key not in st.session_state:
        st.session_state[state_key] = default_value

# If a reset was requested, restore all defaults here, before any widget
# is created.
if st.session_state.get('reset_form_trigger'):
    defaults = get_default_state()
    for state_key, default_value in defaults.items():
        st.session_state[state_key] = default_value
    st.session_state['reset_form_trigger'] = False

# Handle a requested reset of the customer form.
if st.session_state.get('customer_form_reset_trigger'):
    st.session_state.customer_form_mode = 'create'
    st.session_state.customer_edit_id = None
    st.session_state.customer_form_name = ''
    st.session_state.customer_form_phone = ''
    st.session_state.customer_form_address = ''
    st.session_state.customer_form_reset_trigger = False

# Apply values queued for the customer form (see schedule_customer_edit).
pending_customer = st.session_state.get('customer_form_pending_values')
if pending_customer:
    st.session_state.customer_form_mode = pending_customer.get('mode', 'create')
    st.session_state.customer_edit_id = pending_customer.get('id')
    st.session_state.customer_form_name = pending_customer.get('name', '')
    st.session_state.customer_form_phone = pending_customer.get('phone', '')
    st.session_state.customer_form_address = pending_customer.get('address', '')
    st.session_state.customer_form_pending_values = None

# Handle a deferred rerun request: clear the flag first so the rerun it
# triggers does not loop.
if st.session_state.get('trigger_rerun'):
    st.session_state.trigger_rerun = False
    st.rerun()

# Restore customer info (after the rerun)
if st.session_state.get('preserved_customer_name') is not None:
    st.session_state.customer_name = st.session_state.preserved_customer_name
    st.session_state.customer_phone = st.session_state.preserved_customer_phone
    st.session_state.customer_address = st.session_state.preserved_customer_address
    st.session_state.selected_customer_option = st.session_state.preserved_selected_customer_option
    # Clear the stash so it is applied only once.
    st.session_state.preserved_customer_name = None
    st.session_state.preserved_customer_phone = None
    st.session_state.preserved_customer_address = None
    st.session_state.preserved_selected_customer_option = None


def normalize_items(raw_items):
    """Normalize order line items into a list of plain dicts.

    Args:
        raw_items: ``None``, a single mapping, or an iterable of mappings /
            objects accepted by ``dict()``.

    Returns:
        A new list of dicts with keys ``product_name``, ``quantity``,
        ``unit_price`` and ``amount``; the three numeric fields are floats
        (missing or falsy values become ``0.0``). ``None`` entries and
        entries that cannot be converted to a dict are skipped.
    """
    if not raw_items:
        return []
    if isinstance(raw_items, dict):
        raw_items = [raw_items]

    normalized = []
    for item in raw_items:
        if item is None:
            continue
        if not isinstance(item, dict):
            try:
                item = dict(item)
            except (TypeError, ValueError):
                # Not mapping-like: skip it, as before.
                continue
        normalized.append({
            'product_name': item.get('product_name', ''),
            'quantity': float(item.get('quantity', 0) or 0),
            'unit_price': float(item.get('unit_price', 0) or 0),
            'amount': float(item.get('amount', 0) or 0),
        })
    return normalized


def reset_form():
    """Request a reset of the form; applied at the top of the next run."""
    st.session_state['reset_form_trigger'] = True


def reset_customer_form():
    """Request a reset of the customer form; applied on the next run."""
    st.session_state.customer_form_reset_trigger = True


def schedule_customer_edit(customer: Dict):
    """Queue a customer's values to be loaded into the customer form.

    The values are stored under ``customer_form_pending_values`` with
    ``mode`` set to ``'edit'``.
    """
    st.session_state.customer_form_pending_values = {
        'mode': 'edit',
        'id': customer.get('id'),
        'name': customer.get('name', ''),
        'phone': customer.get('phone', ''),
        'address': customer.get('address', ''),
    }


def set_customer_fields(customer: Optional[Dict] = None):
    """Fill the order form's customer fields from ``customer``.

    Passing ``None`` (or an empty dict) clears the three fields.
    """
    source = customer or {}
    st.session_state.customer_name = source.get('name', '')
    st.session_state.customer_phone = source.get('phone', '')
    st.session_state.customer_address = source.get('address', '')


def handle_customer_selection():
    """React to a change of the "existing customer" select box.

    '新增客户' clears the customer fields; otherwise the first cached
    customer whose name equals the selected option is loaded. If no
    customer matches, the fields are left untouched.
    """
    option = st.session_state.get('selected_customer_option', '新增客户')
    customers = st.session_state.get('available_customers', [])
    if option == '新增客户':
        set_customer_fields(None)
        return
    customer = next((c for c in customers if c.get('name') == option), None)
    if customer:
        set_customer_fields(customer)


def load_customer_options():
    """Load customers from the database and build the select-box options.

    Side effects: caches the customers under ``available_customers`` and
    falls back to '新增客户' when the current selection is no longer valid.

    Returns:
        ``['新增客户']`` followed by every customer name.
    """
    customers = db.get_customers()
    st.session_state.available_customers = customers
    options = ['新增客户'] + [c['name'] for c in customers]
    if st.session_state.selected_customer_option not in options:
        st.session_state.selected_customer_option = '新增客户'
    return options


def download_template(data_type: str, file_format: str):
    """Render a download button for a sample import template.

    Args:
        data_type: one of "销售单", "客户", "商品".
        file_format: one of "Excel (.xlsx)", "CSV (.csv)", "JSON (.json)".

    Any exception is reported through ``st.error`` instead of propagating.
    """
    try:
        # data_type -> (file name stem, sample rows)
        templates = {
            "销售单": ("销售单导入模板", [{
                "order_number": "SO20240001",
                "customer_name": "示例客户",
                "customer_phone": "13800138000",
                "customer_address": "示例地址",
                "order_date": "2024-01-01",
                "total_amount": 1000.00,
                "notes": "示例备注",
                "created_at": "2024-01-01 10:00:00",
            }]),
            "客户": ("客户导入模板", [{
                "name": "示例客户",
                "phone": "13800138000",
                "address": "示例地址",
                "last_used": "2024-01-01 10:00:00",
            }]),
            "商品": ("商品导入模板", [{
                "name": "示例商品",
                "description": "商品描述",
                "unit": "个",
                "reference_price": 100.00,
                "created_at": "2024-01-01 10:00:00",
                "updated_at": "2024-01-01 10:00:00",
            }]),
        }
        if data_type not in templates:
            st.error("不支持的数据类型")
            return
        file_name, template_data = templates[data_type]

        df = pd.DataFrame(template_data)
        timestamp = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
        button_key = f"download_{data_type}_{file_format}"

        if file_format == "Excel (.xlsx)":
            import io

            buffer = io.BytesIO()
            with pd.ExcelWriter(buffer, engine='openpyxl') as writer:
                df.to_excel(writer, sheet_name=data_type, index=False)
            st.download_button(
                label=f"下载{file_format}",
                data=buffer.getvalue(),
                file_name=f"{file_name}_{timestamp}.xlsx",
                mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                use_container_width=True,
                key=button_key,
            )
        elif file_format == "CSV (.csv)":
            # to_csv() returns a str when no path is given, so an `encoding`
            # argument would not put a BOM into the download. Encode
            # explicitly so the file starts with the UTF-8 BOM.
            csv_data = df.to_csv(index=False).encode('utf-8-sig')
            st.download_button(
                label=f"下载{file_format}",
                data=csv_data,
                file_name=f"{file_name}_{timestamp}.csv",
                mime="text/csv",
                use_container_width=True,
                key=button_key,
            )
        elif file_format == "JSON (.json)":
            json_data = df.to_json(orient='records', force_ascii=False, indent=2)
            st.download_button(
                label=f"下载{file_format}",
                data=json_data,
                file_name=f"{file_name}_{timestamp}.json",
                mime="application/json",
                use_container_width=True,
                key=button_key,
            )
        st.success(f"{data_type}模板已准备就绪,请点击下载按钮")
    except Exception as e:  # UI boundary: surface the failure to the user
        st.error(f"模板下载失败:{str(e)}")


def _request_rerun_keeping_customer():
    """Stash the customer fields and flag a rerun.

    The stash is restored by the ``preserved_customer_name`` check that runs
    at module level after the rerun, so the customer info is not lost.
    """
    state = st.session_state
    state.preserved_customer_name = state.get('customer_name', '')
    state.preserved_customer_phone = state.get('customer_phone', '')
    state.preserved_customer_address = state.get('customer_address', '')
    state.preserved_selected_customer_option = state.get('selected_customer_option', '新增客户')
    state.trigger_rerun = True


def _add_pending_item():
    """Append the item described by the ``new_*`` inputs to the order.

    Does nothing unless a product name is given and both quantity and unit
    price are positive. On success the inputs are reset and a rerun is
    requested.
    """
    state = st.session_state
    name = state.new_product_name
    quantity = state.new_quantity
    unit_price = state.new_unit_price
    if not (name and quantity > 0 and unit_price > 0):
        return

    items = normalize_items(state.get(ITEMS_KEY))
    items.append({
        'product_name': name,
        'quantity': quantity,
        'unit_price': unit_price,
        'amount': quantity * unit_price,
    })
    state[ITEMS_KEY] = items

    # Reset the "add item" inputs for the next entry.
    state.new_product_name = ""
    state.new_quantity = 1.0
    state.new_unit_price = 0.0
    state.selected_product_option = "手动输入商品"
    _request_rerun_keeping_customer()


def add_item_manual():
    """Add a manually entered item (button callback)."""
    _add_pending_item()


def add_item_from_library():
    """Add an item chosen from the product library (button callback)."""
    _add_pending_item()


def remove_item(index):
    """Remove the item at ``index``; out-of-range indexes are ignored."""
    items = normalize_items(st.session_state.get(ITEMS_KEY))
    if not 0 <= index < len(items):
        return
    items.pop(index)
    st.session_state[ITEMS_KEY] = items
    _request_rerun_keeping_customer()


def calculate_total():
    """Return the sum of ``amount`` over the current order items."""
    return sum(item['amount'] for item in normalize_items(st.session_state.get(ITEMS_KEY)))


def generate_print_html(order_data):
    """Build the printable HTML sales slip (styled like the paper form).

    Args:
        order_data: mapping with ``total_amount`` and ``items`` (each item
            has ``product_name``, ``quantity``, ``unit_price``, ``amount``
            and optionally ``unit``), plus optional ``order_number``,
            ``order_date``, ``created_at``, ``customer_name``,
            ``customer_phone``, ``customer_address`` and ``notes``.

    Returns:
        A complete HTML document as a string. All free-text values are
        HTML-escaped; ``None`` values render as empty text.
    """
    from html import escape  # local import keeps this helper self-contained

    def text(value) -> str:
        """Escape a value for HTML text content; None becomes ''."""
        return "" if value is None else escape(str(value))

    # Seller information (edit as needed)
    seller = {
        "name": "开平市xxxx电脑行",
        "address": "开平市三xxx铺",
        "tel": "0750-xxx",
        "qq": "xxxx",
        "wechat": "",
        "wechat_public": "xxx",
        "website": "",
    }

    def to_rmb_upper(value: float) -> str:
        """Convert an amount to Chinese uppercase currency text.

        The result carries no trailing "整"; whole-yuan amounts end in "元".
        """
        nums = ["零", "壹", "贰", "叁", "肆", "伍", "陆", "柒", "捌", "玖"]
        units = ["", "拾", "佰", "仟"]
        big_units = ["", "万", "亿", "兆"]

        # Work in integer cents; the small epsilon absorbs binary float
        # error on values such as x.xx5.
        cents = int(round(abs(value) * 100 + 1e-6))
        negative = value < 0 and cents > 0
        integer, fraction = divmod(cents, 100)
        if integer >= 10 ** (4 * len(big_units)):
            raise ValueError("amount too large to convert to uppercase RMB")
        has_integer = integer > 0

        def four_digit_to_upper(num: int) -> str:
            """Spell out 1..9999, collapsing runs of inner zeros."""
            result = ""
            zero_flag = False
            for idx in range(4):
                digit = num // (10 ** (3 - idx)) % 10
                if digit == 0:
                    if not zero_flag and result:
                        result += "零"
                    zero_flag = True
                else:
                    result += nums[digit] + units[3 - idx]
                    zero_flag = False
            return result.rstrip("零")

        int_str = "零"
        if has_integer:
            segments = []  # groups of four digits, lowest first
            while integer > 0:
                integer, segment = divmod(integer, 10000)
                segments.append(segment)

            int_str = ""
            gap = False  # True after skipping an all-zero segment
            for idx in range(len(segments) - 1, -1, -1):
                segment = segments[idx]
                if segment == 0:
                    gap = True
                    continue
                # A "零" is needed when digits were skipped between this
                # segment and the previous one (e.g. 10005 -> 壹万零伍).
                if int_str and (gap or segment < 1000):
                    int_str += "零"
                int_str += four_digit_to_upper(segment) + big_units[idx]
                gap = False
        int_str += "元"

        frac_str = ""
        if fraction:
            jiao, fen = divmod(fraction, 10)
            if jiao:
                frac_str += nums[jiao] + "角"
            elif has_integer:
                # e.g. 1.05 -> 壹元零伍分
                frac_str += "零"
            if fen:
                frac_str += nums[fen] + "分"

        return ("负" if negative else "") + int_str + frac_str

    def big_amount_boxes(text_value: str, total_length: int = 12) -> str:
        """Render one box per character, left-padded to ``total_length``.

        Longer texts keep every character: dropping the leading (most
        significant) characters would misstate the amount.
        """
        cells = list(text_value.replace("整", "").strip())
        cells = [""] * max(0, total_length - len(cells)) + cells
        return "".join(f'<div class="box">{c}</div>' for c in cells)

    total_amount = float(order_data["total_amount"])
    total_upper = to_rmb_upper(total_amount)
    total_lower = f"¥{total_amount:.2f}"

    order_no = order_data.get("order_number", "")
    order_date = order_data.get("order_date", "")
    created_at = order_data.get("created_at", "")
    display_date = order_date if order_date else created_at[:10] if created_at else ""

    items = order_data["items"]
    row_parts = []
    for idx, item in enumerate(items, 1):
        row_parts.append(
            f'<tr><td class="c">{idx}</td>'
            f'<td>{text(item["product_name"])}</td>'
            f'<td class="c">{text(item.get("unit", ""))}</td>'
            f'<td class="r">{item["quantity"]}</td>'
            f'<td class="r">{item["unit_price"]:.2f}</td>'
            f'<td class="r">{item["amount"]:.2f}</td>'
            f'<td></td></tr>'
        )
    items_rows = "".join(row_parts)
    # Pad the table to at least six rows.
    blank_rows = (
        "<tr><td> </td><td></td><td></td><td></td><td></td><td></td><td></td></tr>"
        * max(0, 6 - len(items))
    )

    # Plain (non-f) string so CSS braces need no doubling.
    # Page size: 210mm x 139.7mm. The previous "13.97mm" height is shorter
    # than the 10mm top and bottom margins combined, so it could not hold
    # any content.
    css = """
@media print {@page {size: 210mm 139.7mm; margin: 10mm;}}
body {font-family: "Microsoft YaHei", Arial, sans-serif;color: #000;}
.sheet {width: 190mm;margin: 0 auto;padding: 8mm 8mm 6mm 8mm;}
.title-row {display: flex;align-items: center;justify-content: center;position: relative;border-bottom: 0px solid #000;}
.title-row .title {font-size: 20pt;font-weight: 700;letter-spacing: 1px;}
.title-row .no {position: absolute;right: 0;top: 0;font-size: 12pt;}
.sub-row {display: flex;justify-content: right;margin-top: 3mm;margin-bottom: 3mm;}
.label {margin-right: 3mm;}
.frame {border: 1px solid #000;padding: 2mm 3mm;height: 18mm;border-bottom: none;}
.frame1 {border: 1px solid #000;border-left: none;border-bottom: none;padding: 2mm 3mm;height: 18mm;}
.flex {display: flex;gap: 0mm;}
.w50 { width: 50%; }
.muted { font-size: 10pt; color: #333; }
table.items {width: 100%;border-collapse: collapse;}
table.items th, table.items td {border: 1px solid #000;padding: 1mm 1mm;font-size: 10pt;}
table.items th {text-align: center;}
.c { text-align: center; }
.r { text-align: right; }
.amount-row {display: flex;align-items: flex-end;gap: 6mm;border-left: 1px solid #000;border-right: 1px solid #000;margin-top: 0mm;padding-bottom: 2mm;}
.amount-row .label-big {white-space: nowrap;font-weight: 700;font-size: 12pt;}
.amount-box-wrapper {display: flex;align-items: center;gap: 2mm;}
.amount-box-wrapper .box {width: 8mm;height: 6mm;border: 1px solid #000;display: flex;justify-content: center;align-items: center;font-size: 12pt;}
.amount-small {display: flex;align-items: center;gap: 3mm;font-size: 10pt;}
.amount-small .amount-lower {min-width: 30mm;border-bottom: 1px solid #000;text-align: right;font-size: 12pt;font-weight: 700;padding: 1mm 2mm;}
.note-area {border: 1px solid #000;padding: 2mm 3mm;min-height: 12mm;}
.pay-ways {display: flex;gap: 8mm;align-items: center;margin-top: 3mm;}
.checkbox {border: 1px solid #000;width: 4mm; height: 4mm;display: inline-block;margin-right: 2mm;}
.sign-row {display: flex;justify-content: space-between;align-items: center;margin-top: 6mm;}
.small {font-size: 10pt;}
"""

    document = f"""
<!DOCTYPE html>
<html>
<head><meta charset="UTF-8"><title>销售单 - {text(order_no)}</title><style>{css}</style>
</head>
<body><div class="sheet">
<div class="title-row"><div class="title">开平xxx电脑(销售单)</div><div class="no">No: {text(order_no)}</div></div>
<div class="sub-row small"><div>开单日期:{text(display_date)}</div></div>
<div class="flex">
<div class="w50 frame small"><div class="label"><strong>购买方信息</strong></div><div>名称:{text(order_data.get('customer_name', ''))}</div><div>联系信息:{text(order_data.get('customer_phone', ''))}</div><div>地址:{text(order_data.get('customer_address', ''))}</div></div>
<div class="w50 frame1 small"><div class="label"><strong>销售方信息</strong></div><div>名称:{text(seller['name'])}</div><div>地址:{text(seller['address'])} </div><div>电话:{text(seller['tel'])}</div></div>
</div>
<table class="items"><thead><tr><th style="width:8%">序号</th><th>名称规格</th><th style="width:10%">单位</th><th style="width:12%">数量</th><th style="width:12%">单价</th><th style="width:14%">总金额</th><th style="width:14%">备注</th></tr></thead><tbody>{items_rows}{blank_rows}</tbody></table>
<div class="amount-row"><div class="label-big">合计(大写)</div><div class="amount-box-wrapper">{big_amount_boxes(total_upper, total_length=10)}</div><div class="amount-small">(小写)<span class="amount-lower">{total_lower}</span></div></div>
<div class="note-area small">备注:{text(order_data.get('notes', ''))}<div class="pay-ways">付款方式:<span class="checkbox"></span>现金<span class="checkbox"></span>转账<span class="checkbox"></span>微信<span class="checkbox"></span>支付宝</div></div>
<div class="sign-row small"><div>销售人:__________ 收款人:__________</div><div> 客户签名:_________________日期:20____ 年 ____ 月 ____ 日</div></div>
</div>
</body>
</html>
"""
    return document


# Main UI
st.title("📋 销售单管理系统")

# Sidebar navigation
page_options = ["创建销售单", "销售单列表", "查看/编辑销售单", "客户管理", "商品管理", "应收对账管理", "数据管理"]

# A pending page-jump request must be applied before the select widget
# is rendered.
if st.session_state.get('pending_nav_page'):
    st.session_state.nav_page = st.session_state.pending_nav_page
    st.session_state.pending_nav_page = None

# Same for a pending order selection (order ids may be falsy, hence the
# explicit None check).
if st.session_state.get('pending_selected_order_id') is not None:
    st.session_state.selected_order_id = st.session_state.pending_selected_order_id
    st.session_state.pending_selected_order_id = None

# Fall back to the first page if the stored page is not a valid option.
if st.session_state.nav_page not in page_options:
    st.session_state.nav_page = page_options[0]
page = st.sidebar.selectbox("选择功能", page_options, key="nav_page")if page == "创建销售单":st.header("创建新销售单")customer_options = load_customer_options()st.selectbox("选择已有客户",customer_options,key="selected_customer_option",on_change=handle_customer_selection,help="从历史客户中选择将自动填充客户信息")# 客户信息col1, col2 = st.columns(2)with col1:customer_name = st.text_input("客户名称 *", key="customer_name")customer_phone = st.text_input("联系电话", key="customer_phone")with col2:order_date = st.date_input("订单日期", key="order_date")customer_address = st.text_input("客户地址", key="customer_address")notes = st.text_area("备注", key="notes")st.divider()st.subheader("商品明细")# 添加商品 - 支持从商品库选择st.markdown("**添加商品**")# 获取商品列表products = db.get_products()product_options = ["手动输入商品"] + [f"{p['name']} ({p['unit']}) - ¥{p['reference_price']:.2f}" for p in products]col1, col2 = st.columns([2, 1])with col1:selected_product_option = st.selectbox("选择商品",product_options,key="selected_product_option",help="从商品库选择商品将自动填充名称和参考价格")# 根据选择显示不同的输入方式if selected_product_option == "手动输入商品":col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:st.text_input("商品名称", key="new_product_name", placeholder="输入商品名称")with col2:st.number_input("数量", key="new_quantity", min_value=0.01, step=0.01)with col3:st.number_input("单价", key="new_unit_price", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")st.button("添加", on_click=add_item_manual, type="primary")else:# 从商品库选择selected_product_name = selected_product_option.split(" (")[0]selected_product = next((p for p in products if p['name'] == selected_product_name), None)if selected_product:col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:st.text_input("商品名称", value=selected_product['name'], key="new_product_name", disabled=True)with col2:st.number_input("数量", key="new_quantity", min_value=0.01, step=0.01)with col3:st.number_input("单价", value=selected_product['reference_price'], key="new_unit_price", min_value=0.0, step=0.01, format="%.2f")with 
col4:st.write("")st.write("")st.button("添加", on_click=add_item_from_library, type="primary")# 显示已添加的商品normalized_items = normalize_items(st.session_state.get(ITEMS_KEY))if normalized_items:st.write("")df_items = pd.DataFrame(normalized_items)df_items['序号'] = range(1, len(df_items) + 1)df_items = df_items[['序号', 'product_name', 'quantity', 'unit_price', 'amount']]df_items.columns = ['序号', '商品名称', '数量', '单价', '金额']# 显示表格,每行有删除按钮for idx, item in enumerate(normalized_items):col1, col2, col3, col4, col5, col6 = st.columns([1, 4, 2, 2, 2, 1])with col1:st.write(idx + 1)with col2:st.write(item['product_name'])with col3:st.write(f"{item['quantity']:.2f}")with col4:st.write(f"¥{item['unit_price']:.2f}")with col5:st.write(f"¥{item['amount']:.2f}")with col6:st.button("删除", key=f"del_{idx}", on_click=remove_item, args=(idx,))st.divider()total = calculate_total()st.markdown(f"### 合计金额:¥{total:.2f}")# 提交按钮col1, col2 = st.columns([1, 1])with col1:if st.button("保存销售单", type="primary", use_container_width=True):if not customer_name:st.error("请填写客户名称!")elif not normalized_items:st.error("请至少添加一个商品!")else:try:# 检查并保存新商品到商品库existing_products = db.get_products()existing_product_names = [p['name'] for p in existing_products]for item in normalized_items:product_name = item['product_name']if product_name and product_name not in existing_product_names:# 新商品,添加到商品库db.create_product({'name': product_name,'description': f'从销售单自动添加的商品:{product_name}','unit': '个','reference_price': item['unit_price']})st.info(f"新商品 '{product_name}' 已自动添加到商品库")order_data = {'customer_name': customer_name,'customer_phone': customer_phone,'customer_address': customer_address,'order_date': order_date.strftime('%Y-%m-%d'),'total_amount': total,'notes': notes,'items': normalized_items}db.save_customer({'name': customer_name,'phone': customer_phone,'address': customer_address})order_id = db.create_order(order_data)st.success(f"销售单创建成功!订单号:{db.get_order(order_id)['order_number']}")reset_form()# 
使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"创建失败:{str(e)}")with col2:if st.button("重置表单", use_container_width=True):reset_form()# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:st.info("请添加商品明细")elif page == "销售单列表":st.header("销售单列表")orders = db.get_all_orders()if orders:# 搜索和筛选col1, col2 = st.columns([2, 1])with col1:search_term = st.text_input("搜索(订单号/客户名称)", "")with col2:date_filter = st.date_input("日期筛选", value=None)# 筛选订单filtered_orders = ordersif search_term:filtered_orders = [o for o in filtered_orders if search_term.lower() in o['order_number'].lower() or search_term.lower() in o['customer_name'].lower()]if date_filter:filtered_orders = [o for o in filtered_orders if o['order_date'] == date_filter.strftime('%Y-%m-%d')]if filtered_orders:# 分页设置if 'page_size' not in st.session_state:st.session_state.page_size = 10if 'current_page' not in st.session_state:st.session_state.current_page = 1# 分页控件col1, col2, col3 = st.columns([1, 2, 1])with col1:page_size = st.selectbox("每页显示数量", [5, 10, 20, 50], index=1)st.session_state.page_size = page_sizewith col2:st.write(f"共 {len(filtered_orders)} 条订单")with col3:total_pages = max(1, (len(filtered_orders) + st.session_state.page_size - 1) // st.session_state.page_size)# 确保当前页码不超过总页数if st.session_state.current_page > total_pages:st.session_state.current_page = 1current_page = st.number_input("页码", min_value=1, max_value=total_pages, value=st.session_state.current_page)st.session_state.current_page = current_page# 计算当前页的数据范围start_idx = (st.session_state.current_page - 1) * st.session_state.page_sizeend_idx = min(start_idx + st.session_state.page_size, len(filtered_orders))current_orders = filtered_orders[start_idx:end_idx]# 显示分页信息st.write(f"显示第 {start_idx + 1} - {end_idx} 条订单(共 {len(filtered_orders)} 条)")# 显示订单列表for order in current_orders:with st.expander(f"订单号:{order['order_number']} | 客户:{order['customer_name']} | 金额:¥{order['total_amount']:.2f} | 
日期:{order['order_date']}"):col1, col2, col3 = st.columns(3)with col1:st.write(f"**客户名称:** {order['customer_name']}")st.write(f"**联系电话:** {order.get('customer_phone', '')}")with col2:st.write(f"**订单日期:** {order['order_date']}")st.write(f"**总金额:** ¥{order['total_amount']:.2f}")with col3:if st.button("查看详情", key=f"view_{order['id']}"):st.session_state.selected_order_id = order['id']st.session_state.pending_nav_page = "查看/编辑销售单"st.session_state.pending_selected_order_id = order['id']st.session_state.edit_mode = False# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueif st.button("删除", key=f"delete_{order['id']}"):try:db.delete_order(order['id'])st.success("删除成功!")# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"删除失败:{str(e)}")# 分页导航if total_pages > 1:st.divider()col1, col2, col3, col4, col5 = st.columns([1, 1, 2, 1, 1])with col1:if st.button("首页", disabled=st.session_state.current_page == 1):st.session_state.current_page = 1st.session_state.trigger_rerun = Truest.rerun()with col2:if st.button("上一页", disabled=st.session_state.current_page == 1):st.session_state.current_page = max(1, st.session_state.current_page - 1)st.session_state.trigger_rerun = Truest.rerun()with col3:st.write(f"第 {st.session_state.current_page} 页 / 共 {total_pages} 页")with col4:if st.button("下一页", disabled=st.session_state.current_page == total_pages):st.session_state.current_page = min(total_pages, st.session_state.current_page + 1)st.session_state.trigger_rerun = Truest.rerun()with col5:if st.button("末页", disabled=st.session_state.current_page == total_pages):st.session_state.current_page = total_pagesst.session_state.trigger_rerun = Truest.rerun()else:st.info("没有找到匹配的订单")else:st.info("暂无销售单记录")elif page == "查看/编辑销售单":st.header("查看/编辑销售单")# 选择订单orders = db.get_all_orders()if orders:order_options = {f"{o['order_number']} - {o['customer_name']} (¥{o['total_amount']:.2f})": o['id']for o in orders}label_list = list(order_options.keys())default_index = 
0selected_order_id = st.session_state.get('selected_order_id')if selected_order_id:for idx, label in enumerate(label_list):if order_options[label] == selected_order_id:default_index = idxbreakselected_label = st.selectbox("选择订单", label_list, index=default_index)order_id = order_options[selected_label]if st.session_state.get('selected_order_id') != order_id:st.session_state.selected_order_id = order_idif order_id:order = db.get_order(order_id)if order:# 编辑模式切换 - 使用不同的状态管理方式if 'edit_mode_state' not in st.session_state:st.session_state.edit_mode_state = False# 处理成功和取消状态if st.session_state.get('edit_mode_success'):st.session_state.edit_mode_state = Falsest.session_state.edit_mode_success = Falseif st.session_state.get('edit_mode_cancel'):st.session_state.edit_mode_state = Falsest.session_state.edit_mode_cancel = False# 编辑模式切换按钮col1, col2 = st.columns([1, 3])with col1:if st.button("进入编辑模式" if not st.session_state.edit_mode_state else "退出编辑模式"):st.session_state.edit_mode_state = not st.session_state.edit_mode_state# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueif st.session_state.edit_mode_state:# 编辑表单col1, col2 = st.columns(2)with col1:customer_name = st.text_input("客户名称 *", value=order['customer_name'], key="edit_customer_name")customer_phone = st.text_input("联系电话", value=order.get('customer_phone', ''), key="edit_customer_phone")with col2:order_date = st.date_input("订单日期", value=datetime.datetime.strptime(order['order_date'], '%Y-%m-%d').date(),key="edit_order_date")customer_address = st.text_input("客户地址", value=order.get('customer_address', ''), key="edit_customer_address")notes = st.text_area("备注", value=order.get('notes', ''), key="edit_notes")st.subheader("商品明细")# 初始化编辑商品列表if f'edit_items_{order_id}' not in st.session_state:st.session_state[f'edit_items_{order_id}'] = normalize_items(order['items'])edit_items = st.session_state[f'edit_items_{order_id}']# 添加新商品功能st.markdown("**添加商品**")# 获取商品列表products = db.get_products()product_options = ["手动输入商品"] + 
[f"{p['name']} ({p['unit']}) - ¥{p['reference_price']:.2f}" for p in products]col1, col2 = st.columns([2, 1])with col1:selected_product_option = st.selectbox("选择商品",product_options,key=f"edit_selected_product_option_{order_id}",help="从商品库选择商品将自动填充名称和参考价格")# 根据选择显示不同的输入方式if selected_product_option == "手动输入商品":col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:edit_product_name = st.text_input("商品名称", key=f"edit_new_product_name_{order_id}", placeholder="输入商品名称")with col2:edit_quantity = st.number_input("数量", key=f"edit_new_quantity_{order_id}", min_value=0.01, step=0.01)with col3:edit_unit_price = st.number_input("单价", key=f"edit_new_unit_price_{order_id}", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")if st.button("添加", key=f"edit_add_item_{order_id}", type="primary"):if edit_product_name and edit_quantity > 0 and edit_unit_price > 0:amount = edit_quantity * edit_unit_priceedit_items.append({'product_name': edit_product_name,'quantity': edit_quantity,'unit_price': edit_unit_price,'amount': amount})st.session_state[f'edit_items_{order_id}'] = edit_items# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:# 从商品库选择selected_product_name = selected_product_option.split(" (")[0]selected_product = next((p for p in products if p['name'] == selected_product_name), None)if selected_product:col1, col2, col3, col4 = st.columns([3, 2, 2, 1])with col1:edit_product_name = st.text_input("商品名称", value=selected_product['name'], key=f"edit_new_product_name_{order_id}", disabled=True)with col2:edit_quantity = st.number_input("数量", key=f"edit_new_quantity_{order_id}", min_value=0.01, step=0.01)with col3:edit_unit_price = st.number_input("单价", value=selected_product['reference_price'], key=f"edit_new_unit_price_{order_id}", min_value=0.0, step=0.01, format="%.2f")with col4:st.write("")st.write("")if st.button("添加", key=f"edit_add_item_{order_id}", type="primary"):if edit_product_name and edit_quantity > 0 and edit_unit_price > 0:amount = 
edit_quantity * edit_unit_priceedit_items.append({'product_name': edit_product_name,'quantity': edit_quantity,'unit_price': edit_unit_price,'amount': amount})st.session_state[f'edit_items_{order_id}'] = edit_items# 使用不同的方式触发重新运行st.session_state.trigger_rerun = True# 显示已添加的商品,支持删除if edit_items:st.write("")st.markdown("**已添加的商品**")for idx, item in enumerate(edit_items):col1, col2, col3, col4, col5, col6 = st.columns([1, 4, 2, 2, 2, 1])with col1:st.write(idx + 1)with col2:st.write(item['product_name'])with col3:st.write(f"{item['quantity']:.2f}")with col4:st.write(f"¥{item['unit_price']:.2f}")with col5:st.write(f"¥{item['amount']:.2f}")with col6:if st.button("删除", key=f"edit_del_{order_id}_{idx}"):edit_items.pop(idx)st.session_state[f'edit_items_{order_id}'] = edit_items# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:st.info("请添加商品明细")total = sum(item['amount'] for item in edit_items)st.markdown(f"### 合计金额:¥{total:.2f}")col1, col2 = st.columns(2)with col1:if st.button("保存修改", type="primary", use_container_width=True):if not customer_name:st.error("请填写客户名称!")elif not edit_items:st.error("请至少添加一个商品!")else:try:order_data = {'customer_name': customer_name,'customer_phone': customer_phone,'customer_address': customer_address,'order_date': order_date.strftime('%Y-%m-%d'),'total_amount': total,'notes': notes,'items': edit_items}db.save_customer({'name': customer_name,'phone': customer_phone,'address': customer_address})db.update_order(order_id, order_data)st.success("修改成功!")# 清除编辑状态if f'edit_items_{order_id}' in st.session_state:del st.session_state[f'edit_items_{order_id}']# 使用不同的方式设置编辑模式状态st.session_state.edit_mode_success = True# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"修改失败:{str(e)}")with col2:if st.button("取消", use_container_width=True):# 清除编辑状态if f'edit_items_{order_id}' in st.session_state:del st.session_state[f'edit_items_{order_id}']# 使用不同的方式设置编辑模式状态st.session_state.edit_mode_cancel = True# 
使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueelse:# 查看模式col1, col2 = st.columns([2, 1])with col1:st.subheader("订单信息")st.write(f"**订单号:** {order['order_number']}")st.write(f"**客户名称:** {order['customer_name']}")st.write(f"**联系电话:** {order.get('customer_phone', '')}")st.write(f"**客户地址:** {order.get('customer_address', '')}")st.write(f"**订单日期:** {order['order_date']}")st.write(f"**创建时间:** {order['created_at']}")if order.get('notes'):st.write(f"**备注:** {order['notes']}")with col2:st.subheader("金额信息")st.metric("总金额", f"¥{order['total_amount']:.2f}")st.subheader("商品明细")view_items = normalize_items(order['items'])df_items = pd.DataFrame(view_items)df_items['序号'] = range(1, len(df_items) + 1)df_items = df_items[['序号', 'product_name', 'quantity', 'unit_price', 'amount']]df_items.columns = ['序号', '商品名称', '数量', '单价', '金额']st.dataframe(df_items, use_container_width=True, hide_index=True)# 打印功能st.divider()st.subheader("打印")order_for_print = order.copy()order_for_print['items'] = view_itemsprint_html = generate_print_html(order_for_print)st.download_button(label="下载打印文件(HTML)",data=print_html,file_name=f"销售单_{order['order_number']}.html",mime="text/html")# 在浏览器中显示打印预览st.components.v1.html(print_html, height=800, scrolling=True)else:st.info("暂无销售单记录")elif page == "客户管理":st.header("客户管理")customers = db.get_customers()st.session_state.available_customers = customerscol_form, col_list = st.columns([1, 2])with col_form:st.subheader("客户信息")form_mode = st.session_state.customer_form_modeform_title = "编辑客户" if form_mode == 'edit' else "新增客户"st.markdown(f"**{form_title}**")with st.form("customer_form"):st.text_input("客户名称 *", key="customer_form_name")st.text_input("联系电话", key="customer_form_phone")st.text_area("客户地址", key="customer_form_address", height=80)submitted = st.form_submit_button("保存客户", type="primary", use_container_width=True)if submitted:name = st.session_state.customer_form_name.strip()phone = st.session_state.customer_form_phone.strip()address = 
st.session_state.customer_form_address.strip()if not name:st.error("请填写客户名称!")else:try:customer_data = {'name': name,'phone': phone,'address': address}if form_mode == 'edit' and st.session_state.customer_edit_id:db.update_customer(st.session_state.customer_edit_id, customer_data)st.success("客户更新成功!")else:db.create_customer(customer_data)st.success("客户创建成功!")reset_customer_form()st.session_state.selected_customer_option = '新增客户'# 触发客户数据刷新并重新渲染页面st.rerun()except Exception as e:st.error(f"操作失败:{str(e)}")if st.session_state.customer_form_mode == 'edit':if st.button("取消编辑", use_container_width=True):reset_customer_form()# 触发客户数据刷新并重新渲染页面st.rerun()with col_list:st.subheader("客户列表")if customers:header_cols = st.columns([2, 2, 3, 2, 1, 1])header_cols[0].markdown("**客户名称**")header_cols[1].markdown("**联系电话**")header_cols[2].markdown("**客户地址**")header_cols[3].markdown("**最近使用时间**")header_cols[4].markdown("**编辑**")header_cols[5].markdown("**删除**")for customer in customers:cols = st.columns([2, 2, 3, 2, 1, 1])with cols[0]:st.write(customer['name'])with cols[1]:st.write(customer.get('phone', ''))with cols[2]:st.write(customer.get('address', ''))with cols[3]:st.write(customer.get('last_used', ''))with cols[4]:if st.button("编辑", key=f"edit_customer_{customer['id']}"):schedule_customer_edit(customer)# 触发客户数据刷新并重新渲染页面st.rerun()with cols[5]:if st.button("删除", key=f"delete_customer_{customer['id']}"):try:db.delete_customer(customer['id'])st.success("删除成功!")if st.session_state.selected_customer_option == customer['name']:st.session_state.selected_customer_option = '新增客户'reset_customer_form()# 触发客户数据刷新并重新渲染页面st.rerun()except Exception as e:st.error(f"删除失败:{str(e)}")else:st.info("暂无客户记录")elif page == "商品管理":st.header("商品管理")# 在页面渲染前检查是否需要刷新数据if st.session_state.get('refresh_products', False):st.session_state.refresh_products = Falseproducts = db.get_products()else:products = db.get_products()col_form, col_list = st.columns([1, 2])with col_form:st.subheader("商品信息")# 商品表单状态管理if 
'product_form_mode' not in st.session_state:st.session_state.product_form_mode = 'create'if 'product_edit_id' not in st.session_state:st.session_state.product_edit_id = Noneif 'product_form_name' not in st.session_state:st.session_state.product_form_name = ''if 'product_form_description' not in st.session_state:st.session_state.product_form_description = ''if 'product_form_unit' not in st.session_state:st.session_state.product_form_unit = '个'if 'product_form_reference_price' not in st.session_state:st.session_state.product_form_reference_price = 0.0form_mode = st.session_state.product_form_modeform_title = "编辑商品" if form_mode == 'edit' else "新增商品"st.markdown(f"**{form_title}**")# 使用表单外的widget来避免状态管理问题name = st.text_input("商品名称 *", value=st.session_state.product_form_name, key="product_form_name_input")description = st.text_area("商品描述", value=st.session_state.product_form_description, height=80, key="product_form_description_input")col1, col2 = st.columns(2)with col1:unit = st.selectbox("单位", ["个","项","次", "台", "件", "套", "米", "公斤", "包", "箱"], index=["个", "台", "件", "套", "米", "公斤", "包", "箱"].index(st.session_state.product_form_unit), key="product_form_unit_input")with col2:reference_price = st.number_input("参考价格", min_value=0.0, step=0.01, format="%.2f", value=st.session_state.product_form_reference_price, key="product_form_reference_price_input")if st.button("保存商品", type="primary", use_container_width=True, key="save_product_button"):name = name.strip()description = description.strip()if not name:st.error("请填写商品名称!")else:try:product_data = {'name': name,'description': description,'unit': unit,'reference_price': reference_price}if form_mode == 'edit' and st.session_state.product_edit_id:db.update_product(st.session_state.product_edit_id, product_data)st.success("商品更新成功!")else:db.create_product(product_data)st.success("商品创建成功!")# 重置表单st.session_state.product_form_mode = 'create'st.session_state.product_edit_id = Nonest.session_state.product_form_name = 
''st.session_state.product_form_description = ''st.session_state.product_form_unit = '个'st.session_state.product_form_reference_price = 0.0# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()except Exception as e:st.error(f"操作失败:{str(e)}")if st.session_state.product_form_mode == 'edit':if st.button("取消编辑", use_container_width=True):st.session_state.product_form_mode = 'create'st.session_state.product_edit_id = Nonest.session_state.product_form_name = ''st.session_state.product_form_description = ''st.session_state.product_form_unit = '个'st.session_state.product_form_reference_price = 0.0# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()with col_list:st.subheader("商品列表")# 搜索功能search_term = st.text_input("搜索商品名称或描述", "")if search_term:filtered_products = db.search_products(search_term)else:filtered_products = productsif filtered_products:header_cols = st.columns([2, 2, 1, 1, 1, 1])header_cols[0].markdown("**商品名称**")header_cols[1].markdown("**商品描述**")header_cols[2].markdown("**单位**")header_cols[3].markdown("**参考价格**")header_cols[4].markdown("**编辑**")header_cols[5].markdown("**删除**")for product in filtered_products:cols = st.columns([2, 2, 1, 1, 1, 1])with cols[0]:st.write(product['name'])with cols[1]:st.write(product.get('description', '')[:50] + '...' 
if len(product.get('description', '')) > 50 else product.get('description', ''))with cols[2]:st.write(product.get('unit', '个'))with cols[3]:st.write(f"¥{product.get('reference_price', 0.0):.2f}")with cols[4]:if st.button("编辑", key=f"edit_product_{product['id']}"):st.session_state.product_form_mode = 'edit'st.session_state.product_edit_id = product['id']st.session_state.product_form_name = product['name']st.session_state.product_form_description = product.get('description', '')st.session_state.product_form_unit = product.get('unit', '个')st.session_state.product_form_reference_price = product.get('reference_price', 0.0)# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()with cols[5]:if st.button("删除", key=f"delete_product_{product['id']}"):try:db.delete_product(product['id'])st.success("删除成功!")# 触发商品数据刷新并重新渲染页面st.session_state.refresh_products = Truest.rerun()except Exception as e:st.error(f"删除失败:{str(e)}")else:st.info("暂无商品记录")elif page == "数据管理":st.header("📊 数据管理")# 添加数据管理相关的session stateif 'data_export_type' not in st.session_state:st.session_state.data_export_type = '销售单'if 'data_import_type' not in st.session_state:st.session_state.data_import_type = '销售单'tab1, tab2 = st.tabs(["📤 数据导出", "📥 数据导入"])with tab1:st.subheader("数据导出")col1, col2 = st.columns([1, 2])with col1:st.write("**导出设置**")export_type = st.selectbox("选择导出数据类型",["销售单", "客户", "商品"],key="data_export_type")# 映射显示名称到数据库类型type_mapping = {"销售单": "orders","客户": "customers", "商品": "products"}export_format = st.selectbox("导出格式",["Excel (.xlsx)", "CSV (.csv)", "JSON (.json)"],index=0)if st.button("开始导出", type="primary", use_container_width=True):try:# 获取数据data_type = type_mapping[export_type]data = db.export_data(data_type)if not data:st.warning(f"没有找到{export_type}数据")st.stop()# 转换为DataFramedf = pd.DataFrame(data)# 根据选择的格式处理数据if export_format == "Excel (.xlsx)":# 创建Excel文件import iobuffer = io.BytesIO()with pd.ExcelWriter(buffer, engine='openpyxl') as writer:df.to_excel(writer, 
sheet_name=export_type, index=False)# 提供下载st.download_button(label="下载Excel文件",data=buffer.getvalue(),file_name=f"{export_type}_导出_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",use_container_width=True)elif export_format == "CSV (.csv)":# 提供CSV下载csv_data = df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label="下载CSV文件",data=csv_data,file_name=f"{export_type}_导出_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True)elif export_format == "JSON (.json)":# 提供JSON下载json_data = df.to_json(orient='records', force_ascii=False, indent=2)st.download_button(label="下载JSON文件",data=json_data,file_name=f"{export_type}_导出_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.json",mime="application/json",use_container_width=True)st.success(f"成功导出 {len(data)} 条{export_type}记录")except Exception as e:st.error(f"导出失败:{str(e)}")with col2:st.write("**数据预览**")try:# 重新定义type_mapping,确保变量作用域正确type_mapping = {"销售单": "orders","客户": "customers", "商品": "products"}data_type = type_mapping[export_type]data = db.export_data(data_type)if data:df_preview = pd.DataFrame(data)st.dataframe(df_preview.head(10), use_container_width=True)st.write(f"总计: {len(data)} 条记录")else:st.info("暂无数据")except Exception as e:st.error(f"预览失败:{str(e)}")with tab2:st.subheader("数据导入")col1, col2 = st.columns([1, 2])with col1:st.write("**导入设置**")import_type = st.selectbox("选择导入数据类型",["销售单", "客户", "商品"],key="data_import_type")# 映射显示名称到数据库类型type_mapping = {"销售单": "orders","客户": "customers", "商品": "products"}import_format = st.selectbox("导入文件格式",["Excel (.xlsx)", "CSV (.csv)", "JSON (.json)"],index=0)uploaded_file = st.file_uploader(f"选择{import_type}文件",type=["xlsx", "csv", "json"],help="请确保文件格式与选择的导入格式一致")# 模板下载功能st.write("**模板下载**")template_col1, template_col2, template_col3 = st.columns(3)with template_col1:if st.button("📥 销售单模板", use_container_width=True):download_template("销售单", 
import_format)with template_col2:if st.button("📥 客户模板", use_container_width=True):download_template("客户", import_format)with template_col3:if st.button("📥 商品模板", use_container_width=True):download_template("商品", import_format)if uploaded_file is not None:st.write(f"已选择文件: {uploaded_file.name}")# 导入选项import_mode = st.radio("导入模式",["新增模式(只添加新记录)", "更新模式(更新已存在记录)", "覆盖模式(清空后重新导入)"],index=0)if st.button("开始导入", type="primary", use_container_width=True):try:data_type = type_mapping[import_type]# 读取文件数据if uploaded_file.name.endswith('.xlsx'):df = pd.read_excel(uploaded_file)elif uploaded_file.name.endswith('.csv'):df = pd.read_csv(uploaded_file)elif uploaded_file.name.endswith('.json'):df = pd.read_json(uploaded_file)else:st.error("不支持的文件格式")st.stop()# 转换为字典列表data = df.to_dict('records')if not data:st.warning("文件中没有数据")st.stop()# 执行导入result = db.import_data(data_type, data)# 显示导入结果st.success(f"导入完成!成功: {result['success_count']} 条,失败: {result['error_count']} 条")if result['errors']:st.warning("部分记录导入失败:")for error in result['errors'][:5]: # 只显示前5个错误st.write(f"- {error}")if len(result['errors']) > 5:st.write(f"... 
还有 {len(result['errors']) - 5} 个错误")# 使用不同的方式触发重新运行st.session_state.trigger_rerun = Trueexcept Exception as e:st.error(f"导入失败:{str(e)}")with col2:st.write("**导入说明**")st.info("""**导入文件要求:****销售单导入字段:**- order_number: 订单号(必填)- customer_name: 客户名称(必填)- customer_phone: 联系电话- customer_address: 客户地址- order_date: 订单日期(格式:YYYY-MM-DD)- total_amount: 总金额(必填)- notes: 备注- created_at: 创建时间(格式:YYYY-MM-DD HH:MM:SS)**客户导入字段:**- name: 客户名称(必填)- phone: 联系电话- address: 客户地址- last_used: 最近使用时间(格式:YYYY-MM-DD HH:MM:SS)**商品导入字段:**- name: 商品名称(必填)- description: 商品描述- unit: 单位(默认:个)- reference_price: 参考价格- created_at: 创建时间(格式:YYYY-MM-DD HH:MM:SS)- updated_at: 更新时间(格式:YYYY-MM-DD HH:MM:SS)**注意:**- 必填字段不能为空- 日期时间格式必须正确- 重复的记录会根据导入模式进行处理""")elif page == "应收对账管理":st.header("📊 应收对账管理")# 筛选条件col1, col2, col3 = st.columns([1, 1, 1])with col1:period_type = st.selectbox("统计周期",["按月统计", "按年统计"],key="receivable_period_type")with col2:if period_type == "按月统计":selected_month = st.date_input("选择月份",value=datetime.date.today(),key="receivable_selected_month")else:selected_year = st.number_input("选择年份",min_value=2020,max_value=2030,value=datetime.date.today().year,key="receivable_selected_year")with col3:customer_filter = st.text_input("客户名称筛选", "", key="receivable_customer_filter")# 获取对账数据if period_type == "按月统计":year = selected_month.yearmonth = selected_month.monthsummary_data = db.get_receivable_summary('month', year, month)details_data = db.get_receivable_details(f"{year:04d}-{month:02d}", 'month')period_label = f"{year}年{month}月"else:year = selected_yearsummary_data = db.get_receivable_summary('year', year)details_data = db.get_receivable_details(str(year), 'year')period_label = f"{year}年"# 客户筛选if customer_filter:# 汇总数据不包含客户名称,只筛选明细数据details_data = [item for item in details_data if customer_filter.lower() in item['customer_name'].lower()]# 显示汇总统计st.subheader(f"📈 {period_label} 应收对账汇总")if summary_data:# 计算总统计total_orders = sum(item['order_count'] for item in summary_data)total_amount = 
sum(item['total_amount'] for item in summary_data)total_customers = sum(item['customer_count'] for item in summary_data)# 显示统计卡片col1, col2, col3 = st.columns(3)with col1:st.metric("总订单数", total_orders)with col2:st.metric("总金额", f"¥{total_amount:.2f}")with col3:st.metric("客户数", total_customers)# 显示汇总表格st.write("**汇总统计**")# 准备汇总表格数据summary_df = pd.DataFrame(summary_data)summary_df = summary_df[['period', 'order_count', 'total_amount', 'customer_count']]summary_df.columns = ['统计周期', '订单数量', '总金额', '客户数量']summary_df['总金额'] = summary_df['总金额'].apply(lambda x: f"¥{x:.2f}")# 显示表格st.dataframe(summary_df, use_container_width=True)# 导出汇总数据col1, col2 = st.columns([1, 1])with col1:if st.button("📥 导出汇总数据", use_container_width=True):csv_data = summary_df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label="下载CSV文件",data=csv_data,file_name=f"应收对账汇总_{period_label}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True)else:st.info(f"{period_label} 暂无应收对账数据")# 显示商品明细st.subheader(f"📋 {period_label} 商品明细")if details_data:# 准备明细表格数据details_df = pd.DataFrame(details_data)details_df = details_df[['order_number', 'customer_name', 'product_name', 'quantity', 'unit_price', 'amount', 'order_date']]details_df.columns = ['订单号', '客户名称', '商品名称', '数量', '单价', '金额', '订单日期']details_df['单价'] = details_df['单价'].apply(lambda x: f"¥{x:.2f}")details_df['金额'] = details_df['金额'].apply(lambda x: f"¥{x:.2f}")# 显示表格st.dataframe(details_df, use_container_width=True)# 导出明细数据col1, col2 = st.columns([1, 1])with col1:if st.button("📥 导出明细数据", use_container_width=True):csv_data = details_df.to_csv(index=False, encoding='utf-8-sig')st.download_button(label="下载CSV文件",data=csv_data,file_name=f"应收对账明细_{period_label}_{datetime.datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",mime="text/csv",use_container_width=True)# 商品销售排行st.subheader("🏆 商品销售排行")# 按商品统计product_stats = details_df.groupby('商品名称').agg({'数量': 'sum','金额': lambda x: sum(float(amount.replace('¥', 
'').replace(',', '')) for amount in x)}).reset_index()product_stats = product_stats.sort_values('金额', ascending=False)# 显示商品排行col1, col2 = st.columns(2)with col1:st.write("**按销售额排行**")for idx, row in product_stats.head(10).iterrows():st.write(f"{idx+1}. {row['商品名称']} - ¥{row['金额']:.2f}")with col2:st.write("**按销售量排行**")product_stats_quantity = product_stats.sort_values('数量', ascending=False)for idx, row in product_stats_quantity.head(10).iterrows():st.write(f"{idx+1}. {row['商品名称']} - {row['数量']:.2f}件")else:st.info(f"{period_label} 暂无商品明细数据")
database.py 代码:
import sqlite3
import datetime
from typing import List, Dict, Optional


class Database:
    """SQLite persistence layer for sales orders, customers and products.

    Every method opens its own short-lived connection through
    ``get_connection`` and closes it before returning, so an instance holds
    no open database handle between calls.
    """

    def __init__(self, db_name: str = "sales.db"):
        """Store the database path and create any missing tables."""
        self.db_name = db_name
        self.init_database()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------
    def get_connection(self):
        """Open and return a new connection to the database file.

        Foreign-key enforcement is switched on for the connection: SQLite
        ignores ``FOREIGN KEY ... ON DELETE CASCADE`` clauses unless the
        pragma is enabled per connection, which would leave orphaned item
        rows behind whenever an order is deleted.
        """
        conn = sqlite3.connect(self.db_name)
        conn.execute('PRAGMA foreign_keys = ON')
        return conn

    @staticmethod
    def _now() -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS``."""
        return datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    def _fetch_all(self, sql: str, params: tuple = ()) -> List[Dict]:
        """Run a read-only query and return every row as a dict."""
        conn = self.get_connection()
        try:
            conn.row_factory = sqlite3.Row
            return [dict(row) for row in conn.execute(sql, params).fetchall()]
        finally:
            conn.close()

    def _fetch_one(self, sql: str, params: tuple = ()) -> Optional[Dict]:
        """Run a read-only query and return the first row as a dict, or None."""
        conn = self.get_connection()
        try:
            conn.row_factory = sqlite3.Row
            row = conn.execute(sql, params).fetchone()
            return dict(row) if row is not None else None
        finally:
            conn.close()

    def _write(self, sql: str, params: tuple = ()):
        """Execute one write statement in its own transaction.

        Returns:
            A ``(lastrowid, rowcount)`` tuple read from the cursor.

        Raises:
            Whatever the statement raises, after rolling back.
        """
        conn = self.get_connection()
        try:
            cursor = conn.execute(sql, params)
            conn.commit()
            return cursor.lastrowid, cursor.rowcount
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    @staticmethod
    def _insert_items(cursor, order_id: int, items: List[Dict]) -> None:
        """Insert the line items of one order; each item must carry all four keys."""
        cursor.executemany(
            '''
            INSERT INTO sales_order_items (order_id, product_name, quantity, unit_price, amount)
            VALUES (?, ?, ?, ?, ?)
            ''',
            [
                (
                    order_id,
                    item['product_name'],
                    item['quantity'],
                    item['unit_price'],
                    item['amount'],
                )
                for item in items
            ],
        )

    # ------------------------------------------------------------------
    # Schema
    # ------------------------------------------------------------------
    def init_database(self):
        """Create the four application tables if they do not exist yet."""
        conn = self.get_connection()
        try:
            cursor = conn.cursor()

            # Sales order header table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sales_orders (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    order_number TEXT UNIQUE NOT NULL,
                    customer_name TEXT NOT NULL,
                    customer_phone TEXT,
                    customer_address TEXT,
                    order_date TEXT NOT NULL,
                    total_amount REAL NOT NULL,
                    notes TEXT,
                    created_at TEXT NOT NULL
                )
            ''')

            # Customer table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS customers (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    phone TEXT,
                    address TEXT,
                    last_used TEXT NOT NULL
                )
            ''')

            # Product table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS products (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    name TEXT UNIQUE NOT NULL,
                    description TEXT,
                    unit TEXT DEFAULT '个',
                    reference_price REAL DEFAULT 0.0,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
            ''')

            # Sales order line-item table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS sales_order_items (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    order_id INTEGER NOT NULL,
                    product_name TEXT NOT NULL,
                    quantity REAL NOT NULL,
                    unit_price REAL NOT NULL,
                    amount REAL NOT NULL,
                    FOREIGN KEY (order_id) REFERENCES sales_orders(id) ON DELETE CASCADE
                )
            ''')

            conn.commit()
        finally:
            conn.close()

    # ------------------------------------------------------------------
    # Sales orders
    # ------------------------------------------------------------------
    def create_order(self, order_data: Dict) -> int:
        """Insert a sales order and its line items in one transaction.

        Args:
            order_data: Must contain ``customer_name`` and ``total_amount``.
                ``customer_phone``, ``customer_address``, ``notes`` default
                to ``''``, ``order_date`` defaults to today and ``items``
                to an empty list.

        Returns:
            The id of the new ``sales_orders`` row.

        Raises:
            KeyError: A required key is missing.
            sqlite3.Error: The database rejected the insert (rolled back).
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            order_number = self.generate_order_number()
            today = datetime.datetime.now().strftime('%Y-%m-%d')

            cursor.execute(
                '''
                INSERT INTO sales_orders
                    (order_number, customer_name, customer_phone, customer_address,
                     order_date, total_amount, notes, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                (
                    order_number,
                    order_data['customer_name'],
                    order_data.get('customer_phone', ''),
                    order_data.get('customer_address', ''),
                    order_data.get('order_date', today),
                    order_data['total_amount'],
                    order_data.get('notes', ''),
                    self._now(),
                ),
            )
            # Read the id before inserting items, which reuse the same cursor.
            order_id = cursor.lastrowid
            self._insert_items(cursor, order_id, order_data.get('items', []))

            conn.commit()
            return order_id
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def get_order(self, order_id: int) -> Optional[Dict]:
        """Return one order with its line items under ``'items'``, or None."""
        order = self._fetch_one('SELECT * FROM sales_orders WHERE id = ?', (order_id,))
        if order is None:
            return None
        order['items'] = self._fetch_all(
            'SELECT * FROM sales_order_items WHERE order_id = ?', (order_id,)
        )
        return order

    def get_all_orders(self) -> List[Dict]:
        """Return every order header, newest ``created_at`` first (no items)."""
        return self._fetch_all('SELECT * FROM sales_orders ORDER BY created_at DESC')

    def update_order(self, order_id: int, order_data: Dict) -> bool:
        """Overwrite an order header and replace all of its line items.

        Args:
            order_id: Id of the order to update.
            order_data: Same shape as for ``create_order``.

        Returns:
            True when the order existed and was updated; False when no order
            has that id (nothing is written in that case).
        """
        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            today = datetime.datetime.now().strftime('%Y-%m-%d')

            cursor.execute(
                '''
                UPDATE sales_orders
                SET customer_name = ?, customer_phone = ?, customer_address = ?,
                    order_date = ?, total_amount = ?, notes = ?
                WHERE id = ?
                ''',
                (
                    order_data['customer_name'],
                    order_data.get('customer_phone', ''),
                    order_data.get('customer_address', ''),
                    order_data.get('order_date', today),
                    order_data['total_amount'],
                    order_data.get('notes', ''),
                    order_id,
                ),
            )
            if cursor.rowcount == 0:
                # Unknown order: do not attach items to a non-existent header.
                conn.rollback()
                return False

            # Replace the line items wholesale.
            cursor.execute('DELETE FROM sales_order_items WHERE order_id = ?', (order_id,))
            self._insert_items(cursor, order_id, order_data.get('items', []))

            conn.commit()
            return True
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def delete_order(self, order_id: int) -> bool:
        """Delete an order; its line items go with it via ON DELETE CASCADE.

        Returns:
            True when a row was deleted, False when no order has that id.
        """
        _, affected = self._write('DELETE FROM sales_orders WHERE id = ?', (order_id,))
        return affected > 0

    def generate_order_number(self) -> str:
        """Return the next order number: ``YYYYMMDD`` plus a 4-digit sequence.

        The sequence continues from the highest number already issued today
        rather than from the row count, so deleting an order can never make
        the next number collide with one that still exists.
        """
        today = datetime.datetime.now().strftime('%Y%m%d')
        rows = self._fetch_all(
            'SELECT order_number FROM sales_orders WHERE order_number LIKE ?',
            (f'{today}%',),
        )
        highest = 0
        for row in rows:
            suffix = str(row['order_number'])[len(today):]
            # Ignore numbers that merely share the date prefix (e.g. imported ones).
            if suffix.isascii() and suffix.isdigit():
                highest = max(highest, int(suffix))
        sequence = str(highest + 1).zfill(4)
        return f'{today}{sequence}'

    # ------------------------------------------------------------------
    # Customers
    # ------------------------------------------------------------------
    def save_customer(self, customer_data: Dict) -> None:
        """Insert a customer, or update phone/address when the name exists.

        Does nothing when ``customer_data`` has no non-empty ``'name'``.
        ``last_used`` is set to the current time in both cases.
        """
        if not customer_data.get('name'):
            return

        conn = self.get_connection()
        try:
            cursor = conn.cursor()
            timestamp = self._now()
            phone = customer_data.get('phone', '')
            address = customer_data.get('address', '')

            cursor.execute('SELECT id FROM customers WHERE name = ?', (customer_data['name'],))
            existing = cursor.fetchone()

            if existing:
                cursor.execute(
                    '''
                    UPDATE customers
                    SET phone = ?, address = ?, last_used = ?
                    WHERE id = ?
                    ''',
                    (phone, address, timestamp, existing[0]),
                )
            else:
                cursor.execute(
                    '''
                    INSERT INTO customers (name, phone, address, last_used)
                    VALUES (?, ?, ?, ?)
                    ''',
                    (customer_data['name'], phone, address, timestamp),
                )
            conn.commit()
        except Exception:
            conn.rollback()
            raise
        finally:
            conn.close()

    def get_customers(self) -> List[Dict]:
        """Return all customers, most recently used first, then by name."""
        return self._fetch_all('''
            SELECT id, name, phone, address, last_used
            FROM customers
            ORDER BY last_used DESC, name COLLATE NOCASE ASC
        ''')

    def get_customer(self, customer_id: int) -> Optional[Dict]:
        """Return the customer with the given id, or None."""
        return self._fetch_one(
            '''
            SELECT id, name, phone, address, last_used
            FROM customers
            WHERE id = ?
            ''',
            (customer_id,),
        )

    def create_customer(self, customer_data: Dict) -> int:
        """Insert a customer and return its new id.

        Raises:
            KeyError: ``'name'`` is missing.
            sqlite3.IntegrityError: A customer with that name already exists.
        """
        new_id, _ = self._write(
            '''
            INSERT INTO customers (name, phone, address, last_used)
            VALUES (?, ?, ?, ?)
            ''',
            (
                customer_data['name'],
                customer_data.get('phone', ''),
                customer_data.get('address', ''),
                self._now(),
            ),
        )
        return new_id

    def update_customer(self, customer_id: int, customer_data: Dict) -> bool:
        """Update a customer's fields; return True when a row was changed."""
        _, affected = self._write(
            '''
            UPDATE customers
            SET name = ?, phone = ?, address = ?, last_used = ?
            WHERE id = ?
            ''',
            (
                customer_data['name'],
                customer_data.get('phone', ''),
                customer_data.get('address', ''),
                self._now(),
                customer_id,
            ),
        )
        return affected > 0

    def delete_customer(self, customer_id: int) -> bool:
        """Delete a customer; return True when a row was deleted."""
        _, affected = self._write('DELETE FROM customers WHERE id = ?', (customer_id,))
        return affected > 0

    # ------------------------------------------------------------------
    # Products
    # ------------------------------------------------------------------
    def get_products(self) -> List[Dict]:
        """Return all products ordered by name (case-insensitive)."""
        return self._fetch_all('''
            SELECT id, name, description, unit, reference_price, created_at, updated_at
            FROM products
            ORDER BY name COLLATE NOCASE ASC
        ''')

    def get_product(self, product_id: int) -> Optional[Dict]:
        """Return the product with the given id, or None."""
        return self._fetch_one(
            '''
            SELECT id, name, description, unit, reference_price, created_at, updated_at
            FROM products
            WHERE id = ?
            ''',
            (product_id,),
        )

    def create_product(self, product_data: Dict) -> int:
        """Insert a product and return its new id.

        Raises:
            KeyError: ``'name'`` is missing.
            sqlite3.IntegrityError: A product with that name already exists.
        """
        timestamp = self._now()
        new_id, _ = self._write(
            '''
            INSERT INTO products (name, description, unit, reference_price, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?)
            ''',
            (
                product_data['name'],
                product_data.get('description', ''),
                product_data.get('unit', '个'),
                product_data.get('reference_price', 0.0),
                timestamp,
                timestamp,
            ),
        )
        return new_id

    def update_product(self, product_id: int, product_data: Dict) -> bool:
        """Update a product's fields; return True when a row was changed."""
        _, affected = self._write(
            '''
            UPDATE products
            SET name = ?, description = ?, unit = ?, reference_price = ?, updated_at = ?
            WHERE id = ?
            ''',
            (
                product_data['name'],
                product_data.get('description', ''),
                product_data.get('unit', '个'),
                product_data.get('reference_price', 0.0),
                self._now(),
                product_id,
            ),
        )
        return affected > 0

    def delete_product(self, product_id: int) -> bool:
        """Delete a product; return True when a row was deleted."""
        _, affected = self._write('DELETE FROM products WHERE id = ?', (product_id,))
        return affected > 0

    def search_products(self, keyword: str) -> List[Dict]:
        """Return products whose name or description contains ``keyword``.

        The keyword is matched literally: the LIKE wildcards ``%`` and ``_``
        are escaped so that searching for e.g. ``50%`` does not match every
        product containing ``50``.
        """
        escaped = (
            keyword.replace('\\', '\\\\').replace('%', '\\%').replace('_', '\\_')
        )
        pattern = f'%{escaped}%'
        return self._fetch_all(
            """
            SELECT id, name, description, unit, reference_price, created_at, updated_at
            FROM products
            WHERE name LIKE ? ESCAPE '\\' OR description LIKE ? ESCAPE '\\'
            ORDER BY name COLLATE NOCASE ASC
            """,
            (pattern, pattern),
        )

    # ------------------------------------------------------------------
    # Export
    # ------------------------------------------------------------------
    def export_data(self, data_type: str) -> List[Dict]:
        """Export one kind of data as a list of dicts.

        Args:
            data_type: ``'orders'``, ``'customers'`` or ``'products'``.

        Raises:
            ValueError: ``data_type`` is none of the above.
        """
        if data_type == 'orders':
            return self._export_orders()
        elif data_type == 'customers':
            return self._export_customers()
        elif data_type == 'products':
            return self._export_products()
        else:
            raise ValueError(f"不支持的数据类型: {data_type}")

    def _export_orders(self) -> List[Dict]:
        """Return every order (newest first) with its items under ``'items'``.

        Items are read straight from the item table instead of being packed
        into a delimiter-separated string, so product names containing ``|``
        or ``;`` survive the export.
        """
        orders = self._fetch_all('SELECT * FROM sales_orders ORDER BY created_at DESC')
        item_rows = self._fetch_all('''
            SELECT order_id, product_name, quantity, unit_price, amount
            FROM sales_order_items
            ORDER BY id
        ''')

        items_by_order: Dict = {}
        for item in item_rows:
            items_by_order.setdefault(item.pop('order_id'), []).append(item)

        for order in orders:
            order['items'] = items_by_order.get(order['id'], [])
        return orders

    def _export_customers(self) -> List[Dict]:
        """Return all customers (same as ``get_customers``)."""
        return self.get_customers()

    def _export_products(self) -> List[Dict]:
        """Return all products (same as ``get_products``)."""
        return self.get_products()

    # ------------------------------------------------------------------
    # Import
    # ------------------------------------------------------------------
    def import_data(self, data_type: str, data: List[Dict]) -> Dict:
        """Import a list of records, inserting new ones and updating existing.

        Args:
            data_type: ``'orders'``, ``'customers'`` or ``'products'``.
            data: Records to import.

        Returns:
            A dict with ``success_count``, ``error_count`` and ``errors``
            (a list of messages).

        Raises:
            ValueError: ``data_type`` is none of the above.
        """
        if data_type == 'orders':
            return self._import_orders(data)
        elif data_type == 'customers':
            return self._import_customers(data)
        elif data_type == 'products':
            return self._import_products(data)
        else:
            raise ValueError(f"不支持的数据类型: {data_type}")

    def _import_records(self, data: List[Dict], label: str, key_field: str, import_one) -> Dict:
        """Run ``import_one(cursor, record)`` for each record in one transaction.

        Each record is wrapped in a SAVEPOINT so that a record which fails
        half-way leaves nothing behind, while the records that succeeded are
        still committed together at the end.

        Args:
            data: Records to import.
            label: Prefix for per-record error messages.
            key_field: Record key whose value identifies it in error messages.
            import_one: Callable performing the writes for a single record.
        """
        conn = self.get_connection()
        cursor = conn.cursor()
        success_count = 0
        error_count = 0
        errors = []

        try:
            # Open the transaction explicitly: a SAVEPOINT issued outside a
            # transaction would commit on RELEASE.
            cursor.execute('BEGIN')
            for record in data:
                cursor.execute('SAVEPOINT import_record')
                try:
                    import_one(cursor, record)
                except Exception as e:
                    cursor.execute('ROLLBACK TO SAVEPOINT import_record')
                    error_count += 1
                    errors.append(f"{label} {record.get(key_field, '未知')}: {str(e)}")
                else:
                    success_count += 1
                finally:
                    cursor.execute('RELEASE SAVEPOINT import_record')
            conn.commit()
        except Exception as e:
            conn.rollback()
            errors.append(f"导入过程错误: {str(e)}")
        finally:
            conn.close()

        return {
            'success_count': success_count,
            'error_count': error_count,
            'errors': errors,
        }

    @staticmethod
    def _parse_import_items(items) -> List:
        """Normalise an imported ``items`` value to a list.

        A string is parsed as JSON first and as a Python literal second;
        anything that does not end up as a list becomes ``[]``.
        """
        if isinstance(items, str):
            import ast
            import json
            try:
                items = json.loads(items)
            except Exception:
                try:
                    # literal_eval only evaluates literals, never arbitrary code.
                    items = ast.literal_eval(items)
                except Exception:
                    items = []
        return items if isinstance(items, list) else []

    def _import_one_order(self, cursor, order_data: Dict) -> None:
        """Insert or update one order (matched by ``order_number``) with its items."""
        cursor.execute(
            'SELECT id FROM sales_orders WHERE order_number = ?',
            (order_data.get('order_number'),),
        )
        existing = cursor.fetchone()

        if existing:
            order_id = existing[0]
            cursor.execute(
                '''
                UPDATE sales_orders
                SET customer_name=?, customer_phone=?, customer_address=?,
                    order_date=?, total_amount=?, notes=?
                WHERE id=?
                ''',
                (
                    order_data.get('customer_name', ''),
                    order_data.get('customer_phone', ''),
                    order_data.get('customer_address', ''),
                    order_data.get('order_date', ''),
                    order_data.get('total_amount', 0.0),
                    order_data.get('notes', ''),
                    order_id,
                ),
            )
            # The imported items replace the stored ones.
            cursor.execute('DELETE FROM sales_order_items WHERE order_id = ?', (order_id,))
        else:
            cursor.execute(
                '''
                INSERT INTO sales_orders
                    (order_number, customer_name, customer_phone, customer_address,
                     order_date, total_amount, notes, created_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                ''',
                (
                    order_data.get('order_number', ''),
                    order_data.get('customer_name', ''),
                    order_data.get('customer_phone', ''),
                    order_data.get('customer_address', ''),
                    order_data.get('order_date', ''),
                    order_data.get('total_amount', 0.0),
                    order_data.get('notes', ''),
                    order_data.get('created_at', self._now()),
                ),
            )
            order_id = cursor.lastrowid

        for item in self._parse_import_items(order_data.get('items', [])):
            if isinstance(item, dict):
                cursor.execute(
                    '''
                    INSERT INTO sales_order_items
                        (order_id, product_name, quantity, unit_price, amount)
                    VALUES (?, ?, ?, ?, ?)
                    ''',
                    (
                        order_id,
                        item.get('product_name', ''),
                        item.get('quantity', 0.0),
                        item.get('unit_price', 0.0),
                        item.get('amount', 0.0),
                    ),
                )
            else:
                # Malformed items are skipped with a warning; the order still imports.
                print(f"警告:订单 {order_data.get('order_number', '未知')} 的明细项格式不正确: {item}")

    def _import_one_customer(self, cursor, customer_data: Dict) -> None:
        """Insert or update one customer, matched by ``name``."""
        cursor.execute(
            'SELECT id FROM customers WHERE name = ?', (customer_data.get('name'),)
        )
        existing = cursor.fetchone()
        last_used = customer_data.get('last_used', self._now())

        if existing:
            cursor.execute(
                '''
                UPDATE customers
                SET phone=?, address=?, last_used=?
                WHERE id=?
                ''',
                (
                    customer_data.get('phone', ''),
                    customer_data.get('address', ''),
                    last_used,
                    existing[0],
                ),
            )
        else:
            cursor.execute(
                '''
                INSERT INTO customers (name, phone, address, last_used)
                VALUES (?, ?, ?, ?)
                ''',
                (
                    customer_data.get('name', ''),
                    customer_data.get('phone', ''),
                    customer_data.get('address', ''),
                    last_used,
                ),
            )

    def _import_one_product(self, cursor, product_data: Dict) -> None:
        """Insert or update one product, matched by ``name``."""
        cursor.execute(
            'SELECT id FROM products WHERE name = ?', (product_data.get('name'),)
        )
        existing = cursor.fetchone()
        timestamp = self._now()

        if existing:
            cursor.execute(
                '''
                UPDATE products
                SET description=?, unit=?, reference_price=?, updated_at=?
                WHERE id=?
                ''',
                (
                    product_data.get('description', ''),
                    product_data.get('unit', '个'),
                    product_data.get('reference_price', 0.0),
                    timestamp,
                    existing[0],
                ),
            )
        else:
            cursor.execute(
                '''
                INSERT INTO products
                    (name, description, unit, reference_price, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (
                    product_data.get('name', ''),
                    product_data.get('description', ''),
                    product_data.get('unit', '个'),
                    product_data.get('reference_price', 0.0),
                    timestamp,
                    timestamp,
                ),
            )

    def _import_orders(self, data: List[Dict]) -> Dict:
        """Import sales orders; see ``import_data`` for the result shape."""
        return self._import_records(data, '订单', 'order_number', self._import_one_order)

    def _import_customers(self, data: List[Dict]) -> Dict:
        """Import customers; see ``import_data`` for the result shape."""
        return self._import_records(data, '客户', 'name', self._import_one_customer)

    def _import_products(self, data: List[Dict]) -> Dict:
        """Import products; see ``import_data`` for the result shape."""
        return self._import_records(data, '商品', 'name', self._import_one_product)

    # ------------------------------------------------------------------
    # Receivables reconciliation
    # ------------------------------------------------------------------
    def get_receivable_summary(self, period_type: str = 'month',
                               year: Optional[int] = None,
                               month: Optional[int] = None) -> List[Dict]:
        """Return order count, total amount and distinct customers per period.

        Args:
            period_type: ``'month'`` groups within one year; any other value
                groups all orders by year.
            year: Year to report on for ``'month'``; defaults to the current
                year. Not used when grouping by year.
            month: With ``'month'``: when given, rows are grouped per order
                date within that month; when None, per month of ``year``.

        Returns:
            Dicts with ``period``, ``order_count``, ``total_amount`` and
            ``customer_count``, newest period first.
        """
        if year is None:
            year = datetime.datetime.now().year

        if period_type != 'month':
            return self._fetch_all('''
                SELECT strftime('%Y', order_date) as period,
                       COUNT(*) as order_count,
                       SUM(total_amount) as total_amount,
                       COUNT(DISTINCT customer_name) as customer_count
                FROM sales_orders
                GROUP BY strftime('%Y', order_date)
                ORDER BY period DESC
            ''')

        if month is None:
            # One row per month of the requested year.
            return self._fetch_all(
                '''
                SELECT strftime('%Y-%m', order_date) as period,
                       COUNT(*) as order_count,
                       SUM(total_amount) as total_amount,
                       COUNT(DISTINCT customer_name) as customer_count
                FROM sales_orders
                WHERE strftime('%Y', order_date) = ?
                GROUP BY strftime('%Y-%m', order_date)
                ORDER BY period DESC
                ''',
                (str(year),),
            )

        # One row per order date inside the requested month.
        return self._fetch_all(
            '''
            SELECT order_date as period,
                   COUNT(*) as order_count,
                   SUM(total_amount) as total_amount,
                   COUNT(DISTINCT customer_name) as customer_count
            FROM sales_orders
            WHERE strftime('%Y-%m', order_date) = ?
            GROUP BY order_date
            ORDER BY order_date DESC
            ''',
            (f"{year:04d}-{month:02d}",),
        )

    def get_receivable_details(self, period: str, period_type: str = 'month') -> List[Dict]:
        """Return one row per order line item for the given period.

        Args:
            period: ``'YYYY-MM'`` when ``period_type`` is ``'month'``,
                otherwise ``'YYYY'``.
            period_type: ``'month'`` or anything else for a whole year.

        Returns:
            Dicts combining order header fields with the item's
            ``product_name``, ``quantity``, ``unit_price`` and ``amount``,
            ordered by order date (newest first) then order number.
        """
        date_format = '%Y-%m' if period_type == 'month' else '%Y'
        return self._fetch_all(
            '''
            SELECT so.order_number,
                   so.customer_name,
                   so.customer_phone,
                   so.order_date,
                   so.total_amount,
                   soi.product_name,
                   soi.quantity,
                   soi.unit_price,
                   soi.amount
            FROM sales_orders so
            JOIN sales_order_items soi ON so.id = soi.order_id
            WHERE strftime(?, so.order_date) = ?
            ORDER BY so.order_date DESC, so.order_number
            ''',
            (date_format, period),
        )

    def get_customer_receivable_summary(self, period: str, period_type: str = 'month') -> List[Dict]:
        """Return per-customer order totals for the given period.

        Args:
            period: ``'YYYY-MM'`` when ``period_type`` is ``'month'``,
                otherwise ``'YYYY'``.
            period_type: ``'month'`` or anything else for a whole year.

        Returns:
            Dicts with ``customer_name``, ``order_count``, ``total_amount``,
            ``first_order_date`` and ``last_order_date``, largest total first.
        """
        date_format = '%Y-%m' if period_type == 'month' else '%Y'
        return self._fetch_all(
            '''
            SELECT customer_name,
                   COUNT(*) as order_count,
                   SUM(total_amount) as total_amount,
                   MIN(order_date) as first_order_date,
                   MAX(order_date) as last_order_date
            FROM sales_orders
            WHERE strftime(?, order_date) = ?
            GROUP BY customer_name
            ORDER BY total_amount DESC
            ''',
            (date_format, period),
        )
