sinhala-tts / scripts /speaker_analysis.py
outlawmold's picture
Fix speaker analysis for Windows and add results
23f8808
#!/usr/bin/env python3
"""
=============================================================
Sinhala TTS - Speaker Diarization Analysis (No Auth Required)
=============================================================
Uses simple-diarizer (SpeechBrain ECAPA + Silero VAD).
NO HuggingFace account, NO license acceptance, NO API keys.
Models download automatically on first run (~100MB, one time).
Requirements:
pip install -U yt-dlp simple-diarizer librosa soundfile numpy scipy certifi
Usage:
python scripts/speaker_analysis.py
=============================================================
"""
import os
import sys
import ssl
import json
import numpy as np
import warnings
warnings.filterwarnings("ignore")
# Fix macOS SSL: when certifi is installed, point both the stdlib SSL layer
# (SSL_CERT_FILE) and `requests` (REQUESTS_CA_BUNDLE) at its CA bundle.
# When certifi is missing this step is skipped silently.
try:
    import certifi
    os.environ.update(
        dict.fromkeys(('SSL_CERT_FILE', 'REQUESTS_CA_BUNDLE'), certifi.where())
    )
except ImportError:
    pass

# NOTE(review): this replaces the default HTTPS context factory with the
# *unverified* one, i.e. certificate verification is switched off for every
# stdlib HTTPS request made by this process. Kept as-is because it is a
# deliberate module-level workaround; consider removing it once the certifi
# bundle above is confirmed to be sufficient.
if hasattr(ssl, '_create_unverified_context'):
    ssl._create_default_https_context = ssl._create_unverified_context
# Root directory under which the downloaded audio, the speaker samples and
# the JSON results file are written.
OUTPUT_DIR = "tts_channel_eval"

# Sample videos — mix of edited and Neth FM.
# Each entry carries the YouTube video id and a short human-readable title.
SAMPLE_VIDEOS = [
    dict(id="AJ0Ul2Wl4Pk", title="Arab History of Ceylon (edited)"),
    dict(id="_QcE7a1j_o4", title="King Sirisangabo (edited)"),
    dict(id="dIwl6akCrt8", title="Sura saha meraya (edited)"),
    dict(id="4rC-uR0lpY8", title="Easter date (Neth FM)"),
    dict(id="dFsb9KRCJHQ", title="Kadawuru Siritha (edited)"),
]
def download_videos(video_list, out_dir):
    """Download videos as 16kHz mono WAV.

    Args:
        video_list: Iterable of dicts with the keys ``"id"`` (YouTube video
            id) and ``"title"`` (label used in progress output).
        out_dir: Directory that receives ``<id>.wav``; created if missing.

    Returns:
        A list of ``{"id", "title", "path"}`` dicts, one per video whose WAV
        file exists on disk after the call (either cached from an earlier run
        or freshly downloaded). Videos that fail are reported on stdout and
        left out of the list.
    """
    os.makedirs(out_dir, exist_ok=True)
    print(f"\n{'='*60}")
    print(f"Step 1: Downloading {len(video_list)} sample videos")
    print(f"{'='*60}")

    total = len(video_list)
    downloaded = []
    for idx, video in enumerate(video_list, start=1):
        vid_id = video["id"]
        title = video["title"]
        wav_path = os.path.join(out_dir, f"{vid_id}.wav")
        entry = {"id": vid_id, "title": title, "path": wav_path}

        if os.path.exists(wav_path):
            print(f" [{idx}/{total}] {title} -- cached")
            downloaded.append(entry)
            continue

        # Imported only when a download is really needed, so a fully cached
        # run does not depend on yt-dlp being importable.
        import yt_dlp

        url = f"https://www.youtube.com/watch?v={vid_id}"
        dl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(out_dir, f"{vid_id}.%(ext)s"),
            # Convert whatever container was fetched into WAV ...
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            # ... forcing mono (-ac 1) at 16 kHz (-ar 16000).
            'postprocessor_args': {
                'ffmpeg': ['-ac', '1', '-ar', '16000'],
            },
            'quiet': True,
            'no_warnings': True,
            'nocheckcertificate': True,
        }
        print(f" [{idx}/{total}] {title}...")
        try:
            with yt_dlp.YoutubeDL(dl_opts) as ydl:
                ydl.download([url])
        except Exception as e:
            # Best effort: one broken video must not abort the whole batch.
            print(f" Failed: {str(e)[:100]}")
            continue

        # yt-dlp returning without an exception does not prove the WAV was
        # written (e.g. the audio conversion step produced nothing), so verify
        # before handing the path to later stages.
        if not os.path.exists(wav_path):
            print(f" Failed: expected output not found: {wav_path}")
            continue

        print(" Done")
        downloaded.append(entry)
    return downloaded
