"""Hiding Sound — Tier 4 human-study Gradio app. Deploy as a Hugging Face Space. On every fresh page-load the app draws a new random subset of trials from `pool/manifest.json` (default 20 per condition × 5 conditions = 100 trials). Trials are presented one at a time with audio playback and 4 multiple-choice options. On submit, the app generates a JSON file that the participant can download AND, if the Space has the `HF_TOKEN` + `HF_RESULTS_REPO` secrets set, pushes the JSON to a private HF dataset for automated collection. Knobs (env vars): HF_TOKEN HF write-access token (set as a Space secret) HF_RESULTS_REPO target dataset, e.g. "ymdou/hidingsound-tier4-results" N_PER_CONDITION override per-condition sample size (default 20) Run locally: python app.py """ from __future__ import annotations import json import os import random import time import uuid from datetime import datetime from pathlib import Path import gradio as gr # ── Workaround: gradio_client.utils._json_schema_to_python_type crashes on # boolean JSON schemas (e.g. `additionalProperties: True`) in some # combinations of gradio_client + pydantic + State payloads. We patch the # offending helpers to fall back to a permissive type. Safe and idempotent # across versions; if the bug is already fixed, the patch is a no-op. 
try:
    from gradio_client import utils as _gc_utils

    # The sentinel attribute keeps the patch idempotent across re-imports
    # (e.g. hot reload), so the helpers are never wrapped twice.
    if not getattr(_gc_utils, "_HS_BOOL_PATCHED", False):
        _orig_get_type = _gc_utils.get_type

        def _patched_get_type(schema):
            """Return "Any" for boolean schemas, else defer to gradio_client."""
            if isinstance(schema, bool):
                return "Any"
            return _orig_get_type(schema)

        _gc_utils.get_type = _patched_get_type

        _orig_jstpt = _gc_utils._json_schema_to_python_type

        def _patched_jstpt(schema, defs=None):
            """Return "Any" for boolean schemas, else defer to gradio_client."""
            if isinstance(schema, bool):
                return "Any"
            return _orig_jstpt(schema, defs)

        _gc_utils._json_schema_to_python_type = _patched_jstpt
        _gc_utils._HS_BOOL_PATCHED = True
except Exception as _patch_err:  # pragma: no cover — never block the app boot
    # Still best-effort, but leave a trace in the Space logs instead of
    # swallowing the failure silently.
    print(f"[boot] gradio_client patch skipped: {_patch_err!r}", flush=True)


POOL_DIR = Path(__file__).parent / "pool"
MANIFEST_PATH = POOL_DIR / "manifest.json"
N_PER_CONDITION = int(os.environ.get("N_PER_CONDITION", "20"))

HF_TOKEN = os.environ.get("HF_TOKEN")
HF_RESULTS_REPO = os.environ.get("HF_RESULTS_REPO")  # e.g. "user/dataset"

# When AUDIO_REPO is set, audio files are fetched from a HF Dataset on
# demand (cached locally) instead of being bundled in the Space repo.
AUDIO_REPO = os.environ.get("AUDIO_REPO")  # e.g. "user/dataset"

# Gradio 5+ refuses to serve files unless their path is under cwd, /tmp,
# or an entry in `allowed_paths`. The HF Hub default cache lands under
# `~/.cache/huggingface/...` which isn't trusted, so we redirect every
# `hf_hub_download` from this app into a known-allowed dir below /tmp
# and pass that same dir to `launch(allowed_paths=[...])`.
AUDIO_CACHE_DIR = Path(os.environ.get("AUDIO_CACHE_DIR", "/tmp/hs_audio_cache"))
AUDIO_CACHE_DIR.mkdir(parents=True, exist_ok=True)


def resolve_audio_path(rel_path: str) -> str:
    """Return a local file path for the audio asset `rel_path`.

    The bundled pool directory is tried first (local dev, or small Spaces
    that ship audio in-repo). If the file is absent there and AUDIO_REPO is
    configured, the asset is fetched with `hf_hub_download` and cached under
    AUDIO_CACHE_DIR so Gradio is willing to serve it.

    Raises:
        FileNotFoundError: the file is not in the pool and AUDIO_REPO is
            not set.
    """
    local = POOL_DIR / rel_path
    if local.exists():
        return str(local)
    if AUDIO_REPO:
        # Imported lazily so a purely local run does not need the hub client.
        from huggingface_hub import hf_hub_download

        return hf_hub_download(
            repo_id=AUDIO_REPO,
            filename=rel_path,
            repo_type="dataset",
            token=HF_TOKEN,  # only required for private datasets
            cache_dir=str(AUDIO_CACHE_DIR),
        )
    raise FileNotFoundError(
        f"audio asset not found locally and AUDIO_REPO is not set: {rel_path}"
    )


