# CRMP-DRL-Scheduler / crmp_env.py
# Uploaded by kunhsiang with huggingface_hub (commit acc387c, verified).
"""
CRMP Environment: Circular Rubber Manufacturing Problem
Two-Line Flowshop with Circular Material Constraints
Data from: Yin et al. (2021) Sustainability, Table 3 & Table 4
Format: processing_time, type1_granulates, type2_strips
Line A: yields materials after each operation
Line B: demands materials before each operation
"""
import numpy as np
from typing import Optional
# Instance dimensions: Line A has 8 jobs on 6 machines, Line B has 6 jobs on 3 machines.
NUM_JOBS_A = 8
NUM_MACHINES_A = 6
NUM_JOBS_B = 6
NUM_MACHINES_B = 3

# =================================================================
# Table 3: Line A - (processing_time, yield_granulates, yield_strips)
# Rows: J1-J8, Columns: M1-M6
# =================================================================
_TABLE3 = [
    # J1: M1          M2            M3           M4              M5           M6
    [(115, 63, 15), (21, 20, 13), (10, 15, 5), (173, 147, 37), (12, 11, 6), (52, 39, 20)],
    # J2:
    [(77, 74, 35), (5, 4, 1), (14, 17, 7), (113, 122, 66), (7, 9, 2), (111, 33, 68)],
    # J3:
    [(107, 96, 5), (26, 33, 5), (14, 23, 3), (132, 57, 59), (3, 1, 1), (36, 28, 3)],
    # J4:
    [(93, 140, 54), (23, 32, 13), (11, 14, 2), (169, 141, 76), (14, 22, 4), (107, 91, 64)],
    # J5:
    [(91, 74, 49), (15, 6, 4), (10, 7, 4), (92, 29, 29), (8, 6, 2), (53, 37, 8)],
    # J6:
    [(62, 12, 28), (10, 11, 6), (14, 2, 5), (145, 140, 27), (4, 2, 2), (68, 67, 43)],
    # J7:
    [(77, 28, 38), (17, 19, 5), (11, 5, 5), (165, 107, 8), (5, 6, 2), (50, 68, 15)],
    # J8:
    [(72, 46, 40), (25, 22, 3), (14, 12, 8), (114, 150, 63), (11, 4, 6), (66, 107, 11)],
]

# Split the triples into three independent (jobs x machines) float64 matrices.
# The reshape doubles as a check that the table matches the declared dimensions.
LINE_A_PROC, LINE_A_YIELD_GRAN, LINE_A_YIELD_STRIP = (
    plane.copy()
    for plane in np.array(_TABLE3, dtype=np.float64)
    .reshape(NUM_JOBS_A, NUM_MACHINES_A, 3)
    .transpose(2, 0, 1)
)

# =================================================================
# Table 4: Line B - (processing_time, demand_granulates, demand_strips)
# Each operation has its own material demand!
# =================================================================
_TABLE4 = [
    # J1B: M1B          M2B           M3B
    [(51, 134, 42), (21, 76, 18), (84, 98, 103)],
    # J2B:
    [(54, 101, 82), (43, 40, 40), (75, 114, 44)],
    # J3B:
    [(37, 88, 45), (40, 114, 21), (110, 116, 96)],
    # J4B:
    [(71, 75, 37), (19, 71, 24), (85, 288, 55)],
    # J5B:
    [(32, 127, 30), (31, 72, 25), (96, 196, 50)],
    # J6B:
    [(78, 218, 105), (26, 65, 41), (112, 189, 111)],
]

# Same split for Line B: processing time and the two material demands.
LINE_B_PROC, LINE_B_DEMAND_GRAN, LINE_B_DEMAND_STRIP = (
    plane.copy()
    for plane in np.array(_TABLE4, dtype=np.float64)
    .reshape(NUM_JOBS_B, NUM_MACHINES_B, 3)
    .transpose(2, 0, 1)
)
def verify_data():
    """Print the aggregate material balance and report whether it is feasible.

    Sums every Line A yield and every Line B demand for both materials,
    prints one summary line per material, and returns a truthy value only
    when the total yield covers the total demand for granulates and strips.
    This is a whole-instance check; it says nothing about timing.
    """
    yielded_gran = LINE_A_YIELD_GRAN.sum()
    yielded_strip = LINE_A_YIELD_STRIP.sum()
    needed_gran = LINE_B_DEMAND_GRAN.sum()
    needed_strip = LINE_B_DEMAND_STRIP.sum()

    surplus_gran = yielded_gran - needed_gran
    surplus_strip = yielded_strip - needed_strip
    print(f"Granulates: yield={yielded_gran:.0f}, demand={needed_gran:.0f}, surplus={surplus_gran:.0f}")
    print(f"Strips: yield={yielded_strip:.0f}, demand={needed_strip:.0f}, surplus={surplus_strip:.0f}")

    granulates_ok = yielded_gran >= needed_gran
    strips_ok = yielded_strip >= needed_strip
    return granulates_ok and strips_ok
