til-26-ae-agent / train_all_phases.py
E-Rong's picture
Fix agent tracking to use possible_agents instead of agents attribute
0e66bf6 verified
#!/usr/bin/env python3
"""
Full training pipeline: Phase 1 -> Phase 2 -> Phase 3
TIL-26-AE Bomberman Agent Training
References:
- Pommerman multi-agent RL: arxiv:2407.00662
- MAPPO best practices: arxiv:2103.01955
- Invalid Action Masking: arxiv:2006.14171
"""
import os
import sys
import subprocess
# Bootstrap: download and set up the TIL environment if not present.
# This has to run before the `til_environment` imports below, which rely on
# the checkout being on sys.path.
repo_path = "/app/til-26-ae-repo/til-26-ae"
if not os.path.exists(repo_path):
    try:
        from huggingface_hub import snapshot_download
        snapshot_download(
            repo_id='e-rong/til-26-ae',
            repo_type='space',
            local_dir='/app/til-26-ae-repo',
            local_dir_use_symlinks=False
        )
    except Exception as _download_error:
        # Deliberately best-effort: any failure (missing package, network,
        # auth) falls back to a plain git clone, but is reported so a broken
        # setup is diagnosable instead of silently ignored.
        print(
            f"snapshot_download failed ({_download_error!r}); falling back to git clone",
            file=sys.stderr,
        )
        try:
            _clone = subprocess.run(
                ["git", "clone", "https://huggingface.co/spaces/e-rong/til-26-ae", "/app/til-26-ae-repo"],
                capture_output=True, check=False
            )
        except OSError as _clone_error:
            # e.g. the git executable is not installed.
            print(f"git clone could not be started: {_clone_error!r}", file=sys.stderr)
        else:
            if _clone.returncode != 0:
                print(
                    "git clone failed: "
                    + _clone.stderr.decode("utf-8", errors="replace").strip(),
                    file=sys.stderr,
                )
# Prefer the nested package directory; otherwise fall back to the repo root.
if os.path.exists(repo_path):
    sys.path.insert(0, repo_path)
elif os.path.exists("/app/til-26-ae-repo"):
    sys.path.insert(0, "/app/til-26-ae-repo")
else:
    print(
        "TIL environment checkout not found under /app/til-26-ae-repo; "
        "importing til_environment will only work if it is installed elsewhere",
        file=sys.stderr,
    )
