Spaces:
Runtime error
Runtime error
| import glob | |
| import json | |
| import math | |
| import os | |
| from dataclasses import dataclass | |
| import dateutil | |
| import numpy as np | |
| from src.display.formatting import make_hyperlink | |
| from src.display.utils import AutoEvalColumn, ModelType, Tasks, Precision | |
| from src.about import Model_Backbone, Training_Dataset, Testing_Type | |
@dataclass
class EvalResult:
    """Represents one full evaluation.

    Built from a combination of the result file and the request file for a
    given run: ``init_from_json_file`` reads the result file, and
    ``update_with_request_file`` fills in the remaining metadata.
    """

    # Identifier for an evaluation: model name, precision, training dataset and
    # testing type concatenated in that order (see init_from_json_file).
    eval_name: str
    model_name: str
    training_dataset_type: Training_Dataset
    training_dataset: str
    testing_type: Testing_Type
    results: dict
    paper_name: str = ""
    model_link: str = ""
    paper_link: str = ""
    model_backbone_type: Model_Backbone = Model_Backbone.Other
    model_backbone: str = ""
    precision: Precision = Precision.Other
    model_parameters: float = 0
    model_type: ModelType = ModelType.Other  # Pretrained, fine tuned, ...
    date: str = ""  # submission date of request file

    @classmethod
    def init_from_json_file(cls, json_filepath):
        """Build an EvalResult from a single model result file.

        Args:
            json_filepath: Path to a JSON result file holding a ``config``
                object and a ``results`` object.

        Returns:
            A new instance populated with the identifying fields and scores;
            the remaining fields keep their defaults.
        """
        with open(json_filepath) as fp:
            data = json.load(fp)

        config = data.get("config")

        # Extract evaluation config
        model_name = config["model_name"]
        training_dataset_type = Training_Dataset.from_str(config["training_dataset"])
        if training_dataset_type.name != Training_Dataset.Other.name:
            training_dataset = training_dataset_type.value
        else:
            # Dataset is not one of the known members: keep the raw string.
            training_dataset = config["training_dataset"]
        testing_type = Testing_Type(config["testing_type"])
        precision = Precision.from_str(config.get("model_dtype"))
        eval_name = model_name + precision.value + training_dataset + testing_type.value

        # Extract results; a metric absent from the file is recorded as -1.
        results = {
            task.value.metric: data["results"].get(task.value.metric, -1)
            for task in Tasks
        }

        return cls(
            eval_name=eval_name,
            model_name=model_name,
            training_dataset_type=training_dataset_type,
            training_dataset=training_dataset,
            testing_type=testing_type,
            precision=precision,
            results=results,
        )

    def update_with_request_file(self, requests_path):
        """Find the request file for this evaluation and copy its metadata.

        This is best effort: when the request file is missing or cannot be
        processed, a message is printed and the instance is left as it was
        at the point of failure.

        Args:
            requests_path: Root folder containing the request files.
        """
        if self.training_dataset_type.name != Training_Dataset.Other.name:
            training_dataset_request = self.training_dataset_type.name
        else:
            training_dataset_request = self.training_dataset
        # Whitespace in the dataset name becomes underscores in the request filename.
        training_dataset_request = "_".join(training_dataset_request.split())

        request_file = get_request_file_for_model(
            requests_path,
            self.model_name,
            self.precision.value,
            training_dataset_request,
            self.testing_type.value,
        )
        not_found_message = f"Could not find request file for {self.model_name} with precision {self.precision.value}, training dataset {self.training_dataset} and testing type {self.testing_type.value}"

        # The lookup returns None when there is no usable request file; handle
        # that explicitly instead of letting open(None) raise.
        if request_file is None:
            print(not_found_message)
            return

        try:
            with open(request_file, "r") as f:
                request = json.load(f)
            self.model_parameters = request.get("model_parameters", 0)
            self.model_link = request.get("model_link", "None")
            self.model_backbone = request.get("model_backbone", "Unknown")
            self.model_backbone_type = Model_Backbone.from_str(self.model_backbone)
            self.paper_name = request.get("paper_name", "None")
            self.paper_link = request.get("paper_link", "None")
            self.model_type = ModelType.from_str(request.get("model_type", ""))
            self.date = request.get("submitted_time", "")
        except Exception:
            # Deliberately broad: a bad request file must not abort the whole load.
            print(not_found_message)

    def to_dict(self):
        """Convert the EvalResult to a dict compatible with the dataframe display.

        Returns:
            A dict keyed by the AutoEvalColumn names plus one entry per task
            column, and an ``eval_name`` entry used only as a save name.

        Raises:
            KeyError: If ``results`` lacks the metric of one of the tasks.
        """
        # Only link the paper name when the link looks like a URL.
        if self.paper_link.startswith("http"):
            paper = make_hyperlink(self.paper_link, self.paper_name)
        else:
            paper = self.paper_name

        data_dict = {
            "eval_name": self.eval_name,  # not a column, just a save name,
            AutoEvalColumn.precision.name: self.precision.value,
            AutoEvalColumn.model_parameters.name: self.model_parameters,
            AutoEvalColumn.model_name.name: self.model_name,
            AutoEvalColumn.paper.name: paper,
            AutoEvalColumn.model_backbone_type.name: self.model_backbone_type.value,
            AutoEvalColumn.model_backbone.name: self.model_backbone,
            AutoEvalColumn.training_dataset_type.name: self.training_dataset_type.value,
            AutoEvalColumn.training_dataset.name: self.training_dataset,
            AutoEvalColumn.testing_type.name: self.testing_type.name,
            AutoEvalColumn.model_link.name: self.model_link,
        }

        for task in Tasks:
            data_dict[task.value.col_name] = self.results[task.value.metric]

        return data_dict