def diarize_audio(wav_path, num_speakers=2):
    """Run speaker diarization using simple-diarizer (no auth needed).

    Side effect: ``torchaudio.load`` is replaced process-wide by a
    soundfile-based loader (see the comment below) before the diarizer runs.

    Args:
        wav_path: Path of the audio file to diarize.
        num_speakers: Number of speakers passed to the diarizer.

    Returns:
        Dict mapping the speaker label (as ``str``) to a list of segment
        dicts with ``"start"``, ``"end"`` and ``"duration"``, each rounded to
        two decimals, in the order the diarizer returned them.
    """
    import torchaudio
    import soundfile as sf
    import torch

    # torchaudio 2.11+ forces torchcodec which is broken on Windows without shared FFmpeg.
    # We monkeypatch it to use soundfile directly.
    def _fixed_load(uri, frame_offset=0, num_frames=-1, normalize=True, channels_first=True, **kwargs):
        # `normalize` and any extra keyword arguments are accepted only for
        # signature compatibility; samples are always read as float32.
        stop = None if num_frames == -1 else frame_offset + num_frames
        # always_2d gives (time, channels) even for mono, so channels_first
        # is honoured uniformly instead of mono always coming back (1, time).
        data, samplerate = sf.read(
            uri, start=frame_offset, stop=stop, dtype='float32', always_2d=True
        )
        tensor = torch.from_numpy(data)
        if channels_first:
            tensor = tensor.T.contiguous()  # (channels, time)
        return tensor, samplerate

    torchaudio.load = _fixed_load

    # Imported after the patch so the diarizer runs with the replaced loader.
    from simple_diarizer.diarizer import Diarizer

    print(" Running speaker diarization (this takes a few minutes)...")
    # NOTE(review): a new Diarizer is constructed on every call; if model
    # loading turns out to be expensive, consider reusing one instance.
    diar = Diarizer(
        embed_model='ecapa',  # SpeechBrain ECAPA-TDNN (free, no auth)
        cluster_method='sc',  # Spectral clustering
    )
    segments = diar.diarize(
        wav_path,
        num_speakers=num_speakers,
    )

    # Group by speaker
    speakers = {}
    for seg in segments:
        start, end = seg['start'], seg['end']
        speakers.setdefault(str(seg['label']), []).append({
            "start": round(start, 2),
            "end": round(end, 2),
            "duration": round(end - start, 2),
        })
    return speakers
