import os
import requests
import gradio as gr
import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, snapshot_download
from huggingface_hub.repocard import metadata_load
from apscheduler.schedulers.background import BackgroundScheduler
from utils import make_clickable_user, make_clickable_model
DATASET_REPO_URL = "https://huggingface.co/datasets/mshamrai/rlc-leaderboard-data"
DATASET_REPO_ID = "mshamrai/rlc-leaderboard-data"
HF_TOKEN = os.environ.get("HF_TOKEN")
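# HF usernames of the students whose models are collected for the leaderboard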
STUDENTS_SET = {"mshamrai", "Kolosok", "grinvolod", "ostap-khm", "elusivephantasm", "letaldir", "QuantBanana", "olehsamoilenko", "DmytroKhitro"}
block = gr.Blocks()
api = HfApi(token=HF_TOKEN)
# RL environments tracked by the leaderboard, with the minimum result required to pass each unit
rl_envs = [
{
"rl_env_beautiful": "LunarLander-v2 🚀",
"rl_env": "LunarLander-v2",
"unit": "Unit 1",
"library": "stable-baselines3",
"min_result": 200,
},
{
"rl_env_beautiful": "Taxi-v3 🚖",
"rl_env": "Taxi-v3",
"unit": "Unit 2",
"library": "q-learning",
"min_result": 4,
},
{
"rl_env_beautiful": "SpaceInvadersNoFrameskip-v4 👾",
"rl_env": "SpaceInvadersNoFrameskip-v4",
"unit": "Unit 3",
"library": "stable-baselines3",
"min_result": 200,
},
{
"rl_env_beautiful": "CartPole-v1",
"rl_env": "CartPole-v1",
"unit": "Unit 4",
"library": "reinforce",
"min_result": 350,
},
{
"rl_env_beautiful": "Pixelcopter-PLE-v0",
"rl_env": "Pixelcopter-PLE-v0",
"unit": "Unit 4",
"library": "reinforce",
"min_result": 5,
},
{
"rl_env_beautiful": "ML-Agents Snowball Target ❄️",
"rl_env": "ML-Agents-SnowballTarget",
"unit": "Unit 5",
"library": "ml-agents",
"min_result": -100,
},
{
"rl_env_beautiful": "ML-Agents Pyramids 🏔️",
"rl_env": "ML-Agents-Pyramids",
"unit": "Unit 5",
"library": "ml-agents",
"min_result": -100,
},
{
"rl_env_beautiful": "Panda Reach Dense 🤖",
"rl_env": "PandaReachDense",
"unit": "Unit 6",
"library": "stable-baselines3",
"min_result": -3.5,
},
{
"rl_env_beautiful": "ML-Agents Soccer Twos ⚽",
"rl_env": "ML-Agents-SoccerTwos",
"unit": "Unit 7",
"library": "ml-agents",
"min_result": -100,
},
{
"rl_env_beautiful": "Doom Health Gathering Supreme",
"rl_env": "doom_health_gathering_supreme",
"unit": "Unit 8 PII",
"library": "sample-factory",
"min_result": 5,
}
]
def restart():
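    """Restart the Space; the leaderboard data is rebuilt on startup."""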
print("RESTART")
api.restart_space(repo_id="mshamrai/KAU-RL-Leaderboard")
def get_metadata(model_id):
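    """Download a model's README.md and parse its metadata; return None if the card is missing."""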
try:
readme_path = hf_hub_download(model_id, filename="README.md", etag_timeout=180)
return metadata_load(readme_path)
except requests.exceptions.HTTPError:
# 404 README.md not found
return None
def parse_metrics_accuracy(meta):
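    """Extract the first reported metric value (e.g. "200.50 +/- 15.72") from the model-index metadata, or None."""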
if "model-index" not in meta:
return None
result = meta["model-index"][0]["results"]
metrics = result[0]["metrics"]
accuracy = metrics[0]["value"]
return accuracy
# Parse a "mean_reward +/- std_reward" metric string; missing or malformed
# values fall back to a large negative default so they never outrank real results
def parse_rewards(accuracy):
    default_reward = -1000.0
    mean_reward, std_reward = default_reward, default_reward
    if accuracy is not None:
        parsed = str(accuracy).split("+/-")
        try:
            mean_reward = float(parsed[0].strip())
            std_reward = float(parsed[1].strip()) if len(parsed) > 1 else 0.0
        except ValueError:  # metric string is not numeric
            mean_reward, std_reward = default_reward, default_reward
    return mean_reward, std_reward
def get_user_models(hf_username, env_tag, lib_tag):
    """
    List the Reinforcement Learning models of a user
    for the given environment and library
    :param hf_username: User HF username
    :param env_tag: Environment tag
    :param lib_tag: Library tag
    """
    models = api.list_models(author=hf_username, filter=["reinforcement-learning", env_tag, lib_tag])
    user_model_ids = [(x.modelId, (x.created_at or x.last_modified)) for x in models]
    return user_model_ids
def get_user_sf_models(hf_username, env_tag, lib_tag):
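    """
    List the sample-factory models of a user for env_tag.
    sample-factory models are not tagged with the environment,
    so the dataset name in each model card is checked instead.
    """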
models_sf = []
models = api.list_models(author=hf_username, filter=["reinforcement-learning", lib_tag])
user_model_ids = [(x.modelId, (x.created_at or x.last_modified)) for x in models]
for model, last_updated in user_model_ids:
meta = get_metadata(model)
if meta is None:
continue
        try:
            dataset_name = meta["model-index"][0]["results"][0]["dataset"]["name"]
        except (KeyError, IndexError):  # malformed model card
            continue
        if dataset_name == env_tag:
            models_sf.append((model, last_updated))
return models_sf
def calculate_best_result(user_model_ids):
"""
    Calculate the best result for a unit
best_result = mean_reward - std_reward
:param user_model_ids: RL models of a user
"""
best_result = -1000
best_model_id = ""
best_last_updated = None
for model, last_updated in user_model_ids:
meta = get_metadata(model)
if meta is None:
continue
accuracy = parse_metrics_accuracy(meta)
mean_reward, std_reward = parse_rewards(accuracy)
result = mean_reward - std_reward
if result > best_result:
best_result = result
best_model_id = model
best_last_updated = last_updated
return best_result, best_model_id, best_last_updated
def get_model_ids(hf_username, rl_env):
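    """Return the user's best model id, score, submission date, and pass/fail for rl_env."""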
if rl_env["rl_env"] == "PandaReachDense":
# Since Unit 6 can use PandaReachDense-v2 or v3
user_models = get_user_models(hf_username, "PandaReachDense-v3", rl_env["library"])
if len(user_models) == 0:
user_models = get_user_models(hf_username, "PandaReachDense-v2", rl_env["library"])
elif rl_env["rl_env"] != "doom_health_gathering_supreme":
user_models = get_user_models(hf_username, rl_env["rl_env"], rl_env["library"])
else:
user_models = get_user_sf_models(hf_username, rl_env["rl_env"], rl_env["library"])
# Calculate the best result and get the best_model_id
best_result, best_model_id, best_last_updated = calculate_best_result(user_models)
passed = best_result >= rl_env["min_result"]
return best_model_id, best_result, best_last_updated, passed
def update_leaderboard_dataset(rl_env, path):
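    """Rebuild the leaderboard CSV for rl_env from every student's best model."""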
# Get model ids associated with rl_env
model_info = []
for user_id in STUDENTS_SET:
model_info.append(get_model_ids(user_id, rl_env))
data = []
    for model_id, result, updated, passed in model_info:
        if not model_id:  # user has no qualifying model for this env
            continue
user_id = model_id.split('/')[0]
row = {}
row["User"] = user_id
row["Model"] = model_id
row["Result"] = result
row["Submitted"] = updated
row["Passed"] = passed
data.append(row)
if not data:
return
    ranked_dataframe = rank_dataframe(pd.DataFrame.from_records(data))
    file_path = os.path.join(path, rl_env["rl_env"] + ".csv")
    ranked_dataframe.to_csv(file_path, index=False)
def download_leaderboard_dataset():
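    """Download a local snapshot of the leaderboard dataset repo and return its path."""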
path = snapshot_download(repo_id=DATASET_REPO_ID, repo_type="dataset")
return path
def get_data(rl_env, path) -> pd.DataFrame:
    """
    Get leaderboard data for rl_env, with User and Model rendered as HTML links
    :return: data as a pandas DataFrame
    """
    csv_path = os.path.join(path, rl_env + ".csv")
    if not os.path.exists(csv_path):
        return pd.DataFrame(columns=["Ranking", "User", "Model", "Result", "Submitted", "Passed"])
    data = pd.read_csv(csv_path)
    data["User"] = data["User"].apply(make_clickable_user)
    data["Model"] = data["Model"].apply(make_clickable_model)
    return data
def get_data_no_html(rl_env, path) -> pd.DataFrame:
    """
    Get raw leaderboard data for rl_env, without HTML links
    :return: data as a pandas DataFrame
    """
    csv_path = os.path.join(path, rl_env + ".csv")
    data = pd.read_csv(csv_path)
    return data
def rank_dataframe(dataframe):
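    """Sort by Result (descending) and assign 1-based rankings."""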
    if dataframe.empty:
        return pd.DataFrame(columns=["User", "Model", "Result", "Submitted", "Passed"])
    dataframe = dataframe.sort_values(by=["Result"], ascending=False)
    rankings = list(range(1, len(dataframe) + 1))
    if "Ranking" not in dataframe.columns:
        dataframe.insert(0, "Ranking", rankings)
    else:
        dataframe["Ranking"] = rankings
    return dataframe
def run_update_dataset():
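    """Recompute every environment's leaderboard and push the CSVs back to the dataset repo."""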
path_ = download_leaderboard_dataset()
    for rl_env in rl_envs:
        update_leaderboard_dataset(rl_env, path_)
api.upload_folder(
folder_path=path_,
        repo_id=DATASET_REPO_ID,
repo_type="dataset",
commit_message="Update dataset")
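# Build the leaderboard data once at startup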
run_update_dataset()
with block:
gr.Markdown(f"""
# 🏆 Reinforcement Learning Course Leaderboard 🏆
This leaderboard is for Kyiv Academic University students to see their results during the Hugging Face <a href="https://huggingface.co/learn/deep-rl-course/unit0/introduction?fw=pt">Deep Reinforcement Learning Course</a>.
### How are the results calculated?
We sort the models by the **lower-bound result: mean_reward - std_reward**.
### I can't find my model 😭
The leaderboard is **updated every three hours**; if you can't find your models, just wait for the next update.
""")
path_ = download_leaderboard_dataset()
    for rl_env in rl_envs:
        with gr.TabItem(rl_env["rl_env_beautiful"]):
with gr.Row():
markdown = """
# {unit}
## {name_leaderboard}
""".format(name_leaderboard = rl_env["rl_env_beautiful"], unit=rl_env["unit"])
gr.Markdown(markdown)
with gr.Row():
                gr_dataframe = gr.components.Dataframe(
                    value=get_data(rl_env["rl_env"], path_),
                    headers=["Ranking 🏆", "User 🤗", "Model id 🤖", "Result", "Submitted", "Passed"],
                    datatype=["number", "markdown", "markdown", "number", "date", "bool"],
                    row_count=(15, "dynamic"),
                )
scheduler = BackgroundScheduler()
# Restart the Space every three hours so the leaderboard data is rebuilt on startup
scheduler.add_job(restart, "interval", seconds=10800)
scheduler.start()
block.launch()