| import gradio as gr |
| import os |
| import json |
| from datetime import datetime |
| import uuid |
| import re |
| from typing import List, Dict, Tuple, Optional |
|
|
| |
| |
| |
|
|
| |
# Folder that holds one sub-folder per question: questions/question1 ... questionN.
QUESTIONS_ROOT = os.path.join(os.path.dirname(__file__), "questions")

# Highest question number probed by discover_questions(); missing folders are skipped.
NUM_QUESTIONS = 16

# A "g<number>" token at the start of the name or right after '_' / '-'.
G_VALUE_PATTERN = re.compile(r"(?:^|[_-])g([0-9]+(?:\.[0-9]+)?)", re.IGNORECASE)

# A "g0" / "g0.0..." token that is followed by '_' / '-' or ends the string.
# It must be matched against the name WITHOUT its ".wav" extension; otherwise
# the end-of-string alternative can never match a real file name.
G0_PATTERN = re.compile(r"(?:^|[_-])g0(?:\.0+)?(?:[_-]|$)", re.IGNORECASE)

# A "mix<number>" token at the start of the name or right after '_' / '-'.
MIX_VALUE_PATTERN = re.compile(r"(?:^|[_-])mix([0-9]+(?:\.[0-9]+)?)", re.IGNORECASE)

# File extensions accepted for the optional per-question image.
_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png', '.gif')


def _strip_wav_suffix(filename: str) -> str:
    """Return *filename* without a trailing '.wav' (case-insensitive)."""
    return filename[:-4] if filename.lower().endswith(".wav") else filename


def _is_mix_one(stem: str) -> bool:
    """Return True when *stem* carries a 'mix' value numerically equal to 1.0."""
    match = MIX_VALUE_PATTERN.search(stem)
    # The pattern only captures digits with an optional fraction, so float() is safe.
    return match is not None and abs(float(match.group(1)) - 1.0) < 1e-9


def _infer_wrong_label(fname_a: str, fname_b: str) -> Optional[str]:
    """Return 'A' or 'B' for the side flagged as wrong by the file names, else None."""
    stem_a = _strip_wav_suffix(fname_a)
    stem_b = _strip_wav_suffix(fname_b)

    if G_VALUE_PATTERN.search(stem_a) or G_VALUE_PATTERN.search(stem_b):
        # 'g' rule takes precedence as soon as either side carries a g value.
        a_flagged = bool(G0_PATTERN.search(stem_a))
        b_flagged = bool(G0_PATTERN.search(stem_b))
    else:
        # Fallback 'mix' rule, only consulted when neither side has a g value.
        a_flagged = _is_mix_one(stem_a)
        b_flagged = _is_mix_one(stem_b)

    # Only an unambiguous result counts: exactly one side flagged.
    if a_flagged and not b_flagged:
        return "A"
    if b_flagged and not a_flagged:
        return "B"
    return None


def _first_path(folder: str, names: List[str]) -> Optional[str]:
    """Return the path of the first entry of *names* inside *folder*, or None if empty."""
    return os.path.join(folder, names[0]) if names else None


