Spaces:
Sleeping
Sleeping
| """Blind A/B judging Space for denoised images. | |
| Reads triplets from a private HF dataset and writes one JSON per judgment | |
| to a separate private results dataset. | |
| Required Space secret: HF_TOKEN (write access to RESULTS_REPO). | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import csv | |
| import functools | |
| import hashlib | |
| import io | |
| import json | |
| import os | |
| import random | |
| import re | |
| import shutil | |
| import uuid | |
| from datetime import datetime, timezone | |
| from pathlib import Path | |
| import gradio as gr | |
| from huggingface_hub import HfApi, list_repo_files, snapshot_download | |
# Dataset the images are read from, and the dataset that receives one JSON
# file per judgment (see `_write_judgment`).
TRIPLETS_REPO = "Stemson-AI/denoise_judging"
RESULTS_REPO = "Stemson-AI/denoise-judgments"

# Methods compared as the blind A/B options. `load_triplets` expects each one
# at `lowdose_export/<tag>/crop128/<method>.png` (plus `raw.png` for context).
METHODS = ["n2v", "digital_twin"]

# Token comes from the environment (the Space secret named in the module
# docstring). A missing token is only warned about here, not treated as fatal.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    print("WARNING: HF_TOKEN not set; reads/writes to private repos will fail.")

# Shared Hub client; `_write_judgment` uploads through it.
api = HfApi(token=HF_TOKEN)

# Loose shape check only: non-empty local part, "@", and a dotted domain, with
# no whitespace or extra "@" anywhere.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def load_triplets() -> tuple[Path, list[dict]]:
    """Download the triplets dataset and materialize it as real local files.

    The HF cache uses symlinks into a `blobs/` sibling, which Gradio's
    symlink-resolving allowlist check rejects even when the snapshot dir is
    whitelisted, so the snapshot is copied into `triplets_local/` next to this
    file. Any previous copy is removed first.

    Only `lowdose_export/manifest.csv` and the crop128 PNGs are fetched
    (skipping the larger `full/` variants and the legacy `judging_dataset/`
    folder).

    Returns:
        `(local_root, rows)`. Each row is a dict with `triplet_id`, `raw`,
        one key per entry in `METHODS` (all paths relative to `local_root`),
        and `manifest` (the original CSV row). Rows whose raw or method image
        is missing on disk are skipped with a printed notice.

    Raises:
        FileNotFoundError: if the manifest is absent from the snapshot.
        KeyError: if the manifest has no `tag` column.
    """
    snapshot = Path(
        snapshot_download(
            repo_id=TRIPLETS_REPO,
            repo_type="dataset",
            token=HF_TOKEN,
            allow_patterns=[
                "lowdose_export/manifest.csv",
                "lowdose_export/*/crop128/*.png",
            ],
        )
    )

    local_root = Path(__file__).resolve().parent / "triplets_local"
    if local_root.exists():
        shutil.rmtree(local_root)
    # copytree follows symlinks by default, so the copy holds real files.
    shutil.copytree(snapshot, local_root)

    base = local_root / "lowdose_export"
    rows: list[dict] = []
    # Explicit encoding: the platform default may not be UTF-8, and
    # "utf-8-sig" also drops a leading BOM that would otherwise turn the
    # first header into "\ufefftag" and break the r["tag"] lookup.
    with open(base / "manifest.csv", newline="", encoding="utf-8-sig") as f:
        for r in csv.DictReader(f):
            tag = r["tag"]
            row = {
                "triplet_id": tag,
                "raw": f"lowdose_export/{tag}/crop128/raw.png",
                "manifest": r,
            }
            for m in METHODS:
                row[m] = f"lowdose_export/{tag}/crop128/{m}.png"
            if all((local_root / row[k]).exists() for k in ("raw", *METHODS)):
                rows.append(row)
            else:
                print(f"skipping {tag}: missing image files")
    return local_root, rows
def email_slug(email: str) -> str:
    """Reduce an email address to a lowercase, filename-safe slug.

    The address is trimmed and lowercased, every run of characters outside
    `a-z0-9` becomes a single underscore, and underscores at either end are
    dropped.
    """
    normalized = email.strip().lower()
    collapsed = re.sub(r"[^a-z0-9]+", "_", normalized)
    return collapsed.strip("_")
def already_judged(email: str) -> set[str]:
    """Collect the triplet ids this email already has judgment files for.

    Only filenames in the results dataset are inspected; file contents are
    never read. Expected layout: `judgments/<slug>__<triplet_id>__<ts>.json`.
    If the repo listing fails, the error is printed and an empty set is
    returned.
    """
    marker = f"judgments/{email_slug(email)}__"
    extension = ".json"

    try:
        repo_files = list_repo_files(RESULTS_REPO, repo_type="dataset", token=HF_TOKEN)
    except Exception as exc:
        print(f"list_repo_files failed: {exc!r}")
        return set()

    judged: set[str] = set()
    for name in repo_files:
        if name.startswith(marker) and name.endswith(extension):
            # What is left is "<triplet_id>__<ts>"; the id itself may contain
            # "__", so everything except the last piece is the id.
            pieces = name[len(marker) : -len(extension)].split("__")
            if len(pieces) > 1:
                judged.add("__".join(pieces[:-1]))
    return judged
# Import-time side effect: the dataset is downloaded and copied locally once
# per process. TRIPLET_BY_ID lets the handlers look up a triplet by its id.
TRIPLETS_ROOT, TRIPLETS = load_triplets()
TRIPLET_BY_ID = {r["triplet_id"]: r for r in TRIPLETS}
print(f"loaded {len(TRIPLETS)} triplets from {TRIPLETS_ROOT}")
| # ---------- image rendering -------------------------------------------------- | |
def _img_data_url(path: str) -> str:
    """Return the file at *path* inlined as a base64 `data:image/png` URL."""
    encoded = base64.b64encode(Path(path).read_bytes())
    return f"data:image/png;base64,{encoded.decode()}"
def _zoom_html(label: str, path: str | None) -> str:
    """Build the markup for one labelled image frame.

    With a *path*, the frame carries `data-zoomable` and an inlined `<img>`
    whose `alt` is the label. With `path=None`, the frame is left empty and
    has no `data-zoomable` attribute.
    """
    if path is None:
        frame = '<div class="zoom-frame"></div>'
    else:
        src = _img_data_url(path)
        frame = (
            '<div class="zoom-frame" data-zoomable>'
            f'<img src="{src}" alt="{label}" />'
            '</div>'
        )
    heading = f'<div class="zoom-label">{label}</div>'
    return f'<div class="zoom-wrap">{heading}{frame}</div>'
| # ---------- session helpers --------------------------------------------------- | |
def _empty_session() -> dict:
    """Return a fresh per-user session dict with every field at its blank value."""
    return dict(
        name="",
        email="",
        session_id="",
        queue=[],         # triplet_ids still to be judged
        idx=0,            # position within `queue`
        left_method="",   # method currently displayed as option A (left)
        right_method="",  # method currently displayed as option B (right)
        n_done_now=0,     # judgments recorded during this session
        n_total=0,        # length of `queue` when the session started
        n_already=0,      # triplets this user had judged before signing in
    )
def _paths_for_current(session: dict) -> tuple[str, str, str] | None:
    """Resolve the on-disk images for the triplet the session points at.

    Returns `(raw, left, right)` as string paths under `TRIPLETS_ROOT`, where
    left/right follow the session's current side assignment, or `None` once
    `idx` has run past the end of the queue.
    """
    position = session["idx"]
    pending = session["queue"]
    if position >= len(pending):
        return None

    record = TRIPLET_BY_ID[pending[position]]
    shown = ("raw", session["left_method"], session["right_method"])
    raw, left, right = (str(TRIPLETS_ROOT / record[key]) for key in shown)
    return raw, left, right
def _assign_sides(session: dict) -> None:
    """Randomly decide which method is shown left and which right.

    Shuffles a copy of `METHODS` (the module constant is not mutated) and
    stores the result in the session's `left_method` / `right_method`.
    """
    order = METHODS.copy()
    random.shuffle(order)
    left, right = order
    session["left_method"] = left
    session["right_method"] = right
def _progress(session: dict) -> str:
    """Format the progress line shown above the images.

    Uses the session's `n_total`, `n_done_now` and `n_already` counters. The
    displayed position is capped at `n_total`; an empty queue gets a thank-you
    message instead.
    """
    queue_size, judged_now = session["n_total"], session["n_done_now"]
    if queue_size == 0:
        return "All triplets are already judged for this email — thank you!"

    current = min(judged_now + 1, queue_size)
    previously = session["n_already"]
    return f"Triplet {current} / {queue_size} this session • {previously} already done before"
| # ---------- handlers ---------------------------------------------------------- | |
def start(name: str, email: str):
    """Handle the "Start judging" click: validate the sign-in and open a session.

    Returns an 11-tuple matching `start_outputs`, in this order: login_group,
    judging_group, login_error, raw/left/right HTML, progress, session state,
    A button, B button, done_md.

    A blank name or an email that fails `EMAIL_RE` leaves every component
    untouched except the error message, and resets the session state. On
    success, the triplets not yet judged by this email are queued in a random
    order; if none remain, the judging view opens with empty frames, disabled
    buttons and a thank-you note.
    """

    def _reject(message: str):
        # Only the error markdown and the session state change on rejection.
        return (
            gr.update(),  # login_group
            gr.update(),  # judging_group
            gr.update(value=message, visible=True),  # login_error
            gr.update(), gr.update(), gr.update(),  # raw, left, right
            gr.update(),  # progress
            _empty_session(),
            gr.update(), gr.update(),  # A/B buttons
            gr.update(),  # done_md
        )

    name = (name or "").strip()
    email = (email or "").strip().lower()
    if not name:
        return _reject("Please enter your name.")
    if not EMAIL_RE.match(email):
        return _reject("Please enter a valid email.")

    done = already_judged(email)
    remaining = [r["triplet_id"] for r in TRIPLETS if r["triplet_id"] not in done]
    # Fresh random order per login; the uuid keeps the seed unique.
    random.Random(f"{email}|{uuid.uuid4()}").shuffle(remaining)

    session = _empty_session()
    session.update(
        name=name,
        email=email,
        session_id=str(uuid.uuid4()),
        queue=remaining,
        idx=0,
        n_total=len(remaining),
        n_already=len(done),
    )

    has_work = bool(remaining)
    if has_work:
        _assign_sides(session)
        raw, left, right = _paths_for_current(session)
    else:
        raw = left = right = None

    if has_work:
        done_update = gr.update(visible=False)
    else:
        done_update = gr.update(
            value="Nothing left to judge for this email. Thank you!", visible=True
        )

    return (
        gr.update(visible=False),  # hide the login form
        gr.update(visible=True),  # show the judging view
        gr.update(visible=False),  # clear any earlier login error
        gr.update(value=_zoom_html("Raw", raw)),
        gr.update(value=_zoom_html("Option A", left)),
        gr.update(value=_zoom_html("Option B", right)),
        gr.update(value=_progress(session)),
        session,
        gr.update(interactive=has_work), gr.update(interactive=has_work),
        done_update,
    )
@functools.lru_cache(maxsize=None)
def _file_sha256(path: str) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    Results are cached per path for the life of the process, because the same
    triplet's files get hashed once per judgment for the same boot of the
    Space. The cache is keyed on the path string only, so a file rewritten in
    place after its first hash would return the stale digest.
    """
    return hashlib.sha256(Path(path).read_bytes()).hexdigest()
