| from __future__ import annotations |
|
|
import json
import math
import numbers
import os
from typing import Iterable
|
|
| import pandas as pd |
|
|
| from src.leaderboard.schema import SCHEMA |
|
|
|
|
class ResultsValidationError(ValueError):
    """Raised when a results file cannot be loaded or fails schema validation."""
|
|
|
|
| def _is_number(value) -> bool: |
| if not isinstance(value, (int, float)) or isinstance(value, bool): |
| return False |
| return math.isfinite(float(value)) |
|
|
|
|
| def _is_missing(value) -> bool: |
| if value is None: |
| return True |
| if isinstance(value, float) and math.isnan(value): |
| return True |
| return False |
|
|
|
|
| def _load_json_records(path: str) -> list[dict]: |
| with open(path, "r") as fp: |
| data = json.load(fp) |
|
|
| if isinstance(data, list): |
| return data |
| if isinstance(data, dict) and "records" in data and isinstance(data["records"], list): |
| return data["records"] |
| raise ResultsValidationError( |
| "JSON must be a list of records or an object with a 'records' list." |
| ) |
|
|
|
|
| def _load_csv_records(path: str) -> list[dict]: |
| df = pd.read_csv(path) |
| return df.to_dict(orient="records") |
|
|
|
|
def load_records(path: str) -> list[dict]:
    """Load result records from *path*, dispatching on the file extension.

    Supported extensions (case-insensitive): ``.json`` and ``.csv``.

    Raises:
        ResultsValidationError: if the file does not exist or has an
            unsupported extension.
    """
    if not os.path.exists(path):
        raise ResultsValidationError(f"Results file not found: {path}")

    suffix = os.path.splitext(path)[1].lower()
    loaders = {".json": _load_json_records, ".csv": _load_csv_records}
    if suffix not in loaders:
        raise ResultsValidationError("Unsupported file type. Use .json or .csv")
    return loaders[suffix](path)
|
|
|
|
def validate_records(records: Iterable[dict]) -> None:
    """Validate raw result records against the leaderboard SCHEMA.

    Checks, in order: non-empty input, each record is a dict, all identity
    fields present and string-valued, all required metrics present, and every
    non-identity value numeric (missing values — None/NaN — are tolerated).

    Raises:
        ResultsValidationError: on the first violation encountered.
    """
    materialized = list(records)
    if not materialized:
        raise ResultsValidationError("Results file is empty.")

    for idx, record in enumerate(materialized):
        if not isinstance(record, dict):
            raise ResultsValidationError(f"Record {idx} is not an object.")

        absent = [field for field in SCHEMA.identity_fields if field not in record]
        if absent:
            raise ResultsValidationError(f"Record {idx} is missing fields: {absent}")

        for field in SCHEMA.identity_fields:
            if not isinstance(record[field], str):
                raise ResultsValidationError(
                    f"Record {idx} field '{field}' must be a string."
                )

        absent_metrics = [m for m in SCHEMA.required_metrics if m not in record]
        if absent_metrics:
            raise ResultsValidationError(
                f"Record {idx} is missing required metrics: {absent_metrics}"
            )

        for key, value in record.items():
            # Identity fields were checked above; missing metric values are OK.
            if key in SCHEMA.identity_fields or _is_missing(value):
                continue
            if not _is_number(value):
                raise ResultsValidationError(
                    f"Record {idx} metric '{key}' must be numeric."
                )
|
|
|
|
def infer_metric_columns(records: Iterable[dict]) -> list[str]:
    """Return the metric column names found across *records*, in display order.

    Order: SCHEMA.required_metrics first, then SCHEMA.optional_metrics (each
    kept in schema order, only if present), then any other metric keys sorted
    alphabetically. Identity fields are never included.
    """
    materialized = list(records)
    if not materialized:
        return []

    seen: set = set()
    for record in materialized:
        seen.update(record)
    seen.difference_update(SCHEMA.identity_fields)

    ordered = [key for key in SCHEMA.required_metrics if key in seen]
    ordered += [key for key in SCHEMA.optional_metrics if key in seen]
    ordered += sorted(seen - set(ordered))
    return ordered
|
|
|
|
def build_dataframe(records: list[dict]) -> tuple[pd.DataFrame, list[str]]:
    """Validate *records* and return (dataframe, column order).

    The frame holds the raw records plus the aggregate columns added by
    apply_overall_metrics. Columns are ordered as: identity fields, inferred
    metric columns, then any derived columns in frame order.

    Raises:
        ResultsValidationError: if the records fail validation.
    """
    validate_records(records)
    base_order = list(SCHEMA.identity_fields) + infer_metric_columns(records)
    frame = apply_overall_metrics(pd.DataFrame.from_records(records))

    # Derived aggregate columns were appended by apply_overall_metrics;
    # tack them onto the end of the ordering, preserving frame order.
    derived = [
        col for col in frame.columns
        if col not in SCHEMA.identity_fields and col not in base_order
    ]
    column_order = base_order + derived
    return frame[column_order], column_order
