File size: 6,303 Bytes
fa65b6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f69b60
 
 
 
 
 
fa65b6c
 
 
5f69b60
 
 
 
 
 
 
 
 
 
 
 
fa65b6c
 
 
 
 
5f69b60
fa65b6c
 
5f69b60
 
fa65b6c
 
 
 
 
 
 
 
 
 
 
 
 
 
5f69b60
 
fa65b6c
 
 
 
 
 
 
 
 
 
 
 
 
5f69b60
 
fa65b6c
 
 
5f69b60
 
 
fa65b6c
5f69b60
 
 
 
 
 
 
fa65b6c
 
 
5f69b60
fa65b6c
5f69b60
 
fa65b6c
 
5f69b60
fa65b6c
5f69b60
fa65b6c
 
 
5f69b60
fa65b6c
 
5f69b60
fa65b6c
 
5f69b60
 
fa65b6c
 
 
 
 
5f69b60
 
fa65b6c
 
 
 
5f69b60
 
fa65b6c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5f69b60
 
fa65b6c
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
"""Cloud resource reinforcement-learning environment (Gymnasium wrapper).

Wraps the GPU+CPU cloud management environment for local RL training with
stable-baselines3 and similar libraries.

Observation space (12 features per node, flattened and zero-padded to 5 node
slots, i.e. a float32 vector of length 60):
    [gpu_util, cpu_util, mem_util, gpu_vram_used, gpu_vram_cap,
     cpu_usage, cpu_cap, gpu_temp, ambient_temp, cooling_level,
     fragmentation_score, cost_per_step]

Action space: Discrete(4) — task-specific mapping; the chosen action is applied to every node
    Task 1 (gpu_cpu_allocation):   0=maintain, 1=allocate_high, 2=allocate_low, 3=migrate
    Task 2 (thermal_management):   0=maintain, 1=increase_cooling, 2=decrease_cooling, 3=migrate_load
    Task 3 (heuristic_fragmentation): 0=best_fit, 1=first_fit, 2=compact, 3=split_workload
"""

# pyright: reportMissingImports=false

from __future__ import annotations

import csv
import json
import math
import random
from pathlib import Path

import numpy as np

try:
    import gymnasium as gym
    from gymnasium import spaces
except ImportError:  # pragma: no cover - optional dependency for local inspection
    # Fallbacks keep the module importable without Gymnasium: the class below
    # then derives from ``object`` and defines no action/observation spaces.
    gym = object
    spaces = None


# Number of values written per node into the flattened observation vector.
_FEATURES_PER_NODE = 12
# Number of node slots in the observation; extra nodes are dropped and unused
# slots stay zero (largest ``num_nodes`` in ``TASK_CONFIGS`` below).
_MAX_NODES = 5


