import os

import requests
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from huggingface_hub.repocard import metadata_load
from apscheduler.schedulers.background import BackgroundScheduler

from utils import *  # provides make_clickable_user and make_clickable_model

DATASET_REPO_URL = "https://huggingface.co/datasets/mshamrai/rlc-leaderboard-data"
DATASET_REPO_ID = "mshamrai/rlc-leaderboard-data"
HF_TOKEN = os.environ.get("HF_TOKEN")

STUDENTS_SET = {
    "mshamrai",
    "Kolosok",
    "grinvolod",
    "ostap-khm",
    "elusivephantasm",
    "letaldir",
    "QuantBanana",
    "olehsamoilenko",
    "DmytroKhitro",
}

block = gr.Blocks()
api = HfApi(token=HF_TOKEN)

# Environments tracked by the leaderboard, one per course unit
rl_envs = [
    {
        "rl_env_beautiful": "LunarLander-v2 🚀",
        "rl_env": "LunarLander-v2",
        "unit": "Unit 1",
        "library": "stable-baselines3",
        "min_result": 200,
    },
    {
        "rl_env_beautiful": "Taxi-v3 🚖",
        "rl_env": "Taxi-v3",
        "unit": "Unit 2",
        "library": "q-learning",
        "min_result": 4,
    },
    {
        "rl_env_beautiful": "SpaceInvadersNoFrameskip-v4 👾",
        "rl_env": "SpaceInvadersNoFrameskip-v4",
        "unit": "Unit 3",
        "library": "stable-baselines3",
        "min_result": 200,
    },
    {
        "rl_env_beautiful": "CartPole-v1",
        "rl_env": "CartPole-v1",
        "unit": "Unit 4",
        "library": "reinforce",
        "min_result": 350,
    },
    {
        "rl_env_beautiful": "Pixelcopter-PLE-v0",
        "rl_env": "Pixelcopter-PLE-v0",
        "unit": "Unit 4",
        "library": "reinforce",
        "min_result": 5,
    },
    {
        "rl_env_beautiful": "ML-Agents Snowball Target ❄️",
        "rl_env": "ML-Agents-SnowballTarget",
        "unit": "Unit 5",
        "library": "ml-agents",
        "min_result": -100,
    },
    {
        "rl_env_beautiful": "ML-Agents Pyramids 🏔️",
        "rl_env": "ML-Agents-Pyramids",
        "unit": "Unit 5",
        "library": "ml-agents",
        "min_result": -100,
    },
    {
        "rl_env_beautiful": "Panda Reach Dense 🤖",
        "rl_env": "PandaReachDense",
        "unit": "Unit 6",
        "library": "stable-baselines3",
        "min_result": -3.5,
    },
    {
        "rl_env_beautiful": "ML-Agents Soccer Twos ⚽",
        "rl_env": "ML-Agents-SoccerTwos",
        "unit": "Unit 7",
        "library": "ml-agents",
        "min_result": -100,
    },
    {
        "rl_env_beautiful": "Doom Health Gathering Supreme",
        "rl_env": "doom_health_gathering_supreme",
        "unit": "Unit 8 PII",
        "library": "sample-factory",
        "min_result": 5,
    },
]


def restart():
    print("RESTART")
    api.restart_space(repo_id="mshamrai/KAU-RL-Leaderboard")


def get_metadata(model_id):
    try:
        readme_path = hf_hub_download(model_id, filename="README.md", etag_timeout=180)
        return metadata_load(readme_path)
    except requests.exceptions.HTTPError:
        # 404: the model has no README.md
        return None


def parse_metrics_accuracy(meta):
    if "model-index" not in meta:
        return None
    result = meta["model-index"][0]["results"]
    metrics = result[0]["metrics"]
    accuracy = metrics[0]["value"]
    return accuracy


# We keep the worst-case episode: the reported metric is "mean_reward +/- std_reward"
def parse_rewards(accuracy):
    default_reward = -1000
    default_std = -1000
    if accuracy is not None:
        accuracy = str(accuracy)
        parsed = accuracy.split("+/-")
        if len(parsed) > 1:
            mean_reward = float(parsed[0].strip())
            std_reward = float(parsed[1].strip())
        elif len(parsed) == 1:
            # Only a mean reward was reported
            mean_reward = float(parsed[0].strip())
            std_reward = 0.0
        else:
            mean_reward = float(default_reward)
            std_reward = float(default_std)
    else:
        mean_reward = float(default_reward)
        std_reward = float(default_std)
    return mean_reward, std_reward


def get_user_models(hf_username, env_tag, lib_tag):
    """
    List the reinforcement learning models of a user for a given environment and library.

    :param hf_username: User's HF username
    :param env_tag: Environment tag
    :param lib_tag: Library tag
    """
    api = HfApi()
    models = api.list_models(author=hf_username, filter=["reinforcement-learning", env_tag, lib_tag])
    user_model_ids = [(x.modelId, (x.created_at or x.last_modified)) for x in models]
    return user_model_ids
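# Illustrative sketch (all values hypothetical, not from any real model card):
# the model-index metadata that get_metadata / parse_metrics_accuracy /
# parse_rewards walk through typically looks like
#
#   model-index:
#     - results:
#         - metrics:
#             - type: mean_reward
#               value: "250.50 +/- 15.20"
#
# so parse_rewards("250.50 +/- 15.20") returns (250.5, 15.2), and the score
# used for ranking below is the lower bound 250.5 - 15.2 = 235.3.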
def get_user_sf_models(hf_username, env_tag, lib_tag):
    """
    List the sample-factory models of a user for a given environment.

    These models are filtered on the dataset name in the model card rather
    than on an environment tag.
    """
    models_sf = []
    models = api.list_models(author=hf_username, filter=["reinforcement-learning", lib_tag])
    user_model_ids = [(x.modelId, (x.created_at or x.last_modified)) for x in models]
    for model, last_updated in user_model_ids:
        meta = get_metadata(model)
        if meta is None:
            continue
        result = meta["model-index"][0]["results"][0]["dataset"]["name"]
        if result == env_tag:
            models_sf.append((model, last_updated))
    return models_sf


def calculate_best_result(user_model_ids):
    """
    Calculate the best result of a unit: best_result = mean_reward - std_reward.

    :param user_model_ids: RL models of a user
    """
    best_result = -1000
    best_model_id = ""
    best_last_updated = None
    for model, last_updated in user_model_ids:
        meta = get_metadata(model)
        if meta is None:
            continue
        accuracy = parse_metrics_accuracy(meta)
        mean_reward, std_reward = parse_rewards(accuracy)
        result = mean_reward - std_reward
        if result > best_result:
            best_result = result
            best_model_id = model
            best_last_updated = last_updated
    return best_result, best_model_id, best_last_updated


def get_model_ids(hf_username, rl_env):
    if rl_env["rl_env"] == "PandaReachDense":
        # Unit 6 accepts either PandaReachDense-v3 or PandaReachDense-v2
        user_models = get_user_models(hf_username, "PandaReachDense-v3", rl_env["library"])
        if len(user_models) == 0:
            user_models = get_user_models(hf_username, "PandaReachDense-v2", rl_env["library"])
    elif rl_env["rl_env"] != "doom_health_gathering_supreme":
        user_models = get_user_models(hf_username, rl_env["rl_env"], rl_env["library"])
    else:
        user_models = get_user_sf_models(hf_username, rl_env["rl_env"], rl_env["library"])

    # Calculate the best result and the corresponding model id
    best_result, best_model_id, best_last_updated = calculate_best_result(user_models)
    passed = best_result >= rl_env["min_result"]
    return best_model_id, best_result, best_last_updated, passed


def update_leaderboard_dataset(rl_env, path):
    # Get the best model of every student for this environment
    model_info = [get_model_ids(user_id, rl_env) for user_id in STUDENTS_SET]

    data = []
    for model_id, result, updated, passed in model_info:
        if model_id is None or model_id == "":
            continue
        user_id = model_id.split("/")[0]
        data.append({
            "User": user_id,
            "Model": model_id,
            "Result": result,
            "Submitted": updated,
            "Passed": passed,
        })

    if not data:
        return

    ranked_dataframe = rank_dataframe(pd.DataFrame.from_records(data))
    file_path = path + "/" + rl_env["rl_env"] + ".csv"
    ranked_dataframe.to_csv(file_path, index=False)


def download_leaderboard_dataset():
    path = snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset")
    return path


def get_data(rl_env, path) -> pd.DataFrame:
    """
    Get the leaderboard data of rl_env, with clickable user and model links.

    :return: data as a pandas DataFrame
    """
    csv_path = path + "/" + rl_env + ".csv"
    if not os.path.exists(csv_path):
        return pd.DataFrame(columns=["Ranking", "User", "Model", "Result", "Submitted", "Passed"])

    data = pd.read_csv(csv_path)
    for index, row in data.iterrows():
        data.loc[index, "User"] = make_clickable_user(row["User"])
        data.loc[index, "Model"] = make_clickable_model(row["Model"])
    return data


def get_data_no_html(rl_env, path) -> pd.DataFrame:
    """
    Get the raw leaderboard data of rl_env, without HTML links.

    :return: data as a pandas DataFrame
    """
    csv_path = path + "/" + rl_env + ".csv"
    data = pd.read_csv(csv_path)
    return data
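# Sketch of the per-environment CSV these helpers read and write; the rows
# below are hypothetical:
#
#   Ranking,User,Model,Result,Submitted,Passed
#   1,alice,alice/ppo-LunarLander-v2,235.3,2024-05-01 12:00:00+00:00,True
#   2,bob,bob/ppo-LunarLander-v2,187.6,2024-05-02 09:30:00+00:00,False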
def rank_dataframe(dataframe):
    if dataframe.empty:
        return pd.DataFrame(columns=["User", "Model", "Result", "Submitted", "Passed"])

    dataframe = dataframe.sort_values(by=["Result"], ascending=False)
    if "Ranking" not in dataframe.columns:
        dataframe.insert(0, "Ranking", list(range(1, len(dataframe) + 1)))
    else:
        dataframe["Ranking"] = list(range(1, len(dataframe) + 1))
    return dataframe


def run_update_dataset():
    path_ = download_leaderboard_dataset()
    for rl_env in rl_envs:
        update_leaderboard_dataset(rl_env, path_)

    api.upload_folder(
        folder_path=path_,
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        commit_message="Update dataset",
    )


run_update_dataset()

with block:
    gr.Markdown("""
    # 🏆 Reinforcement Learning Course Leaderboard 🏆

    This leaderboard is for Kyiv Academic University students to track their results during the Hugging Face Deep Reinforcement Learning Course.

    ### How are the results calculated?
    We sort the models by their **lower-bound result: mean_reward - std_reward**.

    ### I can't find my model 😭
    The leaderboard is **updated every two hours**; if you can't find your model, just wait for the next update.
    """)

    path_ = download_leaderboard_dataset()
    for rl_env in rl_envs:
        with gr.TabItem(rl_env["rl_env_beautiful"]) as rl_tab:
            with gr.Row():
                markdown = """
                # {unit}

                ## {name_leaderboard}
                """.format(name_leaderboard=rl_env["rl_env_beautiful"], unit=rl_env["unit"])
                gr.Markdown(markdown)

            with gr.Row():
                gr_dataframe = gr.components.Dataframe(
                    value=get_data(rl_env["rl_env"], path_),
                    headers=["Ranking 🏆", "User 🤗", "Model id 🤖", "Result", "Submitted", "Passed"],
                    datatype=["number", "markdown", "markdown", "number", "date", "bool"],
                    row_count=(15, "dynamic"),
                )

# Disabled: reload the dataset when the page loads (`grpath` was never defined).
# block.load(
#     download_leaderboard_dataset,
#     inputs=[],
#     outputs=[grpath],
# )

scheduler = BackgroundScheduler()
# Alternative (disabled): refresh the data in place every hour
# scheduler.add_job(run_update_dataset, "interval", seconds=3600)
# Restart the Space every three hours to rebuild the leaderboard
scheduler.add_job(restart, "interval", seconds=10800)
scheduler.start()

block.launch()
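# To run this Space locally (assuming the file is saved as app.py and utils.py
# provides make_clickable_user / make_clickable_model):
#
#   HF_TOKEN=<your write token> python app.py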