# Openenv / cloud_arena / training.py
# kavin57447's picture
# Add Cloud Arena Mathematical Model RL environment
# 12263fa
# Cloud Arena Training — Mathematical Model (MaskablePPO)
# Extracted from cloud_arena_final.py (Cell 3)
import os, sys, math
import numpy as np
import torch
from stable_baselines3.common.vec_env import DummyVecEnv, VecNormalize, sync_envs_normalization
from stable_baselines3.common.callbacks import BaseCallback
from stable_baselines3.common.monitor import Monitor
from sb3_contrib import MaskablePPO
from sb3_contrib.common.wrappers import ActionMasker
from sb3_contrib.common.maskable.callbacks import MaskableEvalCallback
from cloud_arena.environment import (
CloudArenaEnv, get_action_masks, GLOBAL_SEED,
N_ACTION_TYPES, MAX_RESOURCES, N_ACTIONS, MAX_STEPS,
)
# Default training budget in environment steps. It is the default value of
# train_model's total_timesteps argument and is also referenced by the
# progress readout in CloudArenaCallback.
TOTAL_TIMESTEPS = 500_000
def cosine_lr(progress_remaining: float, init_lr: float = 3e-4, min_lr: float = 5e-5):
    """Return the learning rate for a half-cosine decay schedule.

    Args:
        progress_remaining: Fraction of training still to run; 1.0 yields
            ``init_lr`` and 0.0 yields ``min_lr``.
        init_lr: Learning rate at the start of the schedule.
        min_lr: Learning rate at the end of the schedule.

    Returns:
        The interpolated learning rate for the given progress value.
    """
    elapsed_fraction = 1.0 - progress_remaining
    # Ranges from 2.0 (nothing elapsed) down to 0.0 (everything elapsed).
    cosine_term = 1.0 + math.cos(math.pi * elapsed_fraction)
    lr_span = init_lr - min_lr
    return min_lr + lr_span * 0.5 * cosine_term
class SafeMaskableEvalCallback(MaskableEvalCallback):
    """Eval callback that re-syncs normalization statistics on every step.

    Before delegating to the parent implementation, the training env's
    normalization state is pushed to the eval env whenever the model is
    running on top of a VecNormalize wrapper.
    """

    def _on_step(self) -> bool:
        """Sync normalization stats (if any), then run the parent step logic."""
        uses_vec_normalize = self.model.get_vec_normalize_env() is not None
        if uses_vec_normalize:
            # NOTE(review): the parent callback appears to do its own sync when
            # an evaluation is due -- confirm whether a per-step sync is needed.
            sync_envs_normalization(self.training_env, self.eval_env)
        return super()._on_step()
class CloudArenaCallback(BaseCallback):
    """Track per-episode statistics and drive the training curriculum.

    On every step the callback updates an action-type histogram; on every
    finished episode it records the metrics found in the episode's ``info``
    dict, updates an exponential moving average (EMA) of the win flag, and
    promotes the curriculum level once the EMA clears the threshold for the
    current level.

    Attributes:
        ema_win_rate: EMA of the per-episode ``win`` flag; reset on promotion.
        current_level: Current curriculum level (starts at 0).
        episode_rewards / episode_wins / episode_savings / episode_security /
            episode_veto_rates: One entry per finished episode.
        curriculum_log: ``(num_timesteps, level)`` pairs, one per promotion,
            seeded with ``(0, 0)``.
        action_freq: Count of actions seen per action type.
    """

    EMA_ALPHA = 0.02            # smoothing factor of the win-rate EMA
    MIN_EPS_PER_PHASE = 800     # episodes required in a phase before promotion
    # EMA win rate that must be reached to leave each level.
    PHASE_THRESHOLDS = {0: 0.65, 1: 0.62, 2: 0.58, 3: 0.55, 4: 0.52}
    PROGRESS_EVERY = 500        # print progress every N timesteps
    MAX_LEVEL = 5               # no promotion happens once this level is reached

    def __init__(self, curriculum_ref, verbose=0):
        """Create the callback.

        Args:
            curriculum_ref: Mutable sequence whose element 0 is overwritten
                with the new level on every promotion.
            verbose: Verbosity level forwarded to ``BaseCallback``.
        """
        super().__init__(verbose)
        self._curriculum_ref = curriculum_ref
        self.ema_win_rate = 0.0
        self.current_level = 0
        self._phase_eps = 0
        self.episode_rewards = []
        self.episode_wins = []
        self.episode_savings = []
        self.episode_security = []
        self.episode_veto_rates = []
        self.curriculum_log = [(0, 0)]
        self.action_freq = np.zeros(N_ACTION_TYPES)

    def _on_step(self) -> bool:
        """Update counters for the current step; always returns True."""
        if self.num_timesteps % self.PROGRESS_EVERY == 0:
            self._print_progress()

        actions = self.locals.get("actions")
        if actions is not None:
            for a in actions:
                # The action type is the flat action index divided by
                # MAX_RESOURCES.
                atype = int(a) // MAX_RESOURCES
                if atype < N_ACTION_TYPES:
                    self.action_freq[atype] += 1

        # Handle every sub-environment that finished this step, not just the
        # first one, so the statistics stay correct with more than one env.
        dones = self.locals.get("dones", [False])
        infos = self.locals.get("infos", [{}])
        for env_idx, done in enumerate(dones):
            if done:
                info = infos[env_idx] if env_idx < len(infos) else {}
                self._on_episode_end(info, env_idx)
        return True

    def _on_episode_end(self, info, env_idx=0):
        """Record one finished episode and promote the curriculum if earned.

        Args:
            info: The ``info`` dict of the step that ended the episode.
            env_idx: Index of the sub-environment the episode belongs to; only
                used to pick the fallback reward when no Monitor statistics
                are present.
        """
        # Monitor stores the whole-episode return under info["episode"]["r"].
        # Read it before "final_info" (if present) replaces the dict.
        monitor_stats = info.get("episode")
        if "final_info" in info:
            info = info["final_info"]
            if monitor_stats is None:
                monitor_stats = info.get("episode")

        win = int(info.get("win", 0))
        self.ema_win_rate = (1 - self.EMA_ALPHA) * self.ema_win_rate + self.EMA_ALPHA * win

        if isinstance(monitor_stats, dict) and "r" in monitor_stats:
            episode_reward = float(monitor_stats["r"])
        else:
            # No Monitor statistics available: fall back to the reward of the
            # terminating step, which is all the rollout locals expose.
            episode_reward = float(self.locals.get("rewards", [0])[env_idx])

        self.episode_rewards.append(episode_reward)
        self.episode_wins.append(win)
        self.episode_savings.append(info.get("savings_pct", 0))
        self.episode_security.append(info.get("security_score", 0))
        self.episode_veto_rates.append(info.get("veto_rate", 0))
        self._phase_eps += 1

        thr = self.PHASE_THRESHOLDS.get(self.current_level, 0.50)
        if (
            self.current_level < self.MAX_LEVEL
            and self._phase_eps >= self.MIN_EPS_PER_PHASE
            and self.ema_win_rate >= thr
        ):
            self._try_promote()

    def _try_promote(self):
        """Advance one curriculum level and reset the per-phase statistics."""
        self.current_level += 1
        # Publish the new level through the shared reference.
        self._curriculum_ref[0] = self.current_level
        self._phase_eps = 0
        self.ema_win_rate = 0.0
        self.curriculum_log.append((self.num_timesteps, self.current_level))
        print(f"\n✄ PROMOTED -> Phase {self.current_level}")

    def _print_progress(self):
        """Rewrite the single-line progress readout on stdout."""
        # Prefer the budget of the running learn() call when it is present in
        # the callback locals; otherwise fall back to the module default.
        total = self.locals.get("total_timesteps") or TOTAL_TIMESTEPS
        pct = min(100.0, self.num_timesteps / total * 100)
        sys.stdout.write(f"\rProgress: {pct:.1f}% | Phase: {self.current_level} | EMA Win: {self.ema_win_rate*100:.1f}%")
        sys.stdout.flush()
