三维建筑非法入侵情景推演

用Streamlit构建的交互式三维建筑非法入侵情景推演系统。系统使用SARSA强化学习算法来训练智能体在三维建筑拓扑中寻找最优入侵路径。以下是对代码的深度解读:
1. 主要组成部分
1.1 ThreeDBuildingTopology类
负责构建三维建筑拓扑结构,将建筑建模为图结构,其中节点代表房间,边代表连接(如走廊、楼梯)。
初始化参数:总楼层数、每层走廊数、每走廊房间数。
使用networkx的图结构存储拓扑,并为每个节点分配3D坐标(x, y, z)分别表示房间、走廊和楼层。
节点安全配置随机初始化,高层和关键位置安全级别更高。
1.2 BayesianSecurityAssessment类
基于贝叶斯网络计算每个节点的入侵成功率。
定义不同安防措施(摄像头、探测器、门禁、报警器等)存在与否对入侵成功率的影响。
1.3 IntrusionEnvironment3D类
定义三维入侵环境,包括起始点、目标点(顶层中心房间)和状态转移逻辑。
奖励函数考虑楼层惩罚、重复访问惩罚和距离目标奖励。
1.4 SARSAAgent类
实现SARSA强化学习算法,使用Q表存储状态-动作值。
采用ε-greedy策略平衡探索与利用。
1.5 训练函数和可视化函数
train_sarsa_model_3d:训练SARSA智能体。visualize_3d_topology:使用Plotly可视化三维建筑拓扑和入侵路径。
1.6 main函数
使用Streamlit构建Web界面,用户可配置参数、初始化系统、训练模型并查看结果。
2. 工作流程
初始化三维建筑拓扑:根据用户输入的楼层、走廊、房间数生成图结构。
初始化贝叶斯安防评估:定义安防措施对入侵成功率的影响。
初始化三维入侵环境:设置起始点和目标点。
训练SARSA模型:智能体在环境中学习,更新Q表。
生成最优路径:根据训练好的Q表,从起点到目标点生成路径。
可视化:用3D图展示建筑拓扑和入侵路径。
3. 关键算法细节
3.1 三维拓扑构建
节点ID按楼层、走廊、房间的顺序线性分配。
连接包括:同一走廊的房间相连、同一楼层不同走廊通过中间房间相连、楼层间通过楼梯相连。
3.2 奖励函数
到达目标:+1000
楼层惩罚:-0.1 * (最高层-当前层+1) * 入侵成功率(鼓励向上层移动)
重复访问惩罚:-15(避免循环)
距离目标奖励:0.1 * (1 - 当前距离/最大距离) * 入侵成功率(鼓励靠近目标)
3.3 SARSA算法
在每次状态转移后更新Q值:Q(s,a) = Q(s,a) + α [r + γ Q(s',a') - Q(s,a)]
与Q-learning不同,SARSA采用当前策略选择下一个动作a',因此是在线策略算法。
4. 可视化效果
节点颜色表示安全级别(从浅蓝到深蓝,越深越安全),路径用红色高亮。
可交互的三维图形,用户可旋转、缩放。
import streamlit as st
import numpy as np
import networkx as nx
import plotly.graph_objects as go
import plotly.express as px
from typing import Dict, List, Tuple, Set
import mathclass ThreeDBuildingTopology:"""三维建筑拓扑结构建模"""def __init__(self, total_floors: int = 20, corridors_per_floor: int = 4, rooms_per_corridor: int = 3):self.total_floors = total_floorsself.corridors_per_floor = corridors_per_floorself.rooms_per_corridor = rooms_per_corridorself.total_nodes = total_floors * corridors_per_floor * rooms_per_corridorself.graph = nx.Graph()self.node_positions_3d = {} # 3D节点坐标self.node_security = {}self._build_3d_topology()self._initialize_security_measures()def _build_3d_topology(self):"""构建三维建筑拓扑网络"""node_id = 0# 创建三维网格节点for floor in range(self.total_floors):z = floor * 10 # 楼层高度间隔for corridor in range(self.corridors_per_floor):y = corridor * 8 # 走廊间隔for room in range(self.rooms_per_corridor):x = room * 6 # 房间间隔self.graph.add_node(node_id, floor=floor, corridor=corridor, room=room)self.node_positions_3d[node_id] = (x, y, z)node_id += 1# 添加层内连接(同一走廊的房间相连)for floor in range(self.total_floors):for corridor in range(self.corridors_per_floor):for room in range(self.rooms_per_corridor - 1):node_a = self._get_node_id(floor, corridor, room)node_b = self._get_node_id(floor, corridor, room + 1)self.graph.add_edge(node_a, node_b, weight=1.0, type='corridor')# 添加走廊间连接(同一楼层不同走廊)for floor in range(self.total_floors):for corridor in range(self.corridors_per_floor - 1):# 连接相邻走廊的中间房间mid_room = self.rooms_per_corridor // 2node_a = self._get_node_id(floor, corridor, mid_room)node_b = self._get_node_id(floor, corridor + 1, mid_room)self.graph.add_edge(node_a, node_b, weight=1.5, type='hallway')# 添加层间连接(楼梯、电梯)for floor in range(self.total_floors - 1):# 每层选择角落房间作为楼梯间stair_nodes = [self._get_node_id(floor, 0, 0), # 左下角楼梯self._get_node_id(floor, self.corridors_per_floor - 1, 0), # 右下角楼梯self._get_node_id(floor, 0, self.rooms_per_corridor - 1), # 左上角电梯]for stair_node in stair_nodes:next_floor_node = stair_node + (self.corridors_per_floor * self.rooms_per_corridor)if next_floor_node < self.total_nodes:self.graph.add_edge(stair_node, next_floor_node, weight=2.0, type='stair')def _get_node_id(self, floor: int, corridor: int, room: int) -> int:"""根据楼层、走廊、房间计算节点ID"""return (floor * self.corridors_per_floor * self.rooms_per_corridor +corridor * self.rooms_per_corridor + room)def _initialize_security_measures(self):"""初始化节点安防配置"""for node in range(self.total_nodes):measures = {}security_options = ['camera', 'detector', 'access_control', 'alarm']# 根据节点位置设置不同的安全级别floor, corridor, room = self._get_node_coordinates(node)# 高层和关键位置安全级别更高base_prob = 0.3 if floor > self.total_floors // 2 else 0.7for measure in security_options:if np.random.random() > base_prob:measures[measure] = 'present'else:measures[measure] = 'absent'self.node_security[node] = measuresdef _get_node_coordinates(self, node_id: int) -> Tuple[int, int, int]:"""根据节点ID获取楼层、走廊、房间坐标"""floor = node_id // (self.corridors_per_floor * self.rooms_per_corridor)remainder = node_id % (self.corridors_per_floor * self.rooms_per_corridor)corridor = remainder // self.rooms_per_corridorroom = remainder % self.rooms_per_corridorreturn floor, corridor, roomclass BayesianSecurityAssessment:"""基于贝叶斯网络的安防风险评估"""def __init__(self):self.security_measures = {'camera': {'present': 0.3, 'absent': 0.8},'detector': {'present': 0.2, 'absent': 0.85},'access_control': {'present': 0.25, 'absent': 0.75},'alarm': {'present': 0.35, 'absent': 0.9},'patrol': {'present': 0.4, 'absent': 0.95},'guard': {'present': 0.15, 'absent': 0.98}}def calculate_intrusion_probability(self, security_config: Dict) -> float:"""计算节点入侵成功率"""success_prob = 1.0for measure, status in security_config.items():if measure in self.security_measures:prob = self.security_measures[measure].get(status, 0.7)success_prob *= probreturn success_probclass IntrusionEnvironment3D:"""三维非法入侵环境模拟"""def __init__(self, topology: ThreeDBuildingTopology, bayesian_net: BayesianSecurityAssessment):self.topology = topologyself.bayesian_net = bayesian_netself.current_state = Noneself.visited_nodes = set()# 目标设置为顶层中心房间self.target_node = self.topology._get_node_id(topology.total_floors - 1,topology.corridors_per_floor // 2,topology.rooms_per_corridor // 2)# 设置多个入口点self.start_nodes = [self.topology._get_node_id(0, 0, 0), # 左下角入口self.topology._get_node_id(0, topology.corridors_per_floor - 1, 0), # 右下角入口self.topology._get_node_id(0, 0, topology.rooms_per_corridor - 1), # 左上角入口]def reset(self, start_node: int = None) -> int:"""重置环境状态"""if start_node is None:start_node = np.random.choice(self.start_nodes)self.current_state = start_nodeself.visited_nodes = {start_node}return start_nodedef get_available_actions(self, state: int) -> List[int]:"""获取可执行动作(相邻节点)"""return list(self.topology.graph.neighbors(state))def step(self, action: int) -> Tuple[int, float, bool, Dict]:"""执行动作并返回结果"""reward = self._calculate_reward(action)prev_state = self.current_stateself.current_state = actionself.visited_nodes.add(action)done = (action == self.target_node)return action, reward, done, {'previous_state': prev_state,'node_info': self._get_node_info(action)}def _calculate_reward(self, action: int) -> float:"""计算三维环境下的奖励值"""if action == self.target_node:return 1000.0 # 到达目标的奖励# 获取节点属性floor, corridor, room = self.topology._get_node_coordinates(action)max_floor = self.topology.total_floors - 1# 获取入侵概率security_config = self.topology.node_security.get(action, {})p_a = self.bayesian_net.calculate_intrusion_probability(security_config)# 楼层惩罚项floor_penalty = -0.1 * (max_floor - floor + 1) * p_a# 重复访问惩罚revisit_penalty = -15.0 if action in self.visited_nodes else 0.0# 距离目标奖励(鼓励向目标移动)target_x, target_y, target_z = self.topology.node_positions_3d[self.target_node]current_x, current_y, current_z = self.topology.node_positions_3d[action]distance_to_target = math.sqrt((target_x - current_x) ** 2 +(target_y - current_y) ** 2 +(target_z - current_z) ** 2)max_distance = math.sqrt((target_x) ** 2 + (target_y) ** 2 + (target_z) ** 2)distance_reward = 0.1 * (1 - distance_to_target / max_distance) * p_areturn floor_penalty + revisit_penalty + distance_rewarddef _get_node_info(self, node_id: int) -> Dict:"""获取节点详细信息"""floor, corridor, room = self.topology._get_node_coordinates(node_id)return {'floor': floor,'corridor': corridor,'room': room,'security_level': self.bayesian_net.calculate_intrusion_probability(self.topology.node_security.get(node_id, {}))}class SARSAAgent:"""SARSA强化学习智能体(三维版本)"""def __init__(self, state_size: int, action_size: int,learning_rate: float = 0.1, gamma: float = 0.99, epsilon: float = 0.1):self.learning_rate = learning_rateself.gamma = gammaself.epsilon = epsilonself.q_table = np.zeros((state_size, action_size))# 状态-动作映射self.state_to_idx = {}self.idx_to_state = {}self.action_to_idx = {}self.idx_to_action = {}self.next_state_idx = 0self.next_action_idx = 0def _get_state_index(self, state: int) -> int:"""将状态映射为索引"""if state not in self.state_to_idx:self.state_to_idx[state] = self.next_state_idxself.idx_to_state[self.next_state_idx] = stateself.next_state_idx += 1return self.state_to_idx[state]def _get_action_index(self, action: int) -> int:"""将动作映射为索引"""if action not in self.action_to_idx:self.action_to_idx[action] = self.next_action_idxself.idx_to_action[self.next_action_idx] = actionself.next_action_idx += 1return self.action_to_idx[action]def choose_action(self, state: int, available_actions: List[int]) -> int:"""ϵ-greedy策略选择动作"""if np.random.random() < self.epsilon or not available_actions:return np.random.choice(available_actions) if available_actions else state# 选择Q值最大的动作state_idx = self._get_state_index(state)q_values = []for action in available_actions:action_idx = self._get_action_index(action)if state_idx < self.q_table.shape[0] and action_idx < self.q_table.shape[1]:q_values.append(self.q_table[state_idx, action_idx])else:q_values.append(0)if q_values:max_idx = np.argmax(q_values)return available_actions[max_idx]return statedef update(self, state: int, action: int, reward: float,next_state: int, next_action: int, done: bool):"""SARSA算法更新"""state_idx = self._get_state_index(state)action_idx = self._get_action_index(action)next_state_idx = self._get_state_index(next_state)next_action_idx = self._get_action_index(next_action)# 检查索引范围if (state_idx >= self.q_table.shape[0] or action_idx >= self.q_table.shape[1] ornext_state_idx >= self.q_table.shape[0] or next_action_idx >= self.q_table.shape[1]):returncurrent_q = self.q_table[state_idx, action_idx]if done:target = rewardelse:next_q = self.q_table[next_state_idx, next_action_idx]target = reward + self.gamma * next_q# Q值更新self.q_table[state_idx, action_idx] += self.learning_rate * (target - current_q)def get_optimal_path(self, start_state: int, env: IntrusionEnvironment3D) -> List[int]:"""获取最优入侵路径"""current_state = start_statepath = [current_state]visited = set([current_state])for step in range(200): # 防止无限循环available_actions = [a for a in env.get_available_actions(current_state)if a not in visited]if not available_actions or current_state == env.target_node:break# 选择Q值最大的动作state_idx = self._get_state_index(current_state)q_values = []for action in available_actions:action_idx = self._get_action_index(action)if state_idx < self.q_table.shape[0] and action_idx < self.q_table.shape[1]:q_values.append(self.q_table[state_idx, action_idx])else:q_values.append(0)if q_values:best_action = available_actions[np.argmax(q_values)]path.append(best_action)visited.add(best_action)current_state = best_actionelse:breakreturn pathdef train_sarsa_model_3d(env: IntrusionEnvironment3D, episodes: int = 1000) -> Tuple[SARSAAgent, List[Dict]]:"""训练三维环境下的SARSA模型"""agent = SARSAAgent(state_size=env.topology.total_nodes,action_size=env.topology.total_nodes)training_progress = []for episode in range(episodes):state = env.reset()available_actions = env.get_available_actions(state)if not available_actions:available_actions = [state]action = agent.choose_action(state, available_actions)total_reward = 0steps = 0while True:next_state, reward, done, info = env.step(action)total_reward += rewardnext_actions = env.get_available_actions(next_state)if not next_actions:next_actions = [next_state]next_action = agent.choose_action(next_state, next_actions)agent.update(state, action, reward, next_state, next_action, done)state, action = next_state, next_actionsteps += 1if done or steps > 300: # 最大步数限制break# 记录训练进度if episode % 100 == 0:training_progress.append({'episode': episode,'total_reward': total_reward,'steps': steps})return agent, training_progressdef visualize_3d_topology(topology: ThreeDBuildingTopology, bayesian_net: BayesianSecurityAssessment,path: List[int] = None):"""三维可视化建筑拓扑和入侵路径"""# 提取所有节点的3D坐标x_coords, y_coords, z_coords = [], [], []node_colors, node_sizes = [], []for node_id in range(topology.total_nodes):x, y, z = topology.node_positions_3d[node_id]x_coords.append(x)y_coords.append(y)z_coords.append(z)# 根据节点类型设置颜色和大小floor, corridor, room = topology._get_node_coordinates(node_id)# 普通节点为蓝色,路径节点为红色if path and node_id in path:node_colors.append('red')node_sizes.append(8)else:# 根据安全级别设置颜色深浅security_config = topology.node_security.get(node_id, {})security_prob = bayesian_net.calculate_intrusion_probability(security_config)color_intensity = int(255 * (1 - security_prob))node_colors.append(f'rgb({color_intensity}, {color_intensity}, 255)')node_sizes.append(5)# 创建节点散点图node_trace = go.Scatter3d(x=x_coords, y=y_coords, z=z_coords,mode='markers',marker=dict(size=node_sizes,color=node_colors,opacity=0.8),text=[f'节点{i}<br>楼层{topology._get_node_coordinates(i)[0]}<br>安全级别:{bayesian_net.calculate_intrusion_probability(topology.node_security.get(i, {})):.2f}'for i in range(topology.total_nodes)],hoverinfo='text')# 创建边edge_traces = []for edge in topology.graph.edges():x_edge, y_edge, z_edge = [], [], []x1, y1, z1 = topology.node_positions_3d[edge[0]]x2, y2, z2 = topology.node_positions_3d[edge[1]]x_edge.extend([x1, x2, None])y_edge.extend([y1, y2, None])z_edge.extend([z1, z2, None])edge_color = 'gray'if path and edge[0] in path and edge[1] in path and abs(path.index(edge[0]) - path.index(edge[1])) == 1:edge_color = 'red'edge_trace = go.Scatter3d(x=x_edge, y=y_edge, z=z_edge,mode='lines',line=dict(color=edge_color, width=3),hoverinfo='none')edge_traces.append(edge_trace)# 创建路径高亮path_traces = []if path and len(path) > 1:path_x, path_y, path_z = [], [], []for i in range(len(path) - 1):x1, y1, z1 = topology.node_positions_3d[path[i]]x2, y2, z2 = topology.node_positions_3d[path[i + 1]]path_x.extend([x1, x2, None])path_y.extend([y1, y2, None])path_z.extend([z1, z2, None])path_trace = go.Scatter3d(x=path_x, y=path_y, z=path_z,mode='lines',line=dict(color='red', width=6),name='入侵路径')path_traces.append(path_trace)# 组合所有轨迹all_traces = [node_trace] + edge_traces + path_traces# 创建图形fig = go.Figure(data=all_traces)fig.update_layout(title='三维建筑拓扑结构与入侵路径可视化',scene=dict(xaxis_title='X轴 - 房间位置',yaxis_title='Y轴 - 走廊位置',zaxis_title='Z轴 - 楼层高度',aspectmode='data'),width=800,height=600)return figdef main():"""主函数 - 三维版本Streamlit应用"""st.set_page_config(page_title="三维建筑入侵推演系统", layout="wide")st.title("🏢 三维建筑非法入侵情景推演系统")st.markdown("基于强化学习的3D入侵路径预测与安防优化分析")# 侧边栏配置st.sidebar.header("📊 三维系统参数配置")total_floors = st.sidebar.slider("建筑楼层数", 5, 30, 15)corridors_per_floor = st.sidebar.slider("每层走廊数", 2, 6, 4)rooms_per_corridor = st.sidebar.slider("每走廊房间数", 2, 5, 3)learning_rate = st.sidebar.slider("学习率 (α)", 0.01, 0.5, 0.1)gamma = st.sidebar.slider("折扣因子 (γ)", 0.5, 0.99, 0.95)epsilon = st.sidebar.slider("探索率 (ε)", 0.01, 0.5, 0.15)training_episodes = st.sidebar.slider("训练轮数", 100, 3000, 1500)# 初始化系统if st.sidebar.button("🚀 初始化三维系统", use_container_width=True):with st.spinner("正在构建三维建筑拓扑结构..."):topology = ThreeDBuildingTopology(total_floors=total_floors,corridors_per_floor=corridors_per_floor,rooms_per_corridor=rooms_per_corridor)bayesian_net = BayesianSecurityAssessment()env = IntrusionEnvironment3D(topology, bayesian_net)st.session_state.topology_3d = topologyst.session_state.bayesian_net = bayesian_netst.session_state.env_3d = envst.success(f"✅ 三维系统初始化完成!构建了 {total_floors} 层建筑,共 {topology.total_nodes} 个空间节点")# 训练模型if st.sidebar.button("🎯 训练三维SARSA模型", use_container_width=True) and 'env_3d' in st.session_state:with st.spinner("正在训练三维强化学习模型..."):agent, progress = train_sarsa_model_3d(st.session_state.env_3d,episodes=training_episodes)st.session_state.agent_3d = agentst.session_state.training_progress_3d = progressst.success(f"✅ 三维模型训练完成!共训练 {training_episodes} 轮")# 主内容区域col1, col2 = st.columns([2, 1])with col1:if 'topology_3d' in st.session_state:st.subheader("🏗️ 三维建筑拓扑结构")topology = st.session_state.topology_3dst.write(f"- 总楼层数: {topology.total_floors}")st.write(f"- 每层走廊数: {topology.corridors_per_floor}")st.write(f"- 每走廊房间数: {topology.rooms_per_corridor}")st.write(f"- 总空间节点数: {topology.total_nodes}")st.write(f"- 总连接边数: {topology.graph.number_of_edges()}")if 'training_progress_3d' in st.session_state:st.subheader("📈 三维训练结果")progress_data = st.session_state.training_progress_3dif progress_data:last_progress = progress_data[-1]col1_1, col1_2, col1_3 = st.columns(3)with col1_1:st.metric("训练轮数", training_episodes)with col1_2:st.metric("最终奖励", f"{last_progress['total_reward']:.2f}")with col1_3:st.metric("平均步数", last_progress['steps'])with col2:if 'env_3d' in st.session_state and 'agent_3d' in st.session_state:st.subheader("🔍 三维路径推演")start_options = {0: "入口 1 (1层-走廊1-房间1)",1: "入口 2 (1层-走廊4-房间1)",2: "入口 3 (1层-走廊1-房间3)"}start_node_key = st.selectbox("选择入侵起点", options=list(start_options.keys()),format_func=lambda x: start_options[x])# 映射到实际节点IDstart_node = st.session_state.env_3d.start_nodes[start_node_key]if st.button("生成三维最优入侵路径", use_container_width=True):optimal_path = st.session_state.agent_3d.get_optimal_path(start_node, st.session_state.env_3d)st.session_state.optimal_path_3d = optimal_pathst.write("**📊 三维最优入侵路径分析:**")path_info = []for i, node in enumerate(optimal_path):floor, corridor, room = st.session_state.topology_3d._get_node_coordinates(node)path_info.append(f"步骤 {i + 1}: {floor + 1}层-走廊{corridor + 1}-房间{room + 1}")for info in path_info:st.write(f"• {info}")st.write(f"**总步数:** {len(optimal_path)}")st.write(f"**目标位置:** {topology.total_floors}层中心房间")# 三维可视化if 'topology_3d' in st.session_state and 'bayesian_net' in st.session_state:st.subheader("🗺️ 三维路径可视化")path_to_show = st.session_state.get('optimal_path_3d', None)fig = visualize_3d_topology(st.session_state.topology_3d, st.session_state.bayesian_net, path_to_show)st.plotly_chart(fig, use_container_width=True)# 添加交互控制st.subheader("🎮 三维视图控制")col_view1, col_view2, col_view3 = st.columns(3)with col_view1:if st.button("俯视图"):st.info("使用鼠标拖拽可调整三维视角")with col_view2:if st.button("侧视图"):st.info("滚轮可缩放,右键拖拽可平移")with col_view3:if st.button("重置视图"):st.info("视图已重置为默认角度")# 系统信息st.sidebar.markdown("---")st.sidebar.subheader("ℹ️ 三维系统信息")st.sidebar.write("基于强化学习的超高层建筑")st.sidebar.write("三维非法入侵情景推演系统")st.sidebar.write("版本: 3D-1.0")if __name__ == "__main__":main()
