智能车间调度算法(python)启发式算法(GA)(SA)(多目标优化)
柔性作业车间调度问题(Flexible Job Shop Scheduling Problem, FJSP)是现代智能制造中最具挑战性的调度问题之一。与传统作业车间调度(JSP)相比,FJSP增加了工序的灵活性:每道工序可以选择多个不同的机器进行加工,这种灵活性虽然提升了资源利用率,但也大幅增加了问题的复杂度(NP-hard问题)。
1.1 项目背景
柔性作业车间调度问题(Flexible Job Shop Scheduling Problem, FJSP)是现代智能制造中最具挑战性的调度问题之一。与传统作业车间调度(JSP)相比,FJSP增加了工序的灵活性:每道工序可以选择多个不同的机器进行加工,这种灵活性虽然提升了资源利用率,但也大幅增加了问题的复杂度(NP-hard问题)。
在实际制造过程中,除了考虑作业的完工时间外,还必须纳入以下多个因素:
- 机器人转运时间:不同工序之间的搬运需时;
- 机器能耗差异:加工设备的能耗不同,影响运行成本;
- 设备维护窗口:部分时间段内机器不能运行。
为此,本项目设计并实现了一个图形化智能调度系统,支持上述约束,集成三种优化算法,目标是实现高效、可视、可操作的柔性调度。
1.2 系统目标
本系统旨在构建一个面向研究与教学场景的智能调度平台,具备如下核心能力:
1.多约束调度建模:
- 考虑完工时间、能耗、转运时间与设备维护等现实因素;
2.作业与调度数据可视化:
- 支持作业结构随机生成与人工编辑;
- 实现甘特图形式的调度方案动态演示;
3.多算法求解与对比分析:
- 实现遗传算法(GA)、模拟退火(SA)、NSGA-II三种主流调度优化算法;
- 支持多目标优化与解质量统计;
4.调度结果导出与系统集成:
- 可将调度方案导出为CSV,支持后续数据分析或对接生产系统。
二、系统架构设计
2.1 总体架构图(逻辑结构)
SchedulerApp (调度主控制器)
│
├── 参数配置模块(Spinbox、Entry、Scale)
├── 作业生成与编辑模块(随机生成 + GUI人工编辑)
├── 优化算法模块
│ ├── 遗传算法(GA)
│ ├── 模拟退火算法(SA)
│ └── NSGA-II(多目标优化)
├── 调度结果汇总模块(调度评价与Pareto分析)
├── 甘特图演示模块(Matplotlib动画)
└── 结果导出模块(CSV写入)
2.2 模块功能详解
2.2.1 参数配置模块
本模块提供用户友好的调度参数输入功能,包括:
- 作业数(n_jobs):控制作业数量;
- 机器数(n_machines):车间可用设备数量;
- 重复运行次数:用于统计分析;
- 能耗系数(energy_costs):每台机器单位时间的能耗;
- 维护窗口(maintenance_windows):各设备不可用时间段;
- 目标函数权重设置:在完工时间(makespan)和能耗之间分配权重。
实现形式:基于 ttk.Entry, ttk.Scale, Spinbox 等控件,并加入输入校验。
2.2.2 作业生成与编辑模块
在柔性作业车间调度(FJSP)模型中,每个作业由若干工序组成,每道工序可以在多个机器中选择其中一台完成,加工时间因机器不同而异。因此,调度的关键在于工序分配(选择哪台机器)和工序排序(调度先后顺序)。为支持算法运行与可视化模拟,本模块负责生成、组织与编辑调度数据。
数据结构定义
系统中作业的结构为一个嵌套列表,形式如下:
jobs = [
[ {0:6, 2:8}, {1:4, 3:7} ], # 作业0:工序0可在机器0和2加工,耗时6和8;工序1可在1和3加工
[ {1:5, 2:6}, {0:6} ], # 作业1:两道工序,第一道可选机器1或2,第二道只能在机器0加工
...
]
- 外层列表:表示多个作业;
- 每个作业由若干工序组成(子列表);
- 每道工序为一个字典:键为机器编号,值为该机器上加工该工序所需时间。
随机生成作业结构
在用户点击“生成随机作业”按钮后,系统自动调用 generate_jobs() 函数,执行以下步骤:
- 工序数设定:为每个作业随机生成 2~4 道工序;
- 机器选择:为每道工序随机分配 2~n 台机器;
- 加工时间设定:为每个选中机器分配一个加工时间(4~10个时间单位之间);
- 结果存储:将结果以嵌套列表结构保存在 self.jobs 中,供后续算法调度与绘图调用。
人工编辑作业结构
用户可以通过点击“编辑作业详细”按钮,在弹出的窗口中对每道工序的机器选项与加工时间进行逐项修改。界面设计:
- 使用 Toplevel + LabelFrame + Entry 构建多层嵌套窗口;
- 每个作业显示为一个 LabelFrame,包含若干工序的输入框;
- 每道工序的输入格式为:
机器编号1:时间1, 机器编号2:时间2, ...
例如:
0:6, 2:9, ...
2.2.3 调度优化算法模块
为了求解柔性作业车间调度问题(FJSP)这一NP难问题,本系统集成了三种具有代表性的智能优化算法:遗传算法(GA)、模拟退火算法(SA)、非支配排序遗传算法(NSGA-II)。这三种算法分别代表了经典启发式搜索、局部搜索、以及多目标进化优化策略,具有互补性。
- 遗传算法(Genetic Algorithm, GA)
模拟生物自然选择和遗传机制,通过种群进化迭代寻优,广泛应用于组合优化问题。
表2.1遗传算法实现细节
模块 |
描述 |
染色体表示 |
每个染色体是一个作业编号序列,例如 [0,1,1,2,0,2,...] 表示作业和工序的加工顺序。作业编号出现的次数等于工序数。 |
初始种群 |
随机生成若干个合法的作业序列,形成初始解集(如40个个体)。 |
适应度计算 |
对每个个体,调用 decode() 函数生成调度方案,返回 完工时间(makespan) 和 总能耗(energy)。通过 weighted_score = α * makespan + β * energy 计算目标函数值。 |
选择机制 |
使用精英保留策略,即每代保留若干最优个体,确保最优解不会被遗失。 |
交叉操作 |
使用单点交叉 + 保序修复策略:截取一部分父代染色体,余下部分按照另一父代中的顺序补全,避免基因重复或遗漏。 |
变异机制 |
以一定概率(如0.2)在染色体中随机选择两个基因位置交换,增加种群多样性。 |
终止条件 |
固定迭代次数(如40代),输出适应度最优的个体作为结果。 |
- 模拟退火算法(Simulated Annealing, SA)
模拟物理退火过程,接受一定概率的“差解”以跳出局部最优。适用于精细调节搜索路径的问题。
表2.2模拟退火算法实现细节
模块 |
描述 |
状态定义 |
当前调度序列作为系统状态(即作业编号排列)。 |
邻域扰动 |
通过随机交换两个位置的作业编号生成新的邻域解。 |
评价函数 |
与GA相同,使用完工时间与能耗加权和作为目标函数值。 |
接受准则 |
若新解优于当前解,则接受;否则以概率 exp(-(ΔE/T)) 接受劣解,其中 ΔE 为目标值差距,T 为当前温度。 |
降温策略 |
使用指数递减法:T = T * α(例如 α=0.95),每轮迭代后降低温度。 |
终止条件 |
达到固定迭代次数(如200轮),输出当前最优调度方案。 |
- 非支配排序遗传算法II(NSGA-II)
NSGA-II 是一种广泛应用于多目标优化的问题求解器,旨在寻找非支配解集(Pareto前沿),而非单一最优解。
表2.3非支配排序遗传算法实现细节
模块 |
描述 |
染色体表示 |
与GA相同,使用作业编号序列编码; |
初始种群 |
随机生成若干解组成种群(如40个); |
目标维度 |
评估两个目标:① 完工时间(makespan);② 总能耗(energy); |
非支配排序 |
使用经典Dominance Rule对种群进行排序,识别出非支配前沿(Pareto解集); |
拥挤距离 |
用于衡量解在目标空间中分布稠密度,鼓励解集多样性; |
选择与繁殖 |
从当前父代与子代中通过非支配排序与拥挤距离综合选择,生成下一代种群; |
迭代更新 |
迭代若干代(如40轮)后,输出最优的非支配前沿点集合。 |
2.2.4 调度结果汇总模块
调度算法的运行结果具有随机性和多样性,尤其在采用元启发式算法(如 GA/SA/NSGA-II)时,为保证输出结果的稳定性与可信度,系统支持多次重复运行(由参数 repeat_times 控制),并在运行结束后对结果进行统计汇总与结构化展示。
每轮算法运行输出的核心指标为:
- 完工时间(Makespan):所有作业完成所需的最大时间;
- 能耗(Energy):调度方案下所有机器运行总能耗(考虑能耗系数);
- 调度方案:包括每道工序的机器编号、起始时间和结束时间。
在多次运行后,系统进一步汇总以下统计信息:
表2.4统计信息
统计项 |
含义 |
最小完工时间 |
所有结果中的最低完工时间 |
最小能耗 |
所有结果中的最低能耗值 |
最优目标函数值 |
对每个结果根据目标函数 α * makespan + β * energy 计算得分,输出最小值 |
完工时间标准差 |
多次运行的完工时间标准差,衡量波动性 |
能耗标准差 |
能耗在不同调度方案中的波动性 |
Pareto前沿解数量 |
(仅适用于NSGA-II)输出非支配调度解集数量 |
调度优化的核心不仅在于结果数值的优劣,更在于其调度过程的可解释性与可视化。为此,本模块通过动态甘特图展示调度进度,是调度逻辑的关键表达形式之一。
基于 matplotlib + Tkinter canvas 实现动态甘特图:
- 每台机器为一条水平时间线;
- 加工区段以彩条展示,标注“作业-工序”编号;
- 维护窗口以灰色透明背景 + 斜线标记表示;
- 支持自动播放动画(时间推进);
- 可识别瓶颈段与资源冲突。
三、源代码结构说明
该调度系统以 Python 语言实现,采用面向对象结构,并使用 Tkinter 构建图形界面,matplotlib 实现动态可视化。整个系统封装在一个主文件中,内部划分为多个逻辑子模块,具备良好的可维护性与可扩展性。
├── DEFAULT_PARAMS # 全局默认参数设置
├── 作业与维护窗口解析函数 # parse_maintenance_input(), generate_jobs()
├── 解码与调度函数 # decode(), is_in_maintenance()
├── 三种优化算法实现 # run_ga(), run_sa(), run_nsga2()
├── 结果汇总模块 # summarize_results()
├── 甘特图动画类 # GanttAnimator
├── 作业编辑弹窗类 # JobEditor
└── 主界面控制器类 # SchedulerApp(含主窗口UI与按钮逻辑)
1. 主类:SchedulerApp(调度系统 GUI 控制器)
该类负责整个系统的初始化、界面布局、参数读取、按钮绑定和调度算法触发,是系统的主控模块。
主要职责:
- 构建参数设置区、按钮操作区与文本输出区;
- 管理当前作业数据(self.jobs)和最优调度方案(self.best_schedule);
- 响应用户操作:生成作业、编辑作业、运行算法、展示动画、导出结果。
2. 调度解码函数 decode()
核心调度逻辑,将染色体(作业序列)翻译为调度方案,考虑以下约束:
- 工序按顺序依赖;
- 每台机器每时刻仅加工一个任务;
- 每个作业同一时间只能加工一道工序;
- 考虑机器人转运时间与维护窗口。
3. 三种调度算法模块
统一结构:每个算法均以 jobs、robot_transfer_time、energy_costs 等参数作为输入,返回多次调度结果列表 List[Tuple[makespan, energy, sequence, schedule]]。
run_ga()
- 包括初始化种群、选择、交叉、变异、淘汰;
- 迭代演化获取目标函数值最小解。
run_sa()
- 初始序列扰动;
- 模拟退火接受机制;
- 采用固定温度下降速率进行200轮迭代。
run_nsga2()
- 实现非支配排序;
- 采用Pareto最优解作为评估指标;
- 最终输出为 Pareto前沿点集中的一个最优个体(默认按平均目标加权排序选出)。
4. 作业编辑类 JobEditor(弹窗对话框)
此类继承自 simpledialog.Dialog,用于弹出窗口,编辑每个作业每道工序的机器-时间配置。
- 自动生成输入框;
- 校验输入合法性;
- 更新主程序中 self.jobs 数据结构。
5. GanttAnimator(动态甘特图动画类)
负责渲染调度方案的动态图形展示。
核心流程:
- __init__() 构建绘图界面;
- draw_frame() 渲染某一帧的作业条与维护窗口;
- _step() 控制帧间时间推进;
- start() 开始动画播放;
- stop() 停止播放。
支持实时刷新、维护标注、作业标注,并使用 tab20 colormap 分配颜色。
6. 辅助工具函数
函数名 |
说明 |
parse_maintenance_input(text) |
解析用户输入的维修时间窗口文本,转为字典格式 |
generate_jobs(n_jobs, n_machines) |
随机生成作业工序结构,每道工序含多个可选机器 |
is_in_maintenance(machine, start, end, windows) |
判断时间段是否落入维护窗口 |
summarize_results(results, method_name, ...) |
多次运行后的结果统计、格式化报告生成 |
7. 输出与导出模块
- 文本输出:所有统计报告通过 Text 控件插入结果(含分隔行、标题、数据);
- 文件导出:以 .csv 格式保存 schedule,由 export_schedule() 方法实现,调用 csv.writer 写入。
四、使用说明与运行演示
设置参数:选择作业数、机器数、能耗系数、目标权重等;
图4.1设置参数
生成作业数据:可随机生成或点击“编辑作业”手动设置;
图4.2随机生成
图4.3手动设置
运行调度算法:选择任意一种算法运行;
查看调度结果:文本框中展示统计信息;
图4.4运行遗传算法
演示调度进度:点击“动态甘特图演示”进行可视化播放;
图4.5甘特图演示
导出调度方案:点击“导出CSV”保存调度结果。
图4.6导出CSV
五、源代码
import tkinter as tk
from tkinter import ttk, simpledialog, messagebox, filedialog
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import numpy as np
import random
import math
import csv
from collections import defaultdict
# ==== 全局参数默认值 ====
DEFAULT_PARAMS = {
"n_jobs": 6,
"n_machines": 4,
"repeat_times": 5,
"robot_transfer_time": 2,
"energy_costs": [1.0, 1.4, 1.2, 1.6],
"maintenance_windows": {0: [(25, 35)], 1: [(15, 25)], 2: [(10, 20)], 3: [(40, 50)]},
"weight_makespan": 0.5,
"weight_energy": 0.5,
}
# ==== 工具函数 ====
def parse_maintenance_input(text):
"""
将输入的维修时间窗口文本解析成字典格式,
例如: "0:25-35,40-50;1:15-25;2:10-20"
转为 {0:[(25,35),(40,50)],1:[(15,25)],2:[(10,20)]}
"""
result = {}
if not text.strip():
return result
try:
groups = text.strip().split(';')
for g in groups:
m_str, windows_str = g.split(':')
m = int(m_str.strip())
windows = []
for w in windows_str.split(','):
s, e = w.strip().split('-')
windows.append((int(s), int(e)))
result[m] = windows
return result
except Exception as e:
messagebox.showerror("输入错误", f"维修时间窗口格式错误,请检查输入。\n示例: 0:25-35,40-50;1:15-25")
return None
# ==== 数据生成 ====
def generate_jobs(n_jobs, n_machines):
"""
生成作业,随机2-4个工序,每工序分配随机机器和加工时间
结构:
jobs = [
[ {机器:时间, ...}, # 工序0
{机器:时间, ...}, # 工序1
...
],
...
]
"""
jobs = []
for j in range(n_jobs):
n_ops = random.randint(2, 4)
ops = []
for _ in range(n_ops):
machines = random.sample(range(n_machines), k=random.randint(2, n_machines))
durations = {m: random.randint(4, 10) for m in machines}
ops.append(durations)
jobs.append(ops)
return jobs
# ==== 调度核心 ====
def is_in_maintenance(machine, start, end, maintenance_windows):
for s, e in maintenance_windows.get(machine, []):
if not (end <= s or start >= e):
return True
return False
def decode(chromosome, jobs, robot_transfer_time, energy_costs, maintenance_windows):
n_machines = len(energy_costs)
machine_timeline = [[] for _ in range(n_machines)]
job_op_idx = [0] * len(jobs)
job_end_time = [0] * len(jobs)
total_energy = 0
schedule = []
for job_id in chromosome:
op_idx = job_op_idx[job_id]
op_options = jobs[job_id][op_idx]
best_start = float('inf')
best_machine, best_duration = -1, 0
for m, t in op_options.items():
m_last = machine_timeline[m][-1][1] if machine_timeline[m] else 0
earliest = max(job_end_time[job_id] + robot_transfer_time, m_last)
while is_in_maintenance(m, earliest, earliest + t, maintenance_windows):
earliest += 1
if earliest < best_start:
best_start = earliest
best_machine = m
best_duration = t
start = best_start
end = start + best_duration
machine_timeline[best_machine].append((start, end))
job_end_time[job_id] = end
job_op_idx[job_id] += 1
schedule.append((job_id, op_idx, best_machine, start, end))
total_energy += best_duration * energy_costs[best_machine]
makespan = max(job_end_time)
return makespan, total_energy, schedule
# ==== 算法 ====
def run_ga(jobs, robot_transfer_time, energy_costs, maintenance_windows, repeat_times, weight_makespan, weight_energy):
def single_run():
def initialize_population():
base = []
for j, ops in enumerate(jobs):
base.extend([j] * len(ops))
return [random.sample(base, len(base)) for _ in range(40)]
def crossover(p1, p2):
cut = random.randint(1, len(p1) - 1)
child = p1[:cut]
for g in p2:
if child.count(g) < p2.count(g):
child.append(g)
return child
def mutate(chrom, rate=0.2):
if random.random() < rate:
i, j = random.sample(range(len(chrom)), 2)
chrom[i], chrom[j] = chrom[j], chrom[i]
population = initialize_population()
evaluated = [(decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[0],
decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[1],
chrom,
decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[2]) for chrom in population]
for _ in range(40):
offspring = []
while len(offspring) < 40:
p1, p2 = random.sample(evaluated, 2)
child = crossover(p1[2], p2[2])
mutate(child)
mk, en, sch = decode(child, jobs, robot_transfer_time, energy_costs, maintenance_windows)
offspring.append((mk, en, child, sch))
evaluated += offspring
evaluated = sorted(evaluated, key=lambda x: weight_makespan * x[0] + weight_energy * x[1])[:40]
return evaluated[0]
return [single_run() for _ in range(repeat_times)]
def run_sa(jobs, robot_transfer_time, energy_costs, maintenance_windows, repeat_times, weight_makespan, weight_energy):
def single_run():
base = []
for j, ops in enumerate(jobs):
base.extend([j] * len(ops))
current = random.sample(base, len(base))
mk, en, sch = decode(current, jobs, robot_transfer_time, energy_costs, maintenance_windows)
best_score = weight_makespan * mk + weight_energy * en
best = (mk, en, current[:], sch)
T = 100
for _ in range(200):
new = current[:]
i, j = random.sample(range(len(new)), 2)
new[i], new[j] = new[j], new[i]
new_mk, new_en, new_sch = decode(new, jobs, robot_transfer_time, energy_costs, maintenance_windows)
new_score = weight_makespan * new_mk + weight_energy * new_en
if new_score < best_score or random.random() < math.exp(-(new_score - best_score) / T):
current = new[:]
if new_score < best_score:
best_score = new_score
best = (new_mk, new_en, new[:], new_sch)
T *= 0.95
return best
return [single_run() for _ in range(repeat_times)]
def run_nsga2(jobs, robot_transfer_time, energy_costs, maintenance_windows, repeat_times):
def dominates(a, b):
return (a[0] <= b[0] and a[1] <= b[1]) and (a[0] < b[0] or a[1] < b[1])
def non_dominated_sort(pop):
S, n, front = [[] for _ in pop], [0] * len(pop), [[]]
for p in range(len(pop)):
for q in range(len(pop)):
if dominates(pop[p][:2], pop[q][:2]):
S[p].append(q)
elif dominates(pop[q][:2], pop[p][:2]):
n[p] += 1
if n[p] == 0:
front[0].append(p)
i = 0
while front[i]:
next_front = []
for p in front[i]:
for q in S[p]:
n[q] -= 1
if n[q] == 0:
next_front.append(q)
i += 1
front.append(next_front)
return front[:-1]
def single_run():
base = []
for j, ops in enumerate(jobs):
base.extend([j] * len(ops))
def initialize_population():
return [random.sample(base, len(base)) for _ in range(40)]
def crossover(p1, p2):
cut = random.randint(1, len(p1) - 1)
child = p1[:cut]
for g in p2:
if child.count(g) < p2.count(g):
child.append(g)
return child
def mutate(chrom, rate=0.2):
if random.random() < rate:
i, j = random.sample(range(len(chrom)), 2)
chrom[i], chrom[j] = chrom[j], chrom[i]
population = initialize_population()
evaluated = [(decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[0],
decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[1],
chrom,
decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[2]) for chrom in population]
for _ in range(40):
offspring = []
while len(offspring) < 40:
p1, p2 = random.sample(evaluated, 2)
child = crossover(p1[2], p2[2])
mutate(child)
mk, en, sch = decode(child, jobs, robot_transfer_time, energy_costs, maintenance_windows)
offspring.append((mk, en, child, sch))
combined = evaluated + offspring
fronts = non_dominated_sort(combined)
new_pop = []
for front in fronts:
for i in front:
if len(new_pop) < 40:
new_pop.append(combined[i])
evaluated = new_pop
return min(evaluated, key=lambda x: 0.5 * x[0] + 0.5 * x[1])
return [single_run() for _ in range(repeat_times)]
# ==== 结果汇总 ====
def summarize_results(results, method_name="算法", weight_makespan=0.5, weight_energy=0.5):
makespans = [r[0] for r in results]
energies = [r[1] for r in results]
obj_scores = [weight_makespan * m + weight_energy * e for m, e in zip(makespans, energies)]
pareto_set = sorted(set((m, e) for m, e in zip(makespans, energies)))
summary = f"\n📊 {method_name} 优化结果统计\n"
summary += f"样本数:{len(results)}\n"
summary += f"最小完工时间: {min(makespans):.2f}\n"
summary += f"最小能耗: {min(energies):.2f}\n"
summary += f"目标函数最优值: {min(obj_scores):.2f}\n"
summary += f"完工时间 Std: {np.std(makespans):.2f}\n"
summary += f"能耗 Std: {np.std(energies):.2f}\n"
summary += f"Pareto 前沿点数量: {len(pareto_set)}\n"
return summary
# ==== 调度动态演示辅助 ====
class GanttAnimator:
def __init__(self, schedule, n_machines, maintenance_windows, canvas_frame):
self.schedule = schedule
self.n_machines = n_machines
self.maintenance_windows = maintenance_windows
self.canvas_frame = canvas_frame
self.fig, self.ax = plt.subplots(figsize=(12,5))
self.canvas = FigureCanvasTkAgg(self.fig, master=self.canvas_frame)
self.canvas.get_tk_widget().pack(fill='both', expand=True)
self.job_colors = defaultdict(lambda: plt.get_cmap("tab20")(random.random()))
self.current_time = 0
self.max_time = max(s[4] for s in schedule)
self.anim_running = False
def draw_frame(self):
self.ax.clear()
for job_id, op_idx, machine, start, end in self.schedule:
if start <= self.current_time:
width = min(end, self.current_time) - start
if width > 0:
self.ax.barh(machine, width, left=start, color=self.job_colors[job_id], edgecolor='black')
self.ax.text((start + min(end, self.current_time)) / 2, machine,
f'J{job_id}-O{op_idx}', ha='center', va='center', color='white', fontsize=8)
for m, windows in self.maintenance_windows.items():
for s, e in windows:
if s <= self.current_time:
width = min(e, self.current_time) - s
if width > 0:
self.ax.barh(m, width, left=s, color='gray', alpha=0.3, edgecolor='red', hatch='//')
self.ax.set_yticks(list(range(self.n_machines)))
self.ax.set_yticklabels([f'M{m}' for m in range(self.n_machines)])
self.ax.set_xlabel("Time")
self.ax.set_title("Gantt chart")
self.ax.set_xlim(0, self.max_time + 5)
self.ax.grid(True)
self.canvas.draw()
def start(self):
self.anim_running = True
self.current_time = 0
self._step()
def _step(self):
if not self.anim_running:
return
if self.current_time > self.max_time + 5:
self.anim_running = False
return
self.draw_frame()
self.current_time += 1
self.canvas.get_tk_widget().after(100, self._step)
def stop(self):
self.anim_running = False
# ==== 作业编辑窗口 ====
class JobEditor(simpledialog.Dialog):
def __init__(self, parent, jobs, n_machines):
self.jobs = jobs
self.n_machines = n_machines
super().__init__(parent, title="作业详细编辑")
def body(self, master):
self.frames = []
self.entries = []
ttk.Label(master, text="请编辑各作业各工序机器与加工时间,格式:机器编号:时间,机器编号:时间").grid(row=0, column=0, sticky='w')
for i, job in enumerate(self.jobs):
f = ttk.LabelFrame(master, text=f"作业 {i}")
f.grid(row=i+1, column=0, sticky='ew', pady=5)
self.frames.append(f)
for j, op in enumerate(job):
label = ttk.Label(f, text=f"工序 {j}:")
label.grid(row=j, column=0, sticky='w')
# 格式化成字符串
s = ",".join([f"{m}:{t}" for m,t in op.items()])
ent = ttk.Entry(f, width=40)
ent.insert(0, s)
ent.grid(row=j, column=1, sticky='w')
self.entries.append((i,j,ent))
return None
def apply(self):
# 解析输入,更新self.jobs
for job_id, op_idx, ent in self.entries:
text = ent.get().strip()
if not text:
continue
try:
parts = text.split(',')
d = {}
for p in parts:
m_str, t_str = p.split(':')
m = int(m_str.strip())
t = int(t_str.strip())
if m < 0 or m >= self.n_machines or t <= 0:
raise ValueError
d[m] = t
self.jobs[job_id][op_idx] = d
except:
messagebox.showerror("输入错误", f"作业{job_id}工序{op_idx}格式错误,请使用 机器编号:时间,机器编号:时间 格式,机器编号应在[0-{self.n_machines-1}]内,时间为正整数。")
# ==== 主GUI ====
class SchedulerApp:
def __init__(self, master):
self.master = master
master.title("智能调度系统")
self.params = DEFAULT_PARAMS.copy()
# 顶部参数框架
self.param_frame = ttk.Frame(master)
self.param_frame.pack(fill='x', padx=10, pady=5)
# 作业数
ttk.Label(self.param_frame, text="作业数:").grid(row=0, column=0)
self.n_jobs_var = tk.IntVar(value=self.params["n_jobs"])
self.n_jobs_spin = ttk.Spinbox(self.param_frame, from_=1, to=100, textvariable=self.n_jobs_var, width=5)
self.n_jobs_spin.grid(row=0, column=1)
# 机器数
ttk.Label(self.param_frame, text="机器数:").grid(row=0, column=2)
self.n_machines_var = tk.IntVar(value=self.params["n_machines"])
self.n_machines_spin = ttk.Spinbox(self.param_frame, from_=1, to=20, textvariable=self.n_machines_var, width=5)
self.n_machines_spin.grid(row=0, column=3)
# 重复次数
ttk.Label(self.param_frame, text="重复次数:").grid(row=0, column=4)
self.repeat_times_var = tk.IntVar(value=self.params["repeat_times"])
self.repeat_times_spin = ttk.Spinbox(self.param_frame, from_=1, to=20, textvariable=self.repeat_times_var, width=5)
self.repeat_times_spin.grid(row=0, column=5)
# 机器人转移时间
ttk.Label(self.param_frame, text="机器人转移时间:").grid(row=1, column=0)
self.robot_transfer_time_var = tk.IntVar(value=self.params["robot_transfer_time"])
self.robot_transfer_time_spin = ttk.Spinbox(self.param_frame, from_=0, to=20, textvariable=self.robot_transfer_time_var, width=5)
self.robot_transfer_time_spin.grid(row=1, column=1)
# 能耗系数,逗号分隔
ttk.Label(self.param_frame, text="机器能耗系数(逗号分隔):").grid(row=1, column=2)
self.energy_costs_var = tk.StringVar(value=",".join(map(str, self.params["energy_costs"])))
self.energy_costs_entry = ttk.Entry(self.param_frame, textvariable=self.energy_costs_var, width=20)
self.energy_costs_entry.grid(row=1, column=3)
# 维修时间窗口
ttk.Label(self.param_frame, text="维修时间窗口(格式如0:25-35,40-50;1:15-25):").grid(row=2, column=0, columnspan=3, sticky='w')
self.maintenance_var = tk.StringVar(value=";".join(
[f"{m}:" + ",".join(f"{s}-{e}" for s,e in ws) for m, ws in self.params["maintenance_windows"].items()]
))
self.maintenance_entry = ttk.Entry(self.param_frame, textvariable=self.maintenance_var, width=50)
self.maintenance_entry.grid(row=2, column=3, columnspan=3, sticky='w')
# 多目标权重
ttk.Label(self.param_frame, text="目标权重(完工时间):").grid(row=3, column=0)
self.weight_makespan_var = tk.DoubleVar(value=self.params["weight_makespan"])
self.weight_makespan_scale = ttk.Scale(self.param_frame, from_=0, to=1, variable=self.weight_makespan_var, orient='horizontal')
self.weight_makespan_scale.grid(row=3, column=1)
ttk.Label(self.param_frame, text="目标权重(能耗):").grid(row=3, column=2)
self.weight_energy_var = tk.DoubleVar(value=self.params["weight_energy"])
self.weight_energy_scale = ttk.Scale(self.param_frame, from_=0, to=1, variable=self.weight_energy_var, orient='horizontal')
self.weight_energy_scale.grid(row=3, column=3)
# 按钮框架
self.btn_frame = ttk.Frame(master)
self.btn_frame.pack(fill='x', padx=10, pady=5)
self.gen_jobs_btn = ttk.Button(self.btn_frame, text="生成随机作业", command=self.generate_jobs)
self.gen_jobs_btn.grid(row=0, column=0, padx=5)
self.edit_jobs_btn = ttk.Button(self.btn_frame, text="编辑作业详细", command=self.edit_jobs)
self.edit_jobs_btn.grid(row=0, column=1, padx=5)
self.run_ga_btn = ttk.Button(self.btn_frame, text="运行遗传算法", command=self.run_ga)
self.run_ga_btn.grid(row=0, column=2, padx=5)
self.run_sa_btn = ttk.Button(self.btn_frame, text="运行模拟退火", command=self.run_sa)
self.run_sa_btn.grid(row=0, column=3, padx=5)
self.run_nsga_btn = ttk.Button(self.btn_frame, text="运行NSGA-II", command=self.run_nsga)
self.run_nsga_btn.grid(row=0, column=4, padx=5)
self.show_anim_btn = ttk.Button(self.btn_frame, text="动态甘特图演示", command=self.show_animation, state='disabled')
self.show_anim_btn.grid(row=0, column=5, padx=5)
self.export_btn = ttk.Button(self.btn_frame, text="导出最佳调度CSV", command=self.export_schedule, state='disabled')
self.export_btn.grid(row=0, column=6, padx=5)
# 结果文本框
self.result_text = tk.Text(master, height=15)
self.result_text.pack(fill='both', padx=10, pady=5, expand=True)
# 初始化作业
self.jobs = generate_jobs(self.params["n_jobs"], self.params["n_machines"])
self.best_schedule = None
def validate_and_update_params(self):
try:
n_jobs = self.n_jobs_var.get()
n_machines = self.n_machines_var.get()
repeat_times = self.repeat_times_var.get()
robot_transfer_time = self.robot_transfer_time_var.get()
energy_costs = list(map(float, self.energy_costs_var.get().split(',')))
if len(energy_costs) != n_machines:
raise ValueError("能耗系数数量应与机器数相同")
maintenance_windows = parse_maintenance_input(self.maintenance_var.get())
if maintenance_windows is None:
return False
w1 = self.weight_makespan_var.get()
w2 = self.weight_energy_var.get()
if abs(w1 + w2 - 1.0) > 0.1:
messagebox.showwarning("权重提示", "建议权重和为1,可调整")
self.params.update({
"n_jobs": n_jobs,
"n_machines": n_machines,
"repeat_times": repeat_times,
"robot_transfer_time": robot_transfer_time,
"energy_costs": energy_costs,
"maintenance_windows": maintenance_windows,
"weight_makespan": w1,
"weight_energy": w2,
})
return True
except Exception as e:
messagebox.showerror("参数错误", f"参数输入有误: {e}")
return False
def generate_jobs(self):
if not self.validate_and_update_params():
return
self.jobs = generate_jobs(self.params["n_jobs"], self.params["n_machines"])
self.result_text.insert('end', "✅ 随机作业生成成功\n")
self.best_schedule = None
self.show_anim_btn.config(state='disabled')
self.export_btn.config(state='disabled')
def edit_jobs(self):
if not self.validate_and_update_params():
return
editor = JobEditor(self.master, self.jobs, self.params["n_machines"])
self.jobs = editor.jobs
self.result_text.insert('end', "✅ 作业详细编辑完成\n")
self.best_schedule = None
self.show_anim_btn.config(state='disabled')
self.export_btn.config(state='disabled')
def run_ga(self):
if not self.validate_and_update_params():
return
self.result_text.insert('end', "⏳ 遗传算法运行中...\n")
self.master.update()
results = run_ga(
self.jobs,
self.params["robot_transfer_time"],
self.params["energy_costs"],
self.params["maintenance_windows"],
self.params["repeat_times"],
self.params["weight_makespan"],
self.params["weight_energy"],
)
summary = summarize_results(results, "遗传算法", self.params["weight_makespan"], self.params["weight_energy"])
self.result_text.insert('end', summary)
self.best_schedule = min(results, key=lambda x: self.params["weight_makespan"] * x[0] + self.params["weight_energy"] * x[1])
self.show_anim_btn.config(state='normal')
self.export_btn.config(state='normal')
def run_sa(self):
if not self.validate_and_update_params():
return
self.result_text.insert('end', "⏳ 模拟退火运行中...\n")
self.master.update()
results = run_sa(
self.jobs,
self.params["robot_transfer_time"],
self.params["energy_costs"],
self.params["maintenance_windows"],
self.params["repeat_times"],
self.params["weight_makespan"],
self.params["weight_energy"],
)
summary = summarize_results(results, "模拟退火", self.params["weight_makespan"], self.params["weight_energy"])
self.result_text.insert('end', summary)
self.best_schedule = min(results, key=lambda x: self.params["weight_makespan"] * x[0] + self.params["weight_energy"] * x[1])
self.show_anim_btn.config(state='normal')
self.export_btn.config(state='normal')
def run_nsga(self):
if not self.validate_and_update_params():
return
self.result_text.insert('end', "⏳ NSGA-II运行中...\n")
self.master.update()
results = run_nsga2(
self.jobs,
self.params["robot_transfer_time"],
self.params["energy_costs"],
self.params["maintenance_windows"],
self.params["repeat_times"],
)
summary = summarize_results(results, "NSGA-II", 0.5, 0.5)
self.result_text.insert('end', summary)
self.best_schedule = min(results, key=lambda x: 0.5 * x[0] + 0.5 * x[1])
self.show_anim_btn.config(state='normal')
self.export_btn.config(state='normal')
def show_animation(self):
if self.best_schedule is None:
messagebox.showwarning("无调度数据", "请先运行调度算法得到结果!")
return
anim_win = tk.Toplevel(self.master)
anim_win.title("调度动态甘特图演示")
animator = GanttAnimator(self.best_schedule[3], self.params["n_machines"], self.params["maintenance_windows"], anim_win)
animator.start()
def export_schedule(self):
if self.best_schedule is None:
messagebox.showwarning("无调度数据", "请先运行调度算法得到结果!")
return
path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV文件","*.csv")])
if not path:
return
with open(path, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow(["作业ID", "工序号", "机器号", "开始时间", "结束时间"])
for row in self.best_schedule[3]:
writer.writerow(row)
messagebox.showinfo("导出成功", f"调度结果已导出到 {path}")
if __name__ == "__main__":
root = tk.Tk()
app = SchedulerApp(root)
root.mainloop()
更多推荐
所有评论(0)