def simulate_crmp(seq_a, seq_b, proc_a=None, proc_b=None,
                  yield_gran=None, yield_strip=None,
                  demand_gran=None, demand_strip=None):
    """
    Permutation flowshop simulation for CRMP.

    All machines of a line process jobs in the SAME order (permutation
    constraint). Every Line A operation releases its granulate/strip yield
    at its completion time; every Line B operation needs its full demand
    of both materials in the buffer before it can start and removes that
    amount from the buffer when it starts.

    Args:
        seq_a: job indices of Line A in processing order.
        seq_b: job indices of Line B in processing order.
        proc_a, proc_b: (jobs x machines) processing-time matrices.
            Default to the paper instance (LINE_A_PROC / LINE_B_PROC).
        yield_gran, yield_strip: (jobs x machines) Line A yields.
        demand_gran, demand_strip: (jobs x machines) Line B demands.

    Returns:
        dict with "makespan" (max of both line end times), "a_end" and
        "b_end" (completion of the last sequenced job on the last machine
        of each line; 0.0 for an empty sequence).

    The matrix sizes are taken from the inputs (sequence length and number
    of columns of the processing-time matrix), so instances of any size can
    be simulated and a partial sequence reports the end time of the jobs
    that were actually sequenced.

    If the yields can never cover a demand, the operation starts once the
    last yield has been released and the buffer is allowed to go negative.

    NOTE(review): the yield buffer only moves forward in time and Line B
    operations are visited job by job. After one operation has waited until
    time t, operations visited later see everything yielded up to t even if
    their own earliest start is before t. Confirm this relaxation is
    intended before relying on the schedule being material-feasible.
    """
    proc_a = np.asarray(LINE_A_PROC if proc_a is None else proc_a)
    proc_b = np.asarray(LINE_B_PROC if proc_b is None else proc_b)
    yield_gran = np.asarray(LINE_A_YIELD_GRAN if yield_gran is None else yield_gran)
    yield_strip = np.asarray(LINE_A_YIELD_STRIP if yield_strip is None else yield_strip)
    demand_gran = np.asarray(LINE_B_DEMAND_GRAN if demand_gran is None else demand_gran)
    demand_strip = np.asarray(LINE_B_DEMAND_STRIP if demand_strip is None else demand_strip)

    machines_a = proc_a.shape[1]
    machines_b = proc_b.shape[1]

    # ---- Line A: standard permutation flowshop ----
    # a_comp[pos, m] = completion time of the pos-th sequenced job on machine m.
    a_comp = np.zeros((len(seq_a), machines_a))
    yield_time = {}
    for pos, j in enumerate(seq_a):
        for m in range(machines_a):
            machine_free = a_comp[pos - 1, m] if pos > 0 else 0.0
            job_ready = a_comp[pos, m - 1] if m > 0 else 0.0
            a_comp[pos, m] = max(machine_free, job_ready) + proc_a[j, m]
            yield_time[(j, m)] = a_comp[pos, m]

    # (time, granulates, strips) released by Line A, in chronological order.
    yield_events = sorted(
        (t, yield_gran[j, m], yield_strip[j, m])
        for (j, m), t in yield_time.items()
    )

    # ---- Line B: permutation flowshop with material constraints ----
    b_comp = np.zeros((len(seq_b), machines_b))
    buf_g = 0.0
    buf_s = 0.0
    yield_idx = 0  # first yield event not yet added to the buffer

    def get_buffer_at(time_t):
        """Add every not-yet-collected yield released up to time_t."""
        nonlocal buf_g, buf_s, yield_idx
        while yield_idx < len(yield_events) and yield_events[yield_idx][0] <= time_t:
            _, g, s = yield_events[yield_idx]
            buf_g += g
            buf_s += s
            yield_idx += 1

    for pos, j in enumerate(seq_b):
        for m in range(machines_b):
            machine_free = b_comp[pos - 1, m] if pos > 0 else 0.0
            job_ready = b_comp[pos, m - 1] if m > 0 else 0.0
            earliest = max(machine_free, job_ready)
            dg = demand_gran[j, m]
            ds = demand_strip[j, m]

            start = earliest
            get_buffer_at(start)
            # Not enough material yet: advance to the next release time (all
            # events sharing that time are collected together) until the
            # demand is covered or no yields remain. One forward pass.
            while not (buf_g >= dg and buf_s >= ds) and yield_idx < len(yield_events):
                start = yield_events[yield_idx][0]
                get_buffer_at(start)
            if not (buf_g >= dg and buf_s >= ds) and yield_events:
                # Demand can never be met: start after the final release.
                start = max(earliest, yield_events[-1][0])

            buf_g -= dg
            buf_s -= ds
            b_comp[pos, m] = start + proc_b[j, m]

    a_end = a_comp[-1, -1] if a_comp.size else 0.0
    b_end = b_comp[-1, -1] if b_comp.size else 0.0
    return {"makespan": max(a_end, b_end),
            "a_end": a_end,
            "b_end": b_end}