import numpy as np
import gymnasium as gym
from gymnasium.spaces import Box, Discrete
import torch
from til_environment.bomberman_env import Bomberman
from til_environment.config import default_config
from pettingzoo.utils.conversions import aec_to_parallel
from sb3_contrib import MaskablePPO
from sb3_contrib.common.wrappers import ActionMasker
from stable_baselines3.common.callbacks import BaseCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor
import trackio
# ============================================================================
# PHASE 1: Base environment wrapper
# ============================================================================
class BombermanSingleAgentEnv(gym.Env):
    """
    Wraps parallel PettingZoo Bomberman into a single-agent gymnasium env.

    ``agent_0`` is the learning agent; every other agent plays a uniformly
    random action among those its action mask allows.

    Args:
        cfg: Environment config; ``default_config()`` is used when falsy.
            ``cfg.env.render_mode`` is set to ``None`` in place.
        seed: Optional base seed. Episode ``i`` (0-based) is reset with
            ``seed + i``; without a seed the episode index itself is used.
            It also seeds the opponents' random number generator.
        opponent_policy: Stored on the instance. Only random opponents are
            implemented by this class.
    """

    def __init__(self, cfg=None, seed=None, opponent_policy="random"):
        super().__init__()
        self.cfg = cfg or default_config()
        self.cfg.env.render_mode = None
        raw = Bomberman(self.cfg)
        self._parallel_env = aec_to_parallel(raw)
        self.agent_id = "agent_0"
        self.opponent_policy = opponent_policy
        # Keep the constructor seed separately so reset() can derive
        # per-episode seeds from it instead of discarding it.
        self._base_seed = 0 if seed is None else seed
        self._episode_seed = seed
        self._episode_count = 0
        # Private generator so opponent behaviour does not depend on (or
        # disturb) the global numpy random state.
        self._opponent_rng = np.random.default_rng(seed)
        self.action_space = Discrete(6)
        self._last_action_mask = None
        self._obs_size = None
        self._last_obs_dict = None
        self._compute_obs_space()

    def _compute_obs_space(self):
        """Derive the flat observation size from the config and set the space."""
        cfg = self.cfg
        viewcone_l = int(cfg.dynamics.vision.behind) + int(cfg.dynamics.vision.ahead) + 1
        viewcone_w = int(cfg.dynamics.vision.left) + int(cfg.dynamics.vision.right) + 1
        # 25 values per viewcone cell, as hard-coded here.
        agent_viewcone_size = viewcone_l * viewcone_w * 25
        base_r = int(cfg.entities.base.vision_radius)
        base_side = 2 * base_r + 1
        base_viewcone_size = base_side * base_side * 25
        # Must equal the total length of the non-viewcone parts concatenated
        # in _flatten_obs -- TODO confirm against the environment's obs spec.
        scalar_size = 11
        self._obs_size = agent_viewcone_size + base_viewcone_size + scalar_size
        self.observation_space = Box(
            low=-np.inf, high=np.inf,
            shape=(self._obs_size,), dtype=np.float32,
        )

    def _get_agents(self):
        """Get list of currently active agents from obs_dict."""
        if self._last_obs_dict is not None:
            return list(self._last_obs_dict.keys())
        return self._parallel_env.possible_agents

    def _sample_opponent_action(self, agent_id):
        """Return a random action permitted by *agent_id*'s last action mask."""
        agent_obs = self._last_obs_dict.get(agent_id) if self._last_obs_dict else None
        mask = agent_obs.get("action_mask") if agent_obs is not None else None
        if mask is None:
            # No mask available: treat every action as allowed, matching
            # the fallback used in _store_action_mask.
            mask = np.ones(6, dtype=np.int8)
        valid = np.flatnonzero(np.asarray(mask) == 1)
        if valid.size == 0:
            return 0
        return int(self._opponent_rng.choice(valid))

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(flat_obs, {})`` for the learner.

        An explicit *seed* is used as-is and also re-seeds the opponents'
        generator; otherwise the seed is the constructor seed (or 0) plus the
        number of episodes started so far.
        """
        if seed is not None:
            self._episode_seed = seed
            self._opponent_rng = np.random.default_rng(seed)
        else:
            self._episode_seed = self._base_seed + self._episode_count
        self._episode_count += 1
        obs_dict, info_dict = self._parallel_env.reset(seed=self._episode_seed, options=options)
        self._last_obs_dict = obs_dict
        self._store_action_mask(obs_dict[self.agent_id])
        return self._flatten_obs(obs_dict[self.agent_id]), {}

    def step(self, action):
        """Apply *action* for the learner and random actions for everyone else.

        Returns the gymnasium 5-tuple. Termination and truncation reported by
        the wrapped env are both folded into ``terminated``; ``truncated`` is
        always ``False``.
        """
        actions = {}
        for agent_id in self._get_agents():
            if agent_id == self.agent_id:
                actions[agent_id] = action
            else:
                actions[agent_id] = self._sample_opponent_action(agent_id)
        obs_dict, rewards, terminations, truncations, infos = self._parallel_env.step(actions)
        self._last_obs_dict = obs_dict
        if self.agent_id not in obs_dict:
            # The learner is no longer part of the episode: end it with a
            # zero observation, keeping any reward still reported for it.
            final_reward = float(rewards.get(self.agent_id, 0.0))
            return np.zeros(self._obs_size, dtype=np.float32), final_reward, True, False, {}
        agent_obs = obs_dict[self.agent_id]
        self._store_action_mask(agent_obs)
        obs = self._flatten_obs(agent_obs)
        reward = float(rewards.get(self.agent_id, 0.0))
        done = terminations.get(self.agent_id, False) or truncations.get(self.agent_id, False)
        # Copy so the wrapped env's dict is not mutated, and expose the
        # learner's position for wrappers that read info["location"]
        # (RewardShapingWrapper). A value provided by the env wins.
        info = dict(infos.get(self.agent_id, {}))
        info.setdefault("location", agent_obs["location"])
        return obs, reward, done, False, info

    def _store_action_mask(self, obs_dict):
        """Cache the learner's action mask as a bool array (all-True if absent)."""
        if "action_mask" in obs_dict:
            self._last_action_mask = obs_dict["action_mask"].copy().astype(bool)
        else:
            self._last_action_mask = np.ones(6, dtype=bool)

    def action_masks(self):
        """Return the mask cached by the latest reset/step (for ActionMasker)."""
        return self._last_action_mask

    def _flatten_obs(self, obs_dict):
        """Concatenate one agent's observation dict into a flat float32 vector."""
        return np.concatenate(
            [
                obs_dict["agent_viewcone"].flatten(),
                obs_dict["base_viewcone"].flatten(),
                np.array([obs_dict["direction"]], dtype=np.float32),
                obs_dict["location"].flatten().astype(np.float32),
                obs_dict["base_location"].flatten().astype(np.float32),
                obs_dict["health"].flatten().astype(np.float32),
                np.array([obs_dict["frozen_ticks"]], dtype=np.float32),
                obs_dict["base_health"].flatten().astype(np.float32),
                obs_dict["team_resources"].flatten().astype(np.float32),
                np.array([obs_dict["team_bombs"]], dtype=np.float32),
                np.array([obs_dict["step"]], dtype=np.float32),
            ],
            dtype=np.float32,
        )

    def close(self):
        """Close the wrapped parallel environment."""
        self._parallel_env.close()
