# NOTE(review): the three lines below are scraped web-page chrome (avatar
# caption, commit title, short hash) — not Python. Commented out so the file
# parses; original text preserved:
# pgits's picture
# v1.4.38: CRITICAL FIX - Moshi dual-stream configuration for STT
# 0a9e40c
#!/usr/bin/env python3
"""
HuggingFace Spaces compatibility layer for Kyutai STT Rust Server v0.3.3
Provides a simple web interface and proxies to the Rust WebSocket server
This is SPEECH-TO-TEXT (STT) not Text-to-Speech (TTS)
"""
# Startup banner: printed unconditionally so the HF Spaces logs prove that
# app.py actually began executing. The emoji bytes below are historically
# mis-encoded; left byte-identical on purpose (they are runtime output).
print("🎀 STARTING KYUTAI STT (SPEECH-TO-TEXT) SERVER v1.4.38")
print("πŸ“ This is STT (Speech-to-Text), NOT TTS (Text-to-Speech)")
print("πŸ” DEBUG: app.py is executing - Python frontend starting...")
print("πŸ”΄ CRITICAL DEBUG: This message should appear in HF Spaces logs if app.py runs!")
import asyncio
import json
import subprocess
import sys
import time
import threading
import websockets
from pathlib import Path
import gradio as gr
import uvicorn
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
# Global variables shared between the startup thread and the web handlers.
rust_process = None  # subprocess.Popen handle for the Rust server; None when not running
rust_server_ready = False  # flipped to True once the Rust server logs that it is listening
# Module-level initialization for HuggingFace Spaces compatibility
import os
import threading
import signal
import sys
import socket
def check_port_in_use(port):
    """Return True when something on localhost is accepting TCP connections on *port*."""
    probe = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        # connect_ex returns 0 on success instead of raising.
        return probe.connect_ex(('localhost', port)) == 0
    finally:
        probe.close()
def start_rust_server():
    """Start the Rust server process with GPU/CPU fallback.

    Tries each configuration in order (GPU first, then CPU) and returns
    True once the server logs that it is listening. Returns False only
    after every configuration has failed or timed out.

    Side effects: sets the module globals ``rust_process`` and
    ``rust_server_ready``.
    """
    global rust_process, rust_server_ready
    # First, check if a Rust server is already running
    if check_port_in_use(8080):
        print("πŸ” Detected existing Rust server on port 8080")
        try:
            # Test if the existing server is healthy using curl subprocess.
            # NOTE(review): the liveness probe above uses port 8080 but the
            # health URL targets 8081 — confirm the health endpoint really
            # runs on a separate port.
            result = subprocess.run(
                ["curl", "-s", "http://127.0.0.1:8081/health"],
                capture_output=True, text=True, timeout=3
            )
            if result.returncode == 0 and "healthy" in result.stdout:
                print("βœ… Existing Rust server is healthy - reusing it")
                rust_server_ready = True
                return True
            else:
                print(f"⚠️ Health check failed: {result.stdout}")
        except Exception as e:
            print(f"⚠️ Existing server health check failed: {e}")
    print("Starting Rust STT server...")
    # Try GPU first, then CPU fallback
    server_configs = [
        {
            "name": "GPU (CUDA)",
            "args": [
                "./kyutai-stt-server",
                "--host", "127.0.0.1",
                "--port", "8080",
                "--config", "configs/config-stt-en_fr-hf.toml"
            ]
        },
        {
            "name": "CPU (fallback)",
            "args": [
                "./kyutai-stt-server",
                "--host", "127.0.0.1",
                "--port", "8080",
                "--config", "configs/config-stt-en_fr-hf.toml",
                "--cpu"
            ]
        }
    ]
    for config in server_configs:
        print(f"πŸ”§ Trying {config['name']} mode...")
        print(f"πŸ”§ Command: {' '.join(config['args'])}")
        try:
            rust_process = subprocess.Popen(
                config["args"],
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
                universal_newlines=True
            )
            print(f"πŸ”§ Rust process started with PID: {rust_process.pid}")
            # Bind the process to a local so a monitor thread left over from
            # a previous attempt can never read the new attempt's stdout.
            proc = rust_process

            def monitor_output():
                # Stream server output, flipping rust_server_ready when the
                # listening banner appears.
                global rust_server_ready
                for line in iter(proc.stdout.readline, ''):
                    line = line.strip()
                    print(f"[Rust Server] {line}")
                    if "WebSocket server listening" in line:
                        rust_server_ready = True
                        print("βœ… Rust server is ready!")
                    elif "Model loaded successfully" in line:
                        print("🎯 Model loading completed")
                    elif "Loading model from:" in line:
                        print("πŸ“₯ Starting model download...")
                    elif "Downloading" in line and "from" in line:
                        print("⬇️ Downloading model file...")
                    elif "Error:" in line:
                        print(f"❌ Rust server error: {line}")

            # Start monitoring in background thread
            monitor_thread = threading.Thread(target=monitor_output, daemon=True)
            monitor_thread.start()
            # Wait for server to be ready
            timeout = 300  # 5 minutes timeout for model download
            start_time = time.time()
            process_died = False
            while not rust_server_ready and time.time() - start_time < timeout:
                time.sleep(0.5)
                if proc.poll() is not None:
                    return_code = proc.returncode
                    print(f"❌ DEBUG: Rust server process exited unexpectedly with code: {return_code}")
                    print(f"❌ DEBUG: This usually means model loading failed or GPU issues")
                    process_died = True
                    break
            if process_died:
                # BUGFIX: this used to `return False`, which meant the CPU
                # fallback configuration was never attempted after a GPU crash.
                continue
            if rust_server_ready:
                print("πŸš€ Rust server started successfully!")
                return True
            print("⏰ Timeout waiting for Rust server to start")
            if proc.poll() is not None:
                # Process exited, try to get the return code and output
                return_code = proc.returncode
                print(f"❌ Rust server process exited with code: {return_code}")
                try:
                    remaining_output = proc.stdout.read()
                    if remaining_output:
                        print(f"Final output: {remaining_output}")
                except Exception:  # best-effort read; was a bare except
                    pass
            else:
                # Reap the stalled attempt so it does not hold port 8080
                # while the next configuration is tried.
                proc.terminate()
                proc.wait()
        except Exception as e:
            print(f"❌ Error starting Rust server: {e}")
            if rust_process and rust_process.poll() is not None:
                print(f"Process exit code: {rust_process.returncode}")
            # BUGFIX: fall through to the next configuration instead of
            # returning False on the first launch failure.
    return False
