#!/usr/bin/env python3
"""
Benchmark script for VAD + Speaker Diarization.
Tests performance under various audio conditions.
"""
import sys
from pathlib import Path

# Make the project root importable so `src.*` resolves when this script
# is run directly from its own directory.
sys.path.insert(0, str(Path(__file__).parent.parent))

import argparse
import json
import os
from typing import Dict, List, Optional

import numpy as np

from src.vad import SileroVAD
from src.pipeline import VADDiarizationPipeline
from src.utils import create_test_audio


class Benchmark:
    """Benchmark suite for VAD + Diarization."""

    def __init__(self, use_auth_token: Optional[str] = None):
        """Initialize the benchmark suite."""
        self.use_auth_token = use_auth_token
        self.results: Dict = {}

    def benchmark_vad_latency(self, durations: List[float] = [1, 5, 10, 30, 60]):
        """Benchmark VAD latency across different audio durations."""
        print("\n" + "=" * 60)
        print("VAD LATENCY BENCHMARK")
        print("=" * 60)

        vad = SileroVAD(threshold=0.5)
        results = []

        for duration in durations:
            print(f"\nTesting {duration}s audio...")
            metrics = vad.benchmark_latency(duration_seconds=duration)
            result = {
                'duration_s': duration,
                'processing_time_ms': metrics['total_processing_time_ms'],
                'latency_per_second_ms': metrics['latency_per_second_ms'],
                'real_time_factor': metrics['real_time_factor'],
            }
            results.append(result)

            print(f"  Processing time: {result['processing_time_ms']:.2f}ms")
            print(f"  Latency/second: {result['latency_per_second_ms']:.2f}ms")
            print(f"  Real-time factor: {result['real_time_factor']:.4f}x")

            # Check against the latency target
            if result['latency_per_second_ms'] < 100:
                print("  ✅ Target achieved (<100ms)")
            else:
                print("  ⚠️ Above target (>=100ms)")

        self.results['vad_latency'] = results

        # Summary
        avg_latency = np.mean([r['latency_per_second_ms'] for r in results])
        print(f"\n📊 Average latency: {avg_latency:.2f}ms per second")
        return results
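
    # 'real_time_factor' comes from SileroVAD.benchmark_latency and is assumed
    # to be processing_time / audio_duration: e.g. an RTF of 0.05 means 10 s
    # of audio is processed in ~0.5 s. Values below 1.0 are faster than real
    # time, consistent with the <100 ms-per-second target checked above.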

    def benchmark_vad_thresholds(self, thresholds: List[float] = [0.3, 0.5, 0.7]):
        """Benchmark VAD with different sensitivity thresholds."""
        print("\n" + "=" * 60)
        print("VAD THRESHOLD BENCHMARK")
        print("=" * 60)

        # Create test audio
        test_audio = create_test_audio("test_threshold.wav", duration=10.0)
        results = []

        for threshold in thresholds:
            print(f"\nTesting threshold {threshold}...")
            vad = SileroVAD(threshold=threshold)
            timestamps, processing_time = vad.process_file(test_audio)
            result = {
                'threshold': threshold,
                'num_segments': len(timestamps),
                'processing_time_ms': processing_time,
                'total_speech_time_s': sum(ts['end'] - ts['start'] for ts in timestamps),
            }
            results.append(result)

            print(f"  Segments detected: {result['num_segments']}")
            print(f"  Total speech time: {result['total_speech_time_s']:.2f}s")
            print(f"  Processing time: {result['processing_time_ms']:.2f}ms")

        self.results['vad_thresholds'] = results

        # Cleanup
        Path(test_audio).unlink(missing_ok=True)
        return results
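
    # Expected tradeoff: a lower threshold makes the VAD more permissive
    # (typically more segments and more total speech time, at the cost of
    # false positives on noise), while a higher threshold may clip quiet
    # speech. 0.5 is the default used by the other benchmarks in this script.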

    def benchmark_full_pipeline(self):
        """Benchmark the full VAD + Diarization pipeline."""
        print("\n" + "=" * 60)
        print("FULL PIPELINE BENCHMARK")
        print("=" * 60)

        if not self.use_auth_token:
            print("⚠️ No HF_TOKEN provided, skipping full pipeline benchmark")
            return None

        try:
            # Initialize pipeline
            print("\nInitializing pipeline...")
            pipeline = VADDiarizationPipeline(
                use_auth_token=self.use_auth_token,
                vad_threshold=0.5,
            )

            # Create test audio
            test_audio = create_test_audio("test_pipeline.wav", duration=30.0)

            # Process
            print(f"\nProcessing {test_audio}...")
            result = pipeline.process_file(test_audio)

            benchmark_result = {
                'audio_duration_s': 30.0,
                'vad_time_ms': result['processing_time']['vad_ms'],
                'diarization_time_ms': result['processing_time']['diarization_ms'],
                'total_time_ms': result['processing_time']['total_ms'],
                'num_speakers': result['metadata']['num_speakers'],
                'num_segments': result['metadata']['num_segments'],
            }

            print("\n📊 Results:")
            print(f"  VAD time: {benchmark_result['vad_time_ms']:.2f}ms")
            print(f"  Diarization time: {benchmark_result['diarization_time_ms']:.2f}ms")
            print(f"  Total time: {benchmark_result['total_time_ms']:.2f}ms")
            print(f"  Speakers: {benchmark_result['num_speakers']}")
            print(f"  Segments: {benchmark_result['num_segments']}")

            self.results['full_pipeline'] = benchmark_result

            # Cleanup
            Path(test_audio).unlink(missing_ok=True)
            return benchmark_result
        except Exception as e:
            print(f"❌ Error: {e}")
            return None
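
    # A useful derived figure: total_time_ms / 30_000 is the pipeline's
    # real-time factor for the 30 s clip (below 1.0 means faster than real
    # time); in practice the diarization stage tends to dominate the total.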

    def benchmark_memory_usage(self):
        """Benchmark memory usage."""
        print("\n" + "=" * 60)
        print("MEMORY USAGE BENCHMARK")
        print("=" * 60)

        import psutil
        import torch

        process = psutil.Process()

        # Initial memory
        initial_mem = process.memory_info().rss / 1024 / 1024  # MB
        print(f"\nInitial memory: {initial_mem:.2f} MB")

        # Load VAD
        print("\nLoading VAD...")
        vad = SileroVAD()
        vad_mem = process.memory_info().rss / 1024 / 1024
        print(f"After VAD: {vad_mem:.2f} MB (+{vad_mem - initial_mem:.2f} MB)")

        result = {
            'initial_memory_mb': initial_mem,
            'vad_memory_mb': vad_mem,
            'vad_increase_mb': vad_mem - initial_mem,
        }

        # GPU memory (if available)
        if torch.cuda.is_available():
            gpu_mem = torch.cuda.memory_allocated() / 1024 / 1024
            print(f"GPU memory: {gpu_mem:.2f} MB")
            result['gpu_memory_mb'] = gpu_mem

        self.results['memory_usage'] = result
        return result
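
    # Note these are point-in-time figures (RSS / currently allocated). For a
    # CUDA high-water mark, torch.cuda.max_memory_allocated() could be read
    # instead, optionally after torch.cuda.reset_peak_memory_stats().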

    def save_results(self, output_path: str = "benchmark_results.json"):
        """Save benchmark results to a JSON file."""
        output_file = Path(__file__).parent / output_path
        with open(output_file, 'w') as f:
            json.dump(self.results, f, indent=2)
        print(f"\n✓ Results saved to: {output_file}")

    def run_all(self, output_path: str = "benchmark_results.json"):
        """Run all benchmarks."""
        print("\n" + "=" * 60)
        print("RUNNING ALL BENCHMARKS")
        print("=" * 60)

        # VAD latency
        self.benchmark_vad_latency()

        # VAD thresholds
        self.benchmark_vad_thresholds()

        # Memory usage
        self.benchmark_memory_usage()

        # Full pipeline (if token available)
        if self.use_auth_token:
            self.benchmark_full_pipeline()

        # Save results
        self.save_results(output_path)

        print("\n" + "=" * 60)
        print("✅ ALL BENCHMARKS COMPLETE")
        print("=" * 60)


def main():
    """Main benchmark runner."""
    parser = argparse.ArgumentParser(description="Run VAD + Diarization benchmarks")
    parser.add_argument(
        '--token',
        type=str,
        default=None,
        help='Hugging Face token for the full pipeline benchmark'
    )
    parser.add_argument(
        '--output',
        type=str,
        default='benchmark_results.json',
        help='Output file for results'
    )
    parser.add_argument(
        '--quick',
        action='store_true',
        help='Run a quick benchmark (VAD latency only)'
    )
    args = parser.parse_args()

    # Get token from args or environment
    token = args.token or os.environ.get('HF_TOKEN')

    # Initialize benchmark
    benchmark = Benchmark(use_auth_token=token)

    if args.quick:
        # Quick benchmark (VAD latency only)
        benchmark.benchmark_vad_latency(durations=[1, 5, 10])
        benchmark.save_results(args.output)
    else:
        # Full benchmark suite; pass the output path through so --output
        # is honored here as well
        benchmark.run_all(args.output)


if __name__ == "__main__":
    main()
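
# Example invocations (assuming the script lives one level below the project
# root, next to the `src/` package, as the sys.path insert above implies):
#   python benchmark.py --quick                  # VAD latency only
#   python benchmark.py --token hf_xxx           # full suite incl. pipeline
#   HF_TOKEN=hf_xxx python benchmark.py --output my_results.json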