"""
Test script for WebSocket retry logic and GPU cold start timing calibration.
Simulates the VoiceCal retry mechanism to validate timing patterns.
"""

import asyncio
import base64
import io
import json
import ssl
import tempfile
import time
import wave
from datetime import datetime

import numpy as np
import websockets


async def test_retry_calibration():
    """Run the retry/timing calibration against every configured STT endpoint.

    Builds one second of 16 kHz, mono, 16-bit silence as a WAV payload,
    base64-encodes it, and passes it to
    ``test_websocket_with_comprehensive_retry`` for each URL in turn.
    All results are reported on stdout; nothing is returned.
    """
    print(f"π― RETRY CALIBRATION TEST - {datetime.now().strftime('%H:%M:%S')}")
    print("=" * 60)

    sample_rate = 16000
    duration = 1.0
    samples = int(sample_rate * duration)
    audio_data = np.zeros(samples, dtype=np.int16)

    # Encode the WAV entirely in memory so no temporary file is left behind
    # on disk after the run.
    wav_buffer = io.BytesIO()
    with wave.open(wav_buffer, 'wb') as wav_file:
        wav_file.setnchannels(1)
        wav_file.setsampwidth(2)
        wav_file.setframerate(sample_rate)
        wav_file.writeframes(audio_data.tobytes())

    # Read the buffer only after the writer is closed: closing finalises the
    # WAV header, and it does not close a caller-supplied file object.
    audio_bytes = wav_buffer.getvalue()
    audio_base64 = base64.b64encode(audio_bytes).decode('utf-8')

    print(f"π΅ Test audio: {len(audio_bytes)} bytes, base64: {len(audio_base64)} chars")

    urls_to_test = [
        "wss://pgits-stt-gpu-service.hf.space/ws/stt",
        "ws://localhost:7860/ws/stt",
    ]

    for url in urls_to_test:
        print(f"\nπ TESTING URL: {url}")
        await test_websocket_with_comprehensive_retry(url, audio_base64)


async def _run_stt_exchange(websocket, audio_base64: str) -> None:
    """Await the connection confirmation, send one audio chunk, print the reply.

    Timeouts and JSON decoding errors are reported on stdout and swallowed.
    Any other exception propagates to the caller.
    """
    # Tracks which wait is in progress so a timeout is attributed correctly.
    phase = "Connection confirmation"
    try:
        confirm_response = await asyncio.wait_for(websocket.recv(), timeout=10.0)
        confirm_result = json.loads(confirm_response)
        print(f"β Connection confirmed: {confirm_result.get('type', 'unknown')}")

        if confirm_result.get("type") != "stt_connection_confirmed":
            return

        message = {
            "type": "stt_audio_chunk",
            "audio_data": audio_base64,
            "language": "auto",
            "model_size": "base",
            "is_final": True,
        }
        await websocket.send(json.dumps(message))
        print("π€ Sent audio chunk")

        phase = "Transcription response"
        response = await asyncio.wait_for(websocket.recv(), timeout=30.0)
        result = json.loads(response)

        if result.get("type") == "stt_transcription":
            print(f"π― TRANSCRIPTION SUCCESS: '{result.get('text', 'NO_TEXT')}'")
            print(f"π Processing time: {result.get('processing_time', 0):.1f}s")
        else:
            print(f"β Unexpected response: {result.get('type', 'unknown')}")
    except (asyncio.TimeoutError, TimeoutError):
        print(f"β° {phase} timeout")
    except json.JSONDecodeError as e:
        print(f"π JSON decode error: {e}")