def init_rust_server_background():
    """Background-thread entry point that brings up the Rust server for HF Spaces."""
    print("πŸ”§ HF SPACES: Initializing Rust server in background...")
    try:
        started = start_rust_server()
        message = (
            "βœ… HF SPACES: Rust server ready for connections"
            if started
            else "⚠️ HF SPACES: Rust server failed to start - WebSocket will show unavailable"
        )
        print(message)
    except Exception as e:
        # Never let an exception escape the daemon thread.
        print(f"❌ HF SPACES: Error in Rust server startup: {e}")
# Graceful shutdown handler
def signal_handler(signum, frame):
    """Stop the Rust subprocess and exit cleanly when a termination signal arrives."""
    print(f"πŸ“΄ Received signal {signum}, shutting down gracefully...")
    stop_rust_server()
    # SystemExit(0) is exactly what sys.exit(0) raises.
    raise SystemExit(0)
# Initialize for HuggingFace Spaces (module level execution) — HF Spaces may
# import this module rather than running it as __main__, so startup work
# happens here, not only under the __main__ guard.
print(f"πŸ” HF SPACES: Module loading, PID: {os.getpid()}")
# Register signal handlers
signal.signal(signal.SIGTERM, signal_handler)
signal.signal(signal.SIGINT, signal_handler)
# Start Rust server at module level for HF Spaces (non-blocking daemon thread)
print("πŸ”§ MODULE LEVEL: Starting Rust server initialization for HF Spaces...")
print("πŸ”΄ CRITICAL: If you see this, module-level code IS running in HF Spaces")
# Daemon thread: a hung model download must never block interpreter shutdown.
rust_thread = threading.Thread(target=init_rust_server_background, daemon=True)
rust_thread.start()
print("πŸ”§ MODULE LEVEL: Rust server thread started, continuing app initialization...")
print("πŸ”΄ CRITICAL: Module-level initialization complete, thread started")
def stop_rust_server():
    """Terminate the Rust server subprocess, if one is running, and clear the handle."""
    global rust_process
    if not rust_process:
        return
    print("Stopping Rust server...")
    rust_process.terminate()
    rust_process.wait()  # reap so no zombie is left behind
    rust_process = None
