| import gradio as gr |
| import os |
| import random |
| import pandas as pd |
| from datetime import datetime |
| import numpy as np |
| import uuid |
| import soundfile as sf |
| import librosa |
| import noisereduce as nr |
| import tempfile |
| import atexit |
| import shutil |
| import requests |
| from urllib.parse import quote |
| from dotenv import load_dotenv |
| from supabase import create_client, Client |
|
|
| |
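| # Load environment variables from a local .env file. Illustrative contents |
| # (placeholder values only, not real credentials): |
| #   SUPABASE_URL=https://<your-project>.supabase.co |
| #   SUPABASE_KEY=<anon-or-service-role-key> |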
| load_dotenv() |
|
|
| |
| SUPABASE_URL = os.getenv('SUPABASE_URL') |
| SUPABASE_KEY = os.getenv('SUPABASE_KEY') |
|
|
| if SUPABASE_URL and SUPABASE_KEY: |
| supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) |
| print("Supabase client initialized successfully") |
| else: |
| supabase = None |
| print("Warning: Supabase credentials not found. Results will not be saved to database.") |
|
|
| |
| |
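| # Paths for local data, the results CSV and temporary files (relative to this script) |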
| WORKSPACE_ROOT = os.path.dirname(os.path.abspath(__file__)) |
| ORIGINAL_DATA_DIR = os.path.join(WORKSPACE_ROOT, "torgo_original") |
| SYNTHETIC_DATA_DIR = os.path.join(WORKSPACE_ROOT, "torgo-synthetic") |
| RESULTS_FILE = os.path.join(WORKSPACE_ROOT, "experiment_Results.csv") |
| TEMP_DIR = os.path.join(tempfile.gettempdir(), "speech_evaluation") |
|
|
| |
| |
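| # GitHub raw-content URL used to stream audio files that are not available locally |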
| GITHUB_USERNAME = "kesbeast23" |
| GITHUB_REPO = "dysathric-audio" |
| GITHUB_BRANCH = "main" |
| GITHUB_AUDIO_BASE_URL = f"https://raw.githubusercontent.com/{GITHUB_USERNAME}/{GITHUB_REPO}/{GITHUB_BRANCH}" |
|
|
| |
| os.makedirs(TEMP_DIR, exist_ok=True) |
| os.makedirs(os.path.join(ORIGINAL_DATA_DIR, "data"), exist_ok=True) |
| os.makedirs(os.path.join(SYNTHETIC_DATA_DIR, "data"), exist_ok=True) |
|
|
| |
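| # Temporary wav files created during a session; removed by cleanup_temp_files() at exit |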
| temp_files = [] |
|
|
| |
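| # When no local audio data is found, the app runs in demo mode: audio is |
| # streamed from GitHub where possible, with silent placeholders as a fallback. |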
| DEMO_MODE = True |
|
|
| |
| def check_demo_mode(): |
| original_data_path = os.path.join(ORIGINAL_DATA_DIR, "data") |
| synthetic_data_path = os.path.join(SYNTHETIC_DATA_DIR, "data") |
| |
| |
| if (os.path.exists(original_data_path) and len(os.listdir(original_data_path)) > 0 and |
| os.path.exists(synthetic_data_path) and len(os.listdir(synthetic_data_path)) > 0): |
| return False |
| return True |
|
|
| |
| DEMO_MODE = check_demo_mode() |
| if DEMO_MODE: |
| print("Running in DEMO MODE - No audio files found") |
|
|
| |
| def cleanup_temp_files(): |
| """Remove temporary files and directory on exit""" |
| for temp_file in temp_files: |
| try: |
| if os.path.exists(temp_file): |
| os.remove(temp_file) |
| except Exception as e: |
| print(f"Error removing temp file {temp_file}: {e}") |
| |
| try: |
| if os.path.exists(TEMP_DIR): |
| shutil.rmtree(TEMP_DIR) |
| except Exception as e: |
| print(f"Error removing temp directory {TEMP_DIR}: {e}") |
|
|
| atexit.register(cleanup_temp_files) |
|
|
| |
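| # Display/storage label mapping: samples stored as "Original" are shown to |
| # participants as "Natural" (see the conversion helpers further down) |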
| SAMPLE_TYPE_MAPPING = { |
| "Original": "Natural", |
| "Natural": "Original" |
| } |
|
|
| |
| # Schema for the local CSV mirror; includes 'prolific_id' so it matches the |
| # row dicts written by save_consent() and save_rating() below. |
| COLUMNS = [ |
| 'timestamp', 'participant_id', 'prolific_id', 'sample_id', 'sample_type', |
| 'evaluation_type', 'naturalness_rating', 'intelligibility_rating', |
| 'comments', 'transcription', 'original_speaker', 'synthetic_speaker', |
| 'consent_given' |
| ] |
|
|
| |
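| # Load the local CSV mirror of results, recreating it if it is missing, empty, |
| # or its columns no longer match the expected schema |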
| try: |
| results_df = pd.read_csv(RESULTS_FILE) |
| |
| if list(results_df.columns) != COLUMNS: |
| results_df = pd.DataFrame(columns=COLUMNS) |
| results_df.to_csv(RESULTS_FILE, index=False) |
| except (pd.errors.EmptyDataError, FileNotFoundError): |
| |
| results_df = pd.DataFrame(columns=COLUMNS) |
| results_df.to_csv(RESULTS_FILE, index=False) |
|
|
| |
| |
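| # Speaker inventory: four dysarthric and four healthy speakers (corpus noted in each label) |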
| SPEAKERS = { |
| 'F04': {'label': 'F04 (TORGO Dys F)', 'type': 'dysarthric', 'gender': 'F'}, |
| 'M02': {'label': 'M02 (TORGO Dys M)', 'type': 'dysarthric', 'gender': 'M'}, |
| 'FC02': {'label': 'FC02 (TORGO Hlth F)', 'type': 'healthy', 'gender': 'F'}, |
| 'MC01': {'label': 'MC01 (TORGO Hlth M)', 'type': 'healthy', 'gender': 'M'}, |
| 'F02': {'label': 'F02 (UA Dys F)', 'type': 'dysarthric', 'gender': 'F'}, |
| 'M04': {'label': 'M04 (UA Dys M)', 'type': 'dysarthric', 'gender': 'M'}, |
| '211': {'label': '211 (LibriSp healthy F)', 'type': 'healthy', 'gender': 'F'}, |
| '4014': {'label': '4014 (LibriSp healthy M)', 'type': 'healthy', 'gender': 'M'}, |
| } |
|
|
| |
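| # Evaluation conditions: natural recordings plus the two synthesis methods |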
| SELECTED_METHODS = ['Original', 'Sesame_TTS', 'Spark_KNN'] |
|
|
| |
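| # Per-speaker parquet files; each row holds a method, sample_number, transcript, |
| # audio_array and sample_rate (see get_audio_from_parquet below) |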
| PARQUET_FILES = { |
| '211': '211_audio_samples.parquet', |
| '4014': '4014_audio_samples.parquet', |
| 'F02': 'F02_audio_samples.parquet', |
| 'F04': 'F04_audio_samples.parquet', |
| 'FC02': 'FC02_audio_samples.parquet', |
| 'M02': 'M02_audio_samples.parquet', |
| 'M04': 'M04_audio_samples.parquet', |
| 'MC01': 'MC01_audio_samples.parquet', |
| } |
|
|
| |
| def load_all_speaker_data(): |
| """Load all parquet files into a dict""" |
| all_data = {} |
| for speaker_id, filename in PARQUET_FILES.items(): |
| filepath = os.path.join(WORKSPACE_ROOT, filename) |
| if os.path.exists(filepath): |
| df = pd.read_parquet(filepath) |
| all_data[speaker_id] = df |
| print(f"Loaded {len(df)} samples for speaker {speaker_id}") |
| else: |
| print(f"Warning: Parquet file not found: {filepath}") |
| return all_data |
|
|
| |
| SPEAKER_DATA = load_all_speaker_data() |
|
|
| |
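| # Fixed seed so the stimulus-set template below is assembled reproducibly |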
| RANDOM_SEED = 42 |
| random.seed(RANDOM_SEED) |
|
|
| def convert_display_type_to_storage(display_type): |
| """Convert display sample type to storage type""" |
| if display_type == "Natural": |
| return "Original" |
| return display_type |
|
|
| def convert_storage_type_to_display(storage_type): |
| """Convert storage sample type to display type""" |
| if storage_type == "Original": |
| return "Natural" |
| return storage_type |
|
|
| def get_audio_path(file_path, is_original=True): |
| """Convert metadata file path to actual audio file path""" |
| |
| file_path = file_path.replace('data/', '') |
| |
| |
| if is_original: |
| return os.path.join(ORIGINAL_DATA_DIR, "data", file_path) |
| else: |
| return os.path.join(SYNTHETIC_DATA_DIR, "data", file_path) |
|
|
| def get_github_audio_url(file_path, is_original=True): |
| """Get the GitHub URL for an audio file""" |
| |
| file_path = file_path.replace('data/', '') |
| |
| |
| if is_original: |
| return f"{GITHUB_AUDIO_BASE_URL}/torgo_original/data/{file_path}" |
| else: |
| return f"{GITHUB_AUDIO_BASE_URL}/torgo-synthetic/data/{file_path}" |
|
|
| def stream_audio_from_github(file_path, is_original=True): |
| """Stream audio file from GitHub directly without saving locally""" |
| |
| github_url = get_github_audio_url(file_path, is_original) |
| |
| try: |
| |
| encoded_url = quote(github_url, safe=':/') |
| |
| |
| response = requests.get(encoded_url, stream=True, timeout=30) |
| response.raise_for_status() |
| |
| |
| audio_data = response.content |
| |
| return audio_data |
| except Exception as e: |
| print(f"Error streaming audio from {github_url}: {e}") |
| return None |
|
|
| def verify_audio_file(file_path): |
| """Verify that audio file exists and is readable""" |
| if DEMO_MODE: |
| |
| try: |
| if "torgo_original" in file_path: |
| audio_data = stream_audio_from_github(os.path.basename(file_path), is_original=True) |
| else: |
| audio_data = stream_audio_from_github(os.path.basename(file_path), is_original=False) |
| |
| if audio_data: |
| return True |
| except Exception as e: |
| print(f"Error verifying streamed audio: {e}") |
| |
| |
| return True |
| |
| try: |
| if os.path.exists(file_path): |
| data, samplerate = sf.read(file_path) |
| return True |
| else: |
| |
| if "torgo_original" in file_path: |
| audio_data = stream_audio_from_github(os.path.basename(file_path), is_original=True) |
| else: |
| audio_data = stream_audio_from_github(os.path.basename(file_path), is_original=False) |
| |
| return audio_data is not None |
| except Exception: |
| return False |
|
|
| def generate_participant_id(): |
| """Generate a unique participant ID for each session. |
| |
| Format: P{number}_{YYYYMMDD}_{HHMMSS} |
| Example: P001_20260209_193800 |
| |
| Combines sequential numbering with timestamp for guaranteed uniqueness. |
| """ |
| |
| existing_ids = set() |
| |
| |
| if supabase: |
| try: |
| response = supabase.table('experiment_results').select('participant_id').execute() |
| if response.data: |
| existing_ids = set(row['participant_id'] for row in response.data if row.get('participant_id')) |
| except Exception as e: |
| print(f"Could not fetch existing IDs from Supabase: {e}") |
| |
| |
| max_num = 0 |
| for pid in existing_ids: |
| if pid and pid.startswith('P'): |
| try: |
| num_part = pid[1:4] |
| max_num = max(max_num, int(num_part)) |
| except (ValueError, IndexError): |
| pass |
| |
| |
| counter = max_num + 1 |
| timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
| new_id = f"P{counter:03d}_{timestamp}" |
| |
| print(f"Generated new participant ID: {new_id}") |
| |
| return new_id |
|
|
| def preprocess_audio(file_path): |
| """Remove background noise from the audio file and return a temporary file path""" |
| global temp_files |
| |
| if DEMO_MODE and (not os.path.exists(file_path) or "demo_" in file_path): |
| try: |
| |
| is_original = "torgo_original" in file_path |
| |
| |
| audio_data = stream_audio_from_github(os.path.basename(file_path), is_original) |
| |
| if audio_data: |
| |
| temp_in_path = os.path.join(TEMP_DIR, f"streamed_{uuid.uuid4()}.wav") |
| with open(temp_in_path, 'wb') as f: |
| f.write(audio_data) |
| |
| |
| audio, sr = librosa.load(temp_in_path, sr=None) |
| |
| |
| reduced_noise = nr.reduce_noise(y=audio, sr=sr) |
| |
| |
| temp_out_path = os.path.join(TEMP_DIR, f"processed_{uuid.uuid4()}.wav") |
| sf.write(temp_out_path, reduced_noise, sr) |
| |
| |
| temp_files.append(temp_in_path) |
| temp_files.append(temp_out_path) |
| |
| |
| try: |
| os.remove(temp_in_path) |
| except OSError: |
| pass |
| |
| return temp_out_path |
| except Exception as e: |
| print(f"Error processing streamed audio: {e}") |
| |
| |
| temp_path = os.path.join(TEMP_DIR, f"demo_{uuid.uuid4()}.wav") |
| sr = 16000 |
| silent_audio = np.zeros(int(sr * 1.5)) |
| sf.write(temp_path, silent_audio, sr) |
| temp_files.append(temp_path) |
| return temp_path |
| |
| try: |
| |
| if os.path.exists(file_path): |
| |
| audio, sr = librosa.load(file_path, sr=None) |
| |
| |
| reduced_noise = nr.reduce_noise(y=audio, sr=sr) |
| |
| |
| temp_path = os.path.join(TEMP_DIR, f"processed_{os.path.basename(file_path)}") |
| sf.write(temp_path, reduced_noise, sr) |
| |
| |
| temp_files.append(temp_path) |
| |
| return temp_path |
| else: |
| |
| is_original = "torgo_original" in file_path |
| audio_data = stream_audio_from_github(os.path.basename(file_path), is_original) |
| |
| if audio_data: |
| |
| temp_in_path = os.path.join(TEMP_DIR, f"streamed_{uuid.uuid4()}.wav") |
| with open(temp_in_path, 'wb') as f: |
| f.write(audio_data) |
| |
| |
| audio, sr = librosa.load(temp_in_path, sr=None) |
| |
| |
| reduced_noise = nr.reduce_noise(y=audio, sr=sr) |
| |
| |
| temp_out_path = os.path.join(TEMP_DIR, f"processed_{uuid.uuid4()}.wav") |
| sf.write(temp_out_path, reduced_noise, sr) |
| |
| |
| temp_files.append(temp_in_path) |
| temp_files.append(temp_out_path) |
| |
| |
| try: |
| os.remove(temp_in_path) |
| except OSError: |
| pass |
| |
| return temp_out_path |
| |
| |
| temp_path = os.path.join(TEMP_DIR, f"error_{uuid.uuid4()}.wav") |
| sr = 16000 |
| silent_audio = np.zeros(int(sr * 1.5)) |
| sf.write(temp_path, silent_audio, sr) |
| temp_files.append(temp_path) |
| return temp_path |
| except Exception as e: |
| print(f"Error preprocessing audio: {e}") |
| |
| temp_path = os.path.join(TEMP_DIR, f"error_{uuid.uuid4()}.wav") |
| sr = 16000 |
| silent_audio = np.zeros(int(sr * 1.5)) |
| sf.write(temp_path, silent_audio, sr) |
| temp_files.append(temp_path) |
| return temp_path |
|
|
| def categorize_by_duration(df): |
| """Categorize samples as short (1-3 words) or long (4+ words)""" |
| df = df.copy() |
| df['word_count'] = df['transcript'].apply(lambda x: len(str(x).split())) |
| df['duration_category'] = df['word_count'].apply(lambda x: 'short' if x <= 3 else 'long') |
| return df |
|
|
|
|
| def get_audio_from_parquet(speaker_id, method, sample_number): |
| """Get audio array and sample rate from parquet data""" |
| import json |
| |
| if speaker_id not in SPEAKER_DATA: |
| return None, None |
| |
| df = SPEAKER_DATA[speaker_id] |
| |
| sample_num_str = str(sample_number) |
| row = df[(df['method'] == method) & (df['sample_number'] == sample_num_str)] |
| |
| if len(row) == 0: |
| return None, None |
| |
| row = row.iloc[0] |
| |
| audio_str = row['audio_array'] |
| if isinstance(audio_str, str): |
| audio_list = json.loads(audio_str) |
| audio_array = np.array(audio_list) |
| else: |
| audio_array = np.array(audio_str) |
| |
| sample_rate = int(row['sample_rate']) |
| |
| return audio_array, sample_rate |
|
|
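| # Illustrative usage (assumes the corresponding parquet file was loaded): |
| #   audio, sr = get_audio_from_parquet('F04', 'Original', '1') |
| #   sf.write('example.wav', audio, sr) |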
|
|
| def create_balanced_stimulus_set(): |
| """Create balanced 64-sample stimulus set. |
| |
| Design: |
| - 8 speakers (4 healthy, 4 dysarthric) |
| - Each speaker: 8 samples (4 short + 4 long) |
| - Per length category: Original, Original (repeat), Sesame_TTS, Spark_KNN |
| - Total: 8 speakers × 8 samples = 64 samples |
| |
| Sample Selection (REPRODUCIBLE): |
| - Short sample: sample_number='1' (single-word utterance) |
| - Long sample: sample_number='3' (multi-word sentence) |
| """ |
| |
| |
| |
| SAMPLE_SELECTION = { |
| 'short': '1', |
| 'long': '3', |
| } |
| |
| stimulus_set = [] |
| sample_counter = 0 |
| |
| print("\n=== Sample Selection (Reproducible) ===") |
| |
| for speaker_id, speaker_info in SPEAKERS.items(): |
| if speaker_id not in SPEAKER_DATA: |
| print(f"Warning: No data for speaker {speaker_id}") |
| continue |
| |
| df = SPEAKER_DATA[speaker_id] |
| |
| |
| original_samples = df[df['method'] == 'Original'].copy() |
| |
| |
| for duration_cat, sample_num in SAMPLE_SELECTION.items(): |
| |
| sample_row = original_samples[original_samples['sample_number'] == sample_num] |
| |
| if len(sample_row) == 0: |
| print(f"Warning: No sample_number={sample_num} for {speaker_id}") |
| continue |
| |
| sample_row = sample_row.iloc[0] |
| transcript = sample_row['transcript'] |
| |
| print(f" {speaker_id} {duration_cat}: sample_number={sample_num} -> '{transcript[:50]}{'...' if len(transcript) > 50 else ''}'") |
| |
| |
| methods_to_add = [ |
| ('Original', 'original_1'), |
| ('Original', 'original_2'), |
| ('Sesame_TTS', 'sesame_tts'), |
| ('Spark_KNN', 'spark_knn'), |
| ] |
| |
| for method, method_label in methods_to_add: |
| sample_counter += 1 |
| |
| |
| if method == 'Original': |
| sample_type = 'Original' |
| else: |
| sample_type = 'Synthetic' |
| |
| stimulus_set.append({ |
| 'sample_id': f"{speaker_id}_{duration_cat}_{method_label}", |
| 'speaker_id': speaker_id, |
| 'speaker_type': speaker_info['type'], |
| 'speaker_gender': speaker_info['gender'], |
| 'speaker_label': speaker_info['label'], |
| 'method': method, |
| 'sample_number': sample_num, |
| 'transcription': transcript, |
| 'duration_category': duration_cat, |
| 'sample_type': sample_type, |
| 'original_speaker': speaker_id, |
| 'synthetic_speaker': speaker_id if method != 'Original' else '', |
| }) |
| |
| |
| random.shuffle(stimulus_set) |
| |
| |
| print(f"\n=== Created Balanced Stimulus Set ===") |
| print(f"Total samples: {len(stimulus_set)}") |
| |
| |
| healthy_count = sum(1 for s in stimulus_set if s['speaker_type'] == 'healthy') |
| dysarthric_count = sum(1 for s in stimulus_set if s['speaker_type'] == 'dysarthric') |
| print(f"By speaker type: Healthy={healthy_count}, Dysarthric={dysarthric_count}") |
| |
| |
| original_count = sum(1 for s in stimulus_set if s['sample_type'] == 'Original') |
| synthetic_count = sum(1 for s in stimulus_set if s['sample_type'] == 'Synthetic') |
| print(f"By sample type: Original={original_count}, Synthetic={synthetic_count}") |
| |
| |
| method_counts = {} |
| for s in stimulus_set: |
| m = s['method'] |
| method_counts[m] = method_counts.get(m, 0) + 1 |
| print(f"By method: {method_counts}") |
| |
| |
| short_count = sum(1 for s in stimulus_set if s['duration_category'] == 'short') |
| long_count = sum(1 for s in stimulus_set if s['duration_category'] == 'long') |
| print(f"By duration: Short={short_count}, Long={long_count}") |
| |
| return stimulus_set |
|
|
| |
| _stimulus_set_template = create_balanced_stimulus_set() |
| print(f"Stimulus set template ready with {len(_stimulus_set_template)} samples") |
|
|
| def create_session_stimulus_set(): |
| """Create a fresh shuffled stimulus set for a new session.""" |
| import copy |
| session_set = copy.deepcopy(_stimulus_set_template) |
| random.shuffle(session_set) |
| return session_set |
|
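| # Each session deep-copies the shared template and reshuffles it, so concurrent |
| # participants get independent presentation orders without mutating the template. |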
|
| def log_session_event(participant_id, prolific_id, event_type, details=None): |
| """Log a session event to Supabase for tracking user sessions. |
| |
| Uses the existing experiment_results table with event_type as evaluation_type. |
| event_type: 'session_start', 'sample_completed', 'experiment_complete' |
| details: dict with extra info like progress, sample_id, etc. |
| """ |
| if not supabase: |
| return |
| |
| try: |
| event_row = { |
| 'timestamp': datetime.now().isoformat(), |
| 'participant_id': participant_id, |
| 'prolific_id': prolific_id or '', |
| 'sample_id': f'{participant_id}_{event_type}', |
| 'sample_type': 'Event', |
| 'evaluation_type': event_type, |
| 'naturalness_rating': None, |
| 'intelligibility_rating': None, |
| 'comments': str(details) if details else '', |
| 'transcription': None, |
| 'original_speaker': None, |
| 'synthetic_speaker': None, |
| 'consent_given': None |
| } |
| supabase.table('experiment_results').insert(event_row).execute() |
| print(f"Session event logged: {event_type} for {participant_id}") |
| except Exception as e: |
| print(f"Warning: Could not log session event: {e}") |
|
|
| def save_consent(participant_id, prolific_id): |
| """Save consent record to Supabase""" |
| global results_df |
| |
| timestamp = datetime.now().isoformat() |
| |
| |
| consent_row = { |
| 'timestamp': timestamp, |
| 'participant_id': participant_id, |
| 'prolific_id': prolific_id, |
| 'sample_id': participant_id, |
| 'sample_type': 'Consent', |
| 'evaluation_type': 'consent', |
| 'naturalness_rating': None, |
| 'intelligibility_rating': None, |
| 'comments': "Informed consent given", |
| 'transcription': None, |
| 'original_speaker': None, |
| 'synthetic_speaker': None, |
| 'consent_given': True |
| } |
| |
| |
| if supabase: |
| try: |
| supabase.table('experiment_results').insert(consent_row).execute() |
| |
| log_session_event(participant_id, prolific_id, 'session_start', { |
| 'total_samples': len(_stimulus_set_template) |
| }) |
| print(f"Consent saved to Supabase for participant {participant_id}") |
| except Exception as e: |
| print(f"Error saving consent to Supabase: {e}") |
| |
| results_df = pd.concat([results_df, pd.DataFrame([consent_row])], ignore_index=True) |
| results_df.to_csv(RESULTS_FILE, index=False) |
| else: |
| |
| results_df = pd.concat([results_df, pd.DataFrame([consent_row])], ignore_index=True) |
| results_df.to_csv(RESULTS_FILE, index=False) |
| |
| return f"Consent recorded for participant {participant_id}" |
|
|
| def save_rating(participant_id, prolific_id, sample_id, sample_type, |
| naturalness_rating, intelligibility_rating, |
| transcription, original_speaker, synthetic_speaker): |
| """Save both ratings to Supabase in a single row""" |
| global results_df |
| |
| timestamp = datetime.now().isoformat() |
| |
| |
| new_row = { |
| 'timestamp': timestamp, |
| 'participant_id': participant_id, |
| 'prolific_id': prolific_id, |
| 'sample_id': sample_id, |
| 'sample_type': sample_type, |
| 'evaluation_type': 'combined', |
| 'naturalness_rating': naturalness_rating, |
| 'intelligibility_rating': intelligibility_rating, |
| 'comments': "", |
| 'transcription': transcription, |
| 'original_speaker': original_speaker, |
| 'synthetic_speaker': synthetic_speaker, |
| 'consent_given': None |
| } |
| |
| |
| if supabase: |
| try: |
| supabase.table('experiment_results').insert(new_row).execute() |
| print(f"Rating saved to Supabase: {sample_id} (naturalness: {naturalness_rating}, intelligibility: {intelligibility_rating})") |
| |
| log_session_event(participant_id, prolific_id, 'sample_completed', { |
| 'sample_id': sample_id, |
| 'sample_type': sample_type |
| }) |
| except Exception as e: |
| print(f"Error saving rating to Supabase: {e}") |
| |
| results_df = pd.concat([results_df, pd.DataFrame([new_row])], ignore_index=True) |
| results_df.to_csv(RESULTS_FILE, index=False) |
| else: |
| |
| results_df = pd.concat([results_df, pd.DataFrame([new_row])], ignore_index=True) |
| results_df.to_csv(RESULTS_FILE, index=False) |
| |
| return "Rating saved successfully!" |
|
|
| def create_experiment_interface(): |
| """Create the Gradio interface for MOS-style evaluation""" |
| |
| with gr.Blocks(title="Pathological Speech Evaluation", theme=gr.themes.Soft()) as demo: |
| |
| consent_given = gr.State(value=False) |
| |
| |
| with gr.Column(visible=True) as consent_screen: |
| gr.Markdown(""" |
| # Speech Evaluation Research Study |
| ## Participant Information Sheet |
| |
| ### What is this research about? |
| This research aims to investigate how well synthetic speech, generated using advanced AI techniques, replicates the characteristics of impaired speech. We evaluate these synthetic voices using human judgment. |
| |
| ### Why are you doing this research? |
| Developing robust Automatic Speech Recognition (ASR) systems for people with speech impairments like dysarthria is challenging due to limited and varied real-world data. This project explores methods for augmenting such datasets with high-quality synthetic speech. These synthetic samples could make speech technologies more inclusive and accurate, especially in clinical or assistive technology settings. |
| |
| ### Why have I been invited to take part? |
| You have been invited because you are a native English speaker over the age of 18. You will be asked to evaluate speech samples based on their naturalness and intelligibility. |
| |
| ### What will happen if I decide to take part? |
| The total time required is approximately **15 to 20 minutes**. You will listen to a series of short audio recordings of impaired speech (both real and AI-generated) and rate their naturalness and intelligibility. The recordings will be randomised and anonymised. |
| |
| Participation is completely voluntary, and you are free to withdraw at any time without explanation or penalty. |
| |
| ### How will the data be used? |
| Your responses will be stored anonymously and statistically analysed. All speech samples are pre-generated - you will not be recorded. This research is purely academic with no commercial purpose. |
| |
| ### How will your privacy be protected? |
| You will be assigned an anonymous ID. No personal information will be published or disclosed. All data will be securely stored per TU Dublin's data protection policies. |
| |
| ### Research Funding |
| This research is funded by Research Ireland under D-REAL (https://d-real.ie/) and ADAPT (https://www.adaptcentre.ie/). |
| |
| ### Benefits of participation |
| You will contribute to advancing inclusive AI technologies for speech therapy and assistive communication. |
| |
| ### Risks |
| There are minimal risks. Some speech samples may be hard to understand. You can withdraw immediately if you feel any discomfort. |
| |
| ### Contact |
| For questions or concerns: D23126641@mytudublin.ie |
| |
| ### Ethics Review |
| This project has been reviewed and approved by the Research Ethics Committee at TU Dublin. |
| |
| --- |
| |
| ## Consent Form |
| **Please confirm the following statements:** |
| """) |
| |
| |
| gr.Markdown("**Prolific ID (Optional):**") |
| prolific_id_input = gr.Textbox( |
| label="Prolific ID", |
| placeholder="Enter your Prolific ID here (if applicable)", |
| info="If you came from Prolific, please enter your ID to help us verify your participation. This is optional.", |
| interactive=True |
| ) |
| |
| gr.Markdown("---") |
| |
| |
| consent_age = gr.Checkbox(label="I am over 18 years old", value=False) |
| consent_native = gr.Checkbox(label="I am a native English speaker", value=False) |
| consent_impaired = gr.Checkbox(label="I understand I will listen to a variety of impaired speech samples", value=False) |
| consent_hearing = gr.Checkbox(label="I do not have any known hearing impairments", value=False) |
| consent_english = gr.Checkbox(label="I understand the experiment will be conducted in English", value=False) |
| consent_anonymous = gr.Checkbox(label="I understand that all data I submit will be anonymous", value=False) |
| consent_info = gr.Checkbox(label="I have read and understood the participant information sheet", value=False) |
| consent_analysis = gr.Checkbox(label="I consent to my responses being used for analysis", value=False) |
| consent_withdraw = gr.Checkbox(label="I understand that I can withdraw or request deletion of my data at any time before project completion", value=False) |
| consent_participate = gr.Checkbox(label="I consent to take part in this research study", value=False) |
| consent_stop = gr.Checkbox(label="I understand that I can stop participating in this research at any time", value=False) |
| |
| |
| consent_status = gr.Markdown("", visible=False) |
| |
| |
| consent_button = gr.Button("I Consent to Participate", variant="primary", size="lg") |
| |
| |
| with gr.Column(visible=False) as experiment_screen: |
| gr.Markdown(""" |
| # Pathological Speech Evaluation Experiment |
| |
| Thank you for participating! You will now evaluate speech samples. |
| |
| ## Instructions: |
| For each audio sample: |
| |
| 1. **Listen** to the audio sample carefully |
| 2. **Select a naturalness rating** using the radio buttons (1=Bad, 2=Poor, 3=Fair, 4=Good, 5=Excellent) |
| 3. **The transcript will automatically appear** when you select your naturalness rating |
| 4. **Select an intelligibility rating** after seeing the transcript |
| 5. **Click "Submit Rating"** to save both ratings and move to the next sample |
| |
| ## Rating Scale: |
| - **1**: Bad |
| - **2**: Poor |
| - **3**: Fair |
| - **4**: Good |
| - **5**: Excellent |
| |
| """) |
| |
| |
| current_participant_id = gr.State(value=generate_participant_id) |
| current_prolific_id = gr.State(value="") |
| |
| session_stimulus_index = gr.State(value=0) |
| session_stimulus_set = gr.State(value=create_session_stimulus_set) |
| |
| with gr.Row(): |
| with gr.Column(): |
| participant_id_display = gr.Textbox( |
| label="Participant ID", |
| interactive=False |
| ) |
| |
| |
| progress_text = gr.Textbox( |
| label="Progress", |
| interactive=False, |
| value="Progress: 0/0 samples" |
| ) |
| |
| |
| evaluation_step_display = gr.Textbox( |
| label="Instructions", |
| interactive=False, |
| value="" |
| ) |
| |
| |
| sample_id = gr.Textbox(label="Sample ID", visible=False) |
| sample_type = gr.Textbox(label="Sample Type", visible=False) |
| evaluation_stage = gr.Textbox(label="Evaluation Stage", visible=False) |
| original_speaker = gr.Textbox(label="Original Speaker", visible=False) |
| synthetic_speaker = gr.Textbox(label="Synthetic Speaker", visible=False) |
| stored_transcription = gr.Textbox(label="Stored Transcription", visible=False) |
| |
| |
| audio_player = gr.Audio( |
| label="Speech Sample", |
| type="filepath", |
| format="wav", |
| autoplay=False |
| ) |
| |
| |
| naturalness_rating = gr.Radio( |
| choices=["1", "2", "3", "4", "5"], |
| label="Naturalness Rating", |
| info="How natural (pleasantly human-like) was the sound of this audio sample?", |
| value=None |
| ) |
| |
| |
| intelligibility_rating = gr.Radio( |
| choices=["1", "2", "3", "4", "5"], |
| label="Intelligibility Rating", |
| info="How easy was it to understand the speech?", |
| value=None, |
| visible=False |
| ) |
| |
| |
| transcription_display = gr.Textbox( |
| label="Transcription (What should be said)", |
| visible=False, |
| interactive=False |
| ) |
| |
| |
| status = gr.Textbox( |
| label="Status", |
| interactive=False, |
| lines=4 |
| ) |
| |
| submit_btn = gr.Button("Submit Rating", variant="primary") |
| |
| def load_sample(participant_id, stim_index, stim_set): |
| """Load current sample from parquet data (per-session state)""" |
| global temp_files |
| |
| |
| PROLIFIC_COMPLETION_URL = "https://app.prolific.com/submissions/complete?cc=CXOYRH0O" |
| |
| if stim_index >= len(stim_set): |
| |
| completion_instructions = f"""🎉 Experiment Complete! |
| |
| Click here to complete: {PROLIFIC_COMPLETION_URL} |
| |
| Completion Code: CXOYRH0O""" |
| |
| |
| log_session_event(participant_id, '', 'experiment_complete', { |
| 'total_samples': len(stim_set), |
| 'samples_completed': stim_index |
| }) |
| |
| return [ |
| None, |
| "Experiment Complete", |
| "Complete", |
| "", |
| "", |
| gr.update(visible=False), |
| None, |
| gr.update(visible=False), |
| None, |
| gr.update(visible=False), |
| f"Copy URL: {PROLIFIC_COMPLETION_URL}", |
| "", |
| "", |
| participant_id, |
| f"✅ Complete: {len(stim_set)}/{len(stim_set)} samples", |
| completion_instructions, |
| gr.update(interactive=False, value="Experiment Complete"), |
| stim_index, |
| stim_set |
| ] |
| |
| current_stimulus = stim_set[stim_index] |
| |
| |
| progress_info = f"Progress: {stim_index + 1}/{len(stim_set)} samples" |
| step_info = "Step 1: Listen to the audio and select a naturalness rating. Step 2: The transcript will appear - then rate intelligibility." |
| |
| |
| speaker_id = current_stimulus['speaker_id'] |
| method = current_stimulus['method'] |
| sample_number = current_stimulus['sample_number'] |
| |
| audio_array, sample_rate = get_audio_from_parquet(speaker_id, method, sample_number) |
| |
| if audio_array is not None: |
| |
| try: |
| reduced_audio = nr.reduce_noise(y=audio_array, sr=sample_rate) |
| except Exception as e: |
| print(f"Noise reduction failed: {e}") |
| reduced_audio = audio_array |
| |
| |
| temp_path = os.path.join(TEMP_DIR, f"sample_{current_stimulus['sample_id']}_{uuid.uuid4()}.wav") |
| sf.write(temp_path, reduced_audio, sample_rate) |
| temp_files.append(temp_path) |
| preprocessed_audio = temp_path |
| else: |
| |
| print(f"Warning: Could not load audio for {speaker_id}/{method}/{sample_number}") |
| temp_path = os.path.join(TEMP_DIR, f"silent_{uuid.uuid4()}.wav") |
| sr = 16000 |
| silent_audio = np.zeros(int(sr * 1.5)) |
| sf.write(temp_path, silent_audio, sr) |
| temp_files.append(temp_path) |
| preprocessed_audio = temp_path |
| |
| return [ |
| preprocessed_audio, |
| current_stimulus['sample_id'], |
| current_stimulus['sample_type'], |
| current_stimulus['transcription'], |
| "", |
| gr.update(visible=False), |
| None, |
| gr.update(visible=True), |
| None, |
| gr.update(visible=False), |
| "", |
| current_stimulus['original_speaker'], |
| current_stimulus['synthetic_speaker'], |
| participant_id, |
| progress_info, |
| step_info, |
| gr.update(interactive=True, value="Submit Rating"), |
| stim_index, |
| stim_set |
| ] |
| |
| def on_naturalness_selected(naturalness_rating, stored_transcription): |
| """Show transcript and intelligibility rating when naturalness is selected""" |
| if naturalness_rating is not None: |
| return [ |
| stored_transcription, |
| gr.update(visible=True), |
| gr.update(visible=True), |
| "Now rate the intelligibility after seeing the transcript." |
| ] |
| else: |
| return [ |
| "", |
| gr.update(visible=False), |
| gr.update(visible=False), |
| "" |
| ] |
| |
| def submit_rating(participant_id, prolific_id, sample_id, sample_type, stored_transcription, |
| naturalness_rating, intelligibility_rating, original_speaker, synthetic_speaker, |
| stim_index, stim_set): |
| """Handle rating submission and move to next sample (per-session state)""" |
| |
| |
| if naturalness_rating is None or intelligibility_rating is None: |
| return [ |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| "Please provide both naturalness and intelligibility ratings before submitting.", |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| gr.skip(), |
| stim_index, |
| stim_set |
| ] |
| |
| |
| save_rating( |
| participant_id, prolific_id, sample_id, sample_type, |
| int(naturalness_rating), int(intelligibility_rating), |
| stored_transcription, original_speaker, synthetic_speaker |
| ) |
| |
| |
| new_index = stim_index + 1 |
| next_outputs = load_sample(participant_id, new_index, stim_set) |
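| # Index 10 of load_sample's return list feeds the status textbox; replace the |
| # default message with a save confirmation. |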
| next_outputs[10] = "Ratings saved successfully!" |
| |
| return next_outputs |
| |
| def handle_consent(c_age, c_native, c_impaired, c_hearing, c_english, |
| c_anonymous, c_info, c_analysis, c_withdraw, |
| c_participate, c_stop, participant_id, prolific_id): |
| """Handle consent form submission""" |
| |
| clean_prolific_id = prolific_id.strip() if prolific_id else "" |
| |
| |
| all_checked = all([c_age, c_native, c_impaired, c_hearing, c_english, |
| c_anonymous, c_info, c_analysis, c_withdraw, |
| c_participate, c_stop]) |
| |
| if all_checked: |
| |
| save_consent(participant_id, clean_prolific_id) |
| |
| |
| return [ |
| gr.update(visible=False), |
| gr.update(visible=True), |
| gr.update(visible=False), |
| True, |
| clean_prolific_id |
| ] |
| else: |
| |
| return [ |
| gr.update(visible=True), |
| gr.update(visible=False), |
| gr.update(value="⚠️ **Please check all boxes to confirm your consent before proceeding.**", |
| visible=True), |
| False, |
| prolific_id |
| ] |
| |
| |
| |
| consent_button.click( |
| handle_consent, |
| inputs=[consent_age, consent_native, consent_impaired, consent_hearing, |
| consent_english, consent_anonymous, consent_info, consent_analysis, |
| consent_withdraw, consent_participate, consent_stop, current_participant_id, prolific_id_input], |
| outputs=[consent_screen, experiment_screen, consent_status, consent_given, current_prolific_id] |
| ) |
| |
| |
| naturalness_rating.change( |
| on_naturalness_selected, |
| inputs=[naturalness_rating, stored_transcription], |
| outputs=[transcription_display, transcription_display, intelligibility_rating, status] |
| ) |
| |
| |
| submit_btn.click( |
| submit_rating, |
| inputs=[ |
| current_participant_id, current_prolific_id, sample_id, sample_type, stored_transcription, |
| naturalness_rating, intelligibility_rating, original_speaker, synthetic_speaker, |
| session_stimulus_index, session_stimulus_set |
| ], |
| outputs=[ |
| audio_player, sample_id, sample_type, stored_transcription, |
| transcription_display, transcription_display, naturalness_rating, |
| naturalness_rating, intelligibility_rating, intelligibility_rating, status, |
| original_speaker, synthetic_speaker, participant_id_display, |
| progress_text, evaluation_step_display, submit_btn, |
| session_stimulus_index, session_stimulus_set |
| ] |
| ) |
| |
| |
| def check_consent_and_load(consent_status, participant_id, stim_index, stim_set): |
| """Only load sample if consent has been given (per-session state)""" |
| if consent_status: |
| return load_sample(participant_id, stim_index, stim_set) |
| else: |
| |
| return [ |
| None, |
| "", |
| "", |
| "", |
| "", |
| gr.update(visible=False), |
| None, |
| gr.update(visible=True), |
| None, |
| gr.update(visible=False), |
| "", |
| "", |
| "", |
| "", |
| "", |
| "", |
| gr.update(interactive=True, value="Submit Rating"), |
| stim_index, |
| stim_set |
| ] |
| |
| |
| consent_given.change( |
| fn=check_consent_and_load, |
| inputs=[consent_given, current_participant_id, session_stimulus_index, session_stimulus_set], |
| outputs=[ |
| audio_player, sample_id, sample_type, stored_transcription, |
| transcription_display, transcription_display, naturalness_rating, |
| naturalness_rating, intelligibility_rating, intelligibility_rating, status, |
| original_speaker, synthetic_speaker, participant_id_display, |
| progress_text, evaluation_step_display, submit_btn, |
| session_stimulus_index, session_stimulus_set |
| ] |
| ) |
| |
| return demo |
|
|
| |
| if __name__ == "__main__": |
| demo = create_experiment_interface() |
| demo.launch() |