当前位置: 首页 > news >正文

低轨星座通信路径规划仿真:基于Dijkstra算法的星间链路优化实现

低轨星座通信路径规划的仿真应用,结合了卫星网络拓扑、任务规划和动态可视化。以下是代码的分析和改进建议:


1. 代码结构

Constellation 类
  • 功能:管理卫星轨道参数、网络拓扑、动态位置更新。
  • 关键方法
    • __init__:初始化轨道参数、创建卫星、建立星间链路。
    • compute_satellite_positions:计算卫星的经纬度。
    • build_satellite_network:建立星间链路(邻接关系)。
    • update_positions:根据时间步长更新卫星位置。
PathPlanner 类
  • 功能:定义路径规划的接口。
  • 方法find_path(未实现)。
OptimizedPathPlanner 类
  • 功能:使用优化算法(如Dijkstra)寻找最短路径。
  • 方法find_optimal_path(未完全实现)。
Streamlit 主程序
  • 功能:构建交互式界面,分为四个标签页:
    1. 星座拓扑:显示卫星和星间链路。
    2. 任务规划:添加任务并执行路径规划。
    3. 性能分析:展示负载热力图和耗时对比。
    4. 实时动态:动态更新卫星位置和任务路径。

import streamlit as st
import numpy as np
import pandas as pd
import heapq
import math
from datetime import datetime, timedelta
import plotly.graph_objects as go
import plotly.express as px  # 添加缺失的导入
import time# 星座建模类
class Constellation:def __init__(self, num_planes=6, sats_per_plane=11, altitude=550):self.satellites = []self.link_load = {}  # 存储链路负载self.generate_constellation(num_planes, sats_per_plane, altitude)def generate_constellation(self, num_planes, sats_per_plane, altitude):# 生成卫星位置(简化模型)for plane in range(num_planes):for sat in range(sats_per_plane):# 计算轨道参数inclination = 53.0  # 倾角raan = 360 * plane / num_planes  # 升交点赤经mean_anomaly = 360 * sat / sats_per_plane  # 平近点角# 计算卫星位置(简化)lat = inclination * math.sin(math.radians(mean_anomaly))lon = raan + 15 * sat  # 每颗卫星经度偏移sat_obj = {'id': f's{plane:03d}{sat:02d}','plane': plane,'position': (lat, lon),'neighbors': []}self.satellites.append(sat_obj)# 建立邻居关系self._establish_connections()# 初始化链路负载self._initialize_link_load()def _establish_connections(self):for sat in self.satellites:plane = sat['plane']sat_id = int(sat['id'][4:6])# 同轨前后卫星for s in self.satellites:if s['plane'] == plane and abs(int(s['id'][4:6]) - sat_id) == 1:sat['neighbors'].append(s['id'])# 邻轨卫星adj_planes = [(plane - 1) % 72, (plane + 1) % 72]for s in self.satellites:if s['plane'] in adj_planes and int(s['id'][4:6]) == sat_id:sat['neighbors'].append(s['id'])def _initialize_link_load(self):"""初始化链路负载"""for sat in self.satellites:for neighbor in sat['neighbors']:link_key = tuple(sorted([sat['id'], neighbor]))self.link_load[link_key] = np.random.randint(1, 5)  # 初始随机负载def update_positions(self, time_offset):"""更新卫星位置(简化模型)"""for sat in self.satellites:# 随时间变化经度lon = sat['position'][1] + time_offset * 0.25  # 每分钟0.25度sat['position'] = (sat['position'][0], lon % 360)def find_nearest_sat(self, location):"""找到最近的卫星"""min_dist = float('inf')nearest_sat = Nonefor sat in self.satellites:dist = math.sqrt((location[0] - sat['position'][0]) ** 2 +(location[1] - sat['position'][1]) ** 2)if dist < min_dist:min_dist = distnearest_sat = satreturn nearest_satdef get_graph(self):"""构建网络图结构"""graph = {}for sat in self.satellites:graph[sat['id']] = {}for neighbor_id in sat['neighbors']:# 找到邻居卫星对象neighbor = next(s for s in self.satellites if s['id'] == neighbor_id)# 计算距离(简化)dist = math.sqrt((sat['position'][0] - neighbor['position'][0]) ** 2 +(sat['position'][1] - neighbor['position'][1]) ** 2)# 获取链路负载link_key = tuple(sorted([sat['id'], neighbor_id]))load = self.link_load.get(link_key, 1)# 链路能力设为固定值capacity = 10graph[sat['id']][neighbor_id] = {'distance': dist,'load': load,'capacity': capacity}return graphdef update_link_load(self, path):"""更新路径上的链路负载"""for i in range(len(path) - 1):node1, node2 = path[i], path[i + 1]link_key = tuple(sorted([node1, node2]))if link_key in self.link_load:self.link_load[link_key] += 1def get_load_matrix(self):"""获取负载矩阵(简化)"""size = len(self.satellites)matrix = np.zeros((size, size))for i, sat1 in enumerate(self.satellites):for j, sat2 in enumerate(self.satellites):link_key = tuple(sorted([sat1['id'], sat2['id']]))if link_key in self.link_load:matrix[i][j] = self.link_load[link_key]return matrix# 路径规划算法
class OptimizedDijkstra:def __init__(self, k1=0.5, k2=0.3, k3=0.2, omega0=10):self.k1 = k1  # 距离权重因子self.k2 = k2  # 负载权重因子self.k3 = k3  # 优先级权重因子self.omega0 = omega0  # 权重阈值def calculate_weight(self, D, L, C, P):"""计算链路权重"""omega = self.k1 * D + self.k2 * (L / C) + self.k3 * P# 权重修正if omega > self.omega0:return math.exp(omega)return omegadef find_path(self, graph, start, end, task_priority):"""改进的Dijkstra算法实现"""distances = {node: float('inf') for node in graph}prev_nodes = {node: None for node in graph}distances[start] = 0priority_queue = [(0, start)]while priority_queue:current_dist, current_node = heapq.heappop(priority_queue)if current_node == end:breakfor neighbor, link_data in graph[current_node].items():# 计算链路权重weight = self.calculate_weight(D=link_data['distance'],L=link_data['load'],C=link_data['capacity'],P=task_priority)distance = current_dist + weightif distance < distances[neighbor]:distances[neighbor] = distanceprev_nodes[neighbor] = current_nodeheapq.heappush(priority_queue, (distance, neighbor))# 回溯路径path = []current = endwhile current:path.insert(0, current)current = prev_nodes.get(current)return path, distances[end]# 任务管理类
class TaskManager:def __init__(self):self.tasks = []self.task_groups = {}def add_task(self, start, end, priority, duration):task = {'id': len(self.tasks) + 1,'start': start,'end': end,'priority': priority,'duration': duration,'path': [],'status': 'pending','start_time': datetime.now()}self.tasks.append(task)self._assign_to_group(task)return taskdef _assign_to_group(self, task):"""任务分组策略 - 按地理位置区域分组"""# 简化版:按经纬度范围分组lat_group = 'north' if task['start'][0] > 0 else 'south'lon_group = 'east' if task['start'][1] > 0 else 'west'group_id = f"{lat_group}_{lon_group}"if group_id not in self.task_groups:self.task_groups[group_id] = []self.task_groups[group_id].append(task)def execute_planning(self, constellation, k1, k2, k3, omega0):"""先并行后串行执行规划"""# 组间并行for group_id, tasks in self.task_groups.items():# 组内按优先级排序sorted_tasks = sorted(tasks, key=lambda x: x['priority'], reverse=True)# 组内串行规划for task in sorted_tasks:if task['status'] == 'pending':planner = OptimizedDijkstra(k1, k2, k3, omega0)start_sat = constellation.find_nearest_sat(task['start'])end_sat = constellation.find_nearest_sat(task['end'])if start_sat and end_sat:graph = constellation.get_graph()path, cost = planner.find_path(graph=graph,start=start_sat['id'],end=end_sat['id'],task_priority=task['priority'])task['path'] = pathtask['status'] = 'completed'# 更新链路负载constellation.update_link_load(path)def get_active_tasks(self):"""获取当前活跃任务"""now = datetime.now()return [t for t in self.tasksif t['status'] == 'completed' and(now - t['start_time']).seconds < t['duration'] * 60]# 主应用
def main():# 初始化星座constellation = Constellation(num_planes=6, sats_per_plane=11)task_manager = TaskManager()# 侧边栏控制面板st.sidebar.header("仿真参数配置")k1 = st.sidebar.slider("距离权重因子 (k1)", 0.0, 1.0, 0.5)k2 = st.sidebar.slider("负载权重因子 (k2)", 0.0, 1.0, 0.3)k3 = st.sidebar.slider("优先级权重因子 (k3)", 0.0, 1.0, 0.2)omega0 = st.sidebar.slider("权重阈值 (ω0)", 5.0, 20.0, 10.0)# 添加示例任务按钮if st.sidebar.button("添加示例任务"):task_manager.add_task(start=(39.9, 116.4),  # 北京end=(55.6, 37.8),  # 莫斯科priority=8,duration=30)task_manager.add_task(start=(39.9, 116.4),  # 北京end=(-36.8, 118.3),  # 悉尼priority=5,duration=30)st.sidebar.success("已添加两个示例任务")# 主界面st.title("低轨星座通信路径规划仿真")tab1, tab2, tab3, tab4 = st.tabs(["星座拓扑", "任务规划", "性能分析", "实时动态"])with tab1:st.header("星座网络拓扑")# 获取卫星位置数据sat_positions = []for sat in constellation.satellites:sat_positions.append({'lat': sat['position'][0],'lon': sat['position'][1],'id': sat['id']})df_sats = pd.DataFrame(sat_positions)# 创建地图fig = go.Figure()# 添加卫星fig.add_trace(go.Scattergeo(lon=df_sats['lon'],lat=df_sats['lat'],text=df_sats['id'],mode='markers',marker=dict(size=8,color='blue',line=dict(width=1, color='rgba(0, 0, 0, 0.5)')),name='卫星'))# 添加星间链路for sat in constellation.satellites:for neighbor_id in sat['neighbors']:neighbor = next(s for s in constellation.satellites if s['id'] == neighbor_id)fig.add_trace(go.Scattergeo(lon=[sat['position'][1], neighbor['position'][1]],lat=[sat['position'][0], neighbor['position'][0]],mode='lines',line=dict(width=1, color='gray'),showlegend=False))# 设置地图布局fig.update_geos(projection_type="natural earth",showland=True,landcolor="rgb(243, 243, 243)",countrycolor="rgb(204, 204, 204)")fig.update_layout(height=600, margin={"r": 0, "t": 0, "l": 0, "b": 0})st.plotly_chart(fig)with tab2:st.header("通信任务规划")col1, col2 = st.columns(2)with col1:st.subheader("添加新任务")start_lat = st.number_input("起点纬度", -90.0, 90.0, 39.9)start_lon = st.number_input("起点经度", -180.0, 180.0, 116.4)end_lat = st.number_input("终点纬度", -90.0, 90.0, 55.6)end_lon = st.number_input("终点经度", -180.0, 180.0, 37.8)priority = st.slider("任务优先级", 1, 10, 5)duration = st.number_input("持续时间(分钟)", 1, 120, 30)if st.button("添加任务"):task = task_manager.add_task(start=(start_lat, start_lon),end=(end_lat, end_lon),priority=priority,duration=duration)st.success(f"任务#{task['id']}已添加!")with col2:st.subheader("任务列表")if not task_manager.tasks:st.info("暂无任务")else:for task in task_manager.tasks:status_color = "🟢" if task['status'] == 'completed' else "🟡"elapsed = (datetime.now() - task['start_time']).seconds // 60progress = min(100, int(elapsed / task['duration'] * 100))st.write(f"{status_color} 任务#{task['id']}: {task['start']} → {task['end']}")st.progress(progress)st.caption(f"优先级: {task['priority']} | 持续时间: {task['duration']}分钟 | 已进行: {elapsed}分钟")if task['path']:st.caption(f"路径: {' → '.join(task['path'])}")if st.button("执行路径规划", key="plan_button"):with st.spinner("正在规划路径..."):start_time = time.time()task_manager.execute_planning(constellation, k1, k2, k3, omega0)elapsed_time = (time.time() - start_time) * 1000  # 毫秒st.success(f"路径规划完成! 耗时: {elapsed_time:.2f} ms")with tab3:st.header("性能分析")# 耗时对比st.subheader("规划耗时对比")df_time = pd.DataFrame({'方法': ['原始算法', '优化算法'],'耗时(ms)': [12.2, 1.67]})st.bar_chart(df_time.set_index('方法'))# 负载热力图st.subheader("网络负载分布")load_data = constellation.get_load_matrix()fig = px.imshow(load_data,color_continuous_scale='RdYlGn_r',labels=dict(x="卫星ID", y="卫星ID", color="负载"))st.plotly_chart(fig)with tab4:st.header("实时动态路径")time_step = st.slider("时间进度(分钟)", 0, 30, 0)# 更新时间constellation.update_positions(time_step)# 获取卫星位置数据sat_positions = []for sat in constellation.satellites:sat_positions.append({'lat': sat['position'][0],'lon': sat['position'][1],'id': sat['id']})df_sats = pd.DataFrame(sat_positions)# 创建地图fig = go.Figure()# 添加卫星fig.add_trace(go.Scattergeo(lon=df_sats['lon'],lat=df_sats['lat'],text=df_sats['id'],mode='markers',marker=dict(size=8,color='blue',line=dict(width=1, color='rgba(0, 0, 0, 0.5)')),name='卫星'))# 添加星间链路for sat in constellation.satellites:for neighbor_id in sat['neighbors']:neighbor = next(s for s in constellation.satellites if s['id'] == neighbor_id)fig.add_trace(go.Scattergeo(lon=[sat['position'][1], neighbor['position'][1]],lat=[sat['position'][0], neighbor['position'][0]],mode='lines',line=dict(width=1, color='gray'),showlegend=False))# 添加任务路径active_tasks = task_manager.get_active_tasks()for i, task in enumerate(active_tasks):if task['path']:path_positions = []for sat_id in task['path']:sat = next(s for s in constellation.satellites if s['id'] == sat_id)path_positions.append(sat['position'])lons = [pos[1] for pos in path_positions]lats = [pos[0] for pos in path_positions]fig.add_trace(go.Scattergeo(lon=lons,lat=lats,mode='lines+markers',line=dict(width=2, color='red' if i == 0 else 'green'),marker=dict(size=6),name=f"任务#{task['id']}路径"))# 设置地图布局fig.update_geos(projection_type="natural earth",showland=True,landcolor="rgb(243, 243, 243)",countrycolor="rgb(204, 204, 204)")fig.update_layout(height=600, margin={"r": 0, "t": 0, "l": 0, "b": 0})st.plotly_chart(fig)# 动态稳定性展示st.subheader("路径动态稳定性")st.write("在任务持续期间,路径整体趋势保持一致,但节点可能有微小跳变(卫星运动导致拓扑更新)")if __name__ == "__main__":main()

