# FDF-MountainCarContinuous-v0 — fdf_inference.py
# Uploaded by matthewbrach via huggingface_hub (commit 502b254, verified)
import os
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.normal import Normal
import gymnasium as gym
# Run inference on GPU when available, otherwise fall back to CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
# ─────────────────────────────────────────────────────────
# Sterile Network Architecture (Inference Only)
# ─────────────────────────────────────────────────────────
def layer_init(layer, std=np.sqrt(2), bias_const=0.0):
    """Initialize a linear layer in place and return it.

    The weight is orthogonally initialized with gain ``std`` and the bias is
    filled with the constant ``bias_const`` (standard PPO initialization).
    """
    nn.init.orthogonal_(layer.weight, std)
    nn.init.constant_(layer.bias, bias_const)
    return layer
class ContinuousPPOMLP(nn.Module):
    """Actor-critic MLP (three 256-unit hidden layers) for continuous-action
    PPO inference.

    Holds a shared feature trunk, a Gaussian policy head (mean + a
    state-independent log-std parameter), and a value head.
    """

    def __init__(self, obs_dim, action_dim):
        super().__init__()
        # Shared trunk: three 256-unit hidden layers.
        self.fc1 = layer_init(nn.Linear(obs_dim, 256))
        self.fc2 = layer_init(nn.Linear(256, 256))
        self.fc3 = layer_init(nn.Linear(256, 256))
        # Policy head; the small init gain keeps initial action means near zero.
        self.actor_mean = layer_init(nn.Linear(256, action_dim), std=0.01)
        # State-independent log standard deviation, broadcast over the batch.
        self.actor_logstd = nn.Parameter(torch.zeros(1, action_dim))
        # Value head — not used by get_action_and_value below, but kept so
        # checkpoints trained with a critic load cleanly.
        self.critic = layer_init(nn.Linear(256, 1), std=1)

    def get_features(self, x):
        """Run the shared trunk and return the 256-dim feature vector."""
        h = F.relu(self.fc1(x.float()))
        h = F.relu(self.fc2(h))
        return F.relu(self.fc3(h))

    def get_action_and_value(self, x, action=None):
        """Return ``action``, sampling from the Gaussian policy when none is given.

        Inference-only forward pass: no log-prob, entropy, or value is computed.
        """
        feats = self.get_features(x)
        mean = self.actor_mean(feats)
        # Clamp log-std to a sane range before exponentiating.
        log_std = torch.clamp(self.actor_logstd.expand_as(mean), -20, 2)
        dist = Normal(mean, torch.exp(log_std))
        if action is None:
            action = dist.sample()
        return action
# ─────────────────────────────────────────────────────────
# Public Evaluator Loop
# ─────────────────────────────────────────────────────────
def evaluate_model(model_path, num_episodes=100):
    """Evaluate frozen PPO weights on MountainCarContinuous-v0.

    Args:
        model_path: Path to a ``.pt`` state-dict checkpoint.
        num_episodes: Number of evaluation episodes to run.

    Returns:
        The mean episode return over the completed episodes (NaN if none).
    """
    env = gym.wrappers.RecordEpisodeStatistics(
        gym.wrappers.ClipAction(
            gym.make("MountainCarContinuous-v0", max_episode_steps=999)
        )
    )
    try:
        obs_dim = env.observation_space.shape[0]
        act_dim = env.action_space.shape[0]

        # Initialize sterile network
        model = ContinuousPPOMLP(obs_dim, act_dim).to(DEVICE)
        print(f"Loading weights from {model_path}...")
        state_dict = torch.load(model_path, map_location=DEVICE, weights_only=True)
        # Strip torch.compile's '_orig_mod.' prefix just in case the
        # checkpoint was saved from a compiled model.
        clean_state_dict = {k.replace('_orig_mod.', ''): v for k, v in state_dict.items()}
        # strict=False tolerates extra/missing keys; report any mismatch so a
        # silent partial load does not go unnoticed.
        incompatible = model.load_state_dict(clean_state_dict, strict=False)
        if incompatible.missing_keys or incompatible.unexpected_keys:
            print(f"Warning: missing keys {incompatible.missing_keys}, "
                  f"unexpected keys {incompatible.unexpected_keys}")
        model.eval()

        returns = []
        print(f"Starting {num_episodes} evaluation episodes...")
        obs_np, _ = env.reset(seed=42)
        obs = torch.Tensor(obs_np).unsqueeze(0).to(DEVICE)
        while len(returns) < num_episodes:
            with torch.no_grad():
                act = model.get_action_and_value(obs)
            raw_act = act[0].cpu().numpy()[0]
            obs_np, reward_np, term_np, trunc_np, info = env.step(np.array([raw_act]))
            obs = torch.Tensor(obs_np).unsqueeze(0).to(DEVICE)
            if term_np or trunc_np:
                if 'episode' in info:
                    ep_stats = info['episode']
                    # RecordEpisodeStatistics may report scalars or length-1 arrays.
                    ep_return = ep_stats['r'][0] if hasattr(ep_stats['r'], '__len__') else ep_stats['r']
                    returns.append(ep_return)
                    print(f"Episode {len(returns):3d}/{num_episodes} Return: {ep_return:.2f} (Steps: {ep_stats['l']})")
                obs_np, _ = env.reset()
                obs = torch.Tensor(obs_np).unsqueeze(0).to(DEVICE)
    finally:
        # Release the simulator even if evaluation aborts partway.
        env.close()

    # Guard against an empty list (e.g. num_episodes=0) — np.mean([]) warns and
    # yields NaN; make that explicit instead.
    avg_score = float(np.mean(returns)) if returns else float('nan')
    print(f"\n==========================================")
    print(f"PUBLIC EVALUATION COMPLETE")
    print(f"Average Score ({num_episodes} trials): {avg_score:.2f}")
    if avg_score > 90.0:
        print("Status: 🏆 SOLVED")
    print(f"==========================================")
    return avg_score
if __name__ == "__main__":
    import argparse

    # CLI entry point: parse arguments, validate the weights path, then evaluate.
    parser = argparse.ArgumentParser()
    parser.add_argument('--model_path', type=str, required=True,
                        help="Path to the frozen .pt model weights")
    parser.add_argument('--episodes', type=int, default=100,
                        help="Number of episodes to evaluate")
    args = parser.parse_args()

    if os.path.exists(args.model_path):
        evaluate_model(args.model_path, args.episodes)
    else:
        print(f"Error: Could not find model weights at '{args.model_path}'")