"""Persistence for hackathon team submissions.

Submissions are stored either in a private Hugging Face dataset (when the
per-team dataset environment variable is set) or in a local JSON file.
"""

from __future__ import annotations

import json
import os
import threading
from typing import Any, Literal

try:
    from datasets import Dataset, load_dataset
except ImportError:
    # The Hugging Face stack is only needed for the HF backend; local JSON
    # storage must stay usable without it. HF code paths call
    # _require_datasets() and fail with a clear message instead.
    Dataset = None
    load_dataset = None

TeamName = Literal["blue", "red"]

DATA_DIR_ENV = "HACKATHON_DATA_DIR"
BLUE_PATH_ENV = "HACKATHON_BLUE_DATA_PATH"
RED_PATH_ENV = "HACKATHON_RED_DATA_PATH"
HF_DATASET_ENV = "HACKATHON_HF_DATASET"
HF_BLUE_DATASET_ENV = "HACKATHON_HF_BLUE_DATASET"
HF_RED_DATASET_ENV = "HACKATHON_HF_RED_DATASET"
HF_TOKEN_ENV = "HF_TOKEN_SUBMISSIONS"
HF_TOKEN_FALLBACK_ENV = "HF_TOKEN"

# Local paths are resolved once, at import time, from the environment.
DEFAULT_DATA_DIR = os.environ.get(DATA_DIR_ENV, os.path.join(os.getcwd(), "hackathon-data"))
DEFAULT_BLUE_PATH = os.environ.get(
    BLUE_PATH_ENV, os.path.join(DEFAULT_DATA_DIR, "blue_submissions.json")
)
DEFAULT_RED_PATH = os.environ.get(
    RED_PATH_ENV, os.path.join(DEFAULT_DATA_DIR, "red_submissions.json")
)

_TEAM_PATHS = {
    "blue": DEFAULT_BLUE_PATH,
    "red": DEFAULT_RED_PATH,
}

# Serializes file access and read-modify-write cycles within this process.
# It is a plain (non-reentrant) lock, so the public functions take it and the
# private _read_local/_write_local helpers never do.
_LOCK = threading.Lock()

# Substrings (matched against the lower-cased error text) that classify a
# failed dataset load as an authorization problem or as "nothing stored yet".
_AUTH_ERROR_MARKERS = ("401", "403", "permission", "unauthorized", "forbidden")
_MISSING_DATA_MARKERS = (
    "not found",
    "404",
    "doesn't exist",
    "no such dataset",
    "split",
    "doesn't contain any data",
    "an error occurred while generating the dataset",
)


def _require_datasets() -> None:
    """Raise ImportError if the optional `datasets` package failed to import."""
    if Dataset is None or load_dataset is None:
        raise ImportError(
            "The 'datasets' package is required to use Hugging Face submission storage."
        )


def _resolve_hf_dataset(team: TeamName) -> str | None:
    """Return the Hugging Face dataset name configured for *team*, if any.

    The blue variable is read for ``"blue"``; every other team value reads the
    red variable. Returns ``None`` when no dataset is configured.

    Raises:
        ValueError: if only the legacy shared-dataset variable is set.
    """
    env_name = HF_BLUE_DATASET_ENV if team == "blue" else HF_RED_DATASET_ENV
    dataset_name = os.environ.get(env_name, "").strip()
    if dataset_name:
        return dataset_name

    legacy_dataset = os.environ.get(HF_DATASET_ENV, "").strip()
    if legacy_dataset:
        raise ValueError(
            "Set HACKATHON_HF_BLUE_DATASET and HACKATHON_HF_RED_DATASET for separate datasets. "
            f"HACKATHON_HF_DATASET is no longer supported: {legacy_dataset}"
        )
    return None


def _require_hf_token() -> str:
    """Return the Hugging Face token, preferring the submissions-specific one.

    Raises:
        ValueError: if neither token variable is set to a non-empty value.
    """
    token = os.environ.get(HF_TOKEN_ENV) or os.environ.get(HF_TOKEN_FALLBACK_ENV)
    if not token:
        raise ValueError(
            "HF_TOKEN_SUBMISSIONS (or HF_TOKEN) is required to access the private "
            "submissions dataset."
        )
    return token


