"""Cloud resource reinforcement-learning environment (Gymnasium wrapper). Wraps the GPU+CPU cloud management environment for local RL training with stable-baselines3 and similar libraries. Observation space (12 features per node, flattened): [gpu_util, cpu_util, mem_util, gpu_vram_used, gpu_vram_cap, cpu_usage, cpu_cap, gpu_temp, ambient_temp, cooling_level, fragmentation_score, cost_per_step] Action space: Discrete(4) — task-specific mapping Task 1 (gpu_cpu_allocation): 0=maintain, 1=allocate_high, 2=allocate_low, 3=migrate Task 2 (thermal_management): 0=maintain, 1=increase_cooling, 2=decrease_cooling, 3=migrate_load Task 3 (heuristic_fragmentation): 0=best_fit, 1=first_fit, 2=compact, 3=split_workload """ # pyright: reportMissingImports=false from __future__ import annotations import csv import json import math import random from pathlib import Path import numpy as np try: import gymnasium as gym from gymnasium import spaces except ImportError: # pragma: no cover - optional dependency for local inspection gym = object spaces = None # Number of features per node in observation _FEATURES_PER_NODE = 12 _MAX_NODES = 5 # maximum nodes across all tasks class CloudResourceEnv(gym.Env if spaces is not None else object): """Gymnasium wrapper for the Cloud GPU+CPU management environment.""" metadata = {"render_modes": ["human"]} # Action mappings per task ACTION_MAP = { "gpu_cpu_allocation": {0: "maintain", 1: "allocate_high", 2: "allocate_low", 3: "migrate"}, "thermal_management": {0: "maintain", 1: "increase_cooling", 2: "decrease_cooling", 3: "migrate_load"}, "heuristic_fragmentation": {0: "best_fit", 1: "first_fit", 2: "compact", 3: "split_workload"}, } TASK_CONFIGS = { "gpu_cpu_allocation": {"num_nodes": 3, "max_steps": 8}, "thermal_management": {"num_nodes": 4, "max_steps": 10}, "heuristic_fragmentation": {"num_nodes": 5, "max_steps": 12}, } def __init__(self, task: str = "gpu_cpu_allocation", seed: int = 42): super().__init__() if task not in self.TASK_CONFIGS: raise ValueError(f"Unknown task: {task}. Valid: {list(self.TASK_CONFIGS.keys())}") self.task = task self._seed = seed # We import and use the server environment directly for local training from server.cloud_environment import CloudResourceEnvironment self._env = CloudResourceEnvironment() cfg = self.TASK_CONFIGS[task] self.num_nodes = cfg["num_nodes"] self.max_steps = cfg["max_steps"] if spaces is not None: self.action_space = spaces.Discrete(4) # Observation: flattened node metrics (12 features × max_nodes, padded) obs_size = _FEATURES_PER_NODE * _MAX_NODES self.observation_space = spaces.Box( low=0.0, high=np.finfo(np.float32).max, shape=(obs_size,), dtype=np.float32, ) self.current_step = 0 def reset(self, *, seed=None, options=None): if seed is not None: self._seed = seed obs_result = self._env.reset(seed=self._seed, task=self.task) self.current_step = 0 return self._obs_from_env(), {} def step(self, action): if action not in self.ACTION_MAP[self.task]: raise ValueError(f"Invalid action {action}. Allowed: 0-3") action_name = self.ACTION_MAP[self.task][action] # Apply same action to all nodes decisions = {f"node_{i}": action_name for i in range(self.num_nodes)} decisions_str = json.dumps(decisions) # Use the internal process result = self._env._process_action(decisions_str) # Advance internal timestep self._env._timestep += 1 self.current_step += 1 reward = float(result.get("reward", 0.0)) terminated = bool(result.get("done", False)) truncated = False obs = self._obs_from_env() info = { "task": self.task, "timestep": self.current_step, "feedback": result.get("feedback", ""), "score": result.get("score", 0.0), } return obs, reward, terminated, truncated, info def _obs_from_env(self) -> np.ndarray: """Extract flattened observation from environment state.""" state = self._env._build_cluster_state() obs = np.zeros(_FEATURES_PER_NODE * _MAX_NODES, dtype=np.float32) for i, node in enumerate(state.get("nodes", [])): if i >= _MAX_NODES: break offset = i * _FEATURES_PER_NODE obs[offset + 0] = node.get("gpu_utilization_pct", 0.0) / 100.0 obs[offset + 1] = node.get("cpu_utilization_pct", 0.0) / 100.0 obs[offset + 2] = node.get("memory_utilization_pct", 0.0) / 100.0 obs[offset + 3] = node.get("gpu_vram_used_gb", 0.0) obs[offset + 4] = node.get("gpu_vram_capacity_gb", 0.0) obs[offset + 5] = node.get("cpu_usage", 0.0) obs[offset + 6] = node.get("cpu_capacity", 0.0) obs[offset + 7] = node.get("gpu_temp_celsius", 0.0) / 100.0 # normalise obs[offset + 8] = node.get("ambient_temp_celsius", 25.0) / 50.0 # normalise obs[offset + 9] = float(node.get("cooling_level", 0)) / 3.0 obs[offset + 10] = node.get("fragmentation_score", 0.0) obs[offset + 11] = node.get("cost_per_step", 0.0) / 100.0 # normalise return obs def render(self): state = self._env._build_cluster_state() print(f"=== Step {self.current_step} | Task: {self.task} ===") for node in state.get("nodes", []): print( f" {node['node_id']} ({node['gpu_type']}) | " f"GPU={node['gpu_utilization_pct']:.1f}% " f"CPU={node['cpu_utilization_pct']:.1f}% " f"Temp={node['gpu_temp_celsius']:.1f}°C " f"Frag={node['fragmentation_score']:.2f} " f"Cost=${node['cost_per_step']:.1f}" ) if state.get("budget_per_step"): print(f" Budget: ${state['budget_remaining']:.1f} remaining")