# NOTE(review): removed stray GitHub commit-view residue that was pasted above
# the shebang ("kruzer" / "Add facebook/voxpopuli to dataset dropdown" /
# commit hash 77309b7) — it is not Python and broke the script at import time.
#!/usr/bin/env python3
"""
Audio Dataset Explorer for TTS - Full Version (English)
Explore audio datasets, analyze speakers, listen to samples
"""
# Monkey-patch a gradio_client bug: JSON schemas that are bare bools (or other
# non-dicts, e.g. the 'const'-in-bool case) raise TypeError inside
# _json_schema_to_python_type; guard the converter so they map to "Any".
import gradio_client.utils as _gc_utils

_orig_json_schema = _gc_utils._json_schema_to_python_type


def _patched_json_schema(schema, defs=None):
    """Delegate to the original converter, treating non-dict schemas as Any."""
    return _orig_json_schema(schema, defs) if isinstance(schema, dict) else "Any"


_gc_utils._json_schema_to_python_type = _patched_json_schema
import gradio as gr
from datasets import load_dataset, Audio
from collections import defaultdict
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from huggingface_hub import hf_hub_download
import soundfile as sf
import tempfile
import os
import numpy as np
import io
# Cache for datasets.
# Module-level memo used by load_dataset_stats(): keys are
# "{dataset}_{config}_{split}_{max_samples}" strings, values are the result
# dicts it builds. Unbounded for the process lifetime so repeat loads are fast.
dataset_cache = {}
def detect_split(dataset_name, config_name):
    """Pick a sensible default split for the given dataset/config.

    Prefers 'train', then 'asr_only', then 'ast'; otherwise the first split
    the Hub advertises. Any failure (network, unknown dataset) falls back to
    'train'.
    """
    from datasets import get_dataset_split_names
    try:
        available = get_dataset_split_names(dataset_name, config_name)
        chosen = next(
            (s for s in ('train', 'asr_only', 'ast') if s in available), None
        )
        if chosen is not None:
            return chosen
        return available[0] if available else 'train'
    except Exception:
        # Best-effort helper: never propagate Hub/lookup errors to the UI.
        return 'train'
def _sample_duration(sample, meta, audio_raw):
    """Best-effort duration in seconds for one streamed sample.

    Order of preference: metadata duration fields, top-level duration fields,
    begin/end timestamps, and finally decoding the audio header bytes with
    soundfile. Returns 0.0 when nothing yields a value.
    """
    duration = (
        meta.get('audio_duration')
        or meta.get('duration')
        or sample.get('duration')
        or sample.get('audio_duration')
        or ((sample.get('end_time', 0) - sample.get('begin_time', 0)) if 'end_time' in sample else 0)
        or 0.0
    )
    if not duration:
        audio_bytes = audio_raw.get('bytes') if isinstance(audio_raw, dict) else None
        if audio_bytes:
            try:
                with sf.SoundFile(io.BytesIO(audio_bytes)) as f:
                    duration = len(f) / f.samplerate
            except Exception:
                # Undecodable/partial bytes: keep 0 rather than fail the scan.
                pass
    return duration


