| |
| """ |
| ============================================================= |
| Sinhala TTS - Speaker Diarization Analysis (No Auth Required) |
| ============================================================= |
| Uses simple-diarizer (SpeechBrain ECAPA + Silero VAD). |
| NO HuggingFace account, NO license acceptance, NO API keys. |
| Models download automatically on first run (~100MB, one time). |
| |
| Requirements: |
| pip install -U yt-dlp simple-diarizer librosa soundfile numpy scipy certifi |
| ffmpeg must be installed and on PATH (yt-dlp uses it to convert audio to WAV) |
| |
| Usage: |
| python scripts/speaker_analysis.py |
| ============================================================= |
| """ |
|
|
| import os |
| import sys |
| import ssl |
| import json |
| import numpy as np |
| import warnings |
| warnings.filterwarnings("ignore") |
|
|
| |
# Point OpenSSL-based clients (urllib, requests) at certifi's CA bundle so
# HTTPS downloads can be verified even when the interpreter has no usable
# system root certificates.
try:
    import certifi
except ImportError:
    # Best-effort fallback: without a CA bundle, disable verification so the
    # model downloads still work.
    # NOTE(review): this leaves every HTTPS download made through the default
    # urllib context unverified; install certifi to keep verification enabled.
    _unverified_context = getattr(ssl, "_create_unverified_context", None)
    if _unverified_context is not None:
        ssl._create_default_https_context = _unverified_context
        print(
            "WARNING: certifi is not installed; HTTPS certificate "
            "verification is disabled. Run: pip install certifi",
            file=sys.stderr,
        )
else:
    _ca_bundle = certifi.where()
    os.environ['SSL_CERT_FILE'] = _ca_bundle
    os.environ['REQUESTS_CA_BUNDLE'] = _ca_bundle
|
|
# Root directory for everything this script writes (audio, samples, JSON).
OUTPUT_DIR = "tts_channel_eval"

# Videos to analyze. "id" is the YouTube video id used to build the watch
# URL and the WAV file name; "title" is a label for console/JSON output.
SAMPLE_VIDEOS = [
    {"id": video_id, "title": video_title}
    for video_id, video_title in (
        ("AJ0Ul2Wl4Pk", "Arab History of Ceylon (edited)"),
        ("_QcE7a1j_o4", "King Sirisangabo (edited)"),
        ("dIwl6akCrt8", "Sura saha meraya (edited)"),
        ("4rC-uR0lpY8", "Easter date (Neth FM)"),
        ("dFsb9KRCJHQ", "Kadawuru Siritha (edited)"),
    )
]
|
|
|
|
def download_videos(video_list, out_dir):
    """Download each video's audio track as a 16 kHz mono WAV file.

    Args:
        video_list: Sequence of dicts with "id" (YouTube video id) and
            "title" (label used in the progress output).
        out_dir: Directory that receives ``<id>.wav``; created if missing.

    Returns:
        List of ``{"id", "title", "path"}`` dicts for every video whose WAV
        exists on disk, whether it was already cached or freshly downloaded.
        Failed downloads are reported on stdout and left out of the list.
    """
    os.makedirs(out_dir, exist_ok=True)

    total = len(video_list)
    print(f"\n{'='*60}")
    print(f"Step 1: Downloading {total} sample videos")
    print(f"{'='*60}")

    downloaded = []
    for position, video in enumerate(video_list, start=1):
        vid_id = video["id"]
        title = video["title"]
        wav_path = os.path.join(out_dir, f"{vid_id}.wav")
        entry = {"id": vid_id, "title": title, "path": wav_path}

        # Reuse a WAV left behind by an earlier run.
        if os.path.exists(wav_path):
            print(f" [{position}/{total}] {title} -- cached")
            downloaded.append(entry)
            continue

        # Imported lazily so a fully cached run never needs yt-dlp.
        import yt_dlp

        url = f"https://www.youtube.com/watch?v={vid_id}"
        dl_opts = {
            'format': 'bestaudio/best',
            'outtmpl': os.path.join(out_dir, f"{vid_id}.%(ext)s"),
            'postprocessors': [{
                'key': 'FFmpegExtractAudio',
                'preferredcodec': 'wav',
            }],
            # Extra ffmpeg arguments: one channel, 16000 Hz sample rate.
            'postprocessor_args': {
                'ffmpeg': ['-ac', '1', '-ar', '16000'],
            },
            'quiet': True,
            'no_warnings': True,
            # NOTE(review): certificate checks are skipped for the download;
            # confirm this is still needed once a CA bundle is configured.
            'nocheckcertificate': True,
        }

        print(f" [{position}/{total}] {title}...")
        try:
            with yt_dlp.YoutubeDL(dl_opts) as ydl:
                ydl.download([url])
        except Exception as e:
            # Boundary handler: one bad video must not abort the whole batch.
            print(f" Failed: {str(e)[:100]}")
            continue

        # Only report success when the converted WAV really exists; otherwise
        # later stages would fail trying to open a missing file.
        if os.path.exists(wav_path):
            print(" Done")
            downloaded.append(entry)
        else:
            print(f" Failed: expected output missing: {wav_path}")

    return downloaded