|
|
|
|
# Per-(dataset, task) counts used as weights when averaging MCQ accuracy
# in apply_overall_metrics. Per the name, these are presumably the number
# of multiple-choice questions each dataset contributes to each task —
# TODO(review): confirm counts against the benchmark definition.
MCQ_QUESTIONS = {
    "MIMIC": {"T1": 188, "T2": 141, "T3": 239, "T4": 141},
    "PSML": {"T1": 200, "T2": 150, "T3": 250, "T4": 150},
    "CausalChambers": {"T1": 150, "T2": 150, "T3": 250, "T4": 150},
    "FreshRetailNet": {"T1": 176, "T2": 132, "T3": 176, "T4": 132},
}


# Per-(dataset, task) counts used as weights for the T2/T4 forecast
# sMAPE/MAE aggregates in apply_overall_metrics (which skips MIMIC there).
# Presumably the number of forecast series per dataset — TODO confirm.
FORECAST_SERIES = {
    "MIMIC": {"T2": 282, "T4": 282},
    "PSML": {"T2": 50, "T4": 50},
    "CausalChambers": {"T2": 50, "T4": 50},
    "FreshRetailNet": {"T2": 44, "T4": 44},
}
|
|
|
|
def _weighted_avg(row: pd.Series, columns: list[str], weights: list[int]) -> float | None:
    """Weighted mean of the given *columns* of *row*, rounded to 4 decimals.

    Missing entries (None or float NaN) are skipped, contributing neither
    value nor weight. Returns None when every entry is missing (i.e. the
    accumulated weight is zero).
    """
    acc = 0.0
    weight_sum = 0.0
    for name, weight in zip(columns, weights):
        value = row.get(name)
        # Same missing-value test as _is_missing, inlined.
        if value is None or (isinstance(value, float) and math.isnan(value)):
            continue
        acc += float(value) * weight
        weight_sum += weight
    return None if weight_sum == 0 else round(acc / weight_sum, 4)
|
|
|
|
def apply_overall_metrics(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with weighted aggregate metric columns appended.

    Adds, for whichever source columns exist in the frame:
      * ``{task}_acc`` — MCQ accuracy per task, weighted across datasets
        by the MCQ_QUESTIONS counts.
      * ``overall_mcq_acc`` — MCQ accuracy over every dataset/task pair.
      * ``{task}_{metric}`` — sMAPE/MAE aggregates for T2/T4, weighted by
        the FORECAST_SERIES counts; the MIMIC dataset is explicitly
        excluded from these forecast aggregates.
    """
    out = df.copy()

    # Per-task MCQ accuracy, weighted by question count.
    for task in ("T1", "T2", "T3", "T4"):
        chosen = [
            (f"{dataset}_{task}_acc", counts[task])
            for dataset, counts in MCQ_QUESTIONS.items()
            if f"{dataset}_{task}_acc" in out.columns
        ]
        if chosen:
            names = [col for col, _ in chosen]
            ws = [w for _, w in chosen]
            out[f"{task}_acc"] = out.apply(
                lambda r: _weighted_avg(r, names, ws), axis=1
            )

    # Overall MCQ accuracy across every dataset/task pair present.
    flat = [
        (f"{dataset}_{task}_acc", weight)
        for dataset, counts in MCQ_QUESTIONS.items()
        for task, weight in counts.items()
        if f"{dataset}_{task}_acc" in out.columns
    ]
    if flat:
        all_names = [col for col, _ in flat]
        all_ws = [w for _, w in flat]
        out["overall_mcq_acc"] = out.apply(
            lambda r: _weighted_avg(r, all_names, all_ws), axis=1
        )

    # Forecast error aggregates (T2/T4 only), weighted by series count.
    for task in ("T2", "T4"):
        for metric in ("sMAPE", "MAE"):
            picked = [
                (f"{dataset}_{task}_{metric}", counts[task])
                for dataset, counts in FORECAST_SERIES.items()
                if dataset != "MIMIC" and f"{dataset}_{task}_{metric}" in out.columns
            ]
            if picked:
                m_names = [col for col, _ in picked]
                m_ws = [w for _, w in picked]
                out[f"{task}_{metric}"] = out.apply(
                    lambda r: _weighted_avg(r, m_names, m_ws), axis=1
                )

    return out
|
|