def evaluate_sequence(seq_a, seq_b, proc_a=None, proc_b=None):
    """Return only the makespan of a (Line A, Line B) sequence pair.

    Runs simulate_crmp with the given sequences and optional processing-time
    matrices; yields and demands are left at simulate_crmp's defaults.
    """
    outcome = simulate_crmp(seq_a, seq_b, proc_a=proc_a, proc_b=proc_b)
    return outcome["makespan"]
def simulate_nonperm(order_a, order_b, proc_a=None, proc_b=None,
                     yield_gran=None, yield_strip=None,
                     demand_gran=None, demand_strip=None):
    """
    Non-permutation flowshop simulation for CRMP.

    Args:
        order_a: per-machine job orders for Line A, either a mapping
            {machine: [job_order]} (or any container indexable by machine
            number), or a single flat sequence of job indices that is used
            on every machine. Flat sequences may be lists, tuples or NumPy
            arrays, and may contain NumPy integers.
        order_b: same for Line B.
        proc_a, proc_b: (jobs x machines) processing-time matrices.
            Default to the paper instance.
        yield_gran, yield_strip: (jobs x machines) Line A yields.
        demand_gran, demand_strip: (jobs x machines) Line B demands.

    Returns:
        dict with "makespan", "a_end" and "b_end" (latest completion on the
        last machine of each line; 0.0 when nothing was completed there).

    Raises:
        ValueError: a Line A job is ordered on a machine before it has been
            processed on the previous machine.
        RuntimeError: Line B operations remain but none can be scheduled.

    Key difference from permutation: each machine can process jobs in a
    different order. Line A is scheduled machine by machine. Line B is
    scheduled one operation at a time: among the next operation of every
    machine whose job has left the previous machine, the one with the
    smallest flowshop-ready time is dispatched (ties: lower machine index).
    Material availability only delays the chosen operation; it is not used
    to choose between candidates.

    Line and machine counts come from the shapes of the processing-time
    matrices, so instances of any size can be simulated. Only Line A
    operations that were actually scheduled release material.

    NOTE(review): the yield buffer only moves forward in time. After an
    operation has waited for material until time t, a later-dispatched
    operation whose ready time is before t still sees everything yielded
    up to t. Confirm this relaxation is intended.
    """
    proc_a = np.asarray(LINE_A_PROC if proc_a is None else proc_a)
    proc_b = np.asarray(LINE_B_PROC if proc_b is None else proc_b)
    yield_gran = np.asarray(LINE_A_YIELD_GRAN if yield_gran is None else yield_gran)
    yield_strip = np.asarray(LINE_A_YIELD_STRIP if yield_strip is None else yield_strip)
    demand_gran = np.asarray(LINE_B_DEMAND_GRAN if demand_gran is None else demand_gran)
    demand_strip = np.asarray(LINE_B_DEMAND_STRIP if demand_strip is None else demand_strip)

    n_jobs_a, n_mach_a = proc_a.shape
    n_jobs_b, n_mach_b = proc_b.shape

    def per_machine(order, n_machines):
        """Expand a flat job sequence into one identical order per machine."""
        if isinstance(order, dict):
            return order
        jobs = list(order)
        if all(isinstance(job, (int, np.integer)) for job in jobs):
            return {m: list(jobs) for m in range(n_machines)}
        return order  # already indexable by machine (e.g. list of lists)

    order_a = per_machine(order_a, n_mach_a)
    order_b = per_machine(order_b, n_mach_b)

    # ---- Line A: machine by machine (machine m-1 is complete before m) ----
    a_end = np.full((n_jobs_a, n_mach_a), -1.0)  # -1 marks "not scheduled"
    for m in range(n_mach_a):
        machine_free = 0.0
        for j in order_a[m]:
            if m == 0:
                job_ready = 0.0
            else:
                job_ready = a_end[j, m - 1]
                if job_ready < 0:
                    raise ValueError(f"Job {j} not completed on machine {m-1} before scheduling on {m}")
            a_end[j, m] = max(job_ready, machine_free) + proc_a[j, m]
            machine_free = a_end[j, m]

    # Chronological (time, granulates, strips) releases. Unscheduled
    # operations are skipped so their -1 marker is not treated as a time.
    yield_events = sorted(
        (a_end[j, m], yield_gran[j, m], yield_strip[j, m])
        for j in range(n_jobs_a)
        for m in range(n_mach_a)
        if a_end[j, m] >= 0
    )

    # ---- Line B: one operation at a time, with material constraints ----
    b_end = np.full((n_jobs_b, n_mach_b), -1.0)
    b_machine_end = np.zeros(n_mach_b)
    next_pos = [0] * n_mach_b  # next position to schedule on each machine
    buf_g = 0.0
    buf_s = 0.0
    yield_idx = 0  # first yield event not yet added to the buffer

    def flush_to(t):
        """Add every not-yet-collected yield released up to time t."""
        nonlocal buf_g, buf_s, yield_idx
        while yield_idx < len(yield_events) and yield_events[yield_idx][0] <= t:
            _, g, s = yield_events[yield_idx]
            buf_g += g
            buf_s += s
            yield_idx += 1

    remaining = sum(len(order_b[m]) for m in range(n_mach_b))
    while remaining:
        # Next operation of each machine, if its job has left machine m-1.
        candidates = []
        for m in range(n_mach_b):
            pos = next_pos[m]
            if pos >= len(order_b[m]):
                continue
            j = order_b[m][pos]
            if m == 0:
                job_ready = 0.0
            else:
                if b_end[j, m - 1] < 0:
                    continue  # not yet done on previous machine
                job_ready = b_end[j, m - 1]
            candidates.append((max(job_ready, b_machine_end[m]), m, j))
        if not candidates:
            raise RuntimeError("No schedulable operations but not all done")

        # Smallest ready time first; machine index breaks ties.
        earliest, m, j = min(candidates)
        dg = demand_gran[j, m]
        ds = demand_strip[j, m]

        start = earliest
        flush_to(start)
        # Wait release time by release time until the demand is covered
        # or no yields remain (single forward pass over the events).
        while not (buf_g >= dg and buf_s >= ds) and yield_idx < len(yield_events):
            start = yield_events[yield_idx][0]
            flush_to(start)
        if not (buf_g >= dg and buf_s >= ds) and yield_events:
            # Demand can never be met: start after the final release.
            start = max(earliest, yield_events[-1][0])

        buf_g -= dg
        buf_s -= ds
        b_end[j, m] = start + proc_b[j, m]
        b_machine_end[m] = b_end[j, m]
        next_pos[m] += 1
        remaining -= 1

    def line_end(end_times):
        """Latest completion on the last machine, never below 0."""
        if not end_times.size:
            return 0.0
        return max(end_times[:, -1].max(), 0.0)

    a_last = line_end(a_end)
    b_last = line_end(b_end)
    return {"makespan": max(a_last, b_last),
            "a_end": a_last,
            "b_end": b_last}