|
|
|
|
def diarize_audio(wav_path, num_speakers=2):
    """Run speaker diarization with simple-diarizer (no auth needed).

    Args:
        wav_path: Path of the audio file to diarize.
        num_speakers: Number of speakers passed to the diarizer.

    Returns:
        Dict mapping a speaker label (str) to a list of
        ``{"start", "end", "duration"}`` dicts, in seconds rounded to 2
        decimals, in the order the diarizer produced them.
    """
    import torchaudio
    import soundfile as sf
    import torch

    def _fixed_load(uri, frame_offset=0, num_frames=-1, normalize=True,
                    channels_first=True, **kwargs):
        # soundfile-backed stand-in for torchaudio.load. ``normalize`` and any
        # extra keyword arguments are accepted but ignored; audio is always
        # read as float32.
        # NOTE(review): presumably works around a torchaudio backend problem;
        # confirm it is still needed.
        stop = None if num_frames == -1 else frame_offset + num_frames
        data, samplerate = sf.read(uri, start=frame_offset, stop=stop, dtype='float32')
        tensor = torch.from_numpy(data)
        if tensor.ndim == 1:
            # Mono comes back 1-D; add the channel axis on the requested side.
            tensor = tensor.unsqueeze(0 if channels_first else 1)
        elif channels_first:
            # soundfile returns (frames, channels); transpose to channel-major.
            tensor = tensor.T
        return tensor, samplerate

    # Installed before simple_diarizer is imported, as in the original flow.
    torchaudio.load = _fixed_load

    from simple_diarizer.diarizer import Diarizer

    print(f" Running speaker diarization (this takes a few minutes)...")

    # Build the diarizer once and reuse it across calls so the models are not
    # reloaded for every file.
    diar = getattr(diarize_audio, "_diarizer", None)
    if diar is None:
        diar = Diarizer(
            embed_model='ecapa',
            cluster_method='sc',
        )
        diarize_audio._diarizer = diar

    segments = diar.diarize(
        wav_path,
        num_speakers=num_speakers,
    )

    # Group the flat segment list by speaker label.
    speakers = {}
    for seg in segments:
        start = float(seg['start'])
        end = float(seg['end'])
        speakers.setdefault(str(seg['label']), []).append({
            "start": round(start, 2),
            "end": round(end, 2),
            "duration": round(end - start, 2),
        })

    return speakers
