leaderboard-space / evaluate.py
kenantang's picture
add upload time instruction and update submission names
3d59145
import re
from datetime import datetime, timezone
from pathlib import Path
import gradio as gr
import pandas as pd
from huggingface_hub import CommitOperationAdd, HfApi
from config import (
ACTIVATED_COL,
CLINICAL_COLS,
DATASET_REPO_ID,
DEFAULT_LEADERBOARD_CONTEXT,
HF_TOKEN,
HORIZONS,
METRIC_BASE_COLS,
PRED_COLS,
SUBMISSION_COOLDOWN_SECONDS,
TECHNICAL_COLS,
get_leaderboard_entry_dir,
get_leaderboard_name,
)
from data import (
build_submission_metrics_row,
build_user_metadata_df,
get_ground_truth,
get_user_metadata_repo_path,
get_user_metrics_repo_path,
get_user_submission_history,
write_user_metrics_history_file,
)
from metrics import calculate_dts_error_grid, calculate_rmse, calculate_mae
# Maximum accepted length, in characters, of the optional website URL.
_MAX_WEBSITE_LEN = 200
# Maximum accepted length, in characters, of the optional free-text notes.
_MAX_NOTES_LEN = 500
# Columns used to align submission rows with ground-truth rows when merging.
_JOIN_KEYS = ["id", "source_file", "date"]
# Placeholder substituted for optional text fields that are left blank.
_EMPTY_TEXT_VALUE = "N/A"
# strftime/strptime pattern for submission timestamps (generated from UTC time).
_TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
def validate_website(url: str) -> str | None:
"""Returns an error message, or None if the value is acceptable."""
url = (url or "").strip()
if not url or url == "N/A":
return None
if len(url) > _MAX_WEBSITE_LEN:
return f"⚠️ Website URL must be {_MAX_WEBSITE_LEN} characters or fewer."
if re.search(r"""[<>"']""", url):
return "⚠️ Website URL contains invalid characters (< > \" ')."
if not re.match(r'^https?://', url, re.IGNORECASE):
return "⚠️ Website URL must start with http:// or https://."
return None
def validate_notes(notes: str) -> str | None:
"""Returns an error message, or None if the value is acceptable."""
notes = (notes or "").strip()
if not notes or notes == "N/A":
return None
if len(notes) > _MAX_NOTES_LEN:
return f"⚠️ Notes must be {_MAX_NOTES_LEN} characters or fewer."
if re.search(r'[<>]', notes):
return "⚠️ Notes contain invalid characters (< or >)."
return None
def _normalize_optional_text(value: str | None) -> str:
return (value or "").strip() or _EMPTY_TEXT_VALUE
def _load_submission(file_path):
    """Read the uploaded predictions file as Parquet and return it as a DataFrame."""
    submission_df = pd.read_parquet(file_path)
    return submission_df
def _validate_submission_metadata(website: str, notes: str) -> str | None:
    """Validate the website first, then the notes; return the first error found.

    Returns ``None`` when both fields are acceptable.
    """
    # `or` short-circuits: the notes are only checked when the website is fine.
    return validate_website(website) or validate_notes(notes)
def _validate_submission_columns(user_df: pd.DataFrame) -> str | None:
required_cols = _JOIN_KEYS + PRED_COLS
missing_cols = [c for c in required_cols if c not in user_df.columns]
if missing_cols:
return f"⚠️ Submission is missing columns: {', '.join(missing_cols)}."
return None
def _merge_submission_with_ground_truth(user_df: pd.DataFrame, ground_truth_df: pd.DataFrame):
    """Align a submission with the ground truth on the join-key columns.

    Args:
        user_df: Submitted predictions; must contain the join-key and
            prediction columns.
        ground_truth_df: Reference rows carrying the same join-key columns.

    Returns:
        ``(merged_df, None)`` on success, or ``(None, message)`` where
        ``message`` is a user-facing explanation of why the submission
        cannot be scored.
    """
    n_truth = len(ground_truth_df)
    n_submitted = len(user_df)
    if n_submitted != n_truth:
        return None, (
            f"⚠️ Submission has {n_submitted:,} rows but ground truth has {n_truth:,} rows. "
            "Ensure your file is derived from the predictions template without adding or removing rows."
        )

    # A submission that repeats one key and drops another keeps both the row
    # count and the inner-merge row count unchanged, so the count checks below
    # cannot detect it. Reject repeated keys explicitly. This is only enforced
    # when the ground-truth keys are themselves unique, so a template that
    # repeats keys is not rejected by this check.
    duplicate_rows = user_df.duplicated(subset=_JOIN_KEYS)
    if duplicate_rows.any() and not ground_truth_df.duplicated(subset=_JOIN_KEYS).any():
        return None, (
            f"⚠️ Submission contains {int(duplicate_rows.sum()):,} duplicated rows. "
            "Each id, source_file, and date combination from the template must appear exactly once."
        )

    merged = pd.merge(
        ground_truth_df,
        user_df[_JOIN_KEYS + PRED_COLS],
        on=_JOIN_KEYS,
        how="inner",
    )
    n_matched = len(merged)
    if n_matched == 0:
        return None, "⚠️ No matching rows found between your submission and the ground truth."
    if n_matched < n_truth:
        missing = n_truth - n_matched
        return None, (
            f"⚠️ {missing:,} ground truth rows could not be matched in your submission. "
            "Check that the id, source_file, and date columns are unmodified from the template."
        )
    return merged, None