# ──────────────────────────── pool loading ───────────────────────────────
# Explicit UTF-8: the default text encoding is locale-dependent, and the
# manifest's option descriptions are free text.
with open(MANIFEST_PATH, encoding="utf-8") as _f:
    _MANIFEST = json.load(_f)

ALL_TRIALS = _MANIFEST["trials"]
META = _MANIFEST.get("_meta", {})
CONDITIONS = META.get("conditions", sorted({t["condition"] for t in ALL_TRIALS}))
N_OPTIONS = META.get("n_options", 4)

# Number of trials a session actually contains. `sample_session_trials`
# clamps each condition to the size of its pool, so the same clamp is applied
# here; otherwise the "Clip i / N" header and the intro text overstate the
# count whenever a condition has fewer than N_PER_CONDITION trials.
TOTAL_PER_SESSION = sum(
    min(N_PER_CONDITION, sum(1 for t in ALL_TRIALS if t["condition"] == c))
    for c in CONDITIONS
)


def sample_session_trials(seed: int | None = None) -> list[dict]:
    """Pick up to N_PER_CONDITION trials from each condition, then shuffle.

    Args:
        seed: seed for the private `random.Random` instance; `None` lets
            Python choose one.

    Returns:
        The selected trial dicts in presentation order. Conditions with an
        empty pool contribute nothing; conditions with fewer trials than
        N_PER_CONDITION contribute their whole pool.
    """
    rng = random.Random(seed)
    by_cond: dict[str, list[dict]] = {c: [] for c in CONDITIONS}
    for t in ALL_TRIALS:
        if t["condition"] in by_cond:
            by_cond[t["condition"]].append(t)

    chosen: list[dict] = []
    for c in CONDITIONS:
        pool = by_cond[c]
        if not pool:
            continue
        n = min(N_PER_CONDITION, len(pool))
        chosen.extend(rng.sample(pool, n))
    rng.shuffle(chosen)
    return chosen


# ──────────────────────────── submission storage ─────────────────────────
def _build_submission(trials: list[dict], answers: list[dict], pid: str) -> dict:
    """Assemble the submission dict with per-condition accuracy.

    Args:
        trials: the session's trials, in presentation order.
        answers: one `{"trial_id", "letter"}` dict per trial; `letter` is
            `None` while the trial is unanswered.
        pid: participant id stored in the submission.

    Returns:
        A JSON-serialisable dict. Unanswered trials appear in `responses`
        with `correct=False` but are excluded from `per_condition_accuracy`.
    """
    from collections import defaultdict

    now = datetime.now().isoformat()
    sub = {
        "participant_id": pid,
        "submitted_at": now,
        "n_options": N_OPTIONS,
        "n_per_condition": N_PER_CONDITION,
        "conditions": CONDITIONS,
        "responses": [
            {
                "trial_id": i,
                "stem": t["stem"],
                "condition": t["condition"],
                "gt_letter": t["gt_letter"],
                "user_letter": a["letter"],
                "correct": (a["letter"] is not None
                            and a["letter"] == t["gt_letter"]),
                "options": t["options"],
            }
            for i, (t, a) in enumerate(zip(trials, answers))
        ],
    }

    # per_cond[condition] == [n_correct, n_answered]
    per_cond = defaultdict(lambda: [0, 0])
    for r in sub["responses"]:
        if r["user_letter"] is None:
            continue
        per_cond[r["condition"]][1] += 1
        per_cond[r["condition"]][0] += int(r["correct"])
    sub["per_condition_accuracy"] = {
        c: {"correct": v[0], "n": v[1],
            "accuracy": (v[0] / v[1] if v[1] else 0.0)}
        for c, v in per_cond.items()
    }
    return sub


