#!/usr/bin/env python3
"""
=============================================================
Sinhala TTS - Speaker Diarization Analysis (No Auth Required)
=============================================================
Uses simple-diarizer (SpeechBrain ECAPA + Silero VAD).
NO HuggingFace account, NO license acceptance, NO API keys.
Models download automatically on first run (~100MB, one time).

Requirements:
    pip install -U yt-dlp simple-diarizer librosa soundfile numpy scipy certifi

Usage:
    python scripts/speaker_analysis.py
=============================================================
"""
import os
import sys
import ssl
import json
import numpy as np
import warnings

warnings.filterwarnings("ignore")

# Fix macOS SSL: point OpenSSL/requests at certifi's CA bundle when available.
try:
    import certifi
    os.environ['SSL_CERT_FILE'] = certifi.where()
    os.environ['REQUESTS_CA_BUNDLE'] = certifi.where()
except ImportError:
    pass

# NOTE(security): this disables TLS certificate verification for every
# urllib-based HTTPS request made by this process. It is kept because the
# script has always relied on it, but it should be removed once the certifi
# bundle above is confirmed to be sufficient on the target machines.
try:
    ssl._create_default_https_context = ssl._create_unverified_context
except AttributeError:
    pass

OUTPUT_DIR = "tts_channel_eval"

# Sample rate (Hz) the WAV files are converted to and analysed at.
SAMPLE_RATE = 16000

# Corpus size used for the yield extrapolation printed by main().
CORPUS_VIDEOS = 723
CORPUS_HOURS = 370
# Fraction of raw speech assumed to survive quality filtering.
FILTER_KEEP_RATIO = 0.7

# Mean pitch (Hz) above which a speaker is labelled "female".
FEMALE_PITCH_HZ = 180
# Mean pitch (Hz) separating the two recurring speakers in the aggregate.
PITCH_SPLIT_HZ = 170

# Sample videos — mix of edited and Neth FM
SAMPLE_VIDEOS = [
    {"id": "AJ0Ul2Wl4Pk", "title": "Arab History of Ceylon (edited)"},
    {"id": "_QcE7a1j_o4", "title": "King Sirisangabo (edited)"},
    {"id": "dIwl6akCrt8", "title": "Sura saha meraya (edited)"},
    {"id": "4rC-uR0lpY8", "title": "Easter date (Neth FM)"},
    {"id": "dFsb9KRCJHQ", "title": "Kadawuru Siritha (edited)"},
]


def download_videos(video_list, out_dir):
    """Download videos as 16kHz mono WAV.

    Args:
        video_list: dicts with "id" (YouTube video id) and "title".
        out_dir: directory the WAV files are written to (created if needed).

    Returns:
        A list of {"id", "title", "path"} dicts, one per video whose WAV file
        exists on disk (either already cached or freshly downloaded).
    """
    import yt_dlp

    os.makedirs(out_dir, exist_ok=True)
    total = len(video_list)

    print(f"\n{'='*60}")
    print(f"Step 1: Downloading {total} sample videos")
    print(f"{'='*60}")

    downloaded = []
    for idx, video in enumerate(video_list, start=1):
        vid_id = video["id"]
        title = video["title"]
        wav_path = os.path.join(out_dir, f"{vid_id}.wav")
        entry = {"id": vid_id, "title": title, "path": wav_path}

        if os.path.exists(wav_path):
            print(f" [{idx}/{total}] {title} -- cached")
            downloaded.append(entry)
            continue

        url = f"https://www.youtube.com/watch?v={vid_id}"
        dl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(out_dir, f"{vid_id}.%(ext)s"),
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            'postprocessor_args': {
                'ffmpeg': ['-ac', '1', '-ar', '16000'],
            },
            'quiet': True,
            'no_warnings': True,
            # NOTE(security): skips TLS verification for the download.
            'nocheckcertificate': True,
        }

        print(f" [{idx}/{total}] {title}...")
        try:
            with yt_dlp.YoutubeDL(dl_opts) as ydl:
                ydl.download([url])
        except Exception as e:
            # Best-effort: one failing video must not abort the others.
            print(f" Failed: {str(e)[:100]}")
            continue

        # Only report success when the converted WAV is really there;
        # later steps open this path unconditionally.
        if not os.path.exists(wav_path):
            print(f" Failed: expected output {wav_path} was not created")
            continue

        print(f" Done")
        downloaded.append(entry)

    return downloaded


