Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Audio Dataset Explorer for TTS - Full Version (English) | |
| Explore audio datasets, analyze speakers, listen to samples | |
| """ | |
# Monkey-patch gradio_client bug: 'const' in bool TypeError.
# Some JSON schemas contain bare booleans (e.g. additionalProperties: true);
# gradio_client's converter assumes every schema node is a dict and crashes.
import gradio_client.utils as _gc_utils

# Keep a reference to the original so dict schemas still go through it.
_orig_json_schema = _gc_utils._json_schema_to_python_type


def _patched_json_schema(schema, defs=None):
    # Non-dict schema nodes (bools) would raise in the original; map to "Any".
    if not isinstance(schema, dict):
        return "Any"
    return _orig_json_schema(schema, defs)


# Install the patched converter in place of the buggy one.
_gc_utils._json_schema_to_python_type = _patched_json_schema
| import gradio as gr | |
| from datasets import load_dataset, Audio | |
| from collections import defaultdict | |
| import pandas as pd | |
| import plotly.express as px | |
| import plotly.graph_objects as go | |
| from huggingface_hub import hf_hub_download | |
| import soundfile as sf | |
| import tempfile | |
| import os | |
| import numpy as np | |
| import io | |
# Cache for datasets: maps "{dataset}_{config}_{split}_{max_samples}" keys
# to the result dicts built by load_dataset_stats (avoids re-streaming).
dataset_cache: dict = {}
def detect_split(dataset_name, config_name):
    """Auto-detect the best split to use for a dataset config.

    Prefers 'train', then 'asr_only', then 'ast'; otherwise the first
    available split. Falls back to 'train' on ANY failure — including an
    unavailable `datasets` package or a network/Hub error (the import is
    inside the try for that reason; previously an ImportError escaped).
    """
    try:
        # Lazy import: only needed here, and failure should not be fatal.
        from datasets import get_dataset_split_names
        splits = get_dataset_split_names(dataset_name, config_name)
        for preferred in ('train', 'asr_only', 'ast'):
            if preferred in splits:
                return preferred
        return splits[0] if splits else 'train'
    except Exception:
        return 'train'
def _extract_duration(meta, sample, audio_raw):
    """Best-effort duration (seconds) for one sample.

    Tries metadata fields, then top-level fields, then begin/end timestamps,
    and finally decodes the raw audio header with soundfile. Returns 0.0
    when nothing works.
    """
    duration = (
        meta.get('audio_duration')
        or meta.get('duration')
        or sample.get('duration')
        or sample.get('audio_duration')
        or ((sample.get('end_time', 0) - sample.get('begin_time', 0)) if 'end_time' in sample else 0)
        or 0.0
    )
    if not duration:
        # Fallback: compute from audio bytes (datasets without a duration field).
        audio_bytes = audio_raw.get('bytes') if isinstance(audio_raw, dict) else None
        if audio_bytes:
            try:
                with sf.SoundFile(io.BytesIO(audio_bytes)) as f:
                    duration = len(f) / f.samplerate
            except Exception:
                pass  # unreadable header — keep 0.0 rather than abort the scan
    return duration


def _extract_text(meta, sample):
    """Best-effort transcript text for one sample ('' when absent)."""
    return (
        meta.get('transcript')
        or sample.get('text')
        or sample.get('transcript')
        or sample.get('sentence')
        or ''
    )


def load_dataset_stats(dataset_name, config_name, split_name=None, max_samples=5000):
    """Stream a dataset and compute per-speaker statistics.

    Args:
        dataset_name: HF Hub dataset repo id.
        config_name: config/language subset name.
        split_name: split to load; auto-detected via detect_split when falsy.
        max_samples: stop after this many samples (0/None means all).

    Returns:
        dict with 'df' (pandas DataFrame, one row per speaker), 'stats'
        (raw per-speaker accumulators incl. sample texts/audio bytes),
        'dataset_name' and 'config_name'. On failure:
        {'df': empty DataFrame, 'stats': {}, 'error': str}.
    """
    if not split_name:
        split_name = detect_split(dataset_name, config_name)
    cache_key = f"{dataset_name}_{config_name}_{split_name}_{max_samples}"
    if cache_key in dataset_cache:
        return dataset_cache[cache_key]
    try:
        # Stream so we never download/decode the full dataset up front.
        ds = load_dataset(
            dataset_name,
            config_name,
            split=split_name,
            streaming=True
        )
        # features can be None for streamed WebDatasets — guard before `in`.
        features = ds.features or {}
        audio_col = next((c for c in ('audio', 'flac', 'mp3') if c in features), None)
        if audio_col:
            # Keep audio undecoded; we only need raw bytes for duration/playback.
            ds = ds.cast_column(audio_col, Audio(decode=False))
        stats = defaultdict(lambda: {
            'count': 0,
            'total_duration': 0.0,
            'total_words': 0,
            'durations': [],
            'texts': []
        })
        for i, sample in enumerate(ds):
            if max_samples and i >= max_samples:
                break
            # WebDataset format (e.g. sarulab-speech/mls_sidon):
            # metadata in 'metadata.json', audio in 'flac'.
            meta = sample.get('metadata.json') or {}
            audio_raw = sample.get('audio') or sample.get('flac') or sample.get('mp3') or {}
            speaker_id = str(
                meta.get('speaker_id')
                or sample.get('speaker_id')
                or sample.get('original_audio_id')
                or 'unknown'
            )
            duration = _extract_duration(meta, sample, audio_raw)
            text = _extract_text(meta, sample)
            # BUGFIX: parenthesize the conditional. The old expression parsed
            # as `(num_words or len(...)) if text else 0`, which threw away an
            # explicit 'num_words' value whenever the text was empty.
            num_words = sample.get('num_words') or (len(text.split()) if text else 0)
            entry = stats[speaker_id]
            entry['count'] += 1
            entry['total_duration'] += duration
            entry['total_words'] += num_words
            entry['durations'].append(duration)
            # Store sample texts + audio bytes (up to 5 per speaker).
            if len(entry['texts']) < 5:
                entry['texts'].append({
                    'text': text[:150],
                    'duration': duration,
                    'audio_bytes': audio_raw.get('bytes') if isinstance(audio_raw, dict) else None,
                    'audio_path': audio_raw.get('path', '') if isinstance(audio_raw, dict) else ''
                })
        # Flatten accumulators into one DataFrame row per speaker.
        rows = []
        for speaker_id, data in stats.items():
            rows.append({
                'Speaker ID': speaker_id,
                'Samples': data['count'],
                'Time (h)': round(data['total_duration'] / 3600, 2),
                'Words': data['total_words'],
                'Avg Duration (s)': round(data['total_duration'] / data['count'], 2) if data['count'] > 0 else 0,
                'Avg Words': round(data['total_words'] / data['count'], 1) if data['count'] > 0 else 0,
            })
        df = pd.DataFrame(rows).sort_values('Samples', ascending=False)
        result = {
            'df': df,
            'stats': dict(stats),
            'dataset_name': dataset_name,
            'config_name': config_name
        }
        dataset_cache[cache_key] = result
        return result
    except Exception as e:
        return {'df': pd.DataFrame(), 'stats': {}, 'error': str(e)}
def create_overview(dataset_name, config_name, split_name, max_samples):
    """Create the overview tab contents: table, two charts, status, summary.

    Returns a 6-tuple (df, fig_samples, fig_duration, status_text,
    summary_markdown, speaker_dropdown_choices); the first three are None
    on failure and the choices list is empty.
    """
    result = load_dataset_stats(dataset_name, config_name, split_name or None, int(max_samples))
    if 'error' in result:
        return None, None, None, f"β Error: {result['error']}", "", []
    df = result['df']
    if df.empty:
        return None, None, None, "β No data loaded", "", []
    # Chart 1: number of samples per speaker
    fig_samples = px.bar(
        df,
        x='Speaker ID',
        y='Samples',
        title=f'Sample Distribution by Speaker ({len(df)} speakers, {int(max_samples) or "all"} samples analyzed)',
        labels={'Samples': 'Number of Samples'}
    )
    # Chart 2: total recording time per speaker
    fig_duration = px.bar(
        df,
        x='Speaker ID',
        y='Time (h)',
        title='Total Recording Time by Speaker',
        labels={'Time (h)': 'Time (hours)'}
    )
    # Speaker list for the details-tab dropdown (placeholder first)
    speaker_list = ["Select a speaker..."] + df['Speaker ID'].tolist()
    total_h = df['Time (h)'].sum()
    total_samples = df['Samples'].sum()
    total_words = df['Words'].sum()
    # BUGFIX: the summary header row was garbled Polish mojibake
    # ("ΕΔ cznie prΓ³bek" ...) in an otherwise English UI; restored as
    # clean English column labels.
    summary = (
        f"| Total Samples | Total Time | Total Words | Speakers |\n"
        f"|---|---|---|---|\n"
        f"| **{total_samples:,}** | **{total_h:.1f} h** ({total_h*60:.0f} min) | **{total_words:,}** | **{len(df)}** |"
    )
    status = f"β Loaded {int(max_samples) or 'all'} samples, found {len(df)} speakers"
    return df, fig_samples, fig_duration, status, summary, speaker_list
def decode_audio_bytes(audio_bytes):
    """Decode raw audio bytes to a temp .wav file path for gr.Audio.

    Returns None for empty input or undecodable audio (best-effort; any
    decode failure is swallowed so a bad clip never breaks the UI).
    delete=False is deliberate: Gradio must read the file after we return,
    so the OS temp dir is left to clean these up.
    """
    if not audio_bytes:
        return None
    try:
        audio_array, sample_rate = sf.read(io.BytesIO(audio_bytes))
        tmp = tempfile.NamedTemporaryFile(suffix=".wav", delete=False)
        # BUGFIX: close the handle before writing — on Windows a file held
        # open by NamedTemporaryFile cannot be reopened by sf.write.
        tmp.close()
        sf.write(tmp.name, audio_array, sample_rate)
        return tmp.name
    except Exception:
        return None  # undecodable bytes -> render no audio player
def analyze_speaker(dataset_name, config_name, split_name, max_samples, speaker_id):
    """Build the per-speaker view: stats markdown, a duration histogram,
    sample texts, and up to five decoded audio clips (padded with None)."""
    no_audio = [None] * 5
    # Guard: nothing to analyze until a real speaker is chosen.
    if not speaker_id or speaker_id == "Select a speaker...":
        return None, None, "Please select a speaker first", *no_audio
    result = load_dataset_stats(dataset_name, config_name, split_name or None, int(max_samples))
    if 'error' in result:
        return None, None, f"Error: {result['error']}", *no_audio
    data = result['stats'].get(speaker_id)
    if not data:
        return None, None, f"Speaker {speaker_id} not found", *no_audio
    # Aggregate statistics rendered as markdown.
    details = f"""