def _load_hf_submissions(dataset_name: str, token: str) -> list[dict[str, Any]]:
    """Load the ``train`` split of *dataset_name* as a list of row dicts.

    A load failure whose message looks like a missing or empty dataset is
    treated as "no submissions yet" and yields ``[]``.

    Raises:
        ImportError: if the `datasets` package is unavailable.
        ValueError: if the failure message looks like an authorization error.
        Exception: any other load failure is re-raised unchanged.
    """
    _require_datasets()
    try:
        dataset = load_dataset(dataset_name, split="train", token=token)
    except Exception as exc:
        message = str(exc).lower()
        # Authorization markers are checked first so a 403 is never mistaken
        # for an empty dataset.
        if any(marker in message for marker in _AUTH_ERROR_MARKERS):
            raise ValueError(
                f"HF_TOKEN_SUBMISSIONS does not have access to the private dataset: {dataset_name}"
            ) from exc
        if any(marker in message for marker in _MISSING_DATA_MARKERS):
            return []
        raise
    return dataset.to_list()


def _save_hf_submissions(
    dataset_name: str, token: str, submissions: list[dict[str, Any]]
) -> None:
    """Push *submissions* to the Hub as the private dataset *dataset_name*.

    Raises:
        ImportError: if the `datasets` package is unavailable.
    """
    _require_datasets()
    dataset = Dataset.from_list(submissions)
    dataset.push_to_hub(dataset_name, token=token, private=True)


def _resolve_data_path(team: TeamName, data_path: str | None) -> str:
    """Return *data_path* if given, otherwise the default file for *team*.

    Raises:
        ValueError: if no path is given and *team* has no default path.
    """
    if data_path:
        return data_path
    if team not in _TEAM_PATHS:
        raise ValueError(f"Unknown team: {team}")
    return _TEAM_PATHS[team]


def _read_local(path: str) -> list[dict[str, Any]]:
    """Read the JSON list stored at *path*; a missing file yields ``[]``.

    The caller must hold ``_LOCK``.

    Raises:
        ValueError: if the file does not contain a JSON list (this includes
            ``json.JSONDecodeError`` for malformed content).
    """
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        # EAFP instead of an os.path.exists() pre-check: no window in which
        # the file can appear or vanish between the check and the open.
        return []
    if not isinstance(data, list):
        raise ValueError(f"Expected a list in {path}.")
    return data


def _write_local(path: str, submissions: list[dict[str, Any]]) -> None:
    """Write *submissions* to *path* via a temp file and ``os.replace``.

    The caller must hold ``_LOCK``. On failure the temp file is removed and
    the previous contents of *path* are left untouched.
    """
    data_dir = os.path.dirname(path)
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)

    tmp_path = f"{path}.tmp"
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(submissions, f, indent=2)
        os.replace(tmp_path, path)
    except BaseException:
        # Do not leave a half-written temp file behind (e.g. when an entry
        # is not JSON-serializable).
        try:
            os.remove(tmp_path)
        except OSError:
            pass
        raise


def load_submissions(team: TeamName, data_path: str | None = None) -> list[dict[str, Any]]:
    """Return all stored submissions for *team*.

    Reads from the team's Hugging Face dataset when one is configured;
    otherwise from *data_path* or the team's default local JSON file. A
    missing local file yields an empty list.

    Raises:
        ValueError: on configuration errors or if the local file does not
            hold a JSON list.
    """
    dataset_name = _resolve_hf_dataset(team)
    if dataset_name:
        return _load_hf_submissions(dataset_name, _require_hf_token())

    resolved_path = _resolve_data_path(team, data_path)
    with _LOCK:
        return _read_local(resolved_path)


def save_submissions(
    team: TeamName,
    submissions: list[dict[str, Any]],
    data_path: str | None = None,
) -> None:
    """Replace the stored submissions for *team* with *submissions*.

    Pushes to the team's Hugging Face dataset when one is configured;
    otherwise rewrites the local JSON file, creating its directory if needed.
    """
    dataset_name = _resolve_hf_dataset(team)
    if dataset_name:
        _save_hf_submissions(dataset_name, _require_hf_token(), submissions)
        return

    resolved_path = _resolve_data_path(team, data_path)
    with _LOCK:
        _write_local(resolved_path, submissions)


def append_submission(
    team: TeamName,
    submission: dict[str, Any],
    data_path: str | None = None,
) -> list[dict[str, Any]]:
    """Append *submission* to the team's store and return the updated list.

    For both backends the whole load-append-save cycle runs under ``_LOCK``,
    so concurrent appends from threads of this process cannot overwrite each
    other's entries.
    """
    dataset_name = _resolve_hf_dataset(team)
    if dataset_name:
        token = _require_hf_token()
        with _LOCK:
            submissions = _load_hf_submissions(dataset_name, token)
            submissions.append(submission)
            _save_hf_submissions(dataset_name, token, submissions)
        return submissions

    resolved_path = _resolve_data_path(team, data_path)
    with _LOCK:
        submissions = _read_local(resolved_path)
        submissions.append(submission)
        _write_local(resolved_path, submissions)
    return submissions