# ============================================================================
# PHASE 2: Exploration reward shaping
# ============================================================================
class RewardShapingWrapper(gym.Wrapper):
    """
    Adds visit-count exploration bonus with adaptive annealing.

    Each step adds ``explore_weight / (1 + visits)`` to the reward, where
    ``visits`` is how often the cell in ``info["location"]`` was seen before
    in this episode. At the end of an episode the weight is updated as
    ``base_explore_weight * max(0.1, alpha)`` with
    ``alpha = 1 - tanh(k * avg_enemy_deaths)``.

    Args:
        env: Environment to wrap; it must provide ``action_masks()``.
        adaptive_k: Steepness ``k`` of the annealing curve.
        base_explore_weight: Exploration weight before any annealing.
        grid_size: Side length of the square visit-count grid.
        enemy_deaths_key: Optional ``info`` key whose per-step value is added
            to the episode's enemy-death tally. With the default ``None`` the
            tally stays 0 and the weight never anneals.
            NOTE(review): nothing visible in this file reports enemy deaths;
            confirm the key the environment uses before enabling this.
    """

    def __init__(self, env, adaptive_k=1.2, base_explore_weight=0.5,
                 grid_size=16, enemy_deaths_key=None):
        super().__init__(env)
        self.adaptive_k = adaptive_k
        self.base_explore_weight = base_explore_weight
        self.enemy_deaths_key = enemy_deaths_key
        self._grid_size = grid_size
        # Allocated up front so step() cannot hit a None grid before reset().
        self._visit_counts = np.zeros((grid_size, grid_size), dtype=np.int32)
        self._avg_enemy_deaths = 0.0
        self._episode_count = 0
        self._episode_enemy_deaths = 0
        self._explore_weight = base_explore_weight

    def reset(self, **kwargs):
        """Clear the per-episode visit counts and tally, then reset the env."""
        self._visit_counts = np.zeros((self._grid_size, self._grid_size), dtype=np.int32)
        self._episode_enemy_deaths = 0
        return self.env.reset(**kwargs)

    def _visit_bonus(self, pos):
        """Return ``1 / (1 + visits)`` for *pos* and count the visit.

        Yields 0.0 when *pos* is missing or outside the grid.
        """
        if pos is None:
            return 0.0
        x, y = int(pos[0]), int(pos[1])
        if not (0 <= x < self._grid_size and 0 <= y < self._grid_size):
            return 0.0
        bonus = 1.0 / (1.0 + self._visit_counts[x, y])
        self._visit_counts[x, y] += 1
        return bonus

    def _finish_episode(self):
        """Fold the finished episode into the running average and re-anneal."""
        self._episode_count += 1
        # Update the moving average first so the new weight reflects the
        # episode that just ended.
        self._avg_enemy_deaths = (
            0.95 * self._avg_enemy_deaths + 0.05 * self._episode_enemy_deaths
        )
        alpha = 1.0 - np.tanh(self.adaptive_k * self._avg_enemy_deaths)
        self._explore_weight = self.base_explore_weight * max(0.1, alpha)

    def step(self, action):
        """Step the env and return the shaped reward.

        ``info`` gains ``raw_reward``, ``explore_bonus`` and
        ``explore_weight`` (the weight applied on this step).
        """
        obs, reward, done, truncated, info = self.env.step(action)
        # Work on a copy so the wrapped env's dict is left untouched.
        info = dict(info)
        visit_bonus = self._visit_bonus(info.get("location", None))
        if self.enemy_deaths_key is not None:
            self._episode_enemy_deaths += info.get(self.enemy_deaths_key, 0)
        shaped_reward = reward + self._explore_weight * visit_bonus
        info["raw_reward"] = reward
        info["explore_bonus"] = visit_bonus
        info["explore_weight"] = self._explore_weight
        # Truncated episodes are finished episodes too.
        if done or truncated:
            self._finish_episode()
        return obs, shaped_reward, done, truncated, info

    def action_masks(self):
        """Forward the wrapped env's action mask."""
        return self.env.action_masks()
