1.1 项目背景

柔性作业车间调度问题(Flexible Job Shop Scheduling Problem, FJSP)是现代智能制造中最具挑战性的调度问题之一。与传统作业车间调度(JSP)相比,FJSP增加了工序的灵活性:每道工序可以选择多个不同的机器进行加工,这种灵活性虽然提升了资源利用率,但也大幅增加了问题的复杂度(NP-hard问题)。

在实际制造过程中,除了考虑作业的完工时间外,还必须纳入以下多个因素:

  1. 机器人转运时间:不同工序之间的搬运需时;
  2. 机器能耗差异:加工设备的能耗不同,影响运行成本;
  3. 设备维护窗口:部分时间段内机器不能运行。

为此,本项目设计并实现了一个图形化智能调度系统,支持上述约束,集成三种优化算法,目标是实现高效、可视、可操作的柔性调度。

1.2 系统目标

本系统旨在构建一个面向研究与教学场景的智能调度平台,具备如下核心能力:

1.多约束调度建模:

  1. 考虑完工时间、能耗、转运时间与设备维护等现实因素;

2.作业与调度数据可视化:

  1. 支持作业结构随机生成与人工编辑;
  2. 实现甘特图形式的调度方案动态演示;

3.多算法求解与对比分析:

  1. 实现遗传算法(GA)、模拟退火(SA)、NSGA-II三种主流调度优化算法;
  2. 支持多目标优化与解质量统计;

4.调度结果导出与系统集成:

  1. 可将调度方案导出为CSV,支持后续数据分析或对接生产系统。

二、系统架构设计

2.1 总体架构图(逻辑结构)

SchedulerApp (调度主控制器)

├── 参数配置模块(Spinbox、Entry、Scale)

├── 作业生成与编辑模块(随机生成 + GUI人工编辑)

├── 优化算法模块

│   ├── 遗传算法(GA)

│   ├── 模拟退火算法(SA)

│   └── NSGA-II(多目标优化)

├── 调度结果汇总模块(调度评价与Pareto分析)

├── 甘特图演示模块(Matplotlib动画)

└── 结果导出模块(CSV写入)

2.2 模块功能详解

2.2.1 参数配置模块

本模块提供用户友好的调度参数输入功能,包括:

  1. 作业数(n_jobs):控制作业数量;
  2. 机器数(n_machines):车间可用设备数量;
  3. 重复运行次数:用于统计分析;
  4. 能耗系数(energy_costs):每台机器单位时间的能耗;
  5. 维护窗口(maintenance_windows):各设备不可用时间段;
  6. 目标函数权重设置:在完工时间(makespan)和能耗之间分配权重。

实现形式:基于 ttk.Entry, ttk.Scale, Spinbox 等控件,并加入输入校验。

2.2.2 作业生成与编辑模块

在柔性作业车间调度(FJSP)模型中,每个作业由若干工序组成,每道工序可以在多个机器中选择其中一台完成,加工时间因机器不同而异。因此,调度的关键在于工序分配(选择哪台机器)和工序排序(调度先后顺序)。为支持算法运行与可视化模拟,本模块负责生成、组织与编辑调度数据。

数据结构定义

系统中作业的结构为一个嵌套列表,形式如下:

jobs = [

    [ {0:6, 2:8}, {1:4, 3:7} ],     # 作业0:工序0可在机器0和2加工,耗时6和8;工序1可在1和3加工

    [ {1:5, 2:6}, {0:6} ],        # 作业1:两道工序,第一道可选机器1或2,第二道只能在机器0加工

    ...

]
  1. 外层列表:表示多个作业;
  2. 每个作业由若干工序组成(子列表);
  3. 每道工序为一个字典:键为机器编号,值为该机器上加工该工序所需时间。
随机生成作业结构

在用户点击“生成随机作业”按钮后,系统自动调用 generate_jobs() 函数,执行以下步骤:

  1. 工序数设定:为每个作业随机生成 2~4 道工序;
  2. 机器选择:为每道工序随机分配 2~n 台机器;
  3. 加工时间设定:为每个选中机器分配一个加工时间(4~10个时间单位之间);
  4. 结果存储:将结果以嵌套列表结构保存在 self.jobs 中,供后续算法调度与绘图调用。
人工编辑作业结构

用户可以通过点击“编辑作业详细”按钮,在弹出的窗口中对每道工序的机器选项与加工时间进行逐项修改。界面设计:

  1. 使用 Toplevel + LabelFrame + Entry 构建多层嵌套窗口;
  2. 每个作业显示为一个 LabelFrame,包含若干工序的输入框;
  3. 每道工序的输入格式为:
机器编号1:时间1, 机器编号2:时间2, ...

例如:

0:6, 2:9, ...

2.2.3 调度优化算法模块

