#!/usr/bin/env python3
"""
Utility functions for VAD + Diarization pipeline
"""
import json
from pathlib import Path
from typing import List, Dict, Optional, Tuple

import numpy as np


def load_audio(
    path: str,
    sampling_rate: int = 16000,
    mono: bool = True
) -> Tuple[np.ndarray, int]:
    """
    Load audio file with automatic format detection.

    Args:
        path: Path to audio file
        sampling_rate: Target sample rate
        mono: Convert to mono

    Returns:
        Tuple of (audio_data, sample_rate)
    """
    try:
        import librosa
        audio, sr = librosa.load(path, sr=sampling_rate, mono=mono)
        return audio, sr
    except Exception as e:
        print(f"Error loading audio with librosa: {e}")
        # Fallback to soundfile
        try:
            import soundfile as sf
            audio, sr = sf.read(path)
            # Convert to mono first so resampling operates on a 1-D signal
            if mono and audio.ndim > 1:
                audio = audio.mean(axis=1)
            # Resample if needed
            if sr != sampling_rate:
                import librosa
                audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)
                sr = sampling_rate
            return audio, sr
        except Exception as e:
            print(f"Error loading audio with soundfile: {e}")
            raise


def save_audio(
    audio: np.ndarray,
    path: str,
    sampling_rate: int = 16000
):
    """
    Save audio to file.

    Args:
        audio: Audio data
        path: Output path
        sampling_rate: Sample rate
    """
    import soundfile as sf
    sf.write(path, audio, sampling_rate)


def merge_segments(
    segments: List[Dict],
    gap_threshold: float = 0.5
) -> List[Dict]:
    """
    Merge nearby segments from the same speaker.

    Args:
        segments: List of segments with 'start', 'end', 'speaker'
        gap_threshold: Maximum gap to merge (seconds)

    Returns:
        Merged segments
    """
    if not segments:
        return []

    # Sort by start time
    sorted_segments = sorted(segments, key=lambda x: x['start'])
    merged = [sorted_segments[0].copy()]

    for seg in sorted_segments[1:]:
        last = merged[-1]
        # Check if same speaker and close enough
        if (seg['speaker'] == last['speaker'] and
                seg['start'] - last['end'] <= gap_threshold):
            # Merge; a segment fully contained in the last one must not shrink it
            last['end'] = max(last['end'], seg['end'])
            last['duration'] = last['end'] - last['start']
        else:
            # Add new segment
            merged.append(seg.copy())

    return merged


def filter_short_segments(
    segments: List[Dict],
    min_duration: float = 0.5
) -> List[Dict]:
    """
    Filter out segments shorter than threshold.

    Args:
        segments: List of segments
        min_duration: Minimum duration (seconds)

    Returns:
        Filtered segments
    """
    return [seg for seg in segments if seg['duration'] >= min_duration]


def calculate_overlap(
    seg1: Dict,
    seg2: Dict
) -> float:
    """
    Calculate overlap between two segments.

    Args:
        seg1: First segment with 'start' and 'end'
        seg2: Second segment with 'start' and 'end'

    Returns:
        Overlap duration in seconds
    """
    start = max(seg1['start'], seg2['start'])
    end = min(seg1['end'], seg2['end'])
    return max(0.0, end - start)


def segment_to_rttm(
    segments: List[Dict],
    file_id: str = "audio"
) -> str:
    """
    Convert segments to RTTM format.

    Args:
        segments: List of segments
        file_id: File identifier

    Returns:
        RTTM formatted string
    """
    lines = []
    for seg in segments:
        # RTTM SPEAKER line:
        # SPEAKER <file> <chan> <tbeg> <tdur> <ortho> <stype> <name> <conf> <slat>
        line = (f"SPEAKER {file_id} 1 {seg['start']:.3f} {seg['duration']:.3f} "
                f"<NA> <NA> {seg['speaker']} <NA> <NA>")
        lines.append(line)
    return "\n".join(lines)
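
# Note: segment_to_rttm above and rttm_to_segments below assume the standard
# 10-field RTTM SPEAKER line, e.g.
#   SPEAKER audio 1 0.000 2.800 <NA> <NA> SPEAKER_00 <NA> <NA>
# so the speaker name sits at field index 7, which is where the parser reads it.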
def rttm_to_segments(rttm_text: str) -> List[Dict]:
    """
    Parse RTTM format to segments.

    Args:
        rttm_text: RTTM formatted text

    Returns:
        List of segments
    """
    segments = []
    for line in rttm_text.strip().split('\n'):
        if not line.strip():
            continue
        parts = line.split()
        if parts[0] != 'SPEAKER':
            continue
        start = float(parts[3])
        duration = float(parts[4])
        speaker = parts[7]
        segments.append({
            'start': start,
            'end': start + duration,
            'duration': duration,
            'speaker': speaker
        })
    return segments


def visualize_timeline(
    segments: List[Dict],
    duration: Optional[float] = None,
    width: int = 80
) -> str:
    """
    Create ASCII visualization of speaker timeline.

    Args:
        segments: List of segments
        duration: Total duration (auto-detect if None)
        width: Width of visualization

    Returns:
        ASCII timeline string
    """
    if not segments:
        return "No segments to visualize"

    # Determine duration
    if duration is None:
        duration = max(seg['end'] for seg in segments)

    # Assign a display character to each speaker
    speakers = sorted(set(seg['speaker'] for seg in segments))
    chars = ['█', '▓', '▒', '░', '●', '○', '■', '□', '▪', '▫']
    speaker_chars = {speaker: chars[i % len(chars)] for i, speaker in enumerate(speakers)}

    # Create timeline
    lines = []
    lines.append(f"\nTimeline (0.00s - {duration:.2f}s):")
    lines.append("─" * width)

    # Time markers: one label every 10 columns, padded so labels stay aligned
    time_line = ""
    for i in range(0, width, 10):
        t = (i / width) * duration
        label = f"{t:.0f}s"
        time_line += label + " " * max(0, 10 - len(label))
    lines.append(time_line[:width])

    # Speaker rows
    for speaker in speakers:
        row = [' '] * width
        for seg in segments:
            if seg['speaker'] == speaker:
                start_pos = int((seg['start'] / duration) * width)
                end_pos = int((seg['end'] / duration) * width)
                # Draw at least one character so very short segments stay visible
                for i in range(start_pos, min(max(end_pos, start_pos + 1), width)):
                    row[i] = speaker_chars[speaker]
        lines.append(f"{speaker}: {''.join(row)}")

    lines.append("─" * width)
    return "\n".join(lines)


def export_results(
    result: Dict,
    output_dir: str,
    formats: List[str] = ['json', 'rttm', 'txt']
):
    """
    Export results in multiple formats.

    Args:
        result: Pipeline result
        output_dir: Output directory
        formats: List of formats to export
    """
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    base_name = Path(result['audio_path']).stem

    for fmt in formats:
        if fmt == 'json':
            # JSON format
            json_path = output_path / f"{base_name}.json"
            with open(json_path, 'w') as f:
                json.dump(result, f, indent=2)
            print(f"✓ Saved JSON: {json_path}")

        elif fmt == 'rttm':
            # RTTM format
            rttm_path = output_path / f"{base_name}.rttm"
            rttm_text = segment_to_rttm(result['speaker_segments'], base_name)
            with open(rttm_path, 'w') as f:
                f.write(rttm_text)
            print(f"✓ Saved RTTM: {rttm_path}")

        elif fmt == 'txt':
            # Text format
            txt_path = output_path / f"{base_name}.txt"
            lines = []
            lines.append("=" * 60)
            lines.append("SPEAKER DIARIZATION RESULTS")
            lines.append("=" * 60)
            lines.append(f"\nFile: {result['audio_path']}")
            lines.append(f"Speakers: {result['metadata']['num_speakers']}")
            lines.append(f"Segments: {result['metadata']['num_segments']}")
            lines.append("\nTimeline:")
            lines.append("-" * 60)
            for seg in result['speaker_segments']:
                lines.append(f"{seg['start']:7.2f}s - {seg['end']:7.2f}s: {seg['speaker']}")
            with open(txt_path, 'w') as f:
                f.write("\n".join(lines))
            print(f"✓ Saved TXT: {txt_path}")
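
# For reference, export_results expects `result` shaped roughly like the
# illustrative dict below (inferred from the fields it reads above):
#   {
#       'audio_path': 'path/to/audio.wav',
#       'speaker_segments': [
#           {'start': 0.0, 'end': 2.8, 'duration': 2.8, 'speaker': 'SPEAKER_00'},
#           ...
#       ],
#       'metadata': {'num_speakers': 2, 'num_segments': 5},
#   }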
def create_test_audio(
    output_path: str = "test_audio.wav",
    duration: float = 10.0,
    sampling_rate: int = 16000
) -> str:
    """
    Create synthetic test audio with speech-like patterns.

    Args:
        output_path: Output file path
        duration: Duration in seconds
        sampling_rate: Sample rate

    Returns:
        Path to created file
    """
    import soundfile as sf

    # Generate time axis and an all-silence signal
    t = np.linspace(0, duration, int(sampling_rate * duration))
    signal = np.zeros_like(t)

    # Calculate segment boundaries, clamped to the actual signal length
    seg1_len = min(int(sampling_rate * 3), len(signal))
    seg2_start = int(sampling_rate * 4)
    seg2_end = min(int(sampling_rate * 7), len(signal))
    seg3_start = min(int(sampling_rate * 8), len(signal))

    # Speaker 1: 0-3s (or until end)
    if seg1_len > 0:
        signal[0:seg1_len] = 0.3 * np.sin(2 * np.pi * 440 * t[0:seg1_len])

    # Silence: 3-4s
    # Speaker 2: 4-7s (or until end)
    if seg2_start < len(signal) and seg2_end > seg2_start:
        signal[seg2_start:seg2_end] = 0.3 * np.sin(2 * np.pi * 880 * t[seg2_start:seg2_end])

    # Silence: 7-8s
    # Speaker 1: 8-10s (or until end)
    if seg3_start < len(signal):
        signal[seg3_start:] = 0.3 * np.sin(2 * np.pi * 440 * t[seg3_start:])

    # Add some noise
    signal += 0.01 * np.random.randn(len(signal))

    # Save
    sf.write(output_path, signal, sampling_rate)
    return output_path


if __name__ == "__main__":
    # Demo utilities
    print("Utility functions loaded")

    # Create test audio
    test_path = create_test_audio()
    print(f"✓ Created test audio: {test_path}")
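
    # Minimal, illustrative exercise of the segment utilities; these
    # hand-written segments are not derived from the test audio above.
    demo_segments = [
        {'start': 0.0, 'end': 2.8, 'duration': 2.8, 'speaker': 'SPEAKER_00'},
        {'start': 3.1, 'end': 4.0, 'duration': 0.9, 'speaker': 'SPEAKER_00'},
        {'start': 4.5, 'end': 7.0, 'duration': 2.5, 'speaker': 'SPEAKER_01'},
        {'start': 7.05, 'end': 7.2, 'duration': 0.15, 'speaker': 'SPEAKER_01'},
    ]
    merged = filter_short_segments(merge_segments(demo_segments, gap_threshold=0.5))
    print(f"✓ Merged demo segments: {len(demo_segments)} -> {len(merged)}")
    print(visualize_timeline(merged, duration=10.0))

    # Round-trip through RTTM should preserve the speaker labels
    restored = rttm_to_segments(segment_to_rttm(merged, "demo"))
    assert [s['speaker'] for s in restored] == [s['speaker'] for s in merged]
    print("✓ RTTM round-trip OK")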