# Source: zlaqa-version-c-ai-enginee / tests / performance_tests.py
# (Hugging Face page header residue — author: anfastech, commit 278e294,
#  "New: implemented many, many changes. 10% Phone-level detection: WORKING")
"""
Performance tests for speech pathology diagnosis system.
Tests latency requirements:
- File batch: <200ms per file
- Per-frame: <50ms
- WebSocket roundtrip: <100ms
"""
import asyncio
import logging
import time
from pathlib import Path
from typing import Any, Dict, List, Optional

import numpy as np
logger = logging.getLogger(__name__)
def generate_test_audio(duration_seconds: float = 1.0, sample_rate: int = 16000) -> np.ndarray:
    """
    Create a synthetic test signal: a 440 Hz sine tone at half amplitude.

    Args:
        duration_seconds: Length of the signal in seconds.
        sample_rate: Sample rate in Hz.

    Returns:
        float32 audio array of ``int(duration_seconds * sample_rate)`` samples.
    """
    total_samples = int(duration_seconds * sample_rate)
    timeline = np.linspace(0, duration_seconds, total_samples)
    # A4 concert pitch (440 Hz), scaled to +/-0.5 to leave headroom.
    tone = np.sin(2 * np.pi * 440 * timeline) * 0.5
    return tone.astype(np.float32)
def test_batch_latency(pipeline, num_files: int = 10) -> Dict[str, float]:
    """
    Test batch file processing latency against the <200ms-per-file target.

    Each iteration writes a 1-second synthetic tone to a temporary WAV file,
    runs the pipeline on that path, and records the wall-clock latency.

    Args:
        pipeline: InferencePipeline instance (must expose predict_phone_level)
        num_files: Number of test files to process

    Returns:
        Dictionary with latency statistics, or {"error": ...} if no run succeeded
    """
    # Hoisted out of the loop: importing on every iteration was wasteful.
    # soundfile stays a function-local import because it is an optional
    # third-party dependency of this test module only.
    import os
    import tempfile
    import soundfile as sf

    logger.info(f"Testing batch latency with {num_files} files...")
    latencies = []
    for i in range(num_files):
        # Generate test audio and persist it so the pipeline sees a real file path.
        audio = generate_test_audio(duration_seconds=1.0)
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            temp_path = f.name
        sf.write(temp_path, audio, 16000)
        try:
            # perf_counter is monotonic and high-resolution; time.time can
            # jump (NTP adjustments) and is coarse on some platforms.
            start_time = time.perf_counter()
            result = pipeline.predict_phone_level(temp_path, return_timestamps=True)
            latency_ms = (time.perf_counter() - start_time) * 1000
            latencies.append(latency_ms)
            logger.info(f" File {i+1}: {latency_ms:.1f}ms ({result.num_frames} frames)")
        except Exception as e:
            logger.error(f" File {i+1} failed: {e}")
        finally:
            # Always clean up the temp file, even when inference fails.
            if os.path.exists(temp_path):
                os.remove(temp_path)
    if not latencies:
        return {"error": "No successful runs"}
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    min_latency = min(latencies)
    result = {
        "avg_latency_ms": avg_latency,
        "max_latency_ms": max_latency,
        "min_latency_ms": min_latency,
        "num_files": len(latencies),
        "target_ms": 200.0,
        "passed": avg_latency < 200.0
    }
    logger.info(f"βœ… Batch latency test: avg={avg_latency:.1f}ms, max={max_latency:.1f}ms, "
                f"target=200ms, passed={result['passed']}")
    return result