def analyze_speaker(wav_path, segments, sr=16000):
    """Analyze audio quality for one speaker's segments.

    Args:
        wav_path: Path of the audio file the segments refer to.
        segments: List of dicts with ``"start"``/``"end"`` (seconds) and
            ``"duration"`` (seconds) for one speaker.
        sr: Sample rate the audio is loaded at.

    Returns:
        ``None`` when there are no segments or the segments select no audio.
        Otherwise a dict with duration, segment-length, SNR and pitch
        statistics plus a pitch-based ``"gender"`` guess ("female" above
        180 Hz mean pitch, "male" for any other positive pitch, "unknown"
        when no voiced frame was found).
    """
    # Nothing to analyze: return before decoding the whole file.
    if not segments:
        return None

    import librosa

    y, _ = librosa.load(wav_path, sr=sr, mono=True)
    total_duration = len(y) / sr

    # Extract speaker audio
    chunks = [y[int(seg["start"] * sr):int(seg["end"] * sr)] for seg in segments]
    spk_y = np.concatenate(chunks)
    if spk_y.size == 0:
        # Every segment was zero-length or past the end of the audio; the
        # statistics below would divide by zero or operate on empty arrays.
        return None
    spk_duration = len(spk_y) / sr

    # SNR: frames at or below the 20th-percentile RMS are treated as noise,
    # the remaining frames as speech.
    rms = librosa.feature.rms(y=spk_y, frame_length=2048, hop_length=512)[0]
    rms_th = np.percentile(rms, 20)
    noise = rms[rms <= rms_th]
    speech = rms[rms > rms_th]
    noise_level = np.mean(noise) if len(noise) > 0 else 0.0
    if noise_level <= 0:
        # No measurable noise floor: report the fixed fallback value.
        snr = 40.0
    elif len(speech) == 0:
        # No frame is louder than the noise threshold (uniform energy), so
        # speech and noise levels coincide; report 0 dB instead of NaN.
        snr = 0.0
    else:
        snr = 20 * np.log10(np.mean(speech) / (noise_level + 1e-10))

    # Pitch (first 3 min) — presumably capped to keep pYIN runtime bounded.
    y_short = spk_y[:sr * 180]
    f0, _, _ = librosa.pyin(y_short, fmin=50, fmax=500, sr=sr)
    f0v = f0[~np.isnan(f0)]  # keep voiced frames only
    pitch_mean = float(np.mean(f0v)) if len(f0v) > 0 else 0
    pitch_std = float(np.std(f0v)) if len(f0v) > 0 else 0

    seg_durations = [s["duration"] for s in segments]
    if pitch_mean > 180:
        gender = "female"
    elif pitch_mean > 0:
        gender = "male"
    else:
        gender = "unknown"

    return {
        "total_duration_min": round(spk_duration / 60, 1),
        "pct_of_total": round(spk_duration / total_duration * 100, 1),
        "num_segments": len(segments),
        "avg_segment_sec": round(float(np.mean(seg_durations)), 1),
        "median_segment_sec": round(float(np.median(seg_durations)), 1),
        "max_segment_sec": round(max(seg_durations), 1),
        # The thresholds are inclusive (>=) despite the "over" in the key names.
        "segments_over_3s": sum(1 for d in seg_durations if d >= 3.0),
        "segments_over_5s": sum(1 for d in seg_durations if d >= 5.0),
        "segments_over_10s": sum(1 for d in seg_durations if d >= 10.0),
        "snr_db": round(float(snr), 1),
        "pitch_mean_hz": round(pitch_mean, 1),
        "pitch_std_hz": round(pitch_std, 1),
        "gender": gender,
    }
def extract_speaker_samples(wav_path, speakers, out_dir, vid_id, sr=16000,
                            target_sec=30.0, gap_sec=0.3):
    """Extract ~30s audio sample for each speaker.

    For every speaker the longest segments are taken first and joined with a
    short stretch of silence until ``target_sec`` seconds have been gathered.
    The last clip is truncated so a single very long segment cannot make the
    sample much longer than the target.

    Args:
        wav_path: Path of the audio file the segments refer to.
        speakers: Dict mapping speaker id to a list of segment dicts with
            ``"start"``, ``"end"`` and ``"duration"`` in seconds.
        out_dir: Base directory; files go to ``<out_dir>/speaker_samples``.
        vid_id: Video id used as the file-name prefix.
        sr: Sample rate used for loading and writing.
        target_sec: Target length of each sample in seconds.
        gap_sec: Length of the silence inserted after each clip, in seconds.

    Returns:
        Dict mapping speaker id to the path of the written WAV file. Speakers
        whose segments select no audio are omitted.
    """
    samples_dir = os.path.join(out_dir, "speaker_samples")
    os.makedirs(samples_dir, exist_ok=True)

    # No speakers: skip decoding the audio file altogether.
    if not speakers:
        return {}

    import librosa
    import soundfile as sf

    y, _ = librosa.load(wav_path, sr=sr, mono=True)
    # Same dtype as the audio so concatenation does not upcast the output.
    gap = np.zeros(int(gap_sec * sr), dtype=y.dtype)
    budget = int(target_sec * sr)  # target sample length in audio samples

    sample_paths = {}
    for spk_id, segments in speakers.items():
        longest_first = sorted(segments, key=lambda s: s["duration"], reverse=True)
        pieces = []
        collected = 0  # audio samples gathered so far, gaps included
        for seg in longest_first:
            remaining = budget - collected
            if remaining <= 0:
                break
            clip = y[int(seg["start"] * sr):int(seg["end"] * sr)][:remaining]
            if clip.size == 0:
                # Segment lies outside the loaded audio; it contributes nothing.
                continue
            pieces.extend((clip, gap))
            collected += clip.size + gap.size
        if pieces:
            out_path = os.path.join(samples_dir, f"{vid_id}_speaker{spk_id}.wav")
            sf.write(out_path, np.concatenate(pieces), sr)
            sample_paths[spk_id] = out_path
    return sample_paths
