Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import threading | |
| import time | |
| from datetime import datetime, timezone, timedelta | |
| import os | |
| import requests | |
| import pandas as pd | |
| from datasets import Dataset, get_dataset_config_names | |
| from datasets.exceptions import DatasetNotFoundError | |
| from pandas.api.types import is_integer_dtype | |
| import gradio as gr | |
| from src.datamodel.data import F1Data | |
| from src.display.formatting import styled_error, styled_message | |
| from src.display.utils import ModelType | |
| from src.envs import SUBMISSIONS_REPO, TOKEN | |
| from src.logger import get_logger | |
| from src.validation.validate import is_submission_file_valid, is_valid | |
logger = get_logger(__name__)

# Minimum gap (hours) between two submissions from the same user.
MIN_WAIT_TIME_PER_USER_HRS = 24
# Sliding window (hours) and cap for the leaderboard-wide submission rate limit.
RATE_LIMIT_WINDOW_HRS = 24
MAX_SUBMISSIONS_PER_WINDOW = 10

# Serializes submission handling (listing existing configs, pushing new ones)
# across concurrent Gradio requests. Non-reentrant: never nest acquisitions.
submission_lock = threading.Lock()
def add_new_solutions(
    lbdb: F1Data,
    username: str,
    user_id: str,
    system_name: str,
    org: str,
    submission_path: str,
    is_warmup_dataset: bool,
    ensure_all_present: bool = False,
):
    """Validate a user's submission, enforce rate limits, and push it to the hub.

    Args:
        lbdb: Leaderboard data model holding the expected problem IDs.
        username: Display name of the submitting user (embedded in the submission id).
        user_id: Stable user identifier (used for per-user rate limiting).
        system_name: Name of the evaluated system.
        org: Submitting organization.
        submission_path: Path to the uploaded JSONL solutions file.
        is_warmup_dataset: Whether the submission targets the warmup dataset.
        ensure_all_present: When True, require a solution for every known problem id.

    Returns:
        A styled HTML message (success or error) to display in the UI.

    Raises:
        ValueError: if a field or the submission file fails re-validation.
    """
    with submission_lock:
        try:
            submitted_ids = get_dataset_config_names(SUBMISSIONS_REPO, token=TOKEN)
        except (DatasetNotFoundError, FileNotFoundError):
            submitted_ids = []
        if submitted_ids == ["default"]:
            # An empty dataset reports a single "default" config.
            submitted_ids = []
        logger.info(f"Found {len(submitted_ids)} submissions")

        # Rate limits:
        # 1. Users must wait MIN_WAIT_TIME_PER_USER_HRS hours between submissions.
        # 2. No more than MAX_SUBMISSIONS_PER_WINDOW submissions per RATE_LIMIT_WINDOW_HRS hours overall.
        sub_df = pd.DataFrame.from_dict(
            {
                "submission_id": submitted_ids,
                "user_id": map(_submission_id_to_user_id, submitted_ids),
                "timestamp": map(_submission_id_to_timestamp, submitted_ids),
            }
        )

        # Per-user limit.
        now = datetime.now(timezone.utc)
        cutoff_user = now - timedelta(hours=MIN_WAIT_TIME_PER_USER_HRS)
        user_last_submission_ts = sub_df[sub_df.user_id == user_id].timestamp.max()
        if pd.notna(user_last_submission_ts) and user_last_submission_ts > cutoff_user:
            remaining_hrs = (user_last_submission_ts - cutoff_user).total_seconds() / 3600
            logger.info(f"{username} must wait {remaining_hrs:.2f} more hours.")
            return styled_error(
                f"You must wait {MIN_WAIT_TIME_PER_USER_HRS} hours between submissions. "
                f"Remaining wait time: {remaining_hrs:.2f} hours"
            )

        # Overall limit.
        # BUG FIX: `len(sub_df.timestamp > cutoff_overall)` is the length of the
        # boolean mask, i.e. the TOTAL number of submissions ever, so the limit
        # tripped permanently once the repo held enough configs. Count the True
        # entries instead.
        cutoff_overall = now - timedelta(hours=RATE_LIMIT_WINDOW_HRS)
        recent_count = int((sub_df.timestamp > cutoff_overall).sum())
        if recent_count >= MAX_SUBMISSIONS_PER_WINDOW:
            logger.info(
                f"Too many submissions in the last {RATE_LIMIT_WINDOW_HRS} hours: {recent_count}."
            )
            return styled_error("The leaderboard has reached its submission capacity for now. Please try again later.")

    logger.info(
        f"Adding new submission: {system_name=}, {org=}, and {submission_path=}",
    )

    # Double-checking fields already validated upstream. Explicit raises instead
    # of `assert`, which is stripped under `python -O`.
    for val in [system_name, org]:
        if not is_valid(val):
            raise ValueError(f"Invalid field value: {val!r}")
    if not is_submission_file_valid(submission_path, is_warmup_dataset=is_warmup_dataset):
        raise ValueError("Invalid submission file")

    try:
        submission_df = pd.read_json(submission_path, lines=True)
        if ensure_all_present:
            _validate_all_submissions_present(lbdb=lbdb, pd_ds=submission_df)
    except Exception:
        logger.warning("Failed to parse submission DF!", exc_info=True)
        return styled_error(
            "An error occurred. Please try again later."
        )  # Use same message as external error. Avoid infoleak.

    submission_id = f"{datetime.now(timezone.utc).strftime('%Y%m%d_%H%M%S')}_{username}_{user_id}"

    # Seems good, creating the eval.
    logger.info(f"Adding new submission: {submission_id}")
    submission_ts = time.time_ns()

    def add_info(row):
        # Attach submission-level metadata to every solution row.
        return {
            **row,
            "system_name": system_name,
            "organization": org,
            "submission_id": submission_id,
            "submission_ts": submission_ts,
            "evaluation_id": "",  # This will be set later when the evaluation is launched in the backend
            "evaluation_start_ts": "",  # This will be set when the evaluation starts
        }

    ds = Dataset.from_pandas(submission_df).map(add_info)
    with submission_lock:
        ds.push_to_hub(
            SUBMISSIONS_REPO,
            submission_id,
            private=True,
        )
    return styled_message(
        "Your request has been submitted to the evaluation queue!\n"
        + "Results may take up to 24 hours to be processed and shown in the leaderboard."
    )
