Forager-Field-Notes / game /datastore.py
HomesteaderLabs's picture
Capture router-rejected uploads into private review queue (no data lost)
7483ac2 verified
"""
game/datastore.py — persistence to the public sightings dataset.
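Responsibilities:
* leaderboard read/write (Phase 1)
* append_sighting() (Phase 2 — the upload flywheel), plus load_contributors()
* append_unrouted() — router-rejected uploads go to a private review queue
The leaderboard and accepted sightings live in HomesteaderLabs/forager-sightings;
the review queue lives in the private HomesteaderLabs/forager-sightings-review.
All reads and writes use huggingface_hub, authenticated by the HF_TOKEN Space
secret. When no token is present (local dev, or the secret isn't set yet) the game
still runs: the leaderboard falls back to in-memory (so it won't persist across
restarts until the secret is added), uploads are skipped and reported as
"disabled", and the contributor board is empty.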
Concurrency note: the leaderboard and each repo's metadata.jsonl are updated by
read-modify-write of a single jsonl file, so simultaneous posts can race (last
write wins). Fine at demo scale; revisit with a queue or per-row append if it
ever matters.
"""
import datetime
import json
import os
# Public dataset: accepted in-domain finds, plus leaderboard.jsonl / metadata.jsonl.
DATASET_REPO = "HomesteaderLabs/forager-sightings"
# Private dataset: router-rejected uploads quarantined for later triage.
REVIEW_REPO = "HomesteaderLabs/forager-sightings-review"
# License string stamped onto every stored metadata row.
LICENSE = "CC-BY-4.0"

# Hub token from the HF_TOKEN Space secret; None when unset. Persistence is on
# only when this is a non-empty string.
_TOKEN = os.getenv("HF_TOKEN")

# In-memory fallback leaderboard: contributor name -> aggregated score row.
_mem_leaderboard: dict[str, dict] = dict()
def persistence_enabled() -> bool:
    """Report whether Hub writes are possible, i.e. a non-empty HF_TOKEN is configured."""
    return _TOKEN not in (None, "")
def _now() -> str:
    """Return the current UTC time as an ISO-8601 string truncated to whole seconds."""
    utc_now = datetime.datetime.now(tz=datetime.timezone.utc)
    return utc_now.isoformat(timespec="seconds")
def _fetch_leaderboard() -> "list[dict] | None":
    """Download and parse leaderboard.jsonl from the public dataset.

    Returns:
        The parsed rows; ``[]`` when the file does not exist yet (HTTP 404); or
        None when the read failed for any other reason (network, auth, missing
        huggingface_hub, malformed JSON). After a None the remote file must not be
        overwritten: the rewrite would drop every other contributor's row.
    """
    try:
        from huggingface_hub import hf_hub_download
        path = hf_hub_download(DATASET_REPO, "leaderboard.jsonl", repo_type="dataset",
                               token=_TOKEN, force_download=True)
        with open(path, encoding="utf-8") as f:
            return [json.loads(line) for line in f if line.strip()]
    except Exception as exc:
        # Assumes huggingface_hub HTTP errors expose `.response` (HfHubHTTPError
        # does); only a genuine 404 means "no leaderboard written yet".
        status = getattr(getattr(exc, "response", None), "status_code", None)
        return [] if status == 404 else None


def _upload_leaderboard(rows: dict, contributor: str) -> bool:
    """Overwrite leaderboard.jsonl with ``rows.values()``; return True on success."""
    try:
        from huggingface_hub import HfApi
        body = "\n".join(json.dumps(x) for x in rows.values())
        HfApi(token=_TOKEN).upload_file(
            path_or_fileobj=body.encode("utf-8"), path_in_repo="leaderboard.jsonl",
            repo_id=DATASET_REPO, repo_type="dataset",
            commit_message=f"score: {contributor}",
        )
    except Exception:
        return False
    return True


def load_leaderboard() -> list[dict]:
    """Return all aggregated score rows.

    With a token, rows come from the dataset's leaderboard.jsonl. When that read
    fails, or the file is missing or empty, or no token is set, the in-memory rows
    are returned instead.
    """
    if _TOKEN:
        rows = _fetch_leaderboard()
        if rows:
            return rows
    return list(_mem_leaderboard.values())


def post_score(contributor: str, you_correct: int, total: int, machine_correct: int) -> list[dict]:
    """Add this session's tally to the contributor's cumulative row; persist; return all rows.

    The base table is the dataset's leaderboard when a token is set and the read
    succeeds with rows; otherwise it is the in-memory table. The updated table is
    uploaded only after a successful read, because uploading after a failed read
    would replace everyone else's rows. Whenever it is not uploaded (no token,
    failed read, or failed upload), it is kept in memory instead.

    Args:
        contributor: key of the row to update.
        you_correct: added to the row's "skill_correct".
        total: added to the row's "skill_total".
        machine_correct: added to the row's "machine_correct".

    Returns:
        Every row of the updated table.
    """
    hub_rows = _fetch_leaderboard() if _TOKEN else None
    # A missing/empty remote file falls back to memory, mirroring load_leaderboard().
    base = hub_rows or list(_mem_leaderboard.values())
    rows = {r["contributor"]: r for r in base}

    # Copy so a shared in-memory row is only replaced once we decide where to keep it.
    r = dict(rows.get(contributor, {
        "contributor": contributor, "skill_correct": 0, "skill_total": 0,
        "machine_correct": 0, "contributions": 0,
    }))
    for key, delta in (("skill_correct", you_correct), ("skill_total", total),
                       ("machine_correct", machine_correct)):
        # .get tolerates rows written before a column existed (was a KeyError).
        r[key] = r.get(key, 0) + int(delta)
    r["updated"] = _now()
    rows[contributor] = r

    uploaded = hub_rows is not None and _upload_leaderboard(rows, contributor)
    if not uploaded:
        _mem_leaderboard.update(rows)
    return list(rows.values())
def load_contributors() -> list[dict]:
    """Build the Contributor board: accepted sightings per contributor, most first.

    Counts the "contributor" field of every row in the dataset's metadata.jsonl
    ("?" when absent). Returns [] without a token or when the download or parse fails.
    """
    if _TOKEN:
        try:
            from collections import Counter
            from huggingface_hub import hf_hub_download
            meta_path = hf_hub_download(DATASET_REPO, "metadata.jsonl", repo_type="dataset",
                                        token=_TOKEN, force_download=True)
            with open(meta_path) as fh:
                tally = Counter(json.loads(ln).get("contributor", "?")
                                for ln in fh if ln.strip())
            return [{"contributor": name, "count": n} for name, n in tally.most_common()]
        except Exception:
            return []
    return []
# Near-duplicate detection via perceptual hash (dHash). Catches the same photo
# re-saved/resized/re-compressed, not just byte-identical files — which is the
# realistic abuse/pollution case (re-uploading a popular web image, gaming the
# contributor board, accidental double-submits). PIL + numpy only, no new dep.
# Two hashes are treated as the same image when their Hamming distance (number of
# differing bits out of 64) is <= this value; a larger value flags more pairs as
# duplicates.
DUP_HAMMING_THRESHOLD: int = 5 # <=5 of 64 bits differ => treat as the same image
def compute_phash(image) -> str:
    """Difference hash (dHash) of `image` as 16 lowercase hex digits.

    The image is converted to grayscale and resized to 9x8 (width x height). Each
    of the 64 bits records whether a pixel is brighter than its left-hand
    neighbour in the same row. Bits are packed row by row, most significant first.
    """
    import numpy as np
    from PIL import Image

    gray = np.asarray(image.convert("L").resize((9, 8), Image.BILINEAR), dtype=np.int16)
    brighter = gray[:, 1:] > gray[:, :-1]  # 8 rows x 8 comparisons
    packed = np.packbits(brighter.ravel())  # 8 bytes; first bit lands in the MSB
    return f"{int.from_bytes(packed.tobytes(), 'big'):016x}"
def _hamming(a: str, b: str) -> int:
    """Count the bits that differ between two hex-encoded hashes."""
    diff = int(a, 16) ^ int(b, 16)
    return f"{diff:b}".count("1")
def _read_metadata(repo: str) -> "tuple[str, list[dict]] | None":
    """Fetch and parse `repo`'s metadata.jsonl.

    Returns:
        ``(text, rows)``: the raw file text with trailing newlines stripped, and the
        parsed JSON rows. A file that does not exist yet (HTTP 404, e.g. the first
        upload to a fresh repo) yields ``("", [])``. Any other failure (network
        error, auth, malformed JSON) yields None. The caller must then NOT commit,
        because the commit rewrites metadata.jsonl wholesale and would drop every
        earlier row.
    """
    from huggingface_hub import hf_hub_download

    try:
        path = hf_hub_download(repo, "metadata.jsonl", repo_type="dataset",
                               token=_TOKEN, force_download=True)
        with open(path, encoding="utf-8") as f:
            txt = f.read()
        rows = [json.loads(line) for line in txt.splitlines() if line.strip()]
    except Exception as exc:
        # Assumes huggingface_hub HTTP errors expose `.response` (HfHubHTTPError
        # does); only a genuine 404 means "nothing written yet".
        status = getattr(getattr(exc, "response", None), "status_code", None)
        return ("", []) if status == 404 else None
    return txt.rstrip("\n"), rows


def _store(repo: str, image, base_row: dict, contributor: str, msg: str) -> str:
    """Dedup against `repo`'s metadata.jsonl, then commit image + metadata row.

    Used by both the public sightings write and the private review-queue write.

    Returns:
        "stored" on success; "duplicate" when an existing row's phash is within
        DUP_HAMMING_THRESHOLD bits of this image's; "disabled" when there is no
        token, when the existing metadata.jsonl could not be read (writing would
        overwrite it), or when the commit fails.
    """
    if not _TOKEN:
        return "disabled"
    import uuid
    from io import BytesIO
    from huggingface_hub import CommitOperationAdd, HfApi

    current = _read_metadata(repo)
    if current is None:
        # Previously a failed read fell through with an empty index and the commit
        # replaced metadata.jsonl with a single row.
        return "disabled"
    existing, rows = current

    ph = compute_phash(image)
    for r in rows:
        h = r.get("phash")
        if h and _hamming(h, ph) <= DUP_HAMMING_THRESHOLD:
            return "duplicate"

    ts = _now()
    # The contributor name is user input: keep only ASCII letters/digits, "-" and "_"
    # in the repo path. The random suffix stops two uploads in the same second
    # (same or same-named contributor) from overwriting each other's image.
    safe_name = "".join(c if (c.isascii() and c.isalnum()) or c in "-_" else "_"
                        for c in contributor)
    stamp = ts.replace(":", "").replace("-", "")
    fname = f"images/{safe_name}_{stamp}_{uuid.uuid4().hex[:8]}.jpg"

    buf = BytesIO()
    image.convert("RGB").save(buf, format="JPEG", quality=90)
    row = {"file_name": fname, **base_row, "contributor": contributor,
           "consent": True, "license": LICENSE, "timestamp": ts, "phash": ph}
    # Append to the text just read so every earlier row is carried forward.
    new_meta = (existing + "\n" if existing else "") + json.dumps(row) + "\n"
    try:
        HfApi(token=_TOKEN).create_commit(
            repo_id=repo, repo_type="dataset", commit_message=msg,
            operations=[
                CommitOperationAdd(path_in_repo=fname, path_or_fileobj=buf.getvalue()),
                CommitOperationAdd(path_in_repo="metadata.jsonl",
                                   path_or_fileobj=new_meta.encode("utf-8")),
            ],
        )
    except Exception:
        # e.g. the Space token lacks write scope on this repo — degrade gracefully
        # so the UI shows a friendly message instead of crashing the handler.
        return "disabled"
    return "stored"
def append_sighting(image, user_label: str, machine: dict, contributor: str) -> str:
    """Send an in-domain find to the public sightings dataset.

    `machine` holds the model's output. Missing keys default to "unknown"
    species/domain, confidence 0.0, abstained=True and "UNKNOWN" safety.
    Returns whatever _store reports: stored/duplicate/disabled.
    """
    species = machine.get("species", "unknown")
    record = dict(
        user_label=user_label,
        machine_prediction=species,
        machine_confidence=round(float(machine.get("confidence", 0.0)), 4),
        machine_abstained=bool(machine.get("abstained", True)),
        machine_safety=machine.get("safety", "UNKNOWN"),
        routed_domain=machine.get("domain", "unknown"),
    )
    commit_msg = f"sighting: {species} by {contributor}"
    return _store(DATASET_REPO, image, record, contributor, commit_msg)
def append_unrouted(image, user_label: str, router: dict, contributor: str) -> str:
    """Send a router-rejected (out-of-domain) find to the PRIVATE review queue.

    This captures the model's blind spots (real forageables the router fumbles)
    for later triage. Missing `router` keys default to "unknown" domain,
    confidence 0.0 and an empty reason. Returns whatever _store reports:
    stored/duplicate/disabled.
    """
    record = dict(
        status="unrouted",
        user_label=user_label,
        router_domain=router.get("domain", "unknown"),
        router_confidence=round(float(router.get("domain_confidence", 0.0)), 4),
        reason=router.get("reason", ""),
    )
    commit_msg = f"review: {user_label} by {contributor}"
    return _store(REVIEW_REPO, image, record, contributor, commit_msg)