flowshop_transformer / source /create_dataset.py
younadi's picture
commit
6179bb6
import numpy as np
import os
import json
import itertools
import math
from loguru import logger
import tqdm
import time
import namer
def generate_random_pfsp_instance(nb_jobs, nb_machines, time_min, time_max, seed=97):
#TODO: add the possibility to simply load an instance from a file
"""
Generates a random instance of the Permutation Flow Shop Problem (PFSP).
Parameters:
- nb_jobs: Number of jobs (n).
- nb_machines: Number of machines (m).j,
- time_min: Minimum processing time for any job on any machine.
- time_max: Maximum processing time for any job on any machine.
Returns:
- A 2D list (matrix) of size (nb_jobs x nb_machines) where each entry is a random processing time between time_min and time_max.
"""
if seed is not None: np.random.seed(seed)
# create a pfsp_instance using the uniform distribution
pfsp_instance = np.random.uniform(low=time_min, high=time_max, size=(nb_jobs, nb_machines)).astype(np.float32)
return pfsp_instance
def fit_palmer(pfsp_instance: np.ndarray):
"""
Implements Palmer's heuristic for the flowshop scheduling problem. Returns a schedule and its corresponding makespan.
For now I am using an old code that performs palmer by interfacing with it, but it should be refactored to be cleaner and more efficient.
Parameters:
- pfsp_instance: A 2D numpy array where pfsp_instance[i][j] is the processing time of job i on machine j.
Returns:
- A tuple (schedule, makespan) where:
- schedule: A list of job indices representing the order of jobs (e.g., [0, 2, 1]).
- makespan: The total completion time for the given schedule.
"""
# =====================================================================================
class Palmer:
def __init__(self, jobs_list: list):
self.jobs_list = jobs_list
self.nb_jobs = len(jobs_list)
self.nb_machines = len(jobs_list[0])
self.seq_star = None
self.make_span_star = None
# utility function that returns the gantt cumule based on a job execution times and a previous gantt cumule
def cumulate(self, job: list, previous_cumul=None):
res = [0] * len(job)
if previous_cumul == None:
res[0] = job[0]
for i in range(1, len(job)):
res[i] = res[i - 1] + job[i]
else:
res[0] = previous_cumul[0] + job[0]
for i in range(1, len(job)):
res[i] = max(res[i - 1], previous_cumul[i]) + job[i]
return res
# utility function that computes the gantt cumule given only a job sequence (not used in the algorithm due to inneficiency
# dynamic programming with cumulate is used instead ...)
def cumulate_seq(self, seq: list):
cumulated = None
for i in seq:
cumulated = self.cumulate(self.jobs_list[i], cumulated)
return cumulated
# launching the optimization
def optim(self, debug=False):
jobs_weights = []
for i, job in zip(range(self.nb_jobs), self.jobs_list):
weight = 0
for j in range(self.nb_machines):
if debug == True:
print(
f">job {i} mach {j} first term: {(2*(j+1) - 1) - self.nb_machines}"
)
print(f">job {i} mach {j} second term: {job[j]}")
print(
"------------------------------------------------------------------"
)
weight += ((2 * (j + 1) - 1) - self.nb_machines) * job[j]
if debug == True:
print(f"===>> job {i} weight: {weight}")
jobs_weights.append((weight, i))
self.seq_star = [tu[1] for tu in sorted(jobs_weights, reverse=True)]
self.make_span_star = self.cumulate_seq(self.seq_star)[-1]
return (self.seq_star, self.make_span_star)
# =====================================================================================
# Interfacing with the underlying old palmer code
jobs_list = pfsp_instance.tolist()
palmer_schedule, palmer_makespan = Palmer(jobs_list).optim()
# Returning the schedule and makespan as numpy arrays of type int32
return np.array(palmer_schedule, dtype=np.int32), np.float32(palmer_makespan)
def fit_cds(pfsp_instance: np.ndarray):
"""
Implements CDS heuristic for the flowshop scheduling problem. Returns a schedule and its corresponding makespan.
For now I am using an old code that performs cds by interfacing with it, but it should be refactored to be cleaner and more efficient.
Parameters:
- pfsp_instance: A 2D numpy array where pfsp_instance[i][j] is the processing time of job i on machine j.
Returns:
- A tuple (schedule, makespan) where:
- schedule: A list of job indices representing the order of jobs (e.g., [0, 2, 1]).
- makespan: The total completion time for the given schedule.
"""
# =====================================================================================
# Function to cumulate job processing times
def cumulate(job, previous_cumul=None):
res = [0] * len(job)
if previous_cumul is None:
res[0] = job[0]
for i in range(1, len(job)):
res[i] = res[i - 1] + job[i]
else:
res[0] = previous_cumul[0] + job[0]
for i in range(1, len(job)):
res[i] = max(res[i - 1], previous_cumul[i]) + job[i]
return res
# Function to cumulate processing times for a given sequence of jobs
def cumulate_seq(seq, jobs_list):
cumulated = None
for i in seq:
cumulated = cumulate(jobs_list[i], cumulated)
return cumulated
# Function to compute the makespan given a sequence of jobs and the job list
def makespan(sequence, job_list):
return cumulate_seq(sequence, job_list)[-1]
# Function to perform the Johnson's algorithm for the flow shop problem
def johnson_algorithm(matrix):
n = matrix.shape[0]
sequence = []
machines = [[], []]
# Preprocessing to determine the order of jobs
for i in range(n):
if matrix[i][0] < matrix[i][1]: # if time(m1) < time(m2)
machines[0].append((matrix[i][0], i))
else:
machines[1].append((matrix[i][1], i))
# Sorting jobs for each machine
machines[0] = sorted(
machines[0], key=lambda x: x[0]
) # ascending sort for the first machine
machines[1] = sorted(
machines[1], key=lambda x: x[0], reverse=True
) # descending sort for the second machine
# Merging the two sorted lists
merged = machines[0] + machines[1]
# Constructing the optimal sequence
sequence = [index for _, index in merged]
return sequence
# Function that applies Johnson's algorithm and computes the makespan
def johnson(job_matrix, data_matrix):
sequence = johnson_algorithm(job_matrix)
return sequence, makespan(sequence, data_matrix)
# CDS heuristic
def cds_heuristic(matrix):
n = matrix.shape[0]
m = matrix.shape[1]
best_makespan = float("inf")
best_sequences = []
# Step 1: Generate matrices of all possible job lists
for i in range(1, m):
machine_subset_1 = matrix[:, :i].sum(axis=1)
machine_subset_2 = matrix[:, -i:].sum(axis=1)
job_matrix = np.column_stack((machine_subset_1, machine_subset_2))
# Step 2: Apply Johnson's algorithm to the job matrix abd calculate the makespan
sequence, makespan_value = johnson(job_matrix, matrix)
# Step 3: Update the best makespan and corresponding sequences
if makespan_value < best_makespan:
best_makespan = makespan_value
best_sequences = [sequence]
elif makespan_value == best_makespan:
best_sequences.append(sequence)
return best_sequences[0], best_makespan
# =====================================================================================
# Interfacing with the underlying old cds code
cds_schedule, cds_makespan = cds_heuristic(pfsp_instance)
# Returning the schedule and makespan as numpy arrays of type int32
return np.array(cds_schedule, dtype=np.int32), np.float32(cds_makespan)
def fit_neh(pfsp_instance: np.ndarray):
"""
Implements NEH heuristic for the flowshop scheduling problem. Returns a schedule and its corresponding makespan.
For now I am using an old code that performs neh by interfacing with it, but it should be refactored to be cleaner and more efficient.
Parameters:
- pfsp_instance: A 2D numpy array where pfsp_instance[i][j] is the processing time of job i on machine j.
Returns:
- A tuple (schedule, makespan) where:
- schedule: A list of job indices representing the order of jobs (e.g., [0, 2, 1]).
- makespan: The total completion time for the given schedule.
"""
# =====================================================================================
class Inst:
def __init__(
self,
jobs: int,
machines: int,
seed: int,
ub: int,
lb: int,
matrix: list[list[int]],
):
self.jobs = jobs
self.machines = machines
self.seed = seed
self.ub = ub
self.lb = lb
self.matrix = matrix
def __repr__(self) -> str:
return f"Inst(jobs={self.jobs}, machines={self.machines}, seed={self.seed}, ub={self.ub}, lb={self.lb}, matrix={self.matrix})"
class NEH:
def __init__(self, instance: Inst, debug: bool = False):
self.instance = instance
self.debug = debug
def calculate_sj(self, job: int) -> int:
sj = 0
for machine in range(self.instance.machines):
sj += self.instance.matrix[machine][job]
return sj
def sort_jobs(self, reverse: bool = False) -> list[int]:
return sorted(
range(self.instance.jobs),
key=lambda job: self.calculate_sj(job),
reverse=reverse,
)
def emulate(self, jobs: list[int]) -> list[int]:
machines_exec = [0] * self.instance.machines
for job in jobs:
for current_machine in range(self.instance.machines):
# Add jobs execution time to current machine
machines_exec[current_machine] += self.instance.matrix[
current_machine
][job]
# Sync other machines if they are behind current time
for machine in range(current_machine + 1, self.instance.machines):
machines_exec[machine] = max(
machines_exec[current_machine], machines_exec[machine]
)
return machines_exec
def calculate_cmax(self, jobs: list[int]) -> int:
return self.emulate(jobs)[-1]
def get_best_order(self, orders: list[list[int]]) -> tuple[int, list[int]]:
min_cmax = float("inf")
min_order = None
for order in orders:
cmax = self.calculate_cmax(order)
if cmax < min_cmax:
min_cmax = cmax
min_order = order
return min_cmax, min_order
def get_best_position(
self, order: list[int], job: int
) -> tuple[int, list[int]]:
possible_orders: list[list[int]] = []
for pos in range(len(order) + 1):
possible_orders.append(order[:pos] + [job] + order[pos:])
return self.get_best_order(possible_orders)
def __call__(self) -> tuple[int, list[int]]:
if self.instance.jobs < 2:
raise ValueError("Number of jobs must be greater than 2")
sorted_jobs = self.sort_jobs()
current_cmax, current_order = self.get_best_order(
[sorted_jobs[:2], sorted_jobs[:2][::-1]]
)
if self.debug:
print(current_cmax, current_order)
if self.instance.jobs == 2:
return current_cmax, current_order
for job in sorted_jobs[2:]:
current_cmax, current_order = self.get_best_position(current_order, job)
if self.debug:
print(current_cmax, current_order)
return current_cmax, current_order
# =====================================================================================
# Interfacing with the underlying old neh code
neh_instance_jobs = pfsp_instance.shape[0]
neh_instance_machines = pfsp_instance.shape[1]
neh_instance_matrix = pfsp_instance.T.tolist()
neh_instance = Inst(
neh_instance_jobs,
neh_instance_machines,
seed=0,
ub=0,
lb=0,
matrix=neh_instance_matrix,
)
neh_makespan, neh_schedule = NEH(neh_instance)()
# Returning the schedule and makespan as numpy arrays of type int32
return np.array(neh_schedule, dtype=np.int32), np.float32(neh_makespan)
def evaluate_makespan(pfsp_instance, schedule):
"""
Evaluates the makespan (completion time) of a given schedule for a given pfsp_instance.
Parameters:
- pfsp_instance: A list of lists, where pfsp_instance[i][j] is the processing time of job i on machine j.
- schedule: A list/tuple indicating the order of jobs (e.g., [0, 2, 1]).
Returns:
- The makespan (total completion time) for the given schedule.
"""
def cumulate(job: list, previous_cumul=None):
# Calculate the cumulative completion times for a job
res = [0] * len(job)
if previous_cumul == None:
res[0] = job[0]
for i in range(1, len(job)):
res[i] = res[i - 1] + job[i]
else:
res[0] = previous_cumul[0] + job[0]
for i in range(1, len(job)):
res[i] = max(res[i - 1], previous_cumul[i]) + job[i]
return res
def cumulate_seq(pfsp_instance: list, schedule: list):
# Calculates the cumulative time for a sequence of jobs on machines.
cumulates = [0] * len(pfsp_instance)
cumulated = None
for j, i in enumerate(schedule):
cumulated = cumulate(pfsp_instance[i], cumulated)
cumulates[j] = cumulated[-1]
return cumulates
cumulates = cumulate_seq(pfsp_instance, schedule)
return cumulates
def create_dataset(
testing,
pfsp_instance,
nb_samples,
init_type,
output_dir,
seed,
normalize_makespans,
nb_jobs,
nb_machines,
time_min,
time_max,
autoname_output_dir,
):
if autoname_output_dir:
output_dir = os.path.join(output_dir, time.strftime("%Y_%m_%d_%H_%M_%S") + "_" + namer.generate(separator="_", category="sports"))
# prepare loging
logger.add(os.path.join(output_dir, "create_dataset.log"))
if os.path.exists(pfsp_instance):
# TODO: add logic to load pfsp_instance from some file
pass
else:
# create pfsp_instance
logger.info(f"Creating pfsp_instance with {nb_jobs} jobs and {nb_machines} machines")
pfsp_instance = generate_random_pfsp_instance(nb_jobs, nb_machines, time_min, time_max, seed=seed)
# create the output folder
os.makedirs(output_dir, exist_ok=True)
# check if experiment termination flag file exists
if not testing:
if os.path.exists(os.path.join(output_dir, ".terminated_create_dataset")):
print("Dataset creation already done. Exiting...")
return None
# log parameters
logger.info(f"nb_samples: {nb_samples}")
logger.info(f"init_type: {init_type}")
logger.info(f"output_dir: {output_dir}")
logger.info(f"seed: {seed}")
# compute the makespan upperbound if needed
if normalize_makespans:
makespans_upperbound = np.sum(pfsp_instance) # The upperbound of the makespan is the sum of all processing times (worst case: all jobs are processed sequentially with no parallelism)
logger.info(f"Normalizing makespans by the sum of processing times with pfsp sum: {makespans_upperbound}")
else:
makespans_upperbound = None
logger.info("Not normalizing makespans")
if init_type == "exhaustive":
nb_samples = math.factorial(pfsp_instance.shape[0])
logger.info(f"Exhaustive init_type: Number of samples: {nb_samples}")
if seed is not None: np.random.seed(seed)
def perturb_schedule(schedule):
perturbed_schedule = schedule[:]
i, j = np.random.choice(perturbed_schedule.shape[0], size=2, replace=False)
perturbed_schedule[[i,j]] = perturbed_schedule[[j,i]]
return perturbed_schedule, evaluate_makespan(pfsp_instance, perturbed_schedule)
# ======
# create the folder if it doesn't exist
os.makedirs(output_dir, exist_ok=True)
# create the np memmap files for schedules and makespans
nb_jobs = pfsp_instance.shape[0]
schedules = np.lib.format.open_memmap(os.path.join(output_dir,"schedules.npy"), dtype=np.int32, mode='w+', shape=(nb_samples, nb_jobs))
makespans = np.lib.format.open_memmap(os.path.join(output_dir,"makespans.npy"), dtype=np.float32, mode='w+', shape=(nb_samples, nb_jobs))
# save the pfsp instance as a numpy file
np.save(os.path.join(output_dir,"pfsp_instance.npy"), pfsp_instance)
# create a metadata dictionary and save it as a json file
metadata_dict = {
"nb_samples": nb_samples,
"nb_samples": nb_samples,
"nb_jobs": nb_jobs,
"nb_machines": pfsp_instance.shape[1],
"init_type": init_type,
"data_path": output_dir,
"seed": seed,
"date_time": time.strftime('%Y_%m_%d_%H_%M_%S')
}
with open(os.path.join(output_dir,"metadata.json"), "w") as f:
json.dump(metadata_dict, f, indent=4)
if init_type == "exhaustive":
for i, schedule in tqdm.tqdm(enumerate(itertools.permutations(range(nb_jobs))), total=math.factorial(nb_jobs)):
schedules[i] = schedule
makespans[i] = evaluate_makespan(pfsp_instance, schedule)
elif init_type == "cds":
cds_schedule, _ = fit_cds(pfsp_instance)
cds_makespan = evaluate_makespan(pfsp_instance, cds_schedule)
schedules[0] = cds_schedule
makespans[0] = cds_makespan
for i in tqdm.tqdm(range(1, nb_samples), desc="Generating CDS samples"):
schedules[i], makespans[i] = perturb_schedule(cds_schedule)
elif init_type == "palmer":
palmer_schedule, _ = fit_palmer(pfsp_instance)
palmer_makespan = evaluate_makespan(pfsp_instance, palmer_schedule)
schedules[0] = palmer_schedule
makespans[0] = palmer_makespan
for i in tqdm.tqdm(range(1, nb_samples), desc="Generating Palmer samples"):
schedules[i], makespans[i] = perturb_schedule(palmer_schedule)
elif init_type == "neh":
neh_schedule, _ = fit_neh(pfsp_instance)
neh_makespan = evaluate_makespan(pfsp_instance, neh_schedule)
schedules[0] = neh_schedule
makespans[0] = neh_makespan
for i in tqdm.tqdm(range(1, nb_samples), desc="Generating NEH samples"):
schedules[i], makespans[i] = perturb_schedule(neh_schedule)
elif init_type == "heuristics":
cds_schedule, _ = fit_cds(pfsp_instance)
cds_makespan = evaluate_makespan(pfsp_instance, cds_schedule)
schedules[0], makespans[0] = cds_schedule, cds_makespan
cds_size = nb_samples // 3
for i in tqdm.tqdm(range(1, cds_size), desc="Generating CDS heuristic samples"):
schedules[i], makespans[i] = perturb_schedule(cds_schedule)
i+=1
palmer_schedule, _ = fit_palmer(pfsp_instance)
palmer_makespan = evaluate_makespan(pfsp_instance, palmer_schedule)
schedules[i], makespans[i] = palmer_schedule, palmer_makespan
palmer_size = nb_samples // 3
for i in tqdm.tqdm(range(i+1, i+palmer_size), desc="Generating Palmer heuristic samples"):
schedules[i], makespans[i] = perturb_schedule(palmer_schedule)
i+=1
neh_schedule, _ = fit_neh(pfsp_instance)
neh_makespan = evaluate_makespan(pfsp_instance, neh_schedule)
schedules[i], makespans[i] = neh_schedule, neh_makespan
neh_size = nb_samples - cds_size - palmer_size
for i in tqdm.tqdm(range(i+1, i+neh_size), desc="Generating NEH heuristic samples"):
schedules[i], makespans[i] = perturb_schedule(neh_schedule)
elif init_type == "random":
for i in tqdm.tqdm(range(nb_samples), desc="Generating Random samples"):
schedule = np.random.permutation(pfsp_instance.shape[0])
makespan = evaluate_makespan(pfsp_instance, schedule)
schedules[i] = schedule
makespans[i] = makespan
else:
raise ValueError("Invalid initialization type")
# normalize the makespans if an upperbound is provided
if makespans_upperbound is not None:
makespans /= makespans_upperbound
# flush
schedules.flush()
makespans.flush()
# save min makespan
min_makespan = np.min(makespans[:, -1])
np.save(os.path.join(output_dir, "min_makespan.npy"), min_makespan)
logger.info(f"Minimum makespan: {min_makespan}")
# fit the heuristics and save their makespans
neh_schedule, neh_makespan = fit_neh(pfsp_instance); neh_makespan/=makespans_upperbound
cds_schedule, cds_makespan = fit_cds(pfsp_instance); cds_makespan/=makespans_upperbound
palmer_schedule, palmer_makespan = fit_palmer(pfsp_instance); palmer_makespan/=makespans_upperbound
np.save(os.path.join(output_dir, "neh_makespan.npy"), neh_makespan)
np.save(os.path.join(output_dir, "cds_makespan.npy"), cds_makespan)
np.save(os.path.join(output_dir, "palmer_makespan.npy"), palmer_makespan)
# add termination flag file
with open(os.path.join(output_dir, ".terminated_create_dataset"), "w") as f: pass
# return
return schedules, makespans
# ======
if __name__ == "__main__":
# parse arguments and call create_dataset with the appropriate parameters
import argparse
parser = argparse.ArgumentParser(description="Create a dataset for the flowshop scheduling problem")
parser.add_argument("--testing", type=bool, required=True)
parser.add_argument("--nb_jobs", type=int, required=True, help="Number of jobs")
parser.add_argument("--nb_machines", type=int, required=True, help="Number of machines")
parser.add_argument("--time_min", type=int, required=True, help="Minimum processing time")
parser.add_argument("--time_max", type=int, required=True, help="Maximum processing time")
parser.add_argument("--nb_samples", type=int, required=True, help="Number of base samples to generate")
parser.add_argument("--init_type", type=str, required=True, choices=["exhaustive", "cds", "palmer", "neh", "heuristics", "random"], help="Initialization type for the base samples")
parser.add_argument("--output_dir", type=str, required=True, help="Path to the output directory where dataset artifacts will be saved")
parser.add_argument("--autoname_output_dir", type=bool, required=True, help="Whether to autoname the output directory")
parser.add_argument("--seed", type=int, required=True, help="Random seed for reproducibility (set to None for no seeding)")
parser.add_argument("--normalize_makespans", type=bool, required=True, help="Whether to normalize makespans by the sum of processing times")
parser.add_argument("--pfsp_instance", type=str, required=True, help="Path to the pfsp instance or None if to be generated")
args = parser.parse_args()
# create the dataset
create_dataset(
testing=args.testing,
nb_samples=args.nb_samples,
init_type=args.init_type,
output_dir=args.output_dir,
seed=args.seed,
normalize_makespans=args.normalize_makespans,
pfsp_instance=args.pfsp_instance,
nb_jobs=args.nb_jobs,
nb_machines=args.nb_machines,
time_min=args.time_min,
time_max=args.time_max,
autoname_output_dir=args.autoname_output_dir,
)
# ======