| import os |
| import numpy as np |
| import torch |
| import torch.nn as nn |
| import torch.nn.functional as F |
| from torch.distributions.normal import Normal |
| import gymnasium as gym |
|
|
# Pick the compute device once at import time; every tensor/model below is moved here.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
|
|
| |
| |
| |
|
|
def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    """Orthogonally initialise ``layer.weight`` with gain ``std`` and fill
    ``layer.bias`` with ``bias_const``; the layer is returned so the call
    can be chained inline at construction time."""
    nn.init.orthogonal_(layer.weight, std)
    nn.init.constant_(layer.bias, bias_const)
    return layer
|
|
class ContinuousPPOMLP(nn.Module):
    """Actor-critic MLP for PPO with a diagonal-Gaussian continuous policy.

    Three shared 256-unit ReLU layers feed (a) a policy-mean head with a
    state-independent learnable log-std, and (b) a scalar critic head.
    """

    def __init__(self, obs_dim, action_dim):
        super().__init__()

        # Shared feature trunk.
        self.fc1 = layer_init(nn.Linear(obs_dim, 256))
        self.fc2 = layer_init(nn.Linear(256, 256))
        self.fc3 = layer_init(nn.Linear(256, 256))

        # Small init std on the policy head keeps initial actions near zero.
        self.actor_mean = layer_init(nn.Linear(256, action_dim), std=0.01)
        # Log-std is a free parameter, independent of the observation.
        self.actor_logstd = nn.Parameter(torch.zeros(1, action_dim))

        # Value head: not used by get_action_and_value's return value, but
        # kept so checkpoints trained with a critic load without key errors.
        self.critic = layer_init(nn.Linear(256, 1), std=1)

    def get_features(self, x):
        """Run the shared trunk on ``x`` (cast to float32 first)."""
        x = x.float()
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        return x

    def get_action_and_value(self, x, action=None, deterministic=False):
        """Return an action for observation batch ``x``.

        Args:
            x: observation tensor, shape (batch, obs_dim).
            action: if given, it is returned unchanged (compatibility with
                training-time callers that re-evaluate a stored action).
            deterministic: when True and ``action`` is None, return the
                policy mean instead of sampling (useful for evaluation).
                Defaults to False, preserving the original sampling behavior.

        Returns:
            Action tensor of shape (batch, action_dim).
        """
        features = self.get_features(x)
        action_mean = self.actor_mean(features)

        # Clamp log-std to a sane range before exponentiating so the
        # Normal never receives a zero/inf scale.
        action_logstd = torch.clamp(self.actor_logstd.expand_as(action_mean), -20, 2)
        action_std = torch.exp(action_logstd)
        probs = Normal(action_mean, action_std)

        if action is None:
            action = action_mean if deterministic else probs.sample()

        return action
|
|
| |
| |
| |
|
|
def evaluate_model(model_path, num_episodes=100):
    """Evaluate a frozen PPO policy on MountainCarContinuous-v0.

    Loads a state-dict from ``model_path``, rolls out ``num_episodes`` full
    episodes with the stochastic policy, and prints per-episode and average
    returns. Fix vs. original: the environment is now closed in a
    ``finally`` block, so its resources are released even when the run is
    interrupted mid-evaluation.

    Args:
        model_path: path to a ``.pt`` state-dict; checkpoints saved from a
            ``torch.compile``'d model (keys prefixed ``_orig_mod.``) are
            handled.
        num_episodes: number of evaluation episodes to run.
    """
    env = gym.wrappers.RecordEpisodeStatistics(
        gym.wrappers.ClipAction(
            gym.make("MountainCarContinuous-v0", max_episode_steps=999)
        )
    )

    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.shape[0]

    model = ContinuousPPOMLP(obs_dim, act_dim).to(DEVICE)

    print(f"Loading weights from {model_path}...")
    state_dict = torch.load(model_path, map_location=DEVICE, weights_only=True)

    # torch.compile wraps the module and prefixes every key with
    # '_orig_mod.'; strip it so the plain module accepts the weights.
    clean_state_dict = {k.replace('_orig_mod.', ''): v for k, v in state_dict.items()}
    # NOTE(review): strict=False silently skips missing/unexpected keys —
    # confirm the checkpoint actually matches this architecture.
    model.load_state_dict(clean_state_dict, strict=False)
    model.eval()

    returns = []
    print(f"Starting {num_episodes} evaluation episodes...")

    obs_np, _ = env.reset(seed=42)
    obs = torch.Tensor(obs_np).unsqueeze(0).to(DEVICE)

    try:
        while len(returns) < num_episodes:
            with torch.no_grad():
                act = model.get_action_and_value(obs)

            # 1-D action space: unwrap the (1, action_dim) tensor to a scalar,
            # then re-wrap as the (1,) array env.step expects.
            raw_act = act[0].cpu().numpy()[0]
            obs_np, reward_np, term_np, trunc_np, info = env.step(np.array([raw_act]))
            obs = torch.Tensor(obs_np).unsqueeze(0).to(DEVICE)

            if term_np or trunc_np:
                # RecordEpisodeStatistics puts the episode return/length in
                # info['episode'] on the terminal step; 'r' may be a scalar or
                # a length-1 array depending on the wrapper version.
                if 'episode' in info:
                    ep_return = info['episode']['r'][0] if hasattr(info['episode']['r'], '__len__') else info['episode']['r']
                    returns.append(ep_return)
                    print(f"Episode {len(returns):3d}/{num_episodes} Return: {ep_return:.2f} (Steps: {info['episode']['l']})")

                obs_np, _ = env.reset()
                obs = torch.Tensor(obs_np).unsqueeze(0).to(DEVICE)
    finally:
        env.close()  # fix: release env resources even on early exit

    avg_score = np.mean(returns)
    print("\n==========================================")
    print("PUBLIC EVALUATION COMPLETE")
    print(f"Average Score ({num_episodes} trials): {avg_score:.2f}")
    if avg_score > 90.0:
        print("Status: π SOLVED")
    print("==========================================")
|
|
if __name__ == "__main__":
    import argparse

    # CLI: weight file is mandatory, episode count optional.
    cli = argparse.ArgumentParser()
    cli.add_argument('--model_path', type=str, required=True, help="Path to the frozen .pt model weights")
    cli.add_argument('--episodes', type=int, default=100, help="Number of episodes to evaluate")
    cfg = cli.parse_args()

    if os.path.exists(cfg.model_path):
        evaluate_model(cfg.model_path, cfg.episodes)
    else:
        print(f"Error: Could not find model weights at '{cfg.model_path}'")
|
|