def group_segments_by_speaker(segments):
    """Group diarization segments by speaker label.

    Args:
        segments: iterable of mappings with 'start', 'end' and 'label' keys.

    Returns:
        A dict mapping str(label) to a list of {"start", "end", "duration"}
        dicts in input order. Times are rounded to 2 decimals and coerced to
        built-in floats so the result is always JSON-serialisable.
    """
    speakers = {}
    for seg in segments:
        start = float(seg['start'])
        end = float(seg['end'])
        speakers.setdefault(str(seg['label']), []).append({
            "start": round(start, 2),
            "end": round(end, 2),
            "duration": round(end - start, 2),
        })
    return speakers


def diarize_audio(wav_path, num_speakers=2):
    """Run speaker diarization using simple-diarizer (no auth needed).

    Args:
        wav_path: path of the WAV file to diarize.
        num_speakers: number of speakers passed to the clustering step.

    Returns:
        A dict mapping speaker label (str) to its list of segments, as
        produced by group_segments_by_speaker().

    Side effects:
        Replaces ``torchaudio.load`` process-wide with a soundfile-backed
        loader.
    """
    import torchaudio
    import soundfile as sf
    import torch

    # torchaudio 2.11+ forces torchcodec which is broken on Windows without shared FFmpeg.
    # We monkeypatch it to use soundfile directly.
    def _fixed_load(uri, frame_offset=0, num_frames=-1, normalize=True,
                    channels_first=True, **kwargs):
        stop = None if num_frames == -1 else frame_offset + num_frames
        data, samplerate = sf.read(uri, start=frame_offset, stop=stop, dtype='float32')
        tensor = torch.from_numpy(data)
        if tensor.ndim == 1:
            # Mono comes back 1-D; add the channel axis on the side the
            # caller asked for: (1, time) or (time, 1).
            tensor = tensor.unsqueeze(0) if channels_first else tensor.unsqueeze(1)
        elif channels_first:
            tensor = tensor.T  # (channels, time)
        # else: soundfile is already (time, channels)
        return tensor, samplerate

    torchaudio.load = _fixed_load

    from simple_diarizer.diarizer import Diarizer

    print(f" Running speaker diarization (this takes a few minutes)...")

    diar = Diarizer(
        embed_model='ecapa',   # SpeechBrain ECAPA-TDNN (free, no auth)
        cluster_method='sc',   # Spectral clustering
    )
    segments = diar.diarize(
        wav_path,
        num_speakers=num_speakers,
    )

    return group_segments_by_speaker(segments)


def estimate_snr_db(rms, noise_percentile=20, default_db=40.0):
    """Estimate SNR in dB from frame-wise RMS energies.

    Frames at or below the ``noise_percentile`` percentile are treated as
    noise, the remaining frames as speech.

    Args:
        rms: 1-D sequence of frame RMS values.
        noise_percentile: percentile used as the noise/speech threshold.
        default_db: value returned when the ratio cannot be computed (no
            frames, no frame above the threshold, or zero noise energy).

    Returns:
        The estimated SNR as a built-in float.
    """
    rms = np.asarray(rms)
    if rms.size == 0:
        return default_db

    threshold = np.percentile(rms, noise_percentile)
    noise = rms[rms <= threshold]
    speech = rms[rms > threshold]

    # With constant energy every frame lands in `noise`; averaging the empty
    # `speech` array would give NaN, so fall back to the default instead.
    if speech.size == 0 or noise.size == 0:
        return default_db
    noise_level = np.mean(noise)
    if not noise_level > 0:
        return default_db

    return float(20 * np.log10(np.mean(speech) / (noise_level + 1e-10)))


def classify_gender(pitch_mean_hz):
    """Return "female", "male" or "unknown" for a mean pitch in Hz.

    A pitch of 0 (or less) means no voiced frames were found.
    """
    if pitch_mean_hz > FEMALE_PITCH_HZ:
        return "female"
    if pitch_mean_hz > 0:
        return "male"
    return "unknown"


