#!/usr/bin/env python3
"""
Audio Dataset Explorer for TTS - Full Version (English)
Explore audio datasets, analyze speakers, listen to samples
"""

# Monkey-patch gradio_client bug: 'const' in bool TypeError.
# The schema walker can be handed a non-dict (e.g. a bare bool) and then
# crashes on a membership test; treat any non-dict schema as "Any".
# This must run before `import gradio` so the patched function is in place
# when the interface is built.
import gradio_client.utils as _gc_utils

_orig_json_schema = _gc_utils._json_schema_to_python_type


def _patched_json_schema(schema, defs=None):
    """Delegate to the original converter, short-circuiting non-dict schemas."""
    if not isinstance(schema, dict):
        return "Any"
    return _orig_json_schema(schema, defs)


_gc_utils._json_schema_to_python_type = _patched_json_schema

import gradio as gr
from datasets import load_dataset, Audio
from collections import defaultdict
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from huggingface_hub import hf_hub_download
import soundfile as sf
import tempfile
import os
import numpy as np
import io

# Cache for datasets: maps "<dataset>_<config>_<split>_<max_samples>" to the
# result dict produced by load_dataset_stats().
dataset_cache = {}

# Placeholder entry shown in the speaker dropdown before a real selection.
_SPEAKER_PLACEHOLDER = "Select a speaker..."

# Maximum number of example clips/texts kept (and displayed) per speaker.
_MAX_SPEAKER_EXAMPLES = 5

# Column order of the per-speaker overview table.
_OVERVIEW_COLUMNS = [
    'Speaker ID',
    'Samples',
    'Time (h)',
    'Words',
    'Avg Duration (s)',
    'Avg Words',
]


def _sample_limit(max_samples):
    """Coerce the UI "Max Samples" value to an int.

    The gr.Number field yields None when it is cleared; that is treated
    like 0, which the rest of the code interprets as "no limit".
    """
    return int(max_samples or 0)


def detect_split(dataset_name, config_name):
    """Auto-detect the best split to use.

    Returns the first of 'train', 'asr_only', 'ast' that the dataset offers,
    otherwise the first available split. Falls back to 'train' when the
    split names cannot be retrieved.
    """
    from datasets import get_dataset_split_names

    try:
        splits = get_dataset_split_names(dataset_name, config_name)
        for preferred in ['train', 'asr_only', 'ast']:
            if preferred in splits:
                return preferred
        return splits[0] if splits else 'train'
    except Exception:
        # Best effort: any lookup failure just means "try the default split".
        return 'train'


def _probe_duration(audio_bytes):
    """Return the duration in seconds of encoded audio bytes, or 0.0 if unknown."""
    if not audio_bytes:
        return 0.0
    try:
        with sf.SoundFile(io.BytesIO(audio_bytes)) as f:
            return len(f) / f.samplerate
    except Exception:
        # Deliberate best effort: an undecodable clip simply has no duration.
        return 0.0


def _extract_sample_fields(sample):
    """Pull speaker id, duration, text, word count and raw audio out of one sample.

    Field names differ between datasets, so each value is looked up under
    several candidate keys, both at the top level and inside the
    'metadata.json' entry used by WebDataset-style datasets.

    Returns a tuple (speaker_id, duration, text, num_words, audio_raw) where
    audio_raw is whatever the sample stores under 'audio'/'flac'/'mp3'
    (or an empty dict).
    """
    # WebDataset format (e.g. sarulab-speech/mls_sidon): metadata in
    # 'metadata.json', audio in 'flac'
    meta = sample.get('metadata.json') or {}
    audio_raw = sample.get('audio') or sample.get('flac') or sample.get('mp3') or {}

    speaker_id = str(
        meta.get('speaker_id')
        or sample.get('speaker_id')
        or sample.get('original_audio_id')
        or 'unknown'
    )

    # Auto-detect duration field
    duration = (
        meta.get('audio_duration')
        or meta.get('duration')
        or sample.get('duration')
        or sample.get('audio_duration')
        or ((sample.get('end_time', 0) - sample.get('begin_time', 0)) if 'end_time' in sample else 0)
        or 0.0
    )

    # Fallback: compute from audio bytes (for datasets without duration field)
    if not duration:
        audio_bytes = audio_raw.get('bytes') if isinstance(audio_raw, dict) else None
        duration = _probe_duration(audio_bytes) or duration

    # Auto-detect text field
    text = (
        meta.get('transcript')
        or sample.get('text')
        or sample.get('transcript')
        or sample.get('sentence')
        or ''
    )

    # Auto-detect word count. The parentheses matter: without them the
    # conditional applies to the whole `or` expression and a dataset-provided
    # 'num_words' is thrown away whenever the text is empty.
    num_words = sample.get('num_words') or (len(text.split()) if text else 0)

    return speaker_id, duration, text, num_words, audio_raw


