# NOTE: GitHub web-export residue preserved as a comment so the file parses:
# commit by kesbeast23 - "update session" - 4a615fc (unverified)
import gradio as gr
import os
import random
import pandas as pd
from datetime import datetime
import numpy as np
import uuid
import soundfile as sf
import librosa
import noisereduce as nr
import tempfile
import atexit
import shutil
import requests
from urllib.parse import quote
from dotenv import load_dotenv
from supabase import create_client, Client
# Load environment variables from a local .env file (if present).
load_dotenv()

# Initialize the Supabase client used to persist experiment results.
SUPABASE_URL = os.getenv('SUPABASE_URL')
SUPABASE_KEY = os.getenv('SUPABASE_KEY')

if SUPABASE_URL and SUPABASE_KEY:
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
    # Plain string: the original used an f-string with no placeholders.
    print("Supabase client initialized successfully")
else:
    # Without credentials the app still runs; results fall back to the CSV file.
    supabase = None
    print("Warning: Supabase credentials not found. Results will not be saved to database.")
# Constants
# Get absolute paths (anchored at this file's directory so the app works
# regardless of the current working directory).
WORKSPACE_ROOT = os.path.dirname(os.path.abspath(__file__))
ORIGINAL_DATA_DIR = os.path.join(WORKSPACE_ROOT, "torgo_original")
SYNTHETIC_DATA_DIR = os.path.join(WORKSPACE_ROOT, "torgo-synthetic")
# NOTE(review): mixed-case "experiment_Results.csv" looks accidental but is
# load-bearing on case-sensitive filesystems - confirm before renaming.
RESULTS_FILE = os.path.join(WORKSPACE_ROOT, "experiment_Results.csv")
# Scratch space for processed/streamed audio; removed on exit by cleanup_temp_files.
TEMP_DIR = os.path.join(tempfile.gettempdir(), "speech_evaluation")
# GitHub repository for audio files
# Replace these with your GitHub username and repository name
GITHUB_USERNAME = "kesbeast23"
GITHUB_REPO = "dysathric-audio"
GITHUB_BRANCH = "main"
# Base URL for fetching raw audio files when they are not available locally.
GITHUB_AUDIO_BASE_URL = f"https://raw.githubusercontent.com/{GITHUB_USERNAME}/{GITHUB_REPO}/{GITHUB_BRANCH}"
# Create directories if they don't exist
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(os.path.join(ORIGINAL_DATA_DIR, "data"), exist_ok=True)
os.makedirs(os.path.join(SYNTHETIC_DATA_DIR, "data"), exist_ok=True)
# Track generated temp files for cleanup (appended to by audio-processing code).
temp_files = []
# Flag to check if running in demo mode (no audio files); recomputed below.
DEMO_MODE = True
# Determine whether local audio data is present; demo mode when it is not.
def check_demo_mode():
    """Return True when the app must run in demo mode (no local audio files).

    Demo mode is active unless BOTH the original and synthetic data
    directories exist and contain at least one entry.
    """
    for dataset_dir in (ORIGINAL_DATA_DIR, SYNTHETIC_DATA_DIR):
        data_path = os.path.join(dataset_dir, "data")
        if not os.path.exists(data_path) or not os.listdir(data_path):
            return True
    return False
# Compute the demo-mode flag once at startup.
DEMO_MODE = check_demo_mode()
if DEMO_MODE:
    print("Running in DEMO MODE - No audio files found")
# Best-effort cleanup of generated audio artifacts when the process exits.
def cleanup_temp_files():
    """Remove tracked temporary files and the scratch directory on exit."""
    for temp_file in temp_files:
        try:
            if os.path.exists(temp_file):
                os.remove(temp_file)
        except Exception as e:
            print(f"Error removing temp file {temp_file}: {e}")
    # Finally drop the whole scratch directory, including anything untracked.
    try:
        if os.path.exists(TEMP_DIR):
            shutil.rmtree(TEMP_DIR)
    except Exception as e:
        print(f"Error removing temp directory {TEMP_DIR}: {e}")


atexit.register(cleanup_temp_files)
# Sample type mapping: bidirectional display <-> storage labels, used by the
# convert_display_type_to_storage / convert_storage_type_to_display helpers.
SAMPLE_TYPE_MAPPING = {
    "Original": "Natural", # For display purposes
    "Natural": "Original" # For database storage
}
# Define columns for results DataFrame (must match the Supabase
# experiment_results table minus prolific_id; schema drift resets the CSV).
COLUMNS = [
    'timestamp', 'participant_id', 'sample_id', 'sample_type',
    'evaluation_type', 'naturalness_rating', 'intelligibility_rating',
    'comments', 'transcription', 'original_speaker', 'synthetic_speaker',
    'consent_given'
]
# Load previously saved results, or start a fresh file when the CSV is
# missing, empty, or carries an unexpected column layout.
try:
    results_df = pd.read_csv(RESULTS_FILE)
    if list(results_df.columns) != COLUMNS:
        # Schema drift: discard the old frame and rewrite with expected columns.
        results_df = pd.DataFrame(columns=COLUMNS)
        results_df.to_csv(RESULTS_FILE, index=False)
except (pd.errors.EmptyDataError, FileNotFoundError):
    results_df = pd.DataFrame(columns=COLUMNS)
    results_df.to_csv(RESULTS_FILE, index=False)
