"""Dataset generation for the Permutation Flow Shop Problem (PFSP).

Builds a dataset of (schedule, prefix-makespans) pairs for a single PFSP
instance, seeded either exhaustively, randomly, or from the Palmer / CDS /
NEH constructive heuristics (plus random pairwise-swap perturbations).
Artifacts are written as numpy memmaps plus a JSON metadata file.
"""

import numpy as np
import os
import json
import itertools
import math
from loguru import logger
import tqdm
import time
import namer


def generate_random_pfsp_instance(nb_jobs, nb_machines, time_min, time_max, seed=97):
    # TODO: add the possibility to simply load an instance from a file
    """
    Generates a random instance of the Permutation Flow Shop Problem (PFSP).

    Parameters:
    - nb_jobs: Number of jobs (n).
    - nb_machines: Number of machines (m).
    - time_min: Minimum processing time for any job on any machine.
    - time_max: Maximum processing time for any job on any machine.
    - seed: Seed for numpy's global RNG; pass None to leave the RNG state untouched.

    Returns:
    - A float32 numpy array of shape (nb_jobs, nb_machines) where each entry is a
      processing time drawn uniformly from [time_min, time_max).
    """
    if seed is not None:
        np.random.seed(seed)
    # Draw processing times from a uniform distribution.
    pfsp_instance = np.random.uniform(
        low=time_min, high=time_max, size=(nb_jobs, nb_machines)
    ).astype(np.float32)
    return pfsp_instance


def fit_palmer(pfsp_instance: np.ndarray):
    """
    Implements Palmer's heuristic for the flowshop scheduling problem.
    Returns a schedule and its corresponding makespan.

    For now this wraps an old implementation; it should eventually be refactored
    to be cleaner and more efficient.

    Parameters:
    - pfsp_instance: A 2D numpy array where pfsp_instance[i][j] is the processing
      time of job i on machine j.

    Returns:
    - A tuple (schedule, makespan) where:
        - schedule: int32 numpy array of job indices (e.g., [0, 2, 1]).
        - makespan: float32 total completion time for that schedule.
    """

    # =====================================================================================
    class Palmer:
        def __init__(self, jobs_list: list):
            self.jobs_list = jobs_list
            self.nb_jobs = len(jobs_list)
            self.nb_machines = len(jobs_list[0])
            self.seq_star = None  # best sequence found
            self.make_span_star = None  # makespan of seq_star

        # Utility: returns the cumulative Gantt row for one job, chained onto the
        # previous job's cumulative row (classic flowshop completion-time recursion).
        def cumulate(self, job: list, previous_cumul=None):
            res = [0] * len(job)
            if previous_cumul is None:
                # First job of the sequence: simple prefix sums over machines.
                res[0] = job[0]
                for i in range(1, len(job)):
                    res[i] = res[i - 1] + job[i]
            else:
                res[0] = previous_cumul[0] + job[0]
                for i in range(1, len(job)):
                    # A job starts on machine i only once it finished machine i-1
                    # AND the previous job released machine i.
                    res[i] = max(res[i - 1], previous_cumul[i]) + job[i]
            return res

        # Utility: computes the cumulative row for a full job sequence.
        # (Recomputes from scratch; the algorithm itself chains cumulate() instead.)
        def cumulate_seq(self, seq: list):
            cumulated = None
            for i in seq:
                cumulated = self.cumulate(self.jobs_list[i], cumulated)
            return cumulated

        # Launch the optimization: rank jobs by Palmer's slope index.
        def optim(self, debug=False):
            jobs_weights = []
            for i, job in zip(range(self.nb_jobs), self.jobs_list):
                weight = 0
                for j in range(self.nb_machines):
                    if debug:
                        print(
                            f">job {i} mach {j} first term: {(2*(j+1) - 1) - self.nb_machines}"
                        )
                        print(f">job {i} mach {j} second term: {job[j]}")
                        print(
                            "------------------------------------------------------------------"
                        )
                    # Palmer slope index: weight machine j by (2j+1 - m).
                    weight += ((2 * (j + 1) - 1) - self.nb_machines) * job[j]
                if debug:
                    print(f"===>> job {i} weight: {weight}")
                jobs_weights.append((weight, i))
            # Jobs sorted by decreasing slope index.
            self.seq_star = [tu[1] for tu in sorted(jobs_weights, reverse=True)]
            self.make_span_star = self.cumulate_seq(self.seq_star)[-1]
            return (self.seq_star, self.make_span_star)

    # =====================================================================================
    # Interfacing with the underlying old palmer code
    jobs_list = pfsp_instance.tolist()
    palmer_schedule, palmer_makespan = Palmer(jobs_list).optim()
    # Returning the schedule as int32 and the makespan as float32
    return np.array(palmer_schedule, dtype=np.int32), np.float32(palmer_makespan)


