| from pathlib import Path |
|
|
| import pandas as pd |
| from huggingface_hub import HfApi, hf_hub_download |
| from huggingface_hub.utils import EntryNotFoundError |
|
|
| from config import ( |
| ACTIVATED_COL, |
| DATASET_REPO_ID, |
| DEFAULT_LEADERBOARD_CONTEXT, |
| HF_TOKEN, |
| HORIZONS, |
| LEADERBOARD_METADATA_SUFFIX, |
| LEADERBOARD_METRICS_SUFFIX, |
| MAX_SAVED_SUBMISSIONS, |
| METRIC_BASE_COLS, |
| get_ground_truth_file, |
| get_leaderboard_entry_dir, |
| ) |
|
|
# Columns stored in a user's metadata file.
LEADERBOARD_METADATA_COLS = ["Website", "Notes"]

# One metric column per (horizon, base metric) pair, grouped by horizon.
LEADERBOARD_METRIC_COLS = [
    f"{metric}_{horizon}"
    for horizon in HORIZONS
    for metric in METRIC_BASE_COLS
]

# Columns stored in a user's metrics-history file: metrics plus bookkeeping.
LEADERBOARD_METRICS_WITH_HISTORY_COLS = LEADERBOARD_METRIC_COLS + [
    "Timestamp",
    ACTIVATED_COL,
]

# Leaderboard columns that are rendered as text and default to "N/A".
STRING_LEADERBOARD_COLS = ["Website", "Notes", "Timestamp"]

# Column order of the assembled leaderboard table.
ALL_COLUMNS = ["User", "Website"] + LEADERBOARD_METRIC_COLS + ["Timestamp", "Notes"]

# Metadata returned when a user has no stored metadata.
EMPTY_METADATA = {
    "Website": "N/A",
    "Notes": "N/A",
}
|
|
|
|
def get_user_metadata_filename(username: str) -> str:
    """Return the metadata file name for *username* (name + metadata suffix)."""
    return "{}{}".format(username, LEADERBOARD_METADATA_SUFFIX)
|
|
|
|
def get_user_metrics_filename(username: str) -> str:
    """Return the metrics file name for *username* (name + metrics suffix)."""
    return "{}{}".format(username, LEADERBOARD_METRICS_SUFFIX)
|
|
|
|
def get_user_metadata_repo_path(
    username: str,
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
) -> str:
    """Return the in-repo path of the metadata file for *username* in *context*."""
    entry_dir = get_leaderboard_entry_dir(context)
    file_name = get_user_metadata_filename(username)
    return f"{entry_dir}/{file_name}"
|
|
|
|
def get_user_metrics_repo_path(
    username: str,
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
) -> str:
    """Return the in-repo path of the metrics file for *username* in *context*."""
    entry_dir = get_leaderboard_entry_dir(context)
    file_name = get_user_metrics_filename(username)
    return f"{entry_dir}/{file_name}"
|
|
|
|
def build_user_metadata_df(website: str, notes: str) -> pd.DataFrame:
    """Build a one-row metadata frame holding the user's website and notes."""
    record = {"Website": website, "Notes": notes}
    return pd.DataFrame([record], columns=LEADERBOARD_METADATA_COLS)
|
|
|
|
def build_submission_metrics_row(
    scores: dict[str, float],
    timestamp: str,
    activated: bool = False,
) -> pd.DataFrame:
    """Build a one-row metrics-history frame for a single submission.

    Metric columns absent from *scores* are filled with NaN; the row also
    carries the submission timestamp and its activation flag.
    """
    record = {}
    for metric_col in LEADERBOARD_METRIC_COLS:
        record[metric_col] = scores.get(metric_col, float("nan"))
    record["Timestamp"] = timestamp
    record[ACTIVATED_COL] = bool(activated)
    return pd.DataFrame([record], columns=LEADERBOARD_METRICS_WITH_HISTORY_COLS)