为了求解柔性作业车间调度问题(FJSP)这一NP难问题,本系统集成了三种具有代表性的智能优化算法:遗传算法(GA)、模拟退火算法(SA)、非支配排序遗传算法(NSGA-II)。这三种算法分别代表了经典启发式搜索、局部搜索、以及多目标进化优化策略,具有互补性。

  • 遗传算法(Genetic Algorithm, GA)

模拟生物自然选择和遗传机制,通过种群进化迭代寻优,广泛应用于组合优化问题。

表2.1遗传算法实现细节

模块

描述

染色体表示

每个染色体是一个作业编号序列,例如 [0,1,1,2,0,2,...] 表示作业和工序的加工顺序。作业编号出现的次数等于工序数。

初始种群

随机生成若干个合法的作业序列,形成初始解集(如40个个体)。

适应度计算

对每个个体,调用 decode() 函数生成调度方案,返回 完工时间(makespan) 和 总能耗(energy)。通过 weighted_score = α * makespan + β * energy 计算目标函数值。

选择机制

使用精英保留策略,即每代保留若干最优个体,确保最优解不会被遗失。

交叉操作

使用单点交叉 + 保序修复策略:截取一部分父代染色体,余下部分按照另一父代中的顺序补全,避免基因重复或遗漏。

变异机制

以一定概率(如0.2)在染色体中随机选择两个基因位置交换,增加种群多样性。

终止条件

固定迭代次数(如40代),输出适应度最优的个体作为结果。

  • 模拟退火算法(Simulated Annealing, SA)

模拟物理退火过程,接受一定概率的“差解”以跳出局部最优。适用于精细调节搜索路径的问题。

表2.2模拟退火算法实现细节

模块

描述

状态定义

当前调度序列作为系统状态(即作业编号排列)。

邻域扰动

通过随机交换两个位置的作业编号生成新的邻域解。

评价函数

与GA相同,使用完工时间与能耗加权和作为目标函数值。

接受准则

若新解优于当前解,则接受;否则以概率 exp(-(ΔE/T)) 接受劣解,其中 ΔE 为目标值差距,T 为当前温度。

降温策略

使用指数递减法:T = T * α(例如 α=0.95),每轮迭代后降低温度。

终止条件

达到固定迭代次数(如200轮),输出当前最优调度方案。

  • 非支配排序遗传算法II(NSGA-II)

NSGA-II 是一种广泛应用于多目标优化的问题求解器,旨在寻找非支配解集(Pareto前沿),而非单一最优解。

表2.3非支配排序遗传算法实现细节

模块

描述

染色体表示

与GA相同,使用作业编号序列编码;

初始种群

随机生成若干解组成种群(如40个);

目标维度

评估两个目标:① 完工时间(makespan);② 总能耗(energy);

非支配排序

使用经典Dominance Rule对种群进行排序,识别出非支配前沿(Pareto解集);

拥挤距离

用于衡量解在目标空间中分布稠密度,鼓励解集多样性;

选择与繁殖

从当前父代与子代中通过非支配排序与拥挤距离综合选择,生成下一代种群;

迭代更新

迭代若干代(如40轮)后,输出最优的非支配前沿点集合。

2.2.4 调度结果汇总模块

调度算法的运行结果具有随机性和多样性,尤其在采用元启发式算法(如 GA/SA/NSGA-II)时,为保证输出结果的稳定性与可信度,系统支持多次重复运行(由参数 repeat_times 控制),并在运行结束后对结果进行统计汇总与结构化展示。

每轮算法运行输出的核心指标为:

  1. 完工时间(Makespan):所有作业完成所需的最大时间;
  2. 能耗(Energy):调度方案下所有机器运行总能耗(考虑能耗系数);
  3. 调度方案:包括每道工序的机器编号、起始时间和结束时间。

在多次运行后,系统进一步汇总以下统计信息:

表2.4统计信息

统计项

含义

最小完工时间

所有结果中的最低完工时间

最小能耗

所有结果中的最低能耗值

最优目标函数值

对每个结果根据目标函数 α * makespan + β * energy 计算得分,输出最小值

完工时间标准差

多次运行的完工时间标准差,衡量波动性

能耗标准差

能耗在不同调度方案中的波动性

Pareto前沿解数量

(仅适用于NSGA-II)输出非支配调度解集数量

调度优化的核心不仅在于结果数值的优劣,更在于其调度过程的可解释性与可视化。为此,本模块通过动态甘特图展示调度进度,是调度逻辑的关键表达形式之一。

基于 matplotlib + Tkinter canvas 实现动态甘特图:

  1. 每台机器为一条水平时间线;
  2. 加工区段以彩条展示,标注“作业-工序”编号;
  3. 维护窗口以灰色透明背景 + 斜线标记表示;
  4. 支持自动播放动画(时间推进);
  5. 可识别瓶颈段与资源冲突。

三、源代码结构说明