def load_dataset_stats(dataset_name, config_name, split_name=None, max_samples=5000):
    """Stream a dataset and compute per-speaker statistics.

    Args:
        dataset_name: HF Hub dataset id (e.g. "ylacombe/cml-tts").
        config_name: dataset config / language subset.
        split_name: split to stream; auto-detected via detect_split() if falsy.
        max_samples: stop after this many samples; 0/None streams everything.

    Returns:
        dict with 'df' (per-speaker DataFrame sorted by sample count),
        'stats' (raw per-speaker aggregates), 'dataset_name', 'config_name'.
        On failure: {'df': empty DataFrame, 'stats': {}, 'error': message}.
        Successful results are memoized in dataset_cache.
    """
    if not split_name:
        split_name = detect_split(dataset_name, config_name)
    cache_key = f"{dataset_name}_{config_name}_{split_name}_{max_samples}"
    if cache_key in dataset_cache:
        return dataset_cache[cache_key]
    try:
        # Stream so we never download the whole dataset; keep audio undecoded
        # (raw bytes) so iteration stays fast.
        ds = load_dataset(
            dataset_name,
            config_name,
            split=split_name,
            streaming=True
        )
        audio_col = next((c for c in ('audio', 'flac', 'mp3') if c in ds.features), None)
        if audio_col:
            ds = ds.cast_column(audio_col, Audio(decode=False))
        stats = defaultdict(lambda: {
            'count': 0,
            'total_duration': 0.0,
            'total_words': 0,
            'durations': [],
            'texts': []
        })
        for i, sample in enumerate(ds):
            if max_samples and i >= max_samples:
                break
            # WebDataset format (e.g. sarulab-speech/mls_sidon): metadata in
            # 'metadata.json', audio bytes under 'flac'.
            meta = sample.get('metadata.json') or {}
            audio_raw = sample.get('audio') or sample.get('flac') or sample.get('mp3') or {}
            speaker_id = str(
                meta.get('speaker_id')
                or sample.get('speaker_id')
                or sample.get('original_audio_id')
                or 'unknown'
            )
            duration = _sample_duration(sample, meta, audio_raw)
            # Auto-detect text field across dataset conventions.
            text = (
                meta.get('transcript')
                or sample.get('text')
                or sample.get('transcript')
                or sample.get('sentence')
                or ''
            )
            # BUGFIX: the old `a or b if text else 0` parsed as
            # `(a or b) if text else 0`, discarding an explicit 'num_words'
            # field whenever the text happened to be empty.
            num_words = sample.get('num_words') or (len(text.split()) if text else 0)
            entry = stats[speaker_id]
            entry['count'] += 1
            entry['total_duration'] += duration
            entry['total_words'] += num_words
            entry['durations'].append(duration)
            # Keep up to 5 preview samples (truncated text + raw audio bytes).
            if len(entry['texts']) < 5:
                entry['texts'].append({
                    'text': text[:150],
                    'duration': duration,
                    'audio_bytes': audio_raw.get('bytes') if isinstance(audio_raw, dict) else None,
                    'audio_path': audio_raw.get('path', '') if isinstance(audio_raw, dict) else ''
                })
        rows = [
            {
                'Speaker ID': speaker_id,
                'Samples': data['count'],
                'Time (h)': round(data['total_duration'] / 3600, 2),
                'Words': data['total_words'],
                'Avg Duration (s)': round(data['total_duration'] / data['count'], 2) if data['count'] > 0 else 0,
                'Avg Words': round(data['total_words'] / data['count'], 1) if data['count'] > 0 else 0,
            }
            for speaker_id, data in stats.items()
        ]
        df = pd.DataFrame(rows)
        if not df.empty:
            # BUGFIX: sort_values('Samples') raised KeyError on an empty frame
            # (no columns) when the stream yielded no samples.
            df = df.sort_values('Samples', ascending=False)
        result = {
            'df': df,
            'stats': dict(stats),
            'dataset_name': dataset_name,
            'config_name': config_name
        }
        dataset_cache[cache_key] = result
        return result
    except Exception as e:
        # Surface the failure to the UI status box instead of crashing the app.
        return {'df': pd.DataFrame(), 'stats': {}, 'error': str(e)}
def create_overview(dataset_name, config_name, split_name, max_samples):
    """Build the Overview tab: charts, status line, summary table, speaker list.

    Returns (df, fig_samples, fig_duration, status, summary_md, speaker_choices);
    the first three are None when loading failed or produced no rows.
    """
    result = load_dataset_stats(dataset_name, config_name, split_name or None, int(max_samples))
    if 'error' in result:
        return None, None, None, f"❌ Error: {result['error']}", "", []
    df = result['df']
    if df.empty:
        return None, None, None, "❌ No data loaded", "", []
    # Chart 1: samples per speaker
    fig_samples = px.bar(
        df,
        x='Speaker ID',
        y='Samples',
        title=f'Sample Distribution by Speaker ({len(df)} speakers, {int(max_samples) or "all"} samples analyzed)',
        labels={'Samples': 'Number of Samples'}
    )
    # Chart 2: recorded hours per speaker
    fig_duration = px.bar(
        df,
        x='Speaker ID',
        y='Time (h)',
        title='Total Recording Time by Speaker',
        labels={'Time (h)': 'Time (hours)'}
    )
    # Dropdown choices, placeholder entry first.
    speaker_list = ["Select a speaker..."] + df['Speaker ID'].tolist()
    total_h = df['Time (h)'].sum()
    total_samples = df['Samples'].sum()
    total_words = df['Words'].sum()
    # Markdown summary table. Headers are Polish ("total samples | total time
    # | total words | speakers"). BUGFIX: repaired mojibake from a bad
    # encoding round-trip ("prΓ³bek" -> "próbek", "sΕ‚Γ³w" -> "słów",
    # "LektorΓ³w" -> "Lektorów").
    summary = (
        f"| Łącznie próbek | Łącznie czasu | Łącznie słów | Lektorów |\n"
        f"|---|---|---|---|\n"
        f"| **{total_samples:,}** | **{total_h:.1f} h** ({total_h*60:.0f} min) | **{total_words:,}** | **{len(df)}** |"
    )
    status = f"✅ Loaded {int(max_samples) or 'all'} samples, found {len(df)} speakers"
    return df, fig_samples, fig_duration, status, summary, speaker_list