async def test_websocket_with_comprehensive_retry(ws_url: str, audio_base64: str):
    """Connect to an STT WebSocket with exponential backoff and report timings.

    Makes up to 8 connection attempts. Before every attempt after the first
    it sleeps ``3.0 * 1.4 ** (attempt - 2)`` seconds, capped at 45 seconds.
    Once a connection is established, the function classifies the connection
    time, runs one confirmation/audio/transcription exchange, and returns
    without further retries, whatever the outcome of that exchange.
    Connection failures are printed and trigger the next attempt.

    Args:
        ws_url: ``ws://`` or ``wss://`` endpoint to connect to.
        audio_base64: Base64-encoded audio sent as a single final chunk.

    Returns:
        None. All results are reported on stdout.
    """
    # Retry policy.
    max_retries = 8
    base_delay = 3.0
    max_delay = 45.0
    backoff_multiplier = 1.4
    connection_timeout = 30.0

    # Thresholds (seconds) used to classify the measured timings.
    cold_start_threshold = 30.0
    warm_service_threshold = 8.0
    gpu_loading_threshold = 60.0

    # Connection options are identical for every attempt, so build them once.
    connect_kwargs = {
        # Bound the opening handshake explicitly. Without this the library's
        # own default handshake timeout applies and connection_timeout would
        # not be the limit that is actually enforced.
        "open_timeout": connection_timeout,
        "ping_interval": None,
        "ping_timeout": 20,
        "close_timeout": 10,
        "max_size": 10 * 1024 * 1024,
        "compression": None,
    }
    if ws_url.startswith("wss://"):
        # NOTE(review): certificate and hostname verification are disabled
        # here. Confirm this is acceptable for the endpoints being tested.
        ssl_context = ssl.create_default_context()
        ssl_context.check_hostname = False
        ssl_context.verify_mode = ssl.CERT_NONE
        connect_kwargs["ssl"] = ssl_context

    status_notes = {
        503: "π STATUS CALIBRATION: HTTP 503 - Service temporarily unavailable (cold starting)",
        403: "π« STATUS CALIBRATION: HTTP 403 - Service forbidden (WebSocket not available)",
        502: "β οΈ STATUS CALIBRATION: HTTP 502 - Bad gateway (service deployment issue)",
    }

    # A monotonic clock keeps elapsed-time figures immune to wall-clock jumps.
    start_time = time.monotonic()

    for attempt in range(1, max_retries + 1):
        # Taken before the backoff sleep, so attempt_time below includes it.
        attempt_start = time.monotonic()
        elapsed_total = attempt_start - start_time

        if attempt > 1:
            delay = min(base_delay * (backoff_multiplier ** (attempt - 2)), max_delay)
            print(f"β° RETRY CALIBRATION: Waiting {delay:.1f}s before attempt {attempt}/{max_retries} (elapsed: {elapsed_total:.1f}s)")
            await asyncio.sleep(delay)

        try:
            print(f"π€ WebSocket STT: Attempt {attempt}/{max_retries} - Connecting to {ws_url}")

            connection_start = time.monotonic()
            async with websockets.connect(ws_url, **connect_kwargs) as websocket:
                now = time.monotonic()
                connection_time = now - connection_start
                total_elapsed = now - start_time

                if connection_time < warm_service_threshold:
                    print(f"β‘ TIMING CALIBRATION: Fast connection ({connection_time:.1f}s) - Service was warm")
                elif connection_time < cold_start_threshold:
                    print(f"π TIMING CALIBRATION: Normal connection ({connection_time:.1f}s) - Service warming up")
                else:
                    print(f"βοΈ TIMING CALIBRATION: Slow connection ({connection_time:.1f}s) - Cold start detected")

                if total_elapsed > gpu_loading_threshold:
                    print(f"π TIMING CALIBRATION: Long total wait ({total_elapsed:.1f}s) - GPU loading issues")

                print(f"π RETRY SUCCESS: Connected on attempt {attempt}/{max_retries} after {total_elapsed:.1f}s total")

                # The exchange applies its own per-message timeouts, so the
                # handshake timeout cannot cut it short.
                await _run_stt_exchange(websocket, audio_base64)

                # A connection was established: no further retries.
                return

        except websockets.exceptions.InvalidStatusCode as e:
            attempt_time = time.monotonic() - attempt_start
            status_code = getattr(e, 'status_code', 'unknown')
            print(f"π€ RETRY: HTTP {status_code} (attempt {attempt}/{max_retries}, {attempt_time:.1f}s)")
            print(status_notes.get(
                status_code,
                f"β STATUS CALIBRATION: HTTP {status_code} - Unknown service state",
            ))

        except (asyncio.TimeoutError, TimeoutError):
            attempt_time = time.monotonic() - attempt_start
            print(f"π€ RETRY: Timeout after {attempt_time:.1f}s (attempt {attempt}/{max_retries})")
            print(f"β° TIMEOUT CALIBRATION: Service taking >{connection_timeout}s indicates severe cold start")

        except ConnectionRefusedError:
            attempt_time = time.monotonic() - attempt_start
            print(f"π€ RETRY: Connection refused (attempt {attempt}/{max_retries}, {attempt_time:.1f}s)")
            print(f"π« CONNECTION CALIBRATION: Immediate refusal indicates service not listening")

        except Exception as e:
            # Catch-all so one unexpected failure does not abort the
            # remaining attempts; the exception type is printed for diagnosis.
            attempt_time = time.monotonic() - attempt_start
            print(f"π€ RETRY: Error (attempt {attempt}/{max_retries}, {attempt_time:.1f}s): {e}")
            print(f"π Exception type: {type(e).__name__}")

    # Every attempt failed to establish a connection.
    total_time = time.monotonic() - start_time
    print(f"π€ RETRY EXHAUSTED: Failed after {max_retries} attempts over {total_time:.1f}s")

    if total_time > gpu_loading_threshold:
        print(f"π CALIBRATION SUMMARY: Very slow ({total_time:.1f}s) - GPU loading or deployment issues")
    elif total_time > cold_start_threshold:
        print(f"βοΈ CALIBRATION SUMMARY: Slow ({total_time:.1f}s) - Cold start confirmed")
    else:
        print(f"β‘ CALIBRATION SUMMARY: Fast failure ({total_time:.1f}s) - Service configuration issue")


if __name__ == "__main__":
    # Script entry point: create the calibration coroutine and drive it to
    # completion on a fresh event loop. Nothing runs on import.
    calibration_run = test_retry_calibration()
    asyncio.run(calibration_run)