该调度系统以 Python 语言实现,采用面向对象结构,并使用 Tkinter 构建图形界面,matplotlib 实现动态可视化。整个系统封装在一个主文件中,内部划分为多个逻辑子模块,具备良好的可维护性与可扩展性。

├── DEFAULT_PARAMS             # 全局默认参数设置

├── 作业与维护窗口解析函数     # parse_maintenance_input(), generate_jobs()

├── 解码与调度函数             # decode(), is_in_maintenance()

├── 三种优化算法实现           # run_ga(), run_sa(), run_nsga2()

├── 结果汇总模块               # summarize_results()

├── 甘特图动画类               # GanttAnimator

├── 作业编辑弹窗类             # JobEditor

└── 主界面控制器类             # SchedulerApp(含主窗口UI与按钮逻辑)

1. 主类:SchedulerApp(调度系统 GUI 控制器)

该类负责整个系统的初始化、界面布局、参数读取、按钮绑定和调度算法触发,是系统的主控模块。

主要职责:

  1. 构建参数设置区、按钮操作区与文本输出区;
  2. 管理当前作业数据(self.jobs)和最优调度方案(self.best_schedule);
  3. 响应用户操作:生成作业、编辑作业、运行算法、展示动画、导出结果。

2. 调度解码函数 decode()

核心调度逻辑,将染色体(作业序列)翻译为调度方案,考虑以下约束:

  1. 工序按顺序依赖;
  2. 每台机器每时刻仅加工一个任务;
  3. 每个作业同一时间只能加工一道工序;
  4. 考虑机器人转运时间与维护窗口。

3. 三种调度算法模块

统一结构:每个算法均以 jobs、robot_transfer_time、energy_costs 等参数作为输入,返回多次调度结果列表 List[Tuple[makespan, energy, sequence, schedule]]。

run_ga()
  1. 包括初始化种群、选择、交叉、变异、淘汰;
  2. 迭代演化获取目标函数值最小解。

run_sa()

  1. 初始序列扰动;
  2. 模拟退火接受机制;
  3. 采用固定温度下降速率进行200轮迭代。

run_nsga2()

  1. 实现非支配排序;
  2. 采用Pareto最优解作为评估指标;
  3. 最终输出为 Pareto前沿点集中的一个最优个体(默认按平均目标加权排序选出)。

4. 作业编辑类 JobEditor(弹窗对话框)

此类继承自 simpledialog.Dialog,用于弹出窗口,编辑每个作业每道工序的机器-时间配置。

  1. 自动生成输入框;
  2. 校验输入合法性;
  3. 更新主程序中 self.jobs 数据结构。

5. GanttAnimator(动态甘特图动画类)

负责渲染调度方案的动态图形展示。

核心流程:

  1. __init__() 构建绘图界面;
  2. draw_frame() 渲染某一帧的作业条与维护窗口;
  3. _step() 控制帧间时间推进;
  4. start() 开始动画播放;
  5. stop() 停止播放。

支持实时刷新、维护标注、作业标注,并使用 tab20 colormap 分配颜色。

6. 辅助工具函数

函数名

说明

parse_maintenance_input(text)

解析用户输入的维修时间窗口文本,转为字典格式

generate_jobs(n_jobs, n_machines)

随机生成作业工序结构,每道工序含多个可选机器

is_in_maintenance(machine, start, end, windows)

判断时间段是否落入维护窗口

summarize_results(results, method_name, ...)

多次运行后的结果统计、格式化报告生成

7. 输出与导出模块

  1. 文本输出:所有统计报告通过 Text 控件插入结果(含分隔行、标题、数据);
  2. 文件导出:以 .csv 格式保存 schedule,由 export_schedule() 方法实现,调用 csv.writer 写入。

四、使用说明与运行演示

设置参数:选择作业数、机器数、能耗系数、目标权重等;

图4.1设置参数

生成作业数据:可随机生成或点击“编辑作业”手动设置;

图4.2随机生成

图4.3手动设置

运行调度算法:选择任意一种算法运行;

查看调度结果:文本框中展示统计信息;

图4.4运行遗传算法

演示调度进度:点击“动态甘特图演示”进行可视化播放;

图4.5甘特图演示

导出调度方案:点击“导出CSV”保存调度结果。

图4.6导出CSV

五、源代码

import tkinter as tk
from tkinter import ttk, simpledialog, messagebox, filedialog
import matplotlib.pyplot as plt
from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg
import numpy as np
import random
import math
import csv
from collections import defaultdict

# ==== 全局参数默认值 ====
DEFAULT_PARAMS = {
    "n_jobs": 6,
    "n_machines": 4,
    "repeat_times": 5,
    "robot_transfer_time": 2,
    "energy_costs": [1.0, 1.4, 1.2, 1.6],
    "maintenance_windows": {0: [(25, 35)], 1: [(15, 25)], 2: [(10, 20)], 3: [(40, 50)]},
    "weight_makespan": 0.5,
    "weight_energy": 0.5,
}

