#!/usr/bin/env python3 """Phase 3: Rule-based curriculum training - 1M steps with progressive opponents.""" import os, sys, subprocess, numpy as np, torch, gymnasium from gymnasium.spaces import Box, Discrete # ── 1. Download TIL env via snapshot_download ── print("[1/5] Downloading TIL repo...") from huggingface_hub import snapshot_download snapshot_download(repo_id="e-rong/til-26-ae", repo_type="space", local_dir="/app/til-26-ae-repo") PKG_ROOT = None for root, dirs, files in os.walk("/app/til-26-ae-repo"): if "pyproject.toml" in files: PKG_ROOT = root break if PKG_ROOT is None: raise RuntimeError("pyproject.toml not found") subprocess.run(["pip", "install", "-e", "."], cwd=PKG_ROOT, check=True) sys.path.insert(0, PKG_ROOT) from til_environment.bomberman_env import Bomberman from til_environment.config import default_config from pettingzoo.utils.conversions import aec_to_parallel from sb3_contrib import MaskablePPO from sb3_contrib.common.wrappers import ActionMasker from stable_baselines3.common.callbacks import BaseCallback from stable_baselines3.common.monitor import Monitor from huggingface_hub import HfApi, hf_hub_download HUB_REPO = "E-Rong/til-26-ae-agent" DATA_DIR = "/app/data" os.makedirs(DATA_DIR, exist_ok=True) def hub_push(local_path, repo_path): try: HfApi().upload_file(path_or_fileobj=local_path, path_in_repo=repo_path, repo_id=HUB_REPO, repo_type="model") print(f" -> pushed {repo_path}") except Exception as e: print(f" -> push failed: {e}") # ── Opponent Policies ── def static_opponent(obs): """Never moves, never places bombs.""" return 4 # STAY def random_valid_opponent(obs): """Random valid action (Phase 1 style).""" mask = np.array(obs.get("action_mask", [1]*6), dtype=bool) valid = np.where(mask)[0] return int(np.random.choice(valid)) if len(valid) > 0 else 4 def simple_bomb_opponent(obs): """Moves randomly but places bombs when enemies are visible.""" mask = np.array(obs.get("action_mask", [1]*6), dtype=bool) # Check if enemies visible in viewcone view = np.array(obs.get("agent_viewcone", np.zeros((7,5,25)))) if view.shape[-1] >= 11: # ENEMY_AGENT channel exists enemy_present = np.any(view[..., 10] > 0) # ENEMY_AGENT channel if enemy_present and mask[5]: # PLACE_BOMB is valid return 5 valid = np.where(mask)[0] # Prefer movement over stay move_actions = [v for v in valid if v < 4] if move_actions: return int(np.random.choice(move_actions)) return int(np.random.choice(valid)) if len(valid) > 0 else 4 def evasive_opponent(obs): """Tries to move away from bombs, random otherwise.""" mask = np.array(obs.get("action_mask", [1]*6), dtype=bool) view = np.array(obs.get("agent_viewcone", np.zeros((7,5,25)))) # If enemy bomb visible, try to move away if view.shape[-1] >= 20: enemy_bombs = view[..., 18] # ENEMY_BOMB channel if np.any(enemy_bombs > 0): # Find safest direction - away from bomb bomb_y, bomb_x = np.where(enemy_bombs > 0) if len(bomb_y) > 0: # Just pick any valid movement action move_actions = [v for v in np.where(mask)[0] if v < 4] if move_actions: return int(np.random.choice(move_actions)) valid = np.where(mask)[0] return int(np.random.choice(valid)) if len(valid) > 0 else 4 CURRICULUM_STAGES = [ ("static", static_opponent, 150000), ("random", random_valid_opponent, 200000), ("simple_bomb", simple_bomb_opponent, 250000), ("evasive", evasive_opponent, 200000), ("mixed", None, 200000), # cycles through all ] class CurriculumEnv(gymnasium.Env): """Single-agent env with curriculum opponents.""" def __init__(self, opponent_fn=None, cfg=None): super().__init__() self.cfg = cfg or default_config() self.cfg.env.render_mode = None raw = Bomberman(self.cfg) self._parallel_env = aec_to_parallel(raw) self.agent_id = "agent_0" self._episode_count = 0 self.action_space = Discrete(6) self._last_action_mask = None self._obs_size = None self._last_obs_dict = None self.opponent_fn = opponent_fn or random_valid_opponent self._compute_obs_space() def _compute_obs_space(self): cfg = self.cfg vl = int(cfg.dynamics.vision.behind) + int(cfg.dynamics.vision.ahead) + 1 vw = int(cfg.dynamics.vision.left) + int(cfg.dynamics.vision.right) + 1 av = vl * vw * 25 br = int(cfg.entities.base.vision_radius) bs = 2 * br + 1 bv = bs * bs * 25 self._obs_size = av + bv + 11 self.observation_space = Box(low=-np.inf, high=np.inf, shape=(self._obs_size,), dtype=np.float32) def reset(self, seed=None, options=None): self._episode_count += 1 obs_dict, info_dict = self._parallel_env.reset(seed=self._episode_count, options=options) self._last_obs_dict = obs_dict self._last_action_mask = obs_dict[self.agent_id]["action_mask"].astype(bool) return self._flatten(obs_dict[self.agent_id]), {} def step(self, action): actions = {self.agent_id: action} for aid, obs in self._last_obs_dict.items(): if aid != self.agent_id: actions[aid] = self.opponent_fn(obs) obs_dict, rewards, terminations, truncations, infos = self._parallel_env.step(actions) self._last_obs_dict = obs_dict if self.agent_id not in obs_dict: return np.zeros(self._obs_size, dtype=np.float32), 0.0, True, False, {} self._last_action_mask = obs_dict[self.agent_id]["action_mask"].astype(bool) obs = self._flatten(obs_dict[self.agent_id]) r = float(rewards.get(self.agent_id, 0.0)) done = terminations.get(self.agent_id, False) or truncations.get(self.agent_id, False) return obs, r, done, False, infos.get(self.agent_id, {}) def action_masks(self): return self._last_action_mask def _flatten(self, od): return np.concatenate([ od["agent_viewcone"].flatten(), od["base_viewcone"].flatten(), np.array([od["direction"]], dtype=np.float32), od["location"].flatten().astype(np.float32), od["base_location"].flatten().astype(np.float32), od["health"].flatten().astype(np.float32), np.array([od["frozen_ticks"]], dtype=np.float32), od["base_health"].flatten().astype(np.float32), od["team_resources"].flatten().astype(np.float32), np.array([od["team_bombs"]], dtype=np.float32), np.array([od["step"]], dtype=np.float32), ], dtype=np.float32) def close(self): self._parallel_env.close() class CurriculumCallback(BaseCallback): """Advances curriculum stage based on win rate + pushes checkpoints.""" def __init__(self, eval_freq=50000, save_freq=50000): super().__init__() self.eval_freq = eval_freq self.save_freq = save_freq self.stage_idx = 0 self.stage_steps = 0 self.wins_history = [] self.eval_episodes = 100 def _on_step(self) -> bool: if self.num_timesteps % self.save_freq == 0: path = os.path.join(DATA_DIR, f"phase3_ckpt_{self.num_timesteps}.zip") self.model.save(path) hub_push(path, f"phase3_ckpt_{self.num_timesteps}.zip") if self.num_timesteps % self.eval_freq == 0: self._evaluate_and_maybe_advance() return True def _evaluate_and_maybe_advance(self): stage_name, opp_fn, stage_limit = CURRICULUM_STAGES[self.stage_idx] print(f"\n--- Evaluating at stage {stage_name} (step {self.num_timesteps}) ---") # Run eval episodes env = CurriculumEnv(opponent_fn=opp_fn, cfg=default_config()) env = ActionMasker(env, lambda e: e.action_masks()) wins = 0; total_r = 0 for ep in range(self.eval_episodes): obs, _ = env.reset(seed=ep + 100000 + self.num_timesteps) ep_r = 0; done = False while not done: action, _ = self.model.predict(obs, action_masks=env.action_masks(), deterministic=True) obs, r, done, _, _ = env.step(int(action)) ep_r += r total_r += ep_r if ep_r > 10: wins += 1 env.close() win_rate = wins / self.eval_episodes avg_r = total_r / self.eval_episodes print(f" Win rate: {win_rate:.1%}, Avg reward: {avg_r:.1f}") self.wins_history.append((self.num_timesteps, stage_name, win_rate, avg_r)) # Save eval results eval_file = f"/app/phase3_eval_{self.num_timesteps}.txt" with open(eval_file, "w") as f: f.write(f"Stage: {stage_name}\nStep: {self.num_timesteps}\nWinRate: {win_rate:.1%}\nAvgReward: {avg_r:.1f}\n") hub_push(eval_file, f"phase3_eval_{self.num_timesteps}.txt") # Advance curriculum if win rate > 55% and we've spent enough steps if win_rate > 0.55 and self.stage_idx < len(CURRICULUM_STAGES) - 1: self.stage_idx += 1 new_stage = CURRICULUM_STAGES[self.stage_idx][0] print(f" >>> ADVANCING to stage: {new_stage} <<<") def main(): print("=" * 60) print("PHASE 3: Rule-Based Curriculum") print("=" * 60) # Download latest checkpoint (phase2_final or best available) latest = None for ckpt in ["phase2_final.zip", "phase2_ckpt_600352.zip", "phase1_final.zip"]: try: latest = hf_hub_download(repo_id=HUB_REPO, filename=ckpt, repo_type="model", local_dir=DATA_DIR) print(f"Downloaded checkpoint: {ckpt}") break except Exception: pass if latest is None: raise RuntimeError("No checkpoint found!") # Start with first curriculum stage stage_name, opp_fn, _ = CURRICULUM_STAGES[0] print(f"Starting curriculum stage: {stage_name}") cfg = default_config() cfg.env.render_mode = None base = CurriculumEnv(opponent_fn=opp_fn, cfg=cfg) env = ActionMasker(base, lambda e: e.action_masks()) env = Monitor(env) model = MaskablePPO.load(latest, env=env) start_ts = model.num_timesteps print(f"Loaded model at timestep {start_ts}") cb = CurriculumCallback(eval_freq=50000, save_freq=50000) model.learn(total_timesteps=1000000, callback=cb, progress_bar=False, reset_num_timesteps=False) # Save final final = os.path.join(DATA_DIR, "phase3_final.zip") model.save(final) hub_push(final, "phase3_final.zip") # Final eval print("\n=== FINAL EVALUATION ===") raw = Bomberman(default_config()) env = aec_to_parallel(raw) wins = 0; total_r = 0 for ep in range(200): obs, _ = env.reset(seed=ep + 200000) ep_r = 0; done = False while not done: if "agent_0" not in obs: break ao = obs["agent_0"] mask = np.array(ao.get("action_mask", [1]*6), dtype=bool) vec = np.concatenate([ np.array(ao["agent_viewcone"], np.float32).flatten(), np.array(ao["base_viewcone"], np.float32).flatten(), np.array([ao["direction"]], np.float32), np.array(ao["location"], np.float32).flatten(), np.array(ao["base_location"], np.float32).flatten(), np.array(ao["health"], np.float32).flatten(), np.array([ao["frozen_ticks"]], np.float32), np.array(ao["base_health"], np.float32).flatten(), np.array(ao["team_resources"], np.float32).flatten(), np.array([ao["team_bombs"]], np.float32), np.array([ao["step"]], np.float32), ], dtype=np.float32) action, _ = model.predict(vec, action_masks=mask, deterministic=True) acts = {"agent_0": int(action)} for aid, o in obs.items(): if aid != "agent_0": acts[aid] = random_valid_opponent(o) obs, rewards, terminations, truncations, _ = env.step(acts) ep_r += rewards.get("agent_0", 0) done = terminations.get("agent_0", False) or truncations.get("agent_0", False) or "agent_0" not in obs total_r += ep_r if ep_r > 10: wins += 1 env.close() results = ( f"=== Phase 3 Final Evaluation ===\n" f"Episodes: 200\n" f"Win Rate: {wins/200:.1%}\n" f"Avg Reward: {total_r/200:.1f}\n" ) print(results) with open("/app/phase3_final_eval.txt", "w") as f: f.write(results) hub_push("/app/phase3_final_eval.txt", "phase3_final_eval.txt") print("\n✅ PHASE 3 COMPLETE!") if __name__ == "__main__": main()