def discover_questions() -> List[Dict]:
    """
    Scan QUESTIONS_ROOT/question1..questionN and build a question list.

    Rules:
    - Noise reference: any .wav not starting with '1' or '2' (optional).
    - Audio A: .wav starting with '1'
    - Audio B: .wav starting with '2'
    - Image (optional): first *.jpg/*.jpeg/*.png/*.gif in the folder
    - When several files qualify for a role, the alphabetically first one is
      used, so the result does not depend on directory listing order.
    - Correctness heuristic (evaluated on the file name without '.wav'):
      * Prefer 'g' rule: if exactly one side has g==0, that side is WRONG.
      * Fallback 'mix' rule when no 'g' param on either: if exactly one side
        has mix==1.0, that side is WRONG.
      We store which side is WRONG in field 't099_is' for backward compatibility.

    Returns:
        One dict per usable question with the keys 'id', 'index', 'noise',
        'A', 'B', 'image', 'correct' and 't099_is'. Folders that are missing
        or lack an A or B file are skipped. 'correct' / 't099_is' are None
        when the heuristic cannot decide.
    """
    questions = []
    print(f"[disc] Scanning: {QUESTIONS_ROOT}")

    for i in range(1, NUM_QUESTIONS + 1):
        qdir = os.path.join(QUESTIONS_ROOT, f"question{i}")
        if not os.path.isdir(qdir):
            print(f"[disc] Skip missing dir: {qdir}")
            continue

        # List once and sort so every "first candidate" below is deterministic.
        entries = sorted(os.listdir(qdir))
        wav_files = [f for f in entries if f.lower().endswith(".wav")]
        noise_candidates = [f for f in wav_files if not f.startswith(("1", "2"))]
        one_candidates = [f for f in wav_files if f.startswith("1")]
        two_candidates = [f for f in wav_files if f.startswith("2")]
        image_candidates = [f for f in entries if f.lower().endswith(_IMAGE_EXTENSIONS)]

        noise_path = _first_path(qdir, noise_candidates)
        a_path = _first_path(qdir, one_candidates)
        b_path = _first_path(qdir, two_candidates)
        image_path = _first_path(qdir, image_candidates)

        if not (a_path and b_path):
            print(f"[disc] Missing A/B in {qdir}: A={a_path}, B={b_path}")
            continue

        # Diagnostic only: a listed entry can still fail to resolve
        # (e.g. a dangling symlink); the question is kept regardless.
        for p in [a_path, b_path, noise_path, image_path]:
            if p and not os.path.exists(p):
                print(f"[disc] File not found (non-fatal): {p}")

        wrong_label = _infer_wrong_label(
            os.path.basename(a_path), os.path.basename(b_path)
        )
        if wrong_label == "A":
            correct_label = "B"
        elif wrong_label == "B":
            correct_label = "A"
        else:
            correct_label = None

        questions.append({
            "id": f"question{i}",
            "index": i,
            "noise": noise_path,
            "A": a_path,
            "B": b_path,
            "image": image_path,
            "correct": correct_label,
            # Side flagged as WRONG; key name kept for backward compatibility.
            "t099_is": wrong_label,
        })

    print(f"[disc] Found {len(questions)} valid questions.")
    return questions
|
|
|
|
| |
| |
| |
def upload_to_results_dataset(local_path: str, dest_dir: str = "submissions") -> str:
    """
    Push one local file into the results dataset repository on the Hub.

    The target repo id is read from the RESULTS_REPO environment variable
    (default 'qiuyiding/sound-survey-results') and the access token from
    HF_TOKEN. The dataset repo is created if it does not exist yet, and the
    file is stored as '<dest_dir>/<basename of local_path>'.
    Per the original author's note, writing to a separate dataset repo is
    meant to avoid the rebuild/restart that a commit to the Space repo causes.

    Args:
        local_path: Path of the file to upload.
        dest_dir: Folder inside the dataset repo that receives the file.

    Returns:
        '<repo_id>:<path_in_repo>' describing where the file was stored.

    Raises:
        RuntimeError: HF_TOKEN is not set (or empty).
        ValueError: RESULTS_REPO does not contain a '/'.
    """
    from huggingface_hub import upload_file, create_repo

    dataset_id = os.environ.get("RESULTS_REPO", "qiuyiding/sound-survey-results")
    token = os.environ.get("HF_TOKEN")

    # Validate configuration before any network call is made.
    if not token:
        raise RuntimeError("Missing HF_TOKEN. Set it in Settings → Repository secrets.")
    if "/" not in dataset_id:
        raise ValueError(f"RESULTS_REPO looks invalid: {dataset_id!r}. Expected 'owner/dataset-name'.")

    # Arguments shared by both Hub calls.
    hub_target = {"repo_type": "dataset", "token": token}

    # exist_ok=True makes this a no-op when the repo is already there.
    create_repo(dataset_id, exist_ok=True, **hub_target)

    file_name = os.path.basename(local_path)
    destination = f"{dest_dir}/{file_name}"
    upload_file(
        path_or_fileobj=local_path,
        path_in_repo=destination,
        repo_id=dataset_id,
        commit_message=f"Add survey result {file_name}",
        **hub_target,
    )
    return f"{dataset_id}:{destination}"
