"""
Interface Server - Real-Time Streaming
Port: 8080

WORKING VERSION - 2024-12-27

Flow optimized for low latency:
1. Frontend connects via WebSocket
2. Server sends the text to Wav2Lip (action: speak)
3. As frames arrive, they are forwarded straight to the frontend (no ffmpeg)
4. Frontend renders with Canvas + Web Audio API

Architecture:
    [Frontend] <--WebSocket--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082]
                                                                     |
                                                             [Orpheus TTS:8000]

Typical latency: ~1-2s until the first frame
Format: JPEG frames (base64) + WAV audio (base64)
"""

import asyncio
import base64
import json
import os
import time
import traceback

import aiohttp
from aiohttp import web

# Service configuration
WAV2LIP_WS = os.getenv("WAV2LIP_WS", "ws://localhost:8082/ws")
PORT = int(os.getenv("PORT", "8080"))

# Video configuration
VIDEO_FPS = 25
FRAME_INTERVAL_MS = 1000 / VIDEO_FPS  # 40 ms between frames

routes = web.RouteTableDef()


class RealtimeSession:
    """Real-time streaming session that relays Wav2Lip output to one client.

    Frames and audio received from the Wav2Lip WebSocket are forwarded to
    the client WebSocket as soon as they arrive.
    """

    def __init__(self, client_ws):
        self.client_ws = client_ws
        self.is_running = False
        self.start_time = None
        self.total_frames = 0
        # Seconds from run() start to the first relayed frame; None until then.
        self._first_frame_latency = None

    async def send_to_client(self, msg_type: str, **kwargs):
        """Send ``{"type": msg_type, **kwargs}`` to the client as JSON.

        Send failures are printed and swallowed so that a broken client
        connection does not raise into the relay loop.
        """
        try:
            await self.client_ws.send_json({"type": msg_type, **kwargs})
        except Exception as e:
            print(f"Erro enviando para cliente: {e}")

    async def run(self, text: str, voice: str):
        """Stream one utterance: send ``text`` to Wav2Lip and relay the output.

        Sends ``status`` messages while connecting, forwards each upstream
        message through ``_handle_wav2lip_message``, and finishes with a
        ``done`` message carrying the frame count and elapsed time. Any
        exception is printed, reported to the client as an ``error`` message,
        and ends the run without a ``done`` message.

        The loop also ends when ``stop()`` has been called; the flag is
        checked each time a message arrives from Wav2Lip.
        """
        self.is_running = True
        self.start_time = time.time()
        self._first_frame_latency = None

        await self.send_to_client("status", message="Conectando...")

        try:
            async with aiohttp.ClientSession() as session:
                wav2lip_ws = await session.ws_connect(
                    WAV2LIP_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=300),
                )

                await self.send_to_client("status", message="Gerando...")

                # Send the text to Wav2Lip
                await wav2lip_ws.send_json({
                    "action": "speak",
                    "text": text,
                    "voice": voice,
                    "parallel": True,
                })
                print(f"[Wav2Lip] Texto: '{text[:50]}...'")

                # Receive and relay frames in real time
                async for msg in wav2lip_ws:
                    if not self.is_running:
                        break

                    if msg.type == aiohttp.WSMsgType.TEXT:
                        finished = await self._handle_wav2lip_message(
                            json.loads(msg.data)
                        )
                        if finished:
                            break
                    elif msg.type in (
                        aiohttp.WSMsgType.CLOSED,
                        aiohttp.WSMsgType.ERROR,
                    ):
                        break

                await wav2lip_ws.close()

        except Exception as e:
            print(f"Erro na sessao: {e}")
            traceback.print_exc()
            await self.send_to_client("error", message=str(e))
            return

        # Finish
        elapsed = time.time() - self.start_time
        print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames")
        await self.send_to_client(
            "done",
            total_frames=self.total_frames,
            elapsed_ms=int(elapsed * 1000),
        )

    async def _handle_wav2lip_message(self, data) -> bool:
        """Process one decoded Wav2Lip message; return True to end the stream.

        ``frame`` and ``full_audio`` payloads are forwarded to the client,
        ``first_chunk`` and ``status`` are only logged, and ``done`` and
        ``error`` end the stream. Unknown message types are ignored.
        """
        msg_type = data.get("type", "")

        if msg_type == "frame":
            # Forward the frame DIRECTLY to the client
            frame_b64 = data.get("frame", "")
            self.total_frames += 1
            await self.send_to_client(
                "frame",
                frame=frame_b64,
                index=self.total_frames,
            )

            if self._first_frame_latency is None:
                self._first_frame_latency = time.time() - self.start_time
                print(
                    "[Realtime] Primeiro frame: "
                    f"{self._first_frame_latency*1000:.0f}ms"
                )
                await self.send_to_client(
                    "first_frame",
                    latency_ms=int(self._first_frame_latency * 1000),
                )
            return False

        if msg_type == "full_audio":
            # Forward the complete audio
            audio_b64 = data.get("audio", "")
            duration = data.get("duration_ms", 0)
            await self.send_to_client(
                "audio",
                audio=audio_b64,
                duration_ms=duration,
                sample_rate=24000,
            )
            print(f"[Realtime] Audio: {duration}ms")
            return False

        if msg_type == "done":
            print(f"[Realtime] Concluído: {self.total_frames} frames")
            return True

        if msg_type == "first_chunk":
            latency = data.get("latency_ms", 0)
            print(f"[Wav2Lip] eSpeak pronto: {latency}ms")
            return False

        if msg_type == "status":
            print(f"[Wav2Lip] {data.get('message')}")
            return False

        if msg_type == "error":
            error_msg = data.get("message", "Erro")
            print(f"[Wav2Lip] Erro: {error_msg}")
            await self.send_to_client("error", message=error_msg)
            return True

        return False

    def stop(self):
        """Ask the relay loop in ``run`` to exit at its next upstream message."""
        self.is_running = False