def train_model(total_timesteps=TOTAL_TIMESTEPS, save_dir="./models"):
    """Train a MaskablePPO agent on CloudArenaEnv and save the results.

    Builds a normalized training env and a separate evaluation env, runs
    ``model.learn`` with the curriculum and evaluation callbacks, then saves
    the final model and the VecNormalize statistics into ``save_dir``.

    Args:
        total_timesteps: Number of environment steps to train for.
        save_dir: Directory for the best model (written by the eval callback),
            the final model and the VecNormalize statistics.

    Returns:
        A ``(model, arena_cb, train_env)`` tuple: the trained model, the
        CloudArenaCallback holding the collected statistics, and the
        still-open normalized training env.
    """
    os.makedirs(save_dir, exist_ok=True)
    # NOTE(review): these two directories are created but nothing in this
    # function writes to them -- confirm whether they are still needed.
    os.makedirs("./logs/", exist_ok=True)
    os.makedirs("./eval_logs/", exist_ok=True)
    torch.manual_seed(GLOBAL_SEED)

    # One-element lists act as shared mutable references: the callback writes
    # the curriculum level into curriculum_ref[0], and both lists are handed
    # to every environment instance.
    curriculum_ref = [0]
    global_step_ref = [0]

    def make_env():
        """Build one monitored, action-masked CloudArenaEnv."""
        env = CloudArenaEnv(curriculum_ref, global_step_ref)
        env = Monitor(env)
        return ActionMasker(env, get_action_masks)

    train_env = DummyVecEnv([make_env])
    train_env = VecNormalize(train_env, norm_obs=True, norm_reward=True, clip_obs=10.0)

    # The eval env keeps its statistics frozen (training=False), leaves
    # rewards unnormalized, and starts from the training env's observation
    # statistics.
    eval_env = DummyVecEnv([make_env])
    eval_env = VecNormalize(eval_env, norm_obs=True, norm_reward=False, training=False)
    eval_env.obs_rms = train_env.obs_rms

    model = MaskablePPO("MlpPolicy", train_env, learning_rate=cosine_lr, ent_coef=0.01, verbose=0)
    arena_cb = CloudArenaCallback(curriculum_ref)
    eval_cb = SafeMaskableEvalCallback(eval_env, best_model_save_path=save_dir, eval_freq=10000)

    print("Starting Pipeline...")
    try:
        model.learn(total_timesteps=total_timesteps, callback=[arena_cb, eval_cb])
    finally:
        # The eval env is not returned to the caller, so release it here even
        # when training is interrupted or fails.
        eval_env.close()

    model.save(os.path.join(save_dir, "cloud_arena_final"))
    train_env.save(os.path.join(save_dir, "cloud_arena_vecnorm.pkl"))
    print("\n✅ Model and VecNormalize stats saved.")
    return model, arena_cb, train_env