def fetch_sub_claim(oauth_token: gr.OAuthToken | None) -> dict | None:
    """Resolve OIDC userinfo claims for an OAuth token.

    Discovers the userinfo endpoint from the provider's well-known OIDC
    configuration (OPENID_PROVIDER_URL env var), then queries it with the
    bearer token.

    Returns:
        A dict with "sub" (stable id), "preferred_username" and "name", or
        None when no token/provider is configured or any step fails.
    """
    if oauth_token is None:
        return None
    provider = os.getenv("OPENID_PROVIDER_URL")
    if not provider:
        return None
    try:
        oidc_resp = requests.get(f"{provider}/.well-known/openid-configuration", timeout=5)
        # Fail fast on HTTP errors instead of trying to JSON-parse an error body.
        oidc_resp.raise_for_status()
        userinfo_ep = oidc_resp.json()["userinfo_endpoint"]
        claims_resp = requests.get(
            userinfo_ep, headers={"Authorization": f"Bearer {oauth_token.token}"}, timeout=5
        )
        logger.info(f"userinfo_endpoint response: status={claims_resp.status_code}\nheaders={dict(claims_resp.headers)}")
        claims_resp.raise_for_status()
        claims = claims_resp.json()
        # Typical fields: sub (stable id), preferred_username, name, picture
        return {
            "sub": claims.get("sub"),
            "preferred_username": claims.get("preferred_username"),
            "name": claims.get("name"),
        }
    except Exception as e:
        logger.warning(f"Failed to fetch user claims: {e}")
        return None
def fetch_user_info(oauth_token: gr.OAuthToken | None) -> dict | None:
    """Fetch the Hugging Face whoami-v2 profile for an OAuth token.

    Returns:
        The parsed JSON profile dict, or None if there is no token, the API
        responds with a non-200 status, or the request fails.
    """
    if oauth_token is None:
        return None
    try:
        headers = {"Authorization": f"Bearer {oauth_token.token}"}
        # SECURITY FIX: do not log `headers` — it contains the bearer token.
        r = requests.get("https://huggingface.co/api/whoami-v2", headers=headers, timeout=10)
        logger.info("RESP CODE %s", r.status_code)
        if r.status_code != 200:
            logger.info("RESP content %s", r.text)
            return None
        return r.json()
    except Exception:
        # Narrowed from a bare `except:` so SystemExit/KeyboardInterrupt propagate.
        logger.exception("Cannot get user info")
        return None
def _validate_all_submissions_present(
    lbdb: F1Data,
    pd_ds: pd.DataFrame,
):
    """Check the uploaded dataframe has exactly one solution per known problem.

    Args:
        lbdb: Leaderboard data model; `code_problem_ids` is the expected set of
            problem ids (as strings).
        pd_ds: Parsed submission with exactly the columns "problem_id" (integer
            dtype) and "solution" (str values).

    Raises:
        ValueError: on wrong columns, wrong dtypes, missing/unknown problem ids,
            or duplicate problem ids.
    """
    logger.info(f"Validating DS size {len(pd_ds)} columns {pd_ds.columns} set {set(pd_ds.columns)}")
    expected_cols = ["problem_id", "solution"]
    # BUG FIX: these checks previously *returned* ValueError instances instead of
    # raising them, so every failure except the ID-mismatch one passed silently.
    if set(pd_ds.columns) != set(expected_cols):
        raise ValueError(f"Expected attributes: {expected_cols}, Got: {pd_ds.columns.tolist()}")
    if not is_integer_dtype(pd_ds["problem_id"]):
        raise ValueError("problem_id must be str convertible to int")
    if any(type(v) is not str for v in pd_ds["solution"]):
        raise ValueError("solution must be of type str")
    submitted_ids = set(pd_ds.problem_id.astype(str))
    if submitted_ids != lbdb.code_problem_ids:
        missing = lbdb.code_problem_ids - submitted_ids
        unknown = submitted_ids - lbdb.code_problem_ids
        raise ValueError(f"Mismatched problem IDs: {len(missing)} missing, {len(unknown)} unknown")
    # Set equality plus a larger row count can only mean repeated ids.
    if len(pd_ds) > len(lbdb.code_problem_ids):
        raise ValueError("Duplicate problem IDs exist in uploaded file")
| def _submission_id_to_user_id(submission_id: str) -> str: | |
| """ | |
| Extracts the user ID from the submission ID: "YYYYMMDD_HHMMSS_username_userid" | |
| """ | |
| return submission_id.rsplit("_", 1)[-1] | |
| def _submission_id_to_timestamp(submission_id: str) -> datetime: | |
| """ | |
| Extracts the timestamp from the submission ID: "YYYYMMDD_HHMMSS_username_userid" | |
| """ | |
| ts_str = "_".join(submission_id.split("_", 2)[:2]) | |
| return datetime.strptime(ts_str, "%Y%m%d_%H%M%S").replace(tzinfo=timezone.utc) | |