def _save_and_upload(trials: list[dict], answers: list[dict], pid: str
                     ) -> tuple[str, str | None, str | None]:
    """Build the submission, save it under /tmp, and try to upload it.

    Returns:
        (local_path, uploaded_path_or_None, error_msg_or_None).
    """
    sub = _build_submission(trials, answers, pid)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    local = Path("/tmp") / f"hidingsound_tier4_{pid}_{ts}.json"
    # ensure_ascii=False emits raw non-ASCII characters, so the file must be
    # written as UTF-8 explicitly rather than in the locale's encoding.
    local.write_text(json.dumps(sub, ensure_ascii=False, indent=2),
                     encoding="utf-8")
    uploaded, upload_err = upload_to_hf(sub)
    return str(local), uploaded, upload_err


def upload_to_hf(submission: dict) -> tuple[str | None, str | None]:
    """Push the submission JSON to a HF Dataset.

    Activates only when HF_TOKEN and HF_RESULTS_REPO are both set.

    Returns:
        (uploaded_path, error_msg). On success error_msg is None. On
        failure uploaded_path is None and error_msg explains the issue.
        This function never raises: every exception is logged and reported
        through error_msg.
    """
    if not HF_RESULTS_REPO:
        return None, "HF_RESULTS_REPO is not set on this Space"
    if not HF_TOKEN:
        return None, "HF_TOKEN is not set on this Space"
    try:
        from huggingface_hub import HfApi, create_repo

        api = HfApi(token=HF_TOKEN)
        # Idempotent: create the dataset if it doesn't exist yet. A failure
        # here is only logged; the upload below reports the real problem.
        try:
            create_repo(repo_id=HF_RESULTS_REPO, repo_type="dataset",
                        token=HF_TOKEN, private=True, exist_ok=True)
        except Exception as e:
            print(f"[upload_to_hf] create_repo: {e!r}", flush=True)

        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        pid = submission.get("participant_id", uuid.uuid4().hex[:8])
        fname = f"submissions/{ts}_{pid}.json"
        # Upload the encoded bytes directly instead of staging a second,
        # never-deleted temp file under /tmp for every upload.
        payload = json.dumps(submission, ensure_ascii=False,
                             indent=2).encode("utf-8")
        api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=fname,
            repo_id=HF_RESULTS_REPO,
            repo_type="dataset",
            commit_message=f"submission {pid} @ {ts}",
        )
        return f"{HF_RESULTS_REPO}:{fname}", None
    except Exception as e:
        msg = f"{type(e).__name__}: {e}"
        print(f"[upload_to_hf] failed: {msg}", flush=True)
        return None, msg


# ──────────────────────────── UI building blocks ─────────────────────────
INTRO_MD = f"""
# Hiding Sound — Listening Study

Thanks for participating! You will hear **{TOTAL_PER_SESSION} short audio
clips**, about 10–15 minutes in total. Headphones are **strongly
recommended** for the best listening experience.

For each clip, click the option whose description best matches the
**most noticeable** sound you hear. Pick the sound that is most
prominent, not what you think "should" be in the recording.

Clicking an option records your answer and advances to the next clip
automatically. You can use **◀ Previous** to go back. When you finish the
last clip, you'll see a Submit button.

Click **Start** below when you're ready.
"""


def render_trial(idx: int, trial: dict) -> str:
    """Return the Markdown header shown above the trial at 0-based `idx`.

    `trial` is accepted for interface stability but is not used.
    """
    head = (f"### Clip {idx+1} / {TOTAL_PER_SESSION}\n\n"
            f"Listen to the audio, then click the option whose "
            f"description best matches the **most noticeable** sound.")
    return head


