"""Collect evaluation results of the Titan models from the Hugging Face Hub."""

import json
import os
from typing import Any, Dict

import pandas as pd
from huggingface_hub import HfApi, hf_hub_download, metadata_load

from .dataset_handler import DATASETS_KEYWORDS, get_datasets_nickname

# Names that must be ignored when scanning the hub (see `get_titan_data`).
BLOCKLIST = [
    "rlhf_all",
    "Qwen2.5-Coder-7B-Instruct_lora_r16a32-java",
    "Qwen2.5-Coder-7B-Instruct_lora_r16a32-python",
    "Qwen2.5-Coder-7B-Instruct_lora_r16a32-C",
    "Qwen2.5-Coder-7B-Instruct_lora_r16a32-c_sharp",
    "CodeBERT-javascript",
    "Qwen2.5-Coder-1.5B-Instruct_lora_reasoning",
]

# Hub account whose model repositories are scanned.
USER = "TitanCAProject"


class ModelHandler:
    """Load, cache and tabulate per-model evaluation results.

    `model_infos` maps a repository id to `{"meta": ..., "results": ...}`.
    It is initialised from the JSON file at `model_infos_path` when that file
    exists and can be written back with `_save_model_infos`.
    """

    def __init__(self, model_infos_path="model_infos.json"):
        self.api = HfApi()
        self.model_infos_path = model_infos_path
        self.model_infos = self._load_model_infos()

    def _load_model_infos(self) -> Dict:
        """Return the cached model infos, or an empty dict if no cache file exists."""
        if os.path.exists(self.model_infos_path):
            with open(self.model_infos_path, encoding="utf-8") as f:
                return json.load(f)
        return {}

    def _save_model_infos(self) -> None:
        """Write `self.model_infos` to `self.model_infos_path` as JSON."""
        with open(self.model_infos_path, "w", encoding="utf-8") as f:
            json.dump(self.model_infos, f)

    def sanitize_model_name(self, model_name):
        """Return `model_name` with "/" and "." replaced by placeholder text."""
        return model_name.replace("/", "_").replace(".", "-thisisapoint-")

    def fuze_model_infos(self, model_name, results) -> None:
        """Add to the stored results of `model_name` the datasets it lacks.

        Datasets already stored for the model keep their current metrics; only
        datasets absent from the stored results are copied from `results`.

        Raises:
            KeyError: if `results` is non-empty and `model_name` has no stored
                entry (or the entry has no "results" key).
        """
        for dataset, metrics in results.items():
            self.model_infos[model_name]["results"].setdefault(dataset, metrics)

    def get_titan_data(self) -> None:
        """Download the result files of every model owned by `USER`.

        For each repository that is not blocklisted and that contains at least
        one file ending in "metrics.json" (or named "results.json"), the README
        metadata and the result files are fetched and stored in
        `self.model_infos[repo_id]`. Failures are reported on stdout and the
        offending repository/file is skipped so one bad repo cannot abort the
        whole scan.
        """
        models = self.api.list_models(author=USER)
        repositories = [model.modelId for model in models]  # type: ignore

        for repo_id in repositories:
            org_name = repo_id.split("/")[0]
            model_name = repo_id.split("/")[-1]
            # BLOCKLIST holds model names, while every repo listed here shares
            # the same organisation; checking only the organisation would never
            # match, so the model name and the full id are checked as well.
            if any(name in BLOCKLIST for name in (org_name, model_name, repo_id)):
                continue

            files = [
                f
                for f in self.api.list_repo_files(repo_id)
                if f.endswith("metrics.json") or f == "results.json"
            ]
            if not files:
                continue

            # The README is the same for every result file: fetch it once.
            try:
                readme_path = hf_hub_download(repo_id, filename="README.md")
                meta = metadata_load(readme_path)
            except Exception as e:
                print(f"Error loading {repo_id} - {e}")
                continue

            for file in files:
                try:
                    result_path = hf_hub_download(repo_id, filename=file)
                    with open(result_path, encoding="utf-8") as f:
                        results = json.load(f)

                    # Merge with whatever is already known for this model
                    # (cached entry or a previously processed result file)
                    # instead of discarding it. Freshly downloaded values take
                    # precedence on datasets present on both sides.
                    previous = self.model_infos.get(repo_id, {})
                    merged = dict(previous.get("results", {}))
                    merged.update(results)
                    self.model_infos[repo_id] = {"meta": meta, "results": merged}
                except Exception as e:
                    print(f"Error loading {repo_id} - {e}")
                    continue

    def compute_averages(self, metric="f03"):
        """Build a models x datasets table of `metric`.

        Only datasets whose name contains one of `DATASETS_KEYWORDS` are kept,
        and columns are labelled with `get_datasets_nickname(dataset)`. A
        result stored as a dict contributes its `metric` entry; any other
        value is used as-is. A dict result lacking `metric` is skipped, which
        leaves a missing value in the table instead of raising.

        Returns:
            A DataFrame with one row per model, or an empty DataFrame when no
            model info is loaded.
        """
        if not self.model_infos:
            return pd.DataFrame()

        model_res = {}
        for model, infos in self.model_infos.items():
            dataset_res = {}
            for dataset, value in infos["results"].items():
                if not any(keyword in dataset for keyword in DATASETS_KEYWORDS):
                    continue
                if isinstance(value, dict):
                    if metric not in value:
                        continue
                    value = value[metric]
                dataset_res[get_datasets_nickname(dataset)] = value
            model_res[model] = dataset_res

        return pd.DataFrame(model_res).T

    @staticmethod
    def add_rank(df: pd.DataFrame) -> pd.DataFrame:
        """Sort `df` by score, add a "Rank" column and format the scores.

        Every column other than "Model", "Train\\nSize" and "Test\\nSize" is a
        ranking column. With exactly one ranking column the rows are sorted on
        it; otherwise an "Average" column (row mean of the ranking columns) is
        added and used for sorting. float64 columns whose maximum is <= 1.0
        are multiplied by 100 and rounded to one decimal place. Ranking
        columns (then "Average", when present) are moved to the end.

        NOTE: `df` is modified in place (missing values filled with 0.0, rows
        sorted, columns inserted); the returned frame is a column-reordered
        selection of it.
        """
        df.fillna(0.0, inplace=True)

        non_ranked = ("Model", "Train\nSize", "Test\nSize")
        cols_to_rank = [col for col in df.columns if col not in non_ranked]

        if len(cols_to_rank) == 1:
            df.sort_values(cols_to_rank[0], ascending=False, inplace=True)
        else:
            df.insert(
                len(df.columns) - len(cols_to_rank),
                "Average",
                df[cols_to_rank].mean(axis=1, skipna=False),
            )
            df.sort_values("Average", ascending=False, inplace=True)

        df.insert(0, "Rank", list(range(1, len(df) + 1)))

        # Multiply values by 100 if they are floats and round to 1 decimal place.
        for col in df.columns:
            if df[col].dtype == "float64" and df[col].max() <= 1.0:
                df[col] = df[col].apply(lambda x: round(x * 100, 1))

        # Move the ranking columns to the end of the DataFrame. "Average" only
        # exists when it was inserted above (i.e. not in the single-column
        # case), so it must not be requested unconditionally.
        trailing = list(cols_to_rank)
        if "Average" in df.columns and "Average" not in trailing:
            trailing.append("Average")
        leading = [col for col in df.columns if col not in trailing]
        df = df[leading + trailing]
        return df