class CRMPEnv:
    """
    CRMP Environment for DRL - Sequence Building.

    The agent builds TWO job sequences, one decision per step:
      Phase 1: Line A sequence (pick one unscheduled job out of 8 per step)
      Phase 2: Line B sequence (pick one unscheduled job out of 6 per step)
    An episode therefore needs 14 valid decisions. An action that names a
    job which is already scheduled is ignored; the step counter still
    advances. Once both sequences are complete, simulate_crmp evaluates the
    makespan and the episode ends.

    Action space:
      Phase 1 (Line A): action_a in 0..7, action_b is not read
      Phase 2 (Line B): action_b in 0..5, action_a is not read
    The masks expose one extra slot per head (index NUM_JOBS_A / NUM_JOBS_B)
    that is the only allowed entry while that head's line is inactive.

    This is a PERMUTATION flowshop formulation (same as GA baseline).
    With stochastic=True the processing times are perturbed on every reset;
    yields and demands always use the base matrices.
    """

    def __init__(self, stochastic=False, noise_std=0.1,
                 base_proc_a=None, base_proc_b=None,
                 base_yield_g=None, base_yield_s=None,
                 base_demand_g=None, base_demand_s=None):
        # Any matrix left as None falls back to the paper instance.
        if base_proc_a is None:
            base_proc_a = LINE_A_PROC
        if base_proc_b is None:
            base_proc_b = LINE_B_PROC
        if base_yield_g is None:
            base_yield_g = LINE_A_YIELD_GRAN
        if base_yield_s is None:
            base_yield_s = LINE_A_YIELD_STRIP
        if base_demand_g is None:
            base_demand_g = LINE_B_DEMAND_GRAN
        if base_demand_s is None:
            base_demand_s = LINE_B_DEMAND_STRIP

        self.stochastic = stochastic
        self.noise_std = noise_std
        self.base_proc_a = base_proc_a
        self.base_proc_b = base_proc_b
        self.base_yield_g = base_yield_g
        self.base_yield_s = base_yield_s
        self.base_demand_g = base_demand_g
        self.base_demand_s = base_demand_s
        self.rng = np.random.default_rng()
        self.reset()

    @property
    def obs_dim(self):
        """Length of the observation vector."""
        return self._get_obs().shape[0]

    def reset(self, seed=None):
        """Start a new episode and return its first observation.

        A non-None seed replaces the random generator before the processing
        times are drawn.
        """
        if seed is not None:
            self.rng = np.random.default_rng(seed)

        # Line A is sampled before Line B; keep this order for reproducibility.
        self.proc_a = self._sample(self.base_proc_a)
        self.proc_b = self._sample(self.base_proc_b)

        # Nothing scheduled yet, every job still selectable.
        self.seq_a, self.seq_b = [], []
        self.avail_a = set(range(NUM_JOBS_A))
        self.avail_b = set(range(NUM_JOBS_B))

        # 'A' = building the Line A sequence, 'B' = building the Line B sequence.
        self.phase = 'A'
        self.done = False
        self.makespan = 0.0
        self.step_count = 0
        return self._get_obs()

    def _sample(self, base):
        """Return this episode's copy of a processing-time matrix.

        Deterministic mode returns a plain copy. Stochastic mode multiplies
        each entry by 1 + N(0, noise_std) clipped to [0.8, 1.2] and floors
        the result at 1.0.
        """
        if self.stochastic:
            factor = np.clip(1.0 + self.rng.normal(0, self.noise_std, base.shape), 0.8, 1.2)
            return np.maximum(base * factor, 1.0)
        return base.copy()

    def get_mask_a(self):
        """Mask for Line A action head. Jobs are selectable only during phase A."""
        mask = np.zeros(NUM_JOBS_A + 1)
        if self.phase != 'A':
            mask[NUM_JOBS_A] = 1.0  # idle/no-op during phase B
            return mask
        for job in self.avail_a:
            mask[job] = 1.0
        return mask

    def get_mask_b(self):
        """Mask for Line B action head. Jobs are selectable only during phase B."""
        mask = np.zeros(NUM_JOBS_B + 1)
        if self.phase != 'B':
            mask[NUM_JOBS_B] = 1.0  # idle/no-op during phase A
            return mask
        for job in self.avail_b:
            mask[job] = 1.0
        return mask

    def step(self, action_a, action_b):
        """Apply one decision and return (obs, reward, done, info).

        Only the action of the active phase is read. The reward is 0.0 until
        the episode ends; the final step returns (1500 - makespan) / 200.
        Calling step after the episode has ended changes nothing and returns
        a zero reward.
        """
        if self.done:
            return self._get_obs(), 0.0, True, {"makespan": self.makespan}

        self.step_count += 1

        if self.phase == 'A':
            # Line A decision; the switch to phase B takes effect next step.
            if action_a in self.avail_a:
                self.avail_a.remove(action_a)
                self.seq_a.append(action_a)
                if len(self.seq_a) == NUM_JOBS_A:
                    self.phase = 'B'
        elif self.phase == 'B':
            # Line B decision
            if action_b in self.avail_b:
                self.avail_b.remove(action_b)
                self.seq_b.append(action_b)
                if len(self.seq_b) == NUM_JOBS_B:
                    # Episode complete - evaluate
                    self.done = True
                    outcome = simulate_crmp(self.seq_a, self.seq_b,
                                            self.proc_a, self.proc_b,
                                            self.base_yield_g, self.base_yield_s,
                                            self.base_demand_g, self.base_demand_s)
                    self.makespan = outcome["makespan"]

        # Reward: higher is better. Target ~1307, normalized so good solutions
        # get positive reward: 1307 -> +0.965, 1500 -> 0, 1800 -> -1.5
        reward = (1500 - self.makespan) / 200.0 if self.done else 0.0

        info = {"makespan": self.makespan if self.done else None,
                "phase": self.phase, "steps": self.step_count}
        return self._get_obs(), reward, self.done, info

    def _get_obs(self):
        """Build the flat float64 observation vector for the current state."""
        jobs_a = range(NUM_JOBS_A)
        jobs_b = range(NUM_JOBS_B)

        # Phase indicator (one-hot: A=1,0 B=0,1) followed by per-line progress.
        obs = [
            1.0 if self.phase == 'A' else 0.0,
            1.0 if self.phase == 'B' else 0.0,
            len(self.seq_a) / NUM_JOBS_A,
            len(self.seq_b) / NUM_JOBS_B,
        ]

        # Job availability: 8 dims for Line A, then 6 dims for Line B.
        obs.extend(1.0 if j in self.avail_a else 0.0 for j in jobs_a)
        obs.extend(1.0 if j in self.avail_b else 0.0 for j in jobs_b)

        # Total processing time of every job (normalized), Line A then Line B.
        obs.extend(self.proc_a[j].sum() / 1000.0 for j in jobs_a)
        obs.extend(self.proc_b[j].sum() / 1000.0 for j in jobs_b)

        # Line B total material demand per job: granulates, then strips.
        obs.extend(self.base_demand_g[j].sum() / 500.0 for j in jobs_b)
        obs.extend(self.base_demand_s[j].sum() / 500.0 for j in jobs_b)

        # Summed processing time of the Line A jobs scheduled so far.
        if self.seq_a:
            scheduled_time = sum(self.proc_a[j].sum() for j in self.seq_a)
            obs.append(scheduled_time / 2000.0)
        else:
            obs.append(0.0)

        # Total processing time of the most recently scheduled job per line.
        obs.append(self.proc_a[self.seq_a[-1]].sum() / 1000.0 if self.seq_a else 0.0)
        obs.append(self.proc_b[self.seq_b[-1]].sum() / 1000.0 if self.seq_b else 0.0)

        return np.array(obs, dtype=np.float64)