def fit_cds(pfsp_instance: np.ndarray):
    """
    Implements the CDS heuristic for the flowshop scheduling problem.
    Returns a schedule and its corresponding makespan.

    For now this wraps an old implementation; it should eventually be refactored
    to be cleaner and more efficient.

    Parameters:
    - pfsp_instance: A 2D numpy array where pfsp_instance[i][j] is the processing
      time of job i on machine j.

    Returns:
    - A tuple (schedule, makespan) where:
        - schedule: int32 numpy array of job indices (e.g., [0, 2, 1]).
        - makespan: float32 total completion time for that schedule.
    """

    # =====================================================================================
    # Function to cumulate job processing times (same recursion as in fit_palmer)
    def cumulate(job, previous_cumul=None):
        res = [0] * len(job)
        if previous_cumul is None:
            res[0] = job[0]
            for i in range(1, len(job)):
                res[i] = res[i - 1] + job[i]
        else:
            res[0] = previous_cumul[0] + job[0]
            for i in range(1, len(job)):
                res[i] = max(res[i - 1], previous_cumul[i]) + job[i]
        return res

    # Function to cumulate processing times for a given sequence of jobs
    def cumulate_seq(seq, jobs_list):
        cumulated = None
        for i in seq:
            cumulated = cumulate(jobs_list[i], cumulated)
        return cumulated

    # Function to compute the makespan given a sequence of jobs and the job list
    def makespan(sequence, job_list):
        return cumulate_seq(sequence, job_list)[-1]

    # Function to perform Johnson's algorithm on a 2-machine problem
    def johnson_algorithm(matrix):
        n = matrix.shape[0]
        sequence = []
        machines = [[], []]
        # Preprocessing to determine the order of jobs
        for i in range(n):
            if matrix[i][0] < matrix[i][1]:  # if time(m1) < time(m2)
                machines[0].append((matrix[i][0], i))
            else:
                machines[1].append((matrix[i][1], i))
        # Sorting jobs for each machine
        machines[0] = sorted(
            machines[0], key=lambda x: x[0]
        )  # ascending sort for the first machine
        machines[1] = sorted(
            machines[1], key=lambda x: x[0], reverse=True
        )  # descending sort for the second machine
        # Merging the two sorted lists
        merged = machines[0] + machines[1]
        # Constructing the optimal sequence
        sequence = [index for _, index in merged]
        return sequence

    # Function that applies Johnson's algorithm and computes the makespan
    # on the ORIGINAL m-machine instance (not the 2-machine surrogate).
    def johnson(job_matrix, data_matrix):
        sequence = johnson_algorithm(job_matrix)
        return sequence, makespan(sequence, data_matrix)

    # CDS heuristic: reduce the m-machine problem to m-1 surrogate 2-machine
    # problems, solve each with Johnson, keep the best resulting sequence.
    def cds_heuristic(matrix):
        n = matrix.shape[0]
        m = matrix.shape[1]
        best_makespan = float("inf")
        best_sequences = []
        # Step 1: Generate the surrogate 2-machine job matrices
        for i in range(1, m):
            machine_subset_1 = matrix[:, :i].sum(axis=1)
            machine_subset_2 = matrix[:, -i:].sum(axis=1)
            job_matrix = np.column_stack((machine_subset_1, machine_subset_2))
            # Step 2: Apply Johnson's algorithm to the job matrix and calculate the makespan
            sequence, makespan_value = johnson(job_matrix, matrix)
            # Step 3: Update the best makespan and corresponding sequences
            if makespan_value < best_makespan:
                best_makespan = makespan_value
                best_sequences = [sequence]
            elif makespan_value == best_makespan:
                best_sequences.append(sequence)
        return best_sequences[0], best_makespan

    # =====================================================================================
    # Interfacing with the underlying old cds code
    cds_schedule, cds_makespan = cds_heuristic(pfsp_instance)
    # Returning the schedule as int32 and the makespan as float32
    return np.array(cds_schedule, dtype=np.int32), np.float32(cds_makespan)


