| import json |
| import os |
| import random |
| import threading |
| import time |
| import uuid |
| from pathlib import Path |
|
|
| import gradio as gr |
| from huggingface_hub import HfApi |
|
|
| from config import CLIP_KEYS, MODEL_NAMES, RATING_CATEGORIES, SAMPLES |
|
|
# Local directory that receives one JSON file per submitted session; created
# at import time so later writes never have to check for it.
RESULTS_DIR = Path("results")
RESULTS_DIR.mkdir(exist_ok=True)

# Bucket that results are mirrored to. Uploading is only enabled when the
# SPACE_ID environment variable is set; otherwise HF_BUCKET is None.
if os.environ.get("SPACE_ID"):
    HF_BUCKET = os.environ.get("HF_BUCKET", "Cactooz/listening-data")
else:
    HF_BUCKET = None

# Number of pages in the test (one page per configured sample).
NUM_SAMPLES = len(SAMPLES)

# Rating category keys and their display labels, kept in matching order.
CAT_KEYS = list(RATING_CATEGORIES)
CAT_LABELS = list(RATING_CATEGORIES.values())
|
|
|
|
def create_session():
    """Create the state dict for a brand-new listening session.

    The session id is the first 8 characters of a random UUID4. It also seeds
    the random generator, so the shuffled page order and per-sample clip
    orders are fully determined by the id.

    Returns:
        dict with keys ``session_id``, ``sample_order`` (shuffled indices into
        SAMPLES), ``clip_orders`` (sample id -> shuffled copy of CLIP_KEYS),
        ``current_page`` (starts at 0) and ``ratings`` (starts empty).
    """
    session_id = uuid.uuid4().hex[:8]
    rng = random.Random(session_id)

    def _shuffled(items):
        # Copy first so the shared configuration sequences are never mutated.
        copy = list(items)
        rng.shuffle(copy)
        return copy

    # Page order is drawn first, then one clip order per sample in SAMPLES
    # order; keeping this sequence keeps the orders reproducible from the id.
    sample_order = _shuffled(range(NUM_SAMPLES))
    clip_orders = {sample["id"]: _shuffled(CLIP_KEYS) for sample in SAMPLES}

    return {
        "session_id": session_id,
        "sample_order": sample_order,
        "clip_orders": clip_orders,
        "current_page": 0,
        "ratings": {},
    }
|
|
|
|
def get_audio_path(sample, clip_key):
    """Return the audio path stored on *sample* for the clip *clip_key*.

    Paths are stored under ``"<clip_key>_audio"``. The ``"target"`` clip
    follows the same pattern (``"target_audio"``), so no special case is
    needed.

    Args:
        sample: Mapping describing one sample.
        clip_key: Clip identifier, e.g. ``"target"``.

    Returns:
        The value stored under ``f"{clip_key}_audio"``.

    Raises:
        KeyError: If the sample has no entry for that clip.
    """
    return sample[f"{clip_key}_audio"]
|
|
|
|
def build_page(state):
    """Compute the component values for the page the session is currently on.

    Args:
        state: Session dict as produced by ``create_session``.

    Returns:
        Tuple of: progress heading, instruction HTML, original-audio path (or
        None if the file is missing), one path-or-None per clip in this
        session's clip order, one slider value per (clip, category) pair
        (0 when nothing was saved), and visibility updates for the Next and
        Submit buttons.
    """
    page_idx = state["current_page"]
    sample = SAMPLES[state["sample_order"][page_idx]]
    clip_order = state["clip_orders"][sample["id"]]

    def _if_exists(path):
        # Missing files become None instead of a path that does not exist.
        return path if os.path.exists(path) else None

    clip_paths = [_if_exists(get_audio_path(sample, key)) for key in clip_order]

    # Restore any scores previously saved for this sample; 0 means "unrated".
    previous = state["ratings"].get(sample["id"], {})
    slider_values = []
    for key in clip_order:
        clip_scores = previous.get(key, {})
        for cat in CAT_KEYS:
            score = clip_scores.get(cat)
            slider_values.append(0 if score is None else score)

    last_page = NUM_SAMPLES - 1
    return (
        f"### Sample {page_idx + 1} of {NUM_SAMPLES}",
        f"<h1 style='text-align: center; padding-block: 15px;'>Instruction: {sample['instruction']}</h1>",
        _if_exists(sample["input_audio"]),
        *clip_paths,
        *slider_values,
        # Next is shown on every page but the last; Submit only on the last.
        gr.update(visible=(page_idx < last_page)),
        gr.update(visible=(page_idx == last_page)),
    )
