# Reinforce-Pixelcopter-PLE-v0 / reinforce-pixelcopter.py
# %%
# Import modules
from collections import deque
import gymnasium as gym
import gym_pygame
# import gym
# import gym_ple
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
# %%
# Check GPU device with CUDA for PyTorch
if not th.cuda.is_available():
    raise RuntimeError("CUDA is not available!")
device_id = th.cuda.current_device()
device_name = th.cuda.get_device_name(device_id)
print("Available CUDA device:", device_name)
device = th.device("cuda:0")  # CUDA availability was already checked above
print("Selected device:", device)
# %%
# Create training and evaluation environments
env_id = "Pixelcopter-PLE-v0"
env = gym.make(env_id)
s_size = env.observation_space.shape[0]
a_size = env.action_space.n.item()
print("\n_____OBSERVATION SPACE_____")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample()) # Get a random observation
print("\n _____ACTION SPACE_____")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample()) # Take a random action
# %%
# Define policy model
class Policy(nn.Module):
def __init__(self, state_dim: int, action_dim: int, hidden_dim: int) -> None:
super().__init__()
self.fc1 = nn.Linear(state_dim, hidden_dim)
self.fc2 = nn.Linear(hidden_dim, hidden_dim)
self.fc3 = nn.Linear(hidden_dim, action_dim)
def forward(self, x: th.Tensor) -> th.Tensor:
x = F.relu(self.fc1(x))
x = F.relu(self.fc2(x))
x = self.fc3(x)
return F.softmax(x, dim=1)
    def act(self, state: np.ndarray) -> tuple[int, th.Tensor]:
# x = th.from_numpy(state).float().unsqueeze(0).to(device)
x = th.tensor(data=state, dtype=th.float32, device=device).unsqueeze(0)
probs = self.forward(x)
m = Categorical(probs=probs)
action = m.sample()
return action.item(), m.log_prob(action)
debug_policy = Policy(state_dim=s_size, action_dim=a_size, hidden_dim=64).to(
device
)
obs, info = env.reset()
debug_policy.act(state=obs)
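# %%
# For evaluation it can help to act greedily (argmax over the action
# probabilities) instead of sampling. A minimal sketch on top of the Policy
# class above; greedy_act is an addition, not used by the original script.
def greedy_act(policy: Policy, state: np.ndarray) -> int:
    x = th.tensor(data=state, dtype=th.float32, device=device).unsqueeze(0)
    with th.no_grad():  # No gradients needed at evaluation time
        probs = policy.forward(x)
    return probs.argmax(dim=1).item()
print("Greedy debug action:", greedy_act(debug_policy, obs))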
# %%
# Define REINFORCE algorithm
def reinforce(
policy: Policy,
env: gym.Env,
optimizer: optim.Optimizer,
num_episodes: int,
max_steps: int,
gamma: float = 0.99,
log_freq: int = 10,
):
scores = []
for episode in range(1, num_episodes + 1):
# Run episode
rewards, log_probs = [], []
obs, info = env.reset()
for t in range(max_steps):
action, log_prob = policy.act(obs)
obs, reward, done, trunc, info = env.step(action)
rewards.append(reward)
log_probs.append(log_prob)
if done or trunc:
break
# Save scores (accumulated rewards)
score = sum(rewards)
scores.append(score)
# Calculate discounted returns (discounted cumulative rewards)
disc_returns = deque(maxlen=max_steps) # Use deque for efficient appendleft
for reward in rewards[::-1]: # Review rewards in reverse order
next_return = disc_returns[0] if len(disc_returns) > 0 else 0.0
disc_returns.appendleft(reward + gamma * next_return)
# Normalize discounted returns for stabilization
        disc_returns = th.tensor(list(disc_returns), dtype=th.float32, device=device)
        eps = th.finfo(th.float32).eps  # Avoid division by zero
        disc_returns = (disc_returns - disc_returns.mean()) / (disc_returns.std() + eps)
# Calculate policy loss
policy_losses = []
for log_prob, disc_return in zip(log_probs, disc_returns):
policy_losses.append(-log_prob * disc_return)
policy_loss = th.cat(policy_losses).sum()
# Apply gradient descent optimization
optimizer.zero_grad()
policy_loss.backward()
optimizer.step()
# Print achieved scores
        if episode == 1 or episode % log_freq == 0:
            last_scores = scores[-100:]  # Slicing handles fewer than 100 episodes
            mean_score = np.mean(last_scores)
            std_score = np.std(last_scores)
            print(f"Episode {episode} mean score (last 100): {mean_score:.2f} ± {std_score:.2f}")
return scores
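# %%
# Quick check of the discounted-return recursion used above:
# G_t = r_t + gamma * G_{t+1}. For rewards [1, 1, 1] and gamma = 0.99 this
# gives [1 + 0.99 * 1.99, 1 + 0.99 * 1, 1] ≈ [2.9701, 1.99, 1.0].
_returns = deque()
for _r in [1.0, 1.0, 1.0][::-1]:
    _next = _returns[0] if len(_returns) > 0 else 0.0
    _returns.appendleft(_r + 0.99 * _next)
print("Discounted returns:", list(_returns))  # ≈ [2.9701, 1.99, 1.0]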
# %%
# Train the agent
hyperparameters = {
"h_size": 64,
"n_training_episodes": 50_000,
"n_evaluation_episodes": 10,
"max_t": 10_000,
"gamma": 0.99,
"lr": 1e-4,
"env_id": env_id,
"state_space": s_size,
"action_space": a_size,
}
policy = Policy(
state_dim=hyperparameters["state_space"],
action_dim=hyperparameters["action_space"],
hidden_dim=hyperparameters["h_size"],
).to(device)
optimizer = optim.Adam(params=policy.parameters(), lr=hyperparameters["lr"])
# scores = reinforce(
# policy=policy,
# env=env,
# optimizer=optimizer,
# num_episodes=hyperparameters["n_training_episodes"],
# max_steps=hyperparameters["max_t"],
# gamma=hyperparameters["gamma"],
# log_freq=1000,
# )
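# %%
# Optional: smooth and plot the training curve once training has run. A
# sketch assuming matplotlib is installed; `scores` only exists if the
# reinforce(...) call above is uncommented.
# import matplotlib.pyplot as plt
# window = 100
# smoothed = np.convolve(scores, np.ones(window) / window, mode="valid")
# plt.plot(scores, alpha=0.3, label="score")
# plt.plot(range(window - 1, len(scores)), smoothed, label=f"mean over {window} episodes")
# plt.xlabel("Episode")
# plt.ylabel("Score")
# plt.legend()
# plt.show()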
# %%
# Save/Load the agent
model_path = f"models/reinforce-{env_id}.pth"
# th.save(policy, model_path)
policy = th.load(model_path, weights_only=False)  # Full module was pickled, so weights_only must be False
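# %%
# Alternative: saving only the state_dict is the more portable PyTorch
# pattern and allows loading with weights_only=True. A sketch; the file name
# is illustrative, and the original script pickles the whole module instead.
# th.save(policy.state_dict(), f"models/reinforce-{env_id}-state.pth")
# policy.load_state_dict(
#     th.load(f"models/reinforce-{env_id}-state.pth", weights_only=True)
# )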
# %%
# Evaluate the agent
from extra import evaluate_agent
env = gym.make(env_id)
mean_score, std_score = evaluate_agent(
env=env,
max_steps=hyperparameters["max_t"],
n_eval_episodes=hyperparameters["n_evaluation_episodes"],
policy=policy,
)
print(f"\nScore: {mean_score:.2f} ± {std_score:.2f}")
# %%
# Upload results to repo
from extra import push_to_hub
repo_id = f"pabloramesc/Reinforce-{env_id}"
eval_env = gym.make(env_id).unwrapped
push_to_hub(
repo_id,
env_id,
policy, # The model we want to save
hyperparameters, # Hyperparameters
eval_env, # Evaluation environment
video_fps=30,
)
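# %%
# `push_to_hub` is also from the local `extra` module. For a manual upload
# of just the model file, the huggingface_hub client provides upload_file;
# a sketch assuming you are logged in and the repo already exists:
# from huggingface_hub import HfApi
# HfApi().upload_file(
#     path_or_fileobj=model_path,
#     path_in_repo=f"reinforce-{env_id}.pth",
#     repo_id=repo_id,
# )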
# %%