stt-gpu-service-v3 / diagnostic_test.py
Peter Michael Gits
REVERT: Switch back to 1B multilingual model for T4 GPU compatibility
5d40667
#!/usr/bin/env python3
"""
Comprehensive diagnostic test to pinpoint exactly where the STT pipeline is failing
"""
import asyncio
import websockets
import json
import ssl
import base64
import numpy as np
import time
class STTDiagnostic:
    """Step-by-step diagnostic client for an STT WebSocket endpoint.

    Every stage prints its progress, so the first stage that fails is
    visible directly in the console output.
    """

    def __init__(self, uri="wss://pgits-stt-gpu-service-v3.hf.space/ws", verify_ssl=False):
        """Store the endpoint and build the SSL context used for ``wss://``.

        Args:
            uri: WebSocket URL of the STT service.
            verify_ssl: When True, keep the default certificate and hostname
                verification. The default (False) preserves the historical
                behaviour of this script, which disables both.
        """
        self.uri = uri
        self.ssl_context = ssl.create_default_context()
        if not verify_ssl:
            # NOTE(security): verification is disabled, so the connection is
            # open to man-in-the-middle interception. Pass verify_ssl=True
            # unless the endpoint really uses an untrusted certificate.
            # check_hostname must be cleared before verify_mode is relaxed.
            self.ssl_context.check_hostname = False
            self.ssl_context.verify_mode = ssl.CERT_NONE

    def _health_url(self):
        """Return the HTTP(S) ``/health`` URL on the same host as ``self.uri``."""
        from urllib.parse import urlsplit, urlunsplit

        parts = urlsplit(self.uri)
        # Map the WebSocket scheme to its HTTP counterpart; leave others as-is.
        scheme = {"wss": "https", "ws": "http"}.get(parts.scheme, parts.scheme)
        return urlunsplit((scheme, parts.netloc, "/health", "", ""))

    def _check_server_health(self):
        """Query the health endpoint and print what it reports (best effort)."""
        print("\nπŸ₯ STEP 0: Checking server health...")
        try:
            # Imported lazily so a missing `requests` package only fails this
            # step instead of the whole script.
            import requests

            health_response = requests.get(self._health_url(), timeout=5)
            health_data = health_response.json()
            print(f"πŸ“Š Server health: {health_data}")
            rust_status = health_data.get("rust_server")
            if rust_status != "ready":
                print(f"⚠️ WARNING: Rust server status is '{rust_status}', not 'ready'")
                print("This explains why WebSocket connections might fail")
        except Exception as e:
            # Deliberately broad: the health check must never stop the
            # WebSocket tests that follow.
            print(f"❌ Health check failed: {e}")

    async def test_step_by_step(self):
        """Test each step of the STT pipeline systematically.

        Runs the health check, connects, sends a start message, three audio
        chunks of increasing length and finally a stop message. Failures are
        printed rather than raised; the method always returns None.
        """
        print("πŸ” COMPREHENSIVE STT DIAGNOSTIC TEST")
        print("=" * 50)

        # STEP 0: Check server health first
        self._check_server_health()

        connected = False
        try:
            # STEP 1: Test connection
            print("\nπŸ“‘ STEP 1: Testing WebSocket connection...")
            # An SSL context is only valid for wss:// URIs, so pass it
            # conditionally to keep plain ws:// endpoints usable.
            if self.uri.lower().startswith("wss://"):
                connect_kwargs = {"ssl": self.ssl_context}
            else:
                connect_kwargs = {}
            async with websockets.connect(self.uri, **connect_kwargs) as ws:
                connected = True
                print("βœ… Connection established successfully")

                # STEP 2: Test start message
                print("\nπŸš€ STEP 2: Testing start message...")
                start_msg = {"type": "start", "config": {"enable_timestamps": True}}
                await ws.send(json.dumps(start_msg))
                print(f"πŸ“€ Sent: {start_msg}")

                # Wait for start response
                try:
                    response = await asyncio.wait_for(ws.recv(), timeout=5.0)
                except asyncio.TimeoutError:
                    print("❌ No response to start message within 5 seconds")
                    return
                print(f"πŸ“₯ Response: {response}")
                try:
                    resp_data = json.loads(response)
                except ValueError:
                    # Non-JSON reply: report it as unexpected instead of
                    # aborting the remaining steps.
                    resp_data = None
                if isinstance(resp_data, dict) and resp_data.get("type") == "status":
                    print("βœ… Start message acknowledged by server")
                else:
                    print("⚠️ Unexpected start response format")

                # STEP 3: Test with minimal audio chunk
                print("\n🎡 STEP 3: Testing minimal audio chunk...")
                await self.test_minimal_audio(ws)

                # STEP 4: Test with proper-sized audio chunk
                print("\n🎡 STEP 4: Testing proper-sized audio chunk...")
                await self.test_proper_audio(ws)

                # STEP 5: Test with longer audio for processing
                print("\n🎡 STEP 5: Testing longer audio (2 seconds)...")
                await self.test_longer_audio(ws)

                # STEP 6: Test stop message
                print("\nπŸ›‘ STEP 6: Testing stop message...")
                await self.test_stop_message(ws)
        except Exception as e:
            # Distinguish "could not connect" from "broke during a later
            # step" so the printed diagnosis points at the right stage.
            if connected:
                print(f"❌ Test aborted after connection was established: {e}")
            else:
                print(f"❌ Connection failed: {e}")

    async def test_minimal_audio(self, ws):
        """Test with minimal valid audio chunk"""
        # 80ms at 16kHz = 1280 samples (minimum for moshi)
        samples = 1280
        audio_data = self.generate_test_audio(samples, 16000, 440)
        await self.send_and_wait_for_response(ws, audio_data, 16000, "minimal (80ms)")

    async def test_proper_audio(self, ws):
        """Test with 1 second of audio"""
        samples = 16000  # 1 second at 16kHz
        audio_data = self.generate_test_audio(samples, 16000, 440)
        await self.send_and_wait_for_response(ws, audio_data, 16000, "1-second")

    async def test_longer_audio(self, ws):
        """Test with 2 seconds of audio"""
        samples = 32000  # 2 seconds at 16kHz
        audio_data = self.generate_test_audio(samples, 16000, 440)
        await self.send_and_wait_for_response(ws, audio_data, 16000, "2-second")

    def generate_test_audio(self, samples, sample_rate, frequency):
        """Generate test sine wave audio.

        Args:
            samples: Number of samples to generate.
            sample_rate: Sample rate in Hz used to derive the time axis.
            frequency: Tone frequency in Hz.

        Returns:
            A 1-D float32 NumPy array of length ``samples`` holding a sine
            wave scaled to an amplitude of 0.3.
        """
        duration = samples / sample_rate
        # endpoint=False keeps the sample spacing at exactly 1 / sample_rate.
        t = np.linspace(0, duration, samples, False)
        audio = (np.sin(2 * np.pi * frequency * t) * 0.3).astype(np.float32)
        print(f" πŸ“Š Generated {samples} samples at {sample_rate}Hz ({duration:.3f}s)")
        return audio

    async def send_and_wait_for_response(self, ws, audio_data, sample_rate, description):
        """Send audio and wait for response with detailed logging.

        Args:
            ws: Open WebSocket connection providing ``send`` and ``recv``.
            audio_data: NumPy array whose raw bytes are base64-encoded.
            sample_rate: Sample rate in Hz reported to the server.
            description: Human-readable label used in the log output.

        Returns:
            True when a ``transcription`` message is received, otherwise
            False (error message, timeout, or no transcription at all).
        """
        print(f" πŸ“€ Sending {description} audio...")

        # Convert to base64
        audio_b64 = base64.b64encode(audio_data.tobytes()).decode('utf-8')

        # Prepare message with timestamp (Unix timestamp in milliseconds)
        timestamp_ms = int(time.time() * 1000)
        audio_msg = {
            "type": "audio",
            "data": audio_b64,
            "sample_rate": sample_rate,
            "channels": 1,
            "timestamp": timestamp_ms,
        }
        print(f" πŸ“Š Audio data: {len(audio_data)} samples, {len(audio_b64)} chars base64, timestamp: {timestamp_ms}")

        send_time = time.time()
        await ws.send(json.dumps(audio_msg))
        print(f" βœ… Audio sent at {send_time:.3f}")

        # Wait for responses with increasing timeouts. Each received message
        # or timeout consumes one stage, so at most len(timeout_stages)
        # messages are read per audio chunk.
        responses_received = 0
        timeout_stages = [2, 5, 10, 20]  # Progressive timeouts
        for i, timeout in enumerate(timeout_stages):
            print(f" ⏳ Waiting for response (stage {i+1}, timeout {timeout}s)...")
            try:
                response = await asyncio.wait_for(ws.recv(), timeout=timeout)
            except asyncio.TimeoutError:
                if i == len(timeout_stages) - 1:
                    print(f" ❌ No response after {timeout}s - this is where we're failing!")
                    print(" πŸ” Diagnosis: Audio sent successfully but no transcription generated")
                    return False
                print(f" ⏳ No response within {timeout}s, trying longer timeout...")
                continue

            responses_received += 1
            processing_time = time.time() - send_time
            print(f" πŸ“₯ Response {responses_received}: {response}")
            print(f" ⏱️ Processing time: {processing_time:.3f}s")

            # Parse response
            try:
                resp_data = json.loads(response)
            except ValueError:
                print(f" ⚠️ Non-JSON response: {response}")
                continue

            # Valid JSON that is not an object has no "type"; treat it as an
            # unknown message instead of crashing on .get().
            if not isinstance(resp_data, dict):
                resp_data = {}
            msg_type = resp_data.get("type")

            if msg_type == "transcription":
                print(" 🎯 SUCCESS: Got transcription response!")
                print(f" πŸ“ Text: {resp_data.get('text', 'N/A')}")
                return True
            if msg_type == "error":
                print(f" ❌ Error: {resp_data.get('message', 'N/A')}")
                return False
            if msg_type == "status":
                # Keep waiting for transcription
                print(f" ℹ️ Status: {resp_data.get('message', 'N/A')}")
            else:
                print(f" ⚠️ Unknown response type: {resp_data.get('type', 'N/A')}")

        # Every stage was used up by non-transcription messages.
        print(f" ❌ No transcription received after {responses_received} response(s)")
        return False

    async def test_stop_message(self, ws):
        """Test stop message with proper format.

        Sends several candidate stop messages in turn and returns True as
        soon as the server answers one of them, or False if none is answered.
        """
        print(" πŸ“€ Testing stop message...")

        # Try different stop message formats
        stop_formats = [
            {"type": "stop"},
            {"type": "stop", "timestamp": time.time()},
            {"type": "stop", "session_id": "test"},
        ]
        for i, stop_msg in enumerate(stop_formats):
            print(f" Trying format {i+1}: {stop_msg}")
            try:
                await ws.send(json.dumps(stop_msg))
                response = await asyncio.wait_for(ws.recv(), timeout=3.0)
                print(f" βœ… Response: {response}")
                return True
            except asyncio.TimeoutError:
                print(f" ⏰ No response to format {i+1}")
                continue
            except Exception as e:
                # Best effort: log the failure and try the next format.
                print(f" ❌ Error with format {i+1}: {e}")
                continue

        print(" ❌ All stop message formats failed")
        return False
async def main():
    """Run the full diagnostic, then print a short guide to reading the output."""
    await STTDiagnostic().test_step_by_step()

    # Closing banner followed by the interpretation hints, one line each.
    closing_lines = (
        "\n" + "=" * 50,
        "🎯 DIAGNOSTIC SUMMARY",
        "If audio sends successfully but no transcription appears,",
        "the issue is likely in the Rust moshi processing pipeline.",
        "Check if the v1.2.1 breakthrough changes are working correctly.",
    )
    for line in closing_lines:
        print(line)
# Script entry point: run the diagnostic only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    asyncio.run(main())