| from __future__ import annotations |
|
|
| import datetime as dt |
| import json |
| import pickle |
| import random |
| import shutil |
| from pathlib import Path |
|
|
| import gymnasium as gym |
| import imageio |
| import numpy as np |
| from huggingface_hub import HfApi |
| from huggingface_hub.errors import HfHubHTTPError |
| from huggingface_hub.repocard import metadata_eval_result, metadata_save |
| from tqdm import tqdm |
|
|
|
|
# Hugging Face account name: used as the namespace of the pushed repo ids and
# recorded in results.json and the generated README.
USERNAME = "Sami94"
# Author name written into results.json and the generated README.
STUDENT_NAME = "Sami Chellia"
# Root directory for the per-repo staging folders and pushed_repos.json.
OUTPUT_DIR = Path("artifacts/unit2")
|
|
|
|
def initialize_q_table(state_space: int, action_space: int) -> np.ndarray:
    """Create an all-zero Q-table.

    Args:
        state_space: Number of rows (one per state).
        action_space: Number of columns (one per action).

    Returns:
        A ``(state_space, action_space)`` array filled with zeros.
    """
    table_shape = (state_space, action_space)
    return np.zeros(table_shape)
|
|
|
|
def greedy_policy(qtable: np.ndarray, state: int) -> int:
    """Pick the action with the largest Q-value for ``state``.

    Args:
        qtable: Q-table indexed as ``qtable[state][action]``.
        state: Row of the table to inspect.

    Returns:
        The column index of the maximum value in that row, as a plain ``int``.
        On ties ``np.argmax`` reports the first (lowest) index.
    """
    action_values = qtable[state]
    best_action = np.argmax(action_values)
    return int(best_action)
|
|
|
|
def epsilon_greedy_policy(qtable: np.ndarray, state: int, epsilon: float, env: gym.Env) -> int:
    """Choose an action with an epsilon-greedy rule.

    A single draw from ``random.uniform(0, 1)`` is compared with ``epsilon``:
    when the draw is strictly greater the greedy action is used, otherwise an
    action is sampled from ``env.action_space``.

    Args:
        qtable: Q-table consulted for the greedy choice.
        state: Current state index.
        epsilon: Exploration threshold the random draw is compared against.
        env: Environment whose ``action_space.sample()`` supplies random actions.

    Returns:
        The selected action as a plain ``int``.
    """
    exploit = random.uniform(0, 1) > epsilon
    if not exploit:
        return int(env.action_space.sample())
    return greedy_policy(qtable, state)
|
|
|
|
def train(
    *,
    n_training_episodes: int,
    learning_rate: float,
    min_epsilon: float,
    max_epsilon: float,
    decay_rate: float,
    gamma: float,
    env: gym.Env,
    max_steps: int,
    qtable: np.ndarray,
) -> np.ndarray:
    """Run tabular Q-learning and return the updated table.

    Epsilon for episode ``i`` is
    ``min_epsilon + (max_epsilon - min_epsilon) * exp(-decay_rate * i)``.
    Each episode runs for at most ``max_steps`` steps and stops early when the
    environment reports ``terminated`` or ``truncated``.

    Args:
        n_training_episodes: Number of episodes to run.
        learning_rate: Step size applied to the temporal-difference error.
        min_epsilon: Lower bound of the exploration rate.
        max_epsilon: Exploration rate at episode 0.
        decay_rate: Exponential decay constant for epsilon.
        gamma: Discount factor applied to the best next-state value.
        env: Environment to interact with; ``env.spec.id`` labels the progress bar.
        max_steps: Maximum number of steps per episode.
        qtable: Q-table to update; it is modified in place.

    Returns:
        The same ``qtable`` object that was passed in.
    """
    epsilon_span = max_epsilon - min_epsilon
    progress = tqdm(range(n_training_episodes), desc=f"training {env.spec.id}")

    for episode_idx in progress:
        epsilon = min_epsilon + epsilon_span * np.exp(-decay_rate * episode_idx)
        state, _ = env.reset()

        for _step in range(max_steps):
            current = int(state)
            action = epsilon_greedy_policy(qtable, current, epsilon, env)
            observation, reward, terminated, truncated, _ = env.step(action)
            successor = int(observation)

            # Q(s, a) <- Q(s, a) + lr * (r + gamma * max_a' Q(s', a') - Q(s, a))
            td_target = reward + gamma * np.max(qtable[successor])
            td_error = td_target - qtable[current][action]
            qtable[current][action] += learning_rate * td_error

            if terminated or truncated:
                break
            state = successor

    return qtable