# Speaker configuration for the experiment
# 8 speakers: 4 healthy + 4 dysarthric, drawn from the TORGO, UASpeech ("UA")
# and LibriSpeech corpora (per the labels below).
SPEAKERS = {
    'F04': {'label': 'F04 (TORGO Dys F)', 'type': 'dysarthric', 'gender': 'F'},
    'M02': {'label': 'M02 (TORGO Dys M)', 'type': 'dysarthric', 'gender': 'M'},
    'FC02': {'label': 'FC02 (TORGO Hlth F)', 'type': 'healthy', 'gender': 'F'},
    'MC01': {'label': 'MC01 (TORGO Hlth M)', 'type': 'healthy', 'gender': 'M'},
    'F02': {'label': 'F02 (UA Dys F)', 'type': 'dysarthric', 'gender': 'F'},
    'M04': {'label': 'M04 (UA Dys M)', 'type': 'dysarthric', 'gender': 'M'},
    '211': {'label': '211 (LibriSp healthy F)', 'type': 'healthy', 'gender': 'F'},
    '4014': {'label': '4014 (LibriSp healthy M)', 'type': 'healthy', 'gender': 'M'},
}
# Methods to use in the experiment: Original (repeated 2x), Sesame_TTS, Spark_KNN
SELECTED_METHODS = ['Original', 'Sesame_TTS', 'Spark_KNN']
# Parquet files for each speaker (expected alongside this file in WORKSPACE_ROOT).
PARQUET_FILES = {
    '211': '211_audio_samples.parquet',
    '4014': '4014_audio_samples.parquet',
    'F02': 'F02_audio_samples.parquet',
    'F04': 'F04_audio_samples.parquet',
    'FC02': 'FC02_audio_samples.parquet',
    'M02': 'M02_audio_samples.parquet',
    'M04': 'M04_audio_samples.parquet',
    'MC01': 'MC01_audio_samples.parquet',
}
# Load all parquet data
def load_all_speaker_data():
    """Load every speaker's parquet file into a dict keyed by speaker ID.

    Missing files produce a warning and are simply omitted from the result.
    """
    all_data = {}
    for speaker_id, filename in PARQUET_FILES.items():
        filepath = os.path.join(WORKSPACE_ROOT, filename)
        if not os.path.exists(filepath):
            print(f"Warning: Parquet file not found: {filepath}")
            continue
        frame = pd.read_parquet(filepath)
        all_data[speaker_id] = frame
        print(f"Loaded {len(frame)} samples for speaker {speaker_id}")
    return all_data
# Load all speaker parquet data once at import time; keyed by speaker ID.
SPEAKER_DATA = load_all_speaker_data()
# Fixed random seed so stimulus selection/shuffling is reproducible across runs.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
def convert_display_type_to_storage(display_type):
    """Map the UI-facing sample-type label to its database storage label."""
    return "Original" if display_type == "Natural" else display_type
def convert_storage_type_to_display(storage_type):
    """Map the database storage label back to the UI-facing label."""
    return "Natural" if storage_type == "Original" else storage_type
def get_audio_path(file_path, is_original=True):
    """Convert a metadata-relative file path to an absolute local audio path.

    Args:
        file_path: Path from the metadata, possibly prefixed with 'data/'.
        is_original: True for the TORGO original set, False for the synthetic set.

    Returns:
        Absolute path under the corresponding dataset's data directory.
    """
    # Strip only a LEADING 'data/' prefix. The original replace('data/', '')
    # removed every occurrence, mangling paths containing 'data/' elsewhere.
    if file_path.startswith('data/'):
        file_path = file_path[len('data/'):]
    base_dir = ORIGINAL_DATA_DIR if is_original else SYNTHETIC_DATA_DIR
    return os.path.join(base_dir, "data", file_path)
def get_github_audio_url(file_path, is_original=True):
    """Build the raw.githubusercontent.com URL for an audio file.

    Args:
        file_path: Path from the metadata, possibly prefixed with 'data/'.
        is_original: True for the torgo_original tree, False for torgo-synthetic.

    Returns:
        Full raw-content URL string (not yet percent-encoded).
    """
    # Strip only a LEADING 'data/' prefix. The original replace('data/', '')
    # removed every occurrence, corrupting paths containing 'data/' elsewhere.
    if file_path.startswith('data/'):
        file_path = file_path[len('data/'):]
    subdir = "torgo_original" if is_original else "torgo-synthetic"
    return f"{GITHUB_AUDIO_BASE_URL}/{subdir}/data/{file_path}"
def stream_audio_from_github(file_path, is_original=True):
    """Fetch an audio file's raw bytes from GitHub without caching locally.

    Returns the response body as bytes, or None on any network/HTTP error.
    """
    github_url = get_github_audio_url(file_path, is_original)
    try:
        # Percent-encode spaces/special characters but keep the scheme intact.
        safe_url = quote(github_url, safe=':/')
        response = requests.get(safe_url, stream=True)
        response.raise_for_status()
        return response.content
    except Exception as e:
        print(f"Error streaming audio from {github_url}: {e}")
        return None
def verify_audio_file(file_path):
    """Verify that an audio file exists and is readable.

    In demo mode the check is best-effort: a GitHub stream is attempted, but
    True is returned regardless so the demo flow is never blocked. Outside
    demo mode the file is read locally, falling back to a streamed existence
    check when it is absent.
    """
    # The dataset is inferred from the path; anything else is treated as synthetic.
    is_original = "torgo_original" in file_path

    if DEMO_MODE:
        try:
            if stream_audio_from_github(os.path.basename(file_path), is_original=is_original):
                return True
        except Exception as e:
            print(f"Error verifying streamed audio: {e}")
        # Demo mode: pretend the file exists even when streaming failed.
        return True

    try:
        if os.path.exists(file_path):
            # sf.read raises if the file is corrupt/unreadable; the data is discarded.
            sf.read(file_path)
            return True
        # Not found locally - fall back to a GitHub stream to confirm existence.
        audio_data = stream_audio_from_github(os.path.basename(file_path), is_original=is_original)
        return audio_data is not None
    except Exception:
        # Was a bare `except:` (also swallowed KeyboardInterrupt/SystemExit).
        return False