def load_dataset_stats(dataset_name, config_name, split_name=None, max_samples=5000):
    """Load dataset and compute speaker statistics.

    Streams up to `max_samples` samples (0/None = all) from the given
    dataset/config/split, without decoding audio, and aggregates them per
    speaker.

    Returns a dict. On success it has the keys 'df' (per-speaker DataFrame
    sorted by sample count), 'stats' (speaker id -> raw aggregates including
    up to five example clips), 'dataset_name' and 'config_name'; successful
    results are cached in `dataset_cache`. On failure it has 'df' (empty),
    'stats' (empty) and 'error' (message) and is not cached.
    """
    if not split_name:
        split_name = detect_split(dataset_name, config_name)

    cache_key = f"{dataset_name}_{config_name}_{split_name}_{max_samples}"
    if cache_key in dataset_cache:
        return dataset_cache[cache_key]

    try:
        # Load dataset without audio (fast)
        ds = load_dataset(
            dataset_name,
            config_name,
            split=split_name,
            streaming=True
        )

        # A streaming dataset may not know its features up front (None);
        # in that case there is simply no column to cast.
        features = ds.features or {}
        audio_col = next((c for c in ('audio', 'flac', 'mp3') if c in features), None)
        if audio_col:
            ds = ds.cast_column(audio_col, Audio(decode=False))

        # Collect samples
        stats = defaultdict(lambda: {
            'count': 0,
            'total_duration': 0.0,
            'total_words': 0,
            'durations': [],
            'texts': []
        })

        for i, sample in enumerate(ds):
            if max_samples and i >= max_samples:
                break

            speaker_id, duration, text, num_words, audio_raw = _extract_sample_fields(sample)

            entry = stats[speaker_id]
            entry['count'] += 1
            entry['total_duration'] += duration
            entry['total_words'] += num_words
            entry['durations'].append(duration)

            # Store sample texts + audio bytes (up to 5 per speaker)
            if len(entry['texts']) < _MAX_SPEAKER_EXAMPLES:
                has_audio_dict = isinstance(audio_raw, dict)
                entry['texts'].append({
                    'text': text[:150],
                    'duration': duration,
                    'audio_bytes': audio_raw.get('bytes') if has_audio_dict else None,
                    'audio_path': audio_raw.get('path', '') if has_audio_dict else ''
                })

        # Create DataFrame
        rows = []
        for speaker_id, data in stats.items():
            count = data['count']
            rows.append({
                'Speaker ID': speaker_id,
                'Samples': count,
                'Time (h)': round(data['total_duration'] / 3600, 2),
                'Words': data['total_words'],
                'Avg Duration (s)': round(data['total_duration'] / count, 2) if count > 0 else 0,
                'Avg Words': round(data['total_words'] / count, 1) if count > 0 else 0,
            })

        # Explicit columns keep sort_values() valid when no sample was read,
        # so callers see an empty frame instead of a KeyError-derived error.
        df = pd.DataFrame(rows, columns=_OVERVIEW_COLUMNS).sort_values('Samples', ascending=False)

        result = {
            'df': df,
            'stats': dict(stats),
            'dataset_name': dataset_name,
            'config_name': config_name
        }

        dataset_cache[cache_key] = result
        return result

    except Exception as e:
        # Top-level boundary for the UI: report the failure as data.
        return {'df': pd.DataFrame(), 'stats': {}, 'error': str(e)}