def _write_judgment(session: dict, chosen_side: str) -> None:
    """Upload one judgment for the session's current triplet to `RESULTS_REPO`.

    Args:
        session: Session dict; `queue`/`idx` select the triplet and
            `left_method`/`right_method` say what was shown on each side.
        chosen_side: "A" picks the left method; any other value picks the
            right one.

    The file is stored as `judgments/<slug>__<triplet_id>__<ts>.json`, the
    layout `already_judged` parses. Exceptions from hashing or from the
    upload are not caught here and propagate to the caller.
    """
    tid = session["queue"][session["idx"]]
    rec = TRIPLET_BY_ID[tid]
    left_method = session["left_method"]
    right_method = session["right_method"]
    chosen_method = left_method if chosen_side == "A" else right_method

    # Read the clock once so the filename stamp and the payload "ts" always
    # describe the same instant (two separate reads could straddle a second).
    now = datetime.now(timezone.utc)
    ts = now.strftime("%Y%m%dT%H%M%SZ")

    # SHA of the actual files shown — independently verifiable against the
    # triplets dataset. Pinned to the on-disk file for the method recorded
    # on each side, so any future renaming/regeneration is detectable.
    raw_sha = _file_sha256(str(TRIPLETS_ROOT / rec["raw"]))
    left_sha = _file_sha256(str(TRIPLETS_ROOT / rec[left_method]))
    right_sha = _file_sha256(str(TRIPLETS_ROOT / rec[right_method]))

    payload = {
        "ts": now.isoformat(),
        "user_name": session["name"],
        "user_email": session["email"],
        "triplet_id": tid,
        "left_method": left_method,
        "right_method": right_method,
        "chosen_side": chosen_side,
        "chosen_method": chosen_method,
        "raw_sha256": raw_sha,
        "left_sha256": left_sha,
        "right_sha256": right_sha,
        "triplets_repo": TRIPLETS_REPO,
        "session_id": session["session_id"],
    }
    path = f"judgments/{email_slug(session['email'])}__{tid}__{ts}.json"
    api.upload_file(
        path_or_fileobj=io.BytesIO(json.dumps(payload, indent=2).encode()),
        path_in_repo=path,
        repo_id=RESULTS_REPO,
        repo_type="dataset",
        commit_message=f"judgment {tid} by {session['email']}",
    )
