simpy仿真
模拟服务台
一共5个顾客,2个服务台
import simpy
import random
def customer(env, name, service_time_mean):
arrival_time = env.now
print(f'{arrival_time}: {name} 到达服务台,开始排队')
with server.request() as req:
yield req
wait_time = env.now - arrival_time
print(f'{env.now}: {name} 开始接受服务,等待了 {wait_time} 时间')
yield env.timeout(random.expovariate(1 / service_time_mean))
print(f'{env.now}: {name} 服务完成')
# 创建仿真环境
env = simpy.Environment()
# 创建服务台资源,容量为 1
server = simpy.Resource(env, capacity=2)
# 启动多个顾客的到达和服务进程
for i in range(5):
arrival_interval = random.expovariate(1 / 2) # 平均到达间隔为 2
env.process(customer(env, f'顾客 {i}', 3)) # 平均服务时间为 3
env.timeout(arrival_interval)
# 运行仿真
env.run()
模拟车辆
import simpy
# 定义车辆行为
def vehicle(env, name, speed, total_distance, positions):
position = 0 # 初始位置
print(f"Time {env.now:.2f}: 车辆 {name} 从起点出发")
while position < total_distance:
# 检查前方是否有车辆
front_vehicle_position = None
for pos in positions.values():
if pos > position and (front_vehicle_position is None or pos < front_vehicle_position):
front_vehicle_position = pos
# 根据前方车辆调整速度
if front_vehicle_position is not None and front_vehicle_position - position <= speed:
move_distance = front_vehicle_position - position - 1 # 禁止超车,保持最小间距
move_distance = max(move_distance, 0) # 确保移动距离非负
else:
move_distance = speed
# 如果没有移动距离,则等待一段时间再检查
if move_distance == 0:
yield env.timeout(0.1) # 等待 0.1 秒后重新检查
continue
# 计算到达下一位置的时间
time_to_move = move_distance / speed
last_position = position # 记录上一位置
start_time = env.now # 记录开始时间
# 按照较小的时间步实时打印位置
while env.now - start_time < time_to_move:
elapsed_time = env.now - start_time
current_position = last_position + (elapsed_time * speed)
if current_position > position + move_distance: # 防止超出目标位置
break
positions[name] = current_position
print(f"Time {env.now:.2f}: 车辆 {name} 当前位置: {current_position:.2f}")
yield env.timeout(0.1) # 每 0.1 秒更新一次
# 更新最终位置
position += move_distance
if position > total_distance: # 防止超出终点
position = total_distance
print(f"Time {env.now:.2f}: 车辆 {name} 到达终点")
del positions[name] # 到达终点后移除该车辆
# 模拟函数
def simulate_vehicle_traffic(env, num_vehicles, speed, total_distance, start_interval):
positions = {} # 记录所有车辆的位置
for i in range(num_vehicles):
# 每辆车按顺序发车,发车间隔为 start_interval
yield env.timeout(i * start_interval)
env.process(vehicle(env, i + 1, speed, total_distance, positions))
# 主程序
if __name__ == "__main__":
# 参数设置
num_vehicles = 4 # 车辆数量
speed = 4 # 每辆车的速度(单位/秒)
total_distance = 20 # 路程长度(单位)
start_interval = 2 # 每辆车的发车间隔(秒)
# 初始化模拟环境
env = simpy.Environment()
# 启动模拟
env.process(simulate_vehicle_traffic(env, num_vehicles, speed, total_distance, start_interval))
# 运行模拟直到所有车辆到达终点
env.run()