def get_request_file_for_model(requests_path, model_name, precision, training_dataset, testing_type):
    """Select the request file for a given model if it is marked as FINISHED.

    The file is looked up at
    ``<requests_path>/<model_name>/<model_name>_eval_request_<precision>_<training_dataset>_<testing_type>.json``.

    Args:
        requests_path: Root folder containing the request files.
        model_name: Model name, used both as sub-folder and filename prefix.
        precision: Precision string used in the filename.
        training_dataset: Training dataset string used in the filename.
        testing_type: Testing type string used in the filename.

    Returns:
        The path of the request file when it exists, parses as a JSON object
        and has ``"status": "FINISHED"``; otherwise ``None``.
    """
    request_filename = os.path.join(
        requests_path,
        model_name,
        f"{model_name}_eval_request_{precision}_{training_dataset}_{testing_type}.json",
    )

    try:
        with open(request_filename, "r") as file:
            req_content = json.load(file)
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError: the content is not
        # valid JSON (json.JSONDecodeError is a ValueError subclass).
        return None

    # A request without a "status" entry is treated as not finished rather
    # than raising KeyError in the caller.
    if not isinstance(req_content, dict) or req_content.get("status") != "FINISHED":
        return None

    return request_filename
def get_raw_eval_results(results_path: str, requests_path: str) -> list[EvalResult]:
    """Collect the evaluations found under the results folder root.

    Every result file is turned into an EvalResult and enriched from its
    request file. Evaluations sharing an ``eval_name`` overwrite one another,
    so the last one read wins. Only evaluations whose ``to_dict()`` succeeds
    are returned.

    Args:
        results_path: Root folder that is walked for result files.
        requests_path: Root folder containing the request files.

    Returns:
        The list of complete EvalResult objects.
    """
    result_files = []
    for directory, _, filenames in os.walk(results_path):
        # A model results folder is expected to hold json files only; empty
        # folders and folders containing anything else are skipped entirely.
        if not filenames or not all(name.endswith(".json") for name in filenames):
            continue
        result_files.extend(os.path.join(directory, name) for name in filenames)

    by_eval_name = {}
    for filepath in result_files:
        # Creation of result
        evaluation = EvalResult.init_from_json_file(filepath)
        evaluation.update_with_request_file(requests_path)
        by_eval_name[evaluation.eval_name] = evaluation

    complete = []
    for evaluation in by_eval_name.values():
        try:
            evaluation.to_dict()  # checks that the dict version is complete
        except KeyError:  # not all eval values present
            continue
        else:
            complete.append(evaluation)
    return complete