def _find_missing_packages():
    """Return the pip names of required packages that cannot be imported."""
    required = [
        ('yt-dlp', 'yt_dlp'),
        ('simple-diarizer', 'simple_diarizer'),
        ('librosa', 'librosa'),
        ('soundfile', 'soundfile'),
    ]
    missing = []
    for pkg_name, import_name in required:
        try:
            __import__(import_name)
        except ImportError:
            missing.append(pkg_name)
    return missing


def _print_video_report(speaker_results, sample_paths):
    """Print the per-speaker statistics of one video, longest speaker first."""
    ranked = sorted(
        speaker_results.items(),
        key=lambda item: item[1]["total_duration_min"],
        reverse=True,
    )
    for spk_id, stats in ranked:
        sample = sample_paths.get(spk_id, "N/A")
        print(f"\n Speaker {spk_id}:")
        print(f" Duration: {stats['total_duration_min']} min ({stats['pct_of_total']}%)")
        print(f" Segments: {stats['num_segments']} total, {stats['segments_over_5s']} over 5s, {stats['segments_over_10s']} over 10s")
        print(f" Avg segment: {stats['avg_segment_sec']}s (median {stats['median_segment_sec']}s, max {stats['max_segment_sec']}s)")
        print(f" SNR: {stats['snr_db']} dB")
        print(f" Pitch: {stats['pitch_mean_hz']}Hz +/- {stats['pitch_std_hz']}Hz ({stats['gender']})")
        print(f" Sample: {sample}")


def _print_aggregate_table(flat):
    """Print one table row per (video, speaker), grouped by video title."""
    print(f"\n{'Video':<35} {'Spk':<6} {'Dur':>7} {'%':>6} {'SNR':>7} {'Pitch':>8} {'Sex':>6} {'>5s':>5} {'>10s':>5}")
    print(f"{'-'*35} {'-'*6} {'-'*7} {'-'*6} {'-'*7} {'-'*8} {'-'*6} {'-'*5} {'-'*5}")
    # Within a video, the speaker with the most audio comes first.
    for s in sorted(flat, key=lambda x: (x["video"], -x["total_duration_min"])):
        print(f"{s['video'][:35]:<35} {s['speaker']:<6} {s['total_duration_min']:>5.1f}m {s['pct_of_total']:>5.1f}% {s['snr_db']:>5.1f}dB {s['pitch_mean_hz']:>6.1f}Hz {s['gender']:>6} {s['segments_over_5s']:>5} {s['segments_over_10s']:>5}")


def _print_yield_estimate(flat, total_videos=723, total_hours=370,
                          keep_ratio=0.7, pitch_split_hz=170):
    """Print a rough per-voice estimate of usable hours in the full corpus.

    Speakers are split into a lower- and a higher-pitched group at
    ``pitch_split_hz``; each group's average share of a video is scaled to
    ``total_hours`` and then multiplied by ``keep_ratio``.
    """
    print(f"\n{'='*60}")
    print(f"YIELD ESTIMATE ({total_videos} videos / {total_hours} hours total)")
    print(f"{'='*60}")

    # A mean pitch of 0 means no voiced frame was detected; such entries would
    # otherwise be counted as "lower-pitched" and distort that group's averages.
    voiced = [s for s in flat if s["pitch_mean_hz"] > 0]

    # Group speakers by pitch to identify the two recurring people
    low_pitch = [s for s in voiced if s["pitch_mean_hz"] < pitch_split_hz]
    high_pitch = [s for s in voiced if s["pitch_mean_hz"] >= pitch_split_hz]
    for label, group in [("Lower-pitched speaker", low_pitch), ("Higher-pitched speaker", high_pitch)]:
        if not group:
            continue
        avg_pct = np.mean([s["pct_of_total"] for s in group])
        avg_snr = np.mean([s["snr_db"] for s in group])
        avg_pitch = np.mean([s["pitch_mean_hz"] for s in group])
        avg_segs5 = np.mean([s["segments_over_5s"] for s in group])
        avg_segs10 = np.mean([s["segments_over_10s"] for s in group])
        est_hours = total_hours * (avg_pct / 100)
        est_filtered = est_hours * keep_ratio
        print(f"\n {label} (~{avg_pitch:.0f}Hz):")
        print(f" Avg share: {avg_pct:.1f}% of each video")
        print(f" Avg SNR: {avg_snr:.1f} dB")
        print(f" Avg segs >5s: {avg_segs5:.0f} per video")
        print(f" Avg segs>10s: {avg_segs10:.0f} per video")
        print(f" Est. total: {est_hours:.0f}h raw -> {est_filtered:.0f}h after filtering")


