|
|
import uuid
from copy import deepcopy
from typing import Optional

import simpy
from openenv_core.env_server import Environment

from ..models import JobObservation, JobT, JSSPAction, JSSPObservation, MachineObservation
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Magnitude of the negative reward that JSSPEnvironment.step() returns when the
# submitted action fails validation (the reward is -PENALTY).
PENALTY = 100
|
|
|
|
|
|
|
|
class JSSPEnvironment(Environment):
    """Job-shop scheduling environment whose clock is a SimPy simulation.

    Each job is an ordered sequence of ``(machine_id, duration)`` operations.
    An action names the jobs whose next operation should start now; the
    environment then advances simulated time until at least one job can be
    scheduled again, all jobs are complete, or nothing is left running.
    """

    def __init__(self, jobs: list[JobT]):
        """Store the job definitions and build the initial episode state.

        Args:
            jobs: Job definitions. They are kept untouched in ``init_jobs`` and
                deep-copied on every ``reset()``.
        """
        super().__init__()
        self.init_jobs = jobs
        self.reset()

    def reset(self) -> JSSPObservation:
        """Reset the environment to its initial state.

        Returns:
            The observation of the freshly reset episode (reward 0.0).
        """
        self.episode_id = str(uuid.uuid4())
        self.step_count = 0
        self.jobs = deepcopy(self.init_jobs)

        # Machines are numbered 0..max_id. ``default=-1`` yields zero machines
        # for an empty problem instead of raising ValueError from max().
        highest_machine_id = max(
            (machine for job in self.jobs for machine, _ in job),
            default=-1,
        )
        self.nb_machines = highest_machine_id + 1

        # Fresh simulation clock, starting at time 0.
        self.env = simpy.Environment()

        # Index of the next operation to run, per job.
        self.job_progress = [0] * len(self.jobs)

        # Per machine: completion time of the running operation and the job
        # that owns it (both None while the machine is idle).
        self.machine_busy_until: list[Optional[int]] = [None] * self.nb_machines
        self.machine_current_job: list[Optional[int]] = [None] * self.nb_machines

        # A job without operations has nothing to run, so it is complete from
        # the start; otherwise ``done`` could never become true.
        self.completed_jobs = sum(1 for job in self.jobs if len(job) == 0)

        return self.state

    def _get_jobs(self) -> list[JobObservation]:
        """Get all jobs with their remaining operations and busy status.

        Returns:
            One ``JobObservation`` per job, in job-id order. ``operations``
            holds the operations from the job's current progress onwards, and
            ``busy_until`` is the completion time of the machine currently
            running the job, or None when no machine is running it.
        """
        # Build the job -> completion-time lookup once instead of rescanning
        # every machine for every job.
        busy_until_by_job = {
            running_job: busy_until
            for running_job, busy_until in zip(self.machine_current_job, self.machine_busy_until)
            if running_job is not None
        }

        return [
            JobObservation(
                job_id=job_id,
                operations=operations[self.job_progress[job_id]:],
                busy_until=busy_until_by_job.get(job_id),
            )
            for job_id, operations in enumerate(self.jobs)
        ]

    def _at_decision_step(self) -> bool:
        """Check if we're at a decision step (at least one job can be scheduled).

        Availability is decided by ``JSSPObservation.available_jobs()``, which
        is defined in the models module.
        """
        return len(self.state.available_jobs()) > 0

    def _validate_action(self, action: JSSPAction) -> bool:
        """Validate that an action is legal.

        An action is rejected when any listed job id is out of range, the job
        has no operation left, the machine needed by its next operation is
        still busy, or two listed jobs need the same machine.

        Args:
            action: Action whose ``job_ids`` are the jobs to start now.

        Returns:
            True if every listed job can be started right now, else False.
        """
        scheduled_machines = set()

        for job_id in action.job_ids:
            # Unknown job.
            if job_id < 0 or job_id >= len(self.jobs):
                return False

            # Job already finished all of its operations.
            if self.job_progress[job_id] >= len(self.jobs[job_id]):
                return False

            machine_id, _ = self.jobs[job_id][self.job_progress[job_id]]

            # Target machine is still processing an operation.
            busy_until = self.machine_busy_until[machine_id]
            if busy_until is not None and busy_until > self.env.now:
                return False

            # Another job in this same action already claimed the machine.
            if machine_id in scheduled_machines:
                return False

            scheduled_machines.add(machine_id)

        return True

    def _schedule_jobs(self, job_ids: list[int]):
        """Schedule the given jobs on their respective machines.

        Expects ``job_ids`` to have passed ``_validate_action``; no checks are
        repeated here.
        """
        for job_id in job_ids:
            machine_id, duration = self.jobs[job_id][self.job_progress[job_id]]

            # Occupy the machine until the operation's completion time.
            self.machine_busy_until[machine_id] = int(self.env.now) + duration
            self.machine_current_job[machine_id] = job_id

    def _release_finished_machines(self) -> None:
        """Free machines whose operation has ended and advance their jobs."""
        now = self.env.now
        for machine_id, busy_until in enumerate(self.machine_busy_until):
            if busy_until is None or busy_until > now:
                continue

            job_id = self.machine_current_job[machine_id]
            if job_id is not None:
                self.job_progress[job_id] += 1

                # The job's last operation just finished.
                if self.job_progress[job_id] >= len(self.jobs[job_id]):
                    self.completed_jobs += 1

            self.machine_busy_until[machine_id] = None
            self.machine_current_job[machine_id] = None

    def _advance_to_decision_step(self):
        """Advance simulation time until the next decision step.

        Stops as soon as a job can be scheduled, all jobs are complete, or no
        machine has an operation ending in the future.
        """
        while True:
            # Sweep before looking at the clock so that operations ending
            # exactly now (e.g. zero-duration ones) are completed instead of
            # stalling; for positive durations this sweep changes nothing.
            self._release_finished_machines()

            if self._at_decision_step():
                break

            if self.completed_jobs >= len(self.jobs):
                break

            future_times = [t for t in self.machine_busy_until if t is not None and t > self.env.now]

            # Nothing is running and nothing can be scheduled: stop rather
            # than loop forever.
            if not future_times:
                break

            # Jump straight to the earliest operation completion.
            self.env.run(until=min(future_times))

    def step(self, action: JSSPAction) -> JSSPObservation:
        """Process an action and advance simulation until next decision step.

        Args:
            action: Jobs whose next operation should start now.

        Returns:
            Observation with reward = -(elapsed time) for valid actions,
            or reward = -PENALTY for invalid actions (without updating state).
        """
        start_time = self.env.now

        if not self._validate_action(action):
            # Invalid action: report the penalty and leave the state untouched.
            obs = self.state
            obs.reward = -PENALTY
            return obs

        self._schedule_jobs(action.job_ids)
        self._advance_to_decision_step()

        time_elapsed = self.env.now - start_time
        reward = -time_elapsed

        # step_count mirrors the current simulation time, not the call count.
        self.step_count = int(self.env.now)

        obs = self.state
        obs.reward = reward
        return obs

    @property
    def state(self) -> JSSPObservation:
        """Get the current state of the environment, without the reward.

        A new observation is built on every access; its ``reward`` is always
        0.0 and ``done`` is true once every job has been completed.
        """
        machines = [
            MachineObservation(
                machine_id=i,
                busy_until=self.machine_busy_until[i],
                current_job_id=self.machine_current_job[i],
            )
            for i in range(self.nb_machines)
        ]

        jobs = self._get_jobs()

        return JSSPObservation(
            done=self.completed_jobs >= len(self.jobs),
            episode_id=self.episode_id,
            step_count=self.step_count,
            machines=machines,
            jobs=jobs,
            reward=0.0,
        )
|
|
|