class CRMPEnvNonPerm:
    """
    CRMP Environment for Non-Permutation DRL.

    Non-permutation: each machine can have a DIFFERENT job order.
    The agent fills in one machine's order at a time, one job per step.
      Phase A: for each machine m=0..5, order the 8 jobs (48 decisions)
      Phase B: for each machine m=0..2, order the 6 jobs (18 decisions)
    Total: 66 valid decisions per episode. An action that names a job
    already placed on the current machine is ignored; the step counter
    still advances. When the last Line B machine is complete,
    simulate_nonperm evaluates the makespan and the episode ends.

    With stochastic=True the processing times are perturbed on every reset.
    """

    def __init__(self, stochastic=False, noise_std=0.1):
        self.stochastic = stochastic
        self.noise_std = noise_std
        self.rng = np.random.default_rng()
        self.reset()

    @property
    def obs_dim(self):
        """Length of the observation vector."""
        return self._get_obs().shape[0]

    def reset(self, seed=None):
        """Start a new episode and return its first observation.

        A non-None seed replaces the random generator before the processing
        times are drawn.
        """
        if seed is not None:
            self.rng = np.random.default_rng(seed)
        self.proc_a = self._sample(LINE_A_PROC)
        self.proc_b = self._sample(LINE_B_PROC)
        # Per-machine job orders, filled in as the agent decides.
        self.order_a = {m: [] for m in range(NUM_MACHINES_A)}
        self.order_b = {m: [] for m in range(NUM_MACHINES_B)}
        # Machine currently being ordered, and the jobs not yet placed on it.
        self.current_line = 'A'  # 'A' or 'B'
        self.current_machine = 0
        self.avail_jobs = set(range(NUM_JOBS_A))
        self.done = False
        self.makespan = 0.0
        self.step_count = 0
        return self._get_obs()

    def _sample(self, base):
        """Return this episode's copy of a processing-time matrix.

        Deterministic mode returns a plain copy. Stochastic mode multiplies
        each entry by 1 + N(0, noise_std) clipped to [0.8, 1.2] and floors
        the result at 1.0.
        """
        if not self.stochastic:
            return base.copy()
        noise = 1.0 + self.rng.normal(0, self.noise_std, base.shape)
        return np.maximum(base * np.clip(noise, 0.8, 1.2), 1.0)

    def get_mask_a(self):
        """Mask for the Line A head; only the extra idle slot is allowed on Line B."""
        mask = np.zeros(NUM_JOBS_A + 1)
        if self.current_line == 'A':
            for j in self.avail_jobs:
                mask[j] = 1.0
        else:
            mask[NUM_JOBS_A] = 1.0
        return mask

    def get_mask_b(self):
        """Mask for the Line B head; only the extra idle slot is allowed on Line A."""
        mask = np.zeros(NUM_JOBS_B + 1)
        if self.current_line == 'B':
            for j in self.avail_jobs:
                mask[j] = 1.0
        else:
            mask[NUM_JOBS_B] = 1.0
        return mask

    def step(self, action_a, action_b):
        """Apply one decision and return (obs, reward, done, info).

        Only the action of the active line is read. The reward is 0.0 until
        the episode ends; the final step returns (1500 - makespan) / 200.
        Calling step after the episode has ended changes nothing and returns
        a zero reward.
        """
        if self.done:
            return self._get_obs(), 0.0, True, {"makespan": self.makespan}
        self.step_count += 1

        if self.current_line == 'A':
            j = action_a
            if j in self.avail_jobs:
                self.order_a[self.current_machine].append(j)
                self.avail_jobs.remove(j)
            if not self.avail_jobs:
                # Move to next machine or switch to Line B
                self.current_machine += 1
                if self.current_machine >= NUM_MACHINES_A:
                    self.current_line = 'B'
                    self.current_machine = 0
                    self.avail_jobs = set(range(NUM_JOBS_B))
                else:
                    self.avail_jobs = set(range(NUM_JOBS_A))
        elif self.current_line == 'B':
            j = action_b
            if j in self.avail_jobs:
                self.order_b[self.current_machine].append(j)
                self.avail_jobs.remove(j)
            if not self.avail_jobs:
                self.current_machine += 1
                if self.current_machine >= NUM_MACHINES_B:
                    # All orders fixed - evaluate the schedule.
                    self.done = True
                    result = simulate_nonperm(self.order_a, self.order_b,
                                              self.proc_a, self.proc_b)
                    self.makespan = result["makespan"]
                else:
                    self.avail_jobs = set(range(NUM_JOBS_B))

        reward = (1500 - self.makespan) / 200.0 if self.done else 0.0
        info = {"makespan": self.makespan if self.done else None,
                "steps": self.step_count}
        return self._get_obs(), reward, self.done, info

    def _get_obs(self):
        """Build the flat float64 observation vector for the current state."""
        on_a = self.current_line == 'A'
        on_b = self.current_line == 'B'
        obs = []

        # Line indicator
        obs.append(1.0 if on_a else 0.0)
        obs.append(1.0 if on_b else 0.0)

        # Current machine (normalized)
        obs.append(self.current_machine / max(NUM_MACHINES_A, NUM_MACHINES_B))

        # Progress = decisions recorded so far / decisions per episode.
        # Counting the recorded orders keeps the value at exactly 1.0 in the
        # terminal state, where current_machine has already moved past the
        # last machine and avail_jobs is empty.
        total_steps = NUM_JOBS_A * NUM_MACHINES_A + NUM_JOBS_B * NUM_MACHINES_B
        done_steps = (sum(len(jobs) for jobs in self.order_a.values())
                      + sum(len(jobs) for jobs in self.order_b.values()))
        obs.append(done_steps / total_steps)

        # Available jobs: the inactive line's slots are all zero.
        obs.extend(1.0 if on_a and j in self.avail_jobs else 0.0
                   for j in range(NUM_JOBS_A))
        obs.extend(1.0 if on_b and j in self.avail_jobs else 0.0
                   for j in range(NUM_JOBS_B))

        # Total processing time per job
        obs.extend(self.proc_a[j].sum() / 1000.0 for j in range(NUM_JOBS_A))
        obs.extend(self.proc_b[j].sum() / 1000.0 for j in range(NUM_JOBS_B))

        # Processing times on the machine currently being ordered
        # (zeros for the inactive line and in the terminal state).
        if on_a and self.current_machine < NUM_MACHINES_A:
            obs.extend(self.proc_a[j, self.current_machine] / 200.0
                       for j in range(NUM_JOBS_A))
        else:
            obs.extend(0.0 for _ in range(NUM_JOBS_A))
        if on_b and self.current_machine < NUM_MACHINES_B:
            obs.extend(self.proc_b[j, self.current_machine] / 200.0
                       for j in range(NUM_JOBS_B))
        else:
            obs.extend(0.0 for _ in range(NUM_JOBS_B))

        return np.array(obs, dtype=np.float64)