def test_frame_latency(pipeline, num_frames: int = 100) -> Dict[str, float]:
    """
    Test per-frame processing latency against the <50ms target.

    Runs the same 1-second in-memory audio buffer through the pipeline
    ``num_frames`` times and reports avg/min/max/p95 latency.

    Args:
        pipeline: InferencePipeline instance (must expose predict_phone_level)
        num_frames: Number of frames to test

    Returns:
        Dictionary with latency statistics, or {"error": ...} if no run succeeded
    """
    logger.info(f"Testing frame latency with {num_frames} frames...")
    # Generate 1 second of audio (enough for one analysis window).
    audio = generate_test_audio(duration_seconds=1.0)
    latencies = []
    for i in range(num_frames):
        # perf_counter: monotonic, high-resolution — reliable for latency timing.
        start_time = time.perf_counter()
        try:
            # Return value is intentionally discarded; only timing matters here.
            pipeline.predict_phone_level(audio, return_timestamps=False)
            latency_ms = (time.perf_counter() - start_time) * 1000
            latencies.append(latency_ms)
        except Exception as e:
            logger.error(f" Frame {i+1} failed: {e}")
    if not latencies:
        return {"error": "No successful runs"}
    avg_latency = sum(latencies) / len(latencies)
    max_latency = max(latencies)
    min_latency = min(latencies)
    # int(n * 0.95) is always <= n - 1, so this index is safe for any n >= 1.
    p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
    result = {
        "avg_latency_ms": avg_latency,
        "max_latency_ms": max_latency,
        "min_latency_ms": min_latency,
        "p95_latency_ms": p95_latency,
        "num_frames": len(latencies),
        "target_ms": 50.0,
        "passed": avg_latency < 50.0
    }
    logger.info(f"βœ… Frame latency test: avg={avg_latency:.1f}ms, p95={p95_latency:.1f}ms, "
                f"target=50ms, passed={result['passed']}")
    return result
async def test_websocket_latency(websocket_url: str, num_chunks: int = 50) -> Dict[str, float]:
"""
Test WebSocket streaming latency.
Args:
websocket_url: WebSocket URL
num_chunks: Number of chunks to send
Returns:
Dictionary with latency statistics
"""
try:
import websockets
logger.info(f"Testing WebSocket latency with {num_chunks} chunks...")
latencies = []
async with websockets.connect(websocket_url) as websocket:
# Generate test audio chunk (20ms @ 16kHz = 320 samples)
chunk_samples = 320
audio_chunk = generate_test_audio(duration_seconds=0.02)
chunk_bytes = (audio_chunk * 32768).astype(np.int16).tobytes()
for i in range(num_chunks):
start_time = time.time()
# Send chunk
await websocket.send(chunk_bytes)
# Receive response
response = await websocket.recv()
latency_ms = (time.time() - start_time) * 1000
latencies.append(latency_ms)
if i % 10 == 0:
logger.info(f" Chunk {i+1}: {latency_ms:.1f}ms")
if not latencies:
return {"error": "No successful runs"}
avg_latency = sum(latencies) / len(latencies)
max_latency = max(latencies)
p95_latency = sorted(latencies)[int(len(latencies) * 0.95)]
result = {
"avg_latency_ms": avg_latency,
"max_latency_ms": max_latency,
"p95_latency_ms": p95_latency,
"num_chunks": len(latencies),
"target_ms": 100.0,
"passed": avg_latency < 100.0
}
logger.info(f"βœ… WebSocket latency test: avg={avg_latency:.1f}ms, p95={p95_latency:.1f}ms, "
f"target=100ms, passed={result['passed']}")
return result
except ImportError:
logger.warning("websockets library not available, skipping WebSocket test")
return {"error": "websockets library not available"}
except Exception as e:
logger.error(f"WebSocket test failed: {e}")
return {"error": str(e)}
def test_concurrent_connections(pipeline, num_connections: int = 10) -> Dict[str, Any]:
    """
    Test concurrent processing (simulated with a thread pool).

    Fans out ``num_connections`` simultaneous inference requests and reports
    success counts, mean latency, and throughput.

    Args:
        pipeline: InferencePipeline instance (must expose predict_phone_level)
        num_connections: Number of concurrent requests

    Returns:
        Dictionary with per-run and aggregate results
    """
    logger.info(f"Testing {num_connections} concurrent connections...")
    import concurrent.futures

    def process_audio(i: int):
        # One simulated client request: 0.5s of synthetic audio through the pipeline.
        try:
            audio = generate_test_audio(duration_seconds=0.5)
            start_time = time.perf_counter()
            result = pipeline.predict_phone_level(audio, return_timestamps=False)
            latency_ms = (time.perf_counter() - start_time) * 1000
            return {"success": True, "latency_ms": latency_ms, "frames": result.num_frames}
        except Exception as e:
            return {"success": False, "error": str(e)}

    # perf_counter: monotonic wall-clock for the whole fan-out.
    start_time = time.perf_counter()
    with concurrent.futures.ThreadPoolExecutor(max_workers=num_connections) as executor:
        futures = [executor.submit(process_audio, i) for i in range(num_connections)]
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
    total_time = time.perf_counter() - start_time
    # Filter once instead of re-testing "success" in two separate passes.
    ok_results = [r for r in results if r.get("success", False)]
    successful = len(ok_results)
    avg_latency = sum(r["latency_ms"] for r in ok_results) / successful if successful > 0 else 0.0
    result = {
        "total_connections": num_connections,
        "successful": successful,
        "failed": num_connections - successful,
        "total_time_seconds": total_time,
        "avg_latency_ms": avg_latency,
        "throughput_per_second": successful / total_time if total_time > 0 else 0.0
    }
    logger.info(f"βœ… Concurrent test: {successful}/{num_connections} successful, "
                f"avg_latency={avg_latency:.1f}ms, throughput={result['throughput_per_second']:.1f}/s")
    return result
