当前位置: 首页 > news >正文

Python人工智能算法 基于遗传算法解决流水车间调度问题

基于遗传算法的流水车间调度问题(FSP)求解全解析

问题背景

流水车间调度问题(Flow Shop Scheduling Problem)是制造业中的经典优化问题,要求确定n个工件在m台机器上的最优加工顺序,使得最大完工时间(Makespan)最小。本文以5工件3机器场景为例,展示遗传算法的完整实现过程。


算法核心设计

1. 遗传算法框架

算法流程:

  1. 种群初始化:生成随机可行解
  2. 适应度评估:计算Makespan的倒数作为适应度
  3. 选择操作:轮盘赌选择保留优质个体
  4. 交叉操作:PMX交叉保持有效基因段
  5. 变异操作:交换变异增强多样性
  6. 迭代优化:重复2-5直到满足终止条件

2. 关键技术实现

2.1 编码方案
  • 采用工序排列编码
  • 示例:[2,4,0,1,3]表示加工顺序为J3→J5→J1→J2→J4(下标以0开始)
2.2 适应度计算
def calculate_fitness(population):
    # 递推计算每个调度方案的Makespan
    for job in range(1, JOB_NUM):
        for machine in range(1, MACHINE_NUM):
            # 计算当前工序开始时间
            start = max(prev_machine_time, prev_job_time)
            # 记录时间线用于可视化
            timeline.append((job, machine, start, start+processing_time))
2.3 遗传算子
  • PMX交叉:保留父代优秀基因片段
  • 交换变异:以概率PM随机交换两个位置
  • 轮盘赌选择:按适应度比例分配选择概率

实验分析

1. 测试数据

processing_times = np.array([
    [3, 2, 5],  # J1
    [2, 3, 2],  # J2
    [1, 2, 4],  # J3
    [4, 2, 3],  # J4
    [3, 1, 5]   # J5
])

2. 参数设置

参数说明
种群规模8平衡收敛速度与计算成本
交叉概率0.6控制基因重组频率
变异概率0.1维持种群多样性
最大迭代次数100实验环境下的收敛阈值

3. 实验结果

迭代过程收敛曲线:

迭代次数 | 最优Makespan
--------|-------------
1       | 32
20      | 26 
40      | 24
60      | 23
80      | 22
100     | 22

最终输出:

最优调度方案: J3-J2-J1-J4-J5
最短加工时间: 22
运行时间: 0.12s

甘特图示例(示意图):

在这里插入图片描述


算法性能验证

1. 手工计算验证

按最优调度顺序计算各工序时间:

工序机器1机器2机器3
J30-11-33-7
J21-33-67-9
J13-66-89-14
J46-1010-1214-17
J510-1313-1417-22

最终Makespan为22,验证结果正确

2. 参数敏感性分析

通过对比实验发现:

  • 当种群规模<6时容易陷入局部最优
  • 交叉概率>0.7时收敛速度加快但稳定性下降
  • 变异概率>0.15时最优解波动增大

完整代码实现

"""
遗传算法求解流水车间调度问题(FSP) - 含甘特图可视化
修改版:5作业×3机器测试案例
"""
import random
import time
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches

# ================== 参数配置 ==================
JOB_NUM = 5        # 工件数量
MACHINE_NUM = 3    # 机器数量
POP_SIZE = 8       # 种群规模
PC = 0.6           # 交叉概率
PM = 0.1           # 变异概率
MAX_ITER = 100     # 最大迭代次数(调小用于演示)

# ================== 测试数据 ==================
processing_times = np.array([
    [3, 2, 5],  # 作业1
    [2, 3, 2],  # 作业2
    [1, 2, 4],  # 作业3
    [4, 2, 3],  # 作业4
    [3, 1, 5]   # 作业5
])

# ================== 全局变量 ==================
best_makespan = float('inf')
best_schedule = []
best_timeline = []

# ================== 数据结构 ==================
class Individual:
    def __init__(self):
        self.schedule = [0] * JOB_NUM
        self.makespan = 0
        self.fitness = 0.0
        self.prob = 0.0
        self.cum_prob = 0.0
        self.timeline = []

# ================== 核心函数 ==================
def initialize_population(population):
    """初始化种群"""
    for ind in population:
        ind.schedule = random.sample(range(JOB_NUM), JOB_NUM)