# ──────────────────────────── main app ───────────────────────────────────
CUSTOM_CSS = """
/* ── Container: never exceed the viewport, no centered max-width gutter.
   Gradio's default centers Blocks at ~1280 px max with side padding,
   which is what causes a cramped column on phones. */
.gradio-container,
gradio-app .gradio-container,
.app .gradio-container {
    max-width: 100% !important;
    width: 100% !important;
    padding: 12px !important;
    box-sizing: border-box !important;
}

/* All children obey their parent's width — no horizontal scroll. */
.gradio-container * { box-sizing: border-box; }
.gradio-container img,
.gradio-container audio,
.gradio-container video { max-width: 100% !important; height: auto; }

/* Audio: stretch to row width on every viewport. */
.gradio-container audio,
.gradio-container [data-testid="audio"],
.gradio-container .audio_player,
.gradio-container .audio-player {
    width: 100% !important;
    min-width: 0 !important;
}

/* Option buttons: full-width, left-aligned text, wraps on small widths. */
.opt-btn, .opt-btn > button, .opt-btn button {
    width: 100% !important;
    text-align: left !important;
    justify-content: flex-start !important;
    white-space: normal !important;
    word-break: break-word;
    min-height: 56px;
    padding: 14px 18px;
    line-height: 1.45;
    font-size: 1rem;
    box-sizing: border-box !important;
}
.opt-btn span, .opt-btn div {
    text-align: left !important;
    justify-content: flex-start !important;
    white-space: normal !important;
}

/* Mobile-specific tightening. */
@media (max-width: 720px) {
    .gradio-container { padding: 8px !important; }
    h1, h2, h3 { font-size: 1.15rem !important; }
    .opt-btn, .opt-btn > button, .opt-btn button {
        font-size: 0.95rem !important;
        padding: 12px 14px !important;
        min-height: 64px;
    }
    /* Stack any side-by-side rows vertically. */
    .gradio-container .row, .gradio-container .gr-row {
        flex-direction: column !important;
    }
}

/* iOS Safari: prevent zoom-on-tap from shrinking small input text. */
.gradio-container button,
.gradio-container input,
.gradio-container select,
.gradio-container textarea { font-size: 16px; }
"""

# Extra markup for the page <head>; currently whitespace only.
CUSTOM_HEAD = """
"""

import inspect as _inspect

# Parameter tables of the installed Gradio, used to decide which keyword
# arguments can be passed to Blocks(...) and which to launch(...).
_BLOCKS_PARAMS = _inspect.signature(gr.Blocks).parameters
_LAUNCH_PARAMS = _inspect.signature(gr.Blocks.launch).parameters


def _filter_kwargs(params, kwargs):
    """Drop kwargs not accepted by `params` (handles Gradio 5↔6 splits)."""
    return {k: v for k, v in kwargs.items() if k in params}