## Speaker {speaker_id} Statistics
- **Total Samples**: {data['count']}
- **Total Duration**: {data['total_duration']/3600:.2f} hours ({data['total_duration']/60:.1f} minutes)
- **Total Words**: {data['total_words']}
- **Average Sample Length**: {data['total_duration']/data['count']:.2f} seconds
- **Average Words per Sample**: {data['total_words']/data['count']:.1f}
"""
    # Histogram of individual sample durations.
    hist = go.Figure(go.Histogram(x=data['durations'], nbinsx=30, name='Duration'))
    hist.update_layout(
        title=f'Sample Duration Distribution - Speaker {speaker_id}',
        xaxis_title='Duration (seconds)',
        yaxis_title='Number of Samples'
    )
    # Up to five stored excerpts: text snippets plus decoded audio clips.
    excerpts = data['texts'][:5]
    text_blocks = [
        f"**Sample {idx}** ({item['duration']:.1f}s):\n{item['text']}..."
        for idx, item in enumerate(excerpts, 1)
    ]
    clips = [decode_audio_bytes(item.get('audio_bytes')) for item in excerpts]
    clips.extend([None] * (5 - len(clips)))  # pad to the five fixed players
    return details, hist, "\n\n".join(text_blocks), *clips
def generate_instructions(dataset_name, config_name, speaker_id):
    """Generate markdown download/fork instructions for the selected speaker.

    Returns a prompt string when no speaker is selected. BUGFIX: the filter
    snippet used to emit an unquoted `== {speaker_id}` literal, which was
    invalid Python for non-numeric ids (e.g. 'unknown') and a type mismatch
    for string columns; the snippet now compares ids as strings, which works
    whether the underlying column is int or str.
    """
    if not speaker_id or speaker_id == "Select a speaker...":
        return "Please select a speaker first"
    return f"""
