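"""Submission storage helpers for the hackathon app.

Each team's submissions live either in a private Hugging Face dataset (when
the team's HACKATHON_HF_*_DATASET variable is set) or in a local JSON file
under HACKATHON_DATA_DIR.
"""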
from __future__ import annotations

import json
import os
import threading
from typing import Any, Literal

from datasets import Dataset, load_dataset

TeamName = Literal["blue", "red"]

DATA_DIR_ENV = "HACKATHON_DATA_DIR"
BLUE_PATH_ENV = "HACKATHON_BLUE_DATA_PATH"
RED_PATH_ENV = "HACKATHON_RED_DATA_PATH"
HF_DATASET_ENV = "HACKATHON_HF_DATASET"
HF_BLUE_DATASET_ENV = "HACKATHON_HF_BLUE_DATASET"
HF_RED_DATASET_ENV = "HACKATHON_HF_RED_DATASET"
HF_TOKEN_ENV = "HF_TOKEN_SUBMISSIONS"
HF_TOKEN_FALLBACK_ENV = "HF_TOKEN"

DEFAULT_DATA_DIR = os.environ.get(DATA_DIR_ENV, os.path.join(os.getcwd(), "hackathon-data"))
DEFAULT_BLUE_PATH = os.environ.get(BLUE_PATH_ENV, os.path.join(DEFAULT_DATA_DIR, "blue_submissions.json"))
DEFAULT_RED_PATH = os.environ.get(RED_PATH_ENV, os.path.join(DEFAULT_DATA_DIR, "red_submissions.json"))

_TEAM_PATHS = {
    "blue": DEFAULT_BLUE_PATH,
    "red": DEFAULT_RED_PATH,
}

# Re-entrant so append_submission can hold the lock across a load + save that
# each take the lock themselves.
_LOCK = threading.RLock()
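
# Example configuration (illustrative values, not shipped defaults): point each
# team at its own private HF dataset, or leave these unset to fall back to
# local JSON files under HACKATHON_DATA_DIR.
#
#   export HACKATHON_HF_BLUE_DATASET="my-org/blue-submissions"  # hypothetical repo id
#   export HACKATHON_HF_RED_DATASET="my-org/red-submissions"    # hypothetical repo id
#   export HF_TOKEN_SUBMISSIONS="hf_..."                        # token with dataset access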


def _resolve_hf_dataset(team: TeamName) -> str | None:
    """Return the team's HF dataset name, or None to use local JSON storage."""
    dataset_name = (
        os.environ.get(HF_BLUE_DATASET_ENV, "").strip()
        if team == "blue"
        else os.environ.get(HF_RED_DATASET_ENV, "").strip()
    )
    if dataset_name:
        return dataset_name
    # Fail loudly on the retired single-dataset variable rather than silently
    # mixing both teams' submissions in one dataset.
    legacy_dataset = os.environ.get(HF_DATASET_ENV, "").strip()
    if legacy_dataset:
        raise ValueError(
            "Set HACKATHON_HF_BLUE_DATASET and HACKATHON_HF_RED_DATASET for separate datasets. "
            f"HACKATHON_HF_DATASET is no longer supported: {legacy_dataset}"
        )
    return None


def _require_hf_token() -> str:
    """Return the HF token, preferring HF_TOKEN_SUBMISSIONS over HF_TOKEN."""
    token = os.environ.get(HF_TOKEN_ENV) or os.environ.get(HF_TOKEN_FALLBACK_ENV)
    if not token:
        raise ValueError(
            "HF_TOKEN_SUBMISSIONS (or HF_TOKEN) is required to access the private submissions dataset."
        )
    return token


def _load_hf_submissions(dataset_name: str, token: str) -> list[dict[str, Any]]:
    """Load all submissions from the HF dataset, treating missing or empty datasets as []."""
    try:
        dataset = load_dataset(dataset_name, split="train", token=token)
    except Exception as exc:
        message = str(exc).lower()
        if any(code in message for code in ("401", "403", "permission", "unauthorized", "forbidden")):
            raise ValueError(
                f"HF_TOKEN_SUBMISSIONS does not have access to the private dataset: {dataset_name}"
            ) from exc
        # A dataset that does not exist yet, has no train split, or is empty
        # enough that generation fails simply means "no submissions yet".
        if any(
            text in message
            for text in (
                "not found", "404", "doesn't exist", "no such dataset", "split",
                "doesn't contain any data", "an error occurred while generating the dataset",
            )
        ):
            return []
        raise
    return dataset.to_list()


def _save_hf_submissions(dataset_name: str, token: str, submissions: list[dict[str, Any]]) -> None:
    """Overwrite the private HF dataset with the full submissions list."""
    dataset = Dataset.from_list(submissions)
    dataset.push_to_hub(dataset_name, token=token, private=True)


def _resolve_data_path(team: TeamName, data_path: str | None) -> str:
    """Return the explicit data_path if given, else the team's default JSON path."""
    if data_path:
        return data_path
    if team not in _TEAM_PATHS:
        raise ValueError(f"Unknown team: {team}")
    return _TEAM_PATHS[team]


def load_submissions(team: TeamName, data_path: str | None = None) -> list[dict[str, Any]]:
    """Load a team's submissions from HF (if configured) or the local JSON file."""
    dataset_name = _resolve_hf_dataset(team)
    if dataset_name:
        token = _require_hf_token()
        return _load_hf_submissions(dataset_name, token)
    resolved_path = _resolve_data_path(team, data_path)
    if not os.path.exists(resolved_path):
        return []
    with _LOCK:
        with open(resolved_path, "r") as f:
            data = json.load(f)
    if not isinstance(data, list):
        raise ValueError(f"Expected a list in {resolved_path}.")
    return data


def save_submissions(
    team: TeamName,
    submissions: list[dict[str, Any]],
    data_path: str | None = None,
) -> None:
    """Replace a team's stored submissions on HF (if configured) or on disk."""
    dataset_name = _resolve_hf_dataset(team)
    if dataset_name:
        token = _require_hf_token()
        _save_hf_submissions(dataset_name, token, submissions)
        return
    resolved_path = _resolve_data_path(team, data_path)
    data_dir = os.path.dirname(resolved_path)
    if data_dir:
        os.makedirs(data_dir, exist_ok=True)
    # Write to a temp file and os.replace() it into place so readers never
    # observe a partially written JSON file.
    tmp_path = f"{resolved_path}.tmp"
    with _LOCK:
        with open(tmp_path, "w") as f:
            json.dump(submissions, f, indent=2)
        os.replace(tmp_path, resolved_path)


def append_submission(
    team: TeamName,
    submission: dict[str, Any],
    data_path: str | None = None,
) -> list[dict[str, Any]]:
    """Append one submission for the team and return the updated list."""
    dataset_name = _resolve_hf_dataset(team)
    if dataset_name:
        token = _require_hf_token()
        # Hold the lock across the whole read-modify-write so concurrent
        # appends from this process cannot drop each other's entries.
        with _LOCK:
            submissions = _load_hf_submissions(dataset_name, token)
            submissions.append(submission)
            _save_hf_submissions(dataset_name, token, submissions)
            return submissions
    # Local JSON fallback: same read-modify-write guard. _LOCK is re-entrant,
    # so the nested acquisitions inside load/save_submissions are safe.
    with _LOCK:
        submissions = load_submissions(team, data_path)
        submissions.append(submission)
        save_submissions(team, submissions, data_path)
        return submissions
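

if __name__ == "__main__":
    # Minimal usage sketch (illustrative record, not part of the module's API).
    # With no HF dataset variables set, this writes to the local JSON fallback
    # under HACKATHON_DATA_DIR.
    demo_entry = {"name": "example-submission", "score": 0.0}  # hypothetical fields
    updated = append_submission("blue", demo_entry)
    print(f"blue submissions on record: {len(updated)}")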