def split_by_pitch(speaker_stats, threshold_hz=PITCH_SPLIT_HZ):
    """Split speaker stats into (lower-pitched, higher-pitched) lists.

    Entries whose "pitch_mean_hz" is 0 or less (no detectable pitch) belong
    to neither group and are left out, so they cannot skew the averages.
    """
    voiced = [s for s in speaker_stats if s["pitch_mean_hz"] > 0]
    low = [s for s in voiced if s["pitch_mean_hz"] < threshold_hz]
    high = [s for s in voiced if s["pitch_mean_hz"] >= threshold_hz]
    return low, high


def analyze_speaker(wav_path, segments, sr=16000, audio=None):
    """Analyze audio quality for one speaker's segments.

    Args:
        wav_path: path of the WAV file; read only when ``audio`` is None.
        segments: list of {"start", "end", "duration"} dicts (seconds).
        sr: sample rate in Hz of ``audio`` / to load the file at.
        audio: optional already-decoded mono signal at ``sr``. Passing it
            avoids decoding the same file once per speaker.

    Returns:
        A dict of duration, segment-length, SNR and pitch statistics, or
        None when the speaker has no segments or no audio samples.
    """
    import librosa

    if not segments:
        return None

    if audio is None:
        audio, _ = librosa.load(wav_path, sr=sr, mono=True)
    total_duration = len(audio) / sr

    # Extract speaker audio
    spk_y = np.concatenate([
        audio[int(seg["start"] * sr):int(seg["end"] * sr)] for seg in segments
    ])
    if spk_y.size == 0:
        # Zero-length segments: nothing to measure.
        return None
    spk_duration = len(spk_y) / sr

    # SNR
    rms = librosa.feature.rms(y=spk_y, frame_length=2048, hop_length=512)[0]
    snr = estimate_snr_db(rms)

    # Pitch (first 3 min)
    f0, _, _ = librosa.pyin(spk_y[:sr * 180], fmin=50, fmax=500, sr=sr)
    voiced_f0 = f0[~np.isnan(f0)]
    has_pitch = len(voiced_f0) > 0
    pitch_mean = float(np.mean(voiced_f0)) if has_pitch else 0
    pitch_std = float(np.std(voiced_f0)) if has_pitch else 0

    seg_durations = [float(s["duration"]) for s in segments]

    # Everything is coerced to built-in numbers so json.dump never trips
    # over a NumPy scalar.
    return {
        "total_duration_min": round(spk_duration / 60, 1),
        "pct_of_total": round(spk_duration / total_duration * 100, 1),
        "num_segments": len(segments),
        "avg_segment_sec": round(float(np.mean(seg_durations)), 1),
        "median_segment_sec": round(float(np.median(seg_durations)), 1),
        "max_segment_sec": round(max(seg_durations), 1),
        "segments_over_3s": sum(1 for d in seg_durations if d >= 3.0),
        "segments_over_5s": sum(1 for d in seg_durations if d >= 5.0),
        "segments_over_10s": sum(1 for d in seg_durations if d >= 10.0),
        "snr_db": round(float(snr), 1),
        "pitch_mean_hz": round(pitch_mean, 1),
        "pitch_std_hz": round(pitch_std, 1),
        "gender": classify_gender(pitch_mean),
    }


def extract_speaker_samples(wav_path, speakers, out_dir, vid_id, sr=16000, audio=None):
    """Extract ~30s audio sample for each speaker.

    The longest segments are taken first and joined with 0.3 s of silence
    until about 30 s have been collected.

    Args:
        wav_path: path of the WAV file; read only when ``audio`` is None.
        speakers: dict mapping speaker id to its list of segment dicts.
        out_dir: parent directory; files go to ``out_dir/speaker_samples``.
        vid_id: video id used as the file-name prefix.
        sr: sample rate in Hz of ``audio`` / to load the file at.
        audio: optional already-decoded mono signal at ``sr``.

    Returns:
        A dict mapping speaker id to the path of the written sample WAV.
    """
    import soundfile as sf

    if audio is None:
        import librosa
        audio, _ = librosa.load(wav_path, sr=sr, mono=True)

    samples_dir = os.path.join(out_dir, "speaker_samples")
    os.makedirs(samples_dir, exist_ok=True)

    gap_sec = 0.3
    target_sec = 30
    # Match the signal dtype so concatenation does not upcast the sample.
    silence = np.zeros(int(gap_sec * sr), dtype=audio.dtype)

    sample_paths = {}
    for spk_id, segments in speakers.items():
        longest_first = sorted(segments, key=lambda s: s["duration"], reverse=True)

        pieces = []
        collected = 0
        for seg in longest_first:
            if collected >= target_sec:
                break
            pieces.append(audio[int(seg["start"] * sr):int(seg["end"] * sr)])
            pieces.append(silence)
            collected += seg["duration"] + gap_sec

        if pieces:
            out_path = os.path.join(samples_dir, f"{vid_id}_speaker{spk_id}.wav")
            sf.write(out_path, np.concatenate(pieces), sr)
            sample_paths[spk_id] = out_path

    return sample_paths