def _add_empty_horizon_scores(scores: dict[str, float], horizon: int) -> None:
    """Fill every base-metric slot for ``horizon`` with NaN, mutating ``scores`` in place."""
    scores.update(
        {f"{metric}_{horizon}": float("nan") for metric in METRIC_BASE_COLS}
    )
def _compute_scores(merged_df: pd.DataFrame) -> dict[str, float]:
    """Score every forecast horizon of a merged submission.

    For each horizon in ``HORIZONS`` the ``target_<h>`` and ``pred_<h>``
    columns are compared after discarding rows where either value is missing.
    Result keys have the form ``<metric>_<h>``. A horizon with no usable rows
    gets NaN for every base metric instead.
    """
    results: dict[str, float] = {}
    for h in HORIZONS:
        target = merged_df[f"target_{h}"].values
        predicted = merged_df[f"pred_{h}"].values

        # Keep only the rows where both the target and the prediction exist.
        usable = pd.notna(target) & pd.notna(predicted)
        target, predicted = target[usable], predicted[usable]

        if len(target) == 0:
            _add_empty_horizon_scores(results, h)
        else:
            grid = calculate_dts_error_grid(predicted, target)
            results.update({f"{name}_{h}": grid[name] for name in CLINICAL_COLS})
            # The first technical column holds RMSE, the second holds MAE.
            results[f"{TECHNICAL_COLS[0]}_{h}"] = calculate_rmse(predicted, target)
            results[f"{TECHNICAL_COLS[1]}_{h}"] = calculate_mae(predicted, target)
    return results
def _get_remaining_cooldown_seconds(metrics_history_df: pd.DataFrame) -> int | None:
if metrics_history_df.empty:
return None
last_ts = metrics_history_df.iloc[0]["Timestamp"]
if last_ts in (_EMPTY_TEXT_VALUE, "", "nan"):
return None
try:
last_dt = datetime.strptime(last_ts, _TIMESTAMP_FORMAT).replace(tzinfo=timezone.utc)
except ValueError:
return None
elapsed = (datetime.now(timezone.utc) - last_dt).total_seconds()
remaining = int(SUBMISSION_COOLDOWN_SECONDS - elapsed)
return remaining if remaining > 0 else None
def _build_submission_summary(username: str, action: str, scores: dict[str, float]) -> str:
    """Render the success message shown after a submission is stored.

    The message is a header line followed by one line per horizon giving the
    DTS zone-A percentage and the RMSE, each to one decimal place.
    """
    header = f"✅ {action} {username}'s scores:\n"
    per_horizon = []
    for h in HORIZONS:
        zone_a = scores[f'DTS_A_ZONE_PERCENT_{h}']
        rmse = scores[f'RMSE_{h}']
        per_horizon.append(f" {h}min → DTS-A: {zone_a:.1f}%, RMSE: {rmse:.1f}")
    return header + "\n".join(per_horizon)
def _append_submission_to_history(
    metrics_history_df: pd.DataFrame,
    scores: dict[str, float],
    timestamp: str,
):
    """Return a new history with the fresh submission placed first and activated.

    The input frame is not modified. Every earlier row in the copy is marked
    as not activated, so the new row is the only active one.
    """
    previous_rows = metrics_history_df.copy()
    if not previous_rows.empty:
        # Deactivate everything that was stored before this submission.
        previous_rows[ACTIVATED_COL] = False

    latest_row = build_submission_metrics_row(scores, timestamp, activated=True)
    frames = [latest_row, previous_rows]
    return pd.concat(frames, ignore_index=True)
def _build_user_metadata_output_path(username: str, context: str) -> Path:
    """Return the local path for the user's metadata file.

    The file name is taken from the user's metadata repo path and placed
    inside the leaderboard entry directory for ``context``.
    """
    repo_path = Path(get_user_metadata_repo_path(username, context))
    entry_dir = Path(get_leaderboard_entry_dir(context))
    return entry_dir.joinpath(repo_path.name)
def _commit_user_metrics_history(
    username: str,
    metrics_history_df: pd.DataFrame,
    commit_message: str,
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
):
    """Write the user's metrics history to a local file and commit it to the dataset repo.

    The file is written into the leaderboard entry directory for ``context``
    and uploaded to the user's metrics repo path in a single commit.
    """
    entry_dir = Path(get_leaderboard_entry_dir(context))
    local_metrics_file = write_user_metrics_history_file(
        output_dir=entry_dir,
        username=username,
        metrics_history_df=metrics_history_df,
    )

    hub = HfApi()
    upload = CommitOperationAdd(
        path_in_repo=get_user_metrics_repo_path(username, context),
        path_or_fileobj=str(local_metrics_file),
    )
    hub.create_commit(
        repo_id=DATASET_REPO_ID,
        repo_type="dataset",
        operations=[upload],
        commit_message=commit_message,
        token=HF_TOKEN,
    )
