DrSyedFaizan's picture
clearer synthetic-sample placeholders
29601fc verified
Raw
History Blame Contribute Delete
30.2 kB
# -*- coding: utf-8 -*-
"""
DEFER-RL Radiologist Reader Study - Gradio app (deployable as a Hugging Face Space).
Design goal: judging one case needs ZERO scrolling-to-understand and ZERO guessing.
Every rating control sits directly under the image it refers to, every scale legend is
printed inline, every rating value has a hover tooltip, and each image has its own
display-only zoom / brightness / contrast strip.
Study adaptation
----------------
Each CASE shows, for one patient study:
* LEFT cell of every row = the REFERENCE imaging (always visible for comparison).
* RIGHT cell, ONE ROW PER ITEM = an anonymized deferral-system DECISION panel
(DEFER-RL plus baselines). Order is randomized per (annotator, case); names hidden.
* FINAL ROW = the reference-standard / ground-truth panel.
The reader rates each panel's decision; the backend DERIVES best/worst/rankings.
"""
import os, io, json, base64, random, hashlib, datetime, threading, pathlib
import numpy as np
from PIL import Image, ImageDraw, ImageFont
import gradio as gr
# ----------------------------------------------------------------------------- config
N_PER_PAGE_NOTE = "" # placeholder
SCHEMA_VERSION = "deferrl-reader-1"
DATA_DIR = pathlib.Path(os.environ.get("DATA_DIR", "data")) # case manifest + images
RESP_DIR = pathlib.Path(os.environ.get("RESP_DIR", "responses_local")) # local response store
DATASET_REPO = os.environ.get("DATASET_REPO", "").strip() # private HF dataset id, e.g. "org/deferrl-reader-responses"
HF_TOKEN = os.environ.get("HF_TOKEN", "").strip()
COMMIT_EVERY_MIN= float(os.environ.get("COMMIT_EVERY_MIN", "1")) # sync cadence to the dataset
MAX_ITEMS = int(os.environ.get("MAX_ITEMS", "5")) # max decision panels per case (build budget)
RESP_DIR.mkdir(parents=True, exist_ok=True)
DATA_DIR.mkdir(parents=True, exist_ok=True)
# Per-user credentials. On the Space, set the secret ANNOTATORS to a JSON object
# {"dr_smith": "their-password", ...}. The username they type is their annotator name
# and keys their own response file. Falls back to a demo login for local testing.
try:
ANNOTATORS = json.loads(os.environ.get("ANNOTATORS", "")) or {"demo": "demo"}
except Exception:
ANNOTATORS = {"demo": "demo"}
# Rating dimensions (anchored 1-5 Likert + one inline yes/no judgement).
LIKERT = [
("1", "Clearly inappropriate: a decision a careful radiologist would not make here"),
("2", "Probably inappropriate"),
("3", "Equivocal / borderline"),
("4", "Probably appropriate"),
("5", "Clearly appropriate: the decision a careful radiologist would endorse"),
]
LIKERT_ADEQ = [
("1", "Clearly inadequate: key evidence was ignored or never gathered"),
("2", "Probably inadequate"),
("3", "Equivocal"),
("4", "Probably adequate"),
("5", "Clearly adequate: the evidence gathered was sufficient and on point"),
]
LIKERT_SOUND = [
("1", "Clearly unsound: the committed reading is wrong on a clinically important point"),
("2", "Probably unsound"),
("3", "Equivocal"),
("4", "Probably sound"),
("5", "Clearly sound: the committed reading is clinically correct"),
]
YESNO = [("yes", "Yes"), ("no", "No")]
YPN = [("yes", "Yes"), ("partial", "Partial"), ("no", "No")]
# required dimensions per item; soundness only applies when the panel chose Trust
REQ_ITEM_DIMS_TRUST = ["appropriateness", "evidence", "soundness", "misleading"]
REQ_ITEM_DIMS_DEFER = ["appropriateness", "evidence", "misleading"]
REQ_CASE_DIMS = ["unsafe_to_autoread", "reference_adequate"]
# ----------------------------------------------------------------------------- synthetic sample data
def _font(sz):
try:
return ImageFont.truetype("DejaVuSans.ttf", sz)
except Exception:
return ImageFont.load_default()
def make_placeholder(text, seed, w=360, h=300):
"""A clearly-labelled grayscale CT/MRI-like placeholder so the Space runs and is
obviously recognizable as sample data before real cases are loaded."""
rng = np.random.default_rng(seed)
base = rng.normal(70, 16, (h, w)).clip(0, 255)
yy, xx = np.mgrid[0:h, 0:w]
cx, cy = int(rng.integers(w // 4, 3 * w // 4)), int(rng.integers(h // 3, 3 * h // 4))
r = int(rng.integers(26, 52))
blob = np.exp(-(((xx - cx) ** 2 + (yy - cy) ** 2) / (2 * r * r))) * rng.integers(80, 150)
img = (base + blob).clip(0, 255).astype("uint8")
im = Image.fromarray(img, "L").convert("RGB")
d = ImageDraw.Draw(im)
# lesion-style marker ring so the "image" reads as a scan
d.ellipse([cx - r, cy - r, cx + r, cy + r], outline=(255, 205, 70), width=3)
# high-contrast top label banner (multi-line)
lines = text.split("\n")
bh = 19 * len(lines) + 10
d.rectangle([0, 0, w, bh], fill=(18, 20, 32))
d.text((8, 6), text, fill=(255, 255, 255), font=_font(16))
# unmistakable footer so testers know this is not real data
d.rectangle([0, h - 22, w, h], fill=(60, 20, 20))
d.text((8, h - 20), "SYNTHETIC SAMPLE - replace with real data", fill=(255, 190, 190), font=_font(12))
d.rectangle([0, 0, w - 1, h - 1], outline=(150, 150, 150))
return im
def synth_cases(n=6, k=4):
"""Build a sample manifest + images. Real runs replace data/cases.json + data/images/."""
imgdir = DATA_DIR / "images"; imgdir.mkdir(parents=True, exist_ok=True)
systems = ["defer_rl", "atcxr", "conformal_l2d", "always_defer", "chow_rule"][:k]
cohorts = ["LIDC-IDRI chest CT", "NLST LongCT", "Duke Breast MRI"]
cases = []
for i in range(n):
cid = f"C{i+1:03d}"
coh = cohorts[i % len(cohorts)]
make_placeholder(f"{cid} REFERENCE\n{coh}", seed=i * 100 + 1).save(imgdir / f"{cid}_ref.png")
make_placeholder(f"{cid} REFERENCE STANDARD", seed=i * 100 + 2).save(imgdir / f"{cid}_gt.png")
items = []
for j, s in enumerate(systems):
action = "Defer" if (i + j) % 3 == 0 or s == "always_defer" else "Trust"
reading = "(routed to radiologist)" if action == "Defer" else \
("No suspicious finding. BI-RADS 1." if (i + j) % 2 else "Indeterminate nodule, recommend follow-up.")
fn = f"{cid}_{s}.png"
make_placeholder(f"{cid} panel\n{action}", seed=i * 100 + 10 + j).save(imgdir / fn)
trail = []
for t in range(3):
tfn = f"{cid}_{s}_t{t}.png"
make_placeholder(f"step {t+1}", seed=i * 1000 + j * 10 + t, w=120, h=100).save(imgdir / tfn)
trail.append(f"images/{tfn}")
items.append({"item_id": s, "action": action, "reading": reading,
"image": f"images/{fn}", "trail": trail})
cases.append({
"case_id": cid, "cohort": coh,
"reference_image": f"images/{cid}_ref.png",
"show_trail": (i % 2 == 0), # evidence-trail ablation condition
"ground_truth": {"image": f"images/{cid}_gt.png",
"text": f"Reference standard for {cid}: 8 mm spiculated nodule, right upper lobe; "
f"path-confirmed malignant." if i % 2 else
f"Reference standard for {cid}: no malignant finding; benign granuloma."},
"items": items,
})
(DATA_DIR / "cases.json").write_text(json.dumps(cases, indent=2), encoding="utf-8")
return cases
def load_cases():
f = DATA_DIR / "cases.json"
if f.exists():
return json.loads(f.read_text(encoding="utf-8"))
return synth_cases()
CASES = load_cases()
N_CASES = len(CASES)
CASE_BY_ID = {c["case_id"]: c for c in CASES}
import functools
DISPLAY_MAX = int(os.environ.get("DISPLAY_MAX", "800")) # cap longest display side; zoom still available
@functools.lru_cache(maxsize=2048)
def b64img(rel_path):
"""Downscaled PNG as base64 (cached, so a reference reused across rows is encoded once)."""
p = DATA_DIR / rel_path
try:
im = Image.open(p).convert("RGB")
except Exception:
im = make_placeholder("missing image", 0)
if max(im.size) > DISPLAY_MAX:
im.thumbnail((DISPLAY_MAX, DISPLAY_MAX))
buf = io.BytesIO(); im.save(buf, "PNG")
return base64.b64encode(buf.getvalue()).decode("ascii")
# ----------------------------------------------------------------------------- storage (robust schema)
# One JSONL line per (annotator, case_id, item_id, dimension) -> value. Atomic, self-describing,
# so any later UI/layout/wording change can never overwrite or invalidate prior annotations.
_LOCK = threading.Lock()
def _resp_path(annotator):
safe = "".join(ch for ch in annotator if ch.isalnum() or ch in "._-") or "anon"
return RESP_DIR / f"{safe}.jsonl"
def append_records(annotator, records):
with _LOCK:
with open(_resp_path(annotator), "a", encoding="utf-8") as fh:
for r in records:
fh.write(json.dumps(r, ensure_ascii=False) + "\n")
def read_records(annotator):
p = _resp_path(annotator)
if not p.exists():
return []
out = []
with open(p, encoding="utf-8") as fh:
for line in fh:
line = line.strip()
if line:
try: out.append(json.loads(line))
except Exception: pass
return out
def latest_values(annotator, case_id):
"""Collapse the append-only log to the most recent value per (item_id, dimension)."""
vals = {}
for r in read_records(annotator):
if r.get("case_id") != case_id:
continue
vals[(r.get("item_id"), r.get("dimension"))] = (r.get("value"), r.get("ts", ""))
out = {}
for (iid, dim), (val, _ts) in vals.items():
out.setdefault(iid, {})[dim] = val
return out
def case_complete(annotator, case, values=None):
v = values if values is not None else latest_values(annotator, case["case_id"])
cl = v.get("__case__", {})
if any(cl.get(d) in (None, "") for d in REQ_CASE_DIMS):
return False
for it in case["items"]:
req = REQ_ITEM_DIMS_TRUST if it["action"] == "Trust" else REQ_ITEM_DIMS_DEFER
got = v.get(it["item_id"], {})
if any(got.get(d) in (None, "") for d in req):
return False
return True
def progress_counts(annotator):
done = sum(1 for c in CASES if case_complete(annotator, c))
return done
def first_unfinished(annotator):
for i, c in enumerate(CASES):
if not case_complete(annotator, c):
return i
return 0 # all done -> show first
# Sync local responses to a private HF dataset (best-effort; app still runs locally without it).
SCHEDULER = None
if DATASET_REPO and HF_TOKEN:
try:
from huggingface_hub import CommitScheduler
SCHEDULER = CommitScheduler(
repo_id=DATASET_REPO, repo_type="dataset", folder_path=str(RESP_DIR),
path_in_repo="responses", every=COMMIT_EVERY_MIN, token=HF_TOKEN, private=True,
squash_history=False,
)
print(f"[storage] CommitScheduler -> {DATASET_REPO} every {COMMIT_EVERY_MIN} min")
except Exception as e:
print(f"[storage] CommitScheduler disabled ({e}); responses stay local under {RESP_DIR}")
else:
print(f"[storage] No DATASET_REPO/HF_TOKEN set; responses stay local under {RESP_DIR}")
# ----------------------------------------------------------------------------- presentation order (blinding)
def presented_items(annotator, case):
seed = int(hashlib.sha256(f"{annotator}|{case['case_id']}".encode()).hexdigest(), 16) % (2**32)
rng = random.Random(seed)
items = list(case["items"])
rng.shuffle(items)
return items # stable per (annotator, case) so resume/prefill map correctly
# ----------------------------------------------------------------------------- HTML rendering
def _esc(s): return (str(s).replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;"))
def img_cell(uid, b64, caption, h=210):
return f"""
<div class="cell">
<div class="cap">{_esc(caption)}</div>
<div class="imgwrap" style="height:{h}px">
<img id="{uid}" class="dimg" src="data:image/png;base64,{b64}" draggable="false"/>
</div>
<div class="vctrl">
<button type="button" title="Zoom in" onclick="dzoom('{uid}',0.2)">+</button>
<button type="button" title="Zoom out" onclick="dzoom('{uid}',-0.2)">&minus;</button>
<button type="button" title="Reset view (display only)" onclick="dreset('{uid}')">&#8635;</button>
<span class="vlab" title="Brightness (display only)">&#9728;</span>
<input id="{uid}-b" type="range" min="40" max="200" value="100"
title="Brightness (display only)" oninput="dset('{uid}','b',this.value)"/>
<span class="vlab" title="Contrast (display only)">&#9680;</span>
<input id="{uid}-c" type="range" min="40" max="200" value="100"
title="Contrast (display only)" oninput="dset('{uid}','c',this.value)"/>
</div>
</div>"""
def trail_html(uid_base, trail):
if not trail:
return ""
thumbs = "".join(
f'<img class="trailthumb" src="data:image/png;base64,{b64img(t)}" title="Evidence step {i+1}"/>'
for i, t in enumerate(trail))
return f'<div class="trail"><span class="traillab">Evidence trail the system examined:</span>{thumbs}</div>'
def radio_group(item_id, dim, label, legend, options, prefill, qlevel="item"):
pre = (prefill or {}).get(item_id, {}).get(dim) if qlevel == "item" else (prefill or {}).get("__case__", {}).get(dim)
opts = ""
for val, tip in options:
chk = "checked" if str(pre) == str(val) else ""
opts += (f'<label class="ropt" title="{_esc(tip)}">'
f'<input type="radio" data-qlevel="{qlevel}" data-item="{_esc(item_id)}" '
f'data-dim="{_esc(dim)}" value="{_esc(val)}" {chk}/>{_esc(val)}</label>')
return (f'<div class="rgroup"><div class="rlabel">{label}'
f'<span class="legend">{legend}</span></div><div class="ropts">{opts}</div></div>')
LETTERS = "ABCDEFGH"
def render_case_html(annotator, idx, prefill=None, error=""):
case = CASES[idx]
pres = presented_items(annotator, case)
ref_b64 = b64img(case["reference_image"])
gt_b64 = b64img(case["ground_truth"]["image"])
done = progress_counts(annotator)
err = f'<div class="errbox">{_esc(error)}</div>' if error else ""
intro = f"""
<div class="intro">
<div class="provenance">
<b>What you are looking at.</b> The lettered panels below are decisions produced by several
automated systems on this same {_esc(case.get('cohort','imaging'))} study. Their order is shuffled and
their identities are hidden. At least one panel is a simple reference baseline (for example, a system
that always defers, or one that uses only a confidence threshold). Each panel states the system's
<b>action</b> &mdash; <b>Trust</b> (it commits an automated reading) or <b>Defer</b> (it routes the case
to a radiologist) &mdash; the evidence it examined, and, when it chose Trust, the reading it produced.
You are rating each panel's decision on its own; you are never asked to pick a best or worst panel.
</div>
<div class="resume">You can stop and resume at any time &mdash; your answers are saved when you press
<b>Save &amp; Next</b>, and you will return to the first case you have not finished.</div>
</div>"""
# Case-level judgements (answer your own read first, before scoring the panels)
caselevel = f"""
<div class="caselevel">
<div class="clhead">Your own read first (before scoring the panels)</div>
{radio_group("__case__","unsafe_to_autoread",
"Should this case <u>not</u> be auto-read? ",
"(Is this a study where an automated reading should not be trusted, and a radiologist must see it?)",
YPN, prefill, qlevel="case")}
{radio_group("__case__","reference_adequate",
"Is the reference imaging adequate to judge this case? ",
"(yes / partial / no)", YPN, prefill, qlevel="case")}
</div>"""
rows = ""
for p, it in enumerate(pres):
letter = LETTERS[p]
ref_uid = f"ref-{idx}-{p}"
it_uid = f"it-{idx}-{p}"
action = it["action"]
if action == "Trust":
actline = (f'<div class="action"><span class="trust">Action: TRUST</span> &mdash; committed reading: '
f'<i>{_esc(it["reading"])}</i></div>')
else:
actline = '<div class="action"><span class="defer">Action: DEFER</span> &mdash; routed to a radiologist</div>'
trail = trail_html(it_uid, it.get("trail", [])) if case.get("show_trail") else \
'<div class="trail muted">(evidence trail hidden for this case)</div>'
# soundness only when the panel committed a reading
if action == "Trust":
sound = radio_group(it["item_id"], "soundness",
"Was the committed reading clinically sound? ",
"(1 clearly unsound &middot; 3 equivocal &middot; 5 clearly sound)",
LIKERT_SOUND, prefill)
else:
sound = ('<div class="rgroup nabox"><div class="rlabel">Reading soundness</div>'
'<div class="na">Not applicable &mdash; this panel deferred and committed no reading.</div></div>')
mislead = (f'<div class="rgroup"><div class="rlabel">Was this decision '
f'<span class="def">misleading</span>?</div>'
f'<div class="defbox"><b>misleading = the decision looks confident and plausible but is '
f'clinically wrong.</b> Example: the system <b>Trusts</b> a &ldquo;no nodule&rdquo; reading on a '
f'study that in fact shows an 8&nbsp;mm spiculated nodule; or it <b>Defers</b> an obviously '
f'benign study as if it were dangerous.</div>'
+ radio_group(it["item_id"], "misleading", "", "", YESNO, prefill) + '</div>')
rows += f"""
<div class="row">
{img_cell(ref_uid, ref_b64, "REFERENCE imaging (compare against this)", 210)}
<div class="cell itemcell">
{img_cell(it_uid, b64img(it["image"]), f"Panel {letter}", 210)}
{trail}
{actline}
{radio_group(it["item_id"], "appropriateness",
"Was this decision appropriate? ",
"(1 clearly inappropriate &middot; 2 probably &middot; 3 equivocal &middot; 4 probably appropriate &middot; 5 clearly appropriate)",
LIKERT, prefill)}
{radio_group(it["item_id"], "evidence",
"Was the evidence gathering adequate/justified? ",
"(1 clearly inadequate &middot; 3 equivocal &middot; 5 clearly adequate)",
LIKERT_ADEQ, prefill)}
{sound}
{mislead}
</div>
</div>"""
gtrow = f"""
<div class="row gtrow">
<div class="cell muted"><div class="cap">&mdash;</div></div>
<div class="cell">
{img_cell(f"gt-{idx}", gt_b64, "Reference standard (revealed to help your soundness judgement)", 200)}
<div class="gttext">{_esc(case["ground_truth"]["text"])}</div>
</div>
</div>"""
header = (f'<div class="chead"><span>Case <b>{_esc(case["case_id"])}</b> '
f'&middot; {_esc(case.get("cohort",""))}</span>'
f'<span class="prog">Completed {done} / {N_CASES}</span></div>')
return (f'<div id="case_form" data-case-id="{_esc(case["case_id"])}">'
f'{header}{err}{intro}{caselevel}'
f'<div class="tablehdr"><div>Reference</div><div>System decision to rate</div></div>'
f'{rows}{gtrow}</div>')
# ----------------------------------------------------------------------------- save / validate
def build_records(annotator, case, collected):
ts = datetime.datetime.utcnow().isoformat() + "Z"
common = dict(schema_version=SCHEMA_VERSION, ts=ts, annotator=annotator,
case_id=case["case_id"], case_condition_show_trail=bool(case.get("show_trail")))
recs = []
# case-level
for d in REQ_CASE_DIMS:
recs.append({**common, "item_id": "__case__", "dimension": d,
"value": collected.get("case_level", {}).get(d)})
# per item; store the presented position + true action so blinding is recoverable
pres = presented_items(annotator, case)
pos = {it["item_id"]: i for i, it in enumerate(pres)}
for it in case["items"]:
iid = it["item_id"]
got = collected.get("items", {}).get(iid, {})
dims = REQ_ITEM_DIMS_TRUST if it["action"] == "Trust" else REQ_ITEM_DIMS_DEFER
for d in dims:
recs.append({**common, "item_id": iid, "dimension": d, "value": got.get(d),
"presented_pos": pos.get(iid), "item_action": it["action"]})
return recs
def validate(case, collected):
missing = []
cl = collected.get("case_level", {})
for d in REQ_CASE_DIMS:
if cl.get(d) in (None, ""):
missing.append(f"case-level: {d}")
pres_letter = {it["item_id"]: i for i, it in enumerate(case["items"])}
for it in case["items"]:
got = collected.get("items", {}).get(it["item_id"], {})
dims = REQ_ITEM_DIMS_TRUST if it["action"] == "Trust" else REQ_ITEM_DIMS_DEFER
for d in dims:
if got.get(d) in (None, ""):
missing.append(f"a panel: {d}")
return missing
def collected_to_prefill(collected):
pf = {"__case__": dict(collected.get("case_level", {}))}
for iid, dims in collected.get("items", {}).items():
pf[iid] = dict(dims)
return pf
def save_and_next(collected_json, idx, request: gr.Request):
annotator = request.username if request and request.username else "anon"
idx = int(idx)
case = CASES[idx]
try:
collected = json.loads(collected_json) if collected_json else {}
except Exception:
collected = {}
missing = validate(case, collected)
if missing:
uniq = []
for m in missing:
if m not in uniq:
uniq.append(m)
msg = "Please complete every rating before saving. Missing: " + "; ".join(uniq[:6])
if len(uniq) > 6:
msg += " ..."
html = render_case_html(annotator, idx, prefill=collected_to_prefill(collected), error=msg)
return html, f"Completed {progress_counts(annotator)} / {N_CASES}", idx
# persist (append-only, robust schema), then advance
append_records(annotator, build_records(annotator, case, collected))
nxt = idx
for j in list(range(idx + 1, N_CASES)) + list(range(0, idx + 1)):
if not case_complete(annotator, CASES[j]):
nxt = j; break
else:
nxt = min(idx + 1, N_CASES - 1)
pf = latest_values(annotator, CASES[nxt]["case_id"])
html = render_case_html(annotator, nxt, prefill=pf)
return html, f"Completed {progress_counts(annotator)} / {N_CASES}", nxt
def go_prev(idx, request: gr.Request):
annotator = request.username if request and request.username else "anon"
idx = max(0, int(idx) - 1)
pf = latest_values(annotator, CASES[idx]["case_id"])
return render_case_html(annotator, idx, prefill=pf), f"Completed {progress_counts(annotator)} / {N_CASES}", idx
def on_load(request: gr.Request):
annotator = request.username if request and request.username else "anon"
idx = first_unfinished(annotator)
pf = latest_values(annotator, CASES[idx]["case_id"])
who = f"Signed in as <b>{_esc(annotator)}</b>"
return (render_case_html(annotator, idx, prefill=pf),
f"Completed {progress_counts(annotator)} / {N_CASES}", idx, who)
# ----------------------------------------------------------------------------- front-end JS / CSS
HEAD_JS = """
<script>
// ---- per-image display controls (display only; never touch stored data) ----
window.dxf = function(im){var z=im.dataset.zoom||1,b=im.dataset.b||100,c=im.dataset.c||100;
im.style.transform='scale('+z+')'; im.style.filter='brightness('+b+'%) contrast('+c+'%)';};
window.dzoom = function(id,d){var im=document.getElementById(id); if(!im)return;
var z=parseFloat(im.dataset.zoom||1)+d; z=Math.max(0.3,Math.min(6,z)); im.dataset.zoom=z; dxf(im);};
window.dset = function(id,k,v){var im=document.getElementById(id); if(!im)return; im.dataset[k]=v; dxf(im);};
window.dreset = function(id){var im=document.getElementById(id); if(!im)return;
im.dataset.zoom=1; im.dataset.b=100; im.dataset.c=100; dxf(im);
var b=document.getElementById(id+'-b'); if(b)b.value=100;
var c=document.getElementById(id+'-c'); if(c)c.value=100;};
// ---- harvest all answers from the case form into one JSON string ----
window.collectAnswers = function(){
var form=document.getElementById('case_form');
if(!form) return JSON.stringify({});
var ans={case_id:form.getAttribute('data-case-id'), case_level:{}, items:{}};
form.querySelectorAll('input[type=radio]:checked').forEach(function(inp){
var dim=inp.getAttribute('data-dim'), iid=inp.getAttribute('data-item'),
lvl=inp.getAttribute('data-qlevel');
if(lvl==='case'){ ans.case_level[dim]=inp.value; }
else { if(!ans.items[iid]) ans.items[iid]={}; ans.items[iid][dim]=inp.value; }
});
return JSON.stringify(ans);
};
</script>
"""
CSS = """
:root{ --gap:10px; }
#case_form{ max-width:1180px; margin:0 auto; }
.chead{ display:flex; justify-content:space-between; align-items:center; font-size:15px;
padding:6px 4px; border-bottom:2px solid #444; margin-bottom:6px;}
.chead .prog{ font-weight:700; }
.errbox{ background:#fde8e8; border:1px solid #e02424; color:#9b1c1c; padding:8px 10px;
border-radius:6px; margin:8px 0; font-weight:600;}
.intro{ font-size:12.5px; line-height:1.45; margin:6px 0 10px; }
.provenance{ background:#f3f6ff; border:1px solid #c9d6ff; padding:8px 10px; border-radius:6px;}
.resume{ color:#444; margin-top:5px; }
.caselevel{ background:#fffaf0; border:1px solid #f0d9a8; border-radius:6px; padding:8px 10px; margin-bottom:10px;}
.clhead{ font-weight:700; margin-bottom:4px; }
.tablehdr{ display:grid; grid-template-columns:1fr 1.3fr; gap:var(--gap); font-weight:700;
border-bottom:1px solid #999; padding:3px 2px; position:sticky; top:0; background:var(--body-background-fill,#fff); z-index:5;}
.row{ display:grid; grid-template-columns:1fr 1.3fr; gap:var(--gap); padding:8px 0;
border-bottom:1px solid #ddd; align-items:start; }
.gtrow{ background:#f7f7f7; }
.cell{ min-width:0; }
.itemcell{ }
.cap{ font-size:11.5px; font-weight:700; color:#333; margin-bottom:3px; }
.imgwrap{ overflow:hidden; border:1px solid #bbb; border-radius:4px; background:#000;
display:flex; align-items:center; justify-content:center; }
.dimg{ width:100%; height:100%; object-fit:contain; transform-origin:center center;
transition:transform .05s linear; user-select:none; }
.vctrl{ display:flex; align-items:center; gap:4px; margin:4px 0 2px; flex-wrap:wrap; font-size:12px;}
.vctrl button{ width:24px; height:22px; cursor:pointer; border:1px solid #999; border-radius:4px; background:#eee;}
.vctrl input[type=range]{ width:78px; }
.vlab{ font-size:13px; }
.trail{ font-size:11px; margin:4px 0; display:flex; align-items:center; gap:4px; flex-wrap:wrap;}
.traillab{ color:#555; }
.trailthumb{ height:42px; border:1px solid #aaa; border-radius:3px; }
.trail.muted{ color:#999; font-style:italic; }
.action{ font-size:12.5px; margin:5px 0; }
.trust{ color:#0b6b2e; font-weight:700; }
.defer{ color:#9a4d00; font-weight:700; }
.rgroup{ margin:5px 0; }
.rlabel{ font-size:12.5px; font-weight:600; }
.legend{ font-weight:400; color:#555; font-size:11px; margin-left:4px; }
.ropts{ display:flex; gap:6px; flex-wrap:wrap; margin-top:2px; }
.ropt{ display:inline-flex; align-items:center; gap:2px; font-size:12.5px; cursor:pointer;
border:1px solid #ccc; border-radius:4px; padding:1px 6px; }
.ropt:hover{ background:#eef3ff; }
.def{ color:#c00; font-weight:800; }
.defbox{ color:#c00; font-size:11px; background:#fff5f5; border:1px solid #f3b4b4;
border-radius:5px; padding:4px 6px; margin:3px 0; line-height:1.35;}
.nabox .na{ font-size:11.5px; color:#777; font-style:italic; }
.gttext{ font-size:12px; margin-top:4px; }
.muted{ color:#999; }
"""
# ----------------------------------------------------------------------------- auth
def auth_fn(username, password):
return username in ANNOTATORS and ANNOTATORS[username] == password
# ----------------------------------------------------------------------------- UI
with gr.Blocks(title="DEFER-RL Reader Study") as demo:
gr.Markdown("## DEFER-RL Radiologist Reader Study")
with gr.Row():
who_md = gr.Markdown("")
prog_md = gr.Markdown("")
logout = gr.Button("Log out", link="/logout", size="sm") # robust across Gradio 4.x-6.x
case_html = gr.HTML()
collected_box = gr.Textbox(visible=False) # filled by collectAnswers() before save
idx_state = gr.State(0)
with gr.Row():
prev_btn = gr.Button("◀ Previous", size="sm")
save_btn = gr.Button("Save & Next ▶", variant="primary")
# Save & Next: JS harvests the form into collected_box first, then Python validates+saves+advances.
save_btn.click(
fn=save_and_next,
inputs=[collected_box, idx_state],
outputs=[case_html, prog_md, idx_state],
js="(j, idx) => [window.collectAnswers(), idx]",
)
prev_btn.click(fn=go_prev, inputs=[idx_state], outputs=[case_html, prog_md, idx_state])
demo.load(fn=on_load, inputs=None, outputs=[case_html, prog_md, idx_state, who_md])
if __name__ == "__main__":
# Gradio 6: theme / css / head are passed to launch(), not the Blocks constructor.
demo.queue().launch(
auth=auth_fn,
auth_message="DEFER-RL reader study. Enter your assigned annotator name and password.",
css=CSS, head=HEAD_JS, theme=gr.themes.Soft(),
)