# ============================================================================
# PHASE 3: Rule-based opponents + curriculum
# ============================================================================
class RuleBasedOpponent:
    """Scripted Bomberman opponent with three difficulty levels.

    ``static`` always answers 4 (STAY), ``simple`` uses action 5 when channel
    10 or 12 of its viewcone is set anywhere and otherwise moves at random,
    and ``smart`` scores the four neighbouring cells. Any other difficulty
    behaves like ``static``.
    """

    # Viewcone offset applied by each movement action (actions 0-3).
    _MOVE_OFFSETS = {0: (-1, 0), 1: (1, 0), 2: (0, -1), 3: (0, 1)}

    def __init__(self, team_id=1, difficulty="simple"):
        self.team_id = team_id
        self.difficulty = difficulty
        self.visited = None
        self.grid_size = 16

    def reset(self):
        """Start a fresh, zeroed visit grid."""
        side = self.grid_size
        self.visited = np.zeros((side, side), dtype=np.int32)

    def act(self, obs_dict):
        """Choose an action index from one agent's observation dict."""
        legal = np.where(obs_dict["action_mask"] == 1)[0]
        if len(legal) == 0:
            return 4  # STAY
        mode = self.difficulty
        if mode == "simple":
            return self._simple_policy(obs_dict, legal)
        if mode == "smart":
            return self._smart_policy(obs_dict, legal)
        # "static" and unrecognised difficulties both stand still.
        return 4

    def _simple_policy(self, obs_dict, legal):
        """Use action 5 if a target is visible, else take a random legal move."""
        cone = obs_dict["agent_viewcone"]
        enemy_seen = np.any(cone[..., 10] > 0)
        base_seen = np.any(cone[..., 12] > 0)
        if (enemy_seen or base_seen) and 5 in legal:
            return 5
        moves = [a for a in legal if a < 4]
        if not moves:
            return 4
        return int(np.random.choice(moves))

    def _smart_policy(self, obs, valid_actions):
        """Score neighbouring cells, answering 5 with probability 0.7 near a target."""
        cone = obs["agent_viewcone"]
        rows, cols, _ = cone.shape
        pickups = np.stack([cone[..., ch] for ch in (7, 8, 6)], axis=-1)
        pickup_here = np.any(pickups > 0, axis=-1)
        # Cell used as the agent's own position inside the viewcone.
        origin_r, origin_c = 3, 2

        chosen, top_score = 4, -1
        for candidate in valid_actions:
            offset = self._MOVE_OFFSETS.get(int(candidate))
            if offset is None:
                # Actions 4 and 5 (and anything unknown) are not moves.
                continue
            r, c = origin_r + offset[0], origin_c + offset[1]
            if not (0 <= r < rows and 0 <= c < cols):
                continue
            score = 0
            if pickup_here[r, c]:
                score += 10.0
            if cone[r, c, 0] < 1:
                score -= 5.0
            walls = cone[r, c, 1] + cone[r, c, 2] + cone[r, c, 3] + cone[r, c, 4]
            score -= walls * 2.0
            if score > top_score:
                top_score, chosen = score, candidate

        for dr, dc in ((-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)):
            r, c = origin_r + dr, origin_c + dc
            if not (0 <= r < rows and 0 <= c < cols):
                continue
            if cone[r, c, 10] > 0 or cone[r, c, 12] > 0:
                if 5 in valid_actions and np.random.random() < 0.7:
                    return 5
                # Only the first target found gets a chance to trigger 5.
                break

        if top_score > -1:
            return int(chosen)
        return 4
