智能AI医疗物资/耗材管理系统升级改造方案分析
基于AI技术的智能物资管理系统为各级医疗机构(包括三甲医院、社区诊所、药房等)提供了一套完整的数字化管理解决方案。系统通过物联网传感器实时监控库存状态,结合机器学习算法分析历史消耗数据、季节性因素和突发公共卫生事件影响,可提前90天预测物资需求波动,使库存周转率提升40%以上。
系统具备以下核心功能:
- 智能预警机制:当一次性医用口罩、防护服等关键物资低于安全库存阈值时,自动触发采购流程,并通过多供应商比价模块推荐最优采购方案
- 效期管理系统:采用"先进先出"原则,对近效期药品和耗材进行分级预警,避免物资过期造成的浪费
- 应急调配平台:在突发公共卫生事件期间,可实时显示区域内各医疗机构的物资储备情况,支持一键发起跨机构调拨
应用场景示例:
- 某三甲医院在2025年流感季前,系统根据门诊量增长趋势预测到输液器需求将增加35%,提前完成备货避免了供应中断
- 某社区卫生服务中心通过效期管理功能,将过期药品损耗率从8%降至2%以下
系统功能说明
这个智能AI医疗物资管理系统包含以下核心功能:
1. 库存管理
- 实时监控所有医疗物资库存状态
- 分类显示库存状态(紧急、不足、正常、充足)
- 过期状态管理(已过期、即将过期、正常)
- 详细的库存数据表格展示
2. 使用分析
- 按科室统计物资使用量
- 物资使用量排名(TOP 10)
- 物资使用趋势分析(按周)
3. AI预测与采购建议
- 基于机器学习(随机森林)的需求预测
- 14天使用量预测
- 智能采购建议(根据库存和预测)
- 库存可维持天数计算
4. 预警系统
- 侧边栏实时显示过期物资警告
- 库存不足物资预警
- 过期状态分类显示
5. 系统设置
- 库存阈值配置
- 过期预警天数设置
- 数据导入/导出功能
- 系统信息查看
技术特点
- 用户界面:使用Streamlit构建直观的Web界面
- 数据可视化:使用Plotly创建交互式图表
- 机器学习:使用Scikit-learn的随机森林算法进行需求预测
- 数据管理:模拟生成医疗物资数据,支持数据导入导出
- 预警系统:实时监控库存和过期状态并提供警告
使用方法
- 安装依赖库:
pip install streamlit pandas numpy plotly scikit-learn
- 运行应用:
streamlit run medical_inventory_system.py
- 在浏览器中访问应用(默认地址:http://localhost:8501)
系统会自动生成模拟数据,您可以通过侧边栏筛选器查看不同类别的物资,并在各标签页中查看库存状态、使用分析和预测结果。
这个系统为医疗机构的物资管理提供了智能化解决方案,帮助管理人员优化库存、减少浪费、确保关键医疗物资的供应。
import streamlit as st
import pandas as pd
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime, timedelta
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error
import random
import hashlib
import base64# 设置页面
st.set_page_config(page_title="智能AI医疗物资管理系统",page_icon="🏥",layout="wide",initial_sidebar_state="expanded"
)# 初始化会话状态
if 'authenticated' not in st.session_state:st.session_state.authenticated = Falseif 'username' not in st.session_state:st.session_state.username = None# 模拟用户数据库
users = {"admin": hashlib.sha256("admin123".encode()).hexdigest(),"doctor": hashlib.sha256("doctor123".encode()).hexdigest(),"nurse": hashlib.sha256("nurse123".encode()).hexdigest(),"pharmacist": hashlib.sha256("pharmacist123".encode()).hexdigest()
}# 用户权限
permissions = {"admin": ["view", "edit", "manage", "settings"],"doctor": ["view", "request"],"nurse": ["view", "request"],"pharmacist": ["view", "edit", "manage"]
}# 登录页面
def login_page():st.title("🏥 智能AI医疗物资管理系统")st.subheader("用户登录")username = st.text_input("用户名")password = st.text_input("密码", type="password")if st.button("登录"):if username in users and users[username] == hashlib.sha256(password.encode()).hexdigest():st.session_state.authenticated = Truest.session_state.username = usernamest.success("登录成功!正在进入系统...")st.experimental_rerun()else:st.error("用户名或密码错误")# 主应用
def main_app():# 标题和说明st.title(f"🏥 智能AI医疗物资/耗材管理系统")st.markdown(f"欢迎回来,{st.session_state.username}!")st.markdown("""<div style="background-color:#f0f2f6;padding:10px;border-radius:10px;margin-bottom:20px;"><h3 style="color:#1e3a8a;">系统功能:</h3><ul><li>实时库存监控与预警</li><li>物资过期自动提醒</li><li>AI驱动的需求预测</li><li>智能采购建议</li><li>物资使用分析与可视化</li><li>物资出入库管理</li></ul></div>""", unsafe_allow_html=True)# 模拟数据生成@st.cache_datadef generate_data():# 医疗物资类别categories = ['防护用品', '注射器械', '消毒用品', '手术器械', '诊断试剂', '医用敷料', '药品', '一次性用品']# 具体物资名称items = {'防护用品': ['N95口罩', '防护服', '护目镜', '手套', '面屏'],'注射器械': ['注射器', '输液器', '针头', '输液袋', '输液管'],'消毒用品': ['酒精', '碘伏', '手消液', '消毒湿巾', '84消毒液'],'手术器械': ['手术刀', '镊子', '缝合针', '止血钳', '持针器'],'诊断试剂': ['血糖试纸', '新冠检测试剂', '血常规试剂', '尿检试纸', 'PCR试剂'],'医用敷料': ['纱布', '绷带', '棉签', '创可贴', '医用胶带'],'药品': ['阿司匹林', '胰岛素', '抗生素', '止痛药', '降压药'],'一次性用品': ['医用帽', '鞋套', '床单', '尿杯', '采血管']}# 生成物资数据medical_items = []for cat in categories:for item in items[cat]:medical_items.append({'物资ID': f"{cat[:2]}{random.randint(1000,9999)}",'物资名称': item,'类别': cat,'单位': random.choice(['个', '盒', '瓶', '包', '箱']),'安全库存': random.randint(20, 100),'当前库存': random.randint(0, 200),'最近进货日期': (datetime.now() - timedelta(days=random.randint(1, 60))).strftime('%Y-%m-%d'),'过期日期': (datetime.now() + timedelta(days=random.randint(30, 730))).strftime('%Y-%m-%d'),'供应商': random.choice(['康泰医疗', '强生医疗', '美敦力', '3M医疗', '国药集团', '稳健医疗']),'单价': round(random.uniform(1, 100), 2),'总价值': 0 # 初始化为0,后面计算})# 生成使用记录usage_records = []for _ in range(500):item = random.choice(medical_items)usage_date = datetime.now() - timedelta(days=random.randint(1, 90))usage_records.append({'日期': usage_date.strftime('%Y-%m-%d'),'物资ID': item['物资ID'],'物资名称': item['物资名称'],'类别': item['类别'],'使用量': random.randint(1, 20),'使用科室': random.choice(['急诊科', '内科', '外科', '儿科', '妇产科', 'ICU', '手术室']),'操作人': random.choice(list(users.keys()))})# 生成出入库记录transaction_records = []for item in medical_items:# 初始入库记录initial_quantity = random.randint(50, 150)transaction_records.append({'日期': (datetime.now() - timedelta(days=random.randint(30, 90))).strftime('%Y-%m-%d'),'物资ID': item['物资ID'],'物资名称': item['物资名称'],'类别': item['类别'],'操作类型': '入库','数量': initial_quantity,'单价': item['单价'],'总金额': initial_quantity * item['单价'],'供应商': item['供应商'],'操作人': 'system','备注': '系统初始化'})# 随机添加一些出入库记录for _ in range(random.randint(1, 10)):is_in = random.random() > 0.5quantity = random.randint(1, 30)transaction_date = (datetime.now() - timedelta(days=random.randint(1, 30))).strftime('%Y-%m-%d')transaction_records.append({'日期': transaction_date,'物资ID': item['物资ID'],'物资名称': item['物资名称'],'类别': item['类别'],'操作类型': '入库' if is_in else '出库','数量': quantity,'单价': item['单价'] if is_in else 0, # 出库不需要单价'总金额': quantity * item['单价'] if is_in else 0,'供应商': item['供应商'] if is_in else '','操作人': random.choice(list(users.keys())),'备注': '采购入库' if is_in else '科室领用'})# 计算总价值for item in medical_items:item['总价值'] = item['当前库存'] * item['单价']return pd.DataFrame(medical_items), pd.DataFrame(usage_records), pd.DataFrame(transaction_records)# 加载数据inventory_df, usage_df, transaction_df = generate_data()# 添加库存状态列def get_inventory_status(row):if row['当前库存'] <= row['安全库存'] * 0.3:return '紧急'elif row['当前库存'] <= row['安全库存']:return '不足'elif row['当前库存'] <= row['安全库存'] * 1.5:return '正常'else:return '充足'inventory_df['库存状态'] = inventory_df.apply(get_inventory_status, axis=1)# 添加过期状态def get_expiry_status(row):expiry_date = datetime.strptime(row['过期日期'], '%Y-%m-%d')days_to_expire = (expiry_date - datetime.now()).daysif days_to_expire < 0:return '已过期'elif days_to_expire < 30:return '即将过期'else:return '正常'inventory_df['过期状态'] = inventory_df.apply(get_expiry_status, axis=1)# 侧边栏 - 筛选器st.sidebar.header("🔍 筛选选项")category_filter = st.sidebar.multiselect("选择物资类别",options=inventory_df['类别'].unique(),default=inventory_df['类别'].unique())status_filter = st.sidebar.multiselect("选择库存状态",options=inventory_df['库存状态'].unique(),default=inventory_df['库存状态'].unique())expiry_filter = st.sidebar.multiselect("选择过期状态",options=inventory_df['过期状态'].unique(),default=inventory_df['过期状态'].unique())# 应用筛选filtered_df = inventory_df[(inventory_df['类别'].isin(category_filter)) &(inventory_df['库存状态'].isin(status_filter)) &(inventory_df['过期状态'].isin(expiry_filter))]# 主界面布局tabs = ["📦 库存概览", "📊 使用分析", "🔮 智能预测", "📈 物资管理", "⚙️ 系统设置"]# 根据用户权限显示不同的标签页if st.session_state.username in permissions:user_permissions = permissions[st.session_state.username]if "manage" not in user_permissions:tabs.remove("📈 物资管理")if "settings" not in user_permissions:tabs.remove("⚙️ 系统设置")tab1, tab2, tab3, *other_tabs = st.tabs(tabs)with tab1:# 库存概览st.subheader("医疗物资库存概览")# KPI 指标col1, col2, col3, col4 = st.columns(4)col1.metric("物资总数", len(filtered_df))col2.metric("库存不足物资", len(filtered_df[filtered_df['库存状态'] == '不足']))col3.metric("库存紧急物资", len(filtered_df[filtered_df['库存状态'] == '紧急']))col4.metric("即将过期物资", len(filtered_df[filtered_df['过期状态'] == '即将过期']))# 库存价值total_value = filtered_df['总价值'].sum()st.metric("库存总价值", f"¥{total_value:.2f}")# 库存状态分布st.subheader("库存状态分布")fig1 = px.pie(filtered_df, names='库存状态',color='库存状态',color_discrete_map={'紧急': 'red','不足': 'orange','正常': 'green','充足': 'blue'})st.plotly_chart(fig1, use_container_width=True)# 过期状态分布st.subheader("过期状态分布")fig2 = px.pie(filtered_df, names='过期状态',color='过期状态',color_discrete_map={'已过期': 'red','即将过期': 'orange','正常': 'green'})st.plotly_chart(fig2, use_container_width=True)# 库存明细表st.subheader("库存明细")st.dataframe(filtered_df[['物资ID', '物资名称', '类别', '单位', '安全库存', '当前库存', '库存状态', '过期日期', '过期状态', '供应商', '单价', '总价值']], height=400,column_config={"当前库存": st.column_config.ProgressColumn("当前库存",help="当前库存水平",format="%f",min_value=0,max_value=200,),"总价值": st.column_config.NumberColumn("总价值",format="¥%f",)})# 导出数据按钮if st.button("导出库存数据"):csv = filtered_df.to_csv(sep='\t', na_rep='nan')b64 = base64.b64encode(csv.encode()).decode()href = f'<a href="data:file/csv;base64,{b64}" download="inventory_data.csv">点击下载数据</a>'st.markdown(href, unsafe_allow_html=True)with tab2:# 使用分析st.subheader("物资使用分析")# 按类别使用量st.subheader("各科室物资使用量")usage_by_dept = usage_df.groupby('使用科室')['使用量'].sum().reset_index()fig3 = px.bar(usage_by_dept,x='使用科室',y='使用量',color='使用科室')st.plotly_chart(fig3, use_container_width=True)# 按物资使用量st.subheader("物资使用量TOP 10")usage_by_item = usage_df.groupby('物资名称')['使用量'].sum().reset_index().sort_values('使用量', ascending=False).head(10)fig4 = px.bar(usage_by_item,x='物资名称',y='使用量',color='物资名称')st.plotly_chart(fig4, use_container_width=True)# 使用趋势分析st.subheader("物资使用趋势")usage_df['日期'] = pd.to_datetime(usage_df['日期'])usage_df['周'] = usage_df['日期'].dt.isocalendar().weekusage_trend = usage_df.groupby(['周', '类别'])['使用量'].sum().reset_index()fig5 = px.line(usage_trend,x='周',y='使用量',color='类别',title="每周物资使用趋势")st.plotly_chart(fig5, use_container_width=True)# 各科室使用物资类别分布st.subheader("各科室使用物资类别分布")dept_category = usage_df.groupby(['使用科室', '类别'])['使用量'].sum().reset_index()fig7 = px.bar(dept_category,x='使用科室',y='使用量',color='类别',barmode='stack')st.plotly_chart(fig7, use_container_width=True)with tab3:# 智能预测st.subheader("AI驱动的需求预测")# 选择预测物资selected_item = st.selectbox("选择要预测的物资", inventory_df['物资名称'].unique())# 获取选定物资的数据item_data = usage_df[usage_df['物资名称'] == selected_item].copy()item_data['日期'] = pd.to_datetime(item_data['日期'])if len(item_data) > 10:# 准备数据item_data = item_data.groupby('日期')['使用量'].sum().reset_index()item_data = item_data.set_index('日期').asfreq('D').fillna(0).reset_index()# 添加时间特征item_data['day_of_week'] = item_data['日期'].dt.dayofweekitem_data['day_of_month'] = item_data['日期'].dt.dayitem_data['month'] = item_data['日期'].dt.monthitem_data['is_weekend'] = item_data['day_of_week'].isin([5, 6]).astype(int)# 添加假期特征(简化版)holidays = [# 这里可以添加更多假期日期pd.to_datetime('2025-01-01'), # 元旦pd.to_datetime('2025-02-10'), # 春节pd.to_datetime('2025-04-05'), # 清明节pd.to_datetime('2025-05-01'), # 劳动节pd.to_datetime('2025-06-07'), # 端午节pd.to_datetime('2025-09-15'), # 中秋节pd.to_datetime('2025-10-01'), # 国庆节]item_data['is_holiday'] = item_data['日期'].isin(holidays).astype(int)# 添加滞后特征for i in range(1, 8): # 添加前7天的使用量作为特征item_data[f'lag_{i}'] = item_data['使用量'].shift(i)item_data = item_data.dropna() # 删除包含NaN的行# 分割数据X = item_data[['day_of_week', 'day_of_month', 'month', 'is_weekend', 'is_holiday'] + [f'lag_{i}' for i in range(1, 8)]]y = item_data['使用量']X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)# 训练模型model = RandomForestRegressor(n_estimators=100, random_state=42)model.fit(X_train, y_train)# 预测y_pred = model.predict(X_test)# 评估mae = mean_absolute_error(y_test, y_pred)# 创建预测日期范围last_date = item_data['日期'].max()future_dates = pd.date_range(start=last_date + timedelta(days=1), periods=30) # 预测未来30天# 准备未来数据future_data = pd.DataFrame({'日期': future_dates,'day_of_week': future_dates.dayofweek,'day_of_month': future_dates.day,'month': future_dates.month,'is_weekend': future_dates.dayofweek.isin([5, 6]).astype(int),'is_holiday': future_dates.isin(holidays).astype(int)})# 为了预测未来值,我们需要前7天的实际/预测值# 这里我们使用最后7天的实际值作为初始值last_7_days = item_data['使用量'].tail(7).values# 逐步预测未来30天的值future_pred = []for i in range(len(future_dates)):# 创建当前预测所需的特征features = [future_data.iloc[i]['day_of_week'],future_data.iloc[i]['day_of_month'],future_data.iloc[i]['month'],future_data.iloc[i]['is_weekend'],future_data.iloc[i]['is_holiday']]features.extend(last_7_days)# 预测当前天的值pred = model.predict([features])[0]future_pred.append(pred)# 更新最后7天的值(将最新预测添加到末尾,并移除最旧的值)last_7_days = np.append(last_7_days[1:], pred)# 创建结果DataFrametest_dates = item_data['日期'].iloc[-len(y_test):]results = pd.DataFrame({'日期': test_dates,'实际使用量': y_test,'预测使用量': y_pred})future_results = pd.DataFrame({'日期': future_dates,'预测使用量': future_pred})# 显示评估结果st.metric("预测平均绝对误差", f"{mae:.2f}")# 显示预测图表fig6 = go.Figure()fig6.add_trace(go.Scatter(x=results['日期'],y=results['实际使用量'],mode='lines+markers',name='实际使用量'))fig6.add_trace(go.Scatter(x=results['日期'],y=results['预测使用量'],mode='lines+markers',name='预测使用量'))fig6.add_trace(go.Scatter(x=future_results['日期'],y=future_results['预测使用量'],mode='lines+markers',name='未来预测',line=dict(dash='dash')))fig6.update_layout(title=f"{selected_item} 使用量预测",xaxis_title="日期",yaxis_title="使用量",legend_title="图例")st.plotly_chart(fig6, use_container_width=True)# 计算平均每日使用量avg_daily_usage = item_data['使用量'].mean()# 获取当前库存current_stock = inventory_df.loc[inventory_df['物资名称'] == selected_item, '当前库存'].values[0]# 计算库存可维持天数if avg_daily_usage > 0:days_left = current_stock / avg_daily_usageelse:days_left = float('inf')# 生成采购建议st.subheader("智能采购建议")if days_left < 7:st.error(f"⚠️ 紧急采购建议:{selected_item}库存仅能维持{days_left:.1f}天,请立即采购!")elif days_left < 14:st.warning(f"⚠️ 采购建议:{selected_item}库存仅能维持{days_left:.1f}天,建议尽快采购")else:st.success(f"✅ {selected_item}库存充足,可维持{days_left:.1f}天")# 计算建议采购量safety_stock = inventory_df.loc[inventory_df['物资名称'] == selected_item, '安全库存'].values[0]# 基于未来30天的预测使用量计算建议采购量total_predicted_usage = future_results['预测使用量'].sum()suggested_order = max(0, total_predicted_usage * 1.2 + safety_stock - current_stock) # 增加20%的缓冲st.info(f"建议采购量:{suggested_order:.0f} {inventory_df.loc[inventory_df['物资名称'] == selected_item, '单位'].values[0]}")# 预测图表 - 月度汇总future_results['month'] = future_results['日期'].dt.month_name()monthly_prediction = future_results.groupby('month')['预测使用量'].sum().reset_index()fig8 = px.bar(monthly_prediction,x='month',y='预测使用量',title=f"{selected_item} 月度使用量预测")st.plotly_chart(fig8, use_container_width=True)else:st.warning("该物资使用数据不足,无法进行预测")if "📈 物资管理" in tabs:with other_tabs[0]: # 物资管理st.subheader("物资管理")# 物资操作选项卡manage_tabs = st.tabs(["入库管理", "出库管理", "出入库记录", "物资信息管理"])with manage_tabs[0]: # 入库管理st.header("物资入库")# 选择要入库的物资入库_物资 = st.selectbox("选择物资", inventory_df['物资名称'].unique())# 获取物资信息物资_info = inventory_df[inventory_df['物资名称'] == 入库_物资].iloc[0]# 显示当前库存st.write(f"当前库存: {物资_info['当前库存']} {物资_info['单位']}")# 入库数量入库数量 = st.number_input("入库数量", min_value=1, value=10)# 入库日期入库日期 = st.date_input("入库日期", datetime.now()).strftime('%Y-%m-%d')# 供应商供应商 = st.selectbox("供应商", inventory_df['供应商'].unique())# 单价单价 = st.number_input("单价", min_value=0.01, value=物资_info['单价'], step=0.01)# 备注备注 = st.text_input("备注", "采购入库")# 入库按钮if st.button("确认入库"):# 更新库存inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '当前库存'] += 入库数量inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '最近进货日期'] = 入库日期inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '单价'] = 单价inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '总价值'] = inventory_df.loc[inventory_df['物资名称'] == 入库_物资, '当前库存'] * 单价# 添加入库记录new_transaction = pd.DataFrame({'日期': [入库日期],'物资ID': [物资_info['物资ID']],'物资名称': [入库_物资],'类别': [物资_info['类别']],'操作类型': ['入库'],'数量': [入库数量],'单价': [单价],'总金额': [入库数量 * 单价],'供应商': [供应商],'操作人': [st.session_state.username],'备注': [备注]})transaction_df = pd.concat([transaction_df, new_transaction], ignore_index=True)st.success(f"成功入库 {入库数量} {物资_info['单位']} {入库_物资}")st.balloons()with manage_tabs[1]: # 出库管理st.header("物资出库")# 选择要出库的物资出库_物资 = st.selectbox("选择物资", inventory_df['物资名称'].unique())# 获取物资信息物资_info = inventory_df[inventory_df['物资名称'] == 出库_物资].iloc[0]# 显示当前库存st.write(f"当前库存: {物资_info['当前库存']} {物资_info['单位']}")# 出库数量出库数量 = st.number_input("出库数量", min_value=1, value=1, max_value=int(物资_info['当前库存']))# 出库日期出库日期 = st.date_input("出库日期", datetime.now()).strftime('%Y-%m-%d')# 使用科室使用科室 = st.selectbox("使用科室", usage_df['使用科室'].unique())# 备注备注 = st.text_input("备注", "科室领用")# 出库按钮if st.button("确认出库"):# 更新库存inventory_df.loc[inventory_df['物资名称'] == 出库_物资, '当前库存'] -= 出库数量inventory_df.loc[inventory_df['物资名称'] == 出库_物资, '总价值'] = inventory_df.loc[inventory_df['物资名称'] == 出库_物资, '当前库存'] * 物资_info['单价']# 添加出库记录new_transaction = pd.DataFrame({'日期': [出库日期],'物资ID': [物资_info['物资ID']],'物资名称': [出库_物资],'类别': [物资_info['类别']],'操作类型': ['出库'],'数量': [出库数量],'单价': [0], # 出库不需要单价'总金额': [0],'供应商': [''],'操作人': [st.session_state.username],'备注': [备注]})transaction_df = pd.concat([transaction_df, new_transaction], ignore_index=True)# 添加使用记录new_usage = pd.DataFrame({'日期': [出库日期],'物资ID': [物资_info['物资ID']],'物资名称': [出库_物资],'类别': [物资_info['类别']],'使用量': [出库数量],'使用科室': [使用科室],'操作人': [st.session_state.username]})usage_df = pd.concat([usage_df, new_usage], ignore_index=True)st.success(f"成功出库 {出库数量} {物资_info['单位']} {出库_物资}")st.balloons()with manage_tabs[2]: # 出入库记录st.header("出入库记录")# 筛选选项col1, col2 = st.columns(2)with col1:记录类型 = st.selectbox("记录类型", ["全部", "入库", "出库"])with col2:日期范围 = st.date_input("日期范围", [datetime.now() - timedelta(days=30), datetime.now()])# 应用筛选filtered_transactions = transaction_df.copy()if 记录类型 != "全部":filtered_transactions = filtered_transactions[filtered_transactions['操作类型'] == 记录类型]if len(日期范围) == 2:start_date = 日期范围[0].strftime('%Y-%m-%d')end_date = 日期范围[1].strftime('%Y-%m-%d')filtered_transactions = filtered_transactions[(filtered_transactions['日期'] >= start_date) & (filtered_transactions['日期'] <= end_date)]# 显示记录st.dataframe(filtered_transactions, height=500)# 导出记录if st.button("导出记录"):csv = filtered_transactions.to_csv(sep='\t', na_rep='nan')b64 = base64.b64encode(csv.encode()).decode()href = f'<a href="data:file/csv;base64,{b64}" download="transaction_records.csv">点击下载记录</a>'st.markdown(href, unsafe_allow_html=True)with manage_tabs[3]: # 物资信息管理st.header("物资信息管理")# 选择要管理的物资管理_物资 = st.selectbox("选择物资", inventory_df['物资名称'].unique())# 获取物资信息物资_info = inventory_df[inventory_df['物资名称'] == 管理_物资].iloc[0]# 显示当前信息st.subheader("当前信息")col1, col2 = st.columns(2)with col1:st.write(f"**物资ID**: {物资_info['物资ID']}")st.write(f"**类别**: {物资_info['类别']}")st.write(f"**单位**: {物资_info['单位']}")st.write(f"**安全库存**: {物资_info['安全库存']}")with col2:st.write(f"**当前库存**: {物资_info['当前库存']}")st.write(f"**最近进货日期**: {物资_info['最近进货日期']}")st.write(f"**过期日期**: {物资_info['过期日期']}")st.write(f"**供应商**: {物资_info['供应商']}")# 修改信息st.subheader("修改信息")# 安全库存新安全库存 = st.number_input("安全库存", min_value=0, value=int(物资_info['安全库存']))# 供应商新供应商 = st.selectbox("供应商", inventory_df['供应商'].unique(), index=list(inventory_df['供应商'].unique()).index(物资_info['供应商']))# 过期日期新过期日期 = st.date_input("过期日期", datetime.strptime(物资_info['过期日期'], '%Y-%m-%d')).strftime('%Y-%m-%d')# 单价新单价 = st.number_input("单价", min_value=0.01, value=float(物资_info['单价']), step=0.01)# 备注修改备注 = st.text_input("修改备注", "系统更新")# 修改按钮if st.button("确认修改"):# 更新物资信息inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '安全库存'] = 新安全库存inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '供应商'] = 新供应商inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '过期日期'] = 新过期日期inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '单价'] = 新单价inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '总价值'] = inventory_df.loc[inventory_df['物资名称'] == 管理_物资, '当前库存'] * 新单价# 更新库存状态inventory_df['库存状态'] = inventory_df.apply(get_inventory_status, axis=1)inventory_df['过期状态'] = inventory_df.apply(get_expiry_status, axis=1)st.success(f"成功更新 {管理_物资} 的信息")st.balloons()if "⚙️ 系统设置" in tabs:with other_tabs[-1]: # 系统设置st.subheader("系统配置")with st.expander("库存阈值设置"):st.write("设置库存预警阈值")# 获取当前设置current_low_threshold = 70current_critical_threshold = 30low_threshold = st.slider("库存不足阈值(%)", 30, 100, current_low_threshold)critical_threshold = st.slider("库存紧急阈值(%)", 0, 50, current_critical_threshold)if st.button("保存库存阈值设置"):# 这里可以添加保存设置的逻辑st.success(f"已更新库存阈值设置:不足({low_threshold}%),紧急({critical_threshold}%)")with st.expander("过期预警设置"):st.write("设置过期预警天数")# 获取当前设置current_expiry_warning = 30expiry_warning = st.slider("即将过期预警(天)", 1, 90, current_expiry_warning)if st.button("保存过期预警设置"):# 这里可以添加保存设置的逻辑st.success(f"已更新过期预警设置:{expiry_warning}天")with st.expander("数据管理"):st.write("导入/导出数据")col1, col2 = st.columns(2)with col1:uploaded_file = st.file_uploader("上传库存数据(CSV格式)")if uploaded_file is not None:try:new_inventory = pd.read_csv(uploaded_file)# 这里可以添加数据合并的逻辑st.success("文件上传成功!数据已合并")except Exception as e:st.error(f"文件上传失败:{str(e)}")with col2:st.write("导出数据")if st.button("导出库存数据"):csv = inventory_df.to_csv(sep='\t', na_rep='nan')b64 = base64.b64encode(csv.encode()).decode()href = f'<a href="data:file/csv;base64,{b64}" download="inventory_data.csv">点击下载库存数据</a>'st.markdown(href, unsafe_allow_html=True)if st.button("导出使用记录"):csv = usage_df.to_csv(sep='\t', na_rep='nan')b64 = base64.b64encode(csv.encode()).decode()href = f'<a href="data:file/csv;base64,{b64}" download="usage_records.csv">点击下载使用记录</a>'st.markdown(href, unsafe_allow_html=True)with st.expander("用户管理"):st.write("添加/管理用户")# 仅管理员可以管理用户if st.session_state.username == "admin":new_username = st.text_input("新用户名")new_password = st.text_input("新密码", type="password")new_role = st.selectbox("用户角色", ["doctor", "nurse", "pharmacist", "admin"])if st.button("添加用户"):if new_username and new_password:# 这里可以添加用户添加的逻辑st.success(f"成功添加用户:{new_username},角色:{new_role}")else:st.error("用户名和密码不能为空")# 显示当前用户列表st.write("当前用户列表")user_list = pd.DataFrame({'用户名': list(users.keys()),'角色': [permissions.get(user, ["无权限"])[0] for user in users.keys()]})st.dataframe(user_list)with st.expander("系统信息"):st.write("**版本信息**: v4.3.2")st.write("**最后")st.write("**最后更新**: 2025-07-30")st.write("**开发者**: 医疗AI团队")st.write("**技术支持**: tech-support@medai.com")# 过期物资警告expiring_items = inventory_df[inventory_df['过期状态'].isin(['已过期', '即将过期'])]if not expiring_items.empty:st.sidebar.warning("⚠️ 有过期风险物资")for _, row in expiring_items.iterrows():days_left = (datetime.strptime(row['过期日期'], '%Y-%m-%d') - datetime.now()).daysstatus = "已过期" if days_left < 0 else f"还剩{days_left}天"st.sidebar.error(f"{row['物资名称']} - {status}")# 库存不足警告low_stock_items = inventory_df[inventory_df['库存状态'].isin(['不足', '紧急'])]if not low_stock_items.empty:st.sidebar.warning("⚠️ 有库存不足物资")for _, row in low_stock_items.iterrows():st.sidebar.error(f"{row['物资名称']} - 当前库存: {row['当前库存']} (安全库存: {row['安全库存']})")# 登出按钮if st.sidebar.button("登出"):st.session_state.authenticated = Falsest.session_state.username = Nonest.success("已成功登出")st.experimental_rerun()# 页脚st.markdown("---")st.caption("© 2025 智能医疗物资管理系统 | 基于AI的医疗物资管理解决方案")
运行应用
if name == “main”:
if st.session_state.authenticated:
main_app()
else:
login_page()
“增量升级”方案:
────────────────────
附录一、实时库存同步(对接 HIS/ERP)
────────────────────
-
目标
让系统不再依赖“模拟数据”,而是实时读取 HIS/ERP 的出入库流水,并自动更新库存。 -
关键思路
• 在generate_data()
之前增加一个“数据源选择”开关:- “模拟数据”(默认)
- “HIS/ERP 接口”
• 新增fetch_realtime_inventory()
与fetch_realtime_usage()
两个函数,利用 RESTful API 或数据库直连拉取数据。
• 用 Streamlit 的st.cache_data(ttl=300)
把结果缓存 5 分钟,既减轻源系统压力,又保证页面流畅。
-
代码片段
def fetch_realtime_inventory():# 示例:调用 REST 接口r = requests.get(st.secrets["his"]["base_url"] + "/api/v1/inventory",headers={"Authorization": f"Bearer {st.secrets['his']['token']}"})r.raise_for_status()return pd.DataFrame(r.json())def fetch_realtime_usage():# 示例:直连 SQL Serverconn = pymssql.connect(server=st.secrets["erp"]["host"],user=st.secrets["erp"]["user"],password=st.secrets["erp"]["pwd"],database="ERP")sql = """SELECT CONVERT(date, CreateTime) AS 日期,ItemID AS 物资ID,ItemName AS 物资名称,Category AS 类别,Quantity AS 使用量,Dept AS 使用科室FROM t_OutboundWHERE CreateTime >= DATEADD(day, -90, GETDATE())"""return pd.read_sql(sql, conn)
- 落地注意
• 用st.secrets
保存接口密钥,避免硬编码。
• 若源系统无“过期日期”字段,可在本地维护一张映射表(物资ID→过期日期),再与实时库存merge
。
• 建议把“同步日志”写入侧边栏,异常时给出提示,方便运维。
────────────────────
附录二、移动端 PDA 扫码盘点
────────────────────
-
目标
护士/库管员用手机或 PDA 扫条码即可快速盘点并回写库存。 -
关键思路
• 用 Streamlit 的“camera_input”组件 +pyzbar
/opencv
解析二维码/条形码。
• 扫码后弹窗显示当前库存,可直接输入“盘点数量”,点击“确认”即回写后台(支持写回 HIS 或本地 SQLite)。
• 盘点过程离线缓存盘点单,网络恢复后批量提交。 -
代码片段
st.subheader("PDA 扫码盘点")
img_file = st.camera_input("请对准物资条码")
if img_file:img = Image.open(img_file)barcodes = decode(img)if barcodes:code = barcodes[0].data.decode("utf-8")item = inventory_df[inventory_df["物资ID"] == code]if not item.empty:st.write("当前库存:", item.iloc[0]["当前库存"])new_qty = st.number_input("盘点后库存", min_value=0, value=int(item.iloc[0]["当前库存"]))if st.button("确认更新"):update_inventory(code, new_qty) # 调用接口或写本地st.success("库存已更新")else:st.error("未找到该物资")
- 落地注意
• 手机端部署:可用 Streamlit Cloud + 手机浏览器,或打包成 PWA。
• 条码标准:若院内用 GS1-128,则解析时注意分隔符。
• 并发冲突:盘点时给记录加乐观锁(版本号或时间戳字段)。
────────────────────
附录三、AI 预测再升级:加入“节假日/疫情”外生特征
────────────────────
-
目标
让 AI 预测更准,尤其在春节、疫情爆发等特殊时期。 -
关键思路
• 收集外部特征:- 中国法定节假日(可用
ChineseCalendar
库) - 本地新冠新增病例(卫健委公开接口)
- 气温、湿度(气象局 API)
• 把上述特征拼接到训练集,再训练 RandomForest 或 LightGBM。
• 预测时,未来 14 天的外部特征可简单用“最近 7 天均值”填充,或调用同样接口拉取。
- 中国法定节假日(可用
-
代码片段
from chinese_calendar import is_holidaydef add_external_features(df):df["是否节假日"] = df["日期"].apply(lambda x: int(is_holiday(x)))# 例:获取新增病例df["新增病例"] = fetch_covid_cases(df["日期"])return dfitem_data = add_external_features(item_data)
X = item_data[["day_of_week", "是否节假日", "新增病例", "气温"]]