def generate_participant_id():
    """Generate a unique participant ID for each session.

    Format: P{number}_{YYYYMMDD}_{HHMMSS}, e.g. P001_20260209_193800.
    Combines sequential numbering with a timestamp for guaranteed uniqueness.

    Returns:
        The newly generated ID string.
    """
    # (Removed the redundant local `from datetime import datetime`; the name
    # is already imported at module level.)
    existing_ids = set()
    # Best-effort: collect IDs already recorded in Supabase.
    if supabase:
        try:
            response = supabase.table('experiment_results').select('participant_id').execute()
            if response.data:
                existing_ids = {row['participant_id'] for row in response.data if row.get('participant_id')}
        except Exception as e:
            print(f"Could not fetch existing IDs from Supabase: {e}")

    # Find the highest existing participant number (IDs look like P001_... or P001).
    max_num = 0
    for pid in existing_ids:
        if pid and pid.startswith('P'):
            try:
                max_num = max(max_num, int(pid[1:4]))  # 3-digit number after 'P'
            except (ValueError, IndexError):
                pass  # Malformed ID - ignore it for numbering purposes.

    # New ID = next sequential number + current timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    new_id = f"P{max_num + 1:03d}_{timestamp}"
    print(f"Generated new participant ID: {new_id}")
    return new_id
def _write_silent_wav(prefix):
    """Write 1.5 s of silence at 16 kHz to TEMP_DIR and return its path."""
    temp_path = os.path.join(TEMP_DIR, f"{prefix}_{uuid.uuid4()}.wav")
    sr = 16000
    sf.write(temp_path, np.zeros(int(sr * 1.5)), sr)
    temp_files.append(temp_path)
    return temp_path


def _denoise_streamed_audio(audio_data):
    """Write streamed bytes to a temp WAV, noise-reduce, return the processed path."""
    temp_in_path = os.path.join(TEMP_DIR, f"streamed_{uuid.uuid4()}.wav")
    with open(temp_in_path, 'wb') as f:
        f.write(audio_data)
    # sr=None preserves the file's native sample rate.
    audio, sr = librosa.load(temp_in_path, sr=None)
    reduced_noise = nr.reduce_noise(y=audio, sr=sr)
    temp_out_path = os.path.join(TEMP_DIR, f"processed_{uuid.uuid4()}.wav")
    sf.write(temp_out_path, reduced_noise, sr)
    # Track both so atexit cleanup catches the input even if removal fails here.
    temp_files.append(temp_in_path)
    temp_files.append(temp_out_path)
    try:
        os.remove(temp_in_path)  # Raw download no longer needed.
    except OSError:
        pass
    return temp_out_path


def preprocess_audio(file_path):
    """Return the path of a noise-reduced temporary WAV for *file_path*.

    Resolution order:
      1. Demo mode without a usable local file: stream from GitHub, else silence.
      2. Local file present: load, denoise, write to TEMP_DIR.
      3. Local file missing: stream from GitHub, else silence.
    Any error falls back to 1.5 s of silence so the UI never breaks.
    """
    is_original = "torgo_original" in file_path

    if DEMO_MODE and (not os.path.exists(file_path) or "demo_" in file_path):
        try:
            audio_data = stream_audio_from_github(os.path.basename(file_path), is_original)
            if audio_data:
                return _denoise_streamed_audio(audio_data)
        except Exception as e:
            print(f"Error processing streamed audio: {e}")
        # Streaming failed in demo mode - serve silence instead.
        return _write_silent_wav("demo")

    try:
        if os.path.exists(file_path):
            audio, sr = librosa.load(file_path, sr=None)
            reduced_noise = nr.reduce_noise(y=audio, sr=sr)
            temp_path = os.path.join(TEMP_DIR, f"processed_{os.path.basename(file_path)}")
            sf.write(temp_path, reduced_noise, sr)
            temp_files.append(temp_path)
            return temp_path
        # File missing locally - try streaming it from GitHub.
        audio_data = stream_audio_from_github(os.path.basename(file_path), is_original)
        if audio_data:
            return _denoise_streamed_audio(audio_data)
        return _write_silent_wav("error")
    except Exception as e:
        print(f"Error preprocessing audio: {e}")
        return _write_silent_wav("error")
def categorize_by_duration(df):
    """Label each sample 'short' (1-3 words) or 'long' (4+ words).

    Returns a copy of *df* with added 'word_count' and 'duration_category'
    columns; the input frame is left untouched.
    """
    labeled = df.copy()
    labeled['word_count'] = labeled['transcript'].map(lambda t: len(str(t).split()))
    labeled['duration_category'] = labeled['word_count'].map(
        lambda n: 'short' if n <= 3 else 'long'
    )
    return labeled
def get_audio_from_parquet(speaker_id, method, sample_number):
    """Look up one sample's audio array and sample rate in the parquet data.

    Returns:
        (audio_array, sample_rate) on success, or (None, None) when the
        speaker or the (method, sample_number) combination is unknown.
    """
    import json

    frame = SPEAKER_DATA.get(speaker_id)
    if frame is None:
        return None, None

    # Parquet stores sample_number as a string; normalise before matching.
    matches = frame[
        (frame['method'] == method) & (frame['sample_number'] == str(sample_number))
    ]
    if len(matches) == 0:
        return None, None
    record = matches.iloc[0]

    # The audio may be serialised as a JSON string; decode when necessary.
    raw_audio = record['audio_array']
    if isinstance(raw_audio, str):
        audio_array = np.array(json.loads(raw_audio))
    else:
        audio_array = np.array(raw_audio)
    return audio_array, int(record['sample_rate'])