def fit_neh(pfsp_instance: np.ndarray):
    """
    Implements the NEH heuristic for the flowshop scheduling problem.
    Returns a schedule and its corresponding makespan.

    For now this wraps an old implementation; it should eventually be refactored
    to be cleaner and more efficient.

    Parameters:
    - pfsp_instance: A 2D numpy array where pfsp_instance[i][j] is the processing
      time of job i on machine j.

    Returns:
    - A tuple (schedule, makespan) where:
        - schedule: int32 numpy array of job indices (e.g., [0, 2, 1]).
        - makespan: float32 total completion time for that schedule.
    """

    # =====================================================================================
    class Inst:
        # Plain container for an instance in the old NEH code's layout:
        # NOTE the matrix here is machine-major (matrix[machine][job]),
        # i.e. the TRANSPOSE of pfsp_instance.
        def __init__(
            self,
            jobs: int,
            machines: int,
            seed: int,
            ub: int,
            lb: int,
            matrix: list[list[int]],
        ):
            self.jobs = jobs
            self.machines = machines
            self.seed = seed  # unused by NEH itself; kept for the old interface
            self.ub = ub  # unused by NEH itself; kept for the old interface
            self.lb = lb  # unused by NEH itself; kept for the old interface
            self.matrix = matrix

        def __repr__(self) -> str:
            return f"Inst(jobs={self.jobs}, machines={self.machines}, seed={self.seed}, ub={self.ub}, lb={self.lb}, matrix={self.matrix})"

    class NEH:
        def __init__(self, instance: Inst, debug: bool = False):
            self.instance = instance
            self.debug = debug

        # Total processing time of one job over all machines (NEH priority key).
        def calculate_sj(self, job: int) -> int:
            sj = 0
            for machine in range(self.instance.machines):
                sj += self.instance.matrix[machine][job]
            return sj

        # Jobs sorted by total processing time (ascending by default).
        def sort_jobs(self, reverse: bool = False) -> list[int]:
            return sorted(
                range(self.instance.jobs),
                key=lambda job: self.calculate_sj(job),
                reverse=reverse,
            )

        # Simulate the flowshop for a partial sequence; returns per-machine
        # completion times (last entry is the partial makespan).
        def emulate(self, jobs: list[int]) -> list[int]:
            machines_exec = [0] * self.instance.machines
            for job in jobs:
                for current_machine in range(self.instance.machines):
                    # Add the job's execution time to the current machine
                    machines_exec[current_machine] += self.instance.matrix[
                        current_machine
                    ][job]
                    # Sync downstream machines if they are behind current time
                    for machine in range(current_machine + 1, self.instance.machines):
                        machines_exec[machine] = max(
                            machines_exec[current_machine], machines_exec[machine]
                        )
            return machines_exec

        def calculate_cmax(self, jobs: list[int]) -> int:
            return self.emulate(jobs)[-1]

        # Among candidate orders, return the one with smallest Cmax.
        def get_best_order(self, orders: list[list[int]]) -> tuple[int, list[int]]:
            min_cmax = float("inf")
            min_order = None
            for order in orders:
                cmax = self.calculate_cmax(order)
                if cmax < min_cmax:
                    min_cmax = cmax
                    min_order = order
            return min_cmax, min_order

        # Try inserting `job` at every position of `order`; keep the best.
        def get_best_position(
            self, order: list[int], job: int
        ) -> tuple[int, list[int]]:
            possible_orders: list[list[int]] = []
            for pos in range(len(order) + 1):
                possible_orders.append(order[:pos] + [job] + order[pos:])
            return self.get_best_order(possible_orders)

        def __call__(self) -> tuple[int, list[int]]:
            if self.instance.jobs < 2:
                raise ValueError("Number of jobs must be at least 2")
            sorted_jobs = self.sort_jobs()
            # Seed the partial schedule with the better ordering of the two
            # highest-priority jobs.
            current_cmax, current_order = self.get_best_order(
                [sorted_jobs[:2], sorted_jobs[:2][::-1]]
            )
            if self.debug:
                print(current_cmax, current_order)
            if self.instance.jobs == 2:
                return current_cmax, current_order
            # Insert the remaining jobs one by one at their best position.
            for job in sorted_jobs[2:]:
                current_cmax, current_order = self.get_best_position(current_order, job)
                if self.debug:
                    print(current_cmax, current_order)
            return current_cmax, current_order

    # =====================================================================================
    # Interfacing with the underlying old neh code
    neh_instance_jobs = pfsp_instance.shape[0]
    neh_instance_machines = pfsp_instance.shape[1]
    # Old code expects a machine-major matrix, hence the transpose.
    neh_instance_matrix = pfsp_instance.T.tolist()
    neh_instance = Inst(
        neh_instance_jobs,
        neh_instance_machines,
        seed=0,
        ub=0,
        lb=0,
        matrix=neh_instance_matrix,
    )
    neh_makespan, neh_schedule = NEH(neh_instance)()
    # Returning the schedule as int32 and the makespan as float32
    return np.array(neh_schedule, dtype=np.int32), np.float32(neh_makespan)


