Spaces:
Sleeping
Sleeping
| """Hiding Sound β Tier 4 human-study Gradio app. | |
| Deploy as a Hugging Face Space. On every fresh page-load the app draws a | |
| new random subset of trials from `pool/manifest.json` (default 20 per | |
| condition Γ 5 conditions = 100 trials). Trials are presented one at a time | |
| with audio playback and 4 multiple-choice options. On submit, the app | |
| generates a JSON file that the participant can download AND, if the Space | |
| has the `HF_TOKEN` + `HF_RESULTS_REPO` secrets set, pushes the JSON to a | |
| private HF dataset for automated collection. | |
| Knobs (env vars): | |
| HF_TOKEN HF write-access token (set as a Space secret) | |
| HF_RESULTS_REPO target dataset, e.g. "ymdou/hidingsound-tier4-results" | |
| N_PER_CONDITION override per-condition sample size (default 20) | |
| Run locally: | |
| python app.py | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import random | |
| import time | |
| import uuid | |
| from datetime import datetime | |
| from pathlib import Path | |
| import gradio as gr | |
| # ββ Workaround: gradio_client.utils._json_schema_to_python_type crashes on | |
| # boolean JSON schemas (e.g. `additionalProperties: True`) in some | |
| # combinations of gradio_client + pydantic + State payloads. We patch the | |
| # offending helpers to fall back to a permissive type. Safe and idempotent | |
| # across versions; if the bug is already fixed, the patch is a no-op. | |
| try: | |
| from gradio_client import utils as _gc_utils | |
| if not getattr(_gc_utils, "_HS_BOOL_PATCHED", False): | |
| _orig_get_type = _gc_utils.get_type | |
| def _patched_get_type(schema): | |
| if isinstance(schema, bool): | |
| return "Any" | |
| return _orig_get_type(schema) | |
| _gc_utils.get_type = _patched_get_type | |
| _orig_jstpt = _gc_utils._json_schema_to_python_type | |
| def _patched_jstpt(schema, defs=None): | |
| if isinstance(schema, bool): | |
| return "Any" | |
| return _orig_jstpt(schema, defs) | |
| _gc_utils._json_schema_to_python_type = _patched_jstpt | |
| _gc_utils._HS_BOOL_PATCHED = True | |
| except Exception: # pragma: no cover β never block the app boot | |
| pass | |
| POOL_DIR = Path(__file__).parent / "pool" | |
| MANIFEST_PATH = POOL_DIR / "manifest.json" | |
| N_PER_CONDITION = int(os.environ.get("N_PER_CONDITION", "20")) | |
| HF_TOKEN = os.environ.get("HF_TOKEN") | |
| HF_RESULTS_REPO = os.environ.get("HF_RESULTS_REPO") # e.g. "user/dataset" | |
| # When AUDIO_REPO is set, audio files are fetched from a HF Dataset on | |
| # demand (cached locally) instead of being bundled in the Space repo. | |
| AUDIO_REPO = os.environ.get("AUDIO_REPO") # e.g. "user/dataset" | |
| # Gradio 5+ refuses to serve files unless their path is under cwd, /tmp, | |
| # or an entry in `allowed_paths`. The HF Hub default cache lands under | |
| # `~/.cache/huggingface/...` which isn't trusted, so we redirect every | |
| # `hf_hub_download` from this app into a known-allowed dir below /tmp | |
| # and pass that same dir to `launch(allowed_paths=[...])`. | |
| AUDIO_CACHE_DIR = Path(os.environ.get("AUDIO_CACHE_DIR", "/tmp/hs_audio_cache")) | |
| AUDIO_CACHE_DIR.mkdir(parents=True, exist_ok=True) | |
| def resolve_audio_path(rel_path: str) -> str: | |
| """Return a local file path for the audio asset. | |
| Tries the bundled pool first (works for local dev or small Spaces with | |
| audio in-repo). If absent and AUDIO_REPO is configured, falls back to | |
| `hf_hub_download` from the dataset, caching under AUDIO_CACHE_DIR so | |
| Gradio is willing to serve the file. | |
| """ | |
| local = POOL_DIR / rel_path | |
| if local.exists(): | |
| return str(local) | |
| if AUDIO_REPO: | |
| from huggingface_hub import hf_hub_download | |
| return hf_hub_download( | |
| repo_id=AUDIO_REPO, | |
| filename=rel_path, | |
| repo_type="dataset", | |
| token=HF_TOKEN, # only required for private datasets | |
| cache_dir=str(AUDIO_CACHE_DIR), | |
| ) | |
| raise FileNotFoundError( | |
| f"audio asset not found locally and AUDIO_REPO is not set: {rel_path}" | |
| ) | |
| # ββββββββββββββββββββββββββββ pool loading βββββββββββββββββββββββββββββββ | |
| with open(MANIFEST_PATH) as _f: | |
| _MANIFEST = json.load(_f) | |
| ALL_TRIALS = _MANIFEST["trials"] | |
| META = _MANIFEST.get("_meta", {}) | |
| CONDITIONS = META.get("conditions", sorted({t["condition"] for t in ALL_TRIALS})) | |
| N_OPTIONS = META.get("n_options", 4) | |
| TOTAL_PER_SESSION = N_PER_CONDITION * len(CONDITIONS) | |
| def sample_session_trials(seed: int | None = None) -> list[dict]: | |
| """Pick N_PER_CONDITION trials from each condition. Order shuffled.""" | |
| rng = random.Random(seed) | |
| by_cond: dict[str, list[dict]] = {c: [] for c in CONDITIONS} | |
| for t in ALL_TRIALS: | |
| if t["condition"] in by_cond: | |
| by_cond[t["condition"]].append(t) | |
| chosen: list[dict] = [] | |
| for c in CONDITIONS: | |
| pool = by_cond[c] | |
| if not pool: | |
| continue | |
| n = min(N_PER_CONDITION, len(pool)) | |
| chosen.extend(rng.sample(pool, n)) | |
| rng.shuffle(chosen) | |
| return chosen | |
| # ββββββββββββββββββββββββββββ submission storage βββββββββββββββββββββββββ | |
| def _build_submission(trials: list[dict], answers: list[dict], pid: str) -> dict: | |
| """Assemble the submission dict with per-condition accuracy.""" | |
| from collections import defaultdict | |
| now = datetime.now().isoformat() | |
| sub = { | |
| "participant_id": pid, | |
| "submitted_at": now, | |
| "n_options": N_OPTIONS, | |
| "n_per_condition": N_PER_CONDITION, | |
| "conditions": CONDITIONS, | |
| "responses": [ | |
| { | |
| "trial_id": i, | |
| "stem": t["stem"], | |
| "condition": t["condition"], | |
| "gt_letter": t["gt_letter"], | |
| "user_letter": a["letter"], | |
| "correct": (a["letter"] is not None | |
| and a["letter"] == t["gt_letter"]), | |
| "options": t["options"], | |
| } | |
| for i, (t, a) in enumerate(zip(trials, answers)) | |
| ], | |
| } | |
| per_cond = defaultdict(lambda: [0, 0]) | |
| for r in sub["responses"]: | |
| if r["user_letter"] is None: | |
| continue | |
| per_cond[r["condition"]][1] += 1 | |
| per_cond[r["condition"]][0] += int(r["correct"]) | |
| sub["per_condition_accuracy"] = { | |
| c: {"correct": v[0], "n": v[1], | |
| "accuracy": (v[0] / v[1] if v[1] else 0.0)} | |
| for c, v in per_cond.items() | |
| } | |
| return sub | |
| def _save_and_upload(trials: list[dict], answers: list[dict], pid: str | |
| ) -> tuple[str, str | None, str | None]: | |
| """Build submission, save to /tmp, upload to HF. | |
| Returns (local_path, uploaded_path_or_None, error_msg_or_None). | |
| """ | |
| sub = _build_submission(trials, answers, pid) | |
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| local = Path("/tmp") / f"hidingsound_tier4_{pid}_{ts}.json" | |
| local.write_text(json.dumps(sub, ensure_ascii=False, indent=2)) | |
| uploaded, upload_err = upload_to_hf(sub) | |
| return str(local), uploaded, upload_err | |
| def upload_to_hf(submission: dict) -> tuple[str | None, str | None]: | |
| """Push the submission JSON to a HF Dataset. | |
| Returns (uploaded_path, error_msg). On success, error_msg is None. | |
| On failure, uploaded_path is None and error_msg explains the issue. | |
| Activates only when HF_TOKEN and HF_RESULTS_REPO are both set. | |
| """ | |
| if not HF_RESULTS_REPO: | |
| return None, "HF_RESULTS_REPO is not set on this Space" | |
| if not HF_TOKEN: | |
| return None, "HF_TOKEN is not set on this Space" | |
| try: | |
| from huggingface_hub import HfApi, create_repo | |
| api = HfApi(token=HF_TOKEN) | |
| # Idempotent: create the dataset if it doesn't exist yet. | |
| try: | |
| create_repo(repo_id=HF_RESULTS_REPO, repo_type="dataset", | |
| token=HF_TOKEN, private=True, exist_ok=True) | |
| except Exception as e: | |
| print(f"[upload_to_hf] create_repo: {e!r}", flush=True) | |
| ts = datetime.now().strftime("%Y%m%d_%H%M%S") | |
| pid = submission.get("participant_id", uuid.uuid4().hex[:8]) | |
| fname = f"submissions/{ts}_{pid}.json" | |
| local = Path("/tmp") / f"sub_{ts}_{pid}.json" | |
| local.write_text(json.dumps(submission, ensure_ascii=False, indent=2)) | |
| api.upload_file( | |
| path_or_fileobj=str(local), | |
| path_in_repo=fname, | |
| repo_id=HF_RESULTS_REPO, | |
| repo_type="dataset", | |
| commit_message=f"submission {pid} @ {ts}", | |
| ) | |
| return f"{HF_RESULTS_REPO}:{fname}", None | |
| except Exception as e: | |
| msg = f"{type(e).__name__}: {e}" | |
| print(f"[upload_to_hf] failed: {msg}", flush=True) | |
| return None, msg | |
| # ββββββββββββββββββββββββββββ UI building blocks βββββββββββββββββββββββββ | |
| INTRO_MD = f""" | |
| # Hiding Sound β Listening Study | |
| Thanks for participating! You will hear **{TOTAL_PER_SESSION} short audio | |
| clips**, about 10β15 minutes in total. | |
| Headphones are **strongly recommended** for the best listening experience. | |
| For each clip, click the option whose description best matches the | |
| **most noticeable** sound you hear. | |
| Pick the sound that is most prominent, | |
| not what you think "should" be in the recording. | |
| Clicking an option records your answer and advances to the next clip | |
| automatically. You can use **β Previous** to go back. When you finish | |
| the last clip, you'll see a Submit button. | |
| Click **Start** below when you're ready. | |
| """ | |
| def render_trial(idx: int, trial: dict) -> str: | |
| head = (f"### Clip {idx+1} / {TOTAL_PER_SESSION}\n\n" | |
| f"Listen to the audio, then click the option whose " | |
| f"description best matches the **most noticeable** sound.") | |
| return head | |
| # ββββββββββββββββββββββββββββ main app βββββββββββββββββββββββββββββββββββ | |
| CUSTOM_CSS = """ | |
| /* ββ Container: never exceed the viewport, no centered max-width gutter. | |
| Gradio's default centers Blocks at ~1280 px max with side padding, | |
| which is what causes a cramped column on phones. */ | |
| .gradio-container, | |
| gradio-app .gradio-container, | |
| .app .gradio-container { | |
| max-width: 100% !important; | |
| width: 100% !important; | |
| padding: 12px !important; | |
| box-sizing: border-box !important; | |
| } | |
| /* All children obey their parent's width β no horizontal scroll. */ | |
| .gradio-container * { box-sizing: border-box; } | |
| .gradio-container img, | |
| .gradio-container audio, | |
| .gradio-container video { | |
| max-width: 100% !important; | |
| height: auto; | |
| } | |
| /* Audio: stretch to row width on every viewport. */ | |
| .gradio-container audio, | |
| .gradio-container [data-testid="audio"], | |
| .gradio-container .audio_player, | |
| .gradio-container .audio-player { | |
| width: 100% !important; | |
| min-width: 0 !important; | |
| } | |
| /* Option buttons: full-width, left-aligned text, wraps on small widths. */ | |
| .opt-btn, | |
| .opt-btn > button, | |
| .opt-btn button { | |
| width: 100% !important; | |
| text-align: left !important; | |
| justify-content: flex-start !important; | |
| white-space: normal !important; | |
| word-break: break-word; | |
| min-height: 56px; | |
| padding: 14px 18px; | |
| line-height: 1.45; | |
| font-size: 1rem; | |
| box-sizing: border-box !important; | |
| } | |
| .opt-btn span, .opt-btn div { | |
| text-align: left !important; | |
| justify-content: flex-start !important; | |
| white-space: normal !important; | |
| } | |
| /* Mobile-specific tightening. */ | |
| @media (max-width: 720px) { | |
| .gradio-container { padding: 8px !important; } | |
| h1, h2, h3 { font-size: 1.15rem !important; } | |
| .opt-btn, | |
| .opt-btn > button, | |
| .opt-btn button { | |
| font-size: 0.95rem !important; | |
| padding: 12px 14px !important; | |
| min-height: 64px; | |
| } | |
| /* Stack any side-by-side rows vertically. */ | |
| .gradio-container .row, | |
| .gradio-container .gr-row { | |
| flex-direction: column !important; | |
| } | |
| } | |
| /* iOS Safari: prevent zoom-on-tap from shrinking small input text. */ | |
| .gradio-container button, | |
| .gradio-container input, | |
| .gradio-container select, | |
| .gradio-container textarea { | |
| font-size: 16px; | |
| } | |
| """ | |
| CUSTOM_HEAD = """ | |
| <meta name="viewport" content="width=device-width, initial-scale=1.0, viewport-fit=cover" /> | |
| <meta name="apple-mobile-web-app-capable" content="yes" /> | |
| """ | |
| import inspect as _inspect | |
| _BLOCKS_PARAMS = _inspect.signature(gr.Blocks).parameters | |
| _LAUNCH_PARAMS = _inspect.signature(gr.Blocks.launch).parameters | |
| def _filter_kwargs(params, kwargs): | |
| """Drop kwargs not accepted by `params` (handles Gradio 5β6 splits).""" | |
| return {k: v for k, v in kwargs.items() if k in params} | |
| def make_app() -> gr.Blocks: | |
| blocks_kwargs = _filter_kwargs(_BLOCKS_PARAMS, dict( | |
| title="Hiding Sound Listening Study", | |
| css=CUSTOM_CSS, | |
| theme=gr.themes.Soft(), | |
| # fill_width=True removes Gradio's centered max-width gutter so | |
| # the layout uses the full screen on phones and laptops alike. | |
| fill_width=True, | |
| head=CUSTOM_HEAD, | |
| )) | |
| with gr.Blocks(**blocks_kwargs) as demo: | |
| gr.Markdown(INTRO_MD) | |
| # session state | |
| trials_state = gr.State(value=[]) | |
| idx_state = gr.State(value=0) | |
| answers_state = gr.State(value=[]) # list of {"trial_id":int, "letter":str|None} | |
| pid_state = gr.State(value="") | |
| # intro page (no participant-id box; we auto-generate) | |
| with gr.Group(visible=True) as intro_group: | |
| start_btn = gr.Button("Start", variant="primary") | |
| # trial page β 4 option buttons (label updated per-trial); clicking | |
| # an option records the answer and auto-advances. | |
| with gr.Group(visible=False) as trial_group: | |
| progress_md = gr.Markdown("") | |
| # autoplay=True: when the trial advances and a new audio path | |
| # is pushed to this component, the browser auto-starts | |
| # playback. The user can pause/seek with the standard audio | |
| # controls. (Mobile browsers gate autoplay until the page | |
| # has had a user interaction; clicking Start counts, so this | |
| # works from trial 1 onward on iOS/Android.) | |
| audio_player = gr.Audio(label="Audio", autoplay=True, type="filepath") | |
| opt_btn_a = gr.Button("", variant="secondary", elem_classes="opt-btn") | |
| opt_btn_b = gr.Button("", variant="secondary", elem_classes="opt-btn") | |
| opt_btn_c = gr.Button("", variant="secondary", elem_classes="opt-btn") | |
| opt_btn_d = gr.Button("", variant="secondary", elem_classes="opt-btn") | |
| prev_btn = gr.Button("β Previous") | |
| # submit page β populated automatically after the last trial | |
| with gr.Group(visible=False) as submit_group: | |
| submit_status = gr.Markdown("") | |
| download_file = gr.File(label="Download your responses (JSON)", | |
| visible=False) | |
| # βββ helpers ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _trial_ui_updates(i: int, trials: list[dict], answers: list[dict]): | |
| """Returns [progress_md, audio_update, btn_A_label, btn_B_label, | |
| btn_C_label, btn_D_label] for the trial at index i. | |
| We wrap the audio path in a `gr.update(...)` with | |
| `autoplay=True` so the browser re-triggers playback every time | |
| the value changes, even between trials within the same session. | |
| (Just returning a path keeps the same element instance, which | |
| on some browsers does not re-fire autoplay; the explicit | |
| update forces a fresh load + play.) | |
| """ | |
| t = trials[i] | |
| audio_path = resolve_audio_path(t["audio"]) | |
| audio_update = gr.update(value=audio_path, autoplay=True) | |
| head = render_trial(i, t) | |
| chosen = answers[i].get("letter") | |
| btn_labels: list[str] = [] | |
| for opt in t["options"]: | |
| tag = "β " if chosen == opt["letter"] else "" | |
| btn_labels.append(f"{tag}({opt['letter']}) {opt['description']}") | |
| # Pad to 4 in case n_options < 4 for some reason. | |
| while len(btn_labels) < 4: | |
| btn_labels.append("") | |
| return [head, audio_update, *btn_labels[:4]] | |
| # βββ handlers βββββββββββββββββββββββββββββββββββββββββββββββββ | |
| AUTO_SUBMIT_EVERY = 10 # save + upload after every N answered trials | |
| def on_start(): | |
| pid = uuid.uuid4().hex[:8] | |
| trials = sample_session_trials(seed=int(time.time() * 1000) & 0xffff_ffff) | |
| answers = [{"trial_id": i, "letter": None} | |
| for i in range(len(trials))] | |
| ui = _trial_ui_updates(0, trials, answers) | |
| return ( | |
| gr.update(visible=False), # intro | |
| gr.update(visible=True), # trial | |
| gr.update(visible=False), # submit | |
| trials, 0, answers, pid, | |
| *ui, # progress, audio, btn A/B/C/D | |
| ) | |
| def _advance(trials, i, answers, pid, letter): | |
| """Record answer, auto-submit every AUTO_SUBMIT_EVERY answers, | |
| and auto-show the submit page (already populated) on the last trial. | |
| """ | |
| if 0 <= i < len(trials): | |
| answers[i]["letter"] = letter | |
| n_answered = sum(1 for a in answers if a["letter"] is not None) | |
| # Auto-save + upload every AUTO_SUBMIT_EVERY answered trials. | |
| if n_answered % AUTO_SUBMIT_EVERY == 0: | |
| local, uploaded, err = _save_and_upload(trials, answers, pid) | |
| print(f"[auto_submit] @{n_answered} answered β " | |
| f"local={local} uploaded={uploaded} err={err}", flush=True) | |
| if i + 1 < len(trials): | |
| i += 1 | |
| ui = _trial_ui_updates(i, trials, answers) | |
| return ( | |
| gr.update(visible=True), # trial | |
| gr.update(visible=False), # submit | |
| trials, i, answers, | |
| gr.update(), # submit_status β no change | |
| gr.update(), # download_file β no change | |
| *ui, | |
| ) | |
| # Last trial answered β final save/upload and flip to submit page. | |
| local, uploaded, upload_err = _save_and_upload(trials, answers, pid) | |
| sub = _build_submission(trials, answers, pid) | |
| status_lines = ["**Thank you!** Your responses are recorded."] | |
| if uploaded: | |
| status_lines.append(f"Auto-uploaded to dataset: `{uploaded}` β") | |
| else: | |
| status_lines.append( | |
| "*(Auto-upload is not active for this Space β " | |
| f"reason: `{upload_err}`. Please download the JSON below " | |
| "and email it to the study organizers.)*" | |
| ) | |
| status_lines.append("") | |
| status_lines.append("Per-condition accuracy on this submission:") | |
| for c, v in sub["per_condition_accuracy"].items(): | |
| status_lines.append(f"- **{c}**: {v['correct']}/{v['n']} = " | |
| f"{100*v['accuracy']:.1f}%") | |
| ui = _trial_ui_updates(i, trials, answers) | |
| return ( | |
| gr.update(visible=False), # trial | |
| gr.update(visible=True), # submit | |
| trials, i, answers, | |
| "\n".join(status_lines), # submit_status | |
| gr.update(value=local, visible=True), # download_file | |
| *ui, | |
| ) | |
| def make_pick(letter): | |
| def _pick(trials, i, answers, pid): | |
| return _advance(trials, i, answers, pid, letter) | |
| return _pick | |
| def on_prev(trials, i, answers): | |
| if i > 0: | |
| i -= 1 | |
| ui = _trial_ui_updates(i, trials, answers) | |
| return ( | |
| gr.update(visible=True), # trial visible | |
| gr.update(visible=False), # submit hidden | |
| trials, i, answers, | |
| gr.update(), # submit_status β no change | |
| gr.update(), # download_file β no change | |
| *ui, | |
| ) | |
| # βββ wiring βββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| trial_outputs = [ | |
| trial_group, submit_group, | |
| trials_state, idx_state, answers_state, | |
| submit_status, download_file, | |
| progress_md, audio_player, | |
| opt_btn_a, opt_btn_b, opt_btn_c, opt_btn_d, | |
| ] | |
| start_btn.click( | |
| on_start, | |
| inputs=[], | |
| outputs=[ | |
| intro_group, trial_group, submit_group, | |
| trials_state, idx_state, answers_state, pid_state, | |
| progress_md, audio_player, | |
| opt_btn_a, opt_btn_b, opt_btn_c, opt_btn_d, | |
| ], | |
| ) | |
| for btn, letter in [(opt_btn_a, "A"), (opt_btn_b, "B"), | |
| (opt_btn_c, "C"), (opt_btn_d, "D")]: | |
| btn.click( | |
| make_pick(letter), | |
| inputs=[trials_state, idx_state, answers_state, pid_state], | |
| outputs=trial_outputs, | |
| ) | |
| prev_btn.click( | |
| on_prev, | |
| inputs=[trials_state, idx_state, answers_state], | |
| outputs=trial_outputs, | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| # Build a launch() kwarg dict that works across Gradio 5 (Space) and | |
| # Gradio 6 (local conda env), filtering out anything the running | |
| # version doesn't accept. | |
| launch_kwargs = { | |
| # allowed_paths lets Gradio serve the on-demand audio downloaded | |
| # into AUDIO_CACHE_DIR via `hf_hub_download`. | |
| "allowed_paths": [str(AUDIO_CACHE_DIR), str(POOL_DIR)], | |
| # show_api=False (Gradio 5.x) skips the /api/info endpoint that | |
| # triggers gradio_client's JSON-schema parser. We also monkey-patch | |
| # the parser at the top of this file as belt-and-braces, so newer | |
| # Gradio versions that don't accept this kwarg are fine. | |
| "show_api": False, | |
| } | |
| # Gradio 6 moved theme/css/head/fill_width from Blocks(...) to | |
| # launch(...). When running on Gradio 6 locally, forward them here; | |
| # on Gradio 5 they already attached to Blocks above. | |
| if "css" in _LAUNCH_PARAMS and "css" not in _BLOCKS_PARAMS: | |
| launch_kwargs["css"] = CUSTOM_CSS | |
| if "theme" in _LAUNCH_PARAMS and "theme" not in _BLOCKS_PARAMS: | |
| launch_kwargs["theme"] = gr.themes.Soft() | |
| if "head" in _LAUNCH_PARAMS and "head" not in _BLOCKS_PARAMS: | |
| launch_kwargs["head"] = CUSTOM_HEAD | |
| if "fill_width" in _LAUNCH_PARAMS and "fill_width" not in _BLOCKS_PARAMS: | |
| launch_kwargs["fill_width"] = True | |
| launch_kwargs = _filter_kwargs(_LAUNCH_PARAMS, launch_kwargs) | |
| make_app().launch(**launch_kwargs) | |