|
|
|
|
def save_ratings_for_page(state, *radio_values):
    """Store the current page's slider values in the session state.

    ``radio_values`` is flat: for each clip in this session's clip order, one
    value per category in CAT_KEYS order. Values that are None or not greater
    than 0 are treated as unrated and left out.

    Args:
        state: Session dict; modified in place.
        *radio_values: Slider values for the current page.

    Returns:
        The same ``state`` object, with ``state["ratings"][sample_id]``
        replaced by the ratings collected from this page.
    """
    sample = SAMPLES[state["sample_order"][state["current_page"]]]
    clip_order = state["clip_orders"][sample["id"]]

    per_clip = len(CAT_KEYS)
    collected = {}
    for clip_pos, clip_key in enumerate(clip_order):
        offset = clip_pos * per_clip
        scores = {}
        for cat_pos, cat in enumerate(CAT_KEYS):
            raw = radio_values[offset + cat_pos]
            if raw is None or not raw > 0:
                continue
            scores[cat] = int(raw)
        collected[clip_key] = scores

    state["ratings"][sample["id"]] = collected
    return state
|
|
|
|
def check_page_complete(slider_values):
    """Return True when every slider value is set and at least 1.

    A value of None or anything below 1 counts as unrated. An empty
    sequence is considered complete.
    """
    return not any(val is None or val < 1 for val in slider_values)
|
|
|
|
def no_change(state):
    """Return the state plus one no-op update per page output.

    The count covers 3 header outputs, one audio player per clip, one slider
    per (clip, category) pair, and the 2 navigation buttons.
    """
    clip_count = len(CLIP_KEYS)
    slider_count = clip_count * len(CAT_KEYS)
    placeholder_count = 3 + clip_count + slider_count + 2
    return (state,) + tuple([gr.update()] * placeholder_count)
|
|
|
|
def go_next(state, *radio_values):
    """Save the current page and advance to the next one.

    If any slider on the page is unrated, a warning is shown and nothing
    changes. The page index never goes past the last sample.

    Returns:
        ``(state, *page_outputs)`` for the new page, or the no-op tuple from
        ``no_change`` when the page is incomplete.
    """
    if check_page_complete(radio_values):
        state = save_ratings_for_page(state, *radio_values)
        following_page = state["current_page"] + 1
        state["current_page"] = min(following_page, NUM_SAMPLES - 1)
        return (state, *build_page(state))

    gr.Warning("Please rate all clips before continuing.")
    return no_change(state)
|
|
|
|
def go_prev(state, *radio_values):
    """Save the current page (without a completeness check) and step back.

    The page index never goes below 0.

    Returns:
        ``(state, *page_outputs)`` for the page now being shown.
    """
    updated = save_ratings_for_page(state, *radio_values)
    preceding_page = updated["current_page"] - 1
    updated["current_page"] = max(preceding_page, 0)
    return (updated, *build_page(updated))
|
|
|
|
def submit_results(state, *radio_values):
    """Save the final page, write the session results to disk, and finish.

    If the current page is incomplete, a warning is shown and nothing is
    written. Otherwise the results are written as JSON under RESULTS_DIR and,
    when HF_BUCKET is set, uploaded to the bucket from a background thread.
    Upload failures are printed and do not affect the response.

    Args:
        state: Session dict.
        *radio_values: Slider values of the page being submitted.

    Returns:
        ``(state, test_group_update, thanks_message_update)``.
    """
    if not check_page_complete(radio_values):
        gr.Warning("Please rate all clips on this page before submitting.")
        return (state, gr.update(), gr.update())

    state = save_ratings_for_page(state, *radio_values)
    session_id = state["session_id"]

    result = {
        "session_id": session_id,
        "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"),
        "profile": state.get("profile", {}),
        "samples": {},
    }

    # Results are keyed by model, not by the shuffled presentation order, and
    # every configured sample/model appears even when it has no ratings.
    all_ratings = state["ratings"]
    for sample in SAMPLES:
        sid = sample["id"]
        sample_ratings = all_ratings.get(sid, {})
        per_model = {}
        for model_key in CLIP_KEYS:
            entry = {"model_name": MODEL_NAMES[model_key]}
            entry.update(sample_ratings.get(model_key, {}))
            per_model[model_key] = entry
        result["samples"][sid] = {
            "instruction": sample["instruction"],
            "ratings": per_model,
        }

    filename = f"{session_id}_{int(time.time())}.json"
    out_path = RESULTS_DIR / filename
    with open(out_path, "w") as handle:
        json.dump(result, handle, indent=2)

    if HF_BUCKET:
        def _upload():
            # Best effort: a failed upload must not break the submission,
            # the local file has already been written.
            try:
                HfApi().batch_bucket_files(
                    bucket_id=HF_BUCKET,
                    add=[(str(out_path), filename)],
                )
            except Exception as e:
                print(f"Failed to upload to bucket: {e}")

        uploader = threading.Thread(target=_upload, daemon=True)
        uploader.start()

    hide_test = gr.update(visible=False)
    show_thanks = gr.update(
        value=f"# Thank you for your contribution!\n\nYour responses have been saved successfully.\n\n<p style='color: gray; font-size: 0.8em;'>Session ID: {session_id} (Save this ID if you want your response removed)</p>",
        visible=True,
    )
    return (state, hide_test, show_thanks)