async def _cancel_run(session, task):
    """Stop ``session`` and wait until its background ``task`` has unwound.

    Both arguments may be None. Cancelling (rather than only setting the stop
    flag) guarantees the old stream cannot keep sending frames while a new
    one starts or after the client has gone away.
    """
    if session:
        session.stop()
    if task is None:
        return
    if not task.done():
        task.cancel()
    # return_exceptions=True collects the task's CancelledError instead of
    # raising it here, while cancellation of the caller still propagates.
    await asyncio.gather(task, return_exceptions=True)


@routes.get("/ws")
async def websocket_handler(request):
    """Serve one frontend WebSocket connection.

    Accepted JSON actions:
    - ``generate``: start streaming ``text`` with ``voice`` (default "tara"),
      replacing any stream still in progress.
    - ``stop``: ask the current stream to stop; replies ``stopped``.
    - ``ping``: replies ``pong``.
    """
    ws = web.WebSocketResponse(max_msg_size=50 * 1024 * 1024)
    await ws.prepare(request)

    print("Cliente conectado")
    session = None
    run_task = None

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json(
                        {"type": "error", "message": "Invalid JSON"}
                    )
                    continue

                # Valid JSON that is not an object has no "action" to read.
                if not isinstance(data, dict):
                    await ws.send_json(
                        {"type": "error", "message": "Invalid JSON"}
                    )
                    continue

                action = data.get("action", "")

                if action == "generate":
                    raw_text = data.get("text", "")
                    # Non-string text is treated the same as missing text.
                    text = raw_text.strip() if isinstance(raw_text, str) else ""
                    voice = data.get("voice", "tara")

                    if not text:
                        await ws.send_json(
                            {"type": "error", "message": "Text required"}
                        )
                        continue

                    print(f"Gerando: '{text[:50]}...'")

                    await _cancel_run(session, run_task)

                    session = RealtimeSession(ws)
                    # Run in the background so this loop keeps reading and
                    # can act on "stop"/"ping" while frames are streaming.
                    run_task = asyncio.create_task(session.run(text, voice))

                elif action == "stop":
                    if session:
                        session.stop()
                    await ws.send_json({"type": "stopped"})

                elif action == "ping":
                    await ws.send_json({"type": "pong"})

            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break

    except Exception as e:
        print(f"Erro: {e}")

    finally:
        await _cancel_run(session, run_task)
        print("Cliente desconectado")

    return ws


@routes.get("/")
async def index(request):
    """Serve ``index_realtime.html`` from the directory containing this file."""
    return web.FileResponse(
        os.path.join(os.path.dirname(__file__), "index_realtime.html")
    )


@routes.get("/health")
async def health(request):
    """Report server status and whether the Wav2Lip WebSocket accepts connections."""
    status = {"server": True, "wav2lip": False}

    try:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=2),
            ):
                status["wav2lip"] = True
    except Exception:
        # Best-effort probe: any connection failure is reported to the caller
        # as wav2lip=False. Unlike a bare except, this lets task cancellation
        # and interpreter shutdown propagate.
        pass

    return web.json_response(status)


app = web.Application()
app.add_routes(routes)


if __name__ == "__main__":
    print("=" * 50)
    print("Interface Server - REALTIME Streaming")
    print("=" * 50)
    print(f"Porta: {PORT}")
    print(f"Wav2Lip: {WAV2LIP_WS}")
    print("Mode: Canvas + Web Audio (baixa latencia)")
    print("=" * 50)

    web.run_app(app, host="0.0.0.0", port=PORT)