Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Comprehensive diagnostic test to pinpoint exactly where the STT pipeline is failing | |
| """ | |
| import asyncio | |
| import websockets | |
| import json | |
| import ssl | |
| import base64 | |
| import numpy as np | |
| import time | |
class STTDiagnostic:
    """Step-by-step diagnostic client for the STT WebSocket service.

    The diagnostic checks the HTTP health endpoint, opens the WebSocket,
    sends a ``start`` message, streams three synthetic sine-wave clips of
    increasing length, and finally tries several ``stop`` message formats.
    Progress is printed as it goes so the failing step is visible.
    """

    # Sample rate (Hz) of every synthetic clip sent by the audio steps.
    SAMPLE_RATE = 16000
    # Frequency (Hz) of the synthetic sine tone.
    TONE_HZ = 440
    # Successive silent-wait budgets (seconds) while waiting for a
    # transcription; their sum is the overall deadline for one clip.
    RESPONSE_TIMEOUTS = (2, 5, 10, 20)

    def __init__(self, uri="wss://pgits-stt-gpu-service-v3.hf.space/ws", verify_tls=False):
        """Store the target URI and build the SSL context.

        Args:
            uri: WebSocket endpoint to diagnose.
            verify_tls: When False (the historical default) certificate and
                hostname verification are disabled.
        """
        self.uri = uri
        self.ssl_context = ssl.create_default_context()
        if not verify_tls:
            # WARNING: this disables certificate and hostname checks, which
            # makes the connection vulnerable to interception. Pass
            # verify_tls=True unless the server uses a certificate that
            # cannot be validated.
            self.ssl_context.check_hostname = False
            self.ssl_context.verify_mode = ssl.CERT_NONE

    def _health_url(self):
        """Return the ``/health`` URL on the same host as ``self.uri``."""
        from urllib.parse import urlsplit, urlunsplit

        parts = urlsplit(self.uri)
        scheme = "https" if parts.scheme in ("wss", "https") else "http"
        return urlunsplit((scheme, parts.netloc, "/health", "", ""))

    @staticmethod
    def _decode_message(raw):
        """Decode one server frame.

        Returns:
            The parsed dict for a JSON object, an empty dict for any other
            valid JSON value, or None when the frame is not valid JSON.
        """
        try:
            decoded = json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError and UnicodeDecodeError are ValueErrors.
            return None
        return decoded if isinstance(decoded, dict) else {}

    async def _check_health(self):
        """Query the health endpoint; return True only if it reports ready."""
        try:
            import requests

            url = self._health_url()
            # requests is blocking; run it in a worker thread so the event
            # loop is not stalled for up to the full HTTP timeout.
            loop = asyncio.get_running_loop()
            health_response = await loop.run_in_executor(
                None, lambda: requests.get(url, timeout=5)
            )
            health_data = health_response.json()
            print(f"π Server health: {health_data}")
            rust_status = health_data.get("rust_server")
            if rust_status != "ready":
                print(f"β οΈ WARNING: Rust server status is '{rust_status}', not 'ready'")
                print("This explains why WebSocket connections might fail")
                return False
            return True
        except Exception as e:
            # Best effort: a failed health check must not stop the
            # WebSocket steps from running.
            print(f"β Health check failed: {e}")
            return False

    async def test_step_by_step(self):
        """Test each step of the STT pipeline systematically.

        Returns:
            A dict mapping each step that ran to True/False. Steps that were
            never reached (because an earlier step aborted) are absent.
        """
        print("π COMPREHENSIVE STT DIAGNOSTIC TEST")
        print("=" * 50)
        results = {}

        # STEP 0: Check server health first
        print("\nπ₯ STEP 0: Checking server health...")
        results["health"] = await self._check_health()

        # Only hand an SSL context to websockets for secure URIs.
        connect_kwargs = {}
        if self.uri.lower().startswith("wss://"):
            connect_kwargs["ssl"] = self.ssl_context

        # Tracks the step in progress so a failure is attributed correctly.
        step = "connection"
        try:
            # STEP 1: Test connection
            print("\nπ‘ STEP 1: Testing WebSocket connection...")
            async with websockets.connect(self.uri, **connect_kwargs) as ws:
                results["connection"] = True
                print("β Connection established successfully")

                # STEP 2: Test start message
                step = "start message"
                print("\nπ STEP 2: Testing start message...")
                start_msg = {"type": "start", "config": {"enable_timestamps": True}}
                await ws.send(json.dumps(start_msg))
                print(f"π€ Sent: {start_msg}")
                try:
                    response = await asyncio.wait_for(ws.recv(), timeout=5.0)
                except asyncio.TimeoutError:
                    print("β No response to start message within 5 seconds")
                    results["start"] = False
                    return results
                print(f"π₯ Response: {response}")
                reply = self._decode_message(response)
                if reply is not None and reply.get("type") == "status":
                    print("β Start message acknowledged by server")
                    results["start"] = True
                else:
                    # Keep going: later steps may still be informative.
                    print("β οΈ Unexpected start response format")
                    results["start"] = False

                # STEP 3: Test with minimal audio chunk
                step = "minimal audio"
                print("\nπ΅ STEP 3: Testing minimal audio chunk...")
                results["minimal_audio"] = await self.test_minimal_audio(ws)

                # STEP 4: Test with proper-sized audio chunk
                step = "1-second audio"
                print("\nπ΅ STEP 4: Testing proper-sized audio chunk...")
                results["proper_audio"] = await self.test_proper_audio(ws)

                # STEP 5: Test with longer audio for processing
                step = "2-second audio"
                print("\nπ΅ STEP 5: Testing longer audio (2 seconds)...")
                results["longer_audio"] = await self.test_longer_audio(ws)

                # STEP 6: Test stop message
                step = "stop message"
                print("\nπ STEP 6: Testing stop message...")
                results["stop"] = await self.test_stop_message(ws)
        except Exception as e:
            if step == "connection":
                results["connection"] = False
                print(f"β Connection failed: {e}")
            else:
                # The socket was open; blame the step that was running.
                print(f"β Diagnostic aborted during {step}: {e}")
        return results

    async def _run_audio_test(self, ws, samples, description):
        """Generate ``samples`` of test tone, send it, and return the outcome."""
        audio_data = self.generate_test_audio(samples, self.SAMPLE_RATE, self.TONE_HZ)
        return await self.send_and_wait_for_response(
            ws, audio_data, self.SAMPLE_RATE, description
        )

    async def test_minimal_audio(self, ws):
        """Test with minimal valid audio chunk."""
        # 80ms at 16kHz = 1280 samples (minimum for moshi)
        return await self._run_audio_test(ws, 1280, "minimal (80ms)")

    async def test_proper_audio(self, ws):
        """Test with 1 second of audio."""
        return await self._run_audio_test(ws, 16000, "1-second")

    async def test_longer_audio(self, ws):
        """Test with 2 seconds of audio."""
        return await self._run_audio_test(ws, 32000, "2-second")

    def generate_test_audio(self, samples, sample_rate, frequency):
        """Generate a float32 sine wave.

        Args:
            samples: Number of samples to produce.
            sample_rate: Samples per second.
            frequency: Tone frequency in Hz.

        Returns:
            A 1-D ``numpy.float32`` array of length ``samples`` with peak
            amplitude 0.3.
        """
        duration = samples / sample_rate
        t = np.linspace(0, duration, samples, endpoint=False)
        audio = (np.sin(2 * np.pi * frequency * t) * 0.3).astype(np.float32)
        print(f" π Generated {samples} samples at {sample_rate}Hz ({duration:.3f}s)")
        return audio

    async def send_and_wait_for_response(self, ws, audio_data, sample_rate, description):
        """Send one audio clip and wait for its transcription.

        Args:
            ws: Open connection exposing awaitable ``send`` and ``recv``.
            audio_data: Sample buffer exposing ``tobytes()`` and ``len()``.
            sample_rate: Sample rate reported to the server.
            description: Human-readable label used in the log output.

        Returns:
            True when a ``transcription`` message arrives; False on an
            ``error`` message or when the overall deadline expires.
        """
        print(f" π€ Sending {description} audio...")

        # Raw sample bytes, base64-encoded for transport inside JSON.
        audio_b64 = base64.b64encode(audio_data.tobytes()).decode('utf-8')

        # Unix timestamp in milliseconds.
        timestamp_ms = int(time.time() * 1000)
        audio_msg = {
            "type": "audio",
            "data": audio_b64,
            "sample_rate": sample_rate,
            "channels": 1,
            "timestamp": timestamp_ms,
        }
        print(f" π Audio data: {len(audio_data)} samples, {len(audio_b64)} chars base64, timestamp: {timestamp_ms}")

        send_time = time.time()
        # Elapsed time is measured on the monotonic clock so wall-clock
        # adjustments cannot distort it.
        started = time.monotonic()
        await ws.send(json.dumps(audio_msg))
        print(f" β Audio sent at {send_time:.3f}")

        timeout_stages = tuple(self.RESPONSE_TIMEOUTS)
        deadline = time.monotonic() + sum(timeout_stages)
        responses_received = 0
        stage = 0
        # Only a silent timeout advances the stage; interim status or
        # unknown frames no longer use up the retry budget. The overall
        # deadline still bounds the wait if the server keeps chattering.
        while stage < len(timeout_stages):
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            timeout = timeout_stages[stage]
            print(f" β³ Waiting for response (stage {stage+1}, timeout {timeout}s)...")
            try:
                response = await asyncio.wait_for(
                    ws.recv(), timeout=min(timeout, remaining)
                )
            except asyncio.TimeoutError:
                stage += 1
                if stage < len(timeout_stages):
                    print(f" β³ No response within {timeout}s, trying longer timeout...")
                continue

            responses_received += 1
            processing_time = time.monotonic() - started
            print(f" π₯ Response {responses_received}: {response}")
            print(f" β±οΈ Processing time: {processing_time:.3f}s")

            resp_data = self._decode_message(response)
            if resp_data is None:
                print(f" β οΈ Non-JSON response: {response}")
                continue

            msg_type = resp_data.get("type")
            if msg_type == "transcription":
                print(" π― SUCCESS: Got transcription response!")
                print(f" π Text: {resp_data.get('text', 'N/A')}")
                return True
            if msg_type == "error":
                print(f" β Error: {resp_data.get('message', 'N/A')}")
                return False
            if msg_type == "status":
                # Keep waiting for the transcription.
                print(f" βΉοΈ Status: {resp_data.get('message', 'N/A')}")
            else:
                print(f" β οΈ Unknown response type: {resp_data.get('type', 'N/A')}")

        waited = time.monotonic() - started
        print(f" β No transcription after {waited:.1f}s ({responses_received} other response(s)) - this is where we're failing!")
        print(" π Diagnosis: Audio sent successfully but no transcription generated")
        return False

    async def test_stop_message(self, ws):
        """Try several ``stop`` message shapes until the server answers.

        Returns:
            True as soon as any format receives a reply within 3 seconds,
            False when none of them does.
        """
        print(" π€ Testing stop message...")
        # Candidate formats, tried in order.
        stop_formats = [
            {"type": "stop"},
            {"type": "stop", "timestamp": time.time()},
            {"type": "stop", "session_id": "test"},
        ]
        for number, stop_msg in enumerate(stop_formats, start=1):
            print(f" Trying format {number}: {stop_msg}")
            try:
                await ws.send(json.dumps(stop_msg))
                response = await asyncio.wait_for(ws.recv(), timeout=3.0)
                print(f" β Response: {response}")
                return True
            except asyncio.TimeoutError:
                print(f" β° No response to format {number}")
            except Exception as e:
                # Best effort: report and move on to the next format.
                print(f" β Error with format {number}: {e}")
        print(" β All stop message formats failed")
        return False
async def main():
    """Run the full diagnostic, then print the closing summary banner."""
    await STTDiagnostic().test_step_by_step()

    # Assemble the banner once and emit it in a single write; the bytes
    # written are the same as printing each line separately.
    banner = (
        "\n" + "=" * 50,
        "π― DIAGNOSTIC SUMMARY",
        "If audio sends successfully but no transcription appears,",
        "the issue is likely in the Rust moshi processing pipeline.",
        "Check if the v1.2.1 breakthrough changes are working correctly.",
    )
    print("\n".join(banner))


if __name__ == "__main__":
    asyncio.run(main())