import os
import glob
import json
import dateutil.parser
import numpy as np
from dataclasses import dataclass
from src.display.formatting import make_clickable_model
from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType, generate_column_name
from src.submission.check_validity import is_model_on_hub


@dataclass
class EvalResult:
"""Represents one full evaluation. Built from a combination of the result and request file for a given run.
"""
eval_name: str # org_model_precision_feature-set_nb-shots (uid)
full_model: str # org/model (path on hub)
org: str
model: str
revision: str # commit hash, "" if main
results: dict
raw_data: dict
nb_shots: int
feature_set: str
precision: Precision = Precision.Unknown
model_type: ModelType = ModelType.Unknown # Pretrained, fine tuned, ...
weight_type: WeightType = WeightType.Original # Original or Adapter
architecture: str = "Unknown"
license: str = "?"
likes: int = 0
num_params: int = 0
date: str = "" # submission date of request file
still_on_hub: bool = False

    @classmethod
    def init_from_json_file(cls, json_filepath):
"""Inits the result from the specific model result file"""
with open(json_filepath) as fp:
data = json.load(fp)
# Get config
config = data.get("config")
full_model = config.get("model")
        org, _, model = full_model.rpartition("/")  # org is "" for models without a namespace
precision = Precision.from_str(config.get("precision"))
revision = config.get("revision", "")
nb_shots = config.get("nb_shots", None)
feature_set = config.get("feature_set", None)
model_type = ModelType.from_str(config.get("model_type", ""))
weight_type = WeightType[config.get("weight_type", "Original")]
license = config.get("license", "?")
likes = config.get("likes", 0)
num_params = config.get("params", 0)
date = config.get("submitted_time", "")
# Check if model is still on hub
still_on_hub, _, model_config = is_model_on_hub(
full_model, revision, trust_remote_code=True, test_tokenizer=False, token=os.environ.get("TOKEN")
)
architecture = "?"
if model_config is not None:
architectures = getattr(model_config, "architectures", None)
if architectures:
architecture = ";".join(architectures)
        # Collect the "mean (lower-upper)" display string for each task
        results = {}
        for task in Tasks:
            task = task.value
            metrics = data["results"].get(task.phenotype, {}).get("metrics", {})
            mean = metrics.get(f"mean_{task.metric}")
            lower = metrics.get(f"lower_{task.metric}")
            upper = metrics.get(f"upper_{task.metric}")
            if None in (mean, lower, upper):
                formatted_score = None
            else:
                formatted_score = f"{mean:.2f} ({lower:.2f}-{upper:.2f})"
            results[f"{task.phenotype}_{task.metric}"] = formatted_score
        return cls(
eval_name=f"{org}_{model}_{precision.value.name}_{feature_set}_{nb_shots}",
full_model=full_model,
            org=org,
            model=model,
results=results,
raw_data=data,
nb_shots=nb_shots,
feature_set=feature_set,
precision=precision,
revision=revision,
still_on_hub=still_on_hub,
architecture=architecture,
model_type=model_type,
weight_type=weight_type,
license=license,
likes=likes,
num_params=num_params,
date=date
)
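
    # Illustrative shape of a result file consumed by `init_from_json_file`
    # (a sketch inferred from the accesses above; every value is hypothetical):
    #
    #     {
    #         "config": {
    #             "model": "org/model", "precision": "float16", "revision": "abc1234",
    #             "nb_shots": 5, "feature_set": "full", "model_type": "pretrained",
    #             "weight_type": "Original", "license": "apache-2.0", "likes": 12,
    #             "params": 7, "submitted_time": "2024-01-01T00:00:00"
    #         },
    #         "results": {
    #             "<phenotype>": {
    #                 "metrics": {"mean_auroc": 0.82, "lower_auroc": 0.79, "upper_auroc": 0.85}
    #             }
    #         }
    #     }
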
def to_dict(self):
"""Converts the Eval Result to a dict compatible with our dataframe display"""
        # Average each metric over the tasks that report it; np.mean of an
        # empty list yields nan
        average_auroc = np.mean([d["metrics"]["mean_auroc"] for d in self.raw_data["results"].values() if "mean_auroc" in d["metrics"]])
        average_auprc = np.mean([d["metrics"]["mean_auprc"] for d in self.raw_data["results"].values() if "mean_auprc" in d["metrics"]])
data_dict = {
"eval_name": self.eval_name, # not a column, just a save name,
AutoEvalColumn.feature_set.name: self.feature_set,
AutoEvalColumn.nb_shots.name: self.nb_shots,
AutoEvalColumn.precision.name: self.precision.value.name,
AutoEvalColumn.model_type.name: self.model_type.value.name,
AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol,
AutoEvalColumn.weight_type.name: self.weight_type.value.name,
AutoEvalColumn.architecture.name: self.architecture,
AutoEvalColumn.model.name: make_clickable_model(self.full_model),
AutoEvalColumn.revision.name: self.revision,
AutoEvalColumn.average_auroc.name: average_auroc,
AutoEvalColumn.average_auprc.name: average_auprc,
AutoEvalColumn.license.name: self.license,
AutoEvalColumn.likes.name: self.likes,
AutoEvalColumn.params.name: self.num_params,
AutoEvalColumn.still_on_hub.name: self.still_on_hub,
}
        for task in Tasks:
            task = task.value
            data_dict[generate_column_name(task.phenotype, task.metric.upper())] = self.results[f"{task.phenotype}_{task.metric}"]
return data_dict
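
# Usage sketch for `EvalResult` (the file path is hypothetical): parse one run
# and render its leaderboard row.
#
#     result = EvalResult.init_from_json_file(
#         "eval-results/org/model/results_2024-01-01T00-00-00.000000.json"
#     )
#     row = result.to_dict()
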
def get_request_file_for_model(requests_path, model_name, precision):
"""Selects the correct request file for a given model. Only keeps runs tagged as FINISHED"""
request_files = os.path.join(
requests_path,
f"{model_name}_eval_request_*.json",
)
request_files = glob.glob(request_files)
    # Select the most recent request file matching the requested precision
    request_file = ""
    request_files = sorted(request_files, reverse=True)
    for tmp_request_file in request_files:
        with open(tmp_request_file, "r") as f:
            req_content = json.load(f)
        if (
            req_content["status"] == "FINISHED"
            and req_content["precision"] == precision.split(".")[-1]
        ):
            request_file = tmp_request_file
            break
    return request_file
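
# Usage sketch for `get_request_file_for_model` (hypothetical paths and values):
# `precision` is matched on the suffix after its last ".", so "float16" and
# "Precision.float16" both match a request whose "precision" field is "float16".
#
#     request_file = get_request_file_for_model("eval-queue", "org/model", "float16")
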
def get_raw_eval_results(results_path: str) -> list[EvalResult]:
"""From the path of the results folder root, extract all needed info for results"""
model_result_filepaths = []
for root, _, files in os.walk(results_path):
# We should only have json files in model results
        if len(files) == 0 or any(not f.endswith(".json") for f in files):
continue
        # Sort the files by date; the timestamp embedded in the file name is
        # zero-padded, so a lexicographic sort is chronological
        try:
            files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7])
        except dateutil.parser.ParserError:
            files = [files[-1]]
for file in files:
model_result_filepaths.append(os.path.join(root, file))
eval_results = {}
for model_result_filepath in model_result_filepaths:
# Creation of result
eval_result = EvalResult.init_from_json_file(model_result_filepath)
# Store results of same eval together
eval_name = eval_result.eval_name
        if eval_name in eval_results:
eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None})
else:
eval_results[eval_name] = eval_result
results = []
for v in eval_results.values():
try:
v.to_dict() # we test if the dict version is complete
results.append(v)
except KeyError: # not all eval values present
continue
return results
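
if __name__ == "__main__":
    # Minimal smoke test, assuming a local "eval-results" folder laid out as
    # <results_path>/<org>/<model>/results_<timestamp>.json (the folder name is
    # hypothetical) and, optionally, a TOKEN environment variable for the hub
    # lookup in `is_model_on_hub`
    for eval_result in get_raw_eval_results("eval-results"):
        print(eval_result.eval_name)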