def evaluate_makespan(pfsp_instance, schedule):
    """
    Evaluates the makespans of every prefix of a given schedule for a given
    pfsp_instance.

    Parameters:
    - pfsp_instance: A list of lists (or 2D array), where pfsp_instance[i][j] is
      the processing time of job i on machine j.
    - schedule: A list/tuple/array indicating the order of jobs (e.g., [0, 2, 1]).

    Returns:
    - A list of length len(pfsp_instance) where element k is the makespan of the
      first k+1 jobs of the schedule. The LAST element is the makespan of the
      full schedule. (Callers store the whole list as one row of the
      (nb_samples, nb_jobs) makespans memmap.)
    """

    def cumulate(job: list, previous_cumul=None):
        # Calculate the cumulative completion times for a job (same flowshop
        # recursion as in the heuristics above).
        res = [0] * len(job)
        if previous_cumul is None:
            res[0] = job[0]
            for i in range(1, len(job)):
                res[i] = res[i - 1] + job[i]
        else:
            res[0] = previous_cumul[0] + job[0]
            for i in range(1, len(job)):
                res[i] = max(res[i - 1], previous_cumul[i]) + job[i]
        return res

    def cumulate_seq(pfsp_instance: list, schedule: list):
        # Collects, for each prefix of the schedule, the makespan reached after
        # scheduling that prefix.
        cumulates = [0] * len(pfsp_instance)
        cumulated = None
        for j, i in enumerate(schedule):
            cumulated = cumulate(pfsp_instance[i], cumulated)
            cumulates[j] = cumulated[-1]
        return cumulates

    cumulates = cumulate_seq(pfsp_instance, schedule)
    return cumulates