|
|
|
|
def evaluate_agent(
    env: gym.Env,
    max_steps: int,
    n_eval_episodes: int,
    qtable: np.ndarray,
    seed: list[int],
) -> tuple[float, float]:
    """Evaluate the greedy policy of ``qtable`` over several episodes.

    Args:
        env: Environment to evaluate in; ``env.spec.id`` labels the progress bar.
        max_steps: Maximum number of steps per episode.
        n_eval_episodes: Number of evaluation episodes.
        qtable: Q-table whose greedy policy is evaluated.
        seed: Per-episode reset seeds. When empty (or otherwise falsy) the
            environment is reset without a seed; when non-empty it must
            provide at least ``n_eval_episodes`` entries.

    Returns:
        ``(mean, std)`` of the total reward collected per episode.

    Raises:
        ValueError: If ``seed`` is non-empty but holds fewer than
            ``n_eval_episodes`` entries.
    """
    # Fail fast with a clear message instead of an IndexError part-way through
    # the evaluation when the seed list runs out.
    if seed and len(seed) < n_eval_episodes:
        raise ValueError(
            f"seed has {len(seed)} entries but n_eval_episodes is {n_eval_episodes}"
        )

    episode_rewards: list[float] = []
    for episode in tqdm(range(n_eval_episodes), desc=f"evaluating {env.spec.id}"):
        if seed:
            state, _ = env.reset(seed=seed[episode])
        else:
            state, _ = env.reset()
        total_rewards_ep = 0.0

        for _ in range(max_steps):
            action = greedy_policy(qtable, int(state))
            new_state, reward, terminated, truncated, _ = env.step(action)
            total_rewards_ep += float(reward)
            if terminated or truncated:
                break
            state = new_state

        episode_rewards.append(total_rewards_ep)

    return float(np.mean(episode_rewards)), float(np.std(episode_rewards))
|
|
|
|
def record_video(env: gym.Env, qtable: np.ndarray, out_path: Path, fps: int = 1) -> None:
    """Roll out one greedy episode and save its rendered frames as a video.

    The environment is reset with a seed drawn from ``random.randint(0, 500)``.
    A frame is captured after the reset and after every step, until the
    environment reports ``terminated`` or ``truncated``.

    Args:
        env: Environment to roll out; ``env.render()`` supplies the frames.
        qtable: Q-table whose greedy policy drives the episode.
        out_path: Destination file handed to ``imageio.mimsave``.
        fps: Frames per second forwarded to ``imageio.mimsave``.
    """
    state, _ = env.reset(seed=random.randint(0, 500))
    frames = [env.render()]

    episode_over = False
    while not episode_over:
        action = greedy_policy(qtable, int(state))
        state, _, terminated, truncated, _ = env.step(action)
        frames.append(env.render())
        episode_over = terminated or truncated

    imageio.mimsave(out_path, [np.array(frame) for frame in frames], fps=fps)