def _print_stimulus_summary(stimulus_set):
    """Print balance statistics (speaker type, sample type, method, duration)."""
    print("\n=== Created Balanced Stimulus Set ===")
    print(f"Total samples: {len(stimulus_set)}")
    healthy_count = sum(1 for s in stimulus_set if s['speaker_type'] == 'healthy')
    dysarthric_count = sum(1 for s in stimulus_set if s['speaker_type'] == 'dysarthric')
    print(f"By speaker type: Healthy={healthy_count}, Dysarthric={dysarthric_count}")
    original_count = sum(1 for s in stimulus_set if s['sample_type'] == 'Original')
    synthetic_count = sum(1 for s in stimulus_set if s['sample_type'] == 'Synthetic')
    print(f"By sample type: Original={original_count}, Synthetic={synthetic_count}")
    method_counts = {}
    for s in stimulus_set:
        m = s['method']
        method_counts[m] = method_counts.get(m, 0) + 1
    print(f"By method: {method_counts}")
    short_count = sum(1 for s in stimulus_set if s['duration_category'] == 'short')
    long_count = sum(1 for s in stimulus_set if s['duration_category'] == 'long')
    print(f"By duration: Short={short_count}, Long={long_count}")


def create_balanced_stimulus_set():
    """Create the balanced 64-sample stimulus set.

    Design:
    - 8 speakers (4 healthy, 4 dysarthric)
    - Each speaker: 8 samples (4 short + 4 long)
    - Per length category: Original, Original (repeat), Sesame_TTS, Spark_KNN
    - Total: 8 speakers x 8 samples = 64 samples

    Sample selection (REPRODUCIBLE):
    - Short sample: sample_number='1' (single-word utterance)
    - Long sample: sample_number='3' (multi-word sentence)
    """
    # Fixed sample_number choices keep selection reproducible across runs.
    SAMPLE_SELECTION = {
        'short': '1',  # first sample (short word)
        'long': '3',   # third sample (sentence)
    }
    stimulus_set = []
    # (Removed unused `sample_counter` from the original implementation.)
    print("\n=== Sample Selection (Reproducible) ===")
    for speaker_id, speaker_info in SPEAKERS.items():
        if speaker_id not in SPEAKER_DATA:
            print(f"Warning: No data for speaker {speaker_id}")
            continue
        df = SPEAKER_DATA[speaker_id]
        # Only the Original recordings define the transcript for each slot.
        original_samples = df[df['method'] == 'Original'].copy()
        for duration_cat, sample_num in SAMPLE_SELECTION.items():
            sample_row = original_samples[original_samples['sample_number'] == sample_num]
            if len(sample_row) == 0:
                print(f"Warning: No sample_number={sample_num} for {speaker_id}")
                continue
            sample_row = sample_row.iloc[0]
            transcript = sample_row['transcript']
            print(f" {speaker_id} {duration_cat}: sample_number={sample_num} -> '{transcript[:50]}{'...' if len(transcript) > 50 else ''}'")
            # Original appears twice so natural/synthetic counts stay balanced.
            methods_to_add = [
                ('Original', 'original_1'),
                ('Original', 'original_2'),  # repeat for balance
                ('Sesame_TTS', 'sesame_tts'),
                ('Spark_KNN', 'spark_knn'),
            ]
            for method, method_label in methods_to_add:
                sample_type = 'Original' if method == 'Original' else 'Synthetic'
                stimulus_set.append({
                    'sample_id': f"{speaker_id}_{duration_cat}_{method_label}",
                    'speaker_id': speaker_id,
                    'speaker_type': speaker_info['type'],
                    'speaker_gender': speaker_info['gender'],
                    'speaker_label': speaker_info['label'],
                    'method': method,
                    'sample_number': sample_num,
                    'transcription': transcript,
                    'duration_category': duration_cat,
                    'sample_type': sample_type,
                    'original_speaker': speaker_id,
                    'synthetic_speaker': speaker_id if method != 'Original' else '',
                })
    # Randomize presentation order (module RNG is seeded at startup).
    random.shuffle(stimulus_set)
    _print_stimulus_summary(stimulus_set)
    return stimulus_set
# Create a TEMPLATE stimulus set once at import time. This is the shared base;
# each session deep-copies and reshuffles it (see create_session_stimulus_set).
_stimulus_set_template = create_balanced_stimulus_set()
print(f"Stimulus set template ready with {len(_stimulus_set_template)} samples")
def create_session_stimulus_set():
    """Return a fresh, independently shuffled stimulus set for a new session.

    Deep-copies the module template so per-session mutations never leak
    between participants, then shuffles the presentation order.
    """
    import copy

    per_session = copy.deepcopy(_stimulus_set_template)
    random.shuffle(per_session)
    return per_session