def choose(side: str, session: dict):
    """Record the user's pick for the current triplet and move to the next one.

    Returns an 8-tuple matching `choose_outputs`, in this order: raw/left/right
    HTML, progress, session state, A button, B button, done_md.

    If the upload fails, the session does not advance: the same triplet stays
    on screen, the buttons stay enabled and the error is shown in done_md.
    After the last triplet, the frames are emptied and the buttons disabled.
    """

    def _buttons(enabled: bool):
        return gr.update(interactive=enabled), gr.update(interactive=enabled)

    def _panels(raw, left, right):
        return (
            gr.update(value=_zoom_html("Raw", raw)),
            gr.update(value=_zoom_html("Option A", left)),
            gr.update(value=_zoom_html("Option B", right)),
        )

    # Guard: no session yet, or the queue is already exhausted.
    if not session.get("queue") or session["idx"] >= len(session["queue"]):
        return (
            gr.update(), gr.update(), gr.update(),
            gr.update(),
            session,
            *_buttons(False),
            gr.update(value="No more triplets.", visible=True),
        )

    try:
        _write_judgment(session, side)
    except Exception as exc:
        # Nothing was advanced, so the user can simply click again.
        return (
            gr.update(), gr.update(), gr.update(),
            gr.update(value=_progress(session)),
            session,
            *_buttons(True),
            gr.update(value=f"Could not save judgment: {exc!r}", visible=True),
        )

    session["idx"] += 1
    session["n_done_now"] += 1

    finished = session["idx"] >= len(session["queue"])
    if finished:
        return (
            *_panels(None, None, None),
            gr.update(value=f"All {session['n_total']} triplets done — thank you!"),
            session,
            *_buttons(False),
            gr.update(
                value=f"All done! You judged {session['n_done_now']} triplets this session.",
                visible=True,
            ),
        )

    # Draw a new left/right assignment for every triplet.
    _assign_sides(session)
    raw, left, right = _paths_for_current(session)
    return (
        *_panels(raw, left, right),
        gr.update(value=_progress(session)),
        session,
        *_buttons(True),
        gr.update(visible=False),
    )
