""" Interface Server - Streaming Otimizado Porta: 8080 VERSAO OTIMIZADA - 2024-12-27 Otimizações implementadas: 1. WebSocket binário (sem base64) - reduz 33% do tráfego 2. Compressão de frames mais eficiente 3. Passthrough direto de bytes Arquitetura: [Frontend] <--WebSocket Binary--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082] | [Orpheus TTS:8000] """ from aiohttp import web import aiohttp import asyncio import json import base64 import os import time import struct # Configuracao WAV2LIP_WS = os.getenv("WAV2LIP_WS", "ws://localhost:8082/ws") ORPHEUS_WS = os.getenv("ORPHEUS_WS", "ws://localhost:8081/ws") PORT = int(os.getenv("PORT", "8080")) VIDEO_FPS = 25 AUDIO_SAMPLE_RATE = 24000 routes = web.RouteTableDef() class OptimizedSession: """Sessao otimizada - usa WebSocket binário.""" # Tipos de mensagem binária MSG_FRAME = 0x01 MSG_AUDIO = 0x02 MSG_AUDIO_CHUNK = 0x03 def __init__(self, client_ws): self.client_ws = client_ws self.is_running = False self.start_time = None self.total_frames = 0 self.total_bytes = 0 self.end_video_time_ms = None # Tempo do vídeo onde parou (para sync) self.start_frame_idx = None # Frame inicial usado pelo Wav2Lip self.end_frame_idx = None # Frame final usado pelo Wav2Lip async def send_json(self, msg_type: str, **kwargs): """Envia mensagem JSON (para status/controle).""" try: await self.client_ws.send_json({"type": msg_type, **kwargs}) except Exception as e: print(f"Erro enviando JSON: {e}") async def send_binary_frame(self, frame_bytes: bytes, frame_index: int): """Envia frame como binário puro (sem base64).""" try: # Header: [tipo:1][index:4][tamanho:4][dados] header = struct.pack(' 0 async def run(self, text: str, voice: str, jpeg_quality: int = 95, idle_video_time_ms: int = 0): """Streaming otimizado - áudio e frames em paralelo, enviados conforme chegam.""" self.is_running = True self.start_time = time.time() self.total_bytes = 0 await self.send_json("status", message="Conectando...") try: async with aiohttp.ClientSession() as session: # Iniciar streaming de áudio do Orpheus em paralelo # Os chunks são enviados diretamente para o cliente conforme chegam audio_task = asyncio.create_task( self.stream_audio_from_orpheus(text, voice) ) wav2lip_ws = await session.ws_connect( WAV2LIP_WS, timeout=aiohttp.ClientWSTimeout(ws_close=300), max_msg_size=50*1024*1024 ) await self.send_json("status", message="Gerando...") # Enviar texto para Wav2Lip com action "generate" para receber end_video_time_ms await wav2lip_ws.send_json({ "action": "generate", "text": text, "voice": voice, "idle_video_time_ms": idle_video_time_ms, # Tempo inicial do vídeo idle "jpeg_quality": jpeg_quality # Qualidade do JPEG (50-100) }) print(f"[Optimized] Texto: '{text[:50]}...'") first_frame_time = None start_frame_idx = None # Frame inicial usado pelo Wav2Lip # Receber frames do Wav2Lip async for msg in wav2lip_ws: if not self.is_running: break if msg.type == aiohttp.WSMsgType.TEXT: data = json.loads(msg.data) msg_type = data.get("type", "") if msg_type == "frame": frame_b64 = data.get("frame", "") frame_bytes = base64.b64decode(frame_b64) self.total_frames += 1 await self.send_binary_frame(frame_bytes, self.total_frames) if first_frame_time is None: first_frame_time = time.time() - self.start_time latency_ms = int(first_frame_time * 1000) # Capturar start_frame_idx do primeiro frame start_frame_idx = data.get("source_frame_idx", None) print(f"[Optimized] Primeiro frame: {latency_ms}ms, source_frame={start_frame_idx}") await self.send_json("first_frame", latency_ms=latency_ms, start_frame_idx=start_frame_idx) elif msg_type == "full_audio": print(f"[Optimized] Ignorando audio Wav2Lip (usando Orpheus streaming)") elif msg_type == "first_chunk": print(f"[Wav2Lip] eSpeak: {data.get('latency_ms', 0)}ms") elif msg_type == "status": print(f"[Wav2Lip] {data.get('message')}") elif msg_type == "error": await self.send_json("error", message=data.get("message", "Erro")) break elif msg_type == "done": # Capturar índices de frame para sincronização self.end_video_time_ms = data.get("end_video_time_ms") if start_frame_idx is None: start_frame_idx = data.get("start_frame_idx") end_frame_idx = data.get("end_frame_idx") self.start_frame_idx = start_frame_idx self.end_frame_idx = end_frame_idx print(f"[Optimized] Wav2Lip done: {self.total_frames} frames, start_frame={start_frame_idx}, end_frame={end_frame_idx}, end_time_ms={self.end_video_time_ms}") break elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): break await wav2lip_ws.close() # Aguardar término do streaming de áudio if not audio_task.done(): print(f"[Optimized] Aguardando término do streaming Orpheus...") try: await asyncio.wait_for(audio_task, timeout=30.0) except asyncio.TimeoutError: print(f"[Optimized] ERRO: Timeout aguardando Orpheus") audio_task.cancel() audio_success = audio_task.result() if audio_task.done() else False print(f"[Optimized] Concluído: {self.total_frames} frames, audio_streaming={audio_success}") except Exception as e: print(f"Erro: {e}") import traceback traceback.print_exc() await self.send_json("error", message=str(e)) return elapsed = time.time() - self.start_time print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames, {self.total_bytes/1024:.1f}KB binário") await self.send_json( "done", total_frames=self.total_frames, elapsed_ms=int(elapsed * 1000), bytes_sent=self.total_bytes, start_frame_idx=self.start_frame_idx, end_frame_idx=self.end_frame_idx, end_video_time_ms=self.end_video_time_ms ) def stop(self): self.is_running = False @routes.get("/ws") async def websocket_handler(request): ws = web.WebSocketResponse(max_msg_size=50*1024*1024) await ws.prepare(request) print("Cliente conectado (modo binário)") session = None try: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: try: data = json.loads(msg.data) action = data.get("action", "") if action == "generate": text = data.get("text", "").strip() voice = data.get("voice", "tara") jpeg_quality = data.get("jpeg_quality", 95) # Qualidade JPEG (50-100) idle_video_time_ms = data.get("idle_video_time_ms", 0) # Tempo do vídeo idle if not text: await ws.send_json({"type": "error", "message": "Text required"}) continue print(f"Gerando: '{text[:50]}...' (quality={jpeg_quality}, idle_time={idle_video_time_ms}ms)") if session: session.stop() session = OptimizedSession(ws) await session.run(text, voice, jpeg_quality, idle_video_time_ms) elif action == "stop": if session: session.stop() await ws.send_json({"type": "stopped"}) elif action == "ping": await ws.send_json({"type": "pong"}) except json.JSONDecodeError: await ws.send_json({"type": "error", "message": "Invalid JSON"}) elif msg.type == aiohttp.WSMsgType.ERROR: print(f"WebSocket error: {ws.exception()}") break except Exception as e: print(f"Erro: {e}") finally: if session: session.stop() print("Cliente desconectado") return ws @routes.get("/") async def index(request): return web.FileResponse(os.path.join(os.path.dirname(__file__), "index_optimized.html")) @routes.get("/idle.mp4") async def idle_video(request): return web.FileResponse(os.path.join(os.path.dirname(__file__), "idle.mp4")) @routes.get("/health") async def health(request): status = {"server": True, "wav2lip": False} try: async with aiohttp.ClientSession() as session: async with session.ws_connect(WAV2LIP_WS, timeout=aiohttp.ClientWSTimeout(ws_close=2)) as ws: status["wav2lip"] = True except: pass return web.json_response(status) app = web.Application() app.add_routes(routes) if __name__ == "__main__": print("=" * 50) print("Interface Server - OPTIMIZED (Binary)") print("=" * 50) print(f"Porta: {PORT}") print(f"Wav2Lip: {WAV2LIP_WS}") print(f"Mode: WebSocket Binary (-33% bandwidth)") print("=" * 50) web.run_app(app, host="0.0.0.0", port=PORT)