def log_session_event(participant_id, prolific_id, event_type, details=None):
    """Record a session-tracking event in Supabase.

    Reuses the experiment_results table: sample_type is 'Event' and the event
    name goes in evaluation_type. Known event types: 'session_start',
    'sample_completed', 'experiment_complete'. *details* (optional dict) is
    stringified into the comments column. Silently no-ops without Supabase.
    """
    if not supabase:
        return
    event_row = {
        'timestamp': datetime.now().isoformat(),
        'participant_id': participant_id,
        'prolific_id': prolific_id or '',
        'sample_id': f'{participant_id}_{event_type}',
        'sample_type': 'Event',
        'evaluation_type': event_type,
        'naturalness_rating': None,
        'intelligibility_rating': None,
        'comments': str(details) if details else '',
        'transcription': None,
        'original_speaker': None,
        'synthetic_speaker': None,
        'consent_given': None,
    }
    try:
        supabase.table('experiment_results').insert(event_row).execute()
        print(f"Session event logged: {event_type} for {participant_id}")
    except Exception as e:
        # Event logging is auxiliary; never let it break the experiment flow.
        print(f"Warning: Could not log session event: {e}")
def save_consent(participant_id, prolific_id):
    """Persist an informed-consent record for a participant.

    Writes to Supabase when configured; when Supabase is missing or the
    insert fails, the record is appended to the local CSV instead. (The
    original duplicated the CSV fallback in two branches; collapsed here.)

    Returns:
        A human-readable confirmation string.
    """
    global results_df
    consent_row = {
        'timestamp': datetime.now().isoformat(),
        'participant_id': participant_id,
        'prolific_id': prolific_id,
        'sample_id': f'{participant_id}',
        'sample_type': 'Consent',
        'evaluation_type': 'consent',
        'naturalness_rating': None,
        'intelligibility_rating': None,
        'comments': "Informed consent given",
        'transcription': None,
        'original_speaker': None,
        'synthetic_speaker': None,
        'consent_given': True
    }
    saved_to_db = False
    if supabase:
        try:
            supabase.table('experiment_results').insert(consent_row).execute()
            # Mark the start of this participant's session.
            log_session_event(participant_id, prolific_id, 'session_start', {
                'total_samples': len(_stimulus_set_template)
            })
            print(f"Consent saved to Supabase for participant {participant_id}")
            saved_to_db = True
        except Exception as e:
            print(f"Error saving consent to Supabase: {e}")
    if not saved_to_db:
        # CSV fallback: Supabase unconfigured or the insert failed.
        results_df = pd.concat([results_df, pd.DataFrame([consent_row])], ignore_index=True)
        results_df.to_csv(RESULTS_FILE, index=False)
    return f"Consent recorded for participant {participant_id}"
def save_rating(participant_id, prolific_id, sample_id, sample_type,
                naturalness_rating, intelligibility_rating,
                transcription, original_speaker, synthetic_speaker):
    """Persist one sample's combined naturalness + intelligibility rating.

    Writes a single row to Supabase when configured; when Supabase is missing
    or the insert fails, the row is appended to the local CSV instead. (The
    original duplicated the CSV fallback in two branches; collapsed here.)

    Returns:
        A human-readable confirmation string.
    """
    global results_df
    new_row = {
        'timestamp': datetime.now().isoformat(),
        'participant_id': participant_id,
        'prolific_id': prolific_id,
        'sample_id': sample_id,
        'sample_type': sample_type,
        'evaluation_type': 'combined',
        'naturalness_rating': naturalness_rating,
        'intelligibility_rating': intelligibility_rating,
        'comments': "",
        'transcription': transcription,
        'original_speaker': original_speaker,
        'synthetic_speaker': synthetic_speaker,
        'consent_given': None
    }
    saved_to_db = False
    if supabase:
        try:
            supabase.table('experiment_results').insert(new_row).execute()
            print(f"Rating saved to Supabase: {sample_id} (naturalness: {naturalness_rating}, intelligibility: {intelligibility_rating})")
            # Track per-sample progress for session monitoring.
            log_session_event(participant_id, prolific_id, 'sample_completed', {
                'sample_id': sample_id,
                'sample_type': sample_type
            })
            saved_to_db = True
        except Exception as e:
            print(f"Error saving rating to Supabase: {e}")
    if not saved_to_db:
        # CSV fallback: Supabase unconfigured or the insert failed.
        results_df = pd.concat([results_df, pd.DataFrame([new_row])], ignore_index=True)
        results_df.to_csv(RESULTS_FILE, index=False)
    # Plain string: the original used an f-string with no placeholders.
    return "Rating saved successfully!"