def _check_dependencies():
    """Exit with an install hint when a required package cannot be imported."""
    required = [
        ('yt-dlp', 'yt_dlp'),
        ('simple-diarizer', 'simple_diarizer'),
        ('librosa', 'librosa'),
        ('soundfile', 'soundfile'),
    ]
    missing = []
    for pkg_name, import_name in required:
        try:
            __import__(import_name)
        except ImportError:
            missing.append(pkg_name)

    if missing:
        print(f"Missing packages: {', '.join(missing)}")
        print(f"Install: pip install -U {' '.join(missing)} certifi")
        sys.exit(1)


def _process_video(vid_info, wav_dir):
    """Diarize one video, analyse every speaker and print the per-video report."""
    import librosa

    vid_id = vid_info["id"]
    title = vid_info["title"]
    wav_path = vid_info["path"]

    print(f"\n{'='*60}")
    print(f"Step 2: Processing: {title}")
    print(f"{'='*60}")

    # Diarize
    speakers = diarize_audio(wav_path, num_speakers=2)
    print(f" Found {len(speakers)} speakers")

    # Decode once and share the signal with every per-speaker step.
    audio, _ = librosa.load(wav_path, sr=SAMPLE_RATE, mono=True)

    # Analyze each speaker
    print(f" Analyzing per-speaker quality...")
    speaker_results = {}
    for spk_id, segments in speakers.items():
        stats = analyze_speaker(wav_path, segments, sr=SAMPLE_RATE, audio=audio)
        if stats:
            speaker_results[spk_id] = stats

    # Extract samples
    print(f" Extracting audio samples...")
    sample_paths = extract_speaker_samples(
        wav_path, speakers, wav_dir, vid_id, sr=SAMPLE_RATE, audio=audio,
    )

    # Print results, most talkative speaker first
    by_duration = sorted(
        speaker_results.items(),
        key=lambda item: item[1]["total_duration_min"],
        reverse=True,
    )
    for spk_id, stats in by_duration:
        sample = sample_paths.get(spk_id, "N/A")
        print(f"\n Speaker {spk_id}:")
        print(f" Duration: {stats['total_duration_min']} min ({stats['pct_of_total']}%)")
        print(f" Segments: {stats['num_segments']} total, {stats['segments_over_5s']} over 5s, {stats['segments_over_10s']} over 10s")
        print(f" Avg segment: {stats['avg_segment_sec']}s (median {stats['median_segment_sec']}s, max {stats['max_segment_sec']}s)")
        print(f" SNR: {stats['snr_db']} dB")
        print(f" Pitch: {stats['pitch_mean_hz']}Hz +/- {stats['pitch_std_hz']}Hz ({stats['gender']})")
        print(f" Sample: {sample}")

    return {
        "video_id": vid_id,
        "title": title,
        "speakers": speaker_results,
        "samples": {k: str(v) for k, v in sample_paths.items()},
    }


