speech2speech-interface / interface /server_realtime.py
marcosremar2's picture
Add real-time streaming avatar interface
14b6b3e
Raw
History Blame Contribute Delete
8.51 kB
"""
Interface Server - Streaming em Tempo Real
Porta: 8080
VERSAO FUNCIONANDO - 2024-12-27
Fluxo otimizado para baixa latência:
1. Frontend conecta via WebSocket
2. Server envia texto para Wav2Lip (action: speak)
3. Conforme frames chegam, envia direto para frontend (sem ffmpeg)
4. Frontend renderiza em Canvas + Web Audio API
Arquitetura:
[Frontend] <--WebSocket--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082]
                                                                   |
                                                          [Orpheus TTS:8000]
Latência típica: ~1-2s até primeiro frame
Formato: Frames JPEG (base64) + Audio WAV (base64)
"""
from aiohttp import web
import aiohttp
import asyncio
import json
import base64
import os
import time
# Service configuration; both values can be overridden through the environment.
WAV2LIP_WS = os.environ.get("WAV2LIP_WS", "ws://localhost:8082/ws")
PORT = int(os.environ.get("PORT", "8080"))

# Video timing.
VIDEO_FPS = 25
FRAME_INTERVAL_MS = 1000 / VIDEO_FPS  # 40 ms between consecutive frames
# Route table that the @routes.get(...) handlers in this module register on.
routes = web.RouteTableDef()
class RealtimeSession:
    """Relay one text-to-avatar generation from Wav2Lip to a client.

    A session opens its own WebSocket to the Wav2Lip service, submits the
    text to speak and forwards every frame and the audio payload to the
    client WebSocket as soon as it arrives (no intermediate encoding).

    Attributes:
        client_ws: WebSocket-like object exposing an awaitable ``send_json``.
        is_running: Set to True by ``run`` and to False by ``stop``; the
            relay loop checks it before handling each upstream message.
        start_time: ``time.time()`` captured when ``run`` starts, else None.
        total_frames: Number of frames forwarded to the client so far.
    """

    # Sample rate (Hz) reported to the client together with the audio payload.
    AUDIO_SAMPLE_RATE = 24000

    def __init__(self, client_ws):
        self.client_ws = client_ws
        self.is_running = False
        self.start_time = None
        self.total_frames = 0

    async def send_to_client(self, msg_type: str, **kwargs):
        """Send ``{"type": msg_type, **kwargs}`` to the client as JSON.

        Delivery is best effort: any exception raised while sending is
        logged and swallowed so a broken client cannot crash the session.
        """
        try:
            await self.client_ws.send_json({"type": msg_type, **kwargs})
        except Exception as e:
            print(f"Erro enviando para cliente: {e}")

    async def run(self, text: str, voice: str):
        """Stream one generation to the client, forwarding frames as they arrive.

        Args:
            text: Text to be spoken by the avatar.
            voice: Voice identifier passed through to Wav2Lip.

        On success (including an error *reported by* Wav2Lip) a final
        ``done`` message with ``total_frames`` and ``elapsed_ms`` is sent.
        If an exception is raised locally, an ``error`` message is sent
        instead and no ``done`` message follows.
        """
        self.is_running = True
        self.start_time = time.time()
        await self.send_to_client("status", message="Conectando...")
        try:
            async with aiohttp.ClientSession() as session:
                # The context manager guarantees the upstream socket is
                # closed on every exit path (normal end, exception, or
                # cancellation of the surrounding task).
                async with session.ws_connect(
                    WAV2LIP_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=300),
                ) as wav2lip_ws:
                    await self.send_to_client("status", message="Gerando...")
                    # Submit the text to Wav2Lip.
                    await wav2lip_ws.send_json({
                        "action": "speak",
                        "text": text,
                        "voice": voice,
                        "parallel": True,
                    })
                    print(f"[Wav2Lip] Texto: '{text[:50]}...'")
                    await self._relay_upstream(wav2lip_ws)
        except Exception as e:
            print(f"Erro na sessao: {e}")
            import traceback
            traceback.print_exc()
            await self.send_to_client("error", message=str(e))
            return
        # Wrap up: report totals to the client.
        elapsed = time.time() - self.start_time
        print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames")
        await self.send_to_client(
            "done",
            total_frames=self.total_frames,
            elapsed_ms=int(elapsed * 1000),
        )

    async def _relay_upstream(self, wav2lip_ws):
        """Forward Wav2Lip messages to the client until the stream ends.

        Returns when Wav2Lip sends ``done`` or ``error``, when the upstream
        socket closes or errors, or when ``stop`` has been called.
        """
        first_frame_time = None
        async for msg in wav2lip_ws:
            if not self.is_running:
                return
            if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                return
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue

            data = json.loads(msg.data)
            msg_type = data.get("type", "")

            if msg_type == "frame":
                # Pass the frame straight through to the client.
                self.total_frames += 1
                await self.send_to_client(
                    "frame",
                    frame=data.get("frame", ""),
                    index=self.total_frames,
                )
                if first_frame_time is None:
                    # Time-to-first-frame is measured after the frame has
                    # been handed to the client socket.
                    first_frame_time = time.time() - self.start_time
                    print(f"[Realtime] Primeiro frame: {first_frame_time*1000:.0f}ms")
                    await self.send_to_client(
                        "first_frame",
                        latency_ms=int(first_frame_time * 1000),
                    )
            elif msg_type == "full_audio":
                # Forward the complete audio payload.
                duration = data.get("duration_ms", 0)
                await self.send_to_client(
                    "audio",
                    audio=data.get("audio", ""),
                    duration_ms=duration,
                    sample_rate=self.AUDIO_SAMPLE_RATE,
                )
                print(f"[Realtime] Audio: {duration}ms")
            elif msg_type == "done":
                print(f"[Realtime] Concluído: {self.total_frames} frames")
                return
            elif msg_type == "first_chunk":
                latency = data.get("latency_ms", 0)
                print(f"[Wav2Lip] eSpeak pronto: {latency}ms")
            elif msg_type == "status":
                print(f"[Wav2Lip] {data.get('message')}")
            elif msg_type == "error":
                error_msg = data.get("message", "Erro")
                print(f"[Wav2Lip] Erro: {error_msg}")
                await self.send_to_client("error", message=error_msg)
                return

    def stop(self):
        """Ask the relay loop to exit before handling its next message."""
        self.is_running = False
