"""
Full training pipeline: Phase 1 -> Phase 2 -> Phase 3
TIL-26-AE Bomberman Agent Training

References:
- Pommerman multi-agent RL: arXiv:2407.00662
- MAPPO best practices: arXiv:2103.01955
- Invalid action masking: arXiv:2006.14171
"""
import os
import subprocess
import sys

# Make the competition environment importable: use an existing checkout if
# present, otherwise download the Space via huggingface_hub, falling back to
# a plain git clone.
repo_path = "/app/til-26-ae-repo/til-26-ae"
if not os.path.exists(repo_path):
    try:
        from huggingface_hub import snapshot_download

        snapshot_download(
            repo_id="e-rong/til-26-ae",
            repo_type="space",
            local_dir="/app/til-26-ae-repo",
            local_dir_use_symlinks=False,
        )
    except Exception:
        subprocess.run(
            ["git", "clone", "https://huggingface.co/spaces/e-rong/til-26-ae", "/app/til-26-ae-repo"],
            capture_output=True,
            check=False,
        )

if os.path.exists(repo_path):
    sys.path.insert(0, repo_path)
elif os.path.exists("/app/til-26-ae-repo"):
    sys.path.insert(0, "/app/til-26-ae-repo")

import gymnasium as gym
import numpy as np
import torch
from gymnasium.spaces import Box, Discrete

from pettingzoo.utils.conversions import aec_to_parallel
from sb3_contrib import MaskablePPO
from sb3_contrib.common.wrappers import ActionMasker
from stable_baselines3.common.callbacks import BaseCallback, CheckpointCallback
from stable_baselines3.common.monitor import Monitor

import trackio

from til_environment.bomberman_env import Bomberman
from til_environment.config import default_config

class BombermanSingleAgentEnv(gym.Env):
    """
    Wraps the parallel PettingZoo Bomberman env as a single-agent gymnasium env.
    Agent 0 is the learning agent; opponents take random valid actions.
    """

    def __init__(self, cfg=None, seed=None, opponent_policy="random"):
        super().__init__()
        self.cfg = cfg or default_config()
        self.cfg.env.render_mode = None

        raw = Bomberman(self.cfg)
        self._parallel_env = aec_to_parallel(raw)
        self.agent_id = "agent_0"
        self.opponent_policy = opponent_policy
        self._episode_seed = seed
        self._episode_count = 0

        # Six discrete actions: four moves, stay, place bomb (layout inferred
        # from the rule-based opponents below).
        self.action_space = Discrete(6)

        self._last_action_mask = None
        self._obs_size = None
        self._last_obs_dict = None

        self._compute_obs_space()

    def _compute_obs_space(self):
        # Flattened size: agent viewcone + base viewcone (25 channels per tile)
        # plus the 11 scalar features appended in _flatten_obs.
        cfg = self.cfg
        viewcone_l = int(cfg.dynamics.vision.behind) + int(cfg.dynamics.vision.ahead) + 1
        viewcone_w = int(cfg.dynamics.vision.left) + int(cfg.dynamics.vision.right) + 1
        agent_viewcone_size = viewcone_l * viewcone_w * 25
        base_r = int(cfg.entities.base.vision_radius)
        base_side = 2 * base_r + 1
        base_viewcone_size = base_side * base_side * 25
        scalar_size = 11
        self._obs_size = agent_viewcone_size + base_viewcone_size + scalar_size
        self.observation_space = Box(
            low=-np.inf, high=np.inf,
            shape=(self._obs_size,), dtype=np.float32,
        )

    def _get_agents(self):
        """List currently active agents (falls back to possible_agents)."""
        if self._last_obs_dict is not None:
            return list(self._last_obs_dict.keys())
        return self._parallel_env.possible_agents

    def _opponent_action(self, agent_id):
        """Random choice among the opponent's currently valid actions."""
        if self._last_obs_dict and agent_id in self._last_obs_dict:
            mask = self._last_obs_dict[agent_id].get("action_mask")
        else:
            mask = None
        if mask is None:
            mask = np.ones(6, dtype=np.int8)
        valid = np.flatnonzero(mask)
        return int(np.random.choice(valid)) if len(valid) > 0 else 0

    def reset(self, seed=None, options=None):
        if seed is not None:
            self._episode_seed = seed
        else:
            self._episode_seed = self._episode_count
            self._episode_count += 1

        obs_dict, info_dict = self._parallel_env.reset(seed=self._episode_seed, options=options)
        self._last_obs_dict = obs_dict
        self._store_action_mask(obs_dict[self.agent_id])
        return self._flatten_obs(obs_dict[self.agent_id]), {}

    def step(self, action):
        actions = {}
        for agent_id in self._get_agents():
            if agent_id == self.agent_id:
                actions[agent_id] = action
            else:
                actions[agent_id] = self._opponent_action(agent_id)

        obs_dict, rewards, terminations, truncations, infos = self._parallel_env.step(actions)
        self._last_obs_dict = obs_dict

        # The learning agent disappears from obs_dict once eliminated.
        if self.agent_id not in obs_dict:
            return np.zeros(self._obs_size, dtype=np.float32), 0.0, True, False, {}

        self._store_action_mask(obs_dict[self.agent_id])
        obs = self._flatten_obs(obs_dict[self.agent_id])
        reward = float(rewards.get(self.agent_id, 0.0))
        done = terminations.get(self.agent_id, False) or truncations.get(self.agent_id, False)
        return obs, reward, done, False, infos.get(self.agent_id, {})

    def _store_action_mask(self, obs_dict):
        if "action_mask" in obs_dict:
            self._last_action_mask = obs_dict["action_mask"].copy().astype(bool)
        else:
            self._last_action_mask = np.ones(6, dtype=bool)

    def action_masks(self):
        """Mask of currently valid actions, consumed by MaskablePPO via ActionMasker."""
        return self._last_action_mask

    def _flatten_obs(self, obs_dict):
        return np.concatenate(
            [
                obs_dict["agent_viewcone"].flatten(),
                obs_dict["base_viewcone"].flatten(),
                np.array([obs_dict["direction"]], dtype=np.float32),
                obs_dict["location"].flatten().astype(np.float32),
                obs_dict["base_location"].flatten().astype(np.float32),
                obs_dict["health"].flatten().astype(np.float32),
                np.array([obs_dict["frozen_ticks"]], dtype=np.float32),
                obs_dict["base_health"].flatten().astype(np.float32),
                obs_dict["team_resources"].flatten().astype(np.float32),
                np.array([obs_dict["team_bombs"]], dtype=np.float32),
                np.array([obs_dict["step"]], dtype=np.float32),
            ],
            dtype=np.float32,
        )

    def close(self):
        self._parallel_env.close()
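# Smoke-test sketch for the wrapper above (assumes til_environment is
# importable and runs headless with the default config):
#
#   env = BombermanSingleAgentEnv()
#   obs, _ = env.reset(seed=0)
#   mask = env.action_masks()
#   action = int(np.random.choice(np.flatnonzero(mask)))
#   obs, reward, done, truncated, info = env.step(action)
#   env.close()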
class RewardShapingWrapper(gym.Wrapper):
    """
    Adds a visit-count exploration bonus with adaptive annealing:
    alpha = 1 - tanh(k * avg_enemy_deaths), so the exploration weight shrinks
    as the agent starts reliably eliminating enemies.
    """

    def __init__(self, env, adaptive_k=1.2, base_explore_weight=0.5):
        super().__init__(env)
        self.adaptive_k = adaptive_k
        self.base_explore_weight = base_explore_weight
        self._visit_counts = None
        self._grid_size = 16
        self._avg_enemy_deaths = 0.0
        self._episode_count = 0
        self._episode_enemy_deaths = 0
        self._explore_weight = base_explore_weight

    def reset(self, **kwargs):
        self._visit_counts = np.zeros((self._grid_size, self._grid_size), dtype=np.int32)
        self._episode_enemy_deaths = 0
        return self.env.reset(**kwargs)

    def step(self, action):
        obs, reward, done, truncated, info = self.env.step(action)

        # Count-based bonus: 1 / (1 + visits) for the tile the agent stands on.
        pos = info.get("location", None)
        visit_bonus = 0.0
        if pos is not None:
            x, y = int(pos[0]), int(pos[1])
            if 0 <= x < self._grid_size and 0 <= y < self._grid_size:
                visits = self._visit_counts[x, y]
                visit_bonus = 1.0 / (1.0 + visits)
                self._visit_counts[x, y] += 1

        # NOTE: the info key "enemy_deaths" is an assumption about what the env
        # reports; without some such signal the annealing EMA never moves off zero.
        self._episode_enemy_deaths += int(info.get("enemy_deaths", 0))

        if done:
            self._episode_count += 1
            # Fold this episode into the EMA first, then recompute the weight.
            self._avg_enemy_deaths = 0.95 * self._avg_enemy_deaths + 0.05 * self._episode_enemy_deaths
            alpha = 1.0 - np.tanh(self.adaptive_k * self._avg_enemy_deaths)
            self._explore_weight = self.base_explore_weight * max(0.1, alpha)

        shaped_reward = reward + self._explore_weight * visit_bonus
        info["raw_reward"] = reward
        info["explore_bonus"] = visit_bonus
        info["explore_weight"] = self._explore_weight

        return obs, shaped_reward, done, truncated, info

    def action_masks(self):
        return self.env.action_masks()
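# Annealing sketch for adaptive_k = 1.2 (illustrative values of the formula
# above; weight = 0.5 * max(0.1, 1 - tanh(1.2 * avg_deaths))):
#
#   avg_deaths = 0.0  ->  alpha = 1.00           ->  weight = 0.50
#   avg_deaths = 0.5  ->  alpha = 0.46           ->  weight = 0.23
#   avg_deaths = 1.0  ->  alpha = 0.17           ->  weight = 0.08
#   avg_deaths = 2.0  ->  alpha floored at 0.10  ->  weight = 0.05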
class RuleBasedOpponent:
    """Rule-based Bomberman opponent with three difficulty levels:
    "static" (never moves), "simple" (random walk, bombs on sight), and
    "smart" (greedy scoring of neighbouring tiles)."""

    def __init__(self, team_id=1, difficulty="simple"):
        self.team_id = team_id
        self.difficulty = difficulty
        self.visited = None
        self.grid_size = 16

    def reset(self):
        self.visited = np.zeros((self.grid_size, self.grid_size), dtype=np.int32)

    def act(self, obs_dict):
        action_mask = obs_dict["action_mask"]
        valid_actions = np.flatnonzero(action_mask)
        if len(valid_actions) == 0:
            return 4  # stay

        if self.difficulty == "static":
            return 4

        if self.difficulty == "simple":
            # Bomb whenever an enemy or enemy base is visible (channels 10 and
            # 12 are assumed indices in the env's viewcone layout).
            viewcone = obs_dict["agent_viewcone"]
            has_enemy = np.any(viewcone[..., 10] > 0)
            has_enemy_base = np.any(viewcone[..., 12] > 0)
            if (has_enemy or has_enemy_base) and 5 in valid_actions:
                return 5

            movement_actions = [a for a in valid_actions if a < 4]
            if len(movement_actions) > 0:
                return int(np.random.choice(movement_actions))
            return 4

        if self.difficulty == "smart":
            return self._smart_policy(obs_dict, valid_actions)

        return 4

    def _smart_policy(self, obs, valid_actions):
        viewcone = obs["agent_viewcone"]
        h, w, _ = viewcone.shape

        # Channels 6-8 are assumed to mark collectibles.
        collectibles = np.stack(
            [viewcone[..., 7], viewcone[..., 8], viewcone[..., 6]], axis=-1
        )
        has_collectible = np.any(collectibles > 0, axis=-1)

        # The agent's own cell in the viewcone (assumed for the default vision config).
        cx, cy = 3, 2

        best_action = 4
        best_score = -1

        # Score each reachable neighbouring tile: prefer collectibles,
        # penalise unseen tiles (channel 0) and walls (channels 1-4).
        for action in valid_actions:
            if action == 4 or action == 5:
                continue

            if action == 0:
                nx, ny = cx - 1, cy
            elif action == 1:
                nx, ny = cx + 1, cy
            elif action == 2:
                nx, ny = cx, cy - 1
            elif action == 3:
                nx, ny = cx, cy + 1
            else:
                continue

            if 0 <= nx < h and 0 <= ny < w:
                score = 0.0
                if has_collectible[nx, ny]:
                    score += 10.0
                if viewcone[nx, ny, 0] < 1:
                    score -= 5.0
                wall_score = (
                    viewcone[nx, ny, 1] + viewcone[nx, ny, 2]
                    + viewcone[nx, ny, 3] + viewcone[nx, ny, 4]
                )
                score -= wall_score * 2.0

                if score > best_score:
                    best_score = score
                    best_action = action

        # Drop a bomb (with some randomness) when an enemy or enemy base sits
        # on or next to the agent's own cell.
        for dx, dy in [(-1, 0), (1, 0), (0, -1), (0, 1), (0, 0)]:
            nx, ny = cx + dx, cy + dy
            if 0 <= nx < h and 0 <= ny < w:
                if viewcone[nx, ny, 10] > 0 or viewcone[nx, ny, 12] > 0:
                    if 5 in valid_actions and np.random.random() < 0.7:
                        return 5
                    break

        return int(best_action) if best_score > -1 else 4
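# Usage sketch (obs_dict stands for one agent's observation as produced by
# the parallel env):
#
#   opp = RuleBasedOpponent(team_id=1, difficulty="smart")
#   opp.reset()
#   action = opp.act(obs_dict)   # one of the six discrete actions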
class CurriculumEnv(BombermanSingleAgentEnv):
    """Single-agent env whose rule-based opponents advance through a
    difficulty curriculum. Observation handling is inherited from
    BombermanSingleAgentEnv; only opponent control and stage bookkeeping
    are added here."""

    CURRICULUM_STAGES = ["static", "simple", "smart", "mixed"]
    WIN_RATE_THRESHOLD = 0.55
    EPISODES_PER_STAGE = 500

    def __init__(self, cfg=None, seed=None):
        super().__init__(cfg=cfg, seed=seed)

        self.stage_idx = 0
        self.stage_episodes = 0
        self.stage_wins = 0
        self.stage_rewards = []

        self.opponents = {}
        self._init_opponents()

    def _init_opponents(self):
        for i in range(1, self.cfg.env.num_teams):
            opp_id = f"agent_{i}"
            self.opponents[opp_id] = RuleBasedOpponent(team_id=i, difficulty="static")

    def _update_opponent_difficulty(self):
        stage = self.CURRICULUM_STAGES[self.stage_idx]
        for opp_id, opp in self.opponents.items():
            if stage == "mixed":
                opp.difficulty = "smart" if (int(opp_id.split("_")[1]) % 2 == 0) else "simple"
            else:
                opp.difficulty = stage

    def _check_stage_advance(self):
        """Advance once a full evaluation window clears the win-rate threshold;
        otherwise reset the window and keep training at the current stage."""
        if self.stage_idx >= len(self.CURRICULUM_STAGES) - 1:
            return False
        if len(self.stage_rewards) < self.EPISODES_PER_STAGE:
            return False

        win_rate = self.stage_wins / max(1, len(self.stage_rewards))
        avg_reward = float(np.mean(self.stage_rewards))
        if win_rate >= self.WIN_RATE_THRESHOLD:
            trackio.alert(
                "Curriculum Advance",
                f"Stage {self.CURRICULUM_STAGES[self.stage_idx]} complete: "
                f"win_rate={win_rate:.2%}, avg_reward={avg_reward:.1f}. "
                f"Advancing to {self.CURRICULUM_STAGES[self.stage_idx + 1]}",
                trackio.AlertLevel.INFO,
            )
            self.stage_idx += 1
            self.stage_episodes = 0
            self.stage_wins = 0
            self.stage_rewards = []
            self._update_opponent_difficulty()
            return True

        # Window complete but below threshold: start a fresh window.
        self.stage_wins = 0
        self.stage_rewards = []
        return False

    def _opponent_action(self, agent_id):
        opp = self.opponents.get(agent_id)
        if opp is not None and self._last_obs_dict and agent_id in self._last_obs_dict:
            return opp.act(self._last_obs_dict[agent_id])
        return 4  # stay

    def reset(self, seed=None, options=None):
        for opp in self.opponents.values():
            opp.reset()
        return super().reset(seed=seed, options=options)

    def step(self, action):
        obs, reward, done, truncated, info = super().step(action)

        if done:
            self.stage_episodes += 1
            self.stage_rewards.append(reward)
            if reward > 10.0:  # a final-step reward above this is counted as a win
                self.stage_wins += 1
            self._check_stage_advance()

        info = dict(info)
        info["curriculum_stage"] = self.stage_idx
        info["curriculum_stage_name"] = self.CURRICULUM_STAGES[self.stage_idx]
        return obs, reward, done, truncated, info
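# Curriculum sketch: a stage advances once 500 episodes accumulate with a win
# rate of at least 55% (i.e. 275+ wins in the window). Phase 3 below builds
# the env with three teams:
#
#   cfg = default_config()
#   cfg.env.num_teams = 3
#   env = CurriculumEnv(cfg=cfg)
#   env.CURRICULUM_STAGES[env.stage_idx]   # -> "static" at the start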
class TrackioLoggingCallback(BaseCallback):
    """Logs rollout statistics to trackio and raises simple health alerts."""

    def __init__(self, project, run_name, log_interval=2048, verbose=0):
        super().__init__(verbose)
        self.project = project
        self.run_name = run_name
        self.log_interval = log_interval
        self._last_mean_reward = 0.0

    def _on_training_start(self):
        trackio.init(project=self.project, name=self.run_name)
        trackio.alert("Training Started", f"{self.run_name} training began.", trackio.AlertLevel.INFO)

    def _on_step(self):
        if self.n_calls % self.log_interval == 0:
            infos = self.locals.get("infos", [{}])
            # Monitor injects an "episode" dict with reward "r" and length "l".
            ep_rewards = [info["episode"]["r"] for info in infos if "episode" in info]
            ep_lengths = [info["episode"]["l"] for info in infos if "episode" in info]
            explore_bonuses = [info.get("explore_bonus", 0) for info in infos]
            stages = [info.get("curriculum_stage", 0) for info in infos]

            if ep_rewards:
                mean_r = float(np.mean(ep_rewards))
                self._last_mean_reward = mean_r
                log_dict = {
                    "train/mean_episode_reward": mean_r,
                    "train/mean_episode_length": float(np.mean(ep_lengths)) if ep_lengths else 0.0,
                    "train/timesteps": self.num_timesteps,
                }
                if explore_bonuses:
                    log_dict["train/mean_explore_bonus"] = float(np.mean(explore_bonuses))
                if stages:
                    log_dict["train/curriculum_stage"] = float(np.mean(stages))
                trackio.log(log_dict)

                if mean_r < -5.0 and self.num_timesteps > 50_000:
                    trackio.alert(
                        "Low Reward Warning",
                        f"mean_reward={mean_r:.2f} at step {self.num_timesteps} -- the agent may be camping.",
                        trackio.AlertLevel.WARN,
                    )
        return True  # returning a falsy value would abort training

    def _on_training_end(self):
        trackio.alert(
            "Training Complete",
            f"Finished at {self.num_timesteps} steps. Final mean reward: {self._last_mean_reward:.2f}",
            trackio.AlertLevel.INFO,
        )
        trackio.finish()
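# Wiring sketch (mirrors train_phase below; values are illustrative):
#
#   cb = TrackioLoggingCallback("til-26-ae", "phase1-maskable-ppo-random", log_interval=2048)
#   model.learn(total_timesteps=500_000, callback=[checkpoint_callback, cb])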
def train_phase(cfg, phase, total_timesteps, model=None):
    """Build the environment for the given phase and train (or continue
    training) a MaskablePPO policy on it."""
    trackio_project = os.environ.get("TRACKIO_PROJECT", "til-26-ae")

    if phase == 1:
        print("=== PHASE 1: MaskablePPO vs Random Opponents ===")
        base_env = BombermanSingleAgentEnv(cfg=cfg, opponent_policy="random")
        env = ActionMasker(base_env, lambda env: env.action_masks())
        env = Monitor(env)
        run_name = "phase1-maskable-ppo-random"
    elif phase == 2:
        print("=== PHASE 2: Adaptive Exploration Annealing ===")
        base_env = BombermanSingleAgentEnv(cfg=cfg, opponent_policy="random")
        shaped_env = RewardShapingWrapper(base_env, adaptive_k=1.2, base_explore_weight=0.5)
        env = ActionMasker(shaped_env, lambda env: env.action_masks())
        env = Monitor(env)
        run_name = "phase2-adaptive-explore"
    elif phase == 3:
        print("=== PHASE 3: Curriculum + Rule-Based Self-Play ===")
        cfg.env.num_teams = 3
        base_env = CurriculumEnv(cfg=cfg)
        env = ActionMasker(base_env, lambda env: env.action_masks())
        env = Monitor(env)
        run_name = "phase3-curriculum-selfplay"
    else:
        raise ValueError(f"Unknown phase: {phase}")

    if model is None:
        # Standard on-policy PPO hyperparameters (cf. arXiv:2103.01955).
        model = MaskablePPO(
            "MlpPolicy",
            env,
            learning_rate=3e-4,
            n_steps=2048,
            batch_size=64,
            n_epochs=10,
            gamma=0.99,
            gae_lambda=0.95,
            clip_range=0.2,
            ent_coef=0.01,
            vf_coef=0.5,
            max_grad_norm=0.5,
            verbose=1,
            device="cuda" if torch.cuda.is_available() else "cpu",
        )
    else:
        # Later phases continue training the same policy on the new environment.
        model.set_env(env)

    checkpoint_callback = CheckpointCallback(
        save_freq=50_000,
        save_path=f"./checkpoints/phase{phase}",
        name_prefix=f"bomberman_phase{phase}",
    )
    trackio_callback = TrackioLoggingCallback(trackio_project, run_name, log_interval=2048)

    model.learn(
        total_timesteps=total_timesteps,
        callback=[checkpoint_callback, trackio_callback],
        progress_bar=False,
    )

    model.save(f"bomberman_phase{phase}_final")
    env.close()
    print(f"Phase {phase} complete. Model saved to bomberman_phase{phase}_final.zip")
    return model
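# Evaluation sketch (assumes a saved phase artifact exists on disk; MaskablePPO's
# predict() accepts the mask via its action_masks keyword):
#
#   model = MaskablePPO.load("bomberman_phase3_final")
#   env = BombermanSingleAgentEnv()
#   obs, _ = env.reset(seed=0)
#   done = False
#   while not done:
#       action, _ = model.predict(obs, action_masks=env.action_masks(), deterministic=True)
#       obs, reward, done, truncated, info = env.step(int(action))
#   env.close()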
def main():
    cfg = default_config()
    cfg.env.render_mode = None

    # Per-phase budgets as colon-separated timesteps, e.g. "500_000:500_000:1_000_000".
    total_ts_env = os.environ.get("TOTAL_TIMESTEPS", "500_000:500_000:1_000_000")
    phase_ts = [int(x.replace("_", "")) for x in total_ts_env.split(":")]

    model = train_phase(cfg, phase=1, total_timesteps=phase_ts[0], model=None)

    if len(phase_ts) > 1:
        model = train_phase(cfg, phase=2, total_timesteps=phase_ts[1], model=model)

    if len(phase_ts) > 2:
        model = train_phase(cfg, phase=3, total_timesteps=phase_ts[2], model=model)

    # Optionally push the per-phase artifacts to the Hugging Face Hub.
    hub_model_id = os.environ.get("HUB_MODEL_ID", "")
    if hub_model_id:
        from huggingface_hub import HfApi

        api = HfApi()
        for phase in range(1, len(phase_ts) + 1):
            try:
                api.upload_file(
                    path_or_fileobj=f"bomberman_phase{phase}_final.zip",
                    path_in_repo=f"bomberman_phase{phase}_final.zip",
                    repo_id=hub_model_id,
                    repo_type="model",
                )
                print(f"Phase {phase} model pushed to {hub_model_id}")
            except Exception as e:
                print(f"Failed to push phase {phase}: {e}")

    print("\n=== All phases complete! ===")
    if hub_model_id:
        print(f"Model repository: https://huggingface.co/{hub_model_id}")


if __name__ == "__main__":
    main()