"""
game/datastore.py — persistence to the public sightings dataset.

Two responsibilities:
  * leaderboard read/write   (Phase 1)
  * append_sighting()        (Phase 2 — the upload flywheel)

Both write to HomesteaderLabs/forager-sightings via huggingface_hub, authenticated by
the HF_TOKEN Space secret. When no token is present (local dev, or the secret isn't
set yet) everything falls back to in-memory so the game still runs — the leaderboard
just won't persist across restarts until the secret is added.

Concurrency note: the leaderboard is a read-modify-write on one jsonl file, so
simultaneous posts can race (last write wins). Fine at demo scale; revisit with a
queue or per-row append if it ever matters.

Failure note: because every write replaces the whole file, a score is only uploaded
when the current remote file was read successfully (or is known not to exist yet).
If the read fails, the score is kept in memory instead of overwriting the remote
leaderboard with a partial view.
"""

import datetime
import json
import os

DATASET_REPO = "HomesteaderLabs/forager-sightings"        # public: in-domain finds
REVIEW_REPO = "HomesteaderLabs/forager-sightings-review"  # private: router-rejected quarantine
LICENSE = "CC-BY-4.0"
_TOKEN = os.environ.get("HF_TOKEN")

# in-memory fallback: contributor -> aggregated score row
_mem_leaderboard: dict[str, dict] = {}


def persistence_enabled() -> bool:
    """Return True when an HF_TOKEN was found in the environment at import time."""
    return bool(_TOKEN)


def _now() -> str:
    """Return the current UTC time as an ISO-8601 string with second precision."""
    return datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds")


def _read_remote_leaderboard() -> "list[dict] | None":
    """Fetch and parse leaderboard.jsonl from the dataset repo.

    Returns:
        The parsed rows when the file was read; the in-memory rows when the file
        does not exist in the repo yet (there is nothing remote to clobber, so the
        caller may safely create it); or None when the remote state could not be
        determined (library missing, network/auth error, unreadable file). None
        means "do not overwrite the remote file".
    """
    try:
        from huggingface_hub import hf_hub_download
        from huggingface_hub.utils import EntryNotFoundError
    except ImportError:
        return None

    try:
        path = hf_hub_download(
            DATASET_REPO,
            "leaderboard.jsonl",
            repo_type="dataset",
            token=_TOKEN,
            force_download=True,  # always fetch the latest revision, never a stale cache
        )
    except EntryNotFoundError:
        # First ever write: the repo exists but the file has not been created.
        return list(_mem_leaderboard.values())
    except Exception:
        # Best-effort boundary: any other Hub failure means "state unknown".
        return None

    try:
        with open(path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]
    except (OSError, ValueError):
        # Unreadable or corrupt file: treat as unknown rather than as empty.
        return None


def load_leaderboard() -> list[dict]:
    """Return all aggregated score rows (from the dataset if a token is set, else memory).

    When a token is set but the dataset cannot be read, the in-memory rows are
    returned instead so the UI keeps working.
    """
    if _TOKEN:
        remote = _read_remote_leaderboard()
        if remote is not None:
            return remote
    return list(_mem_leaderboard.values())


def post_score(contributor: str, you_correct: int, total: int, machine_correct: int) -> list[dict]:
    """Add this session's tally to the contributor's cumulative row; persist; return all rows.

    Args:
        contributor: Name whose cumulative row is updated (created if absent).
        you_correct: Number of rounds the player got right this session.
        total: Number of rounds played this session.
        machine_correct: Number of rounds the model got right this session.

    Returns:
        Every leaderboard row after the update.

    The remote file is replaced wholesale, so it is only uploaded when its current
    contents were read successfully (or it does not exist yet). Otherwise, and
    whenever the upload itself fails, the updated rows are kept in memory.
    """
    remote = _read_remote_leaderboard() if _TOKEN else None
    can_persist = remote is not None
    current = remote if can_persist else list(_mem_leaderboard.values())

    rows = {r["contributor"]: r for r in current}
    r = rows.get(contributor, {
        "contributor": contributor,
        "skill_correct": 0,
        "skill_total": 0,
        "machine_correct": 0,
        "contributions": 0,
    })
    # .get(..., 0) tolerates older rows that lack one of the counters.
    r["skill_correct"] = int(r.get("skill_correct", 0)) + int(you_correct)
    r["skill_total"] = int(r.get("skill_total", 0)) + int(total)
    r["machine_correct"] = int(r.get("machine_correct", 0)) + int(machine_correct)
    r["updated"] = _now()
    rows[contributor] = r

    if can_persist:
        try:
            from huggingface_hub import HfApi

            body = "\n".join(json.dumps(x) for x in rows.values())
            HfApi(token=_TOKEN).upload_file(
                path_or_fileobj=body.encode("utf-8"),
                path_in_repo="leaderboard.jsonl",
                repo_id=DATASET_REPO,
                repo_type="dataset",
                commit_message=f"score: {contributor}",
            )
        except Exception:
            # Deliberate best-effort: keep the game running and remember the
            # score locally if the upload is rejected or the network is down.
            _mem_leaderboard.update(rows)
    else:
        _mem_leaderboard.update(rows)
    return list(rows.values())