def create_overview(dataset_name, config_name, split_name, max_samples):
    """Create overview with statistics and charts.

    Returns a 6-tuple: (DataFrame, samples bar chart, duration bar chart,
    status message, markdown summary table, speaker dropdown choices).
    On error or empty data the first three items are None, the summary is
    "" and the choices list is empty.
    """
    limit = _sample_limit(max_samples)
    result = load_dataset_stats(dataset_name, config_name, split_name or None, limit)

    if 'error' in result:
        return None, None, None, f"❌ Error: {result['error']}", "", []

    df = result['df']

    if df.empty:
        return None, None, None, "❌ No data loaded", "", []

    # Chart 1: Sample distribution
    fig_samples = px.bar(
        df,
        x='Speaker ID',
        y='Samples',
        title=f'Sample Distribution by Speaker ({len(df)} speakers, {limit or "all"} samples analyzed)',
        labels={'Samples': 'Number of Samples'}
    )

    # Chart 2: Duration distribution
    fig_duration = px.bar(
        df,
        x='Speaker ID',
        y='Time (h)',
        title='Total Recording Time by Speaker',
        labels={'Time (h)': 'Time (hours)'}
    )

    # Speaker list for dropdown
    speaker_list = [_SPEAKER_PLACEHOLDER] + df['Speaker ID'].tolist()

    total_h = df['Time (h)'].sum()
    total_samples = df['Samples'].sum()
    total_words = df['Words'].sum()
    summary = (
        f"| Łącznie próbek | Łącznie czasu | Łącznie słów | Lektorów |\n"
        f"|---|---|---|---|\n"
        f"| **{total_samples:,}** | **{total_h:.1f} h** ({total_h*60:.0f} min) | **{total_words:,}** | **{len(df)}** |"
    )

    status = f"✅ Loaded {limit or 'all'} samples, found {len(df)} speakers"

    return df, fig_samples, fig_duration, status, summary, speaker_list


def decode_audio_bytes(audio_bytes):
    """Decode raw audio bytes to a temp wav file path for gr.Audio.

    Returns the path of a newly written .wav file, or None when there are no
    bytes or they cannot be decoded/written. The file is intentionally left
    on disk on success because the audio player reads it after this returns.
    """
    if not audio_bytes:
        return None

    tmp_path = None
    try:
        audio_array, sample_rate = sf.read(io.BytesIO(audio_bytes))
        # Reserve a unique path, then close the handle before soundfile
        # reopens it by name: avoids leaking the descriptor and works on
        # platforms that forbid opening an already-open temp file.
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp:
            tmp_path = tmp.name
        sf.write(tmp_path, audio_array, sample_rate)
        return tmp_path
    except Exception:
        # Best effort: a clip that cannot be decoded is just not playable.
        # Do not leave an empty/partial temp file behind.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass
        return None


def analyze_speaker(dataset_name, config_name, split_name, max_samples, speaker_id):
    """Analyze selected speaker with details and audio samples.

    Returns an 8-tuple: (details markdown, duration histogram, sample texts
    markdown, audio path 1..5). When nothing can be shown, the first two
    items are None, the third is a message, and all audio slots are None.
    """
    empty_audios = [None] * _MAX_SPEAKER_EXAMPLES

    if not speaker_id or speaker_id == _SPEAKER_PLACEHOLDER:
        return None, None, "Please select a speaker first", *empty_audios

    result = load_dataset_stats(
        dataset_name, config_name, split_name or None, _sample_limit(max_samples)
    )

    if 'error' in result:
        return None, None, f"Error: {result['error']}", *empty_audios

    stats = result['stats']
    speaker_data = stats.get(speaker_id)

    if not speaker_data:
        return None, None, f"Speaker {speaker_id} not found", *empty_audios

    # Statistics
    details = f"""
## Speaker {speaker_id} Statistics

- **Total Samples**: {speaker_data['count']}
- **Total Duration**: {speaker_data['total_duration']/3600:.2f} hours ({speaker_data['total_duration']/60:.1f} minutes)
- **Total Words**: {speaker_data['total_words']}
- **Average Sample Length**: {speaker_data['total_duration']/speaker_data['count']:.2f} seconds
- **Average Words per Sample**: {speaker_data['total_words']/speaker_data['count']:.1f}
"""

    # Duration histogram
    fig_hist = go.Figure()
    fig_hist.add_trace(go.Histogram(
        x=speaker_data['durations'],
        nbinsx=30,
        name='Duration'
    ))
    fig_hist.update_layout(
        title=f'Sample Duration Distribution - Speaker {speaker_id}',
        xaxis_title='Duration (seconds)',
        yaxis_title='Number of Samples'
    )

    # Sample texts
    sample_texts_parts = []
    audio_outputs = []

    for i, sample in enumerate(speaker_data['texts'][:_MAX_SPEAKER_EXAMPLES], 1):
        sample_texts_parts.append(f"**Sample {i}** ({sample['duration']:.1f}s):\n{sample['text']}...")
        audio_outputs.append(decode_audio_bytes(sample.get('audio_bytes')))

    # Pad to 5 outputs: the UI always binds exactly five audio players.
    while len(audio_outputs) < _MAX_SPEAKER_EXAMPLES:
        audio_outputs.append(None)

    return details, fig_hist, "\n\n".join(sample_texts_parts), *audio_outputs


