hidingsound / app.py
ymdou's picture
Update Tier 4 listening study
fdf769e verified
"""Hiding Sound — Tier 4 human-study Gradio app.
Deploy as a Hugging Face Space. On every fresh page-load the app draws a
new random subset of trials from `pool/manifest.json` (default 20 per
condition × 5 conditions = 100 trials). Trials are presented one at a time
with audio playback and 4 multiple-choice options. On submit, the app
generates a JSON file that the participant can download AND, if the Space
has the `HF_TOKEN` + `HF_RESULTS_REPO` secrets set, pushes the JSON to a
private HF dataset for automated collection.
Knobs (env vars):
HF_TOKEN HF write-access token (set as a Space secret)
HF_RESULTS_REPO target dataset, e.g. "ymdou/hidingsound-tier4-results"
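N_PER_CONDITION override per-condition sample size (default 20)
AUDIO_REPO optional HF dataset to fetch audio from when a clip is not bundled under `pool/`
AUDIO_CACHE_DIR local download cache for AUDIO_REPO files (default "/tmp/hs_audio_cache")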
Run locally:
python app.py
"""
from __future__ import annotations
import json
import os
import random
import time
import uuid
from datetime import datetime
from pathlib import Path
import gradio as gr
# ── Workaround: gradio_client.utils._json_schema_to_python_type crashes on
# boolean JSON schemas (e.g. `additionalProperties: True`) in some
# combinations of gradio_client + pydantic + State payloads. We patch the
# offending helpers to fall back to a permissive type. Safe and idempotent
# across versions; if the bug is already fixed, the patch is a no-op.
try:
    from gradio_client import utils as _gc_utils

    # The sentinel attribute prevents the wrappers from being stacked on top
    # of each other if this module is imported more than once per process.
    if not getattr(_gc_utils, "_HS_BOOL_PATCHED", False):
        _orig_get_type = _gc_utils.get_type

        def _patched_get_type(schema):
            """Map a bare boolean JSON schema to "Any"; defer otherwise."""
            if isinstance(schema, bool):
                return "Any"
            return _orig_get_type(schema)

        _gc_utils.get_type = _patched_get_type

        _orig_jstpt = _gc_utils._json_schema_to_python_type

        def _patched_jstpt(schema, defs=None):
            """Map a bare boolean JSON schema to "Any"; defer otherwise."""
            if isinstance(schema, bool):
                return "Any"
            return _orig_jstpt(schema, defs)

        _gc_utils._json_schema_to_python_type = _patched_jstpt
        _gc_utils._HS_BOOL_PATCHED = True
except Exception as _patch_err:  # pragma: no cover — never block the app boot
    # Still best-effort: the app must start even if gradio_client's internals
    # have moved. Report the reason instead of swallowing it, so a skipped
    # patch can be diagnosed from the Space logs.
    print(f"[gradio_client patch] skipped: {_patch_err!r}", flush=True)
# Bundled trial pool: a `pool/` directory next to this file, indexed by
# `manifest.json`.
POOL_DIR = Path(__file__).parent.joinpath("pool")
MANIFEST_PATH = POOL_DIR.joinpath("manifest.json")

# Deployment knobs, all read from the environment at import time.
N_PER_CONDITION = int(os.getenv("N_PER_CONDITION", "20"))
HF_TOKEN = os.getenv("HF_TOKEN")
HF_RESULTS_REPO = os.getenv("HF_RESULTS_REPO")  # e.g. "user/dataset"

# Optional dataset to pull audio from on demand (cached locally) when the
# clips are not bundled in the Space repo.
AUDIO_REPO = os.getenv("AUDIO_REPO")  # e.g. "user/dataset"

# Per the original author's note: Gradio 5+ only serves files located under
# cwd, /tmp, or an `allowed_paths` entry, and the default HF Hub cache
# (`~/.cache/huggingface/...`) is none of those. Every `hf_hub_download`
# made by this app is therefore pointed at this directory below /tmp, and
# the same directory is handed to `launch(allowed_paths=[...])`.
AUDIO_CACHE_DIR = Path(os.getenv("AUDIO_CACHE_DIR", "/tmp/hs_audio_cache"))
AUDIO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
def resolve_audio_path(rel_path: str) -> str:
    """Map a manifest-relative audio path to a file on local disk.

    A copy bundled under POOL_DIR wins when it exists. Otherwise, if
    AUDIO_REPO is configured, the file is fetched with `hf_hub_download`
    into AUDIO_CACHE_DIR and the downloaded path is returned.

    Raises:
        FileNotFoundError: the file is not bundled and AUDIO_REPO is unset.
    """
    bundled = POOL_DIR / rel_path
    if bundled.exists():
        return str(bundled)

    if not AUDIO_REPO:
        raise FileNotFoundError(
            f"audio asset not found locally and AUDIO_REPO is not set: {rel_path}"
        )

    # Imported lazily so the hub client is only needed when it is used.
    from huggingface_hub import hf_hub_download

    download_args = {
        "repo_id": AUDIO_REPO,
        "filename": rel_path,
        "repo_type": "dataset",
        "token": HF_TOKEN,  # only required for private datasets
        "cache_dir": str(AUDIO_CACHE_DIR),
    }
    return hf_hub_download(**download_args)
# ──────────────────────────── pool loading ───────────────────────────────
# JSON text is UTF-8; decode it explicitly rather than relying on the
# platform's default encoding (which is not UTF-8 everywhere).
with open(MANIFEST_PATH, encoding="utf-8") as _f:
    _MANIFEST = json.load(_f)
ALL_TRIALS = _MANIFEST["trials"]
META = _MANIFEST.get("_meta", {})
# Conditions come from the manifest metadata when present; otherwise they are
# the sorted set of condition names that appear on the trials.
CONDITIONS = META.get("conditions", sorted({t["condition"] for t in ALL_TRIALS}))
N_OPTIONS = META.get("n_options", 4)
# Number of clips a session really contains. The sampler takes at most
# N_PER_CONDITION trials per condition but never more than that condition's
# pool holds, so mirror that cap here; a plain N_PER_CONDITION * len(CONDITIONS)
# overstates the count shown to participants whenever a pool is short.
TOTAL_PER_SESSION = sum(
    min(N_PER_CONDITION, sum(1 for t in ALL_TRIALS if t["condition"] == c))
    for c in CONDITIONS
)
def sample_session_trials(seed: int | None = None) -> list[dict]:
    """Draw one session's trials: a random subset per condition, shuffled.

    Each condition in CONDITIONS contributes up to N_PER_CONDITION trials
    (fewer if its pool is smaller; none if it is empty). Trials whose
    condition is not listed in CONDITIONS are ignored.

    Args:
        seed: seed for the private `random.Random` used for sampling and
            shuffling.

    Returns:
        The selected trial dicts in shuffled order.
    """
    rng = random.Random(seed)

    # Bucket the pool by condition, keeping manifest order inside each bucket.
    pools: dict[str, list[dict]] = {name: [] for name in CONDITIONS}
    for trial in ALL_TRIALS:
        bucket = pools.get(trial["condition"])
        if bucket is not None:
            bucket.append(trial)

    picked: list[dict] = []
    for name in CONDITIONS:
        candidates = pools[name]
        if candidates:
            quota = min(N_PER_CONDITION, len(candidates))
            picked += rng.sample(candidates, quota)

    rng.shuffle(picked)
    return picked
# ──────────────────────────── submission storage ─────────────────────────
def _build_submission(trials: list[dict], answers: list[dict], pid: str) -> dict:
    """Assemble the submission record, including per-condition accuracy.

    Trials and answers are paired positionally. Unanswered trials
    (`letter` is None) are listed in `responses` with `correct` False but
    are left out of the accuracy tallies.

    Args:
        trials: the session's trial dicts.
        answers: one `{"trial_id", "letter"}` dict per trial.
        pid: participant id stored in the record.

    Returns:
        A JSON-serialisable dict with session metadata, one response entry
        per trial, and `per_condition_accuracy`.
    """
    stamp = datetime.now().isoformat()

    responses = []
    for position, (trial, answer) in enumerate(zip(trials, answers)):
        picked = answer["letter"]
        responses.append({
            "trial_id": position,
            "stem": trial["stem"],
            "condition": trial["condition"],
            "gt_letter": trial["gt_letter"],
            "user_letter": picked,
            "correct": picked is not None and picked == trial["gt_letter"],
            "options": trial["options"],
        })

    submission = {
        "participant_id": pid,
        "submitted_at": stamp,
        "n_options": N_OPTIONS,
        "n_per_condition": N_PER_CONDITION,
        "conditions": CONDITIONS,
        "responses": responses,
    }

    # condition -> [number correct, number answered], in first-seen order.
    tally: dict[str, list[int]] = {}
    for entry in responses:
        if entry["user_letter"] is not None:
            counts = tally.setdefault(entry["condition"], [0, 0])
            counts[1] += 1
            counts[0] += int(entry["correct"])

    submission["per_condition_accuracy"] = {
        cond: {
            "correct": hits,
            "n": answered,
            "accuracy": hits / answered if answered else 0.0,
        }
        for cond, (hits, answered) in tally.items()
    }
    return submission
def _save_and_upload(trials: list[dict], answers: list[dict], pid: str
                     ) -> tuple[str, str | None, str | None]:
    """Build the submission, save it under /tmp, and try to upload it to HF.

    Args:
        trials: the session's trial dicts.
        answers: one answer dict per trial.
        pid: participant id, used in the file name and in the record.

    Returns:
        (local_path, uploaded_path_or_None, error_msg_or_None).
    """
    sub = _build_submission(trials, answers, pid)
    ts = datetime.now().strftime("%Y%m%d_%H%M%S")
    local = Path("/tmp") / f"hidingsound_tier4_{pid}_{ts}.json"
    # ensure_ascii=False leaves non-ASCII characters unescaped, so the file
    # must be written as UTF-8 explicitly; the platform default encoding may
    # not be able to represent them.
    local.write_text(json.dumps(sub, ensure_ascii=False, indent=2),
                     encoding="utf-8")
    uploaded, upload_err = upload_to_hf(sub)
    return str(local), uploaded, upload_err
def upload_to_hf(submission: dict) -> tuple[str | None, str | None]:
    """Push the submission JSON to a HF Dataset.

    Activates only when HF_TOKEN and HF_RESULTS_REPO are both set. Never
    raises: any failure is logged and reported through the return value.

    Args:
        submission: JSON-serialisable submission record.

    Returns:
        (uploaded_path, error_msg). On success error_msg is None and
        uploaded_path is "<repo>:<path in repo>". On failure uploaded_path
        is None and error_msg explains the issue.
    """
    if not HF_RESULTS_REPO:
        return None, "HF_RESULTS_REPO is not set on this Space"
    if not HF_TOKEN:
        return None, "HF_TOKEN is not set on this Space"
    try:
        from huggingface_hub import HfApi, create_repo

        api = HfApi(token=HF_TOKEN)
        # Idempotent: create the dataset if it doesn't exist yet. A failure
        # here is only logged; the upload below reports the real error.
        try:
            create_repo(repo_id=HF_RESULTS_REPO, repo_type="dataset",
                        token=HF_TOKEN, private=True, exist_ok=True)
        except Exception as e:
            print(f"[upload_to_hf] create_repo: {e!r}", flush=True)
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        pid = submission.get("participant_id", uuid.uuid4().hex[:8])
        fname = f"submissions/{ts}_{pid}.json"
        # Upload the encoded JSON straight from memory. Staging it in a
        # second /tmp file left one stray file behind per upload and wrote
        # it with the platform default encoding.
        payload = json.dumps(submission, ensure_ascii=False,
                             indent=2).encode("utf-8")
        api.upload_file(
            path_or_fileobj=payload,
            path_in_repo=fname,
            repo_id=HF_RESULTS_REPO,
            repo_type="dataset",
            commit_message=f"submission {pid} @ {ts}",
        )
        return f"{HF_RESULTS_REPO}:{fname}", None
    except Exception as e:
        msg = f"{type(e).__name__}: {e}"
        print(f"[upload_to_hf] failed: {msg}", flush=True)
        return None, msg
# ──────────────────────────── UI building blocks ─────────────────────────
# Markdown shown on the landing page. The clip count is filled in from
# TOTAL_PER_SESSION. The closing sentence describes what the UI does: after
# the last clip the responses are saved and the thank-you page appears on its
# own; there is no separate Submit button.
INTRO_MD = f"""
# Hiding Sound — Listening Study
Thanks for participating! You will hear **{TOTAL_PER_SESSION} short audio
clips**, about 10–15 minutes in total.
Headphones are **strongly recommended** for the best listening experience.
For each clip, click the option whose description best matches the
**most noticeable** sound you hear.
Pick the sound that is most prominent,
not what you think "should" be in the recording.
Clicking an option records your answer and advances to the next clip
automatically. You can use **◀ Previous** to go back. When you finish
the last clip, your responses are submitted automatically.
Click **Start** below when you're ready.
"""
def render_trial(idx: int, trial: dict) -> str:
    """Return the Markdown header shown above a trial.

    Args:
        idx: zero-based position of the trial; displayed one-based.
        trial: the trial dict (currently not used in the text).

    Returns:
        A "Clip i / TOTAL_PER_SESSION" heading followed by the instruction.
    """
    counter = f"### Clip {idx+1} / {TOTAL_PER_SESSION}\n\n"
    instruction = ("Listen to the audio, then click the option whose "
                   "description best matches the **most noticeable** sound.")
    return counter + instruction
# ──────────────────────────── main app ───────────────────────────────────
# Stylesheet injected into the page: full-width container, full-width audio
# player, wrapped left-aligned option buttons, and tighter spacing on narrow
# screens. Only a garbled dash inside one CSS comment differs from before;
# every rule is unchanged.
CUSTOM_CSS = """
/* ── Container: never exceed the viewport, no centered max-width gutter.
Gradio's default centers Blocks at ~1280 px max with side padding,
which is what causes a cramped column on phones. */
.gradio-container,
gradio-app .gradio-container,
.app .gradio-container {
max-width: 100% !important;
width: 100% !important;
padding: 12px !important;
box-sizing: border-box !important;
}
/* All children obey their parent's width — no horizontal scroll. */
.gradio-container * { box-sizing: border-box; }
.gradio-container img,
.gradio-container audio,
.gradio-container video {
max-width: 100% !important;
height: auto;
}
/* Audio: stretch to row width on every viewport. */
.gradio-container audio,
.gradio-container [data-testid="audio"],
.gradio-container .audio_player,
.gradio-container .audio-player {
width: 100% !important;
min-width: 0 !important;
}
/* Option buttons: full-width, left-aligned text, wraps on small widths. */
.opt-btn,
.opt-btn > button,
.opt-btn button {
width: 100% !important;
text-align: left !important;
justify-content: flex-start !important;
white-space: normal !important;
word-break: break-word;
min-height: 56px;
padding: 14px 18px;
line-height: 1.45;
font-size: 1rem;
box-sizing: border-box !important;
}
.opt-btn span, .opt-btn div {
text-align: left !important;
justify-content: flex-start !important;
white-space: normal !important;
}
/* Mobile-specific tightening. */
@media (max-width: 720px) {
.gradio-container { padding: 8px !important; }
h1, h2, h3 { font-size: 1.15rem !important; }
.opt-btn,
.opt-btn > button,
.opt-btn button {
font-size: 0.95rem !important;
padding: 12px 14px !important;
min-height: 64px;
}
/* Stack any side-by-side rows vertically. */
.gradio-container .row,
.gradio-container .gr-row {
flex-direction: column !important;
}
}
/* iOS Safari: prevent zoom-on-tap from shrinking small input text. */
.gradio-container button,
.gradio-container input,
.gradio-container select,
.gradio-container textarea {
font-size: 16px;
}
"""
# Extra tags for the page <head>: a device-width viewport declaration and an
# Apple web-app meta tag. Supplied as `head=` to `gr.Blocks(...)` when that
# constructor accepts it, otherwise forwarded to `launch(...)`.
CUSTOM_HEAD = """
<meta name="viewport" content="width=device-width, initial-scale=1.0, viewport-fit=cover" />
<meta name="apple-mobile-web-app-capable" content="yes" />
"""
import inspect as _inspect
# Parameter tables of `gr.Blocks(...)` and `gr.Blocks.launch(...)` for the
# installed Gradio. They are consulted to decide which keyword arguments each
# call accepts, so unsupported ones can be dropped or routed to the other call.
_BLOCKS_PARAMS = _inspect.signature(gr.Blocks).parameters
_LAUNCH_PARAMS = _inspect.signature(gr.Blocks.launch).parameters
def _filter_kwargs(params, kwargs):
    """Return the subset of `kwargs` whose names appear in `params`.

    Used to drop keyword arguments the running Gradio version does not
    accept. Accepted entries keep their original order.
    """
    accepted = {}
    for name, value in kwargs.items():
        if name in params:
            accepted[name] = value
    return accepted
def make_app() -> gr.Blocks:
    """Build the listening-study UI and wire up its event handlers.

    The page has three groups, one visible at a time: an intro with a Start
    button; the trial view (audio player, four option buttons, Previous);
    and a final page with a status message and a downloadable JSON file.
    Per-participant data (sampled trials, current index, answers,
    participant id) lives in `gr.State` components.

    Returns:
        The assembled, not-yet-launched `gr.Blocks` app.
    """
    blocks_kwargs = _filter_kwargs(_BLOCKS_PARAMS, dict(
        title="Hiding Sound Listening Study",
        css=CUSTOM_CSS,
        theme=gr.themes.Soft(),
        # fill_width=True removes Gradio's centered max-width gutter so
        # the layout uses the full screen on phones and laptops alike.
        fill_width=True,
        head=CUSTOM_HEAD,
    ))
    with gr.Blocks(**blocks_kwargs) as demo:
        gr.Markdown(INTRO_MD)

        # session state
        trials_state = gr.State(value=[])
        idx_state = gr.State(value=0)
        answers_state = gr.State(value=[])  # list of {"trial_id":int, "letter":str|None}
        pid_state = gr.State(value="")

        # intro page (no participant-id box; we auto-generate)
        with gr.Group(visible=True) as intro_group:
            start_btn = gr.Button("Start", variant="primary")

        # trial page — 4 option buttons (label updated per-trial); clicking
        # an option records the answer and auto-advances.
        with gr.Group(visible=False) as trial_group:
            progress_md = gr.Markdown("")
            # autoplay=True: when the trial advances and a new audio path
            # is pushed to this component, the browser auto-starts
            # playback. The user can pause/seek with the standard audio
            # controls. (Mobile browsers gate autoplay until the page
            # has had a user interaction; clicking Start counts, so this
            # works from trial 1 onward on iOS/Android.)
            audio_player = gr.Audio(label="Audio", autoplay=True, type="filepath")
            opt_btn_a = gr.Button("", variant="secondary", elem_classes="opt-btn")
            opt_btn_b = gr.Button("", variant="secondary", elem_classes="opt-btn")
            opt_btn_c = gr.Button("", variant="secondary", elem_classes="opt-btn")
            opt_btn_d = gr.Button("", variant="secondary", elem_classes="opt-btn")
            prev_btn = gr.Button("◀ Previous")

        # submit page — populated automatically after the last trial
        with gr.Group(visible=False) as submit_group:
            submit_status = gr.Markdown("")
            download_file = gr.File(label="Download your responses (JSON)",
                                    visible=False)

        # ─── helpers ──────────────────────────────────────────────────
        def _trial_ui_updates(i: int, trials: list[dict], answers: list[dict]):
            """Return the six trial-view updates for the trial at index i.

            Order: [progress_md, audio_update, btn_A_label, btn_B_label,
            btn_C_label, btn_D_label]. The option already chosen for this
            trial, if any, is prefixed with a tick.

            The audio path is wrapped in `gr.update(..., autoplay=True)`
            rather than returned bare. Per the original author's note, a
            bare path keeps the same element instance, which on some
            browsers does not re-fire autoplay; the explicit update forces
            a fresh load + play.
            """
            t = trials[i]
            audio_path = resolve_audio_path(t["audio"])
            audio_update = gr.update(value=audio_path, autoplay=True)
            head = render_trial(i, t)
            chosen = answers[i].get("letter")
            btn_labels: list[str] = []
            for opt in t["options"]:
                tag = "✓ " if chosen == opt["letter"] else ""
                btn_labels.append(f"{tag}({opt['letter']}) {opt['description']}")
            # Pad to 4 in case n_options < 4 for some reason.
            while len(btn_labels) < 4:
                btn_labels.append("")
            return [head, audio_update, *btn_labels[:4]]

        # ─── handlers ─────────────────────────────────────────────────
        AUTO_SUBMIT_EVERY = 10  # save + upload after every N answered trials

        def on_start():
            """Begin a session: new participant id, fresh sample, first clip."""
            pid = uuid.uuid4().hex[:8]
            # Let `random.Random` seed itself (seed=None). A wall-clock
            # millisecond seed gives identical samples to participants who
            # press Start in the same millisecond.
            trials = sample_session_trials()
            answers = [{"trial_id": i, "letter": None}
                       for i in range(len(trials))]
            ui = _trial_ui_updates(0, trials, answers)
            return (
                gr.update(visible=False),   # intro
                gr.update(visible=True),    # trial
                gr.update(visible=False),   # submit
                trials, 0, answers, pid,
                *ui,                        # progress, audio, btn A/B/C/D
            )

        def _advance(trials, i, answers, pid, letter):
            """Record the answer for trial i, then move on.

            Before the last trial: checkpoint (save + upload) whenever the
            number of answered trials is a multiple of AUTO_SUBMIT_EVERY,
            then show the next trial. On the last trial: do the final save
            + upload once and switch to the populated submit page.
            """
            if 0 <= i < len(trials):
                answers[i]["letter"] = letter
            is_last = i + 1 >= len(trials)
            n_answered = sum(1 for a in answers if a["letter"] is not None)
            # Checkpoint only before the last trial. The last trial always
            # does its own final save + upload below, so checkpointing there
            # as well would upload the same submission twice back to back.
            if (not is_last and n_answered > 0
                    and n_answered % AUTO_SUBMIT_EVERY == 0):
                local, uploaded, err = _save_and_upload(trials, answers, pid)
                print(f"[auto_submit] @{n_answered} answered → "
                      f"local={local} uploaded={uploaded} err={err}", flush=True)
            if not is_last:
                i += 1
                ui = _trial_ui_updates(i, trials, answers)
                return (
                    gr.update(visible=True),    # trial
                    gr.update(visible=False),   # submit
                    trials, i, answers,
                    gr.update(),                # submit_status — no change
                    gr.update(),                # download_file — no change
                    *ui,
                )
            # Last trial answered → final save/upload and flip to submit page.
            local, uploaded, upload_err = _save_and_upload(trials, answers, pid)
            sub = _build_submission(trials, answers, pid)
            status_lines = ["**Thank you!** Your responses are recorded."]
            if uploaded:
                status_lines.append(f"Auto-uploaded to dataset: `{uploaded}` ✓")
            else:
                status_lines.append(
                    "*(Auto-upload is not active for this Space — "
                    f"reason: `{upload_err}`. Please download the JSON below "
                    "and email it to the study organizers.)*"
                )
            status_lines.append("")
            status_lines.append("Per-condition accuracy on this submission:")
            for c, v in sub["per_condition_accuracy"].items():
                status_lines.append(f"- **{c}**: {v['correct']}/{v['n']} = "
                                    f"{100*v['accuracy']:.1f}%")
            ui = _trial_ui_updates(i, trials, answers)
            return (
                gr.update(visible=False),               # trial
                gr.update(visible=True),                # submit
                trials, i, answers,
                "\n".join(status_lines),                # submit_status
                gr.update(value=local, visible=True),   # download_file
                *ui,
            )

        def make_pick(letter):
            """Return a click handler that records `letter` and advances."""
            def _pick(trials, i, answers, pid):
                return _advance(trials, i, answers, pid, letter)
            return _pick

        def on_prev(trials, i, answers):
            """Step back one trial (staying put on the first) and re-render."""
            if i > 0:
                i -= 1
            ui = _trial_ui_updates(i, trials, answers)
            return (
                gr.update(visible=True),    # trial visible
                gr.update(visible=False),   # submit hidden
                trials, i, answers,
                gr.update(),                # submit_status — no change
                gr.update(),                # download_file — no change
                *ui,
            )

        # ─── wiring ───────────────────────────────────────────────────
        # Shared output list for the option buttons and Previous; handler
        # return tuples must follow exactly this order.
        trial_outputs = [
            trial_group, submit_group,
            trials_state, idx_state, answers_state,
            submit_status, download_file,
            progress_md, audio_player,
            opt_btn_a, opt_btn_b, opt_btn_c, opt_btn_d,
        ]
        start_btn.click(
            on_start,
            inputs=[],
            outputs=[
                intro_group, trial_group, submit_group,
                trials_state, idx_state, answers_state, pid_state,
                progress_md, audio_player,
                opt_btn_a, opt_btn_b, opt_btn_c, opt_btn_d,
            ],
        )
        for btn, letter in [(opt_btn_a, "A"), (opt_btn_b, "B"),
                            (opt_btn_c, "C"), (opt_btn_d, "D")]:
            btn.click(
                make_pick(letter),
                inputs=[trials_state, idx_state, answers_state, pid_state],
                outputs=trial_outputs,
            )
        prev_btn.click(
            on_prev,
            inputs=[trials_state, idx_state, answers_state],
            outputs=trial_outputs,
        )
    return demo
if __name__ == "__main__":
    # Assemble launch() arguments, then drop whatever the installed Gradio
    # does not accept. Per the original author's notes this lets the same
    # file run on Gradio 5 (Space) and Gradio 6 (local conda env).
    launch_kwargs = {
        # Lets Gradio serve the bundled pool and the audio that
        # `hf_hub_download` places in AUDIO_CACHE_DIR.
        "allowed_paths": [str(AUDIO_CACHE_DIR), str(POOL_DIR)],
        # Author's note: show_api=False (Gradio 5.x) skips the /api/info
        # endpoint that triggers gradio_client's JSON-schema parser. The
        # parser is also monkey-patched at the top of this file, so
        # versions that reject this kwarg are fine.
        "show_api": False,
    }

    # Author's note: Gradio 6 moved theme/css/head/fill_width from
    # Blocks(...) to launch(...). Forward each option here only when
    # launch() takes it and Blocks() does not; otherwise make_app() has
    # already attached it to Blocks. Values are built lazily so the theme
    # is only constructed when it is forwarded.
    relocated_options = {
        "css": lambda: CUSTOM_CSS,
        "theme": lambda: gr.themes.Soft(),
        "head": lambda: CUSTOM_HEAD,
        "fill_width": lambda: True,
    }
    for option, build_value in relocated_options.items():
        if option in _LAUNCH_PARAMS and option not in _BLOCKS_PARAMS:
            launch_kwargs[option] = build_value()

    make_app().launch(**_filter_kwargs(_LAUNCH_PARAMS, launch_kwargs))