| """ |
| game/datastore.py — persistence to the public sightings dataset. |
| |
| Two responsibilities: |
| * leaderboard read/write (Phase 1) |
| * append_sighting() (Phase 2 — the upload flywheel) |
| |
| Both write to HomesteaderLabs/forager-sightings via huggingface_hub, authenticated |
| by the HF_TOKEN Space secret. When no token is present (local dev, or the secret |
| isn't set yet) everything falls back to in-memory so the game still runs — the |
| leaderboard just won't persist across restarts until the secret is added. |
| |
| Concurrency note: the leaderboard is a read-modify-write on one jsonl file, so |
| simultaneous posts can race (last write wins). Fine at demo scale; revisit with a |
| queue or per-row append if it ever matters. |
| """ |
|
|
| import datetime |
| import json |
| import os |
|
|
| DATASET_REPO = "HomesteaderLabs/forager-sightings" |
| REVIEW_REPO = "HomesteaderLabs/forager-sightings-review" |
| LICENSE = "CC-BY-4.0" |
|
|
| _TOKEN = os.environ.get("HF_TOKEN") |
|
|
| |
| _mem_leaderboard: dict[str, dict] = {} |
|
|
|
|
| def persistence_enabled() -> bool: |
| return bool(_TOKEN) |
|
|
|
|
| def _now() -> str: |
| return datetime.datetime.now(datetime.timezone.utc).isoformat(timespec="seconds") |
|
|
|
|
| def load_leaderboard() -> list[dict]: |
| """Return all aggregated score rows (from the dataset if a token is set, else memory).""" |
| if _TOKEN: |
| try: |
| from huggingface_hub import hf_hub_download |
| path = hf_hub_download(DATASET_REPO, "leaderboard.jsonl", repo_type="dataset", |
| token=_TOKEN, force_download=True) |
| with open(path) as f: |
| return [json.loads(line) for line in f if line.strip()] |
| except Exception: |
| pass |
| return list(_mem_leaderboard.values()) |
|
|
|
|
| def post_score(contributor: str, you_correct: int, total: int, machine_correct: int) -> list[dict]: |
| """Add this session's tally to the contributor's cumulative row; persist; return all rows.""" |
| rows = {r["contributor"]: r for r in load_leaderboard()} |
| r = rows.get(contributor, { |
| "contributor": contributor, "skill_correct": 0, "skill_total": 0, |
| "machine_correct": 0, "contributions": 0, |
| }) |
| r["skill_correct"] += int(you_correct) |
| r["skill_total"] += int(total) |
| r["machine_correct"] += int(machine_correct) |
| r["updated"] = _now() |
| rows[contributor] = r |
|
|
| if _TOKEN: |
| try: |
| from huggingface_hub import HfApi |
| body = "\n".join(json.dumps(x) for x in rows.values()) |
| HfApi(token=_TOKEN).upload_file( |
| path_or_fileobj=body.encode("utf-8"), path_in_repo="leaderboard.jsonl", |
| repo_id=DATASET_REPO, repo_type="dataset", |
| commit_message=f"score: {contributor}", |
| ) |
| except Exception: |
| _mem_leaderboard.update(rows) |
| else: |
| _mem_leaderboard.update(rows) |
| return list(rows.values()) |
|
|
|
|
| def load_contributors() -> list[dict]: |
| """Count accepted sightings per contributor from metadata.jsonl (the Contributor board).""" |
| if not _TOKEN: |
| return [] |
| try: |
| from collections import Counter |
| from huggingface_hub import hf_hub_download |
| path = hf_hub_download(DATASET_REPO, "metadata.jsonl", repo_type="dataset", |
| token=_TOKEN, force_download=True) |
| with open(path) as f: |
| rows = [json.loads(line) for line in f if line.strip()] |
| counts = Counter(r.get("contributor", "?") for r in rows) |
| return [{"contributor": k, "count": v} for k, v in counts.most_common()] |
| except Exception: |
| return [] |
|
|
|
|
| |
| |
| |
| |
| DUP_HAMMING_THRESHOLD = 5 |
|
|
|
|
| def compute_phash(image) -> str: |
| """64-bit dHash as a 16-char hex string (row->row horizontal gradient).""" |
| import numpy as np |
| from PIL import Image |
| small = image.convert("L").resize((9, 8), Image.BILINEAR) |
| a = np.asarray(small, dtype=np.int16) |
| bits = (a[:, 1:] > a[:, :-1]).flatten() |
| val = 0 |
| for b in bits: |
| val = (val << 1) | int(b) |
| return f"{val:016x}" |
|
|
|
|
| def _hamming(a: str, b: str) -> int: |
| return bin(int(a, 16) ^ int(b, 16)).count("1") |
|
|
|
|
| def _store(repo: str, image, base_row: dict, contributor: str, msg: str) -> str: |
| """Dedup against `repo`'s metadata.jsonl, then commit image + metadata row. |
| Returns "stored" | "duplicate" | "disabled". Used by both the public sightings |
| write and the private review-queue write.""" |
| if not _TOKEN: |
| return "disabled" |
| from io import BytesIO |
| from huggingface_hub import CommitOperationAdd, HfApi, hf_hub_download |
|
|
| existing, rows = "", [] |
| try: |
| with open(hf_hub_download(repo, "metadata.jsonl", repo_type="dataset", |
| token=_TOKEN, force_download=True)) as f: |
| txt = f.read() |
| existing = txt.rstrip("\n") |
| rows = [json.loads(line) for line in txt.splitlines() if line.strip()] |
| except Exception: |
| pass |
|
|
| ph = compute_phash(image) |
| for r in rows: |
| h = r.get("phash") |
| if h and _hamming(h, ph) <= DUP_HAMMING_THRESHOLD: |
| return "duplicate" |
|
|
| ts = _now() |
| fname = f"images/{contributor}_{ts.replace(':', '').replace('-', '')}.jpg" |
| buf = BytesIO() |
| image.convert("RGB").save(buf, format="JPEG", quality=90) |
| row = {"file_name": fname, **base_row, "contributor": contributor, |
| "consent": True, "license": LICENSE, "timestamp": ts, "phash": ph} |
| new_meta = (existing + "\n" if existing else "") + json.dumps(row) + "\n" |
| try: |
| HfApi(token=_TOKEN).create_commit( |
| repo_id=repo, repo_type="dataset", commit_message=msg, |
| operations=[ |
| CommitOperationAdd(path_in_repo=fname, path_or_fileobj=buf.getvalue()), |
| CommitOperationAdd(path_in_repo="metadata.jsonl", |
| path_or_fileobj=new_meta.encode("utf-8")), |
| ], |
| ) |
| except Exception: |
| |
| |
| return "disabled" |
| return "stored" |
|
|
|
|
| def append_sighting(image, user_label: str, machine: dict, contributor: str) -> str: |
| """In-domain find -> public dataset. Returns stored/duplicate/disabled.""" |
| return _store(DATASET_REPO, image, { |
| "user_label": user_label, |
| "machine_prediction": machine.get("species", "unknown"), |
| "machine_confidence": round(float(machine.get("confidence", 0.0)), 4), |
| "machine_abstained": bool(machine.get("abstained", True)), |
| "machine_safety": machine.get("safety", "UNKNOWN"), |
| "routed_domain": machine.get("domain", "unknown"), |
| }, contributor, f"sighting: {machine.get('species', 'unknown')} by {contributor}") |
|
|
|
|
| def append_unrouted(image, user_label: str, router: dict, contributor: str) -> str: |
| """Router-rejected (out-of-domain) find -> PRIVATE review queue for later triage. |
| Captures the model's blind spots (real forageables the router fumbles). Returns |
| stored/duplicate/disabled.""" |
| return _store(REVIEW_REPO, image, { |
| "status": "unrouted", |
| "user_label": user_label, |
| "router_domain": router.get("domain", "unknown"), |
| "router_confidence": round(float(router.get("domain_confidence", 0.0)), 4), |
| "reason": router.get("reason", ""), |
| }, contributor, f"review: {user_label} by {contributor}") |
|
|