class CloudResourceEnv(gym.Env if spaces is not None else object):
    """Gymnasium wrapper for the Cloud GPU+CPU management environment.

    One discrete action per step is translated to its task-specific name and
    applied to every node of the wrapped ``CloudResourceEnvironment``.
    Observations are a fixed-length ``float32`` vector of
    ``_FEATURES_PER_NODE * _MAX_NODES`` values, zero-padded when the cluster
    reports fewer than ``_MAX_NODES`` nodes.
    """

    metadata = {"render_modes": ["human"]}

    # Action mappings per task
    ACTION_MAP = {
        "gpu_cpu_allocation": {0: "maintain", 1: "allocate_high", 2: "allocate_low", 3: "migrate"},
        "thermal_management": {0: "maintain", 1: "increase_cooling", 2: "decrease_cooling", 3: "migrate_load"},
        "heuristic_fragmentation": {0: "best_fit", 1: "first_fit", 2: "compact", 3: "split_workload"},
    }

    TASK_CONFIGS = {
        "gpu_cpu_allocation": {"num_nodes": 3, "max_steps": 8},
        "thermal_management": {"num_nodes": 4, "max_steps": 10},
        "heuristic_fragmentation": {"num_nodes": 5, "max_steps": 12},
    }

    # (node-state key, default when the key is missing, divisor) for each
    # observation feature, in output order. A divisor of 1.0 passes the raw
    # value through unchanged.
    _OBS_FEATURES = (
        ("gpu_utilization_pct", 0.0, 100.0),
        ("cpu_utilization_pct", 0.0, 100.0),
        ("memory_utilization_pct", 0.0, 100.0),
        ("gpu_vram_used_gb", 0.0, 1.0),
        ("gpu_vram_capacity_gb", 0.0, 1.0),
        ("cpu_usage", 0.0, 1.0),
        ("cpu_capacity", 0.0, 1.0),
        ("gpu_temp_celsius", 0.0, 100.0),
        ("ambient_temp_celsius", 25.0, 50.0),
        ("cooling_level", 0, 3.0),
        ("fragmentation_score", 0.0, 1.0),
        ("cost_per_step", 0.0, 100.0),
    )

    def __init__(self, task: str = "gpu_cpu_allocation", seed: int = 42):
        """Create the wrapper for one task.

        Args:
            task: Key of ``TASK_CONFIGS`` selecting the scenario.
            seed: Seed forwarded to the wrapped environment on ``reset``.

        Raises:
            ValueError: If ``task`` is not a key of ``TASK_CONFIGS``.
        """
        super().__init__()

        if task not in self.TASK_CONFIGS:
            raise ValueError(f"Unknown task: {task}. Valid: {list(self.TASK_CONFIGS.keys())}")

        self.task = task
        self._seed = seed

        # We import and use the server environment directly for local training
        from server.cloud_environment import CloudResourceEnvironment
        self._env = CloudResourceEnvironment()

        cfg = self.TASK_CONFIGS[task]
        self.num_nodes = cfg["num_nodes"]
        self.max_steps = cfg["max_steps"]

        if spaces is not None:
            self.action_space = spaces.Discrete(4)
            # Observation: flattened node metrics (12 features × max_nodes, padded)
            obs_size = _FEATURES_PER_NODE * _MAX_NODES
            self.observation_space = spaces.Box(
                low=0.0,
                high=np.finfo(np.float32).max,
                shape=(obs_size,),
                dtype=np.float32,
            )

        self.current_step = 0

    def reset(self, *, seed=None, options=None):
        """Start a new episode and return ``(observation, info)``.

        Args:
            seed: Optional new seed. When given it replaces the stored seed;
                otherwise the previously stored seed is reused, so successive
                resets without a seed start from the same seed.
            options: Accepted for Gymnasium API compatibility; unused.

        Returns:
            The initial observation vector and an empty info dict.
        """
        if spaces is not None:
            # Lets gymnasium.Env seed ``self.np_random`` as its API expects.
            super().reset(seed=seed)

        if seed is not None:
            self._seed = seed

        self._env.reset(seed=self._seed, task=self.task)
        self.current_step = 0
        return self._obs_from_env(), {}

    def step(self, action):
        """Apply one discrete action to every node and advance one step.

        Args:
            action: Action index 0-3. Plain ints, NumPy integer scalars and
                single-element arrays (e.g. the 0-d array returned by a
                policy's ``predict``) are accepted.

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``terminated``
            mirrors the ``done`` flag reported by the wrapped environment;
            ``truncated`` is set when ``max_steps`` steps have been taken
            without termination.

        Raises:
            ValueError: If ``action`` is not a single value in 0-3.
        """
        action_names = self.ACTION_MAP[self.task]

        # Unwrap NumPy scalars / single-element arrays into a plain Python
        # value; a raw ndarray is unhashable and cannot be looked up directly.
        action_arr = np.asarray(action)
        if action_arr.size != 1:
            raise ValueError(f"Invalid action {action}. Allowed: 0-3")
        action_key = action_arr.item()
        if action_key not in action_names:
            raise ValueError(f"Invalid action {action}. Allowed: 0-3")

        action_name = action_names[action_key]

        # Apply same action to all nodes
        decisions = {f"node_{i}": action_name for i in range(self.num_nodes)}
        decisions_str = json.dumps(decisions)

        # NOTE(review): this relies on private members of the server
        # environment (`_process_action`, `_timestep`); the wrapper advances
        # the timestep itself after processing the action.
        result = self._env._process_action(decisions_str)
        self._env._timestep += 1
        self.current_step += 1

        reward = float(result.get("reward", 0.0))
        terminated = bool(result.get("done", False))
        # Enforce the per-task step budget even if the backend never reports
        # ``done``; otherwise an episode could run without end.
        truncated = not terminated and self.current_step >= self.max_steps

        obs = self._obs_from_env()

        info = {
            "task": self.task,
            "timestep": self.current_step,
            "feedback": result.get("feedback", ""),
            "score": result.get("score", 0.0),
        }

        return obs, reward, terminated, truncated, info

    def _obs_from_env(self) -> np.ndarray:
        """Extract flattened observation from environment state.

        Returns:
            A ``float32`` vector of length ``_FEATURES_PER_NODE * _MAX_NODES``.
            Each reported node fills one consecutive slot following
            ``_OBS_FEATURES``; nodes beyond ``_MAX_NODES`` are ignored and
            unused slots remain zero.
        """
        state = self._env._build_cluster_state()
        obs = np.zeros(_FEATURES_PER_NODE * _MAX_NODES, dtype=np.float32)

        # zip with range() caps the number of nodes read at _MAX_NODES.
        for slot, node in zip(range(_MAX_NODES), state.get("nodes", [])):
            base = slot * _FEATURES_PER_NODE
            for idx, (key, default, scale) in enumerate(self._OBS_FEATURES):
                obs[base + idx] = float(node.get(key, default)) / scale

        return obs

    def render(self):
        """Print a one-line summary per node, plus the remaining budget if any."""
        state = self._env._build_cluster_state()
        print(f"=== Step {self.current_step} | Task: {self.task} ===")
        for node in state.get("nodes", []):
            print(
                f"  {node['node_id']} ({node['gpu_type']}) | "
                f"GPU={node['gpu_utilization_pct']:.1f}% "
                f"CPU={node['cpu_utilization_pct']:.1f}% "
                f"Temp={node['gpu_temp_celsius']:.1f}°C "
                f"Frag={node['fragmentation_score']:.2f} "
                f"Cost=${node['cost_per_step']:.1f}"
            )
        # Only print the budget line when the remaining amount is actually
        # present, instead of raising KeyError on a partially filled state.
        budget_remaining = state.get("budget_remaining")
        if state.get("budget_per_step") and budget_remaining is not None:
            print(f"  Budget: ${budget_remaining:.1f} remaining")