| # ---------- UI ---------------------------------------------------------------- | |
# Custom HTML zoom viewer: Shift+wheel zoom toward cursor (up to 10x),
# drag-pan when zoomed, double-click reset. Pan is clamped so the image
# always covers the frame. We own the markup so we can rely on
# transform-origin: 0 0 without fighting Gradio's image-component CSS.
#
# The script binds each `.zoom-frame[data-zoomable]` element once (guarded by
# a `__zoomBound` flag on the element) and re-scans on every DOM mutation so
# frames inserted later are picked up as well.
# NOTE(review): the observer is attached to `document.body` when the script
# executes; if this snippet can run before <body> exists, `observe()` would
# throw and the later `scan()` fallback would never run — confirm how Gradio
# injects `head`.
ZOOM_HEAD = """
<style>
.zoom-wrap { display: flex; flex-direction: column; align-items: stretch; }
.zoom-label {
font-size: 0.92em; font-weight: 600; opacity: 0.8;
margin: 4px 0 6px 4px;
}
.zoom-frame {
width: 100%; max-width: 520px;
aspect-ratio: 1;
margin: 0 auto;
overflow: hidden;
position: relative;
background: #111;
border-radius: 6px;
user-select: none;
touch-action: none;
}
.zoom-frame img {
width: 100%; height: 100%;
display: block;
transform-origin: 0 0;
transition: transform 0.05s ease-out;
pointer-events: none;
image-rendering: pixelated;
}
</style>
<script>
(function () {
const MAX_SCALE = 10;
function bind(frame) {
if (frame.__zoomBound) return;
const img = frame.querySelector('img');
if (!img) return;
frame.__zoomBound = true;
let scale = 1, tx = 0, ty = 0;
let dragging = false, lastX = 0, lastY = 0;
function clamp() {
const w = frame.clientWidth, h = frame.clientHeight;
const minX = w - w * scale, minY = h - h * scale;
tx = Math.min(0, Math.max(minX, tx));
ty = Math.min(0, Math.max(minY, ty));
}
function apply() {
clamp();
img.style.transform = `translate(${tx}px, ${ty}px) scale(${scale})`;
}
function reset() { scale = 1; tx = 0; ty = 0; img.style.transform = ''; }
frame.addEventListener('wheel', (e) => {
if (!e.shiftKey) return;
e.preventDefault();
const factor = e.deltaY < 0 ? 1.15 : 1 / 1.15;
const ns = Math.min(MAX_SCALE, Math.max(1, scale * factor));
if (ns === scale) return;
const rect = frame.getBoundingClientRect();
const ox = e.clientX - rect.left;
const oy = e.clientY - rect.top;
tx = ox - (ox - tx) * (ns / scale);
ty = oy - (oy - ty) * (ns / scale);
scale = ns;
if (scale <= 1.001) reset(); else apply();
}, { passive: false });
frame.addEventListener('pointerdown', (e) => {
if (scale <= 1) return;
dragging = true;
lastX = e.clientX; lastY = e.clientY;
try { frame.setPointerCapture(e.pointerId); } catch (_) {}
frame.style.cursor = 'grabbing';
e.preventDefault();
});
frame.addEventListener('pointermove', (e) => {
if (!dragging) return;
tx += e.clientX - lastX; ty += e.clientY - lastY;
lastX = e.clientX; lastY = e.clientY;
apply();
});
function endDrag(e) {
if (!dragging) return;
dragging = false;
try { frame.releasePointerCapture(e.pointerId); } catch (_) {}
frame.style.cursor = '';
}
frame.addEventListener('pointerup', endDrag);
frame.addEventListener('pointercancel', endDrag);
frame.addEventListener('dblclick', reset);
}
function scan() {
document.querySelectorAll('.zoom-frame[data-zoomable]').forEach(bind);
}
new MutationObserver(scan).observe(document.body, { childList: true, subtree: true });
if (document.readyState === 'loading') {
document.addEventListener('DOMContentLoaded', scan);
} else {
scan();
}
})();
</script>
"""
# Page layout and event wiring.
# NOTE(review): the nesting below is reconstructed from a listing whose
# indentation was lost — confirm that `login_error` belongs inside the login
# group and `done_md` inside the judging group.
with gr.Blocks(title="Denoising A/B Judging", theme=gr.themes.Soft(), head=ZOOM_HEAD) as demo:
    # Session dict for the current visitor (see `_empty_session` for its fields).
    session_state = gr.State(_empty_session())

    gr.Markdown("# Denoising A/B Judging")
    gr.Markdown(
        "On each screen you'll see a **raw** noisy image at the top and two "
        "denoised versions of it below, labelled **A** and **B**. The two "
        "versions come from different denoising approaches, presented in a "
        "random left/right order so the comparison stays blind. "
        "Hold **Shift** and scroll over an image to zoom (up to 10×), drag to pan, double-click to reset.\n\n"
        "**Which denoised image would you rather work with?**"
    )

    # Sign-in form; `start` hides this group once the inputs validate.
    with gr.Group(visible=True) as login_group:
        gr.Markdown("### Sign in to start")
        name_in = gr.Textbox(label="Name", placeholder="Your name")
        email_in = gr.Textbox(label="Email", placeholder="you@example.com")
        start_btn = gr.Button("Start judging", variant="primary")
        login_error = gr.Markdown(visible=False)

    # Judging view: raw image on top, the two blind options side by side.
    with gr.Group(visible=False) as judging_group:
        progress_md = gr.Markdown("")
        with gr.Row():
            raw_html = gr.HTML(_zoom_html("Raw", None))
        with gr.Row():
            with gr.Column():
                left_html = gr.HTML(_zoom_html("Option A", None))
                a_btn = gr.Button("A is better", variant="primary")
            with gr.Column():
                right_html = gr.HTML(_zoom_html("Option B", None))
                b_btn = gr.Button("B is better", variant="primary")
        # Shows completion notices and upload errors emitted by the handlers.
        done_md = gr.Markdown(visible=False)

    # Order must match the 11-tuple returned by `start`.
    start_outputs = [
        login_group, judging_group, login_error,
        raw_html, left_html, right_html,
        progress_md, session_state,
        a_btn, b_btn, done_md,
    ]
    start_btn.click(start, inputs=[name_in, email_in], outputs=start_outputs)

    # Order must match the 8-tuple returned by `choose`.
    choose_outputs = [
        raw_html, left_html, right_html,
        progress_md, session_state,
        a_btn, b_btn, done_md,
    ]
    # Both buttons share one handler; the lambda supplies the chosen side.
    a_btn.click(lambda s: choose("A", s), inputs=[session_state], outputs=choose_outputs)
    b_btn.click(lambda s: choose("B", s), inputs=[session_state], outputs=choose_outputs)

if __name__ == "__main__":
    demo.launch()