def set_active_submission(
    profile: gr.OAuthProfile | None,
    timestamp: str | None,
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
):
    """Show exactly one saved submission on the leaderboard, or hide them all.

    A truthy ``timestamp`` selects the submission to show; a falsy one hides
    every submission of the logged-in user. The updated history is committed
    to the dataset repo. Always returns a status string for the UI; failures
    are reported in that string rather than raised.
    """
    if profile is None:
        return "⚠️ Please log in with Hugging Face to manage visible submissions."

    username = profile.username
    leaderboard_name = get_leaderboard_name(context)
    try:
        history = get_user_submission_history(username, context)
        if history.empty:
            return "⚠️ No saved submissions found."

        # Work on a copy and start from "nothing is visible".
        history = history.copy()
        history[ACTIVATED_COL] = False

        if not timestamp:
            status_message = f"✅ Your submissions are now hidden from the {leaderboard_name} leaderboard."
            commit_message = f"Hide {leaderboard_name} entries for {username}"
        else:
            is_selected = history["Timestamp"] == timestamp
            if not is_selected.any():
                return "⚠️ The selected submission could not be found."
            # Only the first matching row is activated, even if the timestamp repeats.
            selected_label = history.index[is_selected][0]
            history.loc[selected_label, ACTIVATED_COL] = True
            status_message = f"✅ Showing submission from {timestamp} on the {leaderboard_name} leaderboard."
            commit_message = f"Activate {leaderboard_name} entry for {username}"

        _commit_user_metrics_history(username, history, commit_message, context)
        return status_message
    except Exception as exc:
        return f"❌ An error occurred: {exc!s}"
def evaluate_and_submit(
    profile: gr.OAuthProfile | None,
    file_path,
    website: str = "N/A",
    notes: str = "N/A",
    context: str = DEFAULT_LEADERBOARD_CONTEXT,
):
    """Score an uploaded predictions file and publish the result.

    Order of operations: check login and inputs, validate the metadata, load
    and validate the file, merge with the ground truth, compute the scores,
    enforce the submission cooldown, then write the metadata and metrics files
    locally and upload both in one commit. Always returns a status string for
    the UI; failures inside the scoring/upload section are reported in that
    string rather than raised.
    """
    if profile is None:
        return "⚠️ Please log in with Hugging Face to submit your results."
    username = profile.username
    if not file_path:
        return "⚠️ Please provide a predictions file."

    website = _normalize_optional_text(website)
    notes = _normalize_optional_text(notes)
    metadata_error = _validate_submission_metadata(website, notes)
    if metadata_error:
        return metadata_error

    try:
        # --- Load and check the uploaded predictions ---
        submission_df = _load_submission(file_path)
        column_error = _validate_submission_columns(submission_df)
        if column_error:
            return column_error

        truth_df = get_ground_truth(context)
        merged_df, merge_error = _merge_submission_with_ground_truth(submission_df, truth_df)
        if merge_error:
            return merge_error
        scores = _compute_scores(merged_df)

        # --- Enforce the per-user cooldown ---
        history_df = get_user_submission_history(username, context)
        wait_seconds = _get_remaining_cooldown_seconds(history_df)
        if wait_seconds is not None:
            return f"⚠️ Please wait {wait_seconds}s before submitting again."

        action = "Added" if history_df.empty else "Updated"
        timestamp = datetime.now(timezone.utc).strftime(_TIMESTAMP_FORMAT)

        # --- Write both files locally ---
        metadata_path = _build_user_metadata_output_path(username, context)
        metadata_path.parent.mkdir(parents=True, exist_ok=True)
        metadata_df = build_user_metadata_df(website, notes)
        metadata_df.to_csv(metadata_path, index=False)

        new_history_df = _append_submission_to_history(history_df, scores, timestamp)
        metrics_path = write_user_metrics_history_file(
            output_dir=Path(get_leaderboard_entry_dir(context)),
            username=username,
            metrics_history_df=new_history_df,
        )

        # --- Upload metadata and metrics together in a single commit ---
        hub = HfApi()
        uploads = [
            CommitOperationAdd(
                path_in_repo=get_user_metadata_repo_path(username, context),
                path_or_fileobj=str(metadata_path),
            ),
            CommitOperationAdd(
                path_in_repo=get_user_metrics_repo_path(username, context),
                path_or_fileobj=str(metrics_path),
            ),
        ]
        hub.create_commit(
            repo_id=DATASET_REPO_ID,
            repo_type="dataset",
            operations=uploads,
            commit_message=f"Update {get_leaderboard_name(context)} entry for {username}",
            token=HF_TOKEN,
        )
        return _build_submission_summary(username, action, scores)
    except Exception as exc:
        return f"❌ An error occurred: {exc!s}"