""" Graders for Smart Factory Scheduling tasks. Each public function: - Accepts an optional state/env argument to score a finished episode. - When called with no argument, runs a deterministic heuristic episode and returns the score. - Always returns a float strictly in (0.0, 1.0). This module is fully self-contained (stdlib only) so it works in any Python 3.8+ environment regardless of what packages are installed. The simulation implements the exact same RL dynamics as FactoryEnv. """ from __future__ import annotations import random # ── Minimal RL simulation (identical dynamics to FactoryEnv) ───────────────── TASKS = { "easy": { "num_machines": 2, "num_jobs": 3, "failure_rate": 0.0, "max_priority": 1, "job_time_range": (2, 5), "deadline_slack": (4, 8), "max_steps": 20, }, "medium": { "num_machines": 4, "num_jobs": 7, "failure_rate": 0.08, "max_priority": 2, "job_time_range": (3, 7), "deadline_slack": (2, 5), "max_steps": 30, }, "hard": { "num_machines": 6, "num_jobs": 12, "failure_rate": 0.15, "max_priority": 3, "job_time_range": (3, 8), "deadline_slack": (1, 4), "max_steps": 40, }, } class _Machine: __slots__ = ("id", "status", "current_job", "failure_rate") def __init__(self, id, failure_rate=0.0): self.id = id self.status = "idle" self.current_job = None self.failure_rate = failure_rate class _Job: __slots__ = ("id", "remaining_time", "deadline", "priority", "assigned_machine") def __init__(self, id, remaining_time, deadline, priority=1): self.id = id self.remaining_time = remaining_time self.deadline = deadline self.priority = priority self.assigned_machine = None class _Env: """Pure-Python FactoryEnv with identical RL dynamics.""" def __init__(self, task="easy", seed=42): cfg = TASKS[task] rng = random.Random(seed) self.machines = [ _Machine(f"M{i+1}", cfg["failure_rate"]) for i in range(cfg["num_machines"]) ] self.jobs = [] for i in range(cfg["num_jobs"]): pt = rng.randint(*cfg["job_time_range"]) dl = pt + rng.randint(*cfg["deadline_slack"]) pr = rng.randint(1, cfg["max_priority"]) self.jobs.append(_Job(f"J{i+1}", pt, dl, pr)) self.completed_jobs = [] self.late_jobs = 0 self.time = 0 self.max_steps = cfg["max_steps"] self._rng = rng def _find_job(self, jid): return next((j for j in self.jobs if j.id == jid), None) if jid else None def _find_machine(self, mid): return next((m for m in self.machines if m.id == mid), None) if mid else None def step(self, action_type, job_id=None, machine_id=None): if action_type == "assign_job": job = self._find_job(job_id) machine = self._find_machine(machine_id) if job and machine and machine.status == "idle": job.assigned_machine = machine.id machine.status = "busy" machine.current_job = job.id elif action_type == "repair": machine = self._find_machine(machine_id) if machine and machine.status == "broken": machine.status = "idle" self.time += 1 for machine in self.machines: if machine.status == "busy": job = self._find_job(machine.current_job) if job: job.remaining_time -= 1 if job.remaining_time <= 0: if self.time > job.deadline: self.late_jobs += 1 self.jobs.remove(job) self.completed_jobs.append(job) machine.status = "idle" machine.current_job = None if machine.status == "busy" and machine.failure_rate > 0: if self._rng.random() < machine.failure_rate: machine.status = "broken" stalled = self._find_job(machine.current_job) if stalled: stalled.assigned_machine = None machine.current_job = None return self.time >= self.max_steps or len(self.jobs) == 0 # ── Score formula ───────────────────────────────────────────────────────────── def _compute(completed, on_time, total, late): if total == 0: return 0.001 score = ( 0.5 * (completed / total) + 0.3 * (on_time / max(completed, 1)) + 0.2 * max(0.0, 1.0 - late / max(completed, 1)) ) return round(max(0.001, min(0.999, score)), 4) def _score_env(env): t = env.time completed = len(env.completed_jobs) total = completed + len(env.jobs) on_time = sum(1 for j in env.completed_jobs if j.deadline >= t) return _compute(completed, on_time, total, env.late_jobs) def _score_obj(obj): """Score from a finished FactoryEnv object or state dict.""" if isinstance(obj, dict): done_list = obj.get("completed_jobs", []) or [] pend_list = obj.get("pending_jobs", []) or [] late = int(obj.get("late_jobs", 0) or 0) t = int(obj.get("time", 0) or 0) completed = len(done_list) total = completed + len(pend_list) on_time = sum( 1 for j in done_list if (j.get("deadline", 0) if isinstance(j, dict) else getattr(j, "deadline", 0)) >= t ) else: done_list = list(getattr(obj, "completed_jobs", []) or []) pend_list = list(getattr(obj, "jobs", getattr(obj, "pending_jobs", [])) or []) late = int(getattr(obj, "late_jobs", 0) or 0) t = int(getattr(obj, "time", 0) or 0) completed = len(done_list) total = completed + len(pend_list) on_time = sum(1 for j in done_list if getattr(j, "deadline", 0) >= t) return _compute(completed, on_time, total, late) # ── Heuristic agent ─────────────────────────────────────────────────────────── def _heuristic(machines, jobs): """Earliest-deadline-first heuristic.""" for m in machines: if m.status == "broken": return "repair", None, m.id for j in sorted(jobs, key=lambda x: (x.deadline, -x.priority)): for m in machines: if m.status == "idle": return "assign_job", j.id, m.id return "wait", None, None # ── Episode runner ──────────────────────────────────────────────────────────── def _run_episode(task, seed=42): """Run a full heuristic episode and return the graded score.""" # Try to use the real FactoryEnv from the package first. try: from factory_env.env import FactoryEnv from factory_env.models import FactoryAction env = FactoryEnv(task=task, seed=seed) obs = env.reset() for _ in range(obs.max_steps): if obs.done: break # Heuristic action selection broken = [m for m in obs.machines if m.status == "broken"] if broken: action = FactoryAction(action_type="repair", machine_id=broken[0].id) else: action = None for j in sorted(obs.pending_jobs, key=lambda x: (x.deadline, -x.priority)): for m in obs.machines: if m.status == "idle": action = FactoryAction(action_type="assign_job", job_id=j.id, machine_id=m.id) break if action: break if action is None: action = FactoryAction(action_type="wait") obs = env.step(action) return _score_obj(env) except Exception: pass # Fallback: identical RL dynamics implemented in pure Python above. env = _Env(task=task, seed=seed) for _ in range(env.max_steps): action_type, job_id, machine_id = _heuristic(env.machines, env.jobs) done = env.step(action_type, job_id, machine_id) if done: break return _score_env(env) # ── Public graders ──────────────────────────────────────────────────────────── def score_easy(state_or_env=None) -> float: """Grade an easy-task episode (2 machines, 3 jobs, no failures). Returns float in (0.0, 1.0).""" if state_or_env is not None: return _score_obj(state_or_env) return _run_episode("easy") def score_medium(state_or_env=None) -> float: """Grade a medium-task episode (4 machines, 7 jobs, 8% failure rate). Returns float in (0.0, 1.0).""" if state_or_env is not None: return _score_obj(state_or_env) return _run_episode("medium") def score_hard(state_or_env=None) -> float: """Grade a hard-task episode (6 machines, 12 jobs, 15% failure rate). Returns float in (0.0, 1.0).""" if state_or_env is not None: return _score_obj(state_or_env) return _run_episode("hard")