class CurriculumEnv(gym.Env):
    """Single-agent env with curriculum-based opponent difficulty.

    ``agent_0`` learns against ``RuleBasedOpponent`` instances whose
    difficulty follows ``CURRICULUM_STAGES``. A stage is left either early,
    once at least ``MIN_EPISODES_FOR_ADVANCE`` episodes have been played and
    the win rate reaches ``WIN_RATE_THRESHOLD``, or unconditionally after
    ``EPISODES_PER_STAGE`` episodes.

    Args:
        cfg: Environment config; ``default_config()`` is used when falsy.
            ``cfg.env.render_mode`` is set to ``None`` in place.
        seed: Optional base seed. Episode ``i`` (0-based) is reset with
            ``seed + i``; without a seed the episode index itself is used.
    """

    CURRICULUM_STAGES = ["static", "simple", "smart", "mixed"]
    WIN_RATE_THRESHOLD = 0.55
    # Hard cap: a stage never lasts longer than this many episodes.
    EPISODES_PER_STAGE = 500
    # Minimum sample size before the win rate may trigger an early advance.
    MIN_EPISODES_FOR_ADVANCE = 100

    def __init__(self, cfg=None, seed=None):
        super().__init__()
        self.cfg = cfg or default_config()
        self.cfg.env.render_mode = None
        raw = Bomberman(self.cfg)
        self._parallel_env = aec_to_parallel(raw)
        self.agent_id = "agent_0"
        # Keep the constructor seed so reset() can derive episode seeds from it.
        self._base_seed = 0 if seed is None else seed
        self._episode_seed = seed
        self._episode_count = 0
        self.action_space = Discrete(6)
        self._last_action_mask = None
        self._obs_size = None
        self._last_obs_dict = None
        self._compute_obs_space()
        self.stage_idx = 0
        self.stage_episodes = 0
        self.stage_wins = 0
        self.stage_rewards = []
        self.opponents = {}
        self._init_opponents()

    def _compute_obs_space(self):
        """Derive the flat observation size from the config and set the space."""
        cfg = self.cfg
        viewcone_l = int(cfg.dynamics.vision.behind) + int(cfg.dynamics.vision.ahead) + 1
        viewcone_w = int(cfg.dynamics.vision.left) + int(cfg.dynamics.vision.right) + 1
        # 25 values per viewcone cell, as hard-coded here.
        agent_viewcone_size = viewcone_l * viewcone_w * 25
        base_r = int(cfg.entities.base.vision_radius)
        base_side = 2 * base_r + 1
        base_viewcone_size = base_side * base_side * 25
        # Must equal the total length of the non-viewcone parts concatenated
        # in _flatten_obs -- TODO confirm against the environment's obs spec.
        scalar_size = 11
        self._obs_size = agent_viewcone_size + base_viewcone_size + scalar_size
        self.observation_space = Box(
            low=-np.inf, high=np.inf,
            shape=(self._obs_size,), dtype=np.float32,
        )

    def _get_agents(self):
        """Return the agents in the latest obs dict (all possible ones before reset)."""
        if self._last_obs_dict is not None:
            return list(self._last_obs_dict.keys())
        return self._parallel_env.possible_agents

    def _init_opponents(self):
        """Create one scripted opponent for agent_1 .. agent_{num_teams - 1}."""
        for i in range(1, self.cfg.env.num_teams):
            opp_id = f"agent_{i}"
            self.opponents[opp_id] = RuleBasedOpponent(team_id=i, difficulty="static")
        # Align the opponents with the current stage.
        self._update_opponent_difficulty()

    def _update_opponent_difficulty(self):
        """Apply the current stage's difficulty to every opponent.

        In the ``mixed`` stage even-numbered agents play ``smart`` and
        odd-numbered ones ``simple``.
        """
        stage = self.CURRICULUM_STAGES[self.stage_idx]
        for opp_id, opp in self.opponents.items():
            if stage == "mixed":
                opp.difficulty = "smart" if (int(opp_id.split("_")[1]) % 2 == 0) else "simple"
            else:
                opp.difficulty = stage

    def _check_stage_advance(self):
        """Advance to the next stage when warranted; return whether it happened."""
        if self.stage_idx >= len(self.CURRICULUM_STAGES) - 1:
            return False
        episodes = len(self.stage_rewards)
        if episodes == 0:
            return False
        win_rate = self.stage_wins / episodes
        mastered = (
            episodes >= self.MIN_EPISODES_FOR_ADVANCE
            and win_rate >= self.WIN_RATE_THRESHOLD
        )
        exhausted = episodes >= self.EPISODES_PER_STAGE
        if not (mastered or exhausted):
            return False
        avg_reward = np.mean(self.stage_rewards)
        trackio.alert(
            "Curriculum Advance",
            f"Stage {self.CURRICULUM_STAGES[self.stage_idx]} complete: "
            f"win_rate={win_rate:.2%}, avg_reward={avg_reward:.1f}. "
            f"Advancing to {self.CURRICULUM_STAGES[self.stage_idx + 1]}",
            trackio.AlertLevel.INFO,
        )
        self.stage_idx += 1
        self.stage_episodes = 0
        self.stage_wins = 0
        self.stage_rewards = []
        self._update_opponent_difficulty()
        return True

    def _record_episode_end(self, reward):
        """Record a finished episode and re-evaluate the curriculum stage.

        NOTE(review): *reward* is the final step's reward, not the episode
        return, and a win is ``reward > 10.0`` -- confirm this matches how
        the environment pays out victories.
        """
        self.stage_episodes += 1
        self.stage_rewards.append(reward)
        if reward > 10.0:
            self.stage_wins += 1
        self._check_stage_advance()

    def _stage_info(self, info):
        """Return a copy of *info* annotated with the current curriculum stage."""
        info = dict(info)
        info["curriculum_stage"] = self.stage_idx
        info["curriculum_stage_name"] = self.CURRICULUM_STAGES[self.stage_idx]
        return info

    def reset(self, seed=None, options=None):
        """Start a new episode and return ``(flat_obs, {})`` for the learner.

        An explicit *seed* is used as-is; otherwise the seed is the
        constructor seed (or 0) plus the number of episodes started so far.
        """
        if seed is not None:
            self._episode_seed = seed
        else:
            self._episode_seed = self._base_seed + self._episode_count
        self._episode_count += 1
        for opp in self.opponents.values():
            opp.reset()
        obs_dict, info_dict = self._parallel_env.reset(
            seed=self._episode_seed, options=options
        )
        self._last_obs_dict = obs_dict
        self._store_action_mask(obs_dict[self.agent_id])
        return self._flatten_obs(obs_dict[self.agent_id]), {}

    def step(self, action):
        """Apply *action* for the learner and scripted actions for opponents.

        Agents without a scripted opponent use action 4. Termination and
        truncation reported by the wrapped env are both folded into
        ``terminated``; ``truncated`` is always ``False``.
        """
        actions = {}
        for agent_id in self._get_agents():
            if agent_id == self.agent_id:
                actions[agent_id] = action
                continue
            opp = self.opponents.get(agent_id)
            if opp is not None and self._last_obs_dict and agent_id in self._last_obs_dict:
                actions[agent_id] = opp.act(self._last_obs_dict[agent_id])
            else:
                actions[agent_id] = 4
        obs_dict, rewards, terminations, truncations, infos = self._parallel_env.step(actions)
        self._last_obs_dict = obs_dict
        if self.agent_id not in obs_dict:
            # The learner is no longer part of the episode. Count it like
            # any other finished episode so stage statistics stay consistent.
            final_reward = float(rewards.get(self.agent_id, 0.0))
            self._record_episode_end(final_reward)
            terminal_obs = np.zeros(self._obs_size, dtype=np.float32)
            return terminal_obs, final_reward, True, False, self._stage_info({})
        self._store_action_mask(obs_dict[self.agent_id])
        obs = self._flatten_obs(obs_dict[self.agent_id])
        reward = float(rewards.get(self.agent_id, 0.0))
        done = terminations.get(self.agent_id, False) or truncations.get(self.agent_id, False)
        if done:
            self._record_episode_end(reward)
        info = self._stage_info(infos.get(self.agent_id, {}))
        return obs, reward, done, False, info

    def _store_action_mask(self, obs_dict):
        """Cache the learner's action mask as a bool array (all-True if absent)."""
        if "action_mask" in obs_dict:
            self._last_action_mask = obs_dict["action_mask"].copy().astype(bool)
        else:
            self._last_action_mask = np.ones(6, dtype=bool)

    def action_masks(self):
        """Return the mask cached by the latest reset/step (for ActionMasker)."""
        return self._last_action_mask

    def _flatten_obs(self, obs_dict):
        """Concatenate one agent's observation dict into a flat float32 vector."""
        return np.concatenate(
            [
                obs_dict["agent_viewcone"].flatten(),
                obs_dict["base_viewcone"].flatten(),
                np.array([obs_dict["direction"]], dtype=np.float32),
                obs_dict["location"].flatten().astype(np.float32),
                obs_dict["base_location"].flatten().astype(np.float32),
                obs_dict["health"].flatten().astype(np.float32),
                np.array([obs_dict["frozen_ticks"]], dtype=np.float32),
                obs_dict["base_health"].flatten().astype(np.float32),
                obs_dict["team_resources"].flatten().astype(np.float32),
                np.array([obs_dict["team_bombs"]], dtype=np.float32),
                np.array([obs_dict["step"]], dtype=np.float32),
            ],
            dtype=np.float32,
        )

    def close(self):
        """Close the wrapped parallel environment."""
        self._parallel_env.close()