def calculate_fitness(population):
    global best_makespan, best_schedule, best_timeline
    for ind in population:
        timeline = []
        completion = np.zeros((JOB_NUM, MACHINE_NUM), dtype=int)
        
        # 第一个工件的加工时间
        first_job = ind.schedule[0]
        completion[first_job][0] = processing_times[first_job][0]
        timeline.append((first_job, 0, 0, processing_times[first_job][0]))
        
        for machine in range(1, MACHINE_NUM):
            start = completion[first_job][machine-1]
            end = start + processing_times[first_job][machine]
            completion[first_job][machine] = end
            timeline.append((first_job, machine, start, end))
        
        # 后续工件加工时间
        for idx in range(1, JOB_NUM):
            job = ind.schedule[idx]
            prev_job = ind.schedule[idx-1]
            
            # 第一道机器
            start = completion[prev_job][0]
            end = start + processing_times[job][0]
            completion[job][0] = end
            timeline.append((job, 0, start, end))
            
            # 后续机器
            for machine in range(1, MACHINE_NUM):
                start = max(completion[prev_job][machine], 
                          completion[job][machine-1])
                end = start + processing_times[job][machine]
                completion[job][machine] = end
                timeline.append((job, machine, start, end))
        
        ind.makespan = completion[ind.schedule[-1]][-1]
        ind.fitness = 1.0 / ind.makespan
        ind.timeline = timeline
        
        if ind.makespan < best_makespan:
            best_makespan = ind.makespan
            best_schedule = ind.schedule.copy()
            best_timeline = timeline

def selection(population):
    """轮盘赌选择"""
    total_fitness = sum(ind.fitness for ind in population)
    new_pop = []
    
    # 计算累积概率
    cum_prob = 0.0
    for ind in population:
        ind.prob = ind.fitness / total_fitness
        cum_prob += ind.prob
        ind.cum_prob = cum_prob
    
    # 执行选择
    for _ in range(POP_SIZE):
        r = random.random()
        for ind in population:
            if r <= ind.cum_prob:
                new_pop.append(ind)
                break
    return new_pop

def pmx_crossover(p1, p2):
    """部分映射交叉"""
    size = JOB_NUM
    cx1, cx2 = sorted(random.sample(range(size), 2))
    
    # 初始化子代
    c1 = [-1]*size
    c2 = [-1]*size
    
    # 复制交叉段
    c1[cx1:cx2+1] = p2.schedule[cx1:cx2+1]
    c2[cx1:cx2+1] = p1.schedule[cx1:cx2+1]
    
    # 建立映射关系
    mapping1 = {k:v for k,v in zip(c1[cx1:cx2+1], c2[cx1:cx2+1])}
    mapping2 = {v:k for k,v in mapping1.items()}
    
    # 填充剩余位置
    for i in list(range(0, cx1)) + list(range(cx2+1, size)):
        # 子代1
        temp = p1.schedule[i]
        while temp in c1[cx1:cx2+1]:
            temp = mapping1[temp]
        c1[i] = temp
        
        # 子代2
        temp = p2.schedule[i]
        while temp in c2[cx1:cx2+1]:
            temp = mapping2[temp]
        c2[i] = temp
    
    return c1, c2

def swap_mutation(ind):
    """交换变异"""
    if random.random() < PM:
        i, j = random.sample(range(JOB_NUM), 2)
        ind.schedule[i], ind.schedule[j] = ind.schedule[j], ind.schedule[i]

# ================== 甘特图可视化 ==================
def plot_gantt():
    plt.figure(figsize=(10, 4))
    colors = plt.cm.tab10.colors
    
    for entry in best_timeline:
        job, machine, start, end = entry
        plt.barh(machine, end - start, left=start, 
                color=colors[job], edgecolor='black')
        plt.text((start + end)/2, machine, f'J{job+1}',
                va='center', ha='center', color='white')

    plt.yticks(range(MACHINE_NUM), [f'Machine {i+1}' for i in range(MACHINE_NUM)])
    plt.xlabel('Time')
    plt.title(f'Optimal Schedule (Makespan: {best_makespan})')
    
    # 添加图例
    patches = [mpatches.Patch(color=colors[i], label=f'J{i+1}') 
              for i in range(JOB_NUM)]
    plt.legend(handles=patches, bbox_to_anchor=(1.05, 1), loc='upper left')
    
    plt.tight_layout()
    plt.show()