|
|
|
|
def _normalize_activated_col(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* whose activation column is a strict boolean.

    If the activation column is missing it is created as all-False, except
    that the first row (by position) of a non-empty frame is marked active.
    Existing values are parsed case-insensitively: "true", "1", "1.0" and
    "yes" count as active, everything else (including missing values) as
    inactive.

    Args:
        df: Submission-history frame; it is not modified.

    Returns:
        A copy of *df* with a boolean activation column.
    """
    normalized_df = df.copy()
    if ACTIVATED_COL not in normalized_df.columns:
        normalized_df[ACTIVATED_COL] = False
        if not normalized_df.empty:
            # Positional write: a label-based write would flag every row
            # sharing the first index label when the index has duplicates.
            flag_position = normalized_df.columns.get_loc(ACTIVATED_COL)
            normalized_df.iloc[0, flag_position] = True

    # "1.0" covers a 1/0 flag column that was parsed as float because it
    # contained blanks. Missing values never match a truthy token, so they
    # come out as False without an explicit fill.
    truthy_tokens = {"true", "1", "1.0", "yes"}
    flag_text = normalized_df[ACTIVATED_COL].astype(str).str.strip().str.lower()
    normalized_df[ACTIVATED_COL] = flag_text.isin(truthy_tokens)
    return normalized_df
|
|
|
|
def normalize_submission_history_df(df: pd.DataFrame) -> pd.DataFrame:
    """Return *df* coerced to the canonical submission-history layout.

    Missing metric columns are added as NaN, a missing or null Timestamp
    becomes the string "N/A", the activation column is normalized to bool,
    and only the history columns are kept. Rows are ordered by the Timestamp
    string, descending, with a fresh index. An empty input yields an empty
    frame with the history columns.
    """
    if df.empty:
        return pd.DataFrame(columns=LEADERBOARD_METRICS_WITH_HISTORY_COLS)

    history = df.copy()
    absent_metrics = [
        metric_col
        for metric_col in LEADERBOARD_METRIC_COLS
        if metric_col not in history.columns
    ]
    for metric_col in absent_metrics:
        history[metric_col] = float("nan")
    if "Timestamp" not in history.columns:
        history["Timestamp"] = "N/A"

    history = _normalize_activated_col(history)
    timestamps = history["Timestamp"].fillna("N/A")
    history["Timestamp"] = timestamps.astype(str)
    history = history[LEADERBOARD_METRICS_WITH_HISTORY_COLS]
    ordered = history.sort_values("Timestamp", ascending=False)
    return ordered.reset_index(drop=True)
|
|
|
|
def cap_submission_history_df(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize *df* and keep at most MAX_SAVED_SUBMISSIONS leading rows."""
    history = normalize_submission_history_df(df)
    kept = history.head(MAX_SAVED_SUBMISSIONS)
    return kept.reset_index(drop=True)
|
|
|
|
def write_user_metrics_history_file(
    output_dir,
    username: str,
    metrics_history_df: pd.DataFrame,
):
    """Write the user's capped, normalized metrics history as CSV.

    Creates *output_dir* if needed and returns the path of the written file.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    destination = target_dir / get_user_metrics_filename(username)
    capped_history = cap_submission_history_df(metrics_history_df)
    capped_history.to_csv(destination, index=False)
    return destination
|
|
|
|
def write_user_leaderboard_files(
    output_dir,
    username: str,
    website: str,
    notes: str,
    scores: dict[str, float],
    timestamp: str,
    activated: bool = False,
):
    """Write a user's metrics and metadata CSV files into *output_dir*.

    The metrics file holds a single row built from *scores*, *timestamp* and
    *activated*; the metadata file holds *website* and *notes*.

    Returns:
        A ``(metadata_path, metrics_path)`` tuple.
    """
    target_dir = Path(output_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    metadata_path = target_dir / get_user_metadata_filename(username)

    # The metrics file is written first, then the metadata file.
    submission_row = build_submission_metrics_row(scores, timestamp, activated=activated)
    metrics_path = write_user_metrics_history_file(
        output_dir=target_dir,
        username=username,
        metrics_history_df=submission_row,
    )

    metadata_frame = build_user_metadata_df(website, notes)
    metadata_frame.to_csv(metadata_path, index=False)
    return metadata_path, metrics_path
|
|
|
|
def get_ground_truth(context: str = DEFAULT_LEADERBOARD_CONTEXT):
    """Securely loads the hidden ground truth from the Dataset repo.

    Args:
        context: Leaderboard context used to select the ground-truth file.

    Returns:
        The ground-truth parquet file loaded as a DataFrame.

    Raises:
        ValueError: If HF_TOKEN is not configured, or if downloading or
            parsing the file fails (the original error is chained as the
            cause).
    """
    if not HF_TOKEN:
        raise ValueError("System HF_TOKEN is missing. Please configure Space Secrets.")
    ground_truth_file = get_ground_truth_file(context)
    try:
        file_path = hf_hub_download(
            repo_id=DATASET_REPO_ID,
            filename=ground_truth_file,
            repo_type="dataset",
            token=HF_TOKEN,
        )
        return pd.read_parquet(file_path)
    except Exception as e:
        # Chain explicitly so the underlying download/parse error stays
        # attached as __cause__ for debugging.
        raise ValueError(f"Failed to load {ground_truth_file}: {e}") from e
|
|
|
|
def _empty_leaderboard():
    """Return a leaderboard frame with all columns and no rows."""
    return pd.DataFrame(data=None, columns=ALL_COLUMNS)
|
|
|
|
def _fill_missing_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Ensure every leaderboard column exists on *df*, modifying it in place.

    Text columns are created if absent and have nulls (and the literal
    string "nan") replaced by "N/A"; absent metric columns are created as
    NaN. The same *df* object is returned.
    """
    for text_col in STRING_LEADERBOARD_COLS:
        if text_col not in df.columns:
            df[text_col] = "N/A"
        as_text = df[text_col].fillna("N/A").astype(str)
        df[text_col] = as_text.replace("nan", "N/A")

    absent_metrics = [
        metric_col for metric_col in LEADERBOARD_METRIC_COLS if metric_col not in df.columns
    ]
    for metric_col in absent_metrics:
        df[metric_col] = float("nan")

    return df
|
|
|
|
def _username_from_entry_filename(filename: str) -> str | None:
    """Extract the username from a leaderboard entry file name.

    Args:
        filename: Base name of a file in the leaderboard entry directory.

    Returns:
        The file name with its metadata or metrics suffix removed, or None
        when the name ends with neither suffix. The metadata suffix is
        checked first.
    """
    for suffix in (LEADERBOARD_METADATA_SUFFIX, LEADERBOARD_METRICS_SUFFIX):
        if filename.endswith(suffix):
            # removesuffix is safe for an empty suffix, where the slice
            # filename[:-len(suffix)] would collapse to "".
            return filename.removesuffix(suffix)
    return None
|
|
|
|
def _get_repo_leaderboard_usernames(
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
) -> list[str]:
    """List the usernames that have entry files for *context* in the repo.

    Scans the dataset repo's file listing for paths under the context's
    entry directory and returns the distinct usernames, sorted.
    """
    api = HfApi()
    listing = api.list_repo_files(
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        token=HF_TOKEN,
    )
    entry_prefix = f"{get_leaderboard_entry_dir(context)}/"

    found = set()
    for repo_path in listing:
        if not repo_path.startswith(entry_prefix):
            continue
        candidate = _username_from_entry_filename(Path(repo_path).name)
        # Skips both non-entry files (None) and empty usernames.
        if candidate:
            found.add(candidate)
    return sorted(found)
|
|
|
|
def _read_repo_csv(repo_path: str) -> pd.DataFrame:
    """Download *repo_path* from the dataset repo and parse it as CSV.

    The download is forced on every call (``force_download=True``).
    """
    download_options = {
        "repo_id": DATASET_REPO_ID,
        "filename": repo_path,
        "repo_type": "dataset",
        "token": HF_TOKEN,
        "force_download": True,
    }
    local_copy = hf_hub_download(**download_options)
    return pd.read_csv(local_copy)
|
|
|
|
def _read_repo_csv_or_empty(repo_path: str) -> pd.DataFrame:
    """Read a CSV from the repo, or return an empty frame if it is absent.

    Only ``EntryNotFoundError`` is handled; other errors propagate.
    """
    try:
        frame = _read_repo_csv(repo_path)
    except EntryNotFoundError:
        frame = pd.DataFrame()
    return frame
|
|
|
|
def get_user_metadata(
    username: str,
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
) -> dict[str, str]:
    """Return the stored "Website" and "Notes" metadata for *username*.

    Args:
        username: User whose metadata file is read.
        context: Leaderboard context the metadata file belongs to.

    Returns:
        A dict with string "Website" and "Notes" values taken from the first
        row of the metadata file. Each field falls back to "N/A" when the
        token is not configured, the file is absent or has no rows, the
        column is missing, or the cell is blank.
    """
    if not HF_TOKEN:
        return EMPTY_METADATA.copy()

    metadata_df = _read_repo_csv_or_empty(get_user_metadata_repo_path(username, context))
    if metadata_df.empty:
        return EMPTY_METADATA.copy()

    metadata_row = metadata_df.iloc[0]
    metadata = {}
    for field in ("Website", "Notes"):
        value = metadata_row.get(field, "N/A")
        # A blank CSV cell is parsed as NaN; report it as "N/A" and coerce
        # everything else to str so the declared return type holds.
        metadata[field] = "N/A" if pd.isna(value) else str(value)
    return metadata
|
|
|
|
def get_user_submission_history(
    username: str,
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
) -> pd.DataFrame:
    """Return the normalized submission history stored for *username*.

    Without a configured token, an empty frame with the history columns is
    returned and the repo is not contacted.
    """
    if HF_TOKEN:
        history_path = get_user_metrics_repo_path(username, context)
        stored_history = _read_repo_csv_or_empty(history_path)
        return normalize_submission_history_df(stored_history)

    return pd.DataFrame(columns=LEADERBOARD_METRICS_WITH_HISTORY_COLS)
|
|
|
|
def _get_active_submission_row(metrics_history_df: pd.DataFrame):
    """Return the first row flagged as activated, or None if there is none."""
    if metrics_history_df.empty:
        return None
    is_active = metrics_history_df[ACTIVATED_COL]
    candidates = metrics_history_df[is_active]
    return None if candidates.empty else candidates.iloc[0]
|
|
|
|
def _assemble_leaderboard_row(
    username: str,
    metadata: dict[str, str],
    active_metrics_row,
) -> dict:
    """Combine a user's metadata and active metrics into one leaderboard row.

    Missing metadata fields and a missing timestamp default to "N/A";
    missing metric values default to NaN.
    """
    identity_fields = {
        "User": username,
        "Website": metadata.get("Website", "N/A"),
        "Notes": metadata.get("Notes", "N/A"),
        "Timestamp": active_metrics_row.get("Timestamp", "N/A"),
    }
    metric_fields = {
        metric_col: active_metrics_row.get(metric_col, float("nan"))
        for metric_col in LEADERBOARD_METRIC_COLS
    }
    return {**identity_fields, **metric_fields}
|
|
|
|
def get_leaderboard(context: str = DEFAULT_LEADERBOARD_CONTEXT):
    """Fetches the latest leaderboard from the dataset repo (unsorted).

    Users without an activated submission are left out. If the token is not
    configured, no entries exist, or any error occurs while fetching, an
    empty leaderboard frame is returned (errors are printed, not raised).
    """
    if not HF_TOKEN:
        return _empty_leaderboard()

    try:
        entries = []
        for username in _get_repo_leaderboard_usernames(context):
            metadata = get_user_metadata(username, context)
            history = get_user_submission_history(username, context)
            active_row = _get_active_submission_row(history)
            if active_row is not None:
                entries.append(_assemble_leaderboard_row(username, metadata, active_row))

        if entries:
            board = pd.DataFrame(entries, columns=ALL_COLUMNS)
            return _fill_missing_columns(board)
        return _empty_leaderboard()
    except Exception as exc:
        # Best-effort: any failure degrades to an empty leaderboard.
        print(f"Fetch error: {exc}")
        return _empty_leaderboard()
|
|