|
|
|
|
def push_to_hub(repo_id: str, model: dict, env: gym.Env, local_repo_path: Path) -> str:
    """Stage a trained agent locally and upload it to the Hugging Face Hub.

    Steps, in order: recreate ``local_repo_path`` from scratch, pickle
    ``model`` to ``q-learning.pkl``, evaluate the agent, write
    ``results.json`` and ``README.md`` (with model-card metadata), record
    ``replay.mp4``, then create the repo and upload the folder.

    Note: ``model`` is modified in place. ``"map_name"`` is added when the env
    spec has a truthy ``map_name`` kwarg, and ``"slippery"`` is set to
    ``False`` when the spec's ``is_slippery`` kwarg is exactly ``False``.

    Args:
        repo_id: Hub repo id in ``"<namespace>/<name>"`` form.
        model: Dict holding at least ``"env_id"``, ``"max_steps"``,
            ``"n_eval_episodes"``, ``"qtable"`` and ``"eval_seed"``.
        env: Environment used for evaluation and video recording.
        local_repo_path: Local staging directory; deleted first if it exists.

    Returns:
        The repo URL as a string, or ``"LOCAL_ONLY:<resolved path>"`` when the
        Hub calls raise ``HfHubHTTPError``.
    """
    _, repo_name = repo_id.split("/")
    api = HfApi()

    # Always start from an empty staging directory.
    if local_repo_path.exists():
        shutil.rmtree(local_repo_path)
    local_repo_path.mkdir(parents=True, exist_ok=True)

    # Environment variant details, read once and reused below.
    spec_kwargs = env.spec.kwargs
    map_name = spec_kwargs.get("map_name")
    non_slippery = spec_kwargs.get("is_slippery", "") is False

    if map_name:
        model["map_name"] = map_name
    if non_slippery:
        model["slippery"] = False

    pickle_path = local_repo_path / "q-learning.pkl"
    with pickle_path.open("wb") as handle:
        pickle.dump(model, handle)

    mean_reward, std_reward = evaluate_agent(
        env=env,
        max_steps=model["max_steps"],
        n_eval_episodes=model["n_eval_episodes"],
        qtable=model["qtable"],
        seed=model["eval_seed"],
    )
    reward_summary = f"{mean_reward:.2f} +/- {std_reward:.2f}"

    results = {
        "env_id": model["env_id"],
        "mean_reward": mean_reward,
        "std_reward": std_reward,
        "n_eval_episodes": model["n_eval_episodes"],
        "eval_datetime": dt.datetime.now().isoformat(),
        "student": STUDENT_NAME,
        "hf_username": USERNAME,
    }
    results_json = json.dumps(results, indent=2)
    (local_repo_path / "results.json").write_text(results_json, encoding="utf-8")

    # Display name: env id plus optional map / slipperiness suffixes.
    env_name = model["env_id"]
    if map_name:
        env_name += "-" + map_name
    if non_slippery:
        env_name += "-no_slippery"

    tags = [
        env_name,
        "q-learning",
        "reinforcement-learning",
        "custom-implementation",
        "huggingface-deep-rl-course",
    ]
    eval_metadata = metadata_eval_result(
        model_pretty_name=repo_name,
        task_pretty_name="reinforcement-learning",
        task_id="reinforcement-learning",
        metrics_pretty_name="mean_reward",
        metrics_id="mean_reward",
        metrics_value=reward_summary,
        dataset_pretty_name=env_name,
        dataset_id=env_name,
    )
    metadata = {"tags": tags, **eval_metadata}

    readme_text = f"""# Q-Learning Agent for {env_name}

Student: {STUDENT_NAME}
Hugging Face username: {USERNAME}

This repository contains a Q-Learning agent trained for the Hugging Face Deep RL course.

Mean reward: {reward_summary}

```python
from huggingface_hub import hf_hub_download
import pickle

model_path = hf_hub_download(repo_id="{repo_id}", filename="q-learning.pkl")
with open(model_path, "rb") as f:
    model = pickle.load(f)
```
"""
    readme_path = local_repo_path / "README.md"
    readme_path.write_text(readme_text, encoding="utf-8")
    metadata_save(readme_path, metadata)

    record_video(env, model["qtable"], local_repo_path / "replay.mp4")

    try:
        repo_url = api.create_repo(repo_id=repo_id, exist_ok=True)
        api.upload_folder(repo_id=repo_id, folder_path=local_repo_path, path_in_repo=".")
    except HfHubHTTPError as exc:
        # Keep the local artifacts and report where they are instead of failing.
        print(f"Hub push failed for {repo_id}: {exc}")
        resolved_path = local_repo_path.resolve()
        print(f"Local artifacts saved in {resolved_path}")
        return f"LOCAL_ONLY:{resolved_path}"
    else:
        return str(repo_url)