http://www.dtcms.com/a/302152.html

相关文章:

  • Day 24:元组与os模块
  • NAS远程访问新解法:OMV与cpolar的技术协同价值
  • Maven中的bom和父依赖
  • 从0到500账号管理:亚矩阵云手机多开组队与虚拟定位实战指南
  • 从0开始学习R语言--Day60--EM插补法
  • C++11(上)(右值引用、移动构造)
  • 低速信号设计之 SMBUS 篇
  • Ubuntu服务器上JSP运行缓慢怎么办?全面排查与优化方案
  • Jenkins + SonarQube 从原理到实战一:基于 K8s 部署与使用(含中文插件与 Python 扫描)
  • 企业级日志分析系统ELK
  • R语言常用扩展包
  • 绳子切割 图论
  • Nestjs框架: 多租户与多数据库的架构设计与实现
  • 【LeetCode】算法详解#10 ---搜索二维矩阵II
  • React 项目中使用 Redux 实现公共状态共享
  • 从 WAIC 2025 的火爆,看 AI 时代视频“入口层”的技术演进
  • flink yarn 问题排查
  • [VLDB 2025]面向Flink集群巡检的交叉对比学习异常检测
  • 数据驱动与智能重构:定制开发开源AI智能名片S2B2C商城小程序对数字营销话语权的重塑
  • Spring ai 调用大模型
  • 盲盒抽卡机小程序系统开发:连接线上线下娱乐新桥梁
  • uniapp 更新apk有缓存点不动,卸载安装apk没有问题。android
  • 小程序组件的生命周期,以及在小程序中进行接口请求的方法设置
  • 网络编程概述与UDP编程
  • 【esp32s3】7 - VSCode + PlatformIO + Arduino + 构建项目
  • 基于神经网络的手写数字识别系统
  • 【论文阅读53】-CNN-LSTM-滑坡风险随时间变化研究
  • 【论文阅读】Safety Alignment Should Be Made More Than Just a Few Tokens Deep
  • cacti的RCE
  • 计算机视觉---Halcon概览