# Use FastAPI lifespan pattern for Rust server initialization (v1.4.19 validated approach)
# FastAPI app without lifespan (using PRE-MOUNT pattern instead)
app = FastAPI(title="Kyutai STT Server v3")
# Mount static files for the web interface
from pathlib import Path  # NOTE(review): redundant — Path is already imported above
static_dir = Path(__file__).parent / "static"
# Guarded mount: presumably StaticFiles fails on a missing directory — the
# Space may be built without a static/ folder, so only mount when it exists.
if static_dir.exists():
    app.mount("/static", StaticFiles(directory=str(static_dir)), name="static")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    """Proxy WebSocket connections to the Rust server.

    Accepts the browser connection, refuses with a JSON error while the
    Rust backend is still loading, otherwise pumps messages in both
    directions until either side disconnects.
    """
    await websocket.accept()
    if not rust_server_ready:
        await websocket.send_text(json.dumps({
            "type": "error",
            "message": "Rust server not ready"
        }))
        return
    try:
        # Connect to Rust server
        async with websockets.connect("ws://127.0.0.1:8080") as rust_ws:
            # Proxy messages between client and Rust server
            async def client_to_rust():
                # Browser -> Rust: forward until the client disconnects.
                try:
                    while True:
                        data = await websocket.receive_text()
                        await rust_ws.send(data)
                except WebSocketDisconnect:
                    pass

            async def rust_to_client():
                # Rust -> browser: forward until the Rust side closes.
                try:
                    async for message in rust_ws:
                        await websocket.send_text(message)
                except websockets.exceptions.ConnectionClosed:
                    pass

            # Run both directions concurrently; return_exceptions keeps one
            # direction's failure from cancelling the other mid-send.
            await asyncio.gather(
                client_to_rust(),
                rust_to_client(),
                return_exceptions=True
            )
    except Exception as e:
        print(f"WebSocket proxy error: {e}")
        # BUGFIX: the client socket may already be closed when the proxy
        # fails; an unguarded send here would raise and mask the real error.
        try:
            await websocket.send_text(json.dumps({
                "type": "error",
                "message": f"Proxy error: {str(e)}"
            }))
        except Exception:
            pass
@app.get("/health")
async def health_check():
    """Health check endpoint - always returns healthy for HF Spaces"""
    import os
    ready = rust_server_ready
    # Reporting "healthy" unconditionally keeps HF Spaces from restarting us
    # while the model is still downloading in the background.
    if ready:
        backend_state, note = "ready", "Fully operational"
    else:
        backend_state, note = "loading", "App is ready, model loading in background"
    return {
        "status": "healthy",
        "app_ready": True,
        "rust_server": backend_state,
        "websocket_available": ready,
        "pid": os.getpid(),
        "message": note
    }