if __name__ == "__main__":
    # Self-check script: material balance, reference sequences evaluated with
    # both simulators, a pass through CRMPEnv, and a random non-permutation search.
    import time

    print("CRMP Environment - Formal Paper Data (Yin et al. 2021)")
    print("=" * 60)
    ok = verify_data()
    print(f"Material balance feasible: {ok}")
    print()
    print("Paper benchmarks (Real dataset, Table 5):")
    print(" FCFS: 1457 min")
    print(" Campbell-Dudek: 1340 best, 1361 avg")
    print(" GA: 1307 best, 1315 avg")
    print()

    # FCFS
    ms = evaluate_sequence(list(range(NUM_JOBS_A)), list(range(NUM_JOBS_B)))
    print(f"Our FCFS (permutation): {ms:.0f} min")

    # Paper's GA best sequence
    ga_a = [5, 0, 1, 6, 7, 3, 4, 2]
    ga_b = [0, 2, 5, 4, 3, 1]
    ms_ga = evaluate_sequence(ga_a, ga_b)
    print(f"Paper GA best (permutation): {ms_ga:.0f} min")

    # Non-permutation simulator fed the same order on every machine.
    ms_np = simulate_nonperm(ga_a, ga_b)["makespan"]
    print(f"Non-perm with GA seq (same order all machines): {ms_np:.0f} min")

    # Test CRMPEnv
    print("\nTesting CRMPEnv (sequence builder)...")
    env = CRMPEnv(stochastic=False)
    obs = env.reset()
    print(f" Obs dim: {len(obs)}")
    # Feed GA sequence
    for j in ga_a:
        obs, reward, done, info = env.step(j, NUM_JOBS_B)  # idle on B during phase A
    for j in ga_b:
        obs, reward, done, info = env.step(NUM_JOBS_A, j)  # idle on A during phase B
    print(f" GA sequence makespan via env: {info['makespan']:.0f}")
    print(f" Steps: {info['steps']}, Done: {done}")

    # Quick non-perm search
    print("\nNon-permutation random search (50k)...")
    best_np = float('inf')
    best_orders = None
    rng = np.random.default_rng(42)
    t0 = time.perf_counter()
    for i in range(50000):
        oa = {m: rng.permutation(NUM_JOBS_A).tolist() for m in range(NUM_MACHINES_A)}
        ob = {m: rng.permutation(NUM_JOBS_B).tolist() for m in range(NUM_MACHINES_B)}
        try:
            result = simulate_nonperm(oa, ob)
        except (ValueError, RuntimeError):
            # Only the simulator's own "cannot schedule" errors are skipped;
            # anything else (including Ctrl-C) should surface.
            continue
        improved = result["makespan"] < best_np
        if improved:
            best_np = result["makespan"]
            best_orders = (oa, ob)
        # Periodic progress line, plus one line for each new best below the
        # GA reference (not one line per iteration once that is reached).
        if i % 5000 == 0 or (improved and best_np < 1307):
            print(f" [{i+1:6d}] Best non-perm: {best_np:.0f}")
    elapsed = time.perf_counter() - t0
    print(f" Non-perm random best: {best_np:.0f} ({elapsed:.1f}s)")
    if best_np < 1307:
        print(f" *** NON-PERM BEATS PERMUTATION GA by {1307-best_np:.0f} min ***")