|
|
| |
| |
| |
|
|
def finish_and_export_json(questions: List[Dict], responses: List[Dict]) -> Tuple[str, Optional[str]]:
    """
    Write the survey outcome to a JSON file, try to upload it, and summarize it.

    The payload holds a 'meta' section (timestamp and counters) and the
    responses sorted by their 'index'. The file is written to the first
    writable location among /tmp, this module's folder and the current
    working directory, with a named temporary file as the last resort.
    An upload to the results dataset is then attempted; a failure there is
    reported in the summary instead of being raised.

    Args:
        questions: All discovered questions (only their count is used).
        responses: One dict per answered question; each needs 'index',
            'question_id' and 'choice', and may carry 'is_correct' / 't099_is'.

    Returns:
        (summary_text, local_file_path_for_download).
    """
    n_questions = len(questions)
    n_answered = len(responses)
    outcomes = [resp.get("is_correct") for resp in responses]
    n_right = sum(1 for outcome in outcomes if outcome is True)
    n_bad = sum(1 for outcome in outcomes if outcome is False)
    # Anything that is neither True nor False had no usable heuristic.
    n_unknown = n_answered - n_right - n_bad

    meta = {
        "timestamp": datetime.now().isoformat(timespec="seconds"),
        "total_questions": n_questions,
        "answered": n_answered,
        "correct": n_right,
        "wrong": n_bad,
        "undetermined": n_unknown,
    }
    ordered = sorted(responses, key=lambda resp: resp["index"])
    payload = {"meta": meta, "results": ordered}

    # Timestamp plus a short random suffix keeps concurrent submissions apart.
    stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
    token = uuid.uuid4().hex[:8]
    out_name = f"survey_results_{stamp}_{token}.json"
    candidates = (
        os.path.join("/tmp", out_name),
        os.path.join(os.path.dirname(__file__), out_name),
        os.path.join(os.getcwd(), out_name),
    )

    local_path = None
    for candidate in candidates:
        try:
            with open(candidate, "w", encoding="utf-8") as handle:
                json.dump(payload, handle, ensure_ascii=False, indent=2)
            local_path = candidate
            print(f"[export] Saved JSON: {candidate}")
            break
        except Exception as exc:
            print(f"[export] Save failed at {candidate}: {exc}")

    if local_path is None:
        # Last resort: let the tempfile module pick a location.
        import tempfile
        temp_handle = tempfile.NamedTemporaryFile(delete=False, suffix=".json", mode="w", encoding="utf-8")
        json.dump(payload, temp_handle, ensure_ascii=False, indent=2)
        temp_handle.close()
        local_path = temp_handle.name
        print(f"[export] Saved JSON to temp: {local_path}")

    # The upload is best-effort; the local file stays available either way.
    hub_loc = None
    hub_err = None
    try:
        hub_loc = upload_to_results_dataset(local_path, dest_dir="submissions")
        print(f"[export] Uploaded to dataset: {hub_loc}")
    except Exception as exc:
        hub_err = str(exc)
        print(f"[export] Upload to dataset failed: {hub_err}")

    if hub_loc:
        upload_line = f"Uploaded to results dataset as: {hub_loc}"
    else:
        upload_line = f"Upload to results dataset failed: {hub_err or 'see Logs'}"

    def verdict(resp: Dict) -> str:
        # Identity checks so that only real booleans count as decided.
        flag = resp.get("is_correct")
        if flag is True:
            return "Correct"
        if flag is False:
            return "Wrong"
        return "Undetermined"

    report = [
        f"Total: {n_questions} questions, Answered: {n_answered}",
        f"Correct: {n_right}, Wrong: {n_bad}, Undetermined: {n_unknown}",
        f"Saved locally (for download): {local_path}",
        upload_line,
        "\nPer-question results:",
    ]
    report.extend(
        f"- {resp['question_id']}: Selected {resp['choice']}, Result: {verdict(resp)} (wrong-side heuristic: {resp.get('t099_is')})"
        for resp in ordered
    )
    return "\n".join(report), local_path
