File size: 5,294 Bytes
d755709
6195f6a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
import random
from typing import List, Optional

from openenv.core import Environment

from factory_env.models import FactoryAction, FactoryObservation, FactoryState, Machine, Job
from factory_env.tasks import TASKS


class FactoryEnv(Environment[FactoryAction, FactoryObservation, FactoryState]):
    """Smart Factory Scheduling Environment — OpenEnv compliant."""

    SUPPORTS_CONCURRENT_SESSIONS = True

    def __init__(self, task: str = "easy", seed: int = 42):
        super().__init__()
        if task not in TASKS:
            raise ValueError(f"Unknown task '{task}'. Choose from: {list(TASKS.keys())}")
        self.task = task
        self.seed = seed
        self.config = TASKS[task]
        self._rng = random.Random(seed)
        self.machines: List[Machine] = []
        self.jobs: List[Job] = []
        self.completed_jobs: List[Job] = []
        self.late_jobs: int = 0
        self.time: int = 0
        self.max_steps: int = self.config["max_steps"]

    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None, **kwargs) -> FactoryObservation:
        use_seed = seed if seed is not None else self.seed
        self._rng = random.Random(use_seed)
        self.time = 0
        self.completed_jobs = []
        self.late_jobs = 0

        cfg = self.config
        self.machines = [
            Machine(id=f"M{i+1}", status="idle", failure_rate=cfg.get("failure_rate", 0.0))
            for i in range(cfg["num_machines"])
        ]
        self.jobs = []
        for i in range(cfg["num_jobs"]):
            proc_time = self._rng.randint(*cfg["job_time_range"])
            deadline = self.time + proc_time + self._rng.randint(*cfg["deadline_slack"])
            priority = self._rng.randint(1, cfg.get("max_priority", 1))
            self.jobs.append(Job(id=f"J{i+1}", remaining_time=proc_time, deadline=deadline, priority=priority))

        return self._make_obs(reward=None, done=False)

    def step(self, action: FactoryAction, timeout_s: Optional[float] = None, **kwargs) -> FactoryObservation:
        reward = 0.0

        if action.action_type == "assign_job":
            job = self._find_job(action.job_id)
            machine = self._find_machine(action.machine_id)
            if job is None or machine is None or machine.status != "idle":
                reward -= 0.1
            else:
                job.assigned_machine = machine.id
                machine.status = "busy"
                machine.current_job = job.id
                reward += 0.1
        elif action.action_type == "repair":
            machine = self._find_machine(action.machine_id)
            if machine and machine.status == "broken":
                machine.status = "idle"
                reward += 0.05
            else:
                reward -= 0.05

        self.time += 1

        for machine in self.machines:
            if machine.status == "busy":
                job = self._find_job(machine.current_job)
                if job:
                    job.remaining_time -= 1
                    if job.remaining_time <= 0:
                        on_time = self.time <= job.deadline
                        reward += (1.0 + 0.2 * job.priority) if on_time else 0.3
                        if not on_time:
                            self.late_jobs += 1
                        self.jobs.remove(job)
                        self.completed_jobs.append(job)
                        machine.status = "idle"
                        machine.current_job = None

            if machine.status == "busy" and machine.failure_rate > 0:
                if self._rng.random() < machine.failure_rate:
                    machine.status = "broken"
                    stalled = self._find_job(machine.current_job)
                    if stalled:
                        stalled.assigned_machine = None
                    machine.current_job = None

        if self.jobs:
            reward -= sum(1 for m in self.machines if m.status == "idle") * 0.05
        for job in self.jobs:
            if self.time > job.deadline:
                reward -= 0.1

        done = self.time >= self.max_steps or len(self.jobs) == 0
        return self._make_obs(reward=reward, done=done)

    @property
    def state(self) -> FactoryState:
        return FactoryState(
            machines=list(self.machines),
            pending_jobs=list(self.jobs),
            completed_jobs=list(self.completed_jobs),
            time=self.time,
            task=self.task,
            late_jobs=self.late_jobs,
            step_count=self.time,
        )

    def _make_obs(self, reward, done: bool) -> FactoryObservation:
        return FactoryObservation(
            machines=list(self.machines),
            pending_jobs=list(self.jobs),
            completed_jobs=list(self.completed_jobs),
            time=self.time,
            max_steps=self.max_steps,
            task=self.task,
            reward=reward,
            done=done,
        )

    def _find_job(self, job_id: Optional[str]) -> Optional[Job]:
        return next((j for j in self.jobs if j.id == job_id), None) if job_id else None

    def _find_machine(self, machine_id: Optional[str]) -> Optional[Machine]:
        return next((m for m in self.machines if m.id == machine_id), None) if machine_id else None