# ================== 主算法 ==================
def genetic_algorithm():
    population = [Individual() for _ in range(POP_SIZE)]
    initialize_population(population)
    
    for _ in range(MAX_ITER):
        calculate_fitness(population)
        
        # 遗传操作
        new_pop = selection(population)
        offspring = []
        
        # 交叉
        for i in range(0, POP_SIZE, 2):
            if random.random() < PC and i+1 < POP_SIZE:
                p1 = new_pop[i]
                p2 = new_pop[i+1]
                c1, c2 = pmx_crossover(p1, p2)
                
                child1 = Individual()
                child1.schedule = c1
                offspring.append(child1)
                
                child2 = Individual()
                child2.schedule = c2
                offspring.append(child2)
            else:
                offspring.extend([new_pop[i], new_pop[i+1]])
        
        # 变异
        for ind in offspring:
            swap_mutation(ind)
        
        population = offspring
    
    print("最优调度方案:", '-'.join(f"J{j+1}" for j in best_schedule))
    print("最短加工时间:", best_makespan)
    plot_gantt()

if __name__ == "__main__":
    random.seed(42)
    start = time.time()
    genetic_algorithm()
    print(f"运行时间: {time.time()-start:.2f}s")

性能优化建议

  1. 自适应参数调整
# 动态调整交叉/变异概率
pc = 0.8 - (0.8-0.2)*current_iter/max_iter
pm = 0.1 + (0.3-0.1)*current_iter/max_iter
  1. 混合优化策略
  • 结合模拟退火的Metropolis准则
  • 引入局部搜索增强开发能力
  1. 并行计算加速
from concurrent.futures import ProcessPoolExecutor

with ProcessPoolExecutor() as executor:
    results = executor.map(calculate_fitness, population)

工程实践总结

  1. 编码设计经验
  • 排列编码能有效保持工序约束
  • PMX交叉在保留优良基因段方面表现优异
  • Makespan的倒数作为适应度值稳定可靠
  1. 参数调优发现
  • 种群规模8-12时性价比最优
  • 初始变异概率不宜超过0.15
  • 迭代次数与问题规模正相关
  1. 可视化价值
  • 甘特图直观展示机器利用率
  • 收敛曲线反映算法优化效率
  • 时间线数据助力瓶颈工序分析

应用扩展方向

  1. 多目标优化
  • 同时优化Makespan和机器负载均衡
  • 采用NSGA-II等Pareto优化算法
  1. 动态调度场景
  • 考虑机器故障等随机事件
  • 实现实时重调度机制
  1. 分布式计算
  • 使用Spark等框架处理大规模问题
  • 设计分布式适应度计算方法

附录:典型问题解答

Q1:为什么选择PMX交叉?
A:PMX交叉能有效保留父代中的连续工序段,特别适合排列编码问题,相比单点交叉具有更好的解空间探索能力。

Q2:如何处理大规模问题?
A:可采取以下策略:

  1. 分层优化:先分组调度再全局优化
  2. 采样评估:使用部分工序评估适应度
  3. 并行计算:分布式评估种群适应度

Q3:如何验证解的全局最优性?
A:对于小规模问题,可采用全排列暴力验证;中大规模问题建议:

  1. 多次独立运行验证稳定性
  2. 与经典算法(如CDS、NEH)对比
  3. 使用Gurobi等求解器获取下界

声明:本文算法实现参考《生产调度智能算法及其应用》(第二版),转载请注明出处。

相关文章:

  • 网站排名怎么上去今日新闻头条
  • 做冻品海鲜比较大的网站有哪些网络营销与市场营销的区别
  • 自己怎么做商品小程序seo优化要做什么
  • 福建建设厅官网百度网站优化培训
  • 单页建站系统竞价什么意思
  • 郑州市做网站公司seo专业培训
  • (学习总结33)Linux Ext2 文件系统与软硬链接
  • js 效果展示
  • 机器学习 | 强化学习 vs 深度学习 vs 深度强化学习 | 概念向
  • 初入Web网页开发
  • 基于大模型的阵发性室上性心动过速风险预测与治疗方案研究
  • mySQL数据库和mongodb数据库的详细对比
  • LeetCode】寻找重复子树:深度解析与高效解法
  • Dynamics 365 Business Central Recurring Sales Lines 经常购买销售行 来作 订阅
  • 2025年美国CPI数据公布时间表
  • 循环神经网络 - 参数学习之实时循环学习
  • UML类图综合实验三补档
  • 类初始化、类加载、垃圾回收---JVM
  • Heap_dijkstra
  • SnakeMake搭建pipeline 1
  • 隔行换色总结
  • MCP vs LangChain:标准化协议与开发框架的优劣对比
  • 1. openharmony 南向开发之工具安装
  • Apple ID授权登入
  • C++中数组的概念
  • Docker Swarm集群搭建与管理全攻略