# ==== 工具函数 ====

def parse_maintenance_input(text):
    """
    将输入的维修时间窗口文本解析成字典格式,
    例如: "0:25-35,40-50;1:15-25;2:10-20"
    转为 {0:[(25,35),(40,50)],1:[(15,25)],2:[(10,20)]}
    """
    result = {}
    if not text.strip():
        return result
    try:
        groups = text.strip().split(';')
        for g in groups:
            m_str, windows_str = g.split(':')
            m = int(m_str.strip())
            windows = []
            for w in windows_str.split(','):
                s, e = w.strip().split('-')
                windows.append((int(s), int(e)))
            result[m] = windows
        return result
    except Exception as e:
        messagebox.showerror("输入错误", f"维修时间窗口格式错误,请检查输入。\n示例: 0:25-35,40-50;1:15-25")
        return None

# ==== 数据生成 ====

def generate_jobs(n_jobs, n_machines):
    """
    生成作业,随机2-4个工序,每工序分配随机机器和加工时间
    结构:
    jobs = [
      [ {机器:时间, ...},  # 工序0
        {机器:时间, ...},  # 工序1
        ...
      ],
      ...
    ]
    """
    jobs = []
    for j in range(n_jobs):
        n_ops = random.randint(2, 4)
        ops = []
        for _ in range(n_ops):
            machines = random.sample(range(n_machines), k=random.randint(2, n_machines))
            durations = {m: random.randint(4, 10) for m in machines}
            ops.append(durations)
        jobs.append(ops)
    return jobs

# ==== 调度核心 ====

def is_in_maintenance(machine, start, end, maintenance_windows):
    for s, e in maintenance_windows.get(machine, []):
        if not (end <= s or start >= e):
            return True
    return False

def decode(chromosome, jobs, robot_transfer_time, energy_costs, maintenance_windows):
    n_machines = len(energy_costs)
    machine_timeline = [[] for _ in range(n_machines)]
    job_op_idx = [0] * len(jobs)
    job_end_time = [0] * len(jobs)
    total_energy = 0
    schedule = []

    for job_id in chromosome:
        op_idx = job_op_idx[job_id]
        op_options = jobs[job_id][op_idx]
        best_start = float('inf')
        best_machine, best_duration = -1, 0
        for m, t in op_options.items():
            m_last = machine_timeline[m][-1][1] if machine_timeline[m] else 0
            earliest = max(job_end_time[job_id] + robot_transfer_time, m_last)
            while is_in_maintenance(m, earliest, earliest + t, maintenance_windows):
                earliest += 1
            if earliest < best_start:
                best_start = earliest
                best_machine = m
                best_duration = t
        start = best_start
        end = start + best_duration
        machine_timeline[best_machine].append((start, end))
        job_end_time[job_id] = end
        job_op_idx[job_id] += 1
        schedule.append((job_id, op_idx, best_machine, start, end))
        total_energy += best_duration * energy_costs[best_machine]

    makespan = max(job_end_time)
    return makespan, total_energy, schedule

# ==== 算法 ====

def run_ga(jobs, robot_transfer_time, energy_costs, maintenance_windows, repeat_times, weight_makespan, weight_energy):
    def single_run():
        def initialize_population():
            base = []
            for j, ops in enumerate(jobs):
                base.extend([j] * len(ops))
            return [random.sample(base, len(base)) for _ in range(40)]

        def crossover(p1, p2):
            cut = random.randint(1, len(p1) - 1)
            child = p1[:cut]
            for g in p2:
                if child.count(g) < p2.count(g):
                    child.append(g)
            return child

        def mutate(chrom, rate=0.2):
            if random.random() < rate:
                i, j = random.sample(range(len(chrom)), 2)
                chrom[i], chrom[j] = chrom[j], chrom[i]

        population = initialize_population()
        evaluated = [(decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[0],
                      decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[1],
                      chrom,
                      decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[2]) for chrom in population]
        for _ in range(40):
            offspring = []
            while len(offspring) < 40:
                p1, p2 = random.sample(evaluated, 2)
                child = crossover(p1[2], p2[2])
                mutate(child)
                mk, en, sch = decode(child, jobs, robot_transfer_time, energy_costs, maintenance_windows)
                offspring.append((mk, en, child, sch))
            evaluated += offspring
            evaluated = sorted(evaluated, key=lambda x: weight_makespan * x[0] + weight_energy * x[1])[:40]
        return evaluated[0]
    return [single_run() for _ in range(repeat_times)]

