Spaces:
Sleeping
Sleeping
| """ | |
| Cloud GPU+CPU Resource Management Environment Implementation. | |
| A real-world OpenEnv environment simulating cloud GPU **and** CPU resource | |
| management. Three progressively harder tasks: | |
| 1. gpu_cpu_allocation – combined GPU+CPU allocation with cost optimisation | |
| 2. thermal_management – threshold-based thermal monitoring & cooling | |
| 3. heuristic_fragmentation – GPU allocation via heuristic fragmentation strategies | |
| MCP Tools: | |
| - get_cluster_state() → current metrics for all nodes | |
| - get_task_info() → task description & objectives | |
| - take_action(decisions: str) → apply action, advance timestep, return reward | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import math | |
| import random | |
| from typing import Any, Optional | |
| from uuid import uuid4 | |
| try: | |
| from openenv.core.env_server.mcp_environment import MCPEnvironment | |
| from openenv.core.env_server.types import Action, Observation, State | |
| except ImportError: | |
| from openenv.core.env_server.mcp_environment import MCPEnvironment | |
| from openenv.core.env_server.types import Action, Observation, State | |
| from fastmcp import FastMCP | |
| # --------------------------------------------------------------------------- | |
| # GPU node templates (models commonly found in cloud) | |
| # --------------------------------------------------------------------------- | |
| GPU_NODE_TEMPLATES = [ | |
| { | |
| "name": "T4-node", | |
| "gpu_type": "T4", | |
| "gpu_count": 1, | |
| "gpu_vram_gb": 16.0, | |
| "cpu_capacity": 4.0, | |
| "memory_capacity_gb": 16.0, | |
| "cost_per_step": 8.0, | |
| "tdp_watts": 70.0, | |
| "max_temp_c": 83.0, | |
| }, | |
| { | |
| "name": "A100-node", | |
| "gpu_type": "A100", | |
| "gpu_count": 1, | |
| "gpu_vram_gb": 40.0, | |
| "cpu_capacity": 8.0, | |
| "memory_capacity_gb": 64.0, | |
| "cost_per_step": 30.0, | |
| "tdp_watts": 250.0, | |
| "max_temp_c": 85.0, | |
| }, | |
| { | |
| "name": "H100-node", | |
| "gpu_type": "H100", | |
| "gpu_count": 1, | |
| "gpu_vram_gb": 80.0, | |
| "cpu_capacity": 16.0, | |
| "memory_capacity_gb": 128.0, | |
| "cost_per_step": 55.0, | |
| "tdp_watts": 350.0, | |
| "max_temp_c": 83.0, | |
| }, | |
| { | |
| "name": "V100-node", | |
| "gpu_type": "V100", | |
| "gpu_count": 1, | |
| "gpu_vram_gb": 32.0, | |
| "cpu_capacity": 8.0, | |
| "memory_capacity_gb": 32.0, | |
| "cost_per_step": 18.0, | |
| "tdp_watts": 300.0, | |
| "max_temp_c": 84.0, | |
| }, | |
| { | |
| "name": "L4-node", | |
| "gpu_type": "L4", | |
| "gpu_count": 1, | |
| "gpu_vram_gb": 24.0, | |
| "cpu_capacity": 4.0, | |
| "memory_capacity_gb": 32.0, | |
| "cost_per_step": 12.0, | |
| "tdp_watts": 72.0, | |
| "max_temp_c": 82.0, | |
| }, | |
| ] | |
| # --------------------------------------------------------------------------- | |
| # Task definitions | |
| # --------------------------------------------------------------------------- | |
| TASKS = { | |
| "gpu_cpu_allocation": { | |
| "description": ( | |
| "Manage a cluster of GPU+CPU nodes to maximise compute throughput " | |
| "while minimising cost. Each node has GPU (VRAM, compute) and CPU " | |
| "resources. Incoming workloads vary in demand. Choose how to " | |
| "allocate resources across nodes and GPU types." | |
| ), | |
| "difficulty": "easy", | |
| "num_nodes": 3, | |
| "max_steps": 8, | |
| "target_gpu_util_pct": 0.70, | |
| "target_cpu_util_pct": 0.70, | |
| "budget_per_step": 120.0, | |
| "valid_actions": ["allocate_high", "allocate_low", "maintain", "migrate"], | |
| }, | |
| "thermal_management": { | |
| "description": ( | |
| "Monitor GPU and ambient temperatures across the cluster. " | |
| "If any GPU exceeds its thermal threshold, redistribute load " | |
| "to cooler nodes or increase cooling. Balance thermal safety " | |
| "with performance and energy cost." | |
| ), | |
| "difficulty": "medium", | |
| "num_nodes": 4, | |
| "max_steps": 10, | |
| "target_gpu_util_pct": 0.70, | |
| "target_cpu_util_pct": 0.70, | |
| "budget_per_step": None, | |
| "temp_safe_low": 55.0, | |
| "temp_safe_high": 75.0, | |
| "valid_actions": ["increase_cooling", "decrease_cooling", "migrate_load", "maintain"], | |
| }, | |
| "heuristic_fragmentation": { | |
| "description": ( | |
| "Allocate GPU resources in a fragmented cluster using heuristic " | |
| "strategies. Nodes have 8 GPU slots each; workloads need " | |
| "contiguous blocks of varying sizes (1,2,4,8). Choose " | |
| "placement and defragmentation strategies to minimise waste." | |
| ), | |
| "difficulty": "hard", | |
| "num_nodes": 5, | |
| "max_steps": 12, | |
| "target_gpu_util_pct": 0.80, | |
| "target_cpu_util_pct": 0.70, | |
| "budget_per_step": 200.0, | |
| "slots_per_node": 8, | |
| "valid_actions": ["best_fit", "first_fit", "compact", "split_workload"], | |
| }, | |
| } | |
| # --------------------------------------------------------------------------- | |
| # Trace generators | |
| # --------------------------------------------------------------------------- | |
| def _generate_workload_trace(num_steps: int, base: float, rng: random.Random) -> list[float]: | |
| """Generate a realistic workload utilisation trace in [0.08, 0.95].""" | |
| trend = rng.uniform(-0.03, 0.03) | |
| trace: list[float] = [] | |
| for t in range(num_steps): | |
| val = base + trend * t + rng.gauss(0, 0.06) | |
| if rng.random() < 0.12: | |
| val += rng.uniform(0.15, 0.35) | |
| trace.append(max(0.08, min(0.95, val))) | |
| return trace | |
| def _generate_ambient_trace(num_steps: int, rng: random.Random) -> list[float]: | |
| """Simulate ambient temperature with day/night cycle + heat spikes.""" | |
| trace: list[float] = [] | |
| for t in range(num_steps): | |
| # Day-night sinusoidal: 22-34 °C | |
| day_night = 28.0 + 6.0 * math.sin(2.0 * math.pi * t / max(num_steps, 1)) | |
| noise = rng.gauss(0, 1.5) | |
| spike = rng.uniform(5.0, 12.0) if rng.random() < 0.10 else 0.0 | |
| trace.append(round(max(18.0, min(45.0, day_night + noise + spike)), 1)) | |
| return trace | |
| def _generate_pending_workloads(num_steps: int, rng: random.Random) -> list[list[int]]: | |
| """Generate pending workload queues (GPU slot requirements) per step.""" | |
| workloads_per_step: list[list[int]] = [] | |
| for _ in range(num_steps): | |
| count = rng.randint(1, 4) | |
| sizes = [rng.choice([1, 1, 2, 2, 4, 8]) for _ in range(count)] | |
| workloads_per_step.append(sizes) | |
| return workloads_per_step | |
| # --------------------------------------------------------------------------- | |
| # Main environment | |
| # --------------------------------------------------------------------------- | |
| class CloudResourceEnvironment(MCPEnvironment): | |
| """Cloud GPU+CPU resource management environment with MCP tools.""" | |
| def __init__(self): | |
| mcp = FastMCP("cloud_resource_env") | |
| # ---- MCP Tools ---- | |
| def get_cluster_state() -> dict: | |
| """ | |
| Get the current state of all GPU+CPU nodes in the cluster. | |
| Returns GPU/CPU usage, capacity, temperature, fragmentation, and cost. | |
| """ | |
| return self._build_cluster_state() | |
| def get_task_info() -> dict: | |
| """ | |
| Get information about the current task including objectives, | |
| constraints, and valid actions. | |
| """ | |
| return self._build_task_info() | |
| def take_action(decisions: str) -> dict: | |
| """ | |
| Apply resource management decisions and advance by one timestep. | |
| Args: | |
| decisions: JSON string mapping node_id to action. | |
| Task 1 (gpu_cpu_allocation): | |
| {"node_0": "allocate_high", "node_1": "maintain"} | |
| Valid: allocate_high, allocate_low, maintain, migrate | |
| Task 2 (thermal_management): | |
| {"node_0": "increase_cooling", "node_1": "migrate_load"} | |
| Valid: increase_cooling, decrease_cooling, migrate_load, maintain | |
| Task 3 (heuristic_fragmentation): | |
| {"node_0": "best_fit", "node_1": "compact"} | |
| Valid: best_fit, first_fit, compact, split_workload | |
| Returns: | |
| Dictionary with reward, done, feedback, updated cluster_state, and score. | |
| """ | |
| return self._process_action(decisions) | |
| super().__init__(mcp) | |
| self._state = State(episode_id=str(uuid4()), step_count=0) | |
| self._task_name: str = "gpu_cpu_allocation" | |
| self._task_cfg: dict = TASKS[self._task_name] | |
| self._nodes: list[dict] = [] | |
| # Workload traces | |
| self._gpu_workloads: dict[str, list[float]] = {} | |
| self._cpu_workloads: dict[str, list[float]] = {} | |
| self._mem_workloads: dict[str, list[float]] = {} | |
| # Thermal traces | |
| self._ambient_trace: list[float] = [] | |
| self._cooling_levels: dict[str, int] = {} # 0-3 | |
| # Fragmentation state | |
| self._slot_maps: dict[str, list[int]] = {} # 0=free, workload_id otherwise | |
| self._pending_workloads: list[list[int]] = [] | |
| self._next_workload_id: int = 1 | |
| self._timestep: int = 0 | |
| self._step_rewards: list[float] = [] | |
| self._rng = random.Random(42) | |
| self._episode_done = False | |
| # ------------------------------------------------------------------ | |
| # OpenEnv lifecycle | |
| # ------------------------------------------------------------------ | |
| def reset( | |
| self, | |
| seed: Optional[int] = None, | |
| episode_id: Optional[str] = None, | |
| **kwargs: Any, | |
| ) -> Observation: | |
| task = kwargs.get("task", "gpu_cpu_allocation") | |
| if task not in TASKS: | |
| task = "gpu_cpu_allocation" | |
| self._task_name = task | |
| self._task_cfg = TASKS[task] | |
| self._rng = random.Random(seed if seed is not None else 42) | |
| self._timestep = 0 | |
| self._step_rewards = [] | |
| self._episode_done = False | |
| self._next_workload_id = 1 | |
| self._state = State( | |
| episode_id=episode_id or str(uuid4()), | |
| step_count=0, | |
| ) | |
| num = self._task_cfg["num_nodes"] | |
| max_steps = self._task_cfg["max_steps"] | |
| # --- Initialise nodes --- | |
| self._nodes = [] | |
| for i in range(num): | |
| tmpl = GPU_NODE_TEMPLATES[i % len(GPU_NODE_TEMPLATES)] | |
| self._nodes.append({ | |
| "node_id": f"node_{i}", | |
| "gpu_type": tmpl["gpu_type"], | |
| "gpu_count": tmpl["gpu_count"], | |
| "gpu_vram_gb": tmpl["gpu_vram_gb"], | |
| "cpu_capacity": tmpl["cpu_capacity"], | |
| "memory_capacity_gb": tmpl["memory_capacity_gb"], | |
| "cost_per_step": tmpl["cost_per_step"], | |
| "tdp_watts": tmpl["tdp_watts"], | |
| "max_temp_c": tmpl["max_temp_c"], | |
| "name": tmpl["name"], | |
| }) | |
| # --- Workload traces --- | |
| self._gpu_workloads = {} | |
| self._cpu_workloads = {} | |
| self._mem_workloads = {} | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| self._gpu_workloads[nid] = _generate_workload_trace( | |
| max_steps + 1, self._rng.uniform(0.35, 0.80), self._rng | |
| ) | |
| self._cpu_workloads[nid] = _generate_workload_trace( | |
| max_steps + 1, self._rng.uniform(0.30, 0.75), self._rng | |
| ) | |
| self._mem_workloads[nid] = _generate_workload_trace( | |
| max_steps + 1, self._rng.uniform(0.25, 0.70), self._rng | |
| ) | |
| # --- Thermal traces --- | |
| self._ambient_trace = _generate_ambient_trace(max_steps + 1, self._rng) | |
| self._cooling_levels = {n["node_id"]: 1 for n in self._nodes} | |
| # --- Fragmentation state --- | |
| slots = self._task_cfg.get("slots_per_node", 8) | |
| self._slot_maps = {n["node_id"]: [0] * slots for n in self._nodes} | |
| self._pending_workloads = _generate_pending_workloads(max_steps + 1, self._rng) | |
| return Observation( | |
| done=False, | |
| reward=0.0, | |
| metadata={ | |
| "status": "ready", | |
| "task": self._task_name, | |
| "difficulty": self._task_cfg["difficulty"], | |
| "message": f"Cloud GPU+CPU environment ready. Task: {self._task_name}", | |
| "cluster_state": json.dumps(self._build_cluster_state()), | |
| "task_info": json.dumps(self._build_task_info()), | |
| }, | |
| ) | |
| def _step_impl( | |
| self, | |
| action: Action, | |
| timeout_s: Optional[float] = None, | |
| **kwargs: Any, | |
| ) -> Observation: | |
| if hasattr(action, "decisions"): | |
| result = self._process_action(action.decisions) | |
| return Observation( | |
| done=result["done"], | |
| reward=result["reward"], | |
| metadata=result, | |
| ) | |
| return Observation( | |
| done=False, | |
| reward=0.0, | |
| metadata={"error": f"Unknown action type: {type(action).__name__}. Use MCP tools or CloudAction."}, | |
| ) | |
| def step(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation: | |
| self._state.step_count += 1 | |
| return super().step(action, timeout_s=timeout_s, **kwargs) | |
| async def step_async(self, action: Action, timeout_s: Optional[float] = None, **kwargs: Any) -> Observation: | |
| self._state.step_count += 1 | |
| return await super().step_async(action, timeout_s=timeout_s, **kwargs) | |
| def state(self) -> State: | |
| return self._state | |
| # ------------------------------------------------------------------ | |
| # Metrics helpers | |
| # ------------------------------------------------------------------ | |
| def _gpu_temp_for_node(self, node: dict) -> float: | |
| """Compute GPU temperature from utilisation, ambient temp, and cooling.""" | |
| nid = node["node_id"] | |
| t = min(self._timestep, len(self._gpu_workloads.get(nid, [0.5])) - 1) | |
| util = self._gpu_workloads[nid][t] | |
| ambient = self._ambient_trace[min(t, len(self._ambient_trace) - 1)] | |
| cooling = self._cooling_levels.get(nid, 1) | |
| # Base temp from utilisation: idle ~35°C, full load ~TDP-mapped | |
| base_temp = 35.0 + util * 50.0 # 35-85°C range at full util | |
| # Ambient contribution | |
| ambient_factor = (ambient - 25.0) * 0.3 # deviation from 25°C baseline | |
| # Cooling reduction: each level reduces ~5°C | |
| cooling_reduction = cooling * 5.0 | |
| # Random jitter | |
| jitter = self._rng.gauss(0, 1.0) | |
| temp = base_temp + ambient_factor - cooling_reduction + jitter | |
| return round(max(30.0, min(100.0, temp)), 1) | |
| def _current_node_metrics(self, node: dict) -> dict: | |
| nid = node["node_id"] | |
| t = min(self._timestep, len(self._gpu_workloads.get(nid, [0.5])) - 1) | |
| gpu_util = self._gpu_workloads[nid][t] | |
| cpu_util = self._cpu_workloads[nid][t] | |
| mem_util = self._mem_workloads[nid][t] | |
| gpu_vram_used = round(gpu_util * node["gpu_vram_gb"], 2) | |
| cpu_usage = round(cpu_util * node["cpu_capacity"], 2) | |
| mem_usage = round(mem_util * node["memory_capacity_gb"], 2) | |
| power_draw = round(node["tdp_watts"] * (0.3 + 0.7 * gpu_util), 1) | |
| gpu_temp = self._gpu_temp_for_node(node) | |
| ambient = self._ambient_trace[min(t, len(self._ambient_trace) - 1)] | |
| cooling = self._cooling_levels.get(nid, 1) | |
| thermal_throttle = gpu_temp > node["max_temp_c"] | |
| # Fragmentation info | |
| slots = self._slot_maps.get(nid, []) | |
| total_slots = len(slots) | |
| free_slots = slots.count(0) | |
| frag_score = self._fragmentation_score(nid) if total_slots > 0 else 0.0 | |
| metrics = { | |
| "node_id": nid, | |
| "gpu_type": node["gpu_type"], | |
| "node_name": node["name"], | |
| # GPU | |
| "gpu_count": node["gpu_count"], | |
| "gpu_utilization_pct": round(gpu_util * 100, 1), | |
| "gpu_vram_used_gb": gpu_vram_used, | |
| "gpu_vram_capacity_gb": node["gpu_vram_gb"], | |
| # CPU | |
| "cpu_usage": cpu_usage, | |
| "cpu_capacity": node["cpu_capacity"], | |
| "cpu_utilization_pct": round(cpu_util * 100, 1), | |
| # Memory | |
| "memory_usage_gb": mem_usage, | |
| "memory_capacity_gb": node["memory_capacity_gb"], | |
| "memory_utilization_pct": round(mem_util * 100, 1), | |
| # Thermal | |
| "gpu_temp_celsius": gpu_temp, | |
| "ambient_temp_celsius": ambient, | |
| "cooling_level": cooling, | |
| "max_temp_threshold": node["max_temp_c"], | |
| "thermal_throttle": thermal_throttle, | |
| # Power & cost | |
| "power_draw_watts": power_draw, | |
| "cost_per_step": round(node["cost_per_step"], 2), | |
| # Fragmentation | |
| "gpu_slots_total": total_slots, | |
| "gpu_slots_free": free_slots, | |
| "gpu_slots_used": total_slots - free_slots, | |
| "fragmentation_score": round(frag_score, 3), | |
| } | |
| return metrics | |
| def _fragmentation_score(self, nid: str) -> float: | |
| """ | |
| Compute fragmentation score for a node. | |
| 0.0 = all free slots are contiguous (ideal) | |
| 1.0 = maximally fragmented | |
| """ | |
| slots = self._slot_maps.get(nid, []) | |
| if not slots: | |
| return 0.0 | |
| free_count = slots.count(0) | |
| if free_count == 0 or free_count == len(slots): | |
| return 0.0 | |
| # Count number of contiguous free blocks | |
| blocks = 0 | |
| in_block = False | |
| for s in slots: | |
| if s == 0 and not in_block: | |
| blocks += 1 | |
| in_block = True | |
| elif s != 0: | |
| in_block = False | |
| if blocks <= 1: | |
| return 0.0 | |
| # Normalise: 1 block = 0, max blocks = free_count | |
| return round(min(1.0, (blocks - 1) / max(1, free_count - 1)), 3) | |
| def _build_cluster_state(self) -> dict: | |
| nodes = [self._current_node_metrics(n) for n in self._nodes] | |
| total_cost = sum(n["cost_per_step"] for n in nodes) | |
| budget = self._task_cfg.get("budget_per_step") | |
| state: dict[str, Any] = { | |
| "timestep": self._timestep, | |
| "max_timesteps": self._task_cfg["max_steps"], | |
| "task": self._task_name, | |
| "nodes": nodes, | |
| "total_cost_per_step": round(total_cost, 2), | |
| "budget_per_step": budget, | |
| "budget_remaining": round(budget - total_cost, 2) if budget else None, | |
| } | |
| # Task-specific extras | |
| if self._task_name == "thermal_management": | |
| t = min(self._timestep, len(self._ambient_trace) - 1) | |
| state["ambient_temp_celsius"] = self._ambient_trace[t] | |
| state["any_throttling"] = any( | |
| self._current_node_metrics(n)["thermal_throttle"] for n in self._nodes | |
| ) | |
| if self._task_name == "heuristic_fragmentation": | |
| pw_idx = min(self._timestep, len(self._pending_workloads) - 1) | |
| state["pending_workloads"] = self._pending_workloads[pw_idx] | |
| state["cluster_fragmentation"] = round( | |
| sum(self._fragmentation_score(n["node_id"]) for n in self._nodes) / len(self._nodes), 3 | |
| ) | |
| return state | |
| def _build_task_info(self) -> dict: | |
| cfg = self._task_cfg | |
| objectives = [ | |
| f"Keep GPU utilisation near {cfg['target_gpu_util_pct'] * 100:.0f}%", | |
| f"Keep CPU utilisation near {cfg['target_cpu_util_pct'] * 100:.0f}%", | |
| "Avoid GPU overloads (utilisation > 100%)", | |
| ] | |
| if self._task_name == "gpu_cpu_allocation": | |
| objectives.append("Minimise cost while meeting demand") | |
| objectives.append("Migrate workloads to cheaper GPUs when possible") | |
| elif self._task_name == "thermal_management": | |
| objectives.append("Keep GPU temperatures below threshold") | |
| objectives.append("Redistribute load when GPU overheats") | |
| objectives.append("Minimise cooling energy cost") | |
| elif self._task_name == "heuristic_fragmentation": | |
| objectives.append("Place pending workloads efficiently") | |
| objectives.append("Minimise fragmentation") | |
| objectives.append("Use heuristic strategies (best-fit, first-fit)") | |
| if cfg.get("budget_per_step"): | |
| objectives.append("Stay within budget constraint") | |
| return { | |
| "task_name": self._task_name, | |
| "difficulty": cfg["difficulty"], | |
| "description": cfg["description"], | |
| "num_nodes": cfg["num_nodes"], | |
| "max_steps": cfg["max_steps"], | |
| "target_gpu_utilization_pct": cfg["target_gpu_util_pct"] * 100, | |
| "target_cpu_utilization_pct": cfg["target_cpu_util_pct"] * 100, | |
| "budget_per_step": cfg.get("budget_per_step"), | |
| "valid_actions": cfg["valid_actions"], | |
| "objectives": objectives, | |
| } | |
| # ------------------------------------------------------------------ | |
| # Action processing (per-task) | |
| # ------------------------------------------------------------------ | |
| def _process_action(self, decisions_str: str) -> dict: | |
| if self._episode_done: | |
| return { | |
| "reward": 0.0, | |
| "done": True, | |
| "feedback": "Episode already finished.", | |
| "cluster_state": self._build_cluster_state(), | |
| "score": self._compute_score(), | |
| } | |
| try: | |
| decisions = json.loads(decisions_str) if isinstance(decisions_str, str) else decisions_str | |
| except (json.JSONDecodeError, TypeError): | |
| decisions = {} | |
| if self._task_name == "gpu_cpu_allocation": | |
| result = self._process_gpu_cpu_allocation(decisions) | |
| elif self._task_name == "thermal_management": | |
| result = self._process_thermal_management(decisions) | |
| elif self._task_name == "heuristic_fragmentation": | |
| result = self._process_heuristic_fragmentation(decisions) | |
| else: | |
| result = {"feedback_lines": [], "step_reward": 0.0} | |
| # Advance timestep | |
| self._timestep += 1 | |
| # Recompute reward after timestep advance (observe new workload) | |
| step_reward = result["step_reward"] | |
| self._step_rewards.append(step_reward) | |
| done = self._timestep >= self._task_cfg["max_steps"] | |
| if done: | |
| self._episode_done = True | |
| score = self._compute_score() | |
| return { | |
| "reward": round(step_reward, 4), | |
| "done": done, | |
| "feedback": " | ".join(result["feedback_lines"]), | |
| "cluster_state": self._build_cluster_state(), | |
| "score": round(score, 4), | |
| "timestep": self._timestep, | |
| "max_timesteps": self._task_cfg["max_steps"], | |
| } | |
| # --- Task 1: GPU+CPU Allocation with Cost Optimisation --- | |
| def _process_gpu_cpu_allocation(self, decisions: dict) -> dict: | |
| feedback: list[str] = [] | |
| valid = self._task_cfg["valid_actions"] | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| action = decisions.get(nid, "maintain") | |
| if action not in valid: | |
| action = "maintain" | |
| if action == "allocate_high": | |
| # Scale up GPU+CPU capacity by 50% | |
| node["gpu_vram_gb"] = round(node["gpu_vram_gb"] * 1.5, 2) | |
| node["cpu_capacity"] = round(node["cpu_capacity"] * 1.5, 2) | |
| node["memory_capacity_gb"] = round(node["memory_capacity_gb"] * 1.5, 2) | |
| node["cost_per_step"] = round(node["cost_per_step"] * 1.5, 2) | |
| feedback.append(f"{nid}: allocate_high (+50% capacity, +50% cost)") | |
| elif action == "allocate_low": | |
| # Scale down by 33% | |
| node["gpu_vram_gb"] = round(max(8.0, node["gpu_vram_gb"] / 1.5), 2) | |
| node["cpu_capacity"] = round(max(1.0, node["cpu_capacity"] / 1.5), 2) | |
| node["memory_capacity_gb"] = round(max(4.0, node["memory_capacity_gb"] / 1.5), 2) | |
| node["cost_per_step"] = round(max(3.0, node["cost_per_step"] / 1.5), 2) | |
| feedback.append(f"{nid}: allocate_low (-33% capacity, -33% cost)") | |
| elif action == "migrate": | |
| # Reduce this node's workload, slightly increase others | |
| nid_idx = [n["node_id"] for n in self._nodes].index(nid) | |
| t = min(self._timestep, len(self._gpu_workloads[nid]) - 1) | |
| migrated = self._gpu_workloads[nid][t] * 0.3 | |
| self._gpu_workloads[nid][t] *= 0.7 | |
| self._cpu_workloads[nid][t] *= 0.7 | |
| # Spread to other nodes | |
| others = [n for n in self._nodes if n["node_id"] != nid] | |
| if others: | |
| share = migrated / len(others) | |
| for other in others: | |
| oid = other["node_id"] | |
| ot = min(self._timestep, len(self._gpu_workloads[oid]) - 1) | |
| self._gpu_workloads[oid][ot] = min(0.95, self._gpu_workloads[oid][ot] + share) | |
| self._cpu_workloads[oid][ot] = min(0.95, self._cpu_workloads[oid][ot] + share * 0.5) | |
| feedback.append(f"{nid}: migrate (30% load moved to other nodes)") | |
| else: | |
| feedback.append(f"{nid}: maintained") | |
| # Compute reward | |
| target_gpu = self._task_cfg["target_gpu_util_pct"] | |
| target_cpu = self._task_cfg["target_cpu_util_pct"] | |
| reward = 0.0 | |
| for node in self._nodes: | |
| m = self._current_node_metrics(node) | |
| gpu_pct = m["gpu_utilization_pct"] / 100.0 | |
| cpu_pct = m["cpu_utilization_pct"] / 100.0 | |
| if gpu_pct > 1.0 or cpu_pct > 1.0: | |
| reward -= 0.5 | |
| feedback.append(f"⚠️ {node['node_id']} OVERLOADED!") | |
| else: | |
| gpu_eff = max(0.0, 1.0 - 2.0 * abs(gpu_pct - target_gpu)) | |
| cpu_eff = max(0.0, 1.0 - 2.0 * abs(cpu_pct - target_cpu)) | |
| reward += (gpu_eff * 0.6 + cpu_eff * 0.4) # GPU weighted more | |
| reward /= len(self._nodes) | |
| # Budget penalty | |
| budget = self._task_cfg.get("budget_per_step") | |
| if budget: | |
| total_cost = sum(n["cost_per_step"] for n in self._nodes) | |
| if total_cost > budget: | |
| reward *= 0.5 | |
| feedback.append(f"⚠️ Over budget! Cost {total_cost:.0f} > Budget {budget:.0f}") | |
| reward = max(0.0, min(1.0, reward)) | |
| return {"feedback_lines": feedback, "step_reward": reward} | |
| # --- Task 2: Thermal Management --- | |
| def _process_thermal_management(self, decisions: dict) -> dict: | |
| feedback: list[str] = [] | |
| valid = self._task_cfg["valid_actions"] | |
| cooling_energy_cost = 0.0 | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| action = decisions.get(nid, "maintain") | |
| if action not in valid: | |
| action = "maintain" | |
| current_cooling = self._cooling_levels.get(nid, 1) | |
| if action == "increase_cooling": | |
| new_cooling = min(3, current_cooling + 1) | |
| self._cooling_levels[nid] = new_cooling | |
| cooling_energy_cost += 5.0 * new_cooling | |
| feedback.append(f"{nid}: cooling ↑ (level {current_cooling}→{new_cooling})") | |
| elif action == "decrease_cooling": | |
| new_cooling = max(0, current_cooling - 1) | |
| self._cooling_levels[nid] = new_cooling | |
| feedback.append(f"{nid}: cooling ↓ (level {current_cooling}→{new_cooling})") | |
| elif action == "migrate_load": | |
| # Move 40% of load to coolest node | |
| t = min(self._timestep, len(self._gpu_workloads[nid]) - 1) | |
| migrated = self._gpu_workloads[nid][t] * 0.4 | |
| self._gpu_workloads[nid][t] *= 0.6 | |
| self._cpu_workloads[nid][t] *= 0.6 | |
| # Find coolest other node | |
| others = [n for n in self._nodes if n["node_id"] != nid] | |
| if others: | |
| coolest = min(others, key=lambda n: self._gpu_temp_for_node(n)) | |
| cid = coolest["node_id"] | |
| ct = min(self._timestep, len(self._gpu_workloads[cid]) - 1) | |
| self._gpu_workloads[cid][ct] = min(0.95, self._gpu_workloads[cid][ct] + migrated) | |
| self._cpu_workloads[cid][ct] = min(0.95, self._cpu_workloads[cid][ct] + migrated * 0.5) | |
| feedback.append(f"{nid}: migrated 40% load → {cid} (coolest)") | |
| else: | |
| feedback.append(f"{nid}: migrate_load (no other nodes)") | |
| else: | |
| feedback.append(f"{nid}: maintained") | |
| # Compute reward | |
| safe_low = self._task_cfg["temp_safe_low"] | |
| safe_high = self._task_cfg["temp_safe_high"] | |
| target_gpu = self._task_cfg["target_gpu_util_pct"] | |
| reward = 0.0 | |
| any_throttle = False | |
| for node in self._nodes: | |
| m = self._current_node_metrics(node) | |
| temp = m["gpu_temp_celsius"] | |
| gpu_pct = m["gpu_utilization_pct"] / 100.0 | |
| # Temperature reward | |
| if temp <= safe_high and temp >= safe_low: | |
| temp_reward = 1.0 # In safe zone | |
| elif temp > node["max_temp_c"]: | |
| temp_reward = -1.0 # Critical — throttling | |
| any_throttle = True | |
| feedback.append(f"🔥 {node['node_id']} THERMAL THROTTLE! {temp:.1f}°C > {node['max_temp_c']}°C") | |
| elif temp > safe_high: | |
| # Warning zone | |
| overshoot = (temp - safe_high) / (node["max_temp_c"] - safe_high) | |
| temp_reward = max(-0.5, 0.5 - overshoot) | |
| else: | |
| # Below safe_low — overcooled, wasting energy | |
| temp_reward = 0.5 | |
| # Utilisation efficiency | |
| util_eff = max(0.0, 1.0 - 2.0 * abs(gpu_pct - target_gpu)) | |
| # Weighted: 50% thermal, 40% utilisation, 10% cooling cost penalty | |
| node_reward = temp_reward * 0.5 + util_eff * 0.4 | |
| reward += node_reward | |
| reward /= len(self._nodes) | |
| # Cooling energy penalty | |
| cooling_penalty = cooling_energy_cost / (len(self._nodes) * 15.0) # normalise | |
| reward -= cooling_penalty * 0.1 | |
| reward = max(0.0, min(1.0, reward)) | |
| return {"feedback_lines": feedback, "step_reward": reward} | |
| # --- Task 3: Heuristic Fragmentation GPU Allocation --- | |
| def _process_heuristic_fragmentation(self, decisions: dict) -> dict: | |
| feedback: list[str] = [] | |
| valid = self._task_cfg["valid_actions"] | |
| slots_per = self._task_cfg["slots_per_node"] | |
| # Get pending workloads for this step | |
| pw_idx = min(self._timestep, len(self._pending_workloads) - 1) | |
| pending = list(self._pending_workloads[pw_idx]) | |
| # Randomly free some old slots to create fragmentation | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| for i in range(len(self._slot_maps[nid])): | |
| if self._slot_maps[nid][i] != 0 and self._rng.random() < 0.15: | |
| self._slot_maps[nid][i] = 0 | |
| # Determine global strategy from decisions (majority vote or per-node) | |
| strategy_votes: dict[str, int] = {} | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| action = decisions.get(nid, "best_fit") | |
| if action not in valid: | |
| action = "best_fit" | |
| strategy_votes[action] = strategy_votes.get(action, 0) + 1 | |
| primary_strategy = max(strategy_votes, key=strategy_votes.get) # type: ignore | |
| feedback.append(f"Strategy: {primary_strategy}") | |
| placed = 0 | |
| failed = 0 | |
| for wl_size in pending: | |
| wl_id = self._next_workload_id | |
| self._next_workload_id += 1 | |
| if primary_strategy == "compact": | |
| # First compact (defragment), then best-fit | |
| self._compact_nodes() | |
| success = self._place_best_fit(wl_size, wl_id) | |
| elif primary_strategy == "best_fit": | |
| success = self._place_best_fit(wl_size, wl_id) | |
| elif primary_strategy == "first_fit": | |
| success = self._place_first_fit(wl_size, wl_id) | |
| elif primary_strategy == "split_workload": | |
| success = self._place_split(wl_size, wl_id) | |
| else: | |
| success = self._place_best_fit(wl_size, wl_id) | |
| if success: | |
| placed += 1 | |
| else: | |
| failed += 1 | |
| feedback.append(f"Placed {placed}/{placed + failed} workloads (sizes: {pending})") | |
| if failed > 0: | |
| feedback.append(f"⚠️ {failed} workloads could not be placed!") | |
| # Compute reward | |
| # Placement success | |
| placement_ratio = placed / max(1, placed + failed) | |
| # Fragmentation reduction | |
| avg_frag = sum(self._fragmentation_score(n["node_id"]) for n in self._nodes) / len(self._nodes) | |
| # Utilisation balance | |
| target_gpu = self._task_cfg["target_gpu_util_pct"] | |
| util_reward = 0.0 | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| slots = self._slot_maps[nid] | |
| used = sum(1 for s in slots if s != 0) | |
| util = used / max(1, len(slots)) | |
| util_reward += max(0.0, 1.0 - 2.0 * abs(util - target_gpu)) | |
| util_reward /= len(self._nodes) | |
| # Weighted reward | |
| reward = placement_ratio * 0.4 + (1.0 - avg_frag) * 0.3 + util_reward * 0.3 | |
| # Budget penalty | |
| budget = self._task_cfg.get("budget_per_step") | |
| if budget: | |
| total_cost = sum(n["cost_per_step"] for n in self._nodes) | |
| if total_cost > budget: | |
| reward *= 0.5 | |
| feedback.append(f"⚠️ Over budget! Cost {total_cost:.0f} > Budget {budget:.0f}") | |
| # Compaction overhead penalty | |
| if primary_strategy == "compact": | |
| reward *= 0.9 # 10% penalty for migration overhead | |
| feedback.append("ℹ️ Compact: 10% overhead for defragmentation") | |
| reward = max(0.0, min(1.0, reward)) | |
| return {"feedback_lines": feedback, "step_reward": reward} | |
| # --- Fragmentation placement helpers --- | |
| def _find_contiguous_free(self, nid: str, size: int) -> int: | |
| """Find start index of contiguous free block of given size. Returns -1 if none.""" | |
| slots = self._slot_maps[nid] | |
| for i in range(len(slots) - size + 1): | |
| if all(s == 0 for s in slots[i:i + size]): | |
| return i | |
| return -1 | |
| def _place_best_fit(self, size: int, wl_id: int) -> bool: | |
| """Best-fit: place in node with smallest sufficient contiguous block.""" | |
| best_node = None | |
| best_start = -1 | |
| best_free = float("inf") | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| start = self._find_contiguous_free(nid, size) | |
| if start >= 0: | |
| free = self._slot_maps[nid].count(0) | |
| if free < best_free: | |
| best_free = free | |
| best_node = nid | |
| best_start = start | |
| if best_node is not None and best_start >= 0: | |
| for i in range(size): | |
| self._slot_maps[best_node][best_start + i] = wl_id | |
| return True | |
| return False | |
| def _place_first_fit(self, size: int, wl_id: int) -> bool: | |
| """First-fit: place in first node with sufficient contiguous block.""" | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| start = self._find_contiguous_free(nid, size) | |
| if start >= 0: | |
| for i in range(size): | |
| self._slot_maps[nid][start + i] = wl_id | |
| return True | |
| return False | |
| def _place_split(self, size: int, wl_id: int) -> bool: | |
| """Split workload across multiple nodes if no single node has enough.""" | |
| # First try contiguous placement | |
| if self._place_first_fit(size, wl_id): | |
| return True | |
| # Split across nodes | |
| remaining = size | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| slots = self._slot_maps[nid] | |
| for i in range(len(slots)): | |
| if slots[i] == 0 and remaining > 0: | |
| slots[i] = wl_id | |
| remaining -= 1 | |
| if remaining == 0: | |
| return True | |
| return remaining == 0 | |
| def _compact_nodes(self) -> None: | |
| """Defragment all nodes by moving allocated slots to front.""" | |
| for node in self._nodes: | |
| nid = node["node_id"] | |
| slots = self._slot_maps[nid] | |
| # Gather non-zero (allocated) entries, push to front | |
| allocated = [s for s in slots if s != 0] | |
| free = [0] * (len(slots) - len(allocated)) | |
| self._slot_maps[nid] = allocated + free | |
| def _compute_score(self) -> float: | |
| if not self._step_rewards: | |
| return 0.0 | |
| return max(0.0, min(1.0, sum(self._step_rewards) / len(self._step_rewards))) | |