File size: 5,803 Bytes
d755709
6195f6a
 
a0f94c2
 
 
 
 
 
 
6195f6a
 
 
 
 
a0f94c2
6195f6a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c01d448
 
 
 
 
 
 
6195f6a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
import random
from typing import List, Optional

# Optional base class: use openenv.core's Environment when it can be imported,
# otherwise fall back to plain `object`. This lets FactoryEnv be imported
# (e.g. by the grader) even in minimal environments where openenv-core's
# gradio/PIL import chain fails to load.
try:
    from openenv.core import Environment as _EnvBase
except Exception:
    _EnvBase = object  # type: ignore[assignment,misc]

from factory_env.models import FactoryAction, FactoryObservation, FactoryState, Machine, Job
from factory_env.tasks import TASKS


class FactoryEnv(_EnvBase):
    """Smart Factory Scheduling Environment — OpenEnv compliant.

    An agent assigns pending jobs to idle machines and repairs broken ones.
    Each call to :meth:`step` applies one action and advances the clock by one
    tick; every busy machine then works one time unit on its current job.
    Task parameters (machine/job counts, time ranges, failure rate, step
    budget) are read from the ``TASKS`` entry selected by ``task``.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, task: str = "easy", seed: int = 42):
        """Create an environment for ``task``.

        The environment starts with no machines or jobs; call :meth:`reset`
        to populate an episode.

        Args:
            task: Key into ``TASKS`` selecting the scenario configuration.
            seed: Default RNG seed, used by :meth:`reset` when it is not
                given an explicit seed.

        Raises:
            ValueError: If ``task`` is not a key of ``TASKS``.
        """
        super().__init__()
        if task not in TASKS:
            raise ValueError(f"Unknown task '{task}'. Choose from: {list(TASKS.keys())}")
        self.task = task
        self.seed = seed
        self.config = TASKS[task]
        self._rng = random.Random(seed)
        self.machines: List[Machine] = []
        self.jobs: List[Job] = []
        self.completed_jobs: List[Job] = []
        self.late_jobs: int = 0
        self.time: int = 0
        self.max_steps: int = self.config["max_steps"]

    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs) -> FactoryObservation:
        """Start a new episode and return the initial observation.

        Args:
            seed: RNG seed for this episode; when ``None`` the seed given at
                construction is reused, so episodes are reproducible.
            episode_id: Accepted for interface compatibility; not used here.
            **kwargs: May contain ``task`` to switch scenario at reset time.
                A task name that is not in ``TASKS`` is ignored and the
                current task is kept.

        Returns:
            The observation at time 0, with ``reward=None`` and ``done=False``.
        """
        # Allow task to be overridden at reset time (e.g. from inference script)
        task = kwargs.get("task", self.task)
        if task != self.task and task in TASKS:
            self.task = task
            self.config = TASKS[task]
            self.max_steps = self.config["max_steps"]

        use_seed = seed if seed is not None else self.seed
        self._rng = random.Random(use_seed)
        self.time = 0
        self.completed_jobs = []
        self.late_jobs = 0

        cfg = self.config
        self.machines = [
            Machine(id=f"M{i+1}", status="idle", failure_rate=cfg.get("failure_rate", 0.0))
            for i in range(cfg["num_machines"])
        ]
        self.jobs = []
        for i in range(cfg["num_jobs"]):
            # Draw order (time, slack, priority) is part of the seeded
            # sequence; keep it stable so a given seed yields the same jobs.
            proc_time = self._rng.randint(*cfg["job_time_range"])
            deadline = self.time + proc_time + self._rng.randint(*cfg["deadline_slack"])
            priority = self._rng.randint(1, cfg.get("max_priority", 1))
            self.jobs.append(Job(id=f"J{i+1}", remaining_time=proc_time, deadline=deadline, priority=priority))

        return self._make_obs(reward=None, done=False)

    def step(self, action: FactoryAction, timeout_s: Optional[float] = None, **kwargs) -> FactoryObservation:
        """Apply ``action``, advance the clock one tick, and return the result.

        Action handling (by ``action.action_type``):
            * ``"assign_job"``: +0.1 if the job exists, is not already running
              on a machine, and the target machine is idle; otherwise -0.1
              and nothing changes.
            * ``"repair"``: +0.05 if the target machine is broken (it becomes
              idle); otherwise -0.05.
            * anything else: no action effect, time still advances.

        After the action, each busy machine works one unit on its job. A job
        finishing on or before its deadline earns ``1.0 + 0.2 * priority``;
        a late one earns 0.3 and is counted in ``late_jobs``. Machines that
        are still busy may then break with probability ``failure_rate``,
        releasing their job back to the pending pool. Finally, while jobs
        remain, each idle machine costs 0.05 and each overdue pending job
        costs 0.1.

        Args:
            action: The action to apply.
            timeout_s: Accepted for interface compatibility; not used here.
            **kwargs: Ignored.

        Returns:
            The observation after the tick. ``done`` is true once the step
            budget is exhausted or no pending jobs remain.
        """
        reward = 0.0

        if action.action_type == "assign_job":
            job = self._find_job(action.job_id)
            machine = self._find_machine(action.machine_id)
            assignable = (
                job is not None
                and machine is not None
                and machine.status == "idle"
                # A job may only run on one machine at a time. Without this
                # check a second machine could take the same job; once the
                # first machine completed it, the second would stay "busy"
                # forever with a job id that no longer resolves.
                and not job.assigned_machine
            )
            if assignable:
                job.assigned_machine = machine.id
                machine.status = "busy"
                machine.current_job = job.id
                reward += 0.1
            else:
                reward -= 0.1
        elif action.action_type == "repair":
            machine = self._find_machine(action.machine_id)
            if machine and machine.status == "broken":
                machine.status = "idle"
                reward += 0.05
            else:
                reward -= 0.05

        self.time += 1

        for machine in self.machines:
            if machine.status == "busy":
                job = self._find_job(machine.current_job)
                if job:
                    job.remaining_time -= 1
                    if job.remaining_time <= 0:
                        on_time = self.time <= job.deadline
                        reward += (1.0 + 0.2 * job.priority) if on_time else 0.3
                        if not on_time:
                            self.late_jobs += 1
                        self.jobs.remove(job)
                        self.completed_jobs.append(job)
                        machine.status = "idle"
                        machine.current_job = None

            # Breakdowns are rolled only for machines still busy after this
            # tick's progress, so a job that just completed cannot be lost.
            if machine.status == "busy" and machine.failure_rate > 0:
                if self._rng.random() < machine.failure_rate:
                    machine.status = "broken"
                    stalled = self._find_job(machine.current_job)
                    if stalled:
                        # Release the job so it can be assigned elsewhere;
                        # its remaining_time is left as is.
                        stalled.assigned_machine = None
                    machine.current_job = None

        # Idle capacity is only penalised while there is still work pending.
        if self.jobs:
            reward -= sum(1 for m in self.machines if m.status == "idle") * 0.05
        # Every pending job past its deadline costs a little each tick.
        for job in self.jobs:
            if self.time > job.deadline:
                reward -= 0.1

        done = self.time >= self.max_steps or len(self.jobs) == 0
        return self._make_obs(reward=reward, done=done)

    @property
    def state(self) -> FactoryState:
        """Snapshot of the environment as a ``FactoryState``.

        The lists are shallow copies: the containers are new, but the
        ``Machine`` and ``Job`` objects inside them are the live ones.
        """
        return FactoryState(
            machines=list(self.machines),
            pending_jobs=list(self.jobs),
            completed_jobs=list(self.completed_jobs),
            time=self.time,
            task=self.task,
            late_jobs=self.late_jobs,
        )

    def _make_obs(self, reward, done: bool) -> FactoryObservation:
        """Build the observation for the current state.

        Args:
            reward: Reward to report (``None`` for the initial observation).
            done: Whether the episode has ended.

        Returns:
            A ``FactoryObservation`` holding shallow copies of the machine,
            pending-job and completed-job lists.
        """
        return FactoryObservation(
            machines=list(self.machines),
            pending_jobs=list(self.jobs),
            completed_jobs=list(self.completed_jobs),
            time=self.time,
            max_steps=self.max_steps,
            task=self.task,
            reward=reward,
            done=done,
        )

    def _find_job(self, job_id: Optional[str]) -> Optional[Job]:
        """Return the pending job with ``job_id``, or ``None`` if there is none.

        Only ``self.jobs`` (pending jobs) is searched; completed jobs are not.
        A falsy ``job_id`` returns ``None`` without searching.
        """
        return next((j for j in self.jobs if j.id == job_id), None) if job_id else None

    def _find_machine(self, machine_id: Optional[str]) -> Optional[Machine]:
        """Return the machine with ``machine_id``, or ``None`` if there is none.

        A falsy ``machine_id`` returns ``None`` without searching.
        """
        return next((m for m in self.machines if m.id == machine_id), None) if machine_id else None