|
|
|
|
| |
| |
| |
|
|
def create_survey_interface():
    """
    Build the Gradio Blocks app for the A/B listening survey.

    One accordion is created per discovered question, holding the optional
    image / noise reference, the two audio players and an A/B radio. A
    "Submit All" button collects every radio value, exports the answers via
    finish_and_export_json() and shows the summary plus a downloadable file;
    "Reset All" clears every radio.

    Returns:
        The gr.Blocks instance with its queue enabled.
    """
    questions = discover_questions()
    question_count = len(questions)

    def show_reference_image(path):
        # Small fixed-size picture shown next to / instead of the noise reference.
        gr.Image(
            value=path, label="",
            height=200, width=200, show_download_button=False
        )

    def show_noise_reference(path):
        gr.Audio(value=path, label="Noise Reference", interactive=False)

    def build_response(q, picked):
        # 't099_is' names the side the heuristic flagged as wrong (None if unknown).
        wrong_side = q.get("t099_is")
        picked_wrong = (wrong_side == picked) if wrong_side else None
        return {
            "timestamp": datetime.now().isoformat(timespec="seconds"),
            "question_id": q["id"],
            "index": q["index"],
            "choice": picked,
            "is_correct": None if picked_wrong is None else (not picked_wrong),
            "correct_label": q.get("correct"),
            "t099_is": wrong_side,
            "noise": q["noise"],
            "A": q["A"],
            "B": q["B"],
            "chosen_path": q.get(picked),
            "chosen_has_g0_or_mix1": bool(wrong_side == picked),
        }

    with gr.Blocks(title="Sound Generation Survey") as demo:
        # Introductory text and instructions.
        gr.Markdown(
            f"""
# Sound Generation Survey

Below are {question_count} pairs of audios processed with different noise reduction methods.
Please listen carefully and select **which audio sounds cleaner and contains less of the original noise**.

It may take some time to load all the audios.
If any loading error occurs, please refresh the webpage and try again.
We truly appreciate your time and patience in participating in this study!

---

## Instructions
- Each question shows a **Noise Reference** (if available) and two anonymized audios: **Audio A** and **Audio B**.
- **Task:** Select which audio has **less** of the original noise.
- **Tip:** First play the Noise Reference to memorize noise characteristics, then compare A and B.
- If the two audios sound the same, please choose the one that sounds more pleasant and has less noise.
"""
        )

        radios = []

        # One accordion per question, in discovery order.
        for position, q in enumerate(questions):
            noise_ref = q["noise"]
            picture = q["image"]
            with gr.Accordion(label=f"Question {position+1}: {q['id']}", open=True):
                # Reference row: image and/or noise reference, whichever exists.
                with gr.Row():
                    if noise_ref and picture:
                        with gr.Column(scale=1):
                            show_reference_image(picture)
                        with gr.Column(scale=2):
                            show_noise_reference(noise_ref)
                    elif noise_ref:
                        show_noise_reference(noise_ref)
                    elif picture:
                        show_reference_image(picture)
                    else:
                        gr.Markdown("*No noise reference or image available*")

                # The two candidates, side by side.
                with gr.Row():
                    with gr.Column():
                        gr.Audio(value=q["A"], label="Audio A", interactive=False)
                    with gr.Column():
                        gr.Audio(value=q["B"], label="Audio B", interactive=False)

                # The participant's answer for this question.
                radios.append(
                    gr.Radio(["A", "B"], label="Select which audio has LESS noise", value=None)
                )

        # Global controls and result widgets.
        with gr.Row():
            submit_btn = gr.Button("Submit All", variant="primary")
            reset_btn = gr.Button("Reset All")

        summary = gr.Textbox(label="Results (summary)", interactive=False, lines=12)
        download = gr.File(label="Download JSON", interactive=False)

        def submit_all(*choices):
            # Receives one value per radio, in question order.
            try:
                # Pad with None so questions without a matching value are skipped.
                padded = list(choices) + [None] * (len(questions) - len(choices))
                responses = [
                    build_response(q, picked)
                    for q, picked in zip(questions, padded)
                    if picked in ("A", "B")
                ]

                if not responses:
                    return "Please make at least one selection before submitting.", None

                summary_text, path = finish_and_export_json(questions, responses)
                return summary_text, path
            except Exception as e:
                import traceback
                traceback.print_exc()
                return f"Error occurred: {str(e)}", None

        def reset_all():
            # One None per radio clears every selection.
            return [None for _ in radios]

        submit_btn.click(fn=submit_all, inputs=radios, outputs=[summary, download])
        reset_btn.click(fn=reset_all, inputs=None, outputs=radios)

    demo.queue()
    return demo
|
|
|
|
| |
# Build the interface at import time so that a module-level `demo` object exists.
demo = create_survey_interface()


# Start the Gradio server only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|