def generate_instructions(dataset_name, config_name, speaker_id):
    """Generate download instructions.

    Returns a markdown document with shell and Python snippets for
    downloading the dataset and extracting the selected speaker, or a short
    prompt when no speaker is selected.
    """
    if not speaker_id or speaker_id == _SPEAKER_PLACEHOLDER:
        return "Please select a speaker first"

    # Notes on the template below:
    # - `--repo-type dataset` is required; `hf download` otherwise looks for
    #   a model repository of that name.
    # - The speaker id is always a string here, so the filter compares
    #   against str(x['speaker_id']) with a quoted literal; this works for
    #   both integer and string speaker columns.
    return f"""
## 📥 How to Download & Create Fork for Speaker {speaker_id}

### 1. Download Full Dataset

```bash
hf download {dataset_name} --repo-type dataset --include '{config_name}/*' --local-dir ./data/cml-tts-full
```

### 2. Filter to Selected Speaker (Python)

```python
from datasets import load_dataset

# Load full dataset
dataset = load_dataset("{dataset_name}", "{config_name}", split="train")

# Filter to selected speaker
speaker_dataset = dataset.filter(lambda x: str(x['speaker_id']) == {speaker_id!r})

print(f"Filtered: {{len(speaker_dataset)}} samples")

# Save locally
speaker_dataset.save_to_disk("./speaker_{speaker_id}_dataset")

# OR: Push to HuggingFace Hub as new dataset
speaker_dataset.push_to_hub(
    "your-username/cml-tts-{config_name}-speaker-{speaker_id}",
    private=False  # or True for private
)
```

### 3. Add Custom Columns (Optional)

```python
def add_custom_columns(example):
    example['emotion'] = 'neutral'  # placeholder
    example['quality_score'] = 1.0  # placeholder
    example['use_for_training'] = True
    return example

speaker_dataset = speaker_dataset.map(add_custom_columns)
speaker_dataset.push_to_hub("your-username/cml-tts-{config_name}-speaker-{speaker_id}")
```

### 4. Create Dataset Card

Add README.md:

```markdown
# CML-TTS {config_name.title()} - Speaker {speaker_id}

Filtered subset of {dataset_name} containing only Speaker {speaker_id}.

## Usage
```python
from datasets import load_dataset
ds = load_dataset("your-username/cml-tts-{config_name}-speaker-{speaker_id}")
```
"""


# === Gradio Interface ===