def load_contributors() -> list[dict]:
    """Count accepted sightings per contributor from metadata.jsonl (the Contributor board).

    Returns:
        A list of {"contributor": name, "count": n} dicts ordered by descending
        count; an empty list when no token is set or the metadata cannot be read.
    """
    if not _TOKEN:
        return []
    try:
        from collections import Counter

        from huggingface_hub import hf_hub_download

        path = hf_hub_download(
            DATASET_REPO,
            "metadata.jsonl",
            repo_type="dataset",
            token=_TOKEN,
            force_download=True,
        )
        with open(path, encoding="utf-8") as f:
            rows = [json.loads(line) for line in f if line.strip()]
        counts = Counter(r.get("contributor", "?") for r in rows)
        return [{"contributor": k, "count": v} for k, v in counts.most_common()]
    except Exception:
        # Read-only view: an empty board is preferable to a crashed handler.
        return []


# Near-duplicate detection via perceptual hash (dHash). Catches the same photo
# re-saved/resized/re-compressed, not just byte-identical files — which is the
# realistic abuse/pollution case (re-uploading a popular web image, gaming the
# contributor board, accidental double-submits). PIL + numpy only, no new dep.
DUP_HAMMING_THRESHOLD = 5  # <=5 of 64 bits differ => treat as the same image


def compute_phash(image) -> str:
    """64-bit dHash as a 16-char hex string (row->row horizontal gradient).

    `image` must provide PIL-style `.convert()` / `.resize()`. It is reduced to a
    9x8 greyscale thumbnail and each pixel is compared with its right-hand
    neighbour, giving 8 rows x 8 comparisons = 64 bits.
    """
    import numpy as np
    from PIL import Image

    small = image.convert("L").resize((9, 8), Image.BILINEAR)
    # int16 so the array is signed; shape is (8 rows, 9 columns).
    a = np.asarray(small, dtype=np.int16)
    bits = (a[:, 1:] > a[:, :-1]).flatten()  # 8x8 = 64 bits
    val = 0
    for b in bits:
        val = (val << 1) | int(b)  # most-significant bit first
    return f"{val:016x}"


def _hamming(a: str, b: str) -> int:
    """Return the number of differing bits between two hex-encoded hashes."""
    return bin(int(a, 16) ^ int(b, 16)).count("1")


def _safe_path_part(name) -> str:
    """Reduce a user-supplied name to characters that are safe inside a repo path.

    Runs of anything other than ASCII letters, digits, '.', '_' or '-' become a
    single '_', and leading/trailing '.'/'_' are stripped, so the result can never
    contain a path separator or form a '..' component. Falls back to "anonymous"
    when nothing usable is left.
    """
    import re

    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", str(name)).strip("._")
    return cleaned or "anonymous"