@app.get("/")
async def get_interface():
    """Serve a simple web interface.

    Returns a self-contained HTML page (markup plus inline JS) that polls
    /health every 2 seconds and offers manual connect/start/stop controls
    against the /ws proxy endpoint.
    """
    # The page below is sent verbatim to the browser — it is runtime data,
    # not code, so its text is left untouched.
    html_content = """
    <!DOCTYPE html>
    <html>
    <head>
        <title>Kyutai STT Server v3</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
            .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 10px; }
            .status { padding: 10px; border-radius: 5px; margin: 10px 0; }
            .ready { background: #d4edda; color: #155724; }
            .not-ready { background: #f8d7da; color: #721c24; }
            .controls { margin: 20px 0; }
            button { padding: 10px 20px; margin: 5px; border: none; border-radius: 5px; cursor: pointer; }
            .start { background: #007bff; color: white; }
            .stop { background: #dc3545; color: white; }
            #messages { border: 1px solid #ddd; padding: 10px; height: 200px; overflow-y: scroll; font-family: monospace; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>🎀 Kyutai STT Server v3</h1>
            <p>Cost-optimized Speech-to-Text WebSocket server with GPU acceleration.</p>
            <div id="status" class="status not-ready">
                Server Status: Checking...
            </div>
            <div class="controls">
                <button id="connect" class="start" onclick="connectWebSocket()">Connect</button>
                <button id="disconnect" class="stop" onclick="disconnectWebSocket()">Disconnect</button>
                <button class="start" onclick="startStream()">Start Stream</button>
                <button class="stop" onclick="stopStream()">Stop Stream</button>
            </div>
            <div>
                <h3>Messages:</h3>
                <div id="messages"></div>
            </div>
            <div style="margin-top: 20px;">
                <h3>WebSocket Endpoint:</h3>
                <code>wss://pgits-stt-gpu-service-v3.hf.space/ws</code> (external)<br>
                <code>ws://localhost:7860/ws</code> (local development)
                <h3>API Usage:</h3>
                <pre>
// Start streaming
{"type": "start", "config": {"enable_timestamps": true}}
// Send audio (base64 encoded)
{"type": "audio", "data": "...", "sample_rate": 16000, "channels": 1}
// Stop streaming
{"type": "stop"}
                </pre>
            </div>
        </div>
        <script>
            let ws = null;
            function updateStatus() {
                fetch('/health')
                    .then(r => r.json())
                    .then(data => {
                        const statusEl = document.getElementById('status');
                        if (data.rust_server === 'ready') {
                            statusEl.className = 'status ready';
                            statusEl.textContent = 'Server Status: Ready βœ…';
                        } else {
                            statusEl.className = 'status not-ready';
                            statusEl.textContent = 'Server Status: Starting... ⏳';
                        }
                    });
            }
            function connectWebSocket() {
                // Use wss:// for HuggingFace Spaces, ws:// for local development
                const wsUrl = window.location.protocol === 'https:'
                    ? `wss://${window.location.host}/ws`
                    : `ws://${window.location.host}/ws`;
                ws = new WebSocket(wsUrl);
                ws.onopen = () => addMessage('Connected to STT server via ' + wsUrl);
                ws.onmessage = (e) => addMessage('Received: ' + e.data);
                ws.onerror = (e) => addMessage('WebSocket Error: ' + e);
                ws.onclose = () => addMessage('Connection closed');
            }
            function disconnectWebSocket() {
                if (ws) { ws.close(); ws = null; }
            }
            function startStream() {
                if (ws) ws.send(JSON.stringify({"type": "start"}));
            }
            function stopStream() {
                if (ws) ws.send(JSON.stringify({"type": "stop"}));
            }
            function addMessage(msg) {
                const messages = document.getElementById('messages');
                messages.innerHTML += new Date().toLocaleTimeString() + ': ' + msg + '\\n';
                messages.scrollTop = messages.scrollHeight;
            }
            // Update status every 2 seconds
            setInterval(updateStatus, 2000);
            updateStatus();
        </script>
    </body>
    </html>
    """
    return HTMLResponse(content=html_content)
# Gradio interface for HuggingFace Spaces compatibility
def create_gradio_interface():
    """Create a Gradio interface for HuggingFace Spaces.

    Builds a small Blocks UI with a live server-status box, a connection
    test button, and a WebSocket round-trip test against the local Rust
    server. Returns the (unlaunched) Blocks instance.
    """
    def get_server_status():
        # Polled every 2 seconds by the status textbox below.
        return "🟒 Ready" if rust_server_ready else "🟑 Starting..."

    def test_connection():
        if rust_server_ready:
            return "βœ… Server is ready for WebSocket connections"
        else:
            return "❌ Server is still starting up"

    def test_websocket():
        """Test WebSocket connection to Rust server"""
        import asyncio
        try:
            # Test connection to local Rust server
            async def test():
                import websockets
                uri = "ws://127.0.0.1:8080"
                try:
                    async with websockets.connect(uri) as websocket:
                        # Send test message
                        await websocket.send('{"type": "start"}')
                        # Wait for response
                        response = await asyncio.wait_for(websocket.recv(), timeout=5.0)
                        return f"βœ… WebSocket test successful!\nResponse: {response}"
                except Exception as e:
                    return f"❌ WebSocket test failed: {str(e)}"

            # asyncio.run creates and tears down the loop for us; Gradio
            # invokes click handlers in a worker thread, so there is no
            # already-running loop to conflict with.
            return asyncio.run(test())
        except Exception as e:
            return f"❌ WebSocket test error: {str(e)}"

    with gr.Blocks(title="Kyutai STT Server v3") as iface:
        gr.Markdown("# 🎀 Kyutai STT Server v3")
        gr.Markdown("Cost-optimized Speech-to-Text WebSocket server with GPU acceleration")
        with gr.Row():
            # BUGFIX: pass the function itself, not its return value —
            # `every=2` only re-polls when `value` is callable; a static
            # string would never refresh.
            status_text = gr.Textbox(label="Server Status", value=get_server_status, every=2)
            test_btn = gr.Button("Test Connection")
            ws_test_btn = gr.Button("Test WebSocket", variant="primary")
        with gr.Row():
            test_output = gr.Textbox(label="Connection Test")
            ws_test_output = gr.Textbox(label="WebSocket Test")
        test_btn.click(test_connection, outputs=test_output)
        ws_test_btn.click(test_websocket, outputs=ws_test_output)
        gr.Markdown("""
        ## WebSocket API
        **External clients:** `wss://pgits-stt-gpu-service-v3.hf.space/ws`
        **Local development:** `ws://localhost:7860/ws`
        ### Example Usage for External Clients:
        ```javascript
        const ws = new WebSocket('wss://pgits-stt-gpu-service-v3.hf.space/ws');
        // Start streaming
        ws.send(JSON.stringify({
        "type": "start",
        "config": {"enable_timestamps": true}
        }));
        // Send audio data (base64 encoded)
        ws.send(JSON.stringify({
        "type": "audio",
        "data": "base64_audio_data",
        "sample_rate": 16000,
        "channels": 1
        }));
        ```
        ### Connection Status
        - βœ… HuggingFace Spaces only exposes port 7860 externally
        - βœ… WebSocket endpoint available at `/ws` path
        - βœ… Proxies to internal Rust server on port 8080
        ## Cost Management
        - **Auto-sleep**: Space sleeps after 30-60 minutes of inactivity
        - **No charges during sleep**: GPU billing stops completely
        - **Fast wake-up**: 30-90 seconds with preloaded model
        """)
    return iface