def create_dataset(
    testing,
    pfsp_instance,
    nb_samples,
    init_type,
    output_dir,
    seed,
    normalize_makespans,
    nb_jobs,
    nb_machines,
    time_min,
    time_max,
    autoname_output_dir,
):
    """
    Generates and saves a PFSP dataset (schedules + prefix makespans) under
    output_dir, seeded according to init_type.

    Parameters:
    - testing: When True, skip the termination-flag short-circuit check.
    - pfsp_instance: Path to a saved instance (loading is TODO) or anything
      non-path-like to trigger random generation.
    - nb_samples: Number of samples to generate (ignored for "exhaustive",
      which uses nb_jobs! samples).
    - init_type: One of "exhaustive", "cds", "palmer", "neh", "heuristics",
      "random".
    - output_dir: Output directory for all artifacts.
    - seed: Random seed for reproducibility (None for no seeding).
    - normalize_makespans: Whether to divide all makespans by the sum of all
      processing times (a trivial upper bound on the makespan).
    - nb_jobs, nb_machines, time_min, time_max: Parameters for random instance
      generation.
    - autoname_output_dir: Whether to append a timestamped, auto-named
      subdirectory to output_dir.

    Returns:
    - (schedules, makespans) memmaps of shapes (nb_samples, nb_jobs), or None
      if the termination flag indicates the dataset was already created.
    """
    if autoname_output_dir:
        output_dir = os.path.join(
            output_dir,
            time.strftime("%Y_%m_%d_%H_%M_%S")
            + "_"
            + namer.generate(separator="_", category="sports"),
        )

    # prepare logging
    logger.add(os.path.join(output_dir, "create_dataset.log"))

    if pfsp_instance is not None and os.path.exists(pfsp_instance):
        # TODO: add logic to load pfsp_instance from some file
        pass
    else:
        # create pfsp_instance
        logger.info(f"Creating pfsp_instance with {nb_jobs} jobs and {nb_machines} machines")
        pfsp_instance = generate_random_pfsp_instance(
            nb_jobs, nb_machines, time_min, time_max, seed=seed
        )

    # create the output folder
    os.makedirs(output_dir, exist_ok=True)

    # check if experiment termination flag file exists
    # NOTE(review): with autoname_output_dir the directory is fresh, so this
    # flag can only pre-exist when output_dir is reused verbatim.
    if not testing:
        if os.path.exists(os.path.join(output_dir, ".terminated_create_dataset")):
            print("Dataset creation already done. Exiting...")
            return None

    # log parameters
    logger.info(f"nb_samples: {nb_samples}")
    logger.info(f"init_type: {init_type}")
    logger.info(f"output_dir: {output_dir}")
    logger.info(f"seed: {seed}")

    # compute the makespan upperbound if needed
    if normalize_makespans:
        # The upperbound of the makespan is the sum of all processing times
        # (worst case: all jobs are processed sequentially with no parallelism)
        makespans_upperbound = np.sum(pfsp_instance)
        logger.info(
            f"Normalizing makespans by the sum of processing times with pfsp sum: {makespans_upperbound}"
        )
    else:
        makespans_upperbound = None
        logger.info("Not normalizing makespans")

    if init_type == "exhaustive":
        # Exhaustive enumeration overrides nb_samples with nb_jobs!.
        nb_samples = math.factorial(pfsp_instance.shape[0])
        logger.info(f"Exhaustive init_type: Number of samples: {nb_samples}")

    if seed is not None:
        np.random.seed(seed)

    def perturb_schedule(schedule):
        # Swap two random positions of `schedule` and return the perturbed
        # schedule together with its prefix-makespan row.
        perturbed_schedule = schedule.copy()
        i, j = np.random.choice(perturbed_schedule.shape[0], size=2, replace=False)
        perturbed_schedule[[i, j]] = perturbed_schedule[[j, i]]
        return perturbed_schedule, evaluate_makespan(pfsp_instance, perturbed_schedule)

    # ======
    # create the np memmap files for schedules and makespans
    nb_jobs = pfsp_instance.shape[0]
    schedules = np.lib.format.open_memmap(
        os.path.join(output_dir, "schedules.npy"),
        dtype=np.int32,
        mode="w+",
        shape=(nb_samples, nb_jobs),
    )
    # Each makespans row holds the prefix makespans of the matching schedule;
    # column -1 is the full-schedule makespan.
    makespans = np.lib.format.open_memmap(
        os.path.join(output_dir, "makespans.npy"),
        dtype=np.float32,
        mode="w+",
        shape=(nb_samples, nb_jobs),
    )

    # save the pfsp instance as a numpy file
    np.save(os.path.join(output_dir, "pfsp_instance.npy"), pfsp_instance)

    # create a metadata dictionary and save it as a json file
    metadata_dict = {
        "nb_samples": nb_samples,
        "nb_jobs": nb_jobs,
        "nb_machines": pfsp_instance.shape[1],
        "init_type": init_type,
        "data_path": output_dir,
        "seed": seed,
        "date_time": time.strftime("%Y_%m_%d_%H_%M_%S"),
    }
    with open(os.path.join(output_dir, "metadata.json"), "w") as f:
        json.dump(metadata_dict, f, indent=4)

    if init_type == "exhaustive":
        for i, schedule in tqdm.tqdm(
            enumerate(itertools.permutations(range(nb_jobs))),
            total=math.factorial(nb_jobs),
        ):
            schedules[i] = schedule
            makespans[i] = evaluate_makespan(pfsp_instance, schedule)
    elif init_type == "cds":
        cds_schedule, _ = fit_cds(pfsp_instance)
        schedules[0] = cds_schedule
        makespans[0] = evaluate_makespan(pfsp_instance, cds_schedule)
        for i in tqdm.tqdm(range(1, nb_samples), desc="Generating CDS samples"):
            schedules[i], makespans[i] = perturb_schedule(cds_schedule)
    elif init_type == "palmer":
        palmer_schedule, _ = fit_palmer(pfsp_instance)
        schedules[0] = palmer_schedule
        makespans[0] = evaluate_makespan(pfsp_instance, palmer_schedule)
        for i in tqdm.tqdm(range(1, nb_samples), desc="Generating Palmer samples"):
            schedules[i], makespans[i] = perturb_schedule(palmer_schedule)
    elif init_type == "neh":
        neh_schedule, _ = fit_neh(pfsp_instance)
        schedules[0] = neh_schedule
        makespans[0] = evaluate_makespan(pfsp_instance, neh_schedule)
        for i in tqdm.tqdm(range(1, nb_samples), desc="Generating NEH samples"):
            schedules[i], makespans[i] = perturb_schedule(neh_schedule)
    elif init_type == "heuristics":
        # Split the sample budget in three contiguous segments (CDS, Palmer,
        # NEH); each segment starts with the heuristic schedule itself followed
        # by random perturbations of it. Explicit index arithmetic replaces the
        # old reliance on the leaked tqdm loop variable (which was undefined
        # when a segment had <= 1 sample).
        cds_size = nb_samples // 3
        palmer_size = nb_samples // 3
        neh_size = nb_samples - cds_size - palmer_size

        cds_schedule, _ = fit_cds(pfsp_instance)
        schedules[0] = cds_schedule
        makespans[0] = evaluate_makespan(pfsp_instance, cds_schedule)
        for i in tqdm.tqdm(range(1, cds_size), desc="Generating CDS heuristic samples"):
            schedules[i], makespans[i] = perturb_schedule(cds_schedule)

        palmer_schedule, _ = fit_palmer(pfsp_instance)
        palmer_start = cds_size
        schedules[palmer_start] = palmer_schedule
        makespans[palmer_start] = evaluate_makespan(pfsp_instance, palmer_schedule)
        for i in tqdm.tqdm(
            range(palmer_start + 1, palmer_start + palmer_size),
            desc="Generating Palmer heuristic samples",
        ):
            schedules[i], makespans[i] = perturb_schedule(palmer_schedule)

        neh_schedule, _ = fit_neh(pfsp_instance)
        neh_start = cds_size + palmer_size
        schedules[neh_start] = neh_schedule
        makespans[neh_start] = evaluate_makespan(pfsp_instance, neh_schedule)
        for i in tqdm.tqdm(
            range(neh_start + 1, neh_start + neh_size),
            desc="Generating NEH heuristic samples",
        ):
            schedules[i], makespans[i] = perturb_schedule(neh_schedule)
    elif init_type == "random":
        for i in tqdm.tqdm(range(nb_samples), desc="Generating Random samples"):
            schedule = np.random.permutation(pfsp_instance.shape[0])
            schedules[i] = schedule
            makespans[i] = evaluate_makespan(pfsp_instance, schedule)
    else:
        raise ValueError("Invalid initialization type")

    # normalize the makespans if an upperbound is provided
    if makespans_upperbound is not None:
        makespans /= makespans_upperbound

    # flush the memmaps to disk
    schedules.flush()
    makespans.flush()

    # save min makespan (column -1 holds the full-schedule makespans)
    min_makespan = np.min(makespans[:, -1])
    np.save(os.path.join(output_dir, "min_makespan.npy"), min_makespan)
    logger.info(f"Minimum makespan: {min_makespan}")

    # fit the heuristics and save their makespans (normalized only when an
    # upperbound exists — previously this divided by None unconditionally)
    neh_schedule, neh_makespan = fit_neh(pfsp_instance)
    cds_schedule, cds_makespan = fit_cds(pfsp_instance)
    palmer_schedule, palmer_makespan = fit_palmer(pfsp_instance)
    if makespans_upperbound is not None:
        neh_makespan /= makespans_upperbound
        cds_makespan /= makespans_upperbound
        palmer_makespan /= makespans_upperbound
    np.save(os.path.join(output_dir, "neh_makespan.npy"), neh_makespan)
    np.save(os.path.join(output_dir, "cds_makespan.npy"), cds_makespan)
    np.save(os.path.join(output_dir, "palmer_makespan.npy"), palmer_makespan)

    # add termination flag file
    with open(os.path.join(output_dir, ".terminated_create_dataset"), "w") as f:
        pass

    # return
    return schedules, makespans