# ============================================================================
# Trackio logging callback
# ============================================================================
class TrackioLoggingCallback(BaseCallback):
    """Log episode statistics to trackio every ``log_interval`` callback calls.

    Values are accumulated from the ``infos`` of every step in the interval.
    Reading ``infos`` only on the logging step would miss nearly every
    finished episode, because the ``episode`` entry is only present on the
    step where an episode ends.

    Args:
        project: trackio project name.
        run_name: trackio run name, also used in alert messages.
        log_interval: Number of callback calls between log attempts.
        verbose: Passed through to ``BaseCallback``.
    """

    def __init__(self, project, run_name, log_interval=2048, verbose=0):
        super().__init__(verbose)
        self.project = project
        self.run_name = run_name
        self.log_interval = log_interval
        self._last_mean_reward = 0.0
        # Per-interval buffers, cleared after every log attempt.
        self._ep_rewards = []
        self._ep_lengths = []
        self._explore_bonuses = []
        self._stages = []

    def _on_training_start(self):
        trackio.init(project=self.project, name=self.run_name)
        trackio.alert("Training Started", f"{self.run_name} training began.", trackio.AlertLevel.INFO)

    def _collect(self, infos):
        """Append this step's per-env info values to the interval buffers."""
        for info in infos:
            if "episode" in info:
                episode = info["episode"]
                self._ep_rewards.append(episode.get("r", 0))
                self._ep_lengths.append(episode.get("l", 0))
            self._explore_bonuses.append(info.get("explore_bonus", 0))
            self._stages.append(info.get("curriculum_stage", 0))

    def _on_step(self):
        self._collect(self.locals.get("infos", [{}]))
        if self.n_calls % self.log_interval != 0:
            return True
        # Nothing is logged for an interval in which no episode finished.
        if self._ep_rewards:
            mean_r = float(np.mean(self._ep_rewards))
            self._last_mean_reward = mean_r
            log_dict = {
                "train/mean_episode_reward": mean_r,
                "train/mean_episode_length": float(np.mean(self._ep_lengths)) if self._ep_lengths else 0.0,
                "train/timesteps": self.num_timesteps,
            }
            if self._explore_bonuses:
                log_dict["train/mean_explore_bonus"] = float(np.mean(self._explore_bonuses))
            if self._stages:
                log_dict["train/curriculum_stage"] = float(np.mean(self._stages))
            trackio.log(log_dict)
            if mean_r < -5.0 and self.num_timesteps > 50_000:
                trackio.alert("Low Reward Warning",
                              f"mean_reward={mean_r:.2f} at step {self.num_timesteps} -- may be camping.", trackio.AlertLevel.WARN)
        # Start a fresh window so memory stays bounded by log_interval.
        self._ep_rewards.clear()
        self._ep_lengths.clear()
        self._explore_bonuses.clear()
        self._stages.clear()
        return True

    def _on_training_end(self):
        trackio.alert("Training Complete",
                      f"Finished at {self.num_timesteps}. Final mean reward: {self._last_mean_reward:.2f}",
                      trackio.AlertLevel.INFO)
        trackio.finish()