def _print_aggregate(all_results):
    """Print the cross-video table and return the flattened per-speaker rows."""
    print(f"\n\n{'='*60}")
    print(f"AGGREGATE ANALYSIS ACROSS ALL VIDEOS")
    print(f"{'='*60}")

    flat = [
        {"video": r["title"], "speaker": spk, **stats}
        for r in all_results
        for spk, stats in r["speakers"].items()
    ]

    print(f"\n{'Video':<35} {'Spk':<6} {'Dur':>7} {'%':>6} {'SNR':>7} {'Pitch':>8} {'Sex':>6} {'>5s':>5} {'>10s':>5}")
    print(f"{'-'*35} {'-'*6} {'-'*7} {'-'*6} {'-'*7} {'-'*8} {'-'*6} {'-'*5} {'-'*5}")
    for s in sorted(flat, key=lambda x: (x["video"], -x["total_duration_min"])):
        print(f"{s['video'][:35]:<35} {s['speaker']:<6} {s['total_duration_min']:>5.1f}m {s['pct_of_total']:>5.1f}% {s['snr_db']:>5.1f}dB {s['pitch_mean_hz']:>6.1f}Hz {s['gender']:>6} {s['segments_over_5s']:>5} {s['segments_over_10s']:>5}")

    return flat


def _print_yield_estimate(flat):
    """Extrapolate each pitch group's share of the sample to the whole corpus."""
    print(f"\n{'='*60}")
    print(f"YIELD ESTIMATE ({CORPUS_VIDEOS} videos / {CORPUS_HOURS} hours total)")
    print(f"{'='*60}")

    # Group speakers by pitch to identify the two recurring people
    low_pitch, high_pitch = split_by_pitch(flat)

    for label, group in [("Lower-pitched speaker", low_pitch),
                         ("Higher-pitched speaker", high_pitch)]:
        if not group:
            continue
        avg_pct = np.mean([s["pct_of_total"] for s in group])
        avg_snr = np.mean([s["snr_db"] for s in group])
        avg_pitch = np.mean([s["pitch_mean_hz"] for s in group])
        avg_segs5 = np.mean([s["segments_over_5s"] for s in group])
        avg_segs10 = np.mean([s["segments_over_10s"] for s in group])
        est_hours = CORPUS_HOURS * (avg_pct / 100)
        est_filtered = est_hours * FILTER_KEEP_RATIO

        print(f"\n {label} (~{avg_pitch:.0f}Hz):")
        print(f" Avg share: {avg_pct:.1f}% of each video")
        print(f" Avg SNR: {avg_snr:.1f} dB")
        print(f" Avg segs >5s: {avg_segs5:.0f} per video")
        print(f" Avg segs>10s: {avg_segs10:.0f} per video")
        print(f" Est. total: {est_hours:.0f}h raw -> {est_filtered:.0f}h after filtering")

    unvoiced = len(flat) - len(low_pitch) - len(high_pitch)
    if unvoiced:
        print(f"\n Skipped {unvoiced} speaker(s) with no detectable pitch")


def main():
    """Download the sample videos, diarize them and report per-speaker quality."""
    _check_dependencies()

    print("Sinhala TTS - Speaker Diarization Analysis")
    print("(No accounts or API keys needed)")
    print("=" * 60)

    wav_dir = os.path.join(OUTPUT_DIR, "speaker_analysis")

    # Download
    downloaded = download_videos(SAMPLE_VIDEOS, wav_dir)
    if not downloaded:
        print("No videos downloaded!")
        sys.exit(1)

    # Process each video
    all_results = [_process_video(vid_info, wav_dir) for vid_info in downloaded]

    # ============================================================
    # AGGREGATE
    # ============================================================
    flat = _print_aggregate(all_results)

    # Yield estimate
    _print_yield_estimate(flat)

    # Save. ensure_ascii=False emits raw non-ASCII characters, so the file
    # must be opened as UTF-8 rather than with the platform default codec.
    results_path = os.path.join(OUTPUT_DIR, "speaker_analysis_results.json")
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)

    print(f"\n{'='*60}")
    print(f"WHAT TO DO NEXT")
    print(f"{'='*60}")
    samples_dir = os.path.join(wav_dir, "speaker_samples")
    print(f"\n 1. Listen to the speaker samples:")
    print(f" open {samples_dir}")
    print(f" (each file is ~30s of one speaker's voice)")
    print(f"")
    print(f" 2. Pick which voice you want for TTS")
    print(f"")
    print(f" 3. Paste this output back to the assistant")
    print(f"\n Results: {results_path}")
    print(f"\nDone!")


if __name__ == "__main__":
    main()