def create_experiment_interface():
"""Create the Gradio interface for MOS-style evaluation"""
with gr.Blocks(title="Pathological Speech Evaluation", theme=gr.themes.Soft()) as demo:
# State to track consent status
consent_given = gr.State(value=False)
# Consent Screen
with gr.Column(visible=True) as consent_screen:
gr.Markdown("""
# Speech Evaluation Research Study
## Participant Information Sheet
### What is this research about?
This research aims to investigate how well synthetic speech, generated using advanced AI techniques, replicates the characteristics of impaired speech. We evaluate these synthetic voices using human judgment.
### Why are you doing this research?
Developing robust Automatic Speech Recognition (ASR) systems for people with speech impairments like dysarthria is challenging due to limited and varied real-world data. This project explores methods for augmenting such datasets with high-quality synthetic speech. These synthetic samples could make speech technologies more inclusive and accurate, especially in clinical or assistive technology settings.
### Why have I been invited to take part?
You are a native English speaker who is over 18, and you have been invited to evaluate speech samples based on their naturalness and intelligibility.
### What will happen if I decide to take part?
The total time required is approximately **15 to 20 minutes**. You will listen to a series of short audio recordings of impaired speech (both real and AI-generated) and rate their naturalness and intelligibility. The recordings will be randomised and anonymised.
Participation is completely voluntary, and you are free to withdraw at any time without explanation or penalty.
### How will the data be used?
Your responses will be stored anonymously and statistically analysed. All speech samples are pre-generated - you will not be recorded. This research is purely academic with no commercial purpose.
### How will your privacy be protected?
You will be assigned an anonymous ID. No personal information will be published or disclosed. All data will be securely stored per TU Dublin's data protection policies.
### Research Funding
This research is funded by Research Ireland under D-REAL (https://d-real.ie/) and ADAPT (https://www.adaptcentre.ie/).
### Benefits of participation
You will contribute to advancing inclusive AI technologies for speech therapy and assistive communication.
### Risks
There are minimal risks. Some speech samples may be hard to understand. You can withdraw immediately if you feel any discomfort.
### Contact
For questions or concerns: D23126641@mytudublin.ie
### Ethics Review
This project has been reviewed and approved by the Research Ethics Committee at TU Dublin.
---
## Consent Form
**Please confirm the following statements:**
""")
# Prolific ID input (optional)
gr.Markdown("**Prolific ID (Optional):**")
prolific_id_input = gr.Textbox(
label="Prolific ID",
placeholder="Enter your Prolific ID here (if applicable)",
info="If you came from Prolific, please enter your ID to help us verify your participation. This is optional.",
interactive=True
)
gr.Markdown("---")
# Consent checkboxes
consent_age = gr.Checkbox(label="I am over 18 years old", value=False)
consent_native = gr.Checkbox(label="I am a native English speaker", value=False)
consent_impaired = gr.Checkbox(label="I understand I will listen to a variety of impaired speech samples", value=False)
consent_hearing = gr.Checkbox(label="I do not have any known hearing impairments", value=False)
consent_english = gr.Checkbox(label="I understand the experiment will be conducted in English", value=False)
consent_anonymous = gr.Checkbox(label="I understand that all data I submit will be anonymous", value=False)
consent_info = gr.Checkbox(label="I have read and understood the participant information sheet", value=False)
consent_analysis = gr.Checkbox(label="I consent to my responses being used for analysis", value=False)
consent_withdraw = gr.Checkbox(label="I understand that I can withdraw or request deletion of my data any time before project completion", value=False)
consent_participate = gr.Checkbox(label="I consent to take part in this research study", value=False)
consent_stop = gr.Checkbox(label="I understand that I can stop participating in this research at any time", value=False)
# Status message for consent
consent_status = gr.Markdown("", visible=False)
# Consent button
consent_button = gr.Button("I Consent to Participate", variant="primary", size="lg")
# Main Experiment Screen (initially hidden)
with gr.Column(visible=False) as experiment_screen:
gr.Markdown("""
# Pathological Speech Evaluation Experiment
Thank you for participating! You will now evaluate speech samples.
## Instructions:
For each audio sample:
1. **Listen** to the audio sample carefully
2. **Select a naturalness rating** using the radio buttons (1=Bad, 2=Poor, 3=Fair, 4=Good, 5=Excellent)
3. **The transcript will automatically appear** when you select your naturalness rating
4. **Select an intelligibility rating** after seeing the transcript
5. **Click "Submit Rating"** to save both ratings and move to the next sample
## Rating Scale:
- **1**: Bad
- **2**: Poor
- **3**: Fair
- **4**: Good
- **5**: Excellent
""")
# State variables - pass function REFERENCES (not calls) so each session gets fresh values
current_participant_id = gr.State(value=generate_participant_id)
current_prolific_id = gr.State(value="")
# Per-session state: each user gets their own index and shuffled stimulus set
session_stimulus_index = gr.State(value=0)
session_stimulus_set = gr.State(value=create_session_stimulus_set)
with gr.Row():
with gr.Column():
participant_id_display = gr.Textbox(
label="Participant ID",
interactive=False
)
# Progress indicator
progress_text = gr.Textbox(
label="Progress",
interactive=False,
value="Progress: 0/0 samples"
)
# Current evaluation step
evaluation_step_display = gr.Textbox(
label="Instructions",
interactive=False,
value=""
)
# Hidden fields for tracking
sample_id = gr.Textbox(label="Sample ID", visible=False)
sample_type = gr.Textbox(label="Sample Type", visible=False)
evaluation_stage = gr.Textbox(label="Evaluation Stage", visible=False)
original_speaker = gr.Textbox(label="Original Speaker", visible=False)
synthetic_speaker = gr.Textbox(label="Synthetic Speaker", visible=False)
stored_transcription = gr.Textbox(label="Stored Transcription", visible=False)
# Audio player
audio_player = gr.Audio(
label="Speech Sample",
type="filepath",
format="wav",
autoplay=False
)
# Naturalness rating (5-point scale)
naturalness_rating = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="Naturalness Rating",
info="How natural (pleasantly human-like) was the sound of this audio sample?",
value=None
)
# Intelligibility rating (5-point scale)
intelligibility_rating = gr.Radio(
choices=["1", "2", "3", "4", "5"],
label="Intelligibility Rating",
info="How easy was it to understand the speech?",
value=None,
visible=False
)
# Transcription (conditionally visible) - moved below ratings
transcription_display = gr.Textbox(
label="Transcription (What should be said)",
visible=False,
interactive=False
)
# Status feedback
status = gr.Textbox(
label="Status",
interactive=False,
lines=4
)
submit_btn = gr.Button("Submit Rating", variant="primary")
def load_sample(participant_id, stim_index, stim_set):
"""Load current sample from parquet data (per-session state)"""
global temp_files
# Prolific completion URL
PROLIFIC_COMPLETION_URL = "https://app.prolific.com/submissions/complete?cc=CXOYRH0O"
if stim_index >= len(stim_set):
# Experiment complete - put Prolific URL in Instructions field which is visible
completion_instructions = f"""🎉 Experiment Complete!
Click here to complete: {PROLIFIC_COMPLETION_URL}
Completion Code: CXOYRH0O"""
# Log experiment completion event
log_session_event(participant_id, '', 'experiment_complete', {
'total_samples': len(stim_set),
'samples_completed': stim_index
})
return [
None, # audio_player
"Experiment Complete", # sample_id
"Complete", # sample_type
"", # stored_transcription
"", # transcription_display
gr.update(visible=False), # transcription_display visibility
None, # naturalness_rating (reset)
gr.update(visible=False), # naturalness_rating visibility - HIDE
None, # intelligibility_rating (reset)
gr.update(visible=False), # intelligibility_rating visibility
f"Copy URL: {PROLIFIC_COMPLETION_URL}", # status
"", # original_speaker
"", # synthetic_speaker
participant_id, # participant_id_display
f"✅ Complete: {len(stim_set)}/{len(stim_set)} samples", # progress_text
completion_instructions, # evaluation_step_display - show Prolific URL here
gr.update(interactive=False, value="Experiment Complete"), # submit_btn - disable
stim_index, # session_stimulus_index (unchanged)
stim_set # session_stimulus_set (unchanged)
]
current_stimulus = stim_set[stim_index]
# Calculate progress
progress_info = f"Progress: {stim_index + 1}/{len(stim_set)} samples"
step_info = "Step 1: Listen to the audio and select a naturalness rating. Step 2: The transcript will appear - then rate intelligibility."
# Load audio from parquet data
speaker_id = current_stimulus['speaker_id']
method = current_stimulus['method']
sample_number = current_stimulus['sample_number']
audio_array, sample_rate = get_audio_from_parquet(speaker_id, method, sample_number)
if audio_array is not None:
# Apply noise reduction
try:
reduced_audio = nr.reduce_noise(y=audio_array, sr=sample_rate)
except Exception as e:
print(f"Noise reduction failed: {e}")
reduced_audio = audio_array
# Save to temp file for Gradio
temp_path = os.path.join(TEMP_DIR, f"sample_{current_stimulus['sample_id']}_{uuid.uuid4()}.wav")
sf.write(temp_path, reduced_audio, sample_rate)
temp_files.append(temp_path)
preprocessed_audio = temp_path
else:
# Fallback: create silent audio
print(f"Warning: Could not load audio for {speaker_id}/{method}/{sample_number}")
temp_path = os.path.join(TEMP_DIR, f"silent_{uuid.uuid4()}.wav")
sr = 16000
silent_audio = np.zeros(int(sr * 1.5))
sf.write(temp_path, silent_audio, sr)
temp_files.append(temp_path)
preprocessed_audio = temp_path
return [
preprocessed_audio, # audio_player
current_stimulus['sample_id'], # sample_id
current_stimulus['sample_type'], # sample_type
current_stimulus['transcription'], # stored_transcription
"", # transcription_display (hidden initially)
gr.update(visible=False), # transcription_display visibility
None, # naturalness_rating (reset)
gr.update(visible=True), # naturalness_rating visibility
None, # intelligibility_rating (reset)
gr.update(visible=False), # intelligibility_rating visibility
"", # status (clear)
current_stimulus['original_speaker'], # original_speaker
current_stimulus['synthetic_speaker'], # synthetic_speaker
participant_id, # participant_id_display
progress_info, # progress_text
step_info, # evaluation_step_display
gr.update(interactive=True, value="Submit Rating"), # submit_btn - enable
stim_index, # session_stimulus_index (unchanged)
stim_set # session_stimulus_set (unchanged)
]
def on_naturalness_selected(naturalness_rating, stored_transcription):
    """Reveal the transcript and intelligibility scale once naturalness is rated.

    Returns updates for: transcript text, transcript visibility,
    intelligibility-rating visibility, and the status message.
    """
    if naturalness_rating is None:
        # Rating cleared or not yet given: keep stage two hidden and quiet.
        return ["", gr.update(visible=False), gr.update(visible=False), ""]
    # Naturalness chosen: show the transcript and unlock the second scale.
    return [
        stored_transcription,
        gr.update(visible=True),
        gr.update(visible=True),
        "Now rate the intelligibility after seeing the transcript.",
    ]