def decode_audio_bytes(audio_bytes):
    """Decode raw audio bytes to a temp .wav file path playable by gr.Audio.

    Args:
        audio_bytes: raw encoded audio (flac/mp3/wav bytes) or None.

    Returns:
        Path to a freshly written .wav file, or None for empty/undecodable
        input. Temp files are intentionally not deleted here — Gradio serves
        them after this function returns.
    """
    if not audio_bytes:
        return None
    try:
        audio_array, sample_rate = sf.read(io.BytesIO(audio_bytes))
        # BUGFIX: NamedTemporaryFile(delete=False) left its file handle open
        # while soundfile re-opened the same path (fd leak; fails outright on
        # Windows). mkstemp + os.close hands soundfile a closed, writable path.
        fd, wav_path = tempfile.mkstemp(suffix=".wav")
        os.close(fd)
        sf.write(wav_path, audio_array, sample_rate)
        return wav_path
    except Exception:
        # Best-effort preview: an unplayable sample just shows an empty player.
        return None
def analyze_speaker(dataset_name, config_name, split_name, max_samples, speaker_id):
    """Produce the Speaker Details tab outputs for one speaker.

    Returns (details_md, histogram_fig, sample_texts_md, audio1..audio5) where
    the five audio slots are temp-file paths or None.
    """
    no_audio = [None] * 5
    if not speaker_id or speaker_id == "Select a speaker...":
        return None, None, "Please select a speaker first", *no_audio
    result = load_dataset_stats(dataset_name, config_name, split_name or None, int(max_samples))
    if 'error' in result:
        return None, None, f"Error: {result['error']}", *no_audio
    speaker_data = result['stats'].get(speaker_id)
    if speaker_data is None:
        return None, None, f"Speaker {speaker_id} not found", *no_audio
    # Headline statistics rendered as markdown.
    n = speaker_data['count']
    total_dur = speaker_data['total_duration']
    total_words = speaker_data['total_words']
    details = f"""
## Speaker {speaker_id} Statistics
- **Total Samples**: {n}
- **Total Duration**: {total_dur/3600:.2f} hours ({total_dur/60:.1f} minutes)
- **Total Words**: {total_words}
- **Average Sample Length**: {total_dur/n:.2f} seconds
- **Average Words per Sample**: {total_words/n:.1f}
"""
    # Histogram of per-sample durations.
    fig_hist = go.Figure(
        go.Histogram(x=speaker_data['durations'], nbinsx=30, name='Duration')
    )
    fig_hist.update_layout(
        title=f'Sample Duration Distribution - Speaker {speaker_id}',
        xaxis_title='Duration (seconds)',
        yaxis_title='Number of Samples'
    )
    # Preview texts + decoded audio players (stored capped at 5 per speaker).
    text_blocks = []
    players = []
    for idx, item in enumerate(speaker_data['texts'][:5], start=1):
        text_blocks.append(f"**Sample {idx}** ({item['duration']:.1f}s):\n{item['text']}...")
        players.append(decode_audio_bytes(item.get('audio_bytes')))
    # Pad so the five gr.Audio outputs always receive a value.
    players.extend([None] * (5 - len(players)))
    return details, fig_hist, "\n\n".join(text_blocks), *players
