# REINFORCE on Pixelcopter-PLE-v0 with PyTorch.

import os
from collections import deque
|
|
import gymnasium as gym
import gym_pygame  # registers the PLE environments (e.g. Pixelcopter) with gymnasium
|
|
|
|
import numpy as np
import torch as th
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
|
|
|
|
# Fail fast if no CUDA device is available.
if not th.cuda.is_available():
    raise RuntimeError("CUDA is not available!")
|
|
device_id = th.cuda.current_device()
device_name = th.cuda.get_device_name(device_id)
print("Available CUDA device:", device_name)
|
|
device = th.device("cuda:0")  # guaranteed by the availability check above
print("Selected device:", device)
|
|
|
|
# Create the Pixelcopter environment and inspect its observation/action spaces.
|
|
env_id = "Pixelcopter-PLE-v0"
env = gym.make(env_id)
s_size = env.observation_space.shape[0]
a_size = env.action_space.n.item()
|
|
print("\n_____OBSERVATION SPACE_____")
print("The State Space is:", s_size)
print("Sample observation:", env.observation_space.sample())


print("\n_____ACTION SPACE_____")
print("The Action Space is:", a_size)
print("Action Space Sample:", env.action_space.sample())
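# For Pixelcopter-PLE-v0 this should report a 7-dimensional state vector and
# 2 discrete actions (accelerate up / do nothing).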
|
|
|
|
# Policy network: a small MLP mapping states to a probability distribution over actions.
|
|
|
|
class Policy(nn.Module):
    def __init__(self, state_dim: int, action_dim: int, hidden_dim: int) -> None:
        super().__init__()
        self.fc1 = nn.Linear(state_dim, hidden_dim)
        self.fc2 = nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = nn.Linear(hidden_dim, action_dim)

    def forward(self, x: th.Tensor) -> th.Tensor:
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)

    def act(self, state: np.ndarray) -> tuple[int, th.Tensor]:
        # Batch the state, sample an action, and keep the log-probability
        # tensor so the REINFORCE loss can backpropagate through it.
        x = th.tensor(data=state, dtype=th.float32, device=device).unsqueeze(0)
        probs = self.forward(x)
        m = Categorical(probs=probs)
        action = m.sample()
        return action.item(), m.log_prob(action)
|
|
|
|
# Sanity check: one forward pass of an untrained policy on the initial observation.
debug_policy = Policy(state_dim=s_size, action_dim=a_size, hidden_dim=64).to(device)
obs, info = env.reset()
debug_policy.act(state=obs)
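# What act() returns: an integer action index and a shape-(1,) log-probability
# tensor. A quick standalone illustration with hypothetical probabilities:
_probs = th.tensor([[0.25, 0.75]], device=device)
_m = Categorical(probs=_probs)
_a = _m.sample()
print(_a.item(), _m.log_prob(_a))  # e.g. 1 tensor([-0.2877], device='cuda:0')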
|
|
|
|
# REINFORCE: Monte-Carlo policy gradient. Each episode, collect rewards and
# log-probabilities, compute discounted returns, and take one gradient step.
|
|
|
|
def reinforce(
    policy: Policy,
    env: gym.Env,
    optimizer: optim.Optimizer,
    num_episodes: int,
    max_steps: int,
    gamma: float = 0.99,
    log_freq: int = 10,
) -> list[float]:
    scores = []
    for episode in range(1, num_episodes + 1):
        # Roll out one episode, storing rewards and action log-probabilities.
        rewards, log_probs = [], []
        obs, info = env.reset()
        for t in range(max_steps):
            action, log_prob = policy.act(obs)
            obs, reward, done, trunc, info = env.step(action)

            rewards.append(reward)
            log_probs.append(log_prob)

            if done or trunc:
                break

        # Undiscounted episode return, used only for logging.
        score = sum(rewards)
        scores.append(score)

        # Discounted returns G_t = r_t + gamma * G_{t+1}, computed backwards.
        disc_returns = deque(maxlen=max_steps)
        for reward in rewards[::-1]:
            next_return = disc_returns[0] if len(disc_returns) > 0 else 0.0
            disc_returns.appendleft(reward + gamma * next_return)

        # Standardize returns to reduce the variance of the gradient estimate.
        disc_returns = th.tensor(list(disc_returns), dtype=th.float32)
        eps = th.finfo(th.float32).eps
        disc_returns = (disc_returns - disc_returns.mean()) / (disc_returns.std() + eps)

        # Policy gradient loss: sum over t of -log pi(a_t | s_t) * G_t.
        policy_losses = []
        for log_prob, disc_return in zip(log_probs, disc_returns):
            policy_losses.append(-log_prob * disc_return)
        policy_loss = th.cat(policy_losses).sum()

        # One gradient step per episode.
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        # Log mean and std of the scores over the last 100 episodes.
        if episode == 1 or episode % log_freq == 0:
            last_scores = scores[-100:]
            mean_score = np.mean(last_scores)
            std_score = np.std(last_scores)
            print(f"Episode {episode}\tAverage score: {mean_score:.2f} ± {std_score:.2f}")

    return scores
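# Standalone sanity check of the backward return computation above, with
# hypothetical rewards: for rewards [1, 2, 3] and gamma = 0.5,
# G_2 = 3, G_1 = 2 + 0.5*3 = 3.5, G_0 = 1 + 0.5*3.5 = 2.75.
def _discounted_returns(rewards: list[float], gamma: float) -> list[float]:
    returns = deque()
    for r in reversed(rewards):
        next_return = returns[0] if returns else 0.0
        returns.appendleft(r + gamma * next_return)
    return list(returns)


assert _discounted_returns([1.0, 2.0, 3.0], gamma=0.5) == [2.75, 3.5, 3.0]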
|
|
|
|
# Hyperparameters for training and evaluation.
hyperparameters = {
    "h_size": 64,
    "n_training_episodes": 50_000,
    "n_evaluation_episodes": 10,
    "max_t": 10_000,
    "gamma": 0.99,
    "lr": 1e-4,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}
|
|
policy = Policy(
    state_dim=hyperparameters["state_space"],
    action_dim=hyperparameters["action_space"],
    hidden_dim=hyperparameters["h_size"],
).to(device)
optimizer = optim.Adam(params=policy.parameters(), lr=hyperparameters["lr"])
|
|
# Train the agent. 50k episodes on Pixelcopter can take a long time; lower
# n_training_episodes for a quick smoke test.
scores = reinforce(
    policy=policy,
    env=env,
    optimizer=optimizer,
    num_episodes=hyperparameters["n_training_episodes"],
    max_steps=hyperparameters["max_t"],
    gamma=hyperparameters["gamma"],
)

# Save the trained policy (full module pickle) so it can be reloaded below.
model_path = f"models/reinforce-{env_id}.pth"
os.makedirs("models", exist_ok=True)
th.save(policy, model_path)
|
|
|
|
# Reload the trained policy from disk. th.load with weights_only=False
# unpickles the full nn.Module, not just a state_dict.
model_path = f"models/reinforce-{env_id}.pth"
policy = th.load(model_path, weights_only=False)
|
|
|
|
# Evaluate the trained agent.
from extra import evaluate_agent
|
|
env = gym.make(env_id)
mean_score, std_score = evaluate_agent(
    env=env,
    max_steps=hyperparameters["max_t"],
    n_eval_episodes=hyperparameters["n_evaluation_episodes"],
    policy=policy,
)
print(f"\nScore: {mean_score:.2f} ± {std_score:.2f}")
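# evaluate_agent lives in the local extra module and is not shown here. A
# minimal sketch of what such an evaluation loop might look like, assuming
# the act() interface defined above (hypothetical, not the actual extra code):
def evaluate_agent_sketch(env, max_steps, n_eval_episodes, policy):
    episode_scores = []
    for _ in range(n_eval_episodes):
        obs, info = env.reset()
        total_reward = 0.0
        for _ in range(max_steps):
            action, _ = policy.act(obs)
            obs, reward, done, trunc, info = env.step(action)
            total_reward += reward
            if done or trunc:
                break
        episode_scores.append(total_reward)
    return np.mean(episode_scores), np.std(episode_scores)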
|
|
|
|
# Push the trained model, hyperparameters, and a replay video to the Hugging Face Hub.
from extra import push_to_hub
|
|
repo_id = f"pabloramesc/Reinforce-{env_id}"
eval_env = gym.make(env_id).unwrapped
push_to_hub(
    repo_id,
    env_id,
    policy,
    hyperparameters,
    eval_env,
    video_fps=30,
)
|
|
|
|
|
|