# ============================================================================
# Main training pipeline
# ============================================================================
def _make_phase_env(cfg, phase):
    """Build the action-masked, monitored env for *phase*; return ``(env, run_name)``."""
    if phase == 1:
        print("=== PHASE 1: MaskablePPO vs Random Opponents ===")
        core_env = BombermanSingleAgentEnv(cfg=cfg, opponent_policy="random")
        run_name = "phase1-maskable-ppo-random"
    elif phase == 2:
        print("=== PHASE 2: Adaptive Exploration Annealing ===")
        base_env = BombermanSingleAgentEnv(cfg=cfg, opponent_policy="random")
        core_env = RewardShapingWrapper(base_env, adaptive_k=1.2, base_explore_weight=0.5)
        run_name = "phase2-adaptive-explore"
    elif phase == 3:
        print("=== PHASE 3: Curriculum + Rule-Based Self-Play ===")
        # NOTE: this changes the caller's cfg object in place.
        cfg.env.num_teams = 3
        core_env = CurriculumEnv(cfg=cfg)
        run_name = "phase3-curriculum-selfplay"
    else:
        raise ValueError(f"Unknown phase: {phase}")
    masked_env = ActionMasker(core_env, lambda wrapped: wrapped.action_masks())
    return Monitor(masked_env), run_name