def generate_instructions(dataset_name, config_name, speaker_id):
    """Return markdown download/fork instructions for one speaker.

    Returns a short prompt string when no speaker has been selected yet.

    Fixes over the original template:
    - the filter snippet emitted `== {speaker_id}` unquoted, which is invalid
      Python for non-numeric ids; ids are handled as strings throughout this
      app, so we compare str(x['speaker_id']) against a quoted literal;
    - the section-4 example nested ``` fences inside a ``` fence, which broke
      rendering; the outer fence now uses four backticks.
    """
    if not speaker_id or speaker_id == "Select a speaker...":
        return "Please select a speaker first"
    return f"""
## 📥 How to Download & Create Fork for Speaker {speaker_id}
### 1. Download Full Dataset
```bash
hf download {dataset_name} --include '{config_name}/*' --local-dir ./data/cml-tts-full
```
### 2. Filter to Selected Speaker (Python)
```python
from datasets import load_dataset
# Load full dataset
dataset = load_dataset("{dataset_name}", "{config_name}", split="train")
# Filter to selected speaker
speaker_dataset = dataset.filter(lambda x: str(x['speaker_id']) == "{speaker_id}")
print(f"Filtered: {{len(speaker_dataset)}} samples")
# Save locally
speaker_dataset.save_to_disk("./speaker_{speaker_id}_dataset")
# OR: Push to HuggingFace Hub as new dataset
speaker_dataset.push_to_hub(
    "your-username/cml-tts-{config_name}-speaker-{speaker_id}",
    private=False  # or True for private
)
```
### 3. Add Custom Columns (Optional)
```python
def add_custom_columns(example):
    example['emotion'] = 'neutral'  # placeholder
    example['quality_score'] = 1.0  # placeholder
    example['use_for_training'] = True
    return example
speaker_dataset = speaker_dataset.map(add_custom_columns)
speaker_dataset.push_to_hub("your-username/cml-tts-{config_name}-speaker-{speaker_id}")
```
### 4. Create Dataset Card
Add README.md:
````markdown
# CML-TTS {config_name.title()} - Speaker {speaker_id}
Filtered subset of {dataset_name} containing only Speaker {speaker_id}.
## Usage
```python
from datasets import load_dataset
ds = load_dataset("your-username/cml-tts-{config_name}-speaker-{speaker_id}")
```
````
"""
# === Gradio Interface ===
# All UI wiring lives in one Blocks context; the callbacks defined inside it
# close over the component handles created above them.
with gr.Blocks(title="Audio Dataset Explorer for TTS", theme=gr.themes.Soft()) as demo:
    # App header
    gr.Markdown("""
# 🎙️ Audio Dataset Explorer for TTS
Explore audio datasets, analyze speakers, and prepare training data for TTS models.
""")
    # --- Configuration row: dataset id, config, split, sample cap ---
    with gr.Row():
        dataset_input = gr.Dropdown(
            label="Dataset Name",
            choices=["ylacombe/cml-tts", "facebook/multilingual_librispeech", "facebook/voxpopuli", "sarulab-speech/mls_sidon", "datadriven-company/WolneLektury-TTS-Polish", "espnet/yodas-granary"],
            value="ylacombe/cml-tts",
            allow_custom_value=True
        )
        config_input = gr.Dropdown(
            label="Config Name",
            choices=["polish", "pl", "Polish", "default"],
            value="polish",
            allow_custom_value=True,
            info="Config/language subset. Type custom value if not listed."
        )
        split_input = gr.Textbox(
            label="Split (auto if empty)",
            value="",
            placeholder="auto-detect",
            info="Dataset split to load. Leave empty to auto-detect (prefers 'train'). Common values: train, validation, test, asr_only."
        )
        # NOTE(review): named "slider" but rendered as a numeric input field.
        samples_slider = gr.Number(
            value=5000,
            minimum=0,
            label="Max Samples (less = faster, 0 = all)",
            precision=0,
        )
    load_btn = gr.Button("🔄 Load Dataset", variant="primary", size="lg")
    gr.Markdown("⏱️ **Note**: First load takes ~30-60s. Subsequent loads are cached.")
    # --- Overview tab: aggregate statistics for all speakers ---
    with gr.Tab("📊 Overview - All Speakers"):
        gr.Markdown("### Statistics for all speakers in the dataset")
        status_text = gr.Textbox(label="Status", interactive=False)
        summary_text = gr.Markdown()
        overview_table = gr.Dataframe(label="Speaker Statistics")
        with gr.Row():
            chart_samples = gr.Plot(label="Sample Distribution")
            chart_duration = gr.Plot(label="Duration Distribution")
    # --- Speaker details tab: per-speaker stats, histogram, audio previews ---
    with gr.Tab("🎯 Speaker Details"):
        # Choices are repopulated by on_load after a dataset is loaded.
        speaker_dropdown = gr.Dropdown(
            label="Select Speaker",
            choices=["Select a speaker..."],
            value="Select a speaker..."
        )
        analyze_btn = gr.Button("🔍 Analyze Speaker", variant="secondary")
        speaker_details = gr.Markdown()
        speaker_hist = gr.Plot(label="Duration Distribution")
        gr.Markdown("### Audio Samples & Texts")
        sample_texts_display = gr.Markdown()
        # Fixed set of 5 players matching the 5 previews kept per speaker.
        audio_players = []
        for i in range(5):
            audio_players.append(gr.Audio(label=f"Sample {i+1}", visible=True))
    # --- Download tab: generated fork/download instructions ---
    with gr.Tab("📥 Download & Fork"):
        gr.Markdown("### Instructions for creating your own dataset")
        download_instructions = gr.Markdown()
        generate_btn = gr.Button("📋 Generate Instructions", variant="secondary")
    # --- Callbacks (thin adapters between components and the pure helpers) ---
    def on_load(dataset_name, config_name, split_name, max_samples):
        # Adapts create_overview()'s tuple to component updates, substituting
        # safe defaults when loading failed (df is None on error).
        df, fig1, fig2, status, summary, speakers = create_overview(dataset_name, config_name, split_name, max_samples)
        return (
            status,
            summary or "",
            df if df is not None else gr.Dataframe(),
            fig1,
            fig2,
            gr.Dropdown(choices=speakers),
        )
    def on_analyze(dataset_name, config_name, split_name, max_samples, speaker_id):
        results = analyze_speaker(dataset_name, config_name, split_name, max_samples, speaker_id)
        # results: details, fig_hist, texts, audio1..audio5
        return results
    def on_generate(dataset_name, config_name, speaker_id):
        return generate_instructions(dataset_name, config_name, speaker_id)
    load_btn.click(
        fn=on_load,
        inputs=[dataset_input, config_input, split_input, samples_slider],
        outputs=[status_text, summary_text, overview_table, chart_samples, chart_duration, speaker_dropdown]
    )
    analyze_btn.click(
        fn=on_analyze,
        inputs=[dataset_input, config_input, split_input, samples_slider, speaker_dropdown],
        outputs=[speaker_details, speaker_hist, sample_texts_display] + audio_players
    )
    generate_btn.click(
        fn=on_generate,
        inputs=[dataset_input, config_input, speaker_dropdown],
        outputs=[download_instructions]
    )
    # Footer help text shown below the tabs.
    gr.Markdown("""
---
### 💡 Tips
- First load takes ~30-60s (parsing metadata)
- Subsequent loads are faster (cached)
- Reduce "Max Samples" for faster overview
- **🔊 Click "Listen" links** in Speaker Details to play audio samples
### 🎵 Audio Playback
- Audio links open files directly from HuggingFace Hub
- Works in all browsers - click to play in new tab
- Up to 5 sample audio clips per speaker
### 🔧 Tested Datasets
- `ylacombe/cml-tts` - configs: dutch, french, german, italian, polish, portuguese, spanish
- `facebook/voxpopuli` - configs: pl, en, de, fr, es, ...
- `facebook/multilingual_librispeech` - configs: polish, german, french, spanish, italian, portuguese, dutch (audiobooks)
- `sarulab-speech/mls_sidon` - configs: polish, german, french, spanish, italian, portuguese, dutch (audiobooks, WebDataset format)
- `datadriven-company/WolneLektury-TTS-Polish` - config: default (polskie audiobooki, 310GB, tylko streaming)
### 📚 Resources
- [HuggingFace Datasets Docs](https://huggingface.co/docs/datasets)
- [TTS Training Guide](https://huggingface.co/docs/transformers/tasks/text-to-speech)
""")
if __name__ == "__main__":
    # Bind on all interfaces (container-friendly); 7860 is Gradio's default port.
    demo.launch(server_name="0.0.0.0", server_port=7860)