Spaces:
Runtime error
Runtime error
| import json | |
| import os | |
| from datetime import datetime, timezone | |
| from sklearn.metrics import f1_score | |
| import pandas as pd | |
| from pathlib import Path | |
| from typing import Union | |
| from huggingface_hub import hf_hub_download | |
| from src.display.formatting import styled_error, styled_message, styled_warning | |
| from src.envs import API, EVAL_REQUESTS_PATH, EVAL_RESULTS_PATH, TOKEN, QUEUE_REPO, RESULTS_REPO | |
| from src.submission.check_validity import ( | |
| already_submitted_models, | |
| ) | |
# Lazily-populated module-level caches, filled on the first call to
# process_submission(): the set/dict of already-submitted request keys and
# the GroundTruth instance holding the hidden test labels.
REQUESTED_MODELS = None
GROUND_TRUTH = None
class GroundTruth:
    """Loader for the hidden ground-truth CSV mapping image_id -> true_label."""

    def __init__(self, csv_path: Union[str, Path]):
        """Load the headerless ground-truth CSV at *csv_path*.

        Raises:
            RuntimeError: if the file is missing, malformed, or contains
                non-integer labels.
        """
        self.csv_path = Path(csv_path)
        self._data = None  # cached DataFrame; reloaded lazily if cleared
        self._load_ground_truth()

    def _load_ground_truth(self):
        """Load headerless CSV with image_name and class columns."""
        try:
            # Read CSV without header; force image ids to str and labels to int.
            # names= fixes the frame to exactly these two columns, so no
            # post-hoc column-count check is needed (the original
            # `shape[1] != 2` check was unreachable dead code).
            self._data = pd.read_csv(
                self.csv_path,
                header=None,
                names=['image_id', 'true_label'],
                dtype={'image_id': str, 'true_label': int},
                usecols=[0, 1],  # only read first two columns
            )
        except FileNotFoundError:
            raise RuntimeError(f"Ground truth file not found at {self.csv_path}")
        except pd.errors.ParserError:
            raise RuntimeError("Invalid CSV file format")
        except ValueError as err:
            # dtype coercion failure (e.g. a non-integer label) raises
            # ValueError, which the original code did not catch.
            raise RuntimeError(f"Invalid ground truth data: {err}")

    def get_ground_truth(self) -> pd.DataFrame:
        """Return the ground truth DataFrame (a copy, to prevent modification)."""
        if self._data is None:
            self._load_ground_truth()
        return self._data.copy()

    def labels(self) -> dict:
        """Get dictionary of {image_id: true_label}."""
        # Go through the accessor so data is (re)loaded if needed instead of
        # assuming self._data is already populated.
        data = self.get_ground_truth()
        return dict(zip(data['image_id'], data['true_label']))
def get_ground_truth():
    """Download the hidden test CSV from the queue repo and populate GROUND_TRUTH.

    The downloaded file is removed afterwards so the hidden labels do not
    linger on disk — including when parsing fails, which the original code
    did not guarantee (an exception in GroundTruth() skipped os.remove).
    """
    global GROUND_TRUTH
    csv_path = hf_hub_download(
        repo_id=QUEUE_REPO,
        filename='./test_hidden.csv',
        repo_type="dataset",
        local_dir="./",
        token=TOKEN,
    )
    try:
        GROUND_TRUTH = GroundTruth(csv_path)
    finally:
        # Always delete the local copy of the hidden ground truth.
        os.remove(csv_path)
