#!/usr/bin/env python3
"""
Utility functions for a VAD + diarization pipeline.
"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import numpy as np

def load_audio(
    path: str,
    sampling_rate: int = 16000,
    mono: bool = True
) -> Tuple[np.ndarray, int]:
    """
    Load an audio file with automatic format detection.

    Tries librosa first and falls back to soundfile if that fails.

    Args:
        path: Path to the audio file
        sampling_rate: Target sample rate
        mono: Convert to mono

    Returns:
        Tuple of (audio_data, sample_rate)
    """
    try:
        import librosa
        audio, sr = librosa.load(path, sr=sampling_rate, mono=mono)
        return audio, sr
    except Exception as e:
        print(f"Error loading audio with librosa: {e}")
        # Fallback to soundfile
        try:
            import soundfile as sf
            audio, sr = sf.read(path)
            # Convert to mono first: soundfile returns (frames, channels)
            # for multi-channel files, and librosa.resample expects the
            # time axis last
            if mono and audio.ndim > 1:
                audio = audio.mean(axis=1)
            # Resample if needed
            if sr != sampling_rate:
                import librosa
                audio = librosa.resample(audio, orig_sr=sr, target_sr=sampling_rate)
                sr = sampling_rate
            return audio, sr
        except Exception as e:
            print(f"Error loading audio with soundfile: {e}")
            raise

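# Typical usage (sketch; assumes librosa or soundfile is installed and that
# a file named "speech.wav" exists):
#
#   audio, sr = load_audio("speech.wav", sampling_rate=16000)
#   # audio is a 1-D float numpy array, sr == 16000
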
def save_audio(
    audio: np.ndarray,
    path: str,
    sampling_rate: int = 16000
):
    """
    Save audio to a file.

    Args:
        audio: Audio data
        path: Output path
        sampling_rate: Sample rate
    """
    import soundfile as sf
    sf.write(path, audio, sampling_rate)

def merge_segments(
    segments: List[Dict],
    gap_threshold: float = 0.5
) -> List[Dict]:
    """
    Merge nearby segments from the same speaker.

    Args:
        segments: List of segments with 'start', 'end', 'speaker'
        gap_threshold: Maximum gap to merge across (seconds)

    Returns:
        Merged segments
    """
    if not segments:
        return []
    # Sort by start time
    sorted_segments = sorted(segments, key=lambda x: x['start'])
    merged = [sorted_segments[0].copy()]
    for seg in sorted_segments[1:]:
        last = merged[-1]
        # Same speaker and close enough: extend the previous segment
        if (seg['speaker'] == last['speaker'] and
                seg['start'] - last['end'] <= gap_threshold):
            # max() keeps the boundary correct when segments overlap
            last['end'] = max(last['end'], seg['end'])
            last['duration'] = last['end'] - last['start']
        else:
            # Start a new segment
            merged.append(seg.copy())
    return merged

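# Illustrative example (hand-written values, not from a real run): two
# SPEAKER_00 turns 0.2 s apart collapse into one segment, while the
# SPEAKER_01 turn is left untouched:
#
#   merge_segments([
#       {'start': 0.0, 'end': 1.0, 'duration': 1.0, 'speaker': 'SPEAKER_00'},
#       {'start': 1.2, 'end': 2.0, 'duration': 0.8, 'speaker': 'SPEAKER_00'},
#       {'start': 2.5, 'end': 3.0, 'duration': 0.5, 'speaker': 'SPEAKER_01'},
#   ], gap_threshold=0.5)
#   # -> [{'start': 0.0, 'end': 2.0, 'duration': 2.0, 'speaker': 'SPEAKER_00'},
#   #     {'start': 2.5, 'end': 3.0, 'duration': 0.5, 'speaker': 'SPEAKER_01'}]
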
def filter_short_segments(
    segments: List[Dict],
    min_duration: float = 0.5
) -> List[Dict]:
    """
    Filter out segments shorter than a threshold.

    Args:
        segments: List of segments
        min_duration: Minimum duration to keep (seconds)

    Returns:
        Filtered segments
    """
    return [seg for seg in segments if seg['duration'] >= min_duration]


def calculate_overlap(
    seg1: Dict,
    seg2: Dict
) -> float:
    """
    Calculate the overlap between two segments.

    Args:
        seg1: First segment with 'start' and 'end'
        seg2: Second segment with 'start' and 'end'

    Returns:
        Overlap duration in seconds (0 if the segments are disjoint)
    """
    start = max(seg1['start'], seg2['start'])
    end = min(seg1['end'], seg2['end'])
    return max(0, end - start)

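# Illustrative example: segments [0, 2] and [1, 3] share one second, so
# calculate_overlap({'start': 0, 'end': 2}, {'start': 1, 'end': 3}) == 1.
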
def segment_to_rttm(
    segments: List[Dict],
    file_id: str = "audio"
) -> str:
    """
    Convert segments to RTTM format.

    Args:
        segments: List of segments
        file_id: File identifier used in each RTTM record

    Returns:
        RTTM-formatted string
    """
    lines = []
    for seg in segments:
        # RTTM record: SPEAKER <file> <channel> <start> <duration> <NA> <NA> <speaker> <NA> <NA>
        line = (
            f"SPEAKER {file_id} 1 {seg['start']:.3f} {seg['duration']:.3f} "
            f"<NA> <NA> {seg['speaker']} <NA> <NA>"
        )
        lines.append(line)
    return "\n".join(lines)

def rttm_to_segments(rttm_text: str) -> List[Dict]:
    """
    Parse RTTM-formatted text into segments.

    Args:
        rttm_text: RTTM-formatted text

    Returns:
        List of segments
    """
    segments = []
    for line in rttm_text.strip().split('\n'):
        if not line.strip():
            continue
        parts = line.split()
        # Skip non-SPEAKER records and malformed lines
        if parts[0] != 'SPEAKER' or len(parts) < 8:
            continue
        start = float(parts[3])
        duration = float(parts[4])
        speaker = parts[7]
        segments.append({
            'start': start,
            'end': start + duration,
            'duration': duration,
            'speaker': speaker
        })
    return segments

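# Note: segment_to_rttm and rttm_to_segments round-trip cleanly, up to the
# millisecond rounding applied when start and duration are serialized:
#
#   parsed = rttm_to_segments(segment_to_rttm(segments, "audio"))
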
def visualize_timeline(
    segments: List[Dict],
    duration: Optional[float] = None,
    width: int = 80
) -> str:
    """
    Create an ASCII visualization of the speaker timeline.

    Args:
        segments: List of segments
        duration: Total duration (auto-detected from segments if None)
        width: Width of the visualization in characters

    Returns:
        ASCII timeline string
    """
    if not segments:
        return "No segments to visualize"
    # Determine total duration
    if duration is None:
        duration = max(seg['end'] for seg in segments)
    # Assign a fill character to each unique speaker
    speakers = sorted(set(seg['speaker'] for seg in segments))
    chars = ['█', '▓', '▒', '░', '●', '○', '■', '□', '▪', '▫']
    speaker_chars = {
        speaker: chars[i % len(chars)] for i, speaker in enumerate(speakers)
    }
    # Create timeline
    lines = []
    lines.append(f"\nTimeline (0.00s - {duration:.2f}s):")
    lines.append("─" * width)
    # Time markers: write a label every 10 columns at its exact position
    time_line = [' '] * width
    for i in range(0, width, 10):
        label = f"{(i / width) * duration:.0f}s"
        for j, ch in enumerate(label):
            if i + j < width:
                time_line[i + j] = ch
    lines.append(''.join(time_line))
    # One row per speaker, with each segment drawn at its scaled position
    for speaker in speakers:
        row = [' '] * width
        for seg in segments:
            if seg['speaker'] == speaker:
                start_pos = int((seg['start'] / duration) * width)
                end_pos = int((seg['end'] / duration) * width)
                for i in range(start_pos, min(end_pos, width)):
                    row[i] = speaker_chars[speaker]
        lines.append(f"{speaker}: {''.join(row)}")
    lines.append("─" * width)
    return "\n".join(lines)

def export_results(
    result: Dict,
    output_dir: str,
    formats: Optional[List[str]] = None
):
    """
    Export pipeline results in multiple formats.

    Args:
        result: Pipeline result with 'audio_path', 'speaker_segments'
            and 'metadata' keys
        output_dir: Output directory (created if it does not exist)
        formats: Formats to export; defaults to ['json', 'rttm', 'txt']
    """
    if formats is None:
        formats = ['json', 'rttm', 'txt']
    output_path = Path(output_dir)
    output_path.mkdir(parents=True, exist_ok=True)
    base_name = Path(result['audio_path']).stem
    for fmt in formats:
        if fmt == 'json':
            # JSON format
            json_path = output_path / f"{base_name}.json"
            with open(json_path, 'w') as f:
                json.dump(result, f, indent=2)
            print(f"✓ Saved JSON: {json_path}")
        elif fmt == 'rttm':
            # RTTM format
            rttm_path = output_path / f"{base_name}.rttm"
            rttm_text = segment_to_rttm(result['speaker_segments'], base_name)
            with open(rttm_path, 'w') as f:
                f.write(rttm_text)
            print(f"✓ Saved RTTM: {rttm_path}")
        elif fmt == 'txt':
            # Human-readable text format
            txt_path = output_path / f"{base_name}.txt"
            lines = []
            lines.append("=" * 60)
            lines.append("SPEAKER DIARIZATION RESULTS")
            lines.append("=" * 60)
            lines.append(f"\nFile: {result['audio_path']}")
            lines.append(f"Speakers: {result['metadata']['num_speakers']}")
            lines.append(f"Segments: {result['metadata']['num_segments']}")
            lines.append("\nTimeline:")
            lines.append("-" * 60)
            for seg in result['speaker_segments']:
                lines.append(f"{seg['start']:7.2f}s - {seg['end']:7.2f}s: {seg['speaker']}")
            with open(txt_path, 'w') as f:
                f.write("\n".join(lines))
            print(f"✓ Saved TXT: {txt_path}")

def create_test_audio(
    output_path: str = "test_audio.wav",
    duration: float = 10.0,
    sampling_rate: int = 16000
) -> str:
    """
    Create synthetic test audio with speech-like patterns: two alternating
    "speakers" (440 Hz and 880 Hz tones) separated by one-second silences,
    plus a little background noise.

    Args:
        output_path: Output file path
        duration: Duration in seconds
        sampling_rate: Sample rate

    Returns:
        Path to the created file
    """
    import soundfile as sf
    # Generate the time base and an all-silence signal
    t = np.linspace(0, duration, int(sampling_rate * duration))
    signal = np.zeros_like(t)
    # Segment boundaries, clamped to the signal length for short durations
    seg1_end = min(int(sampling_rate * 3), len(signal))
    seg2_start = int(sampling_rate * 4)
    seg2_end = min(int(sampling_rate * 7), len(signal))
    seg3_start = min(int(sampling_rate * 8), len(signal))
    # "Speaker 1": 0-3s (or until the end), 440 Hz tone
    signal[0:seg1_end] = 0.3 * np.sin(2 * np.pi * 440 * t[0:seg1_end])
    # Silence: 3-4s
    # "Speaker 2": 4-7s (or until the end), 880 Hz tone
    if seg2_start < seg2_end:
        signal[seg2_start:seg2_end] = 0.3 * np.sin(2 * np.pi * 880 * t[seg2_start:seg2_end])
    # Silence: 7-8s
    # "Speaker 1" again: 8s to the end, 440 Hz tone
    if seg3_start < len(signal):
        signal[seg3_start:] = 0.3 * np.sin(2 * np.pi * 440 * t[seg3_start:])
    # Add some noise
    signal += 0.01 * np.random.randn(len(signal))
    # Save
    sf.write(output_path, signal, sampling_rate)
    return output_path

if __name__ == "__main__":
    # Demo utilities
    print("Utility functions loaded")
    # Create test audio
    test_path = create_test_audio()
    print(f"✓ Created test audio: {test_path}")