|
|
|
|
with gr.Blocks(title="Music Editing Listening Test", theme=gr.themes.Default()) as demo:
    # Study introduction. The sample and clip counts are taken from the
    # configuration so the text always matches the pages and players the UI
    # actually builds.
    gr.Markdown(
        f"""
        # Music Editing Listening Test

        Welcome, and thank you for participating in this listening study conducted as part of a Master's thesis at KTH Royal Institute of Technology, in collaboration with Epidemic Sound.

        **Expected time needed**: 15-20 minutes

        ## What you will do
        In each trial, you will hear a 20-second original audio clip, with an editing instruction to add or remove instruments displayed right below it.
        Below these, you will hear {len(CLIP_KEYS)} different randomly ordered edited audio clips that try to follow that instruction. Ideally, an edit should only change what is requested in the instruction, while preserving everything else from the original audio clip.

        The editing instructions can only be for adding or removing instruments and may sometimes include a specific genre. These instructions will use various terms, such as:
        - ADD: add, include, insert, plus, layer, etc.
        - REMOVE: remove, delete, mute, minus, omit, etc.

        Important information:
        - There are no right or wrong answers, trust your own perception.
        - Please complete all {NUM_SAMPLES} samples in one go. You cannot save progress or submit halfway through.
        - Ensure you are satisfied with all your ratings before moving to the next sample. You cannot return to previous pages.
        - Sometimes the music might take a little while to load for each page, please be patient.
        - All responses are anonymous and used solely for this Master's thesis research.
        """
    )

    with gr.Row():
        with gr.Column(scale=1):
            gr.Image("audio-instruction-edits.png", show_label=False)
        # Empty sibling column of equal scale next to the image.
        gr.Column(scale=1)

    # Explanation of the rating scale and the three criteria.
    gr.Markdown(
        """
        ## Rating
        Rate each edited clip on three criteria using a 1-5 scale:
        | Score | Meaning |
        |-------|---------|
        | 5 | Excellent |
        | 4 | Good |
        | 3 | Fair |
        | 2 | Poor |
        | 1 | Bad |

        ### Audio Quality
        Perceptual quality of the edited audio.
        Ask yourself: Does the audio sound clean, or are there digital artifacts, robotic glitches, distortion, hiss, sudden dropouts, or abrupt cuts?
        - A high score means it sounds clean, like a professional track.
        - A low score means it sounds corrupted, glitchy, noisy, or low-quality.

        ### Relevance
        How well the edited audio matches the given instruction.
        Ask yourself: Did the edit successfully perform the exact action described, regardless of what happened to the rest of the track? (e.g. If the instruction was "add saxophone" is there now a saxophone?)
        - A high score means:
            - For removal that only the requested instruments were completely removed.
            - For addition that the requested instruments were successfully added, matching the mood, tempo, and rhythm of the original track.
        - A low score means that the instruction was ignored entirely (nothing was changed) or the wrong action was taken (e.g. a guitar was removed instead of a piano, or a synth was added instead of drums).

        ### Faithfulness
        How well unedited parts of the original audio are preserved.
        Ask yourself: Aside from the requested change, does the rest of the music remain identical to the original?
        - A high score means the background tracks, mixing, and rhythm are perfectly preserved.
        - A low score means unrelated instruments were altered, the overall musical structure changed, or the track was completely remixed.
        """
    )

    # Passing the factory (not a dict) lets Gradio build the initial state by
    # calling create_session.
    state = gr.State(create_session)

    # --- Step 1: participant profile questionnaire -------------------------
    intro_group = gr.Group(visible=True)
    with intro_group:
        gr.Markdown("### Before we begin, tell us a bit about yourself")
        expertise_input = gr.Radio(
            choices=[
                "Professional",
                "Musician",
                "Audio/music researcher",
                "Casual listener",
            ],
            label="Listening expertise",
        )
        setup_input = gr.Radio(
            choices=[
                "Studio monitors (speakers)",
                "Over-ear headphones",
                "In-ear headphones",
                "Laptop/phone speakers",
            ],
            label="Listening setup",
        )
        environment_input = gr.Radio(
            choices=[
                "Quiet room",
                "Moderate background noise",
                "Noisy environment",
            ],
            label="Listening environment",
        )
        start_btn = gr.Button("Start Listening Test", variant="primary")

    # --- Step 2: the rating pages (hidden until the profile is filled in) --
    test_group = gr.Group(visible=False)
    with test_group:
        # Placeholder heading; replaced by build_page when the page loads.
        progress_label = gr.Markdown(f"### Sample 1 of {NUM_SAMPLES}")

        input_audio = gr.Audio(label="Original Audio", type="filepath", interactive=False)

        instruction_label = gr.Markdown("<h1 style='text-align: center; padding-block: 15px;'>...</h1>")

        clip_audios = []
        sliders = []

        # One column per clip: a player followed by one slider per rating
        # category. `sliders` is filled clip by clip, category by category,
        # which is the flat order build_page and save_ratings_for_page use.
        with gr.Row(equal_height=False):
            for clip_number in range(1, len(CLIP_KEYS) + 1):
                with gr.Column():
                    clip_audio = gr.Audio(
                        label=f"Edited Audio {clip_number}",
                        type="filepath",
                        interactive=False,
                    )
                    clip_audios.append(clip_audio)
                    for cat_label in CAT_LABELS:
                        # 0 is the "not rated yet" position; valid scores are 1-5.
                        slider = gr.Slider(
                            minimum=0,
                            maximum=5,
                            step=1,
                            value=0,
                            label=cat_label,
                            info="1=Bad 2=Poor 3=Fair 4=Good 5=Excellent",
                        )
                        sliders.append(slider)

        with gr.Row():
            next_btn = gr.Button("Next", variant="primary")
            submit_btn = gr.Button("Submit", variant="primary", visible=False)

    # --- Step 3: confirmation shown after a successful submit --------------
    thanks_msg = gr.Markdown(visible=False)

    def start_test(state, expertise, setup, environment):
        """Record the participant profile and switch to the rating pages.

        Shows a warning and leaves the UI unchanged if any of the three
        questions is unanswered.

        Returns:
            ``(state, intro_group_update, test_group_update)``.
        """
        if not expertise or not setup or not environment:
            gr.Warning("Please answer all questions before starting.")
            return (state, gr.update(), gr.update())
        state["profile"] = {
            "expertise": expertise,
            "setup": setup,
            "environment": environment,
        }
        return (
            state,
            gr.update(visible=False),
            gr.update(visible=True),
        )

    start_btn.click(
        fn=start_test,
        inputs=[state, expertise_input, setup_input, environment_input],
        outputs=[state, intro_group, test_group],
    )

    # Output order must match the tuple returned by build_page (prefixed with
    # the state): heading, instruction, original audio, clips, sliders, buttons.
    all_outputs = (
        [state, progress_label, instruction_label, input_audio]
        + clip_audios
        + sliders
        + [next_btn, submit_btn]
    )
    slider_inputs = sliders

    next_btn.click(
        fn=go_next,
        inputs=[state] + slider_inputs,
        outputs=all_outputs,
    )

    submit_btn.click(
        fn=submit_results,
        inputs=[state] + slider_inputs,
        outputs=[state, test_group, thanks_msg],
    )

    def init_page(s):
        """Populate the first page when the app loads.

        If the incoming state is still a callable rather than a session
        dict, a new session is created before the page is built.
        """
        if callable(s):
            s = create_session()
        return (s, *build_page(s))

    demo.load(
        fn=init_page,
        inputs=[state],
        outputs=all_outputs,
    )


# Start the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()
|
|