""" Performance tests for speech pathology diagnosis system. Tests latency requirements: - File batch: <200ms per file - Per-frame: <50ms - WebSocket roundtrip: <100ms """ import time import numpy as np import logging from pathlib import Path import asyncio from typing import Dict, List logger = logging.getLogger(__name__) def generate_test_audio(duration_seconds: float = 1.0, sample_rate: int = 16000) -> np.ndarray: """ Generate synthetic test audio. Args: duration_seconds: Duration in seconds sample_rate: Sample rate in Hz Returns: Audio array """ num_samples = int(duration_seconds * sample_rate) # Generate simple sine wave t = np.linspace(0, duration_seconds, num_samples) audio = 0.5 * np.sin(2 * np.pi * 440 * t) # 440 Hz tone return audio.astype(np.float32) def test_batch_latency(pipeline, num_files: int = 10) -> Dict[str, float]: """ Test batch file processing latency. Args: pipeline: InferencePipeline instance num_files: Number of test files to process Returns: Dictionary with latency statistics """ logger.info(f"Testing batch latency with {num_files} files...") latencies = [] for i in range(num_files): # Generate test audio audio = generate_test_audio(duration_seconds=1.0) # Save to temp file import tempfile import soundfile as sf with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f: temp_path = f.name sf.write(temp_path, audio, 16000) try: start_time = time.time() result = pipeline.predict_phone_level(temp_path, return_timestamps=True) latency_ms = (time.time() - start_time) * 1000 latencies.append(latency_ms) logger.info(f" File {i+1}: {latency_ms:.1f}ms ({result.num_frames} frames)") except Exception as e: logger.error(f" File {i+1} failed: {e}") finally: import os if os.path.exists(temp_path): os.remove(temp_path) if not latencies: return {"error": "No successful runs"} avg_latency = sum(latencies) / len(latencies) max_latency = max(latencies) min_latency = min(latencies) result = { "avg_latency_ms": avg_latency, "max_latency_ms": max_latency, "min_latency_ms": min_latency, "num_files": len(latencies), "target_ms": 200.0, "passed": avg_latency < 200.0 } logger.info(f"✅ Batch latency test: avg={avg_latency:.1f}ms, max={max_latency:.1f}ms, " f"target=200ms, passed={result['passed']}") return result def test_frame_latency(pipeline, num_frames: int = 100) -> Dict[str, float]: """ Test per-frame processing latency. Args: pipeline: InferencePipeline instance num_frames: Number of frames to test Returns: Dictionary with latency statistics """ logger.info(f"Testing frame latency with {num_frames} frames...") # Generate 1 second of audio (enough for one window) audio = generate_test_audio(duration_seconds=1.0) latencies = [] for i in range(num_frames): start_time = time.time() try: result = pipeline.predict_phone_level(audio, return_timestamps=False) latency_ms = (time.time() - start_time) * 1000 latencies.append(latency_ms) except Exception as e: logger.error(f" Frame {i+1} failed: {e}") if not latencies: return {"error": "No successful runs"} avg_latency = sum(latencies) / len(latencies) max_latency = max(latencies) min_latency = min(latencies) p95_latency = sorted(latencies)[int(len(latencies) * 0.95)] result = { "avg_latency_ms": avg_latency, "max_latency_ms": max_latency, "min_latency_ms": min_latency, "p95_latency_ms": p95_latency, "num_frames": len(latencies), "target_ms": 50.0, "passed": avg_latency < 50.0 } logger.info(f"✅ Frame latency test: avg={avg_latency:.1f}ms, p95={p95_latency:.1f}ms, " f"target=50ms, passed={result['passed']}") return result async def test_websocket_latency(websocket_url: str, num_chunks: int = 50) -> Dict[str, float]: """ Test WebSocket streaming latency. Args: websocket_url: WebSocket URL num_chunks: Number of chunks to send Returns: Dictionary with latency statistics """ try: import websockets logger.info(f"Testing WebSocket latency with {num_chunks} chunks...") latencies = [] async with websockets.connect(websocket_url) as websocket: # Generate test audio chunk (20ms @ 16kHz = 320 samples) chunk_samples = 320 audio_chunk = generate_test_audio(duration_seconds=0.02) chunk_bytes = (audio_chunk * 32768).astype(np.int16).tobytes() for i in range(num_chunks): start_time = time.time() # Send chunk await websocket.send(chunk_bytes) # Receive response response = await websocket.recv() latency_ms = (time.time() - start_time) * 1000 latencies.append(latency_ms) if i % 10 == 0: logger.info(f" Chunk {i+1}: {latency_ms:.1f}ms") if not latencies: return {"error": "No successful runs"} avg_latency = sum(latencies) / len(latencies) max_latency = max(latencies) p95_latency = sorted(latencies)[int(len(latencies) * 0.95)] result = { "avg_latency_ms": avg_latency, "max_latency_ms": max_latency, "p95_latency_ms": p95_latency, "num_chunks": len(latencies), "target_ms": 100.0, "passed": avg_latency < 100.0 } logger.info(f"✅ WebSocket latency test: avg={avg_latency:.1f}ms, p95={p95_latency:.1f}ms, " f"target=100ms, passed={result['passed']}") return result except ImportError: logger.warning("websockets library not available, skipping WebSocket test") return {"error": "websockets library not available"} except Exception as e: logger.error(f"WebSocket test failed: {e}") return {"error": str(e)} def test_concurrent_connections(pipeline, num_connections: int = 10) -> Dict[str, Any]: """ Test concurrent processing (simulated). Args: pipeline: InferencePipeline instance num_connections: Number of concurrent requests Returns: Dictionary with results """ logger.info(f"Testing {num_connections} concurrent connections...") import concurrent.futures def process_audio(i: int): try: audio = generate_test_audio(duration_seconds=0.5) start_time = time.time() result = pipeline.predict_phone_level(audio, return_timestamps=False) latency_ms = (time.time() - start_time) * 1000 return {"success": True, "latency_ms": latency_ms, "frames": result.num_frames} except Exception as e: return {"success": False, "error": str(e)} start_time = time.time() with concurrent.futures.ThreadPoolExecutor(max_workers=num_connections) as executor: futures = [executor.submit(process_audio, i) for i in range(num_connections)] results = [f.result() for f in concurrent.futures.as_completed(futures)] total_time = time.time() - start_time successful = sum(1 for r in results if r.get("success", False)) avg_latency = sum(r["latency_ms"] for r in results if r.get("success", False)) / successful if successful > 0 else 0.0 result = { "total_connections": num_connections, "successful": successful, "failed": num_connections - successful, "total_time_seconds": total_time, "avg_latency_ms": avg_latency, "throughput_per_second": successful / total_time if total_time > 0 else 0.0 } logger.info(f"✅ Concurrent test: {successful}/{num_connections} successful, " f"avg_latency={avg_latency:.1f}ms, throughput={result['throughput_per_second']:.1f}/s") return result def run_all_performance_tests(pipeline, websocket_url: Optional[str] = None) -> Dict[str, Any]: """ Run all performance tests. Args: pipeline: InferencePipeline instance websocket_url: Optional WebSocket URL for streaming tests Returns: Dictionary with all test results """ logger.info("=" * 60) logger.info("Running Performance Tests") logger.info("=" * 60) results = {} # Test 1: Batch latency logger.info("\n1. Batch File Latency Test") results["batch_latency"] = test_batch_latency(pipeline) # Test 2: Frame latency logger.info("\n2. Per-Frame Latency Test") results["frame_latency"] = test_frame_latency(pipeline) # Test 3: Concurrent connections logger.info("\n3. Concurrent Connections Test") results["concurrent"] = test_concurrent_connections(pipeline, num_connections=10) # Test 4: WebSocket latency (if URL provided) if websocket_url: logger.info("\n4. WebSocket Latency Test") results["websocket_latency"] = asyncio.run(test_websocket_latency(websocket_url)) # Summary logger.info("\n" + "=" * 60) logger.info("Performance Test Summary") logger.info("=" * 60) if "batch_latency" in results and results["batch_latency"].get("passed"): logger.info("✅ Batch latency: PASSED") else: logger.warning("❌ Batch latency: FAILED") if "frame_latency" in results and results["frame_latency"].get("passed"): logger.info("✅ Frame latency: PASSED") else: logger.warning("❌ Frame latency: FAILED") if "websocket_latency" in results and results["websocket_latency"].get("passed"): logger.info("✅ WebSocket latency: PASSED") elif "websocket_latency" in results: logger.warning("❌ WebSocket latency: FAILED") return results if __name__ == "__main__": # Example usage logging.basicConfig(level=logging.INFO) try: from inference.inference_pipeline import create_inference_pipeline pipeline = create_inference_pipeline() results = run_all_performance_tests(pipeline) print("\nTest Results:") import json print(json.dumps(results, indent=2)) except Exception as e: logger.error(f"Test failed: {e}", exc_info=True)