def make_app() -> gr.Blocks:
    """Build the three-page study UI (intro → trials → submit).

    Per-session data (sampled trials, current index, answers, participant
    id) lives in `gr.State` components. Clicking an option records the
    answer and advances; after the last trial the responses are saved,
    uploaded if configured, and the submit page is shown.

    Returns:
        The assembled, not yet launched, `gr.Blocks` app.
    """
    blocks_kwargs = _filter_kwargs(_BLOCKS_PARAMS, dict(
        title="Hiding Sound Listening Study",
        css=CUSTOM_CSS,
        theme=gr.themes.Soft(),
        # fill_width=True removes Gradio's centered max-width gutter so
        # the layout uses the full screen on phones and laptops alike.
        fill_width=True,
        head=CUSTOM_HEAD,
    ))
    with gr.Blocks(**blocks_kwargs) as demo:
        gr.Markdown(INTRO_MD)

        # session state
        trials_state = gr.State(value=[])
        idx_state = gr.State(value=0)
        answers_state = gr.State(value=[])  # list of {"trial_id":int, "letter":str|None}
        pid_state = gr.State(value="")

        # intro page (no participant-id box; we auto-generate)
        with gr.Group(visible=True) as intro_group:
            start_btn = gr.Button("Start", variant="primary")

        # trial page — 4 option buttons (label updated per-trial); clicking
        # an option records the answer and auto-advances.
        with gr.Group(visible=False) as trial_group:
            progress_md = gr.Markdown("")
            # autoplay=True: when the trial advances and a new audio path
            # is pushed to this component, the browser auto-starts
            # playback. The user can pause/seek with the standard audio
            # controls. (Mobile browsers gate autoplay until the page
            # has had a user interaction; clicking Start counts, so this
            # works from trial 1 onward on iOS/Android.)
            audio_player = gr.Audio(label="Audio", autoplay=True,
                                    type="filepath")
            opt_btn_a = gr.Button("", variant="secondary", elem_classes="opt-btn")
            opt_btn_b = gr.Button("", variant="secondary", elem_classes="opt-btn")
            opt_btn_c = gr.Button("", variant="secondary", elem_classes="opt-btn")
            opt_btn_d = gr.Button("", variant="secondary", elem_classes="opt-btn")
            prev_btn = gr.Button("◀ Previous")

        # submit page — populated automatically after the last trial
        with gr.Group(visible=False) as submit_group:
            submit_status = gr.Markdown("")
            download_file = gr.File(label="Download your responses (JSON)",
                                    visible=False)

        # ─── helpers ──────────────────────────────────────────────────
        def _trial_ui_updates(i: int, trials: list[dict], answers: list[dict]):
            """Return the UI updates for the trial at index `i`.

            Returns [progress_md, audio_update, btn_A_label, btn_B_label,
            btn_C_label, btn_D_label].

            We wrap the audio path in a `gr.update(...)` with
            `autoplay=True` so the browser re-triggers playback every time
            the value changes, even between trials within the same session.
            (Just returning a path keeps the same element instance, which
            on some browsers does not re-fire autoplay; the explicit update
            forces a fresh load + play.)
            """
            t = trials[i]
            audio_path = resolve_audio_path(t["audio"])
            audio_update = gr.update(value=audio_path, autoplay=True)
            head = render_trial(i, t)
            chosen = answers[i].get("letter")
            btn_labels: list[str] = []
            for opt in t["options"]:
                # Mark the currently selected option so a revisited trial
                # shows the earlier choice.
                tag = "✓ " if chosen == opt["letter"] else ""
                btn_labels.append(f"{tag}({opt['letter']}) {opt['description']}")
            # Pad to 4 in case n_options < 4 for some reason.
            while len(btn_labels) < 4:
                btn_labels.append("")
            return [head, audio_update, *btn_labels[:4]]

        # ─── handlers ─────────────────────────────────────────────────
        AUTO_SUBMIT_EVERY = 10  # save + upload after every N answered trials

        def on_start():
            """Create a participant id, sample trials, and show trial 1."""
            pid = uuid.uuid4().hex[:8]
            trials = sample_session_trials(
                seed=int(time.time() * 1000) & 0xffff_ffff)
            answers = [{"trial_id": i, "letter": None}
                       for i in range(len(trials))]
            ui = _trial_ui_updates(0, trials, answers)
            return (
                gr.update(visible=False),  # intro
                gr.update(visible=True),   # trial
                gr.update(visible=False),  # submit
                trials, 0, answers, pid,
                *ui,                       # progress, audio, btn A/B/C/D
            )

        def _advance(trials, i, answers, pid, letter):
            """Record the answer and move on.

            A checkpoint is saved and uploaded each time the number of
            answered trials newly reaches a multiple of AUTO_SUBMIT_EVERY.
            After the last trial the final save/upload runs and the
            (already populated) submit page is shown.
            """
            newly_answered = False
            if 0 <= i < len(trials):
                newly_answered = answers[i]["letter"] is None
                answers[i]["letter"] = letter
            n_answered = sum(1 for a in answers if a["letter"] is not None)
            is_last = i + 1 >= len(trials)

            # Periodic checkpoint. Two cases are skipped on purpose:
            #  * revising an answer via Previous leaves n_answered unchanged,
            #    so without `newly_answered` every revision made while the
            #    count sits on a multiple would upload again;
            #  * the last trial is followed by the final save below, so a
            #    checkpoint there would upload the same submission twice.
            if (newly_answered and not is_last
                    and n_answered % AUTO_SUBMIT_EVERY == 0):
                local, uploaded, err = _save_and_upload(trials, answers, pid)
                print(f"[auto_submit] @{n_answered} answered → "
                      f"local={local} uploaded={uploaded} err={err}",
                      flush=True)

            if not is_last:
                i += 1
                ui = _trial_ui_updates(i, trials, answers)
                return (
                    gr.update(visible=True),   # trial
                    gr.update(visible=False),  # submit
                    trials, i, answers,
                    gr.update(),               # submit_status — no change
                    gr.update(),               # download_file — no change
                    *ui,
                )

            # Last trial answered → final save/upload and flip to submit page.
            local, uploaded, upload_err = _save_and_upload(trials, answers, pid)
            sub = _build_submission(trials, answers, pid)
            status_lines = ["**Thank you!** Your responses are recorded."]
            if uploaded:
                status_lines.append(f"Auto-uploaded to dataset: `{uploaded}` ✓")
            else:
                status_lines.append(
                    "*(Auto-upload is not active for this Space — "
                    f"reason: `{upload_err}`. Please download the JSON below "
                    "and email it to the study organizers.)*"
                )
            status_lines.append("")
            status_lines.append("Per-condition accuracy on this submission:")
            for c, v in sub["per_condition_accuracy"].items():
                status_lines.append(f"- **{c}**: {v['correct']}/{v['n']} = "
                                    f"{100*v['accuracy']:.1f}%")
            ui = _trial_ui_updates(i, trials, answers)
            # The trial page is being hidden: leave the audio component
            # untouched rather than pushing the same clip again with
            # autoplay=True.
            ui[1] = gr.update()
            return (
                gr.update(visible=False),  # trial
                gr.update(visible=True),   # submit
                trials, i, answers,
                "\n".join(status_lines),               # submit_status
                gr.update(value=local, visible=True),  # download_file
                *ui,
            )

        def make_pick(letter):
            """Return a click handler that records `letter` for the trial."""
            def _pick(trials, i, answers, pid):
                return _advance(trials, i, answers, pid, letter)
            return _pick

        def on_prev(trials, i, answers):
            """Step back one trial (no-op on the first) and re-render it."""
            if i > 0:
                i -= 1
            ui = _trial_ui_updates(i, trials, answers)
            return (
                gr.update(visible=True),   # trial visible
                gr.update(visible=False),  # submit hidden
                trials, i, answers,
                gr.update(),               # submit_status — no change
                gr.update(),               # download_file — no change
                *ui,
            )

        # ─── wiring ───────────────────────────────────────────────────
        # Output order must match the tuples returned by _advance / on_prev.
        trial_outputs = [
            trial_group, submit_group,
            trials_state, idx_state, answers_state,
            submit_status, download_file,
            progress_md, audio_player,
            opt_btn_a, opt_btn_b, opt_btn_c, opt_btn_d,
        ]

        start_btn.click(
            on_start,
            inputs=[],
            outputs=[
                intro_group, trial_group, submit_group,
                trials_state, idx_state, answers_state, pid_state,
                progress_md, audio_player,
                opt_btn_a, opt_btn_b, opt_btn_c, opt_btn_d,
            ],
        )

        for btn, letter in [(opt_btn_a, "A"), (opt_btn_b, "B"),
                            (opt_btn_c, "C"), (opt_btn_d, "D")]:
            btn.click(
                make_pick(letter),
                inputs=[trials_state, idx_state, answers_state, pid_state],
                outputs=trial_outputs,
            )

        prev_btn.click(
            on_prev,
            inputs=[trials_state, idx_state, answers_state],
            outputs=trial_outputs,
        )

    return demo