|
|
|
|
def run_frozenlake() -> dict:
    """Train a Q-learning agent on non-slippery 4x4 FrozenLake and publish it.

    Returns:
        A dict with the target ``"repo_id"`` and the ``"url"`` string returned
        by ``push_to_hub``.
    """
    params = {
        "n_training_episodes": 10000,
        "learning_rate": 0.7,
        "n_eval_episodes": 100,
        "env_id": "FrozenLake-v1",
        "max_steps": 99,
        "gamma": 0.95,
        "eval_seed": [],
        "max_epsilon": 1.0,
        "min_epsilon": 0.05,
        "decay_rate": 0.0005,
    }
    # Subset of ``params`` that ``train`` accepts as keyword arguments.
    train_keys = (
        "n_training_episodes",
        "learning_rate",
        "min_epsilon",
        "max_epsilon",
        "decay_rate",
        "gamma",
        "max_steps",
    )
    repo_name = "q-FrozenLake-v1-4x4-noSlippery"
    repo_id = f"{USERNAME}/{repo_name}"

    env = gym.make("FrozenLake-v1", map_name="4x4", is_slippery=False, render_mode="rgb_array")
    try:
        qtable = initialize_q_table(env.observation_space.n, env.action_space.n)
        qtable = train(env=env, qtable=qtable, **{key: params[key] for key in train_keys})
        model = {**params, "qtable": qtable}
        url = push_to_hub(repo_id, model, env, OUTPUT_DIR / repo_name)
    finally:
        # Release the environment even when training or publishing fails.
        env.close()
    return {"repo_id": repo_id, "url": url}
|
|
|
|
def run_taxi() -> dict:
    """Train a Q-learning agent on Taxi-v3 and publish it.

    Evaluation uses a fixed list of 100 reset seeds, one per evaluation
    episode.

    Returns:
        A dict with the target ``"repo_id"`` and the ``"url"`` string returned
        by ``push_to_hub``.
    """
    params = {
        "n_training_episodes": 25000,
        "learning_rate": 0.7,
        "n_eval_episodes": 100,
        "eval_seed": [
            16, 54, 165, 177, 191, 191, 120, 80, 149, 178, 48, 38, 6, 125, 174,
            73, 50, 172, 100, 148, 146, 6, 25, 40, 68, 148, 49, 167, 9, 97,
            164, 176, 61, 7, 54, 55, 161, 131, 184, 51, 170, 12, 120, 113,
            95, 126, 51, 98, 36, 135, 54, 82, 45, 95, 89, 59, 95, 124, 9,
            113, 58, 85, 51, 134, 121, 169, 105, 21, 30, 11, 50, 65, 12, 43,
            82, 145, 152, 97, 106, 55, 31, 85, 38, 112, 102, 168, 123, 97,
            21, 83, 158, 26, 80, 63, 5, 81, 32, 11, 28, 148,
        ],
        "env_id": "Taxi-v3",
        "max_steps": 99,
        "gamma": 0.95,
        "max_epsilon": 1.0,
        "min_epsilon": 0.05,
        "decay_rate": 0.005,
    }
    # Subset of ``params`` that ``train`` accepts as keyword arguments.
    train_keys = (
        "n_training_episodes",
        "learning_rate",
        "min_epsilon",
        "max_epsilon",
        "decay_rate",
        "gamma",
        "max_steps",
    )
    repo_name = "q-Taxi-v3"
    repo_id = f"{USERNAME}/{repo_name}"

    env = gym.make("Taxi-v3", render_mode="rgb_array")
    try:
        qtable = initialize_q_table(env.observation_space.n, env.action_space.n)
        qtable = train(env=env, qtable=qtable, **{key: params[key] for key in train_keys})
        model = {**params, "qtable": qtable}
        url = push_to_hub(repo_id, model, env, OUTPUT_DIR / repo_name)
    finally:
        # Release the environment even when training or publishing fails.
        env.close()
    return {"repo_id": repo_id, "url": url}
|
|
|
|
def main() -> None:
    """Run both experiments, then save and print the publishing summary.

    The summary is the JSON list of the dicts returned by ``run_frozenlake``
    and ``run_taxi``; it is written to ``OUTPUT_DIR / "pushed_repos.json"``
    and echoed to stdout.
    """
    OUTPUT_DIR.mkdir(parents=True, exist_ok=True)

    frozenlake_result = run_frozenlake()
    taxi_result = run_taxi()
    summary = json.dumps([frozenlake_result, taxi_result], indent=2)

    summary_path = OUTPUT_DIR / "pushed_repos.json"
    summary_path.write_text(summary, encoding="utf-8")
    print(summary)
|
|
|
|
# Run the full train-and-publish pipeline only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()
|
|