from __future__ import annotations

import json
import os
import re
from functools import reduce
from typing import Any

import pandas as pd
from datasets import load_dataset
from huggingface_hub import hf_hub_download
from huggingface_hub.repocard import metadata_load
from tqdm.autonotebook import tqdm

from envs import API, LEADERBOARD_CONFIG, MODEL_META, REPO_ID, RESULTS_REPO
from utils.model_size import get_model_parameters_memory

MODEL_CACHE = {}
TASKS_CONFIG = LEADERBOARD_CONFIG["tasks"]
BOARDS_CONFIG = LEADERBOARD_CONFIG["boards"]

TASKS = list(TASKS_CONFIG.keys())
PRETTY_NAMES = {
    "InstructionRetrieval": "Retrieval w/Instructions",
    "PairClassification": "Pair Classification",
    "BitextMining": "Bitext Mining",
}

TASK_TO_METRIC = {k: [v["metric"]] for k, v in TASKS_CONFIG.items()}
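# Accept alternative spellings of the main metrics (e.g. "cos_sim_spearman" vs
# "cosine_spearman") so results stored under either key are matched.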
TASK_TO_METRIC["STS"].append("cos_sim_spearman")
TASK_TO_METRIC["STS"].append("cosine_spearman")
TASK_TO_METRIC["Summarization"].append("cos_sim_spearman")
TASK_TO_METRIC["Summarization"].append("cosine_spearman")
TASK_TO_METRIC["PairClassification"].append("cos_sim_ap")
TASK_TO_METRIC["PairClassification"].append("cosine_ap")

EXTERNAL_MODELS = {
    k for k, v in MODEL_META["model_meta"].items() if v.get("is_external", False)
}
EXTERNAL_MODEL_TO_LINK = {
    k: v["link"] for k, v in MODEL_META["model_meta"].items() if v.get("link", False)
}
EXTERNAL_MODEL_TO_DIM = {
    k: v["dim"] for k, v in MODEL_META["model_meta"].items() if v.get("dim", False)
}
EXTERNAL_MODEL_TO_SEQLEN = {
    k: v["seq_len"]
    for k, v in MODEL_META["model_meta"].items()
    if v.get("seq_len", False)
}
EXTERNAL_MODEL_TO_SIZE = {
    k: v["size"] for k, v in MODEL_META["model_meta"].items() if v.get("size", False)
}
PROPRIETARY_MODELS = {
    k for k, v in MODEL_META["model_meta"].items() if v.get("is_proprietary", False)
}
TASK_DESCRIPTIONS = {k: v["task_description"] for k, v in TASKS_CONFIG.items()}
TASK_DESCRIPTIONS["Overall"] = "Overall performance across MTEB tasks."
SENTENCE_TRANSFORMERS_COMPATIBLE_MODELS = {
    k
    for k, v in MODEL_META["model_meta"].items()
    if v.get("is_sentence_transformers_compatible", False)
}
MODELS_TO_SKIP = MODEL_META["models_to_skip"]
CROSS_ENCODERS = MODEL_META["cross_encoders"]
BI_ENCODERS = [
    k for k, _ in MODEL_META["model_meta"].items() if k not in CROSS_ENCODERS + ["bm25"]
]
INSTRUCT_MODELS = {
    k for k, v in MODEL_META["model_meta"].items() if v.get("uses_instruct", False)
}
NOINSTRUCT_MODELS = {
    k for k, v in MODEL_META["model_meta"].items() if not v.get("uses_instruct", False)
}
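# Map each task category to the union of dataset names listed for it across all boards.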
TASK_TO_TASK_TYPE = {task_category: [] for task_category in TASKS}
for board_config in BOARDS_CONFIG.values():
    for task_category, task_list in board_config["tasks"].items():
        TASK_TO_TASK_TYPE[task_category].extend(task_list)

MODEL_INFOS = {}


def add_rank(df: pd.DataFrame) -> pd.DataFrame:
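    """Sort by score and prepend a 1-based "Rank" column.

    When more than one score column is present, an "Average" column is inserted
    before the score columns and used for the sort.
    """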
    cols_to_rank = [
        col
        for col in df.columns
        if col
        not in [
            "Model",
            "Model Size (Million Parameters)",
            "Memory Usage (GB, fp32)",
            "Embedding Dimensions",
            "Max Tokens",
        ]
    ]
    if len(cols_to_rank) == 1:
        df.sort_values(cols_to_rank[0], ascending=False, inplace=True)
    else:
        df.insert(
            len(df.columns) - len(cols_to_rank),
            "Average",
            df[cols_to_rank].mean(axis=1, skipna=False),
        )
        df.sort_values("Average", ascending=False, inplace=True)
    df.insert(0, "Rank", list(range(1, len(df) + 1)))
    df = df.round(2)
    df.fillna("", inplace=True)
    return df


def make_clickable_model(model_name: str, link: None | str = None) -> str:
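    """Return an HTML anchor for the model, defaulting to its Hugging Face page."""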
    if link is None:
        link = "https://huggingface.co/" + model_name
    return f'<a target="_blank" style="text-decoration: underline" href="{link}">{model_name.split("/")[-1]}</a>'


def add_lang(examples):
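    """Add a "mteb_dataset_name_with_lang" field, appending the eval language when set."""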
    if not (examples["eval_language"]) or (examples["eval_language"] == "default"):
        examples["mteb_dataset_name_with_lang"] = examples["mteb_dataset_name"]
    else:
        examples["mteb_dataset_name_with_lang"] = (
            examples["mteb_dataset_name"] + f' ({examples["eval_language"]})'
        )
    return examples


def norm(names: list[str]) -> set[str]:
    return {name.split()[0] for name in names}


def add_task(examples):
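    """Attach an "mteb_task" field by looking the dataset up in TASK_TO_TASK_TYPE."""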
    task_name = examples["mteb_dataset_name"]
    task_type = None
    for task_category, task_list in TASK_TO_TASK_TYPE.items():
        if task_name in norm(task_list):
            task_type = task_category
            break
    if task_type is not None:
        examples["mteb_task"] = task_type
    else:
        print("WARNING: Task not found for dataset", examples["mteb_dataset_name"])
        examples["mteb_task"] = "Unknown"
    return examples


def filter_metric_external(x, task, metrics) -> bool:
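    # LEMBNeedleRetrieval and LEMBPasskeyRetrieval report ndcg_at_1 as their main
    # score rather than the task's default metric.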
    if x["mteb_dataset_name"] in ["LEMBNeedleRetrieval", "LEMBPasskeyRetrieval"]:
        return bool(x["mteb_task"] == task and x["metric"] == "ndcg_at_1")
    else:
        return bool(x["mteb_task"] == task and x["metric"] in metrics)


def filter_metric_fetched(name: str, metric: str, expected_metrics) -> bool:
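    # Same exception as above: the LEMB needle/passkey datasets are scored with ndcg_at_1.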
    return bool(
        metric == "ndcg_at_1"
        if name in ["LEMBNeedleRetrieval", "LEMBPasskeyRetrieval"]
        else metric in expected_metrics
    )


def get_dim_seq_size(model):
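    """Read a Hub model's configs to recover its embedding dimension, max sequence
    length, parameter count, and memory usage."""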
    siblings = model.siblings or []
    filenames = [sib.rfilename for sib in siblings]
    dim, seq = "", ""
    for filename in filenames:
        if re.match(r"\d+_Pooling/config.json", filename):
            st_config_path = hf_hub_download(model.modelId, filename=filename)
            with open(st_config_path) as f:
                dim = json.load(f).get("word_embedding_dimension", "")
            break
    for filename in filenames:
        if re.match(r"\d+_Dense/config.json", filename):
            st_config_path = hf_hub_download(model.modelId, filename=filename)
            with open(st_config_path) as f:
                dim = json.load(f).get("out_features", dim)
    if "config.json" in filenames:
        config_path = hf_hub_download(model.modelId, filename="config.json")
        with open(config_path) as f:
            config = json.load(f)
        if not dim:
            dim = config.get(
                "hidden_dim", config.get("hidden_size", config.get("d_model", ""))
            )
        seq = config.get(
            "n_positions",
            config.get(
                "max_position_embeddings",
                config.get("n_ctx", config.get("seq_length", "")),
            ),
        )

    if dim == "" or seq == "":
        raise Exception(f"Could not find dim or seq for model {model.modelId}")

    parameters, memory = get_model_parameters_memory(model)
    return dim, seq, parameters, memory


def get_external_model_results():
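    """Load (or build) the cached scores for models flagged as external in MODEL_META.

    Results are cached in EXTERNAL_MODEL_RESULTS.json; only models missing from the
    cache are fetched from the results repository.
    """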
    if os.path.exists("EXTERNAL_MODEL_RESULTS.json"):
        with open("EXTERNAL_MODEL_RESULTS.json") as f:
            EXTERNAL_MODEL_RESULTS = json.load(f)
        models_to_run = []
        for model in EXTERNAL_MODELS:
            if model not in EXTERNAL_MODEL_RESULTS:
                models_to_run.append(model)
                EXTERNAL_MODEL_RESULTS[model] = {
                    k: {v[0]: []} for k, v in TASK_TO_METRIC.items()
                }
    else:
        EXTERNAL_MODEL_RESULTS = {
            model: {k: {v[0]: []} for k, v in TASK_TO_METRIC.items()}
            for model in EXTERNAL_MODELS
        }
        models_to_run = EXTERNAL_MODELS

    pbar = tqdm(models_to_run, desc="Fetching external model results")
    for model in pbar:
        pbar.set_description(f"Fetching external model results for {model!r}")
        ds = load_dataset(
            RESULTS_REPO,
            model,
            trust_remote_code=True,
            download_mode="force_redownload",
            verification_mode="no_checks",
        )
        ds = ds.map(add_lang)
        ds = ds.map(add_task)
        base_dict = {
            "Model": make_clickable_model(
                model,
                link=EXTERNAL_MODEL_TO_LINK.get(
                    model, f"https://huggingface.co/spaces/{REPO_ID}"
                ),
            )
        }

        for task, metrics in TASK_TO_METRIC.items():
            ds_dict = ds.filter(lambda x: filter_metric_external(x, task, metrics))[
                "test"
            ].to_dict()
            ds_dict = {
                k: round(v, 2)
                for k, v in zip(
                    ds_dict["mteb_dataset_name_with_lang"], ds_dict["score"]
                )
            }
            EXTERNAL_MODEL_RESULTS[model][task][metrics[0]].append(
                {**base_dict, **ds_dict}
            )

    with open("EXTERNAL_MODEL_RESULTS.json", "w") as f:
        json.dump(EXTERNAL_MODEL_RESULTS, f, indent=4)

    return EXTERNAL_MODEL_RESULTS


def download_or_use_cache(modelId: str):
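    """Fetch and parse a model's README metadata, memoizing it in MODEL_CACHE."""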
    global MODEL_CACHE
    if modelId in MODEL_CACHE:
        return MODEL_CACHE[modelId]
    try:
        readme_path = hf_hub_download(modelId, filename="README.md", etag_timeout=30)
    except Exception:
        print(f"ERROR: Could not fetch metadata for {modelId}, trying again")
        readme_path = hf_hub_download(modelId, filename="README.md", etag_timeout=30)
    meta = metadata_load(readme_path)
    MODEL_CACHE[modelId] = meta
    return meta


def get_mteb_data(
    tasks: list = ["Clustering"],
    langs: list = [],
    datasets: list = [],
    fillna: bool = True,
    add_emb_dim: bool = True,
    task_to_metric: dict = TASK_TO_METRIC,
    rank: bool = True,
) -> pd.DataFrame:
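    """Build a results DataFrame for the given tasks, combining the cached external
    results with model-index metadata from Hub models tagged "mteb"."""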
    global MODEL_INFOS

    with open("EXTERNAL_MODEL_RESULTS.json", "r") as f:
        external_model_results = json.load(f)

    api = API
    models = list(api.list_models(filter="mteb", full=True))
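    # Some datasets also appear under alternative names; query those too so their
    # scores are found (they are merged back into the current columns further down).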
| if "MLSUMClusteringP2P (fr)" in datasets: |
| datasets.append("MLSUMClusteringP2P") |
| if "MLSUMClusteringS2S (fr)" in datasets: |
| datasets.append("MLSUMClusteringS2S") |
| if "PawsXPairClassification (fr)" in datasets: |
| datasets.append("PawsX (fr)") |
| |
| df_list = [] |
| for model in external_model_results: |
| results_list = [] |
| for task in tasks: |
| |
| if task not in external_model_results[model]: |
| continue |
| results_list += external_model_results[model][task][task_to_metric[task][0]] |
|
|
| if len(datasets) > 0: |
| res = { |
| k: v |
| for d in results_list |
| for k, v in d.items() |
| if (k == "Model") or any([x in k for x in datasets]) |
| } |
| elif langs: |
| |
| langs_format = [f"({lang})" for lang in langs] |
| res = { |
| k: v |
| for d in results_list |
| for k, v in d.items() |
| if any([k.split(" ")[-1] in (k, x) for x in langs_format]) |
| } |
| else: |
| res = {k: v for d in results_list for k, v in d.items()} |
| |
| if len(res) > 1: |
| if add_emb_dim: |
| res["Model Size (Million Parameters)"] = EXTERNAL_MODEL_TO_SIZE.get( |
| model, "" |
| ) |
| res["Memory Usage (GB, fp32)"] = ( |
| round(res["Model Size (Million Parameters)"] * 1e6 * 4 / 1024**3, 2) |
| if res["Model Size (Million Parameters)"] != "" |
| else "" |
| ) |
| res["Embedding Dimensions"] = EXTERNAL_MODEL_TO_DIM.get(model, "") |
| res["Max Tokens"] = EXTERNAL_MODEL_TO_SEQLEN.get(model, "") |
| df_list.append(res) |
|
|
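    # Now collect results for Hub models: walk every model tagged "mteb" and parse the
    # evaluation results recorded in its README ("model-index") metadata.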
    pbar = tqdm(models, desc="Fetching model metadata")
    for model in pbar:
        if model.modelId in MODELS_TO_SKIP:
            continue
        pbar.set_description(f"Fetching {model.modelId!r} metadata")
        meta = download_or_use_cache(model.modelId)
        MODEL_INFOS[model.modelId] = {"metadata": meta}
        if "model-index" not in meta:
            continue
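        # Keep only the model-index entries that match the requested tasks, and
        # optionally the requested datasets or languages.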
        if len(datasets) > 0:
            task_results = [
                sub_res
                for sub_res in meta["model-index"][0]["results"]
                if (sub_res.get("task", {}).get("type", "") in tasks)
                and any(
                    [x in sub_res.get("dataset", {}).get("name", "") for x in datasets]
                )
            ]
        elif langs:
            task_results = [
                sub_res
                for sub_res in meta["model-index"][0]["results"]
                if (sub_res.get("task", {}).get("type", "") in tasks)
                and (
                    sub_res.get("dataset", {}).get("config", "default")
                    in ("default", *langs)
                )
            ]
        else:
            task_results = [
                sub_res
                for sub_res in meta["model-index"][0]["results"]
                if (sub_res.get("task", {}).get("type", "") in tasks)
            ]
        try:
            out = [
                {
                    res["dataset"]["name"].replace("MTEB ", ""): [
                        round(score["value"], 2)
                        for score in res["metrics"]
                        if filter_metric_fetched(
                            res["dataset"]["name"].replace("MTEB ", ""),
                            score["type"],
                            task_to_metric.get(res["task"]["type"]),
                        )
                    ][0]
                }
                for res in task_results
            ]
        except Exception as e:
            print("ERROR", model.modelId, e)
            continue
        out = {k: v for d in out for k, v in d.items()}
        out["Model"] = make_clickable_model(model.modelId)
        # Only keep the row if there is a model name plus at least one score.
        if len(out) > 1:
            if add_emb_dim:
                try:
                    MODEL_INFOS[model.modelId]["dim_seq_size"] = list(
                        get_dim_seq_size(model)
                    )
                except Exception:
                    name_without_org = model.modelId.split("/")[-1]
                    # Fall back to MODEL_META; memory = params * 1e6 * 4 bytes (fp32) / 1024**3 GB.
                    MODEL_INFOS[model.modelId]["dim_seq_size"] = (
                        EXTERNAL_MODEL_TO_DIM.get(name_without_org, ""),
                        EXTERNAL_MODEL_TO_SEQLEN.get(name_without_org, ""),
                        EXTERNAL_MODEL_TO_SIZE.get(name_without_org, ""),
                        round(
                            EXTERNAL_MODEL_TO_SIZE[name_without_org]
                            * 1e6
                            * 4
                            / 1024**3,
                            2,
                        )
                        if name_without_org in EXTERNAL_MODEL_TO_SIZE
                        else "",
                    )
                (
                    out["Embedding Dimensions"],
                    out["Max Tokens"],
                    out["Model Size (Million Parameters)"],
                    out["Memory Usage (GB, fp32)"],
                ) = tuple(MODEL_INFOS[model.modelId]["dim_seq_size"])
            df_list.append(out)
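        # Record sentence-transformers compatibility: either declared via library/tags
        # or implied by a modules.json file in the repo.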
        model_siblings = model.siblings or []
        if (
            model.library_name == "sentence-transformers"
            or "sentence-transformers" in model.tags
            or "modules.json" in {file.rfilename for file in model_siblings}
        ):
            SENTENCE_TRANSFORMERS_COMPATIBLE_MODELS.add(out["Model"])

    df = pd.DataFrame(df_list)
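    # A model can appear in df_list more than once; groupby(...).first() keeps the
    # first non-null value per column for each model.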
    df = df.groupby("Model", as_index=False).first()

    cols = sorted(list(df.columns))
    base_columns = [
        "Model",
        "Model Size (Million Parameters)",
        "Memory Usage (GB, fp32)",
        "Embedding Dimensions",
        "Max Tokens",
    ]
    if len(datasets) > 0:
        # Merge scores recorded under the alternative dataset names into the current
        # columns, then drop the alternative names again.
        if ("MLSUMClusteringP2P (fr)" in datasets) and ("MLSUMClusteringP2P" in cols):
            df["MLSUMClusteringP2P (fr)"] = df["MLSUMClusteringP2P (fr)"].fillna(
                df["MLSUMClusteringP2P"]
            )
            datasets.remove("MLSUMClusteringP2P")
        if ("MLSUMClusteringS2S (fr)" in datasets) and ("MLSUMClusteringS2S" in cols):
            df["MLSUMClusteringS2S (fr)"] = df["MLSUMClusteringS2S (fr)"].fillna(
                df["MLSUMClusteringS2S"]
            )
            datasets.remove("MLSUMClusteringS2S")
        if ("PawsXPairClassification (fr)" in datasets) and ("PawsX (fr)" in cols):
            if "PawsXPairClassification (fr)" not in cols:
                df["PawsXPairClassification (fr)"] = df["PawsX (fr)"]
            else:
                df["PawsXPairClassification (fr)"] = df[
                    "PawsXPairClassification (fr)"
                ].fillna(df["PawsX (fr)"])
            datasets.remove("PawsX (fr)")
            cols.remove("PawsX (fr)")
            df.drop(columns=["PawsX (fr)"], inplace=True)

        # Keep only the base columns and the requested datasets.
        cols = [col for col in cols if col in base_columns + datasets]

    # Move the base columns to the front, in their canonical order.
    i = 0
    for column in base_columns:
        if column in cols:
            cols.insert(i, cols.pop(cols.index(column)))
            i += 1
    df = df[cols]
    if rank:
        df = add_rank(df)
    if fillna:
        df.fillna("", inplace=True)
    return df


def get_mteb_average(task_dict: dict) -> tuple[Any, dict]:
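    """Build the overall table (global and per-task-category averages) plus one ranked
    table per task category for the given {task_category: [datasets]} mapping."""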
    all_tasks = reduce(lambda x, y: x + y, task_dict.values())
    DATA_OVERALL = get_mteb_data(
        tasks=list(task_dict.keys()),
        datasets=all_tasks,
        fillna=False,
        add_emb_dim=True,
        rank=False,
    )

    DATA_OVERALL.insert(
        1,
        f"Average ({len(all_tasks)} datasets)",
        DATA_OVERALL[all_tasks].mean(axis=1, skipna=False),
    )

    for i, (task_category, task_category_list) in enumerate(task_dict.items()):
        DATA_OVERALL.insert(
            i + 2,
            f"{task_category} Average ({len(task_category_list)} datasets)",
            DATA_OVERALL[task_category_list].mean(axis=1, skipna=False),
        )
    DATA_OVERALL.sort_values(
        f"Average ({len(all_tasks)} datasets)", ascending=False, inplace=True
    )
    DATA_OVERALL.insert(0, "Rank", list(range(1, len(DATA_OVERALL) + 1)))

    DATA_OVERALL = DATA_OVERALL.round(2)
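    # Build one ranked table per task category, dropping models without any score there.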
    DATA_TASKS = {}
    for task_category, task_category_list in task_dict.items():
        DATA_TASKS[task_category] = add_rank(
            DATA_OVERALL[
                ["Model", "Model Size (Million Parameters)", "Memory Usage (GB, fp32)"]
                + task_category_list
            ]
        )
        DATA_TASKS[task_category] = DATA_TASKS[task_category][
            DATA_TASKS[task_category].iloc[:, 4:].ne("").any(axis=1)
        ]

    DATA_OVERALL.fillna("", inplace=True)

    data_overall_rows = [
        "Rank",
        "Model",
        "Model Size (Million Parameters)",
        "Memory Usage (GB, fp32)",
        "Embedding Dimensions",
        "Max Tokens",
        f"Average ({len(all_tasks)} datasets)",
    ]
    for task_category, task_category_list in task_dict.items():
        data_overall_rows.append(
            f"{task_category} Average ({len(task_category_list)} datasets)"
        )

    DATA_OVERALL = DATA_OVERALL[data_overall_rows]
    DATA_OVERALL = DATA_OVERALL[DATA_OVERALL.iloc[:, 5:].ne("").any(axis=1)]

    return DATA_OVERALL, DATA_TASKS
def refresh_leaderboard() -> tuple[list, dict]:
    """
    Refresh and recalculate the leaderboard results: fetch scores for the external
    models and the Hub models, then compute the average scores for each task category.
    """
    get_external_model_results()

    boards_data = {}
    all_data_tasks = []
    pbar_tasks = tqdm(
        BOARDS_CONFIG.items(),
        desc="Fetching leaderboard results for ???",
        total=len(BOARDS_CONFIG),
        leave=True,
    )
    for board, board_config in pbar_tasks:
        boards_data[board] = {"data_overall": None, "data_tasks": {}}
        pbar_tasks.set_description(f"Fetching leaderboard results for {board!r}")
        pbar_tasks.refresh()
        if board_config["has_overall"]:
            data_overall, data_tasks = get_mteb_average(board_config["tasks"])
            boards_data[board]["data_overall"] = data_overall
            boards_data[board]["data_tasks"] = data_tasks
            all_data_tasks.extend(data_tasks.values())
        else:
            for task_category, task_category_list in board_config["tasks"].items():
                data_task_category = get_mteb_data(
                    tasks=[task_category], datasets=task_category_list
                )
                data_task_category.drop(
                    columns=["Embedding Dimensions", "Max Tokens"], inplace=True
                )
                boards_data[board]["data_tasks"][task_category] = data_task_category
                all_data_tasks.append(data_task_category)

    return all_data_tasks, boards_data
def write_out_results(item: list | dict | pd.DataFrame | str | None, item_name: str) -> None:
    """
    Because the leaderboard data is nested, recursively create subfolders mirroring its
    structure and save the leaf DataFrames as jsonl files (and strings as txt files).

    Args:
        item: The item to save
        item_name: The folder name to save the item under
    """
    main_folder = item_name

    if isinstance(item, list):
        for i, v in enumerate(item):
            write_out_results(v, os.path.join(main_folder, str(i)))

    elif isinstance(item, dict):
        for key, value in item.items():
            if isinstance(value, dict):
                write_out_results(value, os.path.join(main_folder, key))
            elif isinstance(value, list):
                for i, v in enumerate(value):
                    write_out_results(v, os.path.join(main_folder, key + str(i)))
            else:
                write_out_results(value, os.path.join(main_folder, key))

    elif isinstance(item, pd.DataFrame):
        print(f"Saving {main_folder} to {main_folder}/default.jsonl")
        os.makedirs(main_folder, exist_ok=True)
        if "index" not in item.columns:
            item.reset_index(inplace=True)
        item.to_json(f"{main_folder}/default.jsonl", orient="records", lines=True)

    elif isinstance(item, str):
        print(f"Saving {main_folder} to {main_folder}/default.txt")
        os.makedirs(main_folder, exist_ok=True)
        with open(f"{main_folder}/default.txt", "w") as f:
            f.write(item)

    elif item is None:
        print(f"Saving {main_folder} to {main_folder}/default.txt")
        os.makedirs(main_folder, exist_ok=True)
        with open(f"{main_folder}/default.txt", "w") as f:
            f.write("")

    else:
        raise Exception(f"Unknown type {type(item)}")
def load_results(data_path: str) -> list | dict | pd.DataFrame | str | None:
    """
    Do the reverse of `write_out_results` to reconstruct the item

    Args:
        data_path: The path to the data to load

    Returns:
        The loaded data
    """
    if os.path.isdir(data_path):
        all_files_in_dir = list(os.listdir(data_path))
        # If every entry is a consecutive integer name, the folder encodes a list.
        if set(all_files_in_dir) == {str(i) for i in range(len(all_files_in_dir))}:
            return [
                load_results(os.path.join(data_path, str(i)))
                for i in range(len(os.listdir(data_path)))
            ]
        else:
            if len(all_files_in_dir) == 1:
                file_name = all_files_in_dir[0]
                if file_name == "default.jsonl":
                    return load_results(os.path.join(data_path, file_name))
                else:
                    return {file_name: load_results(os.path.join(data_path, file_name))}
            else:
                return {
                    file_name: load_results(os.path.join(data_path, file_name))
                    for file_name in all_files_in_dir
                }

    elif data_path.endswith(".jsonl"):
        df = pd.read_json(data_path, orient="records", lines=True)
        if "index" in df.columns:
            df = df.set_index("index")
        if "Memory Usage (GB, fp32)" in df.columns:
            df["Memory Usage (GB, fp32)"] = df["Memory Usage (GB, fp32)"].map(
                lambda value: round(value, 2) if isinstance(value, float) else value
            )
        return df

    else:
        with open(data_path, "r") as f:
            data = f.read()
        if data == "":
            return None
        else:
            return data
if __name__ == "__main__":
    print("Refreshing leaderboard statistics...")
    all_data_tasks, boards_data = refresh_leaderboard()
    print("Done calculating, saving...")

    write_out_results(all_data_tasks, "all_data_tasks")
    write_out_results(boards_data, "boards_data")

    print("Done saving results!")