## π₯ How to Download & Create Fork for Speaker {speaker_id}
### 1. Download Full Dataset
```bash
hf download {dataset_name} --include '{config_name}/*' --local-dir ./data/cml-tts-full
```
### 2. Filter to Selected Speaker (Python)
```python
from datasets import load_dataset
# Load full dataset
dataset = load_dataset("{dataset_name}", "{config_name}", split="train")
# Filter to selected speaker (compare as strings: column may be int or str)
speaker_dataset = dataset.filter(lambda x: str(x['speaker_id']) == "{speaker_id}")
print(f"Filtered: {{len(speaker_dataset)}} samples")
# Save locally
speaker_dataset.save_to_disk("./speaker_{speaker_id}_dataset")
# OR: Push to HuggingFace Hub as new dataset
speaker_dataset.push_to_hub(
    "your-username/cml-tts-{config_name}-speaker-{speaker_id}",
    private=False  # or True for private
)
```
### 3. Add Custom Columns (Optional)
```python
def add_custom_columns(example):
    example['emotion'] = 'neutral'  # placeholder
    example['quality_score'] = 1.0  # placeholder
    example['use_for_training'] = True
    return example
speaker_dataset = speaker_dataset.map(add_custom_columns)
speaker_dataset.push_to_hub("your-username/cml-tts-{config_name}-speaker-{speaker_id}")
```
### 4. Create Dataset Card
Add README.md:
```markdown
# CML-TTS {config_name.title()} - Speaker {speaker_id}
Filtered subset of {dataset_name} containing only Speaker {speaker_id}.
## Usage
```python
from datasets import load_dataset
ds = load_dataset("your-username/cml-tts-{config_name}-speaker-{speaker_id}")
```
"""
# === Gradio Interface ===
# Single-page Blocks app: one configuration row, three tabs
# (overview / speaker details / download instructions) and footer tips.
# Event listeners are registered inside the Blocks context, as Gradio requires.
with gr.Blocks(title="Audio Dataset Explorer for TTS", theme=gr.themes.Soft()) as demo:
    # Header banner
    gr.Markdown("""
# ποΈ Audio Dataset Explorer for TTS
Explore audio datasets, analyze speakers, and prepare training data for TTS models.
""")
    # Configuration row: dataset repo id, config subset, split, sample cap
    with gr.Row():
        dataset_input = gr.Dropdown(
            label="Dataset Name",
            choices=["ylacombe/cml-tts", "facebook/multilingual_librispeech", "facebook/voxpopuli", "sarulab-speech/mls_sidon", "datadriven-company/WolneLektury-TTS-Polish", "espnet/yodas-granary"],
            value="ylacombe/cml-tts",
            allow_custom_value=True
        )
        config_input = gr.Dropdown(
            label="Config Name",
            choices=["polish", "pl", "Polish", "default"],
            value="polish",
            allow_custom_value=True,
            info="Config/language subset. Type custom value if not listed."
        )
        split_input = gr.Textbox(
            label="Split (auto if empty)",
            value="",
            placeholder="auto-detect",
            info="Dataset split to load. Leave empty to auto-detect (prefers 'train'). Common values: train, validation, test, asr_only."
        )
        samples_slider = gr.Number(
            value=5000,
            minimum=0,
            label="Max Samples (less = faster, 0 = all)",
            precision=0,
        )
    load_btn = gr.Button("π Load Dataset", variant="primary", size="lg")
    gr.Markdown("β±οΈ **Note**: First load takes ~30-60s. Subsequent loads are cached.")
    # Overview Tab: whole-dataset statistics table and distribution charts
    with gr.Tab("π Overview - All Speakers"):
        gr.Markdown("### Statistics for all speakers in the dataset")
        status_text = gr.Textbox(label="Status", interactive=False)
        summary_text = gr.Markdown()
        overview_table = gr.Dataframe(label="Speaker Statistics")
        with gr.Row():
            chart_samples = gr.Plot(label="Sample Distribution")
            chart_duration = gr.Plot(label="Duration Distribution")
    # Speaker Details Tab: per-speaker stats, histogram, texts, audio
    with gr.Tab("π― Speaker Details"):
        speaker_dropdown = gr.Dropdown(
            label="Select Speaker",
            choices=["Select a speaker..."],
            value="Select a speaker..."
        )
        analyze_btn = gr.Button("π Analyze Speaker", variant="secondary")
        speaker_details = gr.Markdown()
        speaker_hist = gr.Plot(label="Duration Distribution")
        gr.Markdown("### Audio Samples & Texts")
        sample_texts_display = gr.Markdown()
        # Fixed pool of 5 audio players; analyze_speaker pads its outputs
        # with None so unused players simply stay empty.
        audio_players = []
        for i in range(5):
            audio_players.append(gr.Audio(label=f"Sample {i+1}", visible=True))
    # Download Tab: generated copy-paste instructions for forking a speaker
    with gr.Tab("π₯ Download & Fork"):
        gr.Markdown("### Instructions for creating your own dataset")
        download_instructions = gr.Markdown()
        generate_btn = gr.Button("π Generate Instructions", variant="secondary")
    # Callbacks: thin adapters between core functions and UI components
    def on_load(dataset_name, config_name, split_name, max_samples):
        # Wraps create_overview; substitutes safe defaults when loading failed.
        df, fig1, fig2, status, summary, speakers = create_overview(dataset_name, config_name, split_name, max_samples)
        return (
            status,
            summary or "",
            df if df is not None else gr.Dataframe(),
            fig1,
            fig2,
            gr.Dropdown(choices=speakers),
        )
    def on_analyze(dataset_name, config_name, split_name, max_samples, speaker_id):
        results = analyze_speaker(dataset_name, config_name, split_name, max_samples, speaker_id)
        # results: details, fig_hist, texts, audio1..audio5
        return results
    def on_generate(dataset_name, config_name, speaker_id):
        return generate_instructions(dataset_name, config_name, speaker_id)
    load_btn.click(
        fn=on_load,
        inputs=[dataset_input, config_input, split_input, samples_slider],
        outputs=[status_text, summary_text, overview_table, chart_samples, chart_duration, speaker_dropdown]
    )
    analyze_btn.click(
        fn=on_analyze,
        inputs=[dataset_input, config_input, split_input, samples_slider, speaker_dropdown],
        outputs=[speaker_details, speaker_hist, sample_texts_display] + audio_players
    )
    generate_btn.click(
        fn=on_generate,
        inputs=[dataset_input, config_input, speaker_dropdown],
        outputs=[download_instructions]
    )
    # Footer: usage tips and tested-dataset notes
    gr.Markdown("""
---
### π‘ Tips
- First load takes ~30-60s (parsing metadata)
- Subsequent loads are faster (cached)
- Reduce "Max Samples" for faster overview
- **π Click "Listen" links** in Speaker Details to play audio samples
### π΅ Audio Playback
- Audio links open files directly from HuggingFace Hub
- Works in all browsers - click to play in new tab
- Up to 5 sample audio clips per speaker
### π§ Tested Datasets
- `ylacombe/cml-tts` - configs: dutch, french, german, italian, polish, portuguese, spanish
- `facebook/voxpopuli` - configs: pl, en, de, fr, es, ...
- `facebook/multilingual_librispeech` - configs: polish, german, french, spanish, italian, portuguese, dutch (audiobooks)
- `sarulab-speech/mls_sidon` - configs: polish, german, french, spanish, italian, portuguese, dutch (audiobooks, WebDataset format)
- `datadriven-company/WolneLektury-TTS-Polish` - config: default (polskie audiobooki, 310GB, tylko streaming)
### π Resources
- [HuggingFace Datasets Docs](https://huggingface.co/docs/datasets)
- [TTS Training Guide](https://huggingface.co/docs/transformers/tasks/text-to-speech)
""")

if __name__ == "__main__":
    # Bind 0.0.0.0 so the app is reachable from outside the container;
    # 7860 is the HF Spaces default port.
    demo.launch(server_name="0.0.0.0", server_port=7860)