|
|
|
|
def analyze_speaker(wav_path, segments, sr=16000):
    """Compute duration, segment, SNR and pitch statistics for one speaker.

    Args:
        wav_path: Audio file to read; it is loaded as mono at ``sr`` Hz.
        segments: List of dicts with "start", "end" and "duration" (seconds)
            marking the parts of the file attributed to this speaker.
        sr: Sample rate used for loading and for seconds-to-samples conversion.

    Returns:
        Dict of plain Python numbers/strings describing the speaker, or
        ``None`` when the segments select no audio at all.
    """
    import librosa

    y, _ = librosa.load(wav_path, sr=sr, mono=True)
    total_duration = len(y) / sr

    # Collect this speaker's audio by slicing every segment out of the file.
    spk_audio = [
        y[int(seg["start"] * sr):int(seg["end"] * sr)]
        for seg in segments
    ]
    if not spk_audio:
        return None

    spk_y = np.concatenate(spk_audio)
    if spk_y.size == 0:
        # Segments fall outside the audio (or are zero-length): nothing to
        # measure, and the RMS/percentile steps below cannot run on no data.
        return None
    spk_duration = len(spk_y) / sr

    # SNR estimate: frames at or below the 20th RMS percentile count as
    # noise, the remaining frames as speech.
    rms = librosa.feature.rms(y=spk_y, frame_length=2048, hop_length=512)[0]
    rms_th = np.percentile(rms, 20)
    noise = rms[rms <= rms_th]
    speech = rms[rms > rms_th]
    if len(speech) > 0 and len(noise) > 0 and np.mean(noise) > 0:
        snr = 20 * np.log10(np.mean(speech) / (np.mean(noise) + 1e-10))
    else:
        # No usable speech/noise split (e.g. constant RMS or silent noise
        # floor): use the fixed 40 dB fallback instead of producing NaN.
        snr = 40.0

    # Pitch tracking runs on at most the first 180 s of the speaker's audio.
    f0, _, _ = librosa.pyin(spk_y[:sr * 180], fmin=50, fmax=500, sr=sr)
    f0v = f0[~np.isnan(f0)]
    pitch_mean = float(np.mean(f0v)) if len(f0v) > 0 else 0
    pitch_std = float(np.std(f0v)) if len(f0v) > 0 else 0

    seg_durations = [s["duration"] for s in segments]
    # Mean pitch above 180 Hz is labelled female; 0 means no voiced frames.
    gender = "female" if pitch_mean > 180 else "male" if pitch_mean > 0 else "unknown"

    return {
        "total_duration_min": round(spk_duration / 60, 1),
        "pct_of_total": round(spk_duration / total_duration * 100, 1),
        "num_segments": len(segments),
        "avg_segment_sec": round(float(np.mean(seg_durations)), 1),
        "median_segment_sec": round(float(np.median(seg_durations)), 1),
        "max_segment_sec": round(float(max(seg_durations)), 1),
        "segments_over_3s": sum(1 for d in seg_durations if d >= 3.0),
        "segments_over_5s": sum(1 for d in seg_durations if d >= 5.0),
        "segments_over_10s": sum(1 for d in seg_durations if d >= 10.0),
        "snr_db": round(float(snr), 1),
        "pitch_mean_hz": round(pitch_mean, 1),
        "pitch_std_hz": round(pitch_std, 1),
        "gender": gender,
    }
|
|
|
|
def extract_speaker_samples(wav_path, speakers, out_dir, vid_id, sr=16000,
                            target_sec=30.0, gap_sec=0.3):
    """Write a roughly ``target_sec``-second audio sample for each speaker.

    The longest segments are used first, each followed by ``gap_sec`` of
    silence. A segment that would push the sample past ``target_sec`` is
    truncated, so one very long segment no longer produces a very long file.

    Args:
        wav_path: Audio file the segments refer to; loaded as mono at ``sr``.
        speakers: Dict mapping speaker id to a list of segment dicts with
            "start", "end" and "duration" in seconds.
        out_dir: Base directory; files go into ``<out_dir>/speaker_samples``.
        vid_id: Video id used as the file name prefix.
        sr: Sample rate for loading and writing.
        target_sec: Amount of speech to collect per speaker, in seconds.
        gap_sec: Silence inserted after each segment, in seconds.

    Returns:
        Dict mapping speaker id to the path of the written sample. Speakers
        with no usable audio are omitted.
    """
    import librosa
    import soundfile as sf

    y, _ = librosa.load(wav_path, sr=sr, mono=True)
    samples_dir = os.path.join(out_dir, "speaker_samples")
    os.makedirs(samples_dir, exist_ok=True)

    target_samples = int(target_sec * sr)
    silence = np.zeros(int(gap_sec * sr), dtype=y.dtype)

    sample_paths = {}
    for spk_id, segments in speakers.items():
        longest_first = sorted(segments, key=lambda s: s["duration"], reverse=True)

        pieces = []
        collected = 0  # samples gathered so far, including the gaps
        for seg in longest_first:
            if collected >= target_samples:
                break
            clip = y[int(seg["start"] * sr):int(seg["end"] * sr)]
            # Keep only as much of this segment as the budget still allows.
            clip = clip[:target_samples - collected]
            if clip.size == 0:
                continue
            pieces.append(clip)
            pieces.append(silence)
            collected += clip.size + silence.size

        if pieces:
            out_path = os.path.join(samples_dir, f"{vid_id}_speaker{spk_id}.wav")
            sf.write(out_path, np.concatenate(pieces), sr)
            sample_paths[spk_id] = out_path

    return sample_paths