with gr.Blocks(title="Audio Dataset Explorer for TTS", theme=gr.themes.Soft()) as demo:
    gr.Markdown("""
    # 🎙️ Audio Dataset Explorer for TTS

    Explore audio datasets, analyze speakers, and prepare training data for TTS models.
    """)

    # Configuration
    with gr.Row():
        dataset_input = gr.Dropdown(
            label="Dataset Name",
            choices=["ylacombe/cml-tts", "facebook/multilingual_librispeech", "facebook/voxpopuli", "sarulab-speech/mls_sidon", "datadriven-company/WolneLektury-TTS-Polish", "espnet/yodas-granary"],
            value="ylacombe/cml-tts",
            allow_custom_value=True
        )
        config_input = gr.Dropdown(
            label="Config Name",
            choices=["polish", "pl", "Polish", "default"],
            value="polish",
            allow_custom_value=True,
            info="Config/language subset. Type custom value if not listed."
        )
        split_input = gr.Textbox(
            label="Split (auto if empty)",
            value="",
            placeholder="auto-detect",
            info="Dataset split to load. Leave empty to auto-detect (prefers 'train'). Common values: train, validation, test, asr_only."
        )
        samples_slider = gr.Number(
            value=5000,
            minimum=0,
            label="Max Samples (less = faster, 0 = all)",
            precision=0,
        )

    load_btn = gr.Button("🔄 Load Dataset", variant="primary", size="lg")

    gr.Markdown("⏱️ **Note**: First load takes ~30-60s. Subsequent loads are cached.")

    # Overview Tab
    with gr.Tab("📊 Overview - All Speakers"):
        gr.Markdown("### Statistics for all speakers in the dataset")

        status_text = gr.Textbox(label="Status", interactive=False)
        summary_text = gr.Markdown()
        overview_table = gr.Dataframe(label="Speaker Statistics")

        with gr.Row():
            chart_samples = gr.Plot(label="Sample Distribution")
            chart_duration = gr.Plot(label="Duration Distribution")

    # Speaker Details Tab
    with gr.Tab("🎯 Speaker Details"):
        speaker_dropdown = gr.Dropdown(
            label="Select Speaker",
            choices=["Select a speaker..."],
            value="Select a speaker..."
        )

        analyze_btn = gr.Button("🔍 Analyze Speaker", variant="secondary")

        speaker_details = gr.Markdown()
        speaker_hist = gr.Plot(label="Duration Distribution")

        gr.Markdown("### Audio Samples & Texts")
        sample_texts_display = gr.Markdown()

        # One player per example slot; analyze_speaker() always returns
        # exactly this many audio values.
        audio_players = []
        for i in range(5):
            audio_players.append(gr.Audio(label=f"Sample {i+1}", visible=True))

    # Download Tab
    with gr.Tab("📥 Download & Fork"):
        gr.Markdown("### Instructions for creating your own dataset")

        download_instructions = gr.Markdown()

        generate_btn = gr.Button("📋 Generate Instructions", variant="secondary")

    # Callbacks
    def on_load(dataset_name, config_name, split_name, max_samples):
        """Load the dataset and fill the overview tab and speaker dropdown."""
        df, fig1, fig2, status, summary, speakers = create_overview(dataset_name, config_name, split_name, max_samples)
        return (
            status,
            summary or "",
            df if df is not None else gr.Dataframe(),
            fig1,
            fig2,
            gr.Dropdown(choices=speakers),
        )

    def on_analyze(dataset_name, config_name, split_name, max_samples, speaker_id):
        """Fill the speaker details tab for the selected speaker."""
        results = analyze_speaker(dataset_name, config_name, split_name, max_samples, speaker_id)
        # results: details, fig_hist, texts, audio1..audio5
        return results

    def on_generate(dataset_name, config_name, speaker_id):
        """Render the download/fork instructions for the selected speaker."""
        return generate_instructions(dataset_name, config_name, speaker_id)

    load_btn.click(
        fn=on_load,
        inputs=[dataset_input, config_input, split_input, samples_slider],
        outputs=[status_text, summary_text, overview_table, chart_samples, chart_duration, speaker_dropdown]
    )

    analyze_btn.click(
        fn=on_analyze,
        inputs=[dataset_input, config_input, split_input, samples_slider, speaker_dropdown],
        outputs=[speaker_details, speaker_hist, sample_texts_display] + audio_players
    )

    generate_btn.click(
        fn=on_generate,
        inputs=[dataset_input, config_input, speaker_dropdown],
        outputs=[download_instructions]
    )

    gr.Markdown("""
    ---
    ### 💡 Tips
    - First load takes ~30-60s (parsing metadata)
    - Subsequent loads are faster (cached)
    - Reduce "Max Samples" for faster overview
    - **🔊 Click "Listen" links** in Speaker Details to play audio samples

    ### 🎵 Audio Playback
    - Audio links open files directly from HuggingFace Hub
    - Works in all browsers - click to play in new tab
    - Up to 5 sample audio clips per speaker

    ### 🔧 Tested Datasets
    - `ylacombe/cml-tts` - configs: dutch, french, german, italian, polish, portuguese, spanish
    - `facebook/voxpopuli` - configs: pl, en, de, fr, es, ...
    - `facebook/multilingual_librispeech` - configs: polish, german, french, spanish, italian, portuguese, dutch (audiobooks)
    - `sarulab-speech/mls_sidon` - configs: polish, german, french, spanish, italian, portuguese, dutch (audiobooks, WebDataset format)
    - `datadriven-company/WolneLektury-TTS-Polish` - config: default (polskie audiobooki, 310GB, tylko streaming)

    ### 📚 Resources
    - [HuggingFace Datasets Docs](https://huggingface.co/docs/datasets)
    - [TTS Training Guide](https://huggingface.co/docs/transformers/tasks/text-to-speech)
    """)

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860)