from __future__ import annotations

import random
from dataclasses import dataclass, field
from enum import Enum
from typing import Any


class GPUState(str, Enum):
    IDLE = "IDLE"
    ALLOCATED = "ALLOCATED"
    OVERLOADED = "OVERLOADED"
    FAILED = "FAILED"
    RECOVERING = "RECOVERING"


@dataclass
class GPUDevice:
    gpu_id: str
    memory_total: int = 80
    state: GPUState = GPUState.IDLE
    jobs_running: dict[str, int] = field(default_factory=dict)
    failure_probability: float = 0.0
    recovery_steps_remaining: int = 0
    false_report: dict[str, Any] | None = None

    @property
    def memory_used(self) -> int:
        return sum(self.jobs_running.values())

    @property
    def memory_free(self) -> int:
        return max(0, self.memory_total - self.memory_used)

    def is_operational(self) -> bool:
        return self.state not in (GPUState.FAILED, GPUState.RECOVERING)
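

# A minimal usage sketch of the memory accounting above (illustrative only;
# the device and job ids are hypothetical, memory_total defaults to 80 as
# declared in the dataclass):
#
#     dev = GPUDevice(gpu_id="GPU-00")
#     dev.jobs_running["job-a"] = 30
#     dev.memory_used       # -> 30
#     dev.memory_free       # -> 50
#     dev.is_operational()  # -> True until the device is FAILED/RECOVERING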


class GPUPool:
    """
    Stateful GPU cluster simulator.

    Phase 1 intentionally keeps this independent from SentinelEnv so we can
    test the cluster mechanics before wiring them into the OpenEnv API.
    """

    def __init__(
        self,
        num_gpus: int = 16,
        memory_per_gpu: int = 80,
        failure_probability: float = 0.0,
        recovery_steps: int = 3,
    ) -> None:
        if num_gpus <= 0:
            raise ValueError("num_gpus must be positive.")
        if memory_per_gpu <= 0:
            raise ValueError("memory_per_gpu must be positive.")
        self._recovery_steps = recovery_steps
        self._gpus: dict[str, GPUDevice] = {
            f"GPU-{idx:02d}": GPUDevice(
                gpu_id=f"GPU-{idx:02d}",
                memory_total=memory_per_gpu,
                failure_probability=failure_probability,
            )
            for idx in range(num_gpus)
        }
    def allocate(
        self,
        job_id: str,
        gpu_id: str,
        memory_required: int,
        allow_overcommit: bool = True,
    ) -> bool:
        """
        Place a job on a GPU.

        Returns False when the GPU is down, the job is already placed, or
        overcommit is disallowed and the GPU lacks free memory.
        """
        if memory_required <= 0:
            raise ValueError("memory_required must be positive.")
        gpu = self._require_gpu(gpu_id)
        if not gpu.is_operational():
            return False
        if self.find_job_gpu(job_id) is not None:
            return False
        if not allow_overcommit and memory_required > gpu.memory_free:
            return False
        gpu.jobs_running[job_id] = memory_required
        self._refresh_state(gpu)
        return True
    def preempt(self, job_id: str) -> bool:
        gpu_id = self.find_job_gpu(job_id)
        if gpu_id is None:
            return False
        gpu = self._gpus[gpu_id]
        gpu.jobs_running.pop(job_id, None)
        self._refresh_state(gpu)
        return True

    def find_job_gpu(self, job_id: str) -> str | None:
        for gpu_id, gpu in self._gpus.items():
            if job_id in gpu.jobs_running:
                return gpu_id
        return None
    def tick(self, rng: random.Random | None = None) -> list[str]:
        """
        Advance hardware state by one step.

        Returns GPU ids that newly failed on this tick.
        """
        rng = rng or random.Random()
        newly_failed: list[str] = []
        for gpu in self._gpus.values():
            # FAILED devices begin recovering on the next tick.
            if gpu.state == GPUState.FAILED:
                gpu.state = GPUState.RECOVERING
                gpu.recovery_steps_remaining = self._recovery_steps
                continue
            # RECOVERING devices count down, then come back empty and IDLE.
            if gpu.state == GPUState.RECOVERING:
                gpu.recovery_steps_remaining -= 1
                if gpu.recovery_steps_remaining <= 0:
                    gpu.jobs_running.clear()
                    gpu.state = GPUState.IDLE
                continue
            # Only busy devices fail stochastically; idle devices never fail.
            if gpu.jobs_running and rng.random() < gpu.failure_probability:
                gpu.state = GPUState.FAILED
                newly_failed.append(gpu.gpu_id)
                continue
            self._refresh_state(gpu)
        return newly_failed
    def inject_false_report(self, gpu_id: str, false_state: dict[str, Any]) -> None:
        gpu = self._require_gpu(gpu_id)
        gpu.false_report = dict(false_state)

    def clear_false_reports(self) -> None:
        for gpu in self._gpus.values():
            gpu.false_report = None
    def utilization_rate(self) -> float:
        operational = [gpu for gpu in self._gpus.values() if gpu.is_operational()]
        total_memory = sum(gpu.memory_total for gpu in operational)
        if total_memory <= 0:
            return 0.0
        used = sum(min(gpu.memory_used, gpu.memory_total) for gpu in operational)
        return round(used / total_memory, 4)
    def cluster_health_score(self) -> float:
        """Coarse health tiers: 0.0 (majority down), 0.5 (degraded), 1.0 (healthy)."""
        total = len(self._gpus)
        failed_like = sum(
            1 for gpu in self._gpus.values()
            if gpu.state in (GPUState.FAILED, GPUState.RECOVERING)
        )
        idle_or_failed = sum(
            1 for gpu in self._gpus.values()
            if gpu.state in (GPUState.IDLE, GPUState.FAILED, GPUState.RECOVERING)
        )
        overloaded = sum(1 for gpu in self._gpus.values() if gpu.state == GPUState.OVERLOADED)
        if failed_like / total > 0.60:
            return 0.0
        if idle_or_failed / total > 0.30 or overloaded / total > 0.25:
            return 0.5
        return 1.0
    def snapshot(self, include_hidden: bool = False) -> list[dict[str, Any]]:
        return [
            self._gpu_snapshot(gpu, include_hidden=include_hidden)
            for gpu in self._gpus.values()
        ]

    def summary(self) -> dict[str, Any]:
        states = {state.value: 0 for state in GPUState}
        for gpu in self._gpus.values():
            states[gpu.state.value] += 1
        return {
            "num_gpus": len(self._gpus),
            "states": states,
            "utilization_rate": self.utilization_rate(),
            "cluster_health_score": self.cluster_health_score(),
            "memory_used": sum(gpu.memory_used for gpu in self._gpus.values()),
            "memory_total": sum(gpu.memory_total for gpu in self._gpus.values()),
        }
    def _require_gpu(self, gpu_id: str) -> GPUDevice:
        if gpu_id not in self._gpus:
            raise KeyError(f"Unknown gpu_id: {gpu_id}")
        return self._gpus[gpu_id]

    def _refresh_state(self, gpu: GPUDevice) -> None:
        if gpu.state in (GPUState.FAILED, GPUState.RECOVERING):
            return
        if not gpu.jobs_running:
            gpu.state = GPUState.IDLE
        elif gpu.memory_used > gpu.memory_total:
            gpu.state = GPUState.OVERLOADED
        else:
            gpu.state = GPUState.ALLOCATED
    def _gpu_snapshot(self, gpu: GPUDevice, include_hidden: bool) -> dict[str, Any]:
        actual = {
            "id": gpu.gpu_id,
            "state": gpu.state.value,
            "memory_total": gpu.memory_total,
            "memory_used": gpu.memory_used,
            "memory_free": gpu.memory_free,
            "jobs": list(gpu.jobs_running.keys()),
        }
        if include_hidden:
            actual["false_report"] = gpu.false_report
            actual["recovery_steps_remaining"] = gpu.recovery_steps_remaining
            return actual
        if gpu.false_report:
            visible = dict(actual)
            visible.update(gpu.false_report)
            visible["report_tampered"] = True
            return visible
        return actual
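

if __name__ == "__main__":
    # Minimal smoke-test sketch, not part of the simulator proper. The pool
    # sizing, job ids, and RNG seed below are illustrative assumptions.
    pool = GPUPool(num_gpus=4, memory_per_gpu=80, failure_probability=0.5)
    assert pool.allocate("job-a", "GPU-00", memory_required=40)
    # Overcommit is allowed by default, so this lands and marks GPU-01 OVERLOADED.
    assert pool.allocate("job-b", "GPU-01", memory_required=100)
    newly_failed = pool.tick(rng=random.Random(0))
    print("newly failed this tick:", newly_failed)
    # Tamper with one report, then compare what agents see vs. ground truth.
    pool.inject_false_report("GPU-02", {"state": "FAILED"})
    print("visible:", pool.snapshot())
    print("hidden:", pool.snapshot(include_hidden=True))
    print("summary:", pool.summary())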