|
|
|
|
def main():
    """Run the whole pipeline: download, diarize, analyze, sample, report.

    Exits with status 1 when a required package is missing or when no audio
    could be obtained. Prints a per-video and an aggregate report, and writes
    the per-video results to ``<OUTPUT_DIR>/speaker_analysis_results.json``.
    """
    # Fail early, with install instructions, if a dependency is missing.
    required = [
        ('yt-dlp', 'yt_dlp'),
        ('simple-diarizer', 'simple_diarizer'),
        ('librosa', 'librosa'),
        ('soundfile', 'soundfile'),
    ]
    missing = []
    for pkg_name, import_name in required:
        try:
            __import__(import_name)
        except ImportError:
            missing.append(pkg_name)
    if missing:
        print(f"Missing packages: {', '.join(missing)}")
        print(f"Install: pip install -U {' '.join(missing)} certifi")
        sys.exit(1)

    print("Sinhala TTS - Speaker Diarization Analysis")
    print("(No accounts or API keys needed)")
    print("=" * 60)

    wav_dir = os.path.join(OUTPUT_DIR, "speaker_analysis")

    # Step 1: fetch audio.
    downloaded = download_videos(SAMPLE_VIDEOS, wav_dir)
    if not downloaded:
        print("No videos downloaded!")
        sys.exit(1)

    # Step 2: per-video diarization and analysis.
    all_results = []
    for vid_info in downloaded:
        vid_id = vid_info["id"]
        title = vid_info["title"]
        wav_path = vid_info["path"]

        print(f"\n{'='*60}")
        print(f"Step 2: Processing: {title}")
        print(f"{'='*60}")

        speakers = diarize_audio(wav_path, num_speakers=2)
        print(f" Found {len(speakers)} speakers")

        print(" Analyzing per-speaker quality...")
        speaker_results = {}
        for spk_id, segments in speakers.items():
            stats = analyze_speaker(wav_path, segments)
            if stats:
                speaker_results[spk_id] = stats

        print(" Extracting audio samples...")
        sample_paths = extract_speaker_samples(wav_path, speakers, wav_dir, vid_id)

        # Report speakers, the one with the most speech first.
        ranked = sorted(
            speaker_results.items(),
            key=lambda item: item[1]["total_duration_min"],
            reverse=True,
        )
        for spk_id, stats in ranked:
            sample = sample_paths.get(spk_id, "N/A")
            print(f"\n Speaker {spk_id}:")
            print(f" Duration: {stats['total_duration_min']} min ({stats['pct_of_total']}%)")
            print(f" Segments: {stats['num_segments']} total, {stats['segments_over_5s']} over 5s, {stats['segments_over_10s']} over 10s")
            print(f" Avg segment: {stats['avg_segment_sec']}s (median {stats['median_segment_sec']}s, max {stats['max_segment_sec']}s)")
            print(f" SNR: {stats['snr_db']} dB")
            print(f" Pitch: {stats['pitch_mean_hz']}Hz +/- {stats['pitch_std_hz']}Hz ({stats['gender']})")
            print(f" Sample: {sample}")

        all_results.append({
            "video_id": vid_id,
            "title": title,
            "speakers": speaker_results,
            "samples": {k: str(v) for k, v in sample_paths.items()},
        })

    # Aggregate table across all videos.
    print(f"\n\n{'='*60}")
    print("AGGREGATE ANALYSIS ACROSS ALL VIDEOS")
    print(f"{'='*60}")

    flat = [
        {"video": r["title"], "speaker": spk, **stats}
        for r in all_results
        for spk, stats in r["speakers"].items()
    ]

    print(f"\n{'Video':<35} {'Spk':<6} {'Dur':>7} {'%':>6} {'SNR':>7} {'Pitch':>8} {'Sex':>6} {'>5s':>5} {'>10s':>5}")
    print(f"{'-'*35} {'-'*6} {'-'*7} {'-'*6} {'-'*7} {'-'*8} {'-'*6} {'-'*5} {'-'*5}")

    for s in sorted(flat, key=lambda x: (x["video"], -x["total_duration_min"])):
        print(f"{s['video'][:35]:<35} {s['speaker']:<6} {s['total_duration_min']:>5.1f}m {s['pct_of_total']:>5.1f}% {s['snr_db']:>5.1f}dB {s['pitch_mean_hz']:>6.1f}Hz {s['gender']:>6} {s['segments_over_5s']:>5} {s['segments_over_10s']:>5}")

    # Yield estimate, extrapolated from the sampled videos.
    print(f"\n{'='*60}")
    print("YIELD ESTIMATE (723 videos / 370 hours total)")
    print(f"{'='*60}")

    # Split speakers at 170 Hz mean pitch. Speakers with no detected pitch
    # (reported as 0 Hz) are left out so they do not skew the lower group.
    # NOTE(review): the per-speaker gender label uses a 180 Hz cutoff;
    # confirm the two thresholds are meant to differ.
    voiced = [s for s in flat if s["pitch_mean_hz"] > 0]
    low_pitch = [s for s in voiced if s["pitch_mean_hz"] < 170]
    high_pitch = [s for s in voiced if s["pitch_mean_hz"] >= 170]

    for label, group in [("Lower-pitched speaker", low_pitch), ("Higher-pitched speaker", high_pitch)]:
        if not group:
            continue
        avg_pct = np.mean([s["pct_of_total"] for s in group])
        avg_snr = np.mean([s["snr_db"] for s in group])
        avg_pitch = np.mean([s["pitch_mean_hz"] for s in group])
        avg_segs5 = np.mean([s["segments_over_5s"] for s in group])
        avg_segs10 = np.mean([s["segments_over_10s"] for s in group])

        # 370 h is the total stated in the heading; 0.7 is the assumed
        # fraction that survives quality filtering.
        est_hours = 370 * (avg_pct / 100)
        est_filtered = est_hours * 0.7

        print(f"\n {label} (~{avg_pitch:.0f}Hz):")
        print(f" Avg share: {avg_pct:.1f}% of each video")
        print(f" Avg SNR: {avg_snr:.1f} dB")
        print(f" Avg segs >5s: {avg_segs5:.0f} per video")
        print(f" Avg segs>10s: {avg_segs10:.0f} per video")
        print(f" Est. total: {est_hours:.0f}h raw -> {est_filtered:.0f}h after filtering")

    # Persist results. UTF-8 is explicit because ensure_ascii=False may emit
    # non-ASCII text that the platform default encoding cannot represent.
    results_path = os.path.join(OUTPUT_DIR, "speaker_analysis_results.json")
    with open(results_path, "w", encoding="utf-8") as f:
        json.dump(all_results, f, indent=2, ensure_ascii=False)

    print(f"\n{'='*60}")
    print("WHAT TO DO NEXT")
    print(f"{'='*60}")
    samples_dir = os.path.join(wav_dir, "speaker_samples")
    print("\n 1. Listen to the speaker samples:")
    print(f" open {samples_dir}")
    print(" (each file is ~30s of one speaker's voice)")
    print("")
    print(" 2. Pick which voice you want for TTS")
    print("")
    print(" 3. Paste this output back to the assistant")
    print(f"\n Results: {results_path}")
    print("\nDone!")
|
|
|
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|