if __name__ == "__main__":
    # Build a launch() kwarg dict that works across Gradio 5 (Space) and
    # Gradio 6 (local conda env), filtering out anything the running
    # version doesn't accept.
    launch_kwargs = {
        # allowed_paths lets Gradio serve the on-demand audio downloaded
        # into AUDIO_CACHE_DIR via `hf_hub_download`.
        "allowed_paths": [str(AUDIO_CACHE_DIR), str(POOL_DIR)],
        # show_api=False (Gradio 5.x) skips the /api/info endpoint that
        # triggers gradio_client's JSON-schema parser. We also monkey-patch
        # the parser at the top of this file as belt-and-braces, so newer
        # Gradio versions that don't accept this kwarg are fine.
        "show_api": False,
    }
    # Gradio 6 moved theme/css/head/fill_width from Blocks(...) to
    # launch(...). When running on Gradio 6 locally, forward them here;
    # on Gradio 5 they already attached to Blocks above.
    if "css" in _LAUNCH_PARAMS and "css" not in _BLOCKS_PARAMS:
        launch_kwargs["css"] = CUSTOM_CSS
    if "theme" in _LAUNCH_PARAMS and "theme" not in _BLOCKS_PARAMS:
        launch_kwargs["theme"] = gr.themes.Soft()
    if "head" in _LAUNCH_PARAMS and "head" not in _BLOCKS_PARAMS:
        launch_kwargs["head"] = CUSTOM_HEAD
    if "fill_width" in _LAUNCH_PARAMS and "fill_width" not in _BLOCKS_PARAMS:
        launch_kwargs["fill_width"] = True
    launch_kwargs = _filter_kwargs(_LAUNCH_PARAMS, launch_kwargs)
    make_app().launch(**launch_kwargs)