"""Train a REINFORCE agent on Pixelcopter and publish it to the Hugging Face Hub."""

import datetime
import json
import tempfile
from collections import deque
from pathlib import Path

import numpy as np

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from huggingface_hub import HfApi, metadata_eval_result, metadata_save
from torch.distributions import Categorical

# Gym
import gym
import gym_pygame  # noqa: F401  (side-effect import; presumably registers the PLE envs - confirm)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

env_id = "Pixelcopter-PLE-v0"
env = gym.make(env_id)
# NOTE(review): eval_env is created but never used anywhere in this file.
eval_env = gym.make(env_id)
s_size = env.observation_space.shape[0]
a_size = env.action_space.n


class Policy(nn.Module):
    """Three-layer MLP mapping a state vector to action probabilities."""

    def __init__(self, s_size, a_size, h_size):
        """Build the network.

        :param s_size: size of the input (state) vector
        :param a_size: number of discrete actions (size of the output)
        :param h_size: width of the first hidden layer; the second is twice as wide
        """
        super().__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size * 2)
        self.fc3 = nn.Linear(h_size * 2, a_size)

    def forward(self, x):
        """Return action probabilities; softmax is taken over dim 1 of ``x``."""
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        """Sample an action for a single state.

        :param state: numpy array holding one state (a batch dim is added here)
        :return: tuple ``(action, log_prob)`` where ``action`` is a Python int
            and ``log_prob`` is the log-probability tensor of that action
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)


def _discounted_returns(rewards, gamma):
    """Return the discounted return G_t for every timestep, in chronological order.

    Uses the recurrence ``G_t = r_t + gamma * G_(t+1)`` (with the return after
    the last step taken as 0), which is equivalent to
    ``G_t = r_t + gamma * r_(t+1) + gamma**2 * r_(t+2) + ...``.
    Walking the episode backwards lets each G_t reuse the already computed
    G_(t+1), so the whole computation is a single pass over the rewards.
    A deque is used because ``appendleft`` keeps the result in chronological
    order without shifting the existing elements.
    """
    returns = deque()
    for reward in reversed(rewards):
        next_return = returns[0] if returns else 0
        returns.appendleft(gamma * next_return + reward)
    return list(returns)


def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    """Train ``policy`` with the REINFORCE algorithm on the module-level ``env``.

    :param policy: object exposing ``act(state) -> (action, log_prob)``
    :param optimizer: optimizer over the policy's parameters
    :param n_training_episodes: number of episodes to run
    :param max_t: maximum number of steps per episode
    :param gamma: discount factor
    :param print_every: print the running average score every this many episodes
    :return: list with the undiscounted score (sum of rewards) of every episode
    """
    # Running window used only for the progress printout.
    scores_deque = deque(maxlen=100)
    scores = []

    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()

        # Roll out one episode with the current policy.
        for _ in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break

        episode_score = sum(rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)

        returns = torch.tensor(
            _discounted_returns(rewards, gamma), dtype=torch.float32
        )

        # Standardizing the returns makes training more stable. It is skipped
        # for one-step episodes: the (Bessel-corrected) std of a single value
        # is NaN, which would turn the loss and then the weights into NaN.
        if returns.numel() > 1:
            # Machine epsilon for float32, added to the std so that a
            # zero-variance episode does not divide by zero.
            eps = np.finfo(np.float32).eps.item()
            returns = (returns - returns.mean()) / (returns.std() + eps)

        # Policy-gradient loss: the optimizer minimizes, hence the minus sign.
        policy_loss = torch.cat(
            [
                -log_prob * disc_return
                for log_prob, disc_return in zip(saved_log_probs, returns)
            ]
        ).sum()

        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print(f"Episode {i_episode}\tAverage Score: {np.mean(scores_deque):.2f}")

    return scores


pixelcopter_hyperparameters = {
    "h_size": 64,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 10,
    "max_t": 10000,
    "gamma": 0.99,
    "lr": 1e-4,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

# Create policy and place it to the device
# torch.manual_seed(50)
pixelcopter_policy = Policy(
    pixelcopter_hyperparameters["state_space"],
    pixelcopter_hyperparameters["action_space"],
    pixelcopter_hyperparameters["h_size"],
).to(device)
pixelcopter_optimizer = optim.Adam(
    pixelcopter_policy.parameters(), lr=pixelcopter_hyperparameters["lr"]
)

scores = reinforce(
    pixelcopter_policy,
    pixelcopter_optimizer,
    pixelcopter_hyperparameters["n_training_episodes"],
    pixelcopter_hyperparameters["max_t"],
    pixelcopter_hyperparameters["gamma"],
    1000,
)


def push_to_hub(repo_id, model, hyperparameters, *, mean_reward=5.03, std_reward=0.0):
    """Save a model with its metadata and upload it to the Hugging Face Hub.

    The function creates the repo if needed, then writes the model, the
    hyperparameters, a results file and a model card into a temporary
    directory and uploads that directory. Only these generated files are
    uploaded; the current working directory is left untouched.

    :param repo_id: id of the model repository on the Hub, ``username/repo_name``
    :param model: the pytorch model we want to save
    :param hyperparameters: training hyperparameters; must contain the keys
        ``env_id`` and ``n_evaluation_episodes`` and be JSON-serializable
    :param mean_reward: mean evaluation reward to publish
    :param std_reward: standard deviation of the evaluation reward to publish
    """
    # NOTE(review): this function does not evaluate the model. The defaults
    # for mean_reward / std_reward are the values that were hard-coded here
    # before; pass measured values to publish real results.
    _, repo_name = repo_id.split("/")
    api = HfApi()

    # Step 1: Create the repo
    repo_url = api.create_repo(
        repo_id=repo_id,
        exist_ok=True,
    )

    with tempfile.TemporaryDirectory() as tmpdirname:
        local_dir = Path(tmpdirname)

        # Step 2: Save the model
        torch.save(model, local_dir / "model.pt")

        # Step 3: Save the hyperparameters to JSON
        with open(local_dir / "hyperparameters.json", "w") as outfile:
            json.dump(hyperparameters, outfile)

        # Step 4: Build the results JSON
        eval_form_datetime = datetime.datetime.now().isoformat()
        evaluate_data = {
            "env_id": hyperparameters["env_id"],
            "mean_reward": mean_reward,
            "n_evaluation_episodes": hyperparameters["n_evaluation_episodes"],
            "eval_datetime": eval_form_datetime,
        }
        with open(local_dir / "results.json", "w") as outfile:
            json.dump(evaluate_data, outfile)

        # Step 5: Create the model card
        env_name = hyperparameters["env_id"]

        metadata = {}
        metadata["tags"] = [
            env_name,
            "reinforce",
            "reinforcement-learning",
            "custom-implementation",
            "deep-rl-class",
        ]

        # Add metrics
        eval_metadata = metadata_eval_result(
            model_pretty_name=repo_name,
            task_pretty_name="reinforcement-learning",
            task_id="reinforcement-learning",
            metrics_pretty_name="mean_reward",
            metrics_id="mean_reward",
            metrics_value=f"{mean_reward:.2f} +/- {std_reward:.2f}",
            dataset_pretty_name=env_name,
            dataset_id=env_name,
        )

        # Merges both dictionaries
        metadata = {**metadata, **eval_metadata}

        # The card names the environment the hyperparameters refer to,
        # rather than whatever the module-level env_id happens to be.
        model_card = f"""
  # **Reinforce** Agent playing **{env_name}**
  This is a trained model of a **Reinforce** agent playing **{env_name}** .
  To learn to use this model and train yours check Unit 4 of the Deep Reinforcement Learning Course: https://huggingface.co/deep-rl-course/unit4/introduction
  """

        readme_path = local_dir / "README.md"
        with open(readme_path, "w", encoding="utf-8") as f:
            f.write(model_card)

        # Save our metrics to Readme metadata
        metadata_save(readme_path, metadata)

        # Step 6: Push the generated files to the Hub
        api.upload_folder(
            repo_id=repo_id,
            folder_path=str(local_dir),
            path_in_repo=".",
        )

    print(f"Your model is pushed to the Hub. You can view your model here: {repo_url}")


repo_id = "cyrodw/Reinforce-Pixelcopter"  # TODO Define your repo id {username/Reinforce-{model-id}}
push_to_hub(
    repo_id,
    pixelcopter_policy,  # The model we want to save
    pixelcopter_hyperparameters,  # Hyperparameters
)