async def _cancel_generation(session, task):
    """Stop *session* (if any) and wait for its background *task* to end."""
    if session is not None:
        session.stop()
    if task is not None and not task.done():
        task.cancel()
        # gather(return_exceptions=True) absorbs the task's CancelledError
        # while still propagating a cancellation of the caller itself.
        await asyncio.gather(task, return_exceptions=True)


@routes.get("/ws")
async def websocket_handler(request):
    """Serve one front-end WebSocket connection.

    The client sends JSON objects with an ``action`` field:

    * ``generate`` (with ``text`` and optional ``voice``, default ``tara``):
      start streaming a new generation, replacing any one in progress.
    * ``stop``: abort the generation in progress and reply ``stopped``.
    * ``ping``: reply ``pong``.

    Each generation runs as a background task so that this loop keeps
    reading client messages (``stop``, ``ping``, a new ``generate``) while
    frames are being streamed.
    """
    ws = web.WebSocketResponse(max_msg_size=50*1024*1024)
    await ws.prepare(request)
    print("Cliente conectado")
    session = None
    task = None
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue
                if not isinstance(data, dict):
                    # Valid JSON but not an object (e.g. a list or number):
                    # reject it instead of failing on data.get below.
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue

                action = data.get("action", "")
                if action == "generate":
                    text = data.get("text", "")
                    # A non-string text is treated like a missing one.
                    text = text.strip() if isinstance(text, str) else ""
                    voice = data.get("voice", "tara")
                    if not text:
                        await ws.send_json({"type": "error", "message": "Text required"})
                        continue
                    print(f"Gerando: '{text[:50]}...'")
                    # Only one generation per connection at a time.
                    await _cancel_generation(session, task)
                    session = RealtimeSession(ws)
                    task = asyncio.create_task(session.run(text, voice))
                elif action == "stop":
                    await _cancel_generation(session, task)
                    await ws.send_json({"type": "stopped"})
                elif action == "ping":
                    await ws.send_json({"type": "pong"})
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break
    except Exception as e:
        print(f"Erro: {e}")
    finally:
        # Do not leave a generation streaming to a client that has gone away.
        await _cancel_generation(session, task)
        print("Cliente desconectado")
    return ws
@routes.get("/")
async def index(request):
    """Serve the real-time front-end page located next to this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "index_realtime.html")
    return web.FileResponse(page_path)
@routes.get("/health")
async def health(request):
    """Report server liveness and whether Wav2Lip accepts a WebSocket.

    Returns a JSON object ``{"server": true, "wav2lip": <bool>}``; the
    ``wav2lip`` flag is true only if a WebSocket connection to
    ``WAV2LIP_WS`` could be opened.
    """
    status = {"server": True, "wav2lip": False}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=2),
            ):
                status["wav2lip"] = True
    except Exception as exc:
        # A failed probe only means "wav2lip: false". Catch Exception rather
        # than using a bare except so task cancellation and KeyboardInterrupt
        # still propagate, and log the reason instead of hiding it.
        print(f"[Health] Wav2Lip indisponivel: {exc}")
    return web.json_response(status)
# Application object with every handler from the route table registered.
app = web.Application()
app.add_routes(routes)

if __name__ == "__main__":
    # Print a short startup banner, then serve on all interfaces.
    banner = "=" * 50
    startup_lines = (
        banner,
        "Interface Server - REALTIME Streaming",
        banner,
        f"Porta: {PORT}",
        f"Wav2Lip: {WAV2LIP_WS}",
        "Mode: Canvas + Web Audio (baixa latencia)",
        banner,
    )
    for line in startup_lines:
        print(line)
    web.run_app(app, host="0.0.0.0", port=PORT)