def train_phase(cfg, phase, total_timesteps, model=None):
    """Train one phase of the pipeline and return the trained model.

    Args:
        cfg: Environment config passed to the phase's environment.
        phase: 1 (random opponents), 2 (exploration shaping) or 3 (curriculum).
        total_timesteps: Timestep budget passed to ``model.learn``.
        model: Model to continue training; a new ``MaskablePPO`` is created
            when ``None``.

    Returns:
        The trained model, also saved as ``bomberman_phase{phase}_final``.

    Raises:
        ValueError: If *phase* is not 1, 2 or 3.
    """
    trackio_project = os.environ.get("TRACKIO_PROJECT", "til-26-ae")
    env, run_name = _make_phase_env(cfg, phase)
    # Release the environment even when training or saving fails.
    try:
        if model is None:
            model = MaskablePPO(
                "MlpPolicy", env,
                learning_rate=3e-4, n_steps=2048, batch_size=64, n_epochs=10,
                gamma=0.99, gae_lambda=0.95, clip_range=0.2,
                ent_coef=0.01, vf_coef=0.5, max_grad_norm=0.5,
                verbose=1,
                device="cuda" if torch.cuda.is_available() else "cpu",
            )
        else:
            model.set_env(env)
        checkpoint_callback = CheckpointCallback(
            save_freq=50_000, save_path=f"./checkpoints/phase{phase}",
            name_prefix=f"bomberman_phase{phase}",
        )
        trackio_callback = TrackioLoggingCallback(
            trackio_project, run_name, log_interval=2048,
        )
        model.learn(
            total_timesteps=total_timesteps,
            callback=[checkpoint_callback, trackio_callback],
            progress_bar=False,
        )
        model.save(f"bomberman_phase{phase}_final")
    finally:
        env.close()
    print(f"Phase {phase} complete. Model saved to bomberman_phase{phase}_final.zip")
    return model
def main():
    """Run the training phases in order and optionally upload the results.

    Environment variables:
        TOTAL_TIMESTEPS: Colon-separated timestep budgets, one per phase
            (default ``500_000:500_000:1_000_000``). Phase 1 always runs;
            phases 2 and 3 run only when a second / third entry is given.
            Entries beyond the third are ignored.
        HUB_MODEL_ID: When set, each trained phase's final model is uploaded
            to this Hugging Face model repository.

    Raises:
        ValueError: If TOTAL_TIMESTEPS contains a non-integer entry.
    """
    cfg = default_config()
    cfg.env.render_mode = None
    total_ts_env = os.environ.get("TOTAL_TIMESTEPS", "500_000:500_000:1_000_000")
    try:
        phase_ts = [int(x.replace("_", "")) for x in total_ts_env.split(":")]
    except ValueError as err:
        raise ValueError(
            f"TOTAL_TIMESTEPS must be colon-separated integers, got {total_ts_env!r}"
        ) from err

    model = None
    trained_phases = []
    # Only three phases exist; each one continues from the previous model.
    for phase, timesteps in enumerate(phase_ts[:3], start=1):
        model = train_phase(cfg, phase=phase, total_timesteps=timesteps, model=model)
        trained_phases.append(phase)

    hub_model_id = os.environ.get("HUB_MODEL_ID", "")
    if hub_model_id:
        from huggingface_hub import HfApi
        api = HfApi()
        # Upload only what was actually trained, so extra TOTAL_TIMESTEPS
        # entries cannot trigger uploads of files that do not exist.
        for phase in trained_phases:
            try:
                api.upload_file(
                    path_or_fileobj=f"bomberman_phase{phase}_final.zip",
                    path_in_repo=f"bomberman_phase{phase}_final.zip",
                    repo_id=hub_model_id, repo_type="model",
                )
                print(f"Phase {phase} model pushed to {hub_model_id}")
            except Exception as e:
                # Best effort: one failed upload must not stop the others.
                print(f"Failed to push phase {phase}: {e}")
    print("\n=== All phases complete! ===")
    if hub_model_id:
        print(f"Model repository: https://huggingface.co/{hub_model_id}")


if __name__ == "__main__":
    main()