# ======
if __name__ == "__main__":
    # parse arguments and call create_dataset with the appropriate parameters
    import argparse

    def _str2bool(value):
        # argparse's type=bool treats any non-empty string (including "False")
        # as True; parse common spellings explicitly instead.
        if isinstance(value, bool):
            return value
        lowered = value.strip().lower()
        if lowered in ("true", "1", "yes", "y"):
            return True
        if lowered in ("false", "0", "no", "n"):
            return False
        raise argparse.ArgumentTypeError(f"Expected a boolean value, got: {value!r}")

    parser = argparse.ArgumentParser(
        description="Create a dataset for the flowshop scheduling problem"
    )
    parser.add_argument("--testing", type=_str2bool, required=True)
    parser.add_argument("--nb_jobs", type=int, required=True, help="Number of jobs")
    parser.add_argument("--nb_machines", type=int, required=True, help="Number of machines")
    parser.add_argument("--time_min", type=int, required=True, help="Minimum processing time")
    parser.add_argument("--time_max", type=int, required=True, help="Maximum processing time")
    parser.add_argument(
        "--nb_samples",
        type=int,
        required=True,
        help="Number of base samples to generate",
    )
    parser.add_argument(
        "--init_type",
        type=str,
        required=True,
        choices=["exhaustive", "cds", "palmer", "neh", "heuristics", "random"],
        help="Initialization type for the base samples",
    )
    parser.add_argument(
        "--output_dir",
        type=str,
        required=True,
        help="Path to the output directory where dataset artifacts will be saved",
    )
    parser.add_argument(
        "--autoname_output_dir",
        type=_str2bool,
        required=True,
        help="Whether to autoname the output directory",
    )
    parser.add_argument(
        "--seed",
        type=int,
        required=True,
        help="Random seed for reproducibility (set to None for no seeding)",
    )
    parser.add_argument(
        "--normalize_makespans",
        type=_str2bool,
        required=True,
        help="Whether to normalize makespans by the sum of processing times",
    )
    parser.add_argument(
        "--pfsp_instance",
        type=str,
        required=True,
        help="Path to the pfsp instance or None if to be generated",
    )
    args = parser.parse_args()

    # create the dataset
    create_dataset(
        testing=args.testing,
        nb_samples=args.nb_samples,
        init_type=args.init_type,
        output_dir=args.output_dir,
        seed=args.seed,
        normalize_makespans=args.normalize_makespans,
        pfsp_instance=args.pfsp_instance,
        nb_jobs=args.nb_jobs,
        nb_machines=args.nb_machines,
        time_min=args.time_min,
        time_max=args.time_max,
        autoname_output_dir=args.autoname_output_dir,
    )
# ======