def run_sa(jobs, robot_transfer_time, energy_costs, maintenance_windows, repeat_times, weight_makespan, weight_energy):
    def single_run():
        base = []
        for j, ops in enumerate(jobs):
            base.extend([j] * len(ops))
        current = random.sample(base, len(base))
        mk, en, sch = decode(current, jobs, robot_transfer_time, energy_costs, maintenance_windows)
        best_score = weight_makespan * mk + weight_energy * en
        best = (mk, en, current[:], sch)
        T = 100
        for _ in range(200):
            new = current[:]
            i, j = random.sample(range(len(new)), 2)
            new[i], new[j] = new[j], new[i]
            new_mk, new_en, new_sch = decode(new, jobs, robot_transfer_time, energy_costs, maintenance_windows)
            new_score = weight_makespan * new_mk + weight_energy * new_en
            if new_score < best_score or random.random() < math.exp(-(new_score - best_score) / T):
                current = new[:]
                if new_score < best_score:
                    best_score = new_score
                    best = (new_mk, new_en, new[:], new_sch)
            T *= 0.95
        return best
    return [single_run() for _ in range(repeat_times)]

def run_nsga2(jobs, robot_transfer_time, energy_costs, maintenance_windows, repeat_times):
    def dominates(a, b):
        return (a[0] <= b[0] and a[1] <= b[1]) and (a[0] < b[0] or a[1] < b[1])

    def non_dominated_sort(pop):
        S, n, front = [[] for _ in pop], [0] * len(pop), [[]]
        for p in range(len(pop)):
            for q in range(len(pop)):
                if dominates(pop[p][:2], pop[q][:2]):
                    S[p].append(q)
                elif dominates(pop[q][:2], pop[p][:2]):
                    n[p] += 1
            if n[p] == 0:
                front[0].append(p)
        i = 0
        while front[i]:
            next_front = []
            for p in front[i]:
                for q in S[p]:
                    n[q] -= 1
                    if n[q] == 0:
                        next_front.append(q)
            i += 1
            front.append(next_front)
        return front[:-1]

    def single_run():
        base = []
        for j, ops in enumerate(jobs):
            base.extend([j] * len(ops))

        def initialize_population():
            return [random.sample(base, len(base)) for _ in range(40)]

        def crossover(p1, p2):
            cut = random.randint(1, len(p1) - 1)
            child = p1[:cut]
            for g in p2:
                if child.count(g) < p2.count(g):
                    child.append(g)
            return child

        def mutate(chrom, rate=0.2):
            if random.random() < rate:
                i, j = random.sample(range(len(chrom)), 2)
                chrom[i], chrom[j] = chrom[j], chrom[i]

        population = initialize_population()
        evaluated = [(decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[0],
                      decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[1],
                      chrom,
                      decode(chrom, jobs, robot_transfer_time, energy_costs, maintenance_windows)[2]) for chrom in population]

        for _ in range(40):
            offspring = []
            while len(offspring) < 40:
                p1, p2 = random.sample(evaluated, 2)
                child = crossover(p1[2], p2[2])
                mutate(child)
                mk, en, sch = decode(child, jobs, robot_transfer_time, energy_costs, maintenance_windows)
                offspring.append((mk, en, child, sch))
            combined = evaluated + offspring
            fronts = non_dominated_sort(combined)
            new_pop = []
            for front in fronts:
                for i in front:
                    if len(new_pop) < 40:
                        new_pop.append(combined[i])
            evaluated = new_pop
        return min(evaluated, key=lambda x: 0.5 * x[0] + 0.5 * x[1])

    return [single_run() for _ in range(repeat_times)]

# ==== 结果汇总 ====

def summarize_results(results, method_name="算法", weight_makespan=0.5, weight_energy=0.5):
    makespans = [r[0] for r in results]
    energies = [r[1] for r in results]
    obj_scores = [weight_makespan * m + weight_energy * e for m, e in zip(makespans, energies)]
    pareto_set = sorted(set((m, e) for m, e in zip(makespans, energies)))

    summary = f"\n📊 {method_name} 优化结果统计\n"
    summary += f"样本数:{len(results)}\n"
    summary += f"最小完工时间: {min(makespans):.2f}\n"
    summary += f"最小能耗: {min(energies):.2f}\n"
    summary += f"目标函数最优值: {min(obj_scores):.2f}\n"
    summary += f"完工时间 Std: {np.std(makespans):.2f}\n"
    summary += f"能耗 Std: {np.std(energies):.2f}\n"
    summary += f"Pareto 前沿点数量: {len(pareto_set)}\n"
    return summary

# ==== 调度动态演示辅助 ====