def submit_rating(participant_id, prolific_id, sample_id, sample_type, stored_transcription,
                  naturalness_rating, intelligibility_rating, original_speaker, synthetic_speaker,
                  stim_index, stim_set):
    """Persist the pair of ratings, then advance this session to the next sample.

    If either rating is missing, every component is left untouched via
    gr.skip() except the status line (slot 10), which asks the participant to
    complete both scales; the per-session index/set state is passed through
    unchanged in the last two slots.
    """
    if naturalness_rating is None or intelligibility_rating is None:
        # Build the 19-slot response: skip all components except status and
        # the trailing session-state values.
        outputs = [gr.skip() for _ in range(19)]
        outputs[10] = "Please provide both naturalness and intelligibility ratings before submitting."
        outputs[17] = stim_index  # session_stimulus_index (unchanged)
        outputs[18] = stim_set    # session_stimulus_set (unchanged)
        return outputs
    # Both scores present: store them together as a single row.
    save_rating(
        participant_id, prolific_id, sample_id, sample_type,
        int(naturalness_rating), int(intelligibility_rating),
        stored_transcription, original_speaker, synthetic_speaker
    )
    # Advance the per-session index, load the next sample, and confirm the
    # save in the status slot of its output list.
    next_outputs = load_sample(participant_id, stim_index + 1, stim_set)
    next_outputs[10] = "Ratings saved successfully!"
    return next_outputs