def main():
    """Run the whole analysis: download, diarize, analyze, report and save.

    Exits with status 1 when a required package is missing or when no video
    could be downloaded. Results are printed to stdout and written as JSON to
    ``<OUTPUT_DIR>/speaker_analysis_results.json``.
    """
    # Check deps
    missing = _find_missing_packages()
    if missing:
        print(f"Missing packages: {', '.join(missing)}")
        print(f"Install: pip install -U {' '.join(missing)} certifi")
        sys.exit(1)

    print("Sinhala TTS - Speaker Diarization Analysis")
    print("(No accounts or API keys needed)")
    print("=" * 60)

    wav_dir = os.path.join(OUTPUT_DIR, "speaker_analysis")

    # Download
    downloaded = download_videos(SAMPLE_VIDEOS, wav_dir)
    if not downloaded:
        print("No videos downloaded!")
        sys.exit(1)

    # Process each video
    all_results = []
    for vid_info in downloaded:
        vid_id = vid_info["id"]
        title = vid_info["title"]
        wav_path = vid_info["path"]
        print(f"\n{'='*60}")
        print(f"Step 2: Processing: {title}")
        print(f"{'='*60}")

        # Diarize
        speakers = diarize_audio(wav_path, num_speakers=2)
        print(f" Found {len(speakers)} speakers")

        # Analyze each speaker; speakers without usable audio are dropped.
        print(" Analyzing per-speaker quality...")
        speaker_results = {}
        for spk_id, segments in speakers.items():
            stats = analyze_speaker(wav_path, segments)
            if stats:
                speaker_results[spk_id] = stats

        # Extract samples
        print(" Extracting audio samples...")
        sample_paths = extract_speaker_samples(wav_path, speakers, wav_dir, vid_id)

        # Print results
        _print_video_report(speaker_results, sample_paths)

        all_results.append({
            "video_id": vid_id,
            "title": title,
            "speakers": speaker_results,
            "samples": {k: str(v) for k, v in sample_paths.items()},
        })

    # ============================================================
    # AGGREGATE
    # ============================================================
    print(f"\n\n{'='*60}")
    print("AGGREGATE ANALYSIS ACROSS ALL VIDEOS")
    print(f"{'='*60}")

    # One flat record per (video, speaker) for the table and the estimate.
    flat = [
        {"video": r["title"], "speaker": spk, **stats}
        for r in all_results
        for spk, stats in r["speakers"].items()
    ]
    _print_aggregate_table(flat)

    # Yield estimate
    _print_yield_estimate(flat)

    # Save
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    results_path = os.path.join(OUTPUT_DIR, "speaker_analysis_results.json")
    # ensure_ascii=False emits non-ASCII characters verbatim, so the file must
    # be opened as UTF-8 explicitly; the platform default encoding (e.g. a
    # legacy code page on Windows) may be unable to encode them.
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)

    print(f"\n{'='*60}")
    print("WHAT TO DO NEXT")
    print(f"{'='*60}")
    samples_dir = os.path.join(wav_dir, "speaker_samples")
    print("\n 1. Listen to the speaker samples:")
    print(f" open {samples_dir}")
    print(" (each file is ~30s of one speaker's voice)")
    print("")
    print(" 2. Pick which voice you want for TTS")
    print("")
    print(" 3. Paste this output back to the assistant")
    print(f"\n Results: {results_path}")
    print("\nDone!")


if __name__ == "__main__":
    main()