class GanttAnimator:
    def __init__(self, schedule, n_machines, maintenance_windows, canvas_frame):
        self.schedule = schedule
        self.n_machines = n_machines
        self.maintenance_windows = maintenance_windows
        self.canvas_frame = canvas_frame

        self.fig, self.ax = plt.subplots(figsize=(12,5))
        self.canvas = FigureCanvasTkAgg(self.fig, master=self.canvas_frame)
        self.canvas.get_tk_widget().pack(fill='both', expand=True)

        self.job_colors = defaultdict(lambda: plt.get_cmap("tab20")(random.random()))
        self.current_time = 0
        self.max_time = max(s[4] for s in schedule)
        self.anim_running = False

    def draw_frame(self):
        self.ax.clear()
        for job_id, op_idx, machine, start, end in self.schedule:
            if start <= self.current_time:
                width = min(end, self.current_time) - start
                if width > 0:
                    self.ax.barh(machine, width, left=start, color=self.job_colors[job_id], edgecolor='black')
                    self.ax.text((start + min(end, self.current_time)) / 2, machine,
                                 f'J{job_id}-O{op_idx}', ha='center', va='center', color='white', fontsize=8)

        for m, windows in self.maintenance_windows.items():
            for s, e in windows:
                if s <= self.current_time:
                    width = min(e, self.current_time) - s
                    if width > 0:
                        self.ax.barh(m, width, left=s, color='gray', alpha=0.3, edgecolor='red', hatch='//')

        self.ax.set_yticks(list(range(self.n_machines)))
        self.ax.set_yticklabels([f'M{m}' for m in range(self.n_machines)])
        self.ax.set_xlabel("Time")
        self.ax.set_title("Gantt chart")
        self.ax.set_xlim(0, self.max_time + 5)
        self.ax.grid(True)
        self.canvas.draw()

    def start(self):
        self.anim_running = True
        self.current_time = 0
        self._step()

    def _step(self):
        if not self.anim_running:
            return
        if self.current_time > self.max_time + 5:
            self.anim_running = False
            return
        self.draw_frame()
        self.current_time += 1
        self.canvas.get_tk_widget().after(100, self._step)

    def stop(self):
        self.anim_running = False

# ==== 作业编辑窗口 ====

class JobEditor(simpledialog.Dialog):
    def __init__(self, parent, jobs, n_machines):
        self.jobs = jobs
        self.n_machines = n_machines
        super().__init__(parent, title="作业详细编辑")

    def body(self, master):
        self.frames = []
        self.entries = []

        ttk.Label(master, text="请编辑各作业各工序机器与加工时间,格式:机器编号:时间,机器编号:时间").grid(row=0, column=0, sticky='w')

        for i, job in enumerate(self.jobs):
            f = ttk.LabelFrame(master, text=f"作业 {i}")
            f.grid(row=i+1, column=0, sticky='ew', pady=5)
            self.frames.append(f)

            for j, op in enumerate(job):
                label = ttk.Label(f, text=f"工序 {j}:")
                label.grid(row=j, column=0, sticky='w')

                # 格式化成字符串
                s = ",".join([f"{m}:{t}" for m,t in op.items()])
                ent = ttk.Entry(f, width=40)
                ent.insert(0, s)
                ent.grid(row=j, column=1, sticky='w')
                self.entries.append((i,j,ent))
        return None

    def apply(self):
        # 解析输入,更新self.jobs
        for job_id, op_idx, ent in self.entries:
            text = ent.get().strip()
            if not text:
                continue
            try:
                parts = text.split(',')
                d = {}
                for p in parts:
                    m_str, t_str = p.split(':')
                    m = int(m_str.strip())
                    t = int(t_str.strip())
                    if m < 0 or m >= self.n_machines or t <= 0:
                        raise ValueError
                    d[m] = t
                self.jobs[job_id][op_idx] = d
            except:
                messagebox.showerror("输入错误", f"作业{job_id}工序{op_idx}格式错误,请使用 机器编号:时间,机器编号:时间 格式,机器编号应在[0-{self.n_machines-1}]内,时间为正整数。")

# ==== 主GUI ====