def process_submission(
    student_id: str,
    model_name: str,
    csv_upload: str,
):
    """Validate, upload, and score a student's prediction CSV.

    Args:
        student_id: Identifier of the submitting student; must be non-empty.
        model_name: Free-form model label used in file names and commit messages.
        csv_upload: Uploaded file object whose ``.name`` attribute is the local
            path of the predictions CSV (two columns: image_id, prediction).

    Returns:
        A styled HTML message describing success (with accuracy/F1) or the
        reason for failure.
    """
    global REQUESTED_MODELS
    if not REQUESTED_MODELS:
        REQUESTED_MODELS = already_submitted_models(EVAL_REQUESTS_PATH)
    global GROUND_TRUTH
    if not GROUND_TRUTH:
        get_ground_truth()
    submitted_time = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    if not student_id.strip():
        return styled_error("Submission failed! Student ID cannot be empty!")
    if csv_upload is None:
        return styled_error("Submission failed! CSV file cannot be empty!")
    # Seems good, creating the eval
    print("Adding new eval request")
    eval_entry = {
        "student_id": student_id,
        "model_name": model_name,
        "submitted_time": submitted_time,
    }
    # Check for duplicate submission.
    # NOTE(review): the key embeds the per-call timestamp, so this can only
    # match two submissions made within the same second — confirm the intended
    # dedup key against what already_submitted_models() actually returns.
    if f"{student_id}_{model_name}_{submitted_time}" in REQUESTED_MODELS:
        return styled_warning("This model has been already submitted.")
    print("Creating eval reqest file")
    OUT_DIR = f"{EVAL_REQUESTS_PATH}/{student_id}"
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"request_{student_id}_{model_name}_{submitted_time}.json")
    with open(out_path, "w") as f:
        f.write(json.dumps(eval_entry))
    print("Uploading eval request file")
    API.upload_file(
        path_or_fileobj=out_path,
        path_in_repo=out_path.split("eval/")[1],
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} to eval queue at {submitted_time}",
    )
    # Now compute the metrics: read submitted predictions.
    df = pd.read_csv(csv_upload.name)
    # Assigning names raises ValueError when the column count is wrong; turn
    # that into a user-facing error instead of an unhandled server crash.
    try:
        df.columns = ['image_id', 'prediction']
    except ValueError:
        return styled_error("CSV must contain exactly 2 columns: image_id and prediction")
    # Validate row count.
    # BUGFIX: the original condition was `== ground_truth_rows + 1`, which only
    # rejected submissions with exactly ONE extra row (e.g. a header line) and
    # accepted every other wrong row count; it also returned a bare string
    # where every other failure path returns a styled error.
    if df.shape[0] != GROUND_TRUTH.get_ground_truth().shape[0]:
        return styled_error("CSV must contain the same number of rows as the ground truth")
    print("Uploading the submitted CSV file")
    csv_path = os.path.join(OUT_DIR, f"submission_{student_id}_{model_name}_{submitted_time}.csv")
    API.upload_file(
        path_or_fileobj=csv_upload.name,
        path_in_repo=csv_path.split("eval/")[1],
        repo_id=QUEUE_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} submitted CSV at {submitted_time}",
    )
    # Get ground truth labels and score the predictions on the merged rows.
    true_labels = GROUND_TRUTH.get_ground_truth()
    merged = df.merge(true_labels, on='image_id')
    # BUGFIX: cast to builtin float — pandas/.mean() and sklearn return numpy
    # float64 scalars, and json.dumps() raises
    # "TypeError: Object of type float64 is not JSON serializable" on them.
    accuracy = float((merged['prediction'] == merged['true_label']).mean())
    f1 = float(f1_score(merged['true_label'], merged['prediction'], average='weighted'))
    print("Creating eval result file")
    OUT_DIR = f"{EVAL_RESULTS_PATH}/{student_id}"
    os.makedirs(OUT_DIR, exist_ok=True)
    out_path = os.path.join(OUT_DIR, f"result_{student_id}_{model_name}_{submitted_time}.json")
    result_entry = {
        "config": {
            "student_id": student_id,
            "model_name": model_name,
        },
        "results": {
            "classification": {
                "accuracy": accuracy,
                "f1": f1,
            }
        },
    }
    with open(out_path, "w") as f:
        f.write(json.dumps(result_entry))
    print("Uploading eval result file")
    API.upload_file(
        path_or_fileobj=out_path,
        path_in_repo=out_path.split("eval/")[1],
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"Add {student_id}_{model_name} to eval queue at {submitted_time}",
    )
    print(f"Submission successful! Accuracy: {accuracy:.2%}, F1: {f1:.2%}")
    return styled_message(f"Submission successful! Accuracy: {accuracy:.2%}, F1: {f1:.2%}")