# Remove old websocket_proxy function - now handled via FastAPI /ws endpoint
if __name__ == "__main__":
    import os
    import signal
    import sys
    print(f"πŸš€ Starting Kyutai STT Server (PID: {os.getpid()})")
    # Rust server initialization handled by FastAPI lifespan pattern - no conflicts
    # Graceful shutdown handler
    def signal_handler(signum, frame):
        # Shadows the module-level handler registered earlier; same behavior.
        print(f"πŸ“΄ Received signal {signum}, shutting down gracefully...")
        stop_rust_server()
        sys.exit(0)
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # Rust server initialization handled by FastAPI lifespan pattern - no conflicts
    try:
        # Rust server already started at module level for HF Spaces compatibility
        # Give Rust server time to start before mounting Gradio
        import time
        time.sleep(3)
        print("πŸ”§ MODULE LEVEL: Rust server should be ready, proceeding with Gradio mount")
        # Create and launch Gradio interface
        iface = create_gradio_interface()
        # Mount FastAPI app with Gradio (this enables /ws WebSocket endpoint)
        app = gr.mount_gradio_app(app, iface, path="/")
        # Launch with uvicorn to support both Gradio and FastAPI WebSocket
        import uvicorn
        print("πŸš€ Launching server on port 7860 (WebSocket at /ws)")
        print("🎯 HuggingFace Spaces should see this app as ready immediately")
        # Check if port 7860 is already in use (handle multiple HF Spaces instances)
        if check_port_in_use(7860):
            print("⚠️ Port 7860 already in use by another process - exiting gracefully")
            print("🎯 This is normal in HuggingFace Spaces multi-process environment")
            sys.exit(0)  # Exit gracefully, not an error
        # Configure uvicorn to prevent multiple binding attempts
        config = uvicorn.Config(
            app,
            host="0.0.0.0",
            port=7860,
            log_level="info",
            access_log=False  # Reduce log spam
        )
        server = uvicorn.Server(config)
        server.run()
    except Exception as e:
        # sys.exit(0) above raises SystemExit, which is NOT an Exception
        # subclass, so the graceful-exit path is not swallowed here.
        print(f"❌ Server startup error: {e}")
        stop_rust_server()
        sys.exit(1)
    except KeyboardInterrupt:
        # KeyboardInterrupt derives from BaseException, so the Exception
        # clause above does not capture it despite being listed first.
        print("πŸ“΄ Keyboard interrupt, shutting down...")
        stop_rust_server()
        sys.exit(0)
    finally:
        # Runs on every exit path (including SystemExit); stop_rust_server
        # is a no-op once the process handle is cleared, so the repeated
        # calls are harmless.
        stop_rust_server()