class SchedulerApp:
    def __init__(self, master):
        self.master = master
        master.title("智能调度系统")
        self.params = DEFAULT_PARAMS.copy()

        # 顶部参数框架
        self.param_frame = ttk.Frame(master)
        self.param_frame.pack(fill='x', padx=10, pady=5)

        # 作业数
        ttk.Label(self.param_frame, text="作业数:").grid(row=0, column=0)
        self.n_jobs_var = tk.IntVar(value=self.params["n_jobs"])
        self.n_jobs_spin = ttk.Spinbox(self.param_frame, from_=1, to=100, textvariable=self.n_jobs_var, width=5)
        self.n_jobs_spin.grid(row=0, column=1)

        # 机器数
        ttk.Label(self.param_frame, text="机器数:").grid(row=0, column=2)
        self.n_machines_var = tk.IntVar(value=self.params["n_machines"])
        self.n_machines_spin = ttk.Spinbox(self.param_frame, from_=1, to=20, textvariable=self.n_machines_var, width=5)
        self.n_machines_spin.grid(row=0, column=3)

        # 重复次数
        ttk.Label(self.param_frame, text="重复次数:").grid(row=0, column=4)
        self.repeat_times_var = tk.IntVar(value=self.params["repeat_times"])
        self.repeat_times_spin = ttk.Spinbox(self.param_frame, from_=1, to=20, textvariable=self.repeat_times_var, width=5)
        self.repeat_times_spin.grid(row=0, column=5)

        # 机器人转移时间
        ttk.Label(self.param_frame, text="机器人转移时间:").grid(row=1, column=0)
        self.robot_transfer_time_var = tk.IntVar(value=self.params["robot_transfer_time"])
        self.robot_transfer_time_spin = ttk.Spinbox(self.param_frame, from_=0, to=20, textvariable=self.robot_transfer_time_var, width=5)
        self.robot_transfer_time_spin.grid(row=1, column=1)

        # 能耗系数,逗号分隔
        ttk.Label(self.param_frame, text="机器能耗系数(逗号分隔):").grid(row=1, column=2)
        self.energy_costs_var = tk.StringVar(value=",".join(map(str, self.params["energy_costs"])))
        self.energy_costs_entry = ttk.Entry(self.param_frame, textvariable=self.energy_costs_var, width=20)
        self.energy_costs_entry.grid(row=1, column=3)

        # 维修时间窗口
        ttk.Label(self.param_frame, text="维修时间窗口(格式如0:25-35,40-50;1:15-25):").grid(row=2, column=0, columnspan=3, sticky='w')
        self.maintenance_var = tk.StringVar(value=";".join(
            [f"{m}:" + ",".join(f"{s}-{e}" for s,e in ws) for m, ws in self.params["maintenance_windows"].items()]
        ))
        self.maintenance_entry = ttk.Entry(self.param_frame, textvariable=self.maintenance_var, width=50)
        self.maintenance_entry.grid(row=2, column=3, columnspan=3, sticky='w')

        # 多目标权重
        ttk.Label(self.param_frame, text="目标权重(完工时间):").grid(row=3, column=0)
        self.weight_makespan_var = tk.DoubleVar(value=self.params["weight_makespan"])
        self.weight_makespan_scale = ttk.Scale(self.param_frame, from_=0, to=1, variable=self.weight_makespan_var, orient='horizontal')
        self.weight_makespan_scale.grid(row=3, column=1)

        ttk.Label(self.param_frame, text="目标权重(能耗):").grid(row=3, column=2)
        self.weight_energy_var = tk.DoubleVar(value=self.params["weight_energy"])
        self.weight_energy_scale = ttk.Scale(self.param_frame, from_=0, to=1, variable=self.weight_energy_var, orient='horizontal')
        self.weight_energy_scale.grid(row=3, column=3)

        # 按钮框架
        self.btn_frame = ttk.Frame(master)
        self.btn_frame.pack(fill='x', padx=10, pady=5)

        self.gen_jobs_btn = ttk.Button(self.btn_frame, text="生成随机作业", command=self.generate_jobs)
        self.gen_jobs_btn.grid(row=0, column=0, padx=5)

        self.edit_jobs_btn = ttk.Button(self.btn_frame, text="编辑作业详细", command=self.edit_jobs)
        self.edit_jobs_btn.grid(row=0, column=1, padx=5)

        self.run_ga_btn = ttk.Button(self.btn_frame, text="运行遗传算法", command=self.run_ga)
        self.run_ga_btn.grid(row=0, column=2, padx=5)

        self.run_sa_btn = ttk.Button(self.btn_frame, text="运行模拟退火", command=self.run_sa)
        self.run_sa_btn.grid(row=0, column=3, padx=5)

        self.run_nsga_btn = ttk.Button(self.btn_frame, text="运行NSGA-II", command=self.run_nsga)
        self.run_nsga_btn.grid(row=0, column=4, padx=5)

        self.show_anim_btn = ttk.Button(self.btn_frame, text="动态甘特图演示", command=self.show_animation, state='disabled')
        self.show_anim_btn.grid(row=0, column=5, padx=5)

        self.export_btn = ttk.Button(self.btn_frame, text="导出最佳调度CSV", command=self.export_schedule, state='disabled')
        self.export_btn.grid(row=0, column=6, padx=5)

        # 结果文本框
        self.result_text = tk.Text(master, height=15)
        self.result_text.pack(fill='both', padx=10, pady=5, expand=True)

        # 初始化作业
        self.jobs = generate_jobs(self.params["n_jobs"], self.params["n_machines"])
        self.best_schedule = None

    def validate_and_update_params(self):
        try:
            n_jobs = self.n_jobs_var.get()
            n_machines = self.n_machines_var.get()
            repeat_times = self.repeat_times_var.get()
            robot_transfer_time = self.robot_transfer_time_var.get()
            energy_costs = list(map(float, self.energy_costs_var.get().split(',')))
            if len(energy_costs) != n_machines:
                raise ValueError("能耗系数数量应与机器数相同")
            maintenance_windows = parse_maintenance_input(self.maintenance_var.get())
            if maintenance_windows is None:
                return False
            w1 = self.weight_makespan_var.get()
            w2 = self.weight_energy_var.get()
            if abs(w1 + w2 - 1.0) > 0.1:
                messagebox.showwarning("权重提示", "建议权重和为1,可调整")
            self.params.update({
                "n_jobs": n_jobs,
                "n_machines": n_machines,
                "repeat_times": repeat_times,
                "robot_transfer_time": robot_transfer_time,
                "energy_costs": energy_costs,
                "maintenance_windows": maintenance_windows,
                "weight_makespan": w1,
                "weight_energy": w2,
            })
            return True
        except Exception as e:
            messagebox.showerror("参数错误", f"参数输入有误: {e}")
            return False

    def generate_jobs(self):
        if not self.validate_and_update_params():
            return
        self.jobs = generate_jobs(self.params["n_jobs"], self.params["n_machines"])
        self.result_text.insert('end', "✅ 随机作业生成成功\n")
        self.best_schedule = None
        self.show_anim_btn.config(state='disabled')
        self.export_btn.config(state='disabled')

    def edit_jobs(self):
        if not self.validate_and_update_params():
            return
        editor = JobEditor(self.master, self.jobs, self.params["n_machines"])
        self.jobs = editor.jobs
        self.result_text.insert('end', "✅ 作业详细编辑完成\n")
        self.best_schedule = None
        self.show_anim_btn.config(state='disabled')
        self.export_btn.config(state='disabled')

    def run_ga(self):
        if not self.validate_and_update_params():
            return
        self.result_text.insert('end', "⏳ 遗传算法运行中...\n")
        self.master.update()
        results = run_ga(
            self.jobs,
            self.params["robot_transfer_time"],
            self.params["energy_costs"],
            self.params["maintenance_windows"],
            self.params["repeat_times"],
            self.params["weight_makespan"],
            self.params["weight_energy"],
        )
        summary = summarize_results(results, "遗传算法", self.params["weight_makespan"], self.params["weight_energy"])
        self.result_text.insert('end', summary)
        self.best_schedule = min(results, key=lambda x: self.params["weight_makespan"] * x[0] + self.params["weight_energy"] * x[1])
        self.show_anim_btn.config(state='normal')
        self.export_btn.config(state='normal')

    def run_sa(self):
        if not self.validate_and_update_params():
            return
        self.result_text.insert('end', "⏳ 模拟退火运行中...\n")
        self.master.update()
        results = run_sa(
            self.jobs,
            self.params["robot_transfer_time"],
            self.params["energy_costs"],
            self.params["maintenance_windows"],
            self.params["repeat_times"],
            self.params["weight_makespan"],
            self.params["weight_energy"],
        )
        summary = summarize_results(results, "模拟退火", self.params["weight_makespan"], self.params["weight_energy"])
        self.result_text.insert('end', summary)
        self.best_schedule = min(results, key=lambda x: self.params["weight_makespan"] * x[0] + self.params["weight_energy"] * x[1])
        self.show_anim_btn.config(state='normal')
        self.export_btn.config(state='normal')

    def run_nsga(self):
        if not self.validate_and_update_params():
            return
        self.result_text.insert('end', "⏳ NSGA-II运行中...\n")
        self.master.update()
        results = run_nsga2(
            self.jobs,
            self.params["robot_transfer_time"],
            self.params["energy_costs"],
            self.params["maintenance_windows"],
            self.params["repeat_times"],
        )
        summary = summarize_results(results, "NSGA-II", 0.5, 0.5)
        self.result_text.insert('end', summary)
        self.best_schedule = min(results, key=lambda x: 0.5 * x[0] + 0.5 * x[1])
        self.show_anim_btn.config(state='normal')
        self.export_btn.config(state='normal')

    def show_animation(self):
        if self.best_schedule is None:
            messagebox.showwarning("无调度数据", "请先运行调度算法得到结果!")
            return
        anim_win = tk.Toplevel(self.master)
        anim_win.title("调度动态甘特图演示")
        animator = GanttAnimator(self.best_schedule[3], self.params["n_machines"], self.params["maintenance_windows"], anim_win)
        animator.start()

    def export_schedule(self):
        if self.best_schedule is None:
            messagebox.showwarning("无调度数据", "请先运行调度算法得到结果!")
            return
        path = filedialog.asksaveasfilename(defaultextension=".csv", filetypes=[("CSV文件","*.csv")])
        if not path:
            return
        with open(path, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(["作业ID", "工序号", "机器号", "开始时间", "结束时间"])
            for row in self.best_schedule[3]:
                writer.writerow(row)
        messagebox.showinfo("导出成功", f"调度结果已导出到 {path}")


if __name__ == "__main__":
    root = tk.Tk()
    app = SchedulerApp(root)
    root.mainloop()

Logo

技术共进,成长同行——讯飞AI开发者社区

更多推荐