def _store(repo: str, image, base_row: dict, contributor: str, msg: str) -> str:
    """Dedup against `repo`'s metadata.jsonl, then commit image + metadata row.

    Returns "stored" | "duplicate" | "disabled". Used by both the public sightings
    write and the private review-queue write.

    metadata.jsonl is rewritten in full on every commit, so it is only written
    when its current contents were read successfully or the file is known not to
    exist yet. Any other read failure returns "disabled" without committing,
    rather than replacing the whole metadata file with a single row.
    """
    if not _TOKEN:
        return "disabled"
    from io import BytesIO

    from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download
    from huggingface_hub.utils import EntryNotFoundError

    existing, rows = "", []
    try:
        path = hf_hub_download(
            repo,
            "metadata.jsonl",
            repo_type="dataset",
            token=_TOKEN,
            force_download=True,  # dedup must see the latest revision
        )
    except EntryNotFoundError:
        path = None  # first write to this repo: start from an empty file
    except Exception:
        # Remote state unknown (network, auth, missing repo): do not commit.
        return "disabled"

    if path is not None:
        try:
            with open(path, encoding="utf-8") as f:
                txt = f.read()
        except (OSError, UnicodeDecodeError):
            return "disabled"
        existing = txt.rstrip("\n")
        for line in txt.splitlines():
            if not line.strip():
                continue
            try:
                rec = json.loads(line)
            except ValueError:
                # A corrupt line must not disable dedup for the valid ones; the
                # raw text is still preserved verbatim in `existing`.
                continue
            if isinstance(rec, dict):
                rows.append(rec)

    ph = compute_phash(image)
    for rec in rows:
        prior = rec.get("phash")
        if not prior:
            continue
        try:
            distance = _hamming(prior, ph)
        except (TypeError, ValueError):
            continue  # stored hash is not valid hex: ignore that row
        if distance <= DUP_HAMMING_THRESHOLD:
            return "duplicate"

    ts = _now()
    # The contributor name is user input: sanitise it for the path only. The
    # metadata row below keeps the name exactly as given.
    stamp = ts.replace(":", "").replace("-", "")
    fname = f"images/{_safe_path_part(contributor)}_{stamp}.jpg"
    buf = BytesIO()
    image.convert("RGB").save(buf, format="JPEG", quality=90)
    row = {
        "file_name": fname,
        **base_row,
        "contributor": contributor,
        "consent": True,
        "license": LICENSE,
        "timestamp": ts,
        "phash": ph,
    }
    new_meta = (existing + "\n" if existing else "") + json.dumps(row) + "\n"
    try:
        # One commit for both files so the image and its row land together.
        HfApi(token=_TOKEN).create_commit(
            repo_id=repo,
            repo_type="dataset",
            commit_message=msg,
            operations=[
                CommitOperationAdd(path_in_repo=fname, path_or_fileobj=buf.getvalue()),
                CommitOperationAdd(
                    path_in_repo="metadata.jsonl",
                    path_or_fileobj=new_meta.encode("utf-8"),
                ),
            ],
        )
    except Exception:
        # e.g. the Space token lacks write scope on this repo — degrade gracefully
        # so the UI shows a friendly message instead of crashing the handler.
        return "disabled"
    return "stored"


def append_sighting(image, user_label: str, machine: dict, contributor: str) -> str:
    """In-domain find -> public dataset. Returns stored/duplicate/disabled.

    `machine` is the model's result dict; missing keys fall back to the defaults
    shown below (unknown species, confidence 0.0, abstained).
    """
    return _store(DATASET_REPO, image, {
        "user_label": user_label,
        "machine_prediction": machine.get("species", "unknown"),
        "machine_confidence": round(float(machine.get("confidence", 0.0)), 4),
        "machine_abstained": bool(machine.get("abstained", True)),
        "machine_safety": machine.get("safety", "UNKNOWN"),
        "routed_domain": machine.get("domain", "unknown"),
    }, contributor, f"sighting: {machine.get('species', 'unknown')} by {contributor}")


def append_unrouted(image, user_label: str, router: dict, contributor: str) -> str:
    """Router-rejected (out-of-domain) find -> PRIVATE review queue for later triage.

    Captures the model's blind spots (real forageables the router fumbles).
    Returns stored/duplicate/disabled.
    """
    return _store(REVIEW_REPO, image, {
        "status": "unrouted",
        "user_label": user_label,
        "router_domain": router.get("domain", "unknown"),
        "router_confidence": round(float(router.get("domain_confidence", 0.0)), 4),
        "reason": router.get("reason", ""),
    }, contributor, f"review: {user_label} by {contributor}")