def handle_consent(c_age, c_native, c_impaired, c_hearing, c_english,
                   c_anonymous, c_info, c_analysis, c_withdraw,
                   c_participate, c_stop, participant_id, prolific_id):
    """Validate the consent checkboxes and switch screens accordingly.

    Returns updates for: consent-screen visibility, experiment-screen
    visibility, the consent status message, the consent_given state, and the
    (optional, whitespace-trimmed) Prolific ID state.
    """
    # The Prolific ID is optional; just normalise whitespace when supplied.
    clean_prolific_id = prolific_id.strip() if prolific_id else ""
    checkboxes = (c_age, c_native, c_impaired, c_hearing, c_english,
                  c_anonymous, c_info, c_analysis, c_withdraw,
                  c_participate, c_stop)
    if not all(checkboxes):
        # At least one box unticked: stay on the consent screen, show why,
        # and leave the stored state untouched.
        return [
            gr.update(visible=True),
            gr.update(visible=False),
            gr.update(value="⚠️ **Please check all boxes to confirm your consent before proceeding.**",
                      visible=True),
            False,
            prolific_id,
        ]
    # Full consent: record it (Prolific ID may be empty), then swap screens.
    save_consent(participant_id, clean_prolific_id)
    return [
        gr.update(visible=False),  # hide consent screen
        gr.update(visible=True),   # show experiment screen
        gr.update(visible=False),  # hide consent status message
        True,                      # consent_given state
        clean_prolific_id,         # prolific_id state
    ]
# Event handlers
# Handle consent button click: all eleven checkboxes plus the participant and
# Prolific IDs feed handle_consent, which flips screens and session state.
consent_button.click(
    handle_consent,
    inputs=[consent_age, consent_native, consent_impaired, consent_hearing,
            consent_english, consent_anonymous, consent_info, consent_analysis,
            consent_withdraw, consent_participate, consent_stop, current_participant_id, prolific_id_input],
    outputs=[consent_screen, experiment_screen, consent_status, consent_given, current_prolific_id]
)
# Show transcript when naturalness rating is selected.
# NOTE(review): transcription_display appears twice in outputs (value, then
# visibility update) — this relies on Gradio accepting duplicate output
# components; confirm against the installed Gradio version.
naturalness_rating.change(
    on_naturalness_selected,
    inputs=[naturalness_rating, stored_transcription],
    outputs=[transcription_display, transcription_display, intelligibility_rating, status]
)
# Submit both ratings: the 19 outputs mirror load_sample's return order, so
# any change to one side must be mirrored on the other.
submit_btn.click(
    submit_rating,
    inputs=[
        current_participant_id, current_prolific_id, sample_id, sample_type, stored_transcription,
        naturalness_rating, intelligibility_rating, original_speaker, synthetic_speaker,
        session_stimulus_index, session_stimulus_set
    ],
    outputs=[
        audio_player, sample_id, sample_type, stored_transcription,
        transcription_display, transcription_display, naturalness_rating,
        naturalness_rating, intelligibility_rating, intelligibility_rating, status,
        original_speaker, synthetic_speaker, participant_id_display,
        progress_text, evaluation_step_display, submit_btn,
        session_stimulus_index, session_stimulus_set
    ]
)
# Load first sample only after consent is given
def check_consent_and_load(consent_status, participant_id, stim_index, stim_set):
    """Load the current stimulus only once consent has been granted.

    Without consent, return neutral defaults for every component while
    passing the per-session stimulus index/set through unchanged.
    """
    if consent_status:
        return load_sample(participant_id, stim_index, stim_set)
    # No consent yet: blank out audio, ids, and text, hide the second-stage
    # widgets, and leave the naturalness scale and submit button in their
    # initial state.
    blank = ""
    return [
        None,                      # audio_player
        blank,                     # sample_id
        blank,                     # sample_type
        blank,                     # stored_transcription
        blank,                     # transcription_display
        gr.update(visible=False),  # transcription_display visibility
        None,                      # naturalness_rating
        gr.update(visible=True),   # naturalness_rating visibility
        None,                      # intelligibility_rating
        gr.update(visible=False),  # intelligibility_rating visibility
        blank,                     # status
        blank,                     # original_speaker
        blank,                     # synthetic_speaker
        blank,                     # participant_id_display
        blank,                     # progress_text
        blank,                     # evaluation_step_display
        gr.update(interactive=True, value="Submit Rating"),  # submit_btn
        stim_index,                # session_stimulus_index (unchanged)
        stim_set,                  # session_stimulus_set (unchanged)
    ]
# When consent is given, load the first sample. The outputs mirror
# load_sample's 19-slot return order; the duplicated components each receive
# a (value, visibility) pair of slots.
consent_given.change(
    fn=check_consent_and_load,
    inputs=[consent_given, current_participant_id, session_stimulus_index, session_stimulus_set],
    outputs=[
        audio_player, sample_id, sample_type, stored_transcription,
        transcription_display, transcription_display, naturalness_rating,
        naturalness_rating, intelligibility_rating, intelligibility_rating, status,
        original_speaker, synthetic_speaker, participant_id_display,
        progress_text, evaluation_step_display, submit_btn,
        session_stimulus_index, session_stimulus_set
    ]
)
# Hand the assembled Blocks app back to the caller.
return demo
# Create the interface
if __name__ == "__main__":
    # Build the Gradio app and serve it when executed as a script.
    app = create_experiment_interface()
    app.launch()