Spaces:
Runtime error
Runtime error
| import os | |
| import glob | |
| import json | |
| import dateutil | |
| import numpy as np | |
| from dataclasses import dataclass | |
| from src.display.formatting import make_clickable_model | |
| from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision, WeightType, generate_column_name | |
| from src.submission.check_validity import is_model_on_hub | |
| class EvalResult: | |
| """Represents one full evaluation. Built from a combination of the result and request file for a given run. | |
| """ | |
| eval_name: str # org_model_precision_feature-set_nb-shots (uid) | |
| full_model: str # org/model (path on hub) | |
| org: str | |
| model: str | |
| revision: str # commit hash, "" if main | |
| results: dict | |
| raw_data: dict | |
| nb_shots: int | |
| feature_set: str | |
| precision: Precision = Precision.Unknown | |
| model_type: ModelType = ModelType.Unknown # Pretrained, fine tuned, ... | |
| weight_type: WeightType = WeightType.Original # Original or Adapter | |
| architecture: str = "Unknown" | |
| license: str = "?" | |
| likes: int = 0 | |
| num_params: int = 0 | |
| date: str = "" # submission date of request file | |
| still_on_hub: bool = False | |
| def init_from_json_file(self, json_filepath): | |
| """Inits the result from the specific model result file""" | |
| with open(json_filepath) as fp: | |
| data = json.load(fp) | |
| # Get config | |
| config = data.get("config") | |
| full_model = config.get("model") | |
| org = full_model.split("/")[0] | |
| model = full_model.split("/")[1] | |
| precision = Precision.from_str(config.get("precision")) | |
| revision = config.get("revision", "") | |
| nb_shots = config.get("nb_shots", None) | |
| feature_set = config.get("feature_set", None) | |
| model_type = ModelType.from_str(config.get("model_type", "")) | |
| weight_type = WeightType[config.get("weight_type", "Original")] | |
| license = config.get("license", "?") | |
| likes = config.get("likes", 0) | |
| num_params = config.get("params", 0) | |
| date = config.get("submitted_time", "") | |
| # Check if model is still on hub | |
| still_on_hub, _, model_config = is_model_on_hub( | |
| full_model, revision, trust_remote_code=True, test_tokenizer=False, token=os.environ.get("TOKEN") | |
| ) | |
| architecture = "?" | |
| if model_config is not None: | |
| architectures = getattr(model_config, "architectures", None) | |
| if architectures: | |
| architecture = ";".join(architectures) | |
| results = {} | |
| for task in Tasks: | |
| task = task.value | |
| mean = data["results"].get(task.phenotype, {}).get("metrics", {}).get("_".join(["mean", task.metric]), None) | |
| lower = data["results"].get(task.phenotype, {}).get("metrics", {}).get("_".join(["lower", task.metric]), None) | |
| upper = data["results"].get(task.phenotype, {}).get("metrics", {}).get("_".join(["upper", task.metric]), None) | |
| formated_score = f"{mean:.2f} ({lower:.2f}-{upper:.2f})" if mean is not None else None | |
| results["_".join([task.phenotype, task.metric])] = formated_score | |
| return self( | |
| eval_name=f"{org}_{model}_{precision.value.name}_{feature_set}_{nb_shots}", | |
| full_model=full_model, | |
| org=full_model.split("/")[0], | |
| model=full_model.split("/")[1], | |
| results=results, | |
| raw_data=data, | |
| nb_shots=nb_shots, | |
| feature_set=feature_set, | |
| precision=precision, | |
| revision=revision, | |
| still_on_hub=still_on_hub, | |
| architecture=architecture, | |
| model_type=model_type, | |
| weight_type=weight_type, | |
| license=license, | |
| likes=likes, | |
| num_params=num_params, | |
| date=date | |
| ) | |
| def to_dict(self): | |
| """Converts the Eval Result to a dict compatible with our dataframe display""" | |
| average_auroc = np.mean(np.array([d["metrics"]["mean_auroc"] for d in self.raw_data["results"].values() if "mean_auroc" in d["metrics"].keys()])) | |
| average_auprc = np.mean(np.array([d["metrics"]["mean_auprc"] for d in self.raw_data["results"].values() if "mean_auprc" in d["metrics"].keys()])) | |
| data_dict = { | |
| "eval_name": self.eval_name, # not a column, just a save name, | |
| AutoEvalColumn.feature_set.name: self.feature_set, | |
| AutoEvalColumn.nb_shots.name: self.nb_shots, | |
| AutoEvalColumn.precision.name: self.precision.value.name, | |
| AutoEvalColumn.model_type.name: self.model_type.value.name, | |
| AutoEvalColumn.model_type_symbol.name: self.model_type.value.symbol, | |
| AutoEvalColumn.weight_type.name: self.weight_type.value.name, | |
| AutoEvalColumn.architecture.name: self.architecture, | |
| AutoEvalColumn.model.name: make_clickable_model(self.full_model), | |
| AutoEvalColumn.revision.name: self.revision, | |
| AutoEvalColumn.average_auroc.name: average_auroc, | |
| AutoEvalColumn.average_auprc.name: average_auprc, | |
| AutoEvalColumn.license.name: self.license, | |
| AutoEvalColumn.likes.name: self.likes, | |
| AutoEvalColumn.params.name: self.num_params, | |
| AutoEvalColumn.still_on_hub.name: self.still_on_hub, | |
| } | |
| for task in Tasks: | |
| data_dict[generate_column_name(task.value.phenotype, task.value.metric.upper())] = self.results["_".join([task.value.phenotype, task.value.metric])] | |
| return data_dict | |
| def get_request_file_for_model(requests_path, model_name, precision): | |
| """Selects the correct request file for a given model. Only keeps runs tagged as FINISHED""" | |
| request_files = os.path.join( | |
| requests_path, | |
| f"{model_name}_eval_request_*.json", | |
| ) | |
| request_files = glob.glob(request_files) | |
| # Select correct request file (precision) | |
| request_file = "" | |
| request_files = sorted(request_files, reverse=True) | |
| for tmp_request_file in request_files: | |
| with open(tmp_request_file, "r") as f: | |
| req_content = json.load(f) | |
| if ( | |
| req_content["status"] in ["FINISHED"] | |
| and req_content["precision"] == precision.split(".")[-1] | |
| ): | |
| request_file = tmp_request_file | |
| return request_file | |
| def get_raw_eval_results(results_path: str) -> list[EvalResult]: | |
| """From the path of the results folder root, extract all needed info for results""" | |
| model_result_filepaths = [] | |
| for root, _, files in os.walk(results_path): | |
| # We should only have json files in model results | |
| if len(files) == 0 or any([not f.endswith(".json") for f in files]): | |
| continue | |
| # Sort the files by date | |
| try: | |
| files.sort(key=lambda x: x.removesuffix(".json").removeprefix("results_")[:-7]) | |
| except dateutil.parser._parser.ParserError: | |
| files = [files[-1]] | |
| for file in files: | |
| model_result_filepaths.append(os.path.join(root, file)) | |
| eval_results = {} | |
| for model_result_filepath in model_result_filepaths: | |
| # Creation of result | |
| eval_result = EvalResult.init_from_json_file(model_result_filepath) | |
| # Store results of same eval together | |
| eval_name = eval_result.eval_name | |
| if eval_name in eval_results.keys(): | |
| eval_results[eval_name].results.update({k: v for k, v in eval_result.results.items() if v is not None}) | |
| else: | |
| eval_results[eval_name] = eval_result | |
| results = [] | |
| for v in eval_results.values(): | |
| try: | |
| v.to_dict() # we test if the dict version is complete | |
| results.append(v) | |
| except KeyError: # not all eval values present | |
| continue | |
| return results | |