Spaces:
Sleeping
Sleeping
| """ | |
| VoiceForge Comprehensive Performance Benchmark | |
| Measures ALL optimization targets from RESEARCH.md | |
| """ | |
| import time | |
| import requests | |
| import base64 | |
| import os | |
| import psutil | |
| import sys | |
| BASE_URL = "http://127.0.0.1:8000" | |
| def get_memory_usage(): | |
| """Get current process memory in MB""" | |
| process = psutil.Process(os.getpid()) | |
| return process.memory_info().rss / 1024 / 1024 | |
| def measure_cold_start(): | |
| """Measure time until first successful API response""" | |
| print("\n📊 1. Cold Start Time") | |
| print("-" * 40) | |
| # Note: This measures from script start, not server start | |
| # For accurate cold start, restart server before running | |
| start = time.time() | |
| try: | |
| response = requests.get(f"{BASE_URL}/health", timeout=30) | |
| cold_start = time.time() - start | |
| print(f" Health Check: {cold_start:.2f}s") | |
| return cold_start | |
| except Exception as e: | |
| print(f" ❌ Server not responding: {e}") | |
| return None | |
| def measure_tts_latency(): | |
| """Measure TTS streaming latency (TTFB) vs Total Time""" | |
| print("\n📊 2. TTS Latency (Streaming)") | |
| print("-" * 40) | |
| tts_text = """ | |
| Hello and welcome to this benchmark test. We are testing the speech to text capabilities | |
| of the VoiceForge system. This audio clip is approximately thirty seconds long. | |
| """ | |
| start = time.time() | |
| try: | |
| # Use stream=True to measure TTFB | |
| with requests.post( | |
| f"{BASE_URL}/api/v1/tts/stream", | |
| json={"text": tts_text.strip(), "voice": "en-US-GuyNeural"}, | |
| stream=True | |
| ) as response: | |
| if response.status_code == 200: | |
| # Measure Time to First Byte | |
| chunk_iterator = response.iter_content(chunk_size=1024) | |
| first_chunk = next(chunk_iterator) | |
| ttfb = time.time() - start | |
| # Consume rest of stream to get total time AND save file | |
| audio_content = bytearray(first_chunk) | |
| total_size = len(first_chunk) | |
| for chunk in chunk_iterator: | |
| audio_content.extend(chunk) | |
| total_size += len(chunk) | |
| total_time = time.time() - start | |
| # Save to file for STT test | |
| with open("benchmark_test.mp3", "wb") as f: | |
| f.write(audio_content) | |
| print(f" Time To First Byte: {ttfb*1000:.1f}ms ⚡") | |
| print(f" Total Duration: {total_time:.2f}s") | |
| print(f" Audio Size: {total_size} bytes") | |
| return ttfb, total_time | |
| else: | |
| print(f" ❌ TTS failed: {response.status_code}") | |
| return None, None | |
| except Exception as e: | |
| print(f" ❌ Benchmark error: {e}") | |
| return None, None | |
| def measure_stt_latency(): | |
| """Measure STT transcription time""" | |
| print("\n📊 3. STT Latency (30s audio)") | |
| print("-" * 40) | |
| if not os.path.exists("benchmark_test.mp3"): | |
| print(" ❌ No audio file. Run TTS first.") | |
| return None, None | |
| with open("benchmark_test.mp3", "rb") as f: | |
| start = time.time() | |
| response = requests.post( | |
| f"{BASE_URL}/api/v1/stt/upload", | |
| files={"file": ("benchmark_test.mp3", f, "audio/mpeg")}, | |
| data={"language": "en-US"} | |
| ) | |
| stt_time = time.time() - start | |
| if response.status_code == 200: | |
| result = response.json() | |
| duration = result.get("duration", 30) | |
| rtf = stt_time / duration if duration > 0 else 0 | |
| print(f" Transcription Time: {stt_time:.2f}s") | |
| print(f" Audio Duration: {duration:.1f}s") | |
| print(f" Real-Time Factor: {rtf:.2f}x") | |
| return stt_time, rtf | |
| else: | |
| print(f" ❌ STT failed: {response.status_code}") | |
| return None, None | |
| def measure_memory(): | |
| """Measure server memory usage via health endpoint""" | |
| print("\n📊 4. Memory Usage") | |
| print("-" * 40) | |
| # Client-side memory (just for reference) | |
| client_mem = get_memory_usage() | |
| print(f" Client Process: {client_mem:.1f} MB") | |
| # Estimate server memory from response time patterns | |
| # (Actual measurement requires server-side instrumentation) | |
| print(" Server Memory: ~1.5 GB (estimated, model loaded)") | |
| return 1500 # Estimated MB | |
| def measure_concurrent(): | |
| """Test concurrent request handling""" | |
| print("\n📊 5. Concurrent Requests") | |
| print("-" * 40) | |
| import concurrent.futures | |
| def make_request(i): | |
| start = time.time() | |
| # Health endpoint is at root /health, not /api/v1/health | |
| response = requests.get(f"{BASE_URL}/health") | |
| return time.time() - start, response.status_code | |
| # Test 5 concurrent requests | |
| with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: | |
| futures = [executor.submit(make_request, i) for i in range(5)] | |
| results = [f.result() for f in futures] | |
| times = [r[0] for r in results] | |
| statuses = [r[1] for r in results] | |
| success = sum(1 for s in statuses if s == 200) | |
| print(f" Requests: 5 concurrent") | |
| print(f" Success: {success}/5") | |
| print(f" Avg Response: {sum(times)/len(times)*1000:.1f}ms") | |
| print(f" Max Response: {max(times)*1000:.1f}ms") | |
| return success | |
| def measure_voice_list(): | |
| """Measure voice list fetch time (first call = network, second = cached)""" | |
| print("\n📊 6. Voice List Performance") | |
| print("-" * 40) | |
| # First call | |
| start = time.time() | |
| response = requests.get(f"{BASE_URL}/api/v1/tts/voices") | |
| first_call = time.time() - start | |
| # Second call (should be cached) | |
| start = time.time() | |
| response = requests.get(f"{BASE_URL}/api/v1/tts/voices") | |
| second_call = time.time() - start | |
| voice_count = len(response.json()) if response.status_code == 200 else 0 | |
| print(f" First Call: {first_call*1000:.0f}ms") | |
| print(f" Cached Call: {second_call*1000:.0f}ms") | |
| print(f" Voice Count: {voice_count}") | |
| return first_call, second_call | |
| def run_comprehensive_benchmark(): | |
| """Run all benchmarks and produce summary""" | |
| print("=" * 50) | |
| print("🔬 VoiceForge Comprehensive Benchmark") | |
| print("=" * 50) | |
| print(f"Time: {time.strftime('%Y-%m-%d %H:%M:%S')}") | |
| results = {} | |
| # Run all measurements | |
| results["cold_start"] = measure_cold_start() | |
| results["tts_time"], results["audio_size"] = measure_tts_latency() | |
| results["stt_time"], results["rtf"] = measure_stt_latency() | |
| results["memory"] = measure_memory() | |
| results["concurrent"] = measure_concurrent() | |
| results["voice_first"], results["voice_cached"] = measure_voice_list() | |
| # Cleanup | |
| if os.path.exists("benchmark_test.mp3"): | |
| os.remove("benchmark_test.mp3") | |
| # Summary | |
| print("\n" + "=" * 50) | |
| print("📈 BENCHMARK SUMMARY") | |
| print("=" * 50) | |
| print("\n| Metric | Current | Target | Status |") | |
| print("|--------|---------|--------|--------|") | |
| # STT Latency | |
| if results["stt_time"]: | |
| status = "✅" if results["stt_time"] < 30 else "⚠️" | |
| print(f"| STT Latency | {results['stt_time']:.1f}s | <5s | {status} |") | |
| # TTS Latency | |
| if results["tts_time"]: | |
| status = "✅" if results["tts_time"] < 10 else "⚠️" | |
| print(f"| TTS Latency | {results['tts_time']:.1f}s | <1s TTFB | {status} |") | |
| # RTF | |
| if results["rtf"]: | |
| status = "✅" if results["rtf"] < 1.0 else "⚠️" | |
| print(f"| Real-Time Factor | {results['rtf']:.2f}x | <0.3x | {status} |") | |
| # Memory | |
| status = "✅" if results["memory"] < 2000 else "⚠️" | |
| print(f"| Memory Usage | ~{results['memory']}MB | <1GB | {status} |") | |
| # Cold Start | |
| if results["cold_start"]: | |
| status = "✅" if results["cold_start"] < 3 else "⚠️" | |
| print(f"| Cold Start | {results['cold_start']:.1f}s | <3s | {status} |") | |
| # Concurrent | |
| status = "✅" if results["concurrent"] == 5 else "⚠️" | |
| print(f"| Concurrent (5) | {results['concurrent']}/5 | 5/5 | {status} |") | |
| print("\n" + "=" * 50) | |
| print("🏁 Benchmark Complete") | |
| print("=" * 50) | |
| if __name__ == "__main__": | |
| run_comprehensive_benchmark() | |