sonic-caucus / storage.py
fdaudens's picture
Persist Sonic Caucus votes to private dataset
c7c2e28 verified
from __future__ import annotations
import csv
import json
import os
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4
from huggingface_hub import HfApi, hf_hub_download
# Column names, in order, used as the header and field list of the local
# votes CSV (see the csv.DictWriter in append_local_vote).
VOTE_FIELDS = [
    "timestamp",
    "vote_id",
    "voter_id",
    "sample_id",
    "left_clip_id",
    "right_clip_id",
    "left_model_id",
    "right_model_id",
    "winner",
    "winner_model_id",
    "notes",
]
def new_vote_id(now: datetime | None = None) -> str:
    """Return a vote identifier of the form ``<YYYYmmddTHHMMSSZ>_<12 hex chars>``.

    Args:
        now: Moment used for the timestamp prefix. Defaults to the current
            UTC time. Timezone-aware values are converted to UTC so the
            literal ``Z`` suffix is accurate; naive values are formatted
            unchanged (i.e. they are taken to already be in UTC).

    Returns:
        The timestamp prefix joined by ``_`` to the first 12 hex digits of a
        random UUID4.
    """
    if now is None:
        now = datetime.now(timezone.utc)
    elif now.utcoffset() is not None:
        # The format string hard-codes "Z", so an aware datetime in another
        # zone must be normalised to UTC before formatting.
        now = now.astimezone(timezone.utc)
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    return f"{stamp}_{uuid4().hex[:12]}"
def remote_vote_path(row: dict) -> str:
    """Build the in-repo path for a vote: ``votes/<sample_id>/<vote_id>.json``.

    Raises:
        KeyError: if ``row`` lacks ``sample_id`` or ``vote_id``.
    """
    sample = row["sample_id"]
    vote = row["vote_id"]
    return f"votes/{sample}/{vote}.json"
def remote_config() -> tuple[str, str | None]:
    """Read the remote dataset settings from the environment.

    Returns:
        ``(repo_id, token)``. ``repo_id`` comes from ``ARENA_VOTES_REPO``
        (default ``fdaudens/sonic-caucus-votes``) with surrounding whitespace
        stripped. ``token`` is ``ARENA_HF_TOKEN`` when set and non-empty,
        otherwise whatever ``HF_TOKEN`` holds (``None`` if unset).
    """
    env = os.environ
    repo_id = env.get("ARENA_VOTES_REPO", "fdaudens/sonic-caucus-votes").strip()
    token = env.get("ARENA_HF_TOKEN")
    if not token:
        # Fall back to the generic Hugging Face token variable.
        token = env.get("HF_TOKEN")
    return repo_id, token
def append_local_vote(votes_path: Path, row: dict) -> None:
    """Append one vote to the local CSV file.

    The header row is written when the file does not exist yet or exists but
    is empty, so the file always starts with the ``VOTE_FIELDS`` header.
    Missing parent directories are created.

    Args:
        votes_path: Location of the CSV file.
        row: Vote data keyed by the names in ``VOTE_FIELDS``.

    Raises:
        ValueError: raised by ``csv.DictWriter`` when ``row`` contains a key
            that is not in ``VOTE_FIELDS``.
    """
    votes_path.parent.mkdir(parents=True, exist_ok=True)
    # A zero-length file (e.g. one that was pre-created) still needs a header,
    # otherwise the first vote would later be read back as the column names.
    needs_header = not votes_path.exists() or votes_path.stat().st_size == 0
    with votes_path.open("a", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=VOTE_FIELDS)
        if needs_header:
            writer.writeheader()
        writer.writerow(row)
def append_remote_vote(row: dict) -> str | None:
    """Upload a vote as a JSON file to the configured dataset repository.

    Returns:
        ``None`` when the upload succeeds; otherwise a short reason string:
        ``"repo not configured"``, ``"token not configured"``, or the class
        name of the exception raised while uploading.
    """
    repo_id, token = remote_config()
    if not repo_id:
        return "repo not configured"
    if not token:
        return "token not configured"

    # Serialised before the try block: a row that cannot be encoded as JSON
    # propagates to the caller instead of being reported as an upload error.
    body = json.dumps(row, indent=2, sort_keys=True).encode("utf-8")

    try:
        client = HfApi()
        upload_args = {
            "path_or_fileobj": body,
            "path_in_repo": remote_vote_path(row),
            "repo_id": repo_id,
            "repo_type": "dataset",
            "token": token,
            "commit_message": f"Add Sonic Caucus vote {row['vote_id']}",
        }
        client.upload_file(**upload_args)
    except Exception as exc:
        # Best effort: report the failure kind rather than raising.
        return type(exc).__name__
    return None
def append_vote(votes_path: Path, row: dict) -> str | None:
    """Store a vote locally, then try to upload it to the remote dataset.

    Returns:
        The result of ``append_remote_vote``: ``None`` on a successful
        upload, otherwise a short reason string.
    """
    # The local write happens first, so it is attempted regardless of the
    # outcome of the remote upload.
    append_local_vote(votes_path, row)
    remote_error = append_remote_vote(row)
    return remote_error
def read_local_votes(votes_path: Path) -> list[dict]:
    """Read every vote stored in the local CSV file.

    Args:
        votes_path: Location of the CSV file.

    Returns:
        One dict per data row, keyed by the CSV header; an empty list when
        the file does not exist.
    """
    if not votes_path.exists():
        return []
    # newline="" lets the csv module handle line endings itself; without it,
    # "\r\n" or "\r" inside a quoted field (e.g. multi-line notes) would be
    # rewritten to "\n" while reading.
    with votes_path.open(newline="") as handle:
        return list(csv.DictReader(handle))
def read_remote_votes() -> tuple[list[dict], str | None]:
    """Fetch every vote JSON file from the configured dataset repository.

    Returns:
        ``(rows, None)`` on success. On failure ``([], reason)`` where
        ``reason`` is ``"repo not configured"``, ``"token not configured"``,
        or the class name of the exception raised while listing,
        downloading, or parsing. Any such exception discards all rows
        gathered so far.
    """
    repo_id, token = remote_config()
    if not repo_id:
        return [], "repo not configured"
    if not token:
        return [], "token not configured"

    api = HfApi()
    try:
        listing = api.list_repo_files(repo_id=repo_id, repo_type="dataset", token=token)
        rows = []
        for filename in listing:
            # Only JSON files under the votes/ prefix are vote records.
            is_vote_file = filename.startswith("votes/") and filename.endswith(".json")
            if not is_vote_file:
                continue
            local_copy = hf_hub_download(
                repo_id=repo_id,
                filename=filename,
                repo_type="dataset",
                token=token,
            )
            text = Path(local_copy).read_text()
            rows.append(json.loads(text))
    except Exception as exc:
        return [], type(exc).__name__
    return rows, None
def load_vote_rows(votes_path: Path) -> tuple[list[dict], str]:
    """Load votes, preferring the remote dataset over the local CSV.

    Returns:
        ``(rows, source)`` where ``source`` is a human-readable description
        of where the rows came from. Remote rows win whenever any exist;
        otherwise the local CSV is used, and the description notes the
        remote error when there was one.
    """
    remote_rows, remote_error = read_remote_votes()
    if remote_rows:
        return remote_rows, "persistent Hugging Face dataset"

    local_rows = read_local_votes(votes_path)
    if local_rows and remote_error:
        return local_rows, f"local CSV; remote unavailable ({remote_error})"
    if local_rows:
        return local_rows, "local CSV"
    if remote_error:
        return [], f"no votes; remote unavailable ({remote_error})"
    return [], "no votes"