def run_all_performance_tests(pipeline, websocket_url: Optional[str] = None) -> Dict[str, Any]:
    """
    Run every performance test and log a pass/fail summary.

    Args:
        pipeline: InferencePipeline instance
        websocket_url: Optional WebSocket URL for streaming tests

    Returns:
        Dictionary mapping test name to that test's result dictionary
    """
    banner = "=" * 60
    logger.info(banner)
    logger.info("Running Performance Tests")
    logger.info(banner)

    results = {}

    # Test 1: Batch latency
    logger.info("\n1. Batch File Latency Test")
    results["batch_latency"] = test_batch_latency(pipeline)

    # Test 2: Frame latency
    logger.info("\n2. Per-Frame Latency Test")
    results["frame_latency"] = test_frame_latency(pipeline)

    # Test 3: Concurrent connections
    logger.info("\n3. Concurrent Connections Test")
    results["concurrent"] = test_concurrent_connections(pipeline, num_connections=10)

    # Test 4: WebSocket latency (only when a URL was provided)
    if websocket_url:
        logger.info("\n4. WebSocket Latency Test")
        results["websocket_latency"] = asyncio.run(test_websocket_latency(websocket_url))

    # Summary: one PASSED/FAILED line per latency test.
    logger.info("\n" + banner)
    logger.info("Performance Test Summary")
    logger.info(banner)
    for key, label in (("batch_latency", "Batch latency"),
                       ("frame_latency", "Frame latency")):
        if results.get(key, {}).get("passed"):
            logger.info(f"βœ… {label}: PASSED")
        else:
            logger.warning(f"❌ {label}: FAILED")
    # WebSocket is only reported when it actually ran.
    if "websocket_latency" in results:
        if results["websocket_latency"].get("passed"):
            logger.info("βœ… WebSocket latency: PASSED")
        else:
            logger.warning("❌ WebSocket latency: FAILED")
    return results
if __name__ == "__main__":
    # Example usage: build the real pipeline and run the full suite.
    logging.basicConfig(level=logging.INFO)
    try:
        from inference.inference_pipeline import create_inference_pipeline
        engine = create_inference_pipeline()
        report = run_all_performance_tests(engine)
        print("\nTest Results:")
        import json
        print(json.dumps(report, indent=2))
    except Exception as e:
        # Full traceback in the log makes setup failures diagnosable.
        logger.error(f"Test failed: {e}", exc_info=True)