| """ |
| Interface Server - Streaming Otimizado |
| Porta: 8080 |
| |
| VERSAO OTIMIZADA - 2024-12-27 |
| |
| Otimizações implementadas: |
| 1. WebSocket binário (sem base64) - reduz 33% do tráfego |
| 2. Compressão de frames mais eficiente |
| 3. Passthrough direto de bytes |
| |
| Arquitetura: |
| [Frontend] <--WebSocket Binary--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082] |
| | |
| [Orpheus TTS:8000] |
| """ |
| from aiohttp import web |
| import aiohttp |
| import asyncio |
| import json |
| import base64 |
| import os |
| import time |
| import struct |
|
|
| |
# Upstream WebSocket endpoints. Each one can be overridden through an
# environment variable of the same name.
WAV2LIP_WS = os.environ.get("WAV2LIP_WS", "ws://localhost:8082/ws")
ORPHEUS_WS = os.environ.get("ORPHEUS_WS", "ws://localhost:8081/ws")

# TCP port this server listens on (environment variable PORT).
PORT = int(os.environ.get("PORT", "8080"))

# NOTE(review): neither constant below is referenced in the visible part of
# this file. Presumably frames per second of the generated video and the
# sample rate (Hz) of the TTS audio -- confirm against the front-end.
VIDEO_FPS = 25
AUDIO_SAMPLE_RATE = 24000
|
|
# Route table filled in by the @routes.get(...) decorators further down and
# attached to the application once all handlers are defined.
routes = web.RouteTableDef()
|
|
|
|
class OptimizedSession:
    """One generation run that relays video frames and TTS audio to a client.

    Frames come from the Wav2Lip WebSocket (``WAV2LIP_WS``) and audio chunks
    from the Orpheus WebSocket (``ORPHEUS_WS``). Both arrive base64-encoded
    inside JSON (Orpheus may also send raw binary) and are forwarded to the
    client as binary packets; status and control messages stay JSON.

    Binary packet layouts (little-endian, as built with ``struct.pack``):

    * frame:       ``<BII``  -> type, frame index, payload length, payload
    * audio:       ``<BII``  -> type, sample rate, payload length, payload
    * audio chunk: ``<BIBI`` -> type, chunk index, is-last flag, payload
      length, payload
    """

    # Message-type tag stored in the first byte of every binary packet.
    MSG_FRAME = 0x01
    MSG_AUDIO = 0x02
    MSG_AUDIO_CHUNK = 0x03

    # Seconds to wait for the Orpheus relay once the video stream has ended.
    AUDIO_DRAIN_TIMEOUT_S = 30.0

    def __init__(self, client_ws):
        """Bind the session to *client_ws*, the WebSocket of the browser client.

        *client_ws* only needs awaitable ``send_json`` and ``send_bytes``
        methods.
        """
        self.client_ws = client_ws
        self.is_running = False
        self.start_time = None
        self.total_frames = 0
        self.total_bytes = 0
        # Values reported by Wav2Lip and echoed in the final "done" message.
        self.end_video_time_ms = None
        self.start_frame_idx = None
        self.end_frame_idx = None

    async def send_json(self, msg_type: str, **kwargs):
        """Send a JSON status/control message ``{"type": msg_type, **kwargs}``.

        Send failures are logged and swallowed so a dead client cannot abort
        the relay loops.
        """
        try:
            await self.client_ws.send_json({"type": msg_type, **kwargs})
        except Exception as e:
            print(f"Erro enviando JSON: {e}")

    async def send_binary_frame(self, frame_bytes: bytes, frame_index: int):
        """Send one video frame as a binary packet (no base64).

        Failures are logged and swallowed; ``total_bytes`` only grows when
        the send succeeded.
        """
        try:
            header = struct.pack('<BII', self.MSG_FRAME, frame_index, len(frame_bytes))
            await self.client_ws.send_bytes(header + frame_bytes)
            self.total_bytes += len(header) + len(frame_bytes)
        except Exception as e:
            print(f"Erro enviando frame binário: {e}")

    async def send_binary_audio(self, audio_bytes: bytes, sample_rate: int):
        """Send a complete audio buffer as a binary packet.

        Failures are logged and swallowed; ``total_bytes`` only grows when
        the send succeeded.
        """
        try:
            header = struct.pack('<BII', self.MSG_AUDIO, sample_rate, len(audio_bytes))
            await self.client_ws.send_bytes(header + audio_bytes)
            self.total_bytes += len(header) + len(audio_bytes)
        except Exception as e:
            print(f"Erro enviando audio binário: {e}")

    async def send_binary_audio_chunk(self, chunk_bytes: bytes, chunk_index: int, is_last: bool):
        """Send one streamed audio chunk as a binary packet.

        ``is_last`` is encoded as a single byte (1 or 0). Failures are logged
        and swallowed; ``total_bytes`` only grows when the send succeeded.
        """
        try:
            header = struct.pack(
                '<BIBI',
                self.MSG_AUDIO_CHUNK,
                chunk_index,
                1 if is_last else 0,
                len(chunk_bytes),
            )
            await self.client_ws.send_bytes(header + chunk_bytes)
            self.total_bytes += len(header) + len(chunk_bytes)
        except Exception as e:
            print(f"Erro enviando audio chunk: {e}")

    async def stream_audio_from_orpheus(self, text: str, voice: str):
        """Request synthesis from Orpheus and forward chunks as they arrive.

        Handles both JSON messages (``audio_chunk`` / ``done`` / ``error``)
        and raw binary messages. On ``done`` an empty chunk flagged as last
        is sent to the client. The loop also stops once ``is_running`` is
        cleared.

        Returns:
            True when at least one audio chunk was forwarded. Exceptions are
            logged and never propagate to the caller.
        """
        total_chunks = 0
        total_bytes = 0
        first_chunk_sent = False

        try:
            async with aiohttp.ClientSession() as orpheus_session:
                # Context manager so the socket is closed on every exit path,
                # including exceptions and task cancellation.
                async with orpheus_session.ws_connect(
                    ORPHEUS_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=60),
                    max_msg_size=10 * 1024 * 1024,
                ) as orpheus_ws:
                    await orpheus_ws.send_json({
                        "action": "synthesize",
                        "text": text,
                        "voice": voice,
                        "stream": True
                    })

                    print(f"[Orpheus] Requisição enviada: '{text[:30]}...'")

                    async for msg in orpheus_ws:
                        if not self.is_running:
                            break

                        if msg.type == aiohttp.WSMsgType.TEXT:
                            data = json.loads(msg.data)
                            msg_type = data.get("type", "")

                            if msg_type == "audio_chunk":
                                chunk_data = base64.b64decode(data.get("audio", ""))
                                # Falls back to a zero-based running index.
                                chunk_index = data.get("chunk_index", total_chunks)
                                total_chunks += 1
                                total_bytes += len(chunk_data)

                                await self.send_binary_audio_chunk(chunk_data, chunk_index, is_last=False)

                                if not first_chunk_sent:
                                    first_chunk_sent = True
                                    print(f"[Orpheus] Primeiro chunk enviado: {len(chunk_data)} bytes")
                                else:
                                    print(f"[Orpheus] Chunk {chunk_index}: {len(chunk_data)} bytes")

                            elif msg_type == "done":
                                # Empty payload flagged as last marks the end
                                # of the stream; its index is one past the
                                # last zero-based chunk.
                                await self.send_binary_audio_chunk(b'', total_chunks, is_last=True)
                                print(f"[Orpheus] Done: {total_chunks} chunks, {total_bytes} bytes")
                                break

                            elif msg_type == "error":
                                print(f"[Orpheus] Erro: {data.get('message')}")
                                break

                        elif msg.type == aiohttp.WSMsgType.BINARY:
                            chunk_data = msg.data
                            # Index taken BEFORE incrementing so binary chunks
                            # are zero-based like the text path; otherwise the
                            # last chunk would share its index with the end
                            # marker.
                            chunk_index = total_chunks
                            total_chunks += 1
                            total_bytes += len(chunk_data)
                            await self.send_binary_audio_chunk(chunk_data, chunk_index, is_last=False)
                            print(f"[Orpheus] Chunk binário {chunk_index}: {len(chunk_data)} bytes")

                        elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                            break

        except Exception as e:
            print(f"[Orpheus] Erro: {e}")
            import traceback
            traceback.print_exc()

        return total_chunks > 0

    async def _relay_frames(self, wav2lip_ws):
        """Forward Wav2Lip messages to the client until done, error or stop.

        Updates ``total_frames`` and, on the ``done`` message, the
        ``start_frame_idx`` / ``end_frame_idx`` / ``end_video_time_ms``
        attributes.
        """
        first_frame_time = None
        start_frame_idx = None

        async for msg in wav2lip_ws:
            if not self.is_running:
                break

            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                msg_type = data.get("type", "")

                if msg_type == "frame":
                    frame_bytes = base64.b64decode(data.get("frame", ""))
                    # Frame indices sent to the client are one-based.
                    self.total_frames += 1

                    await self.send_binary_frame(frame_bytes, self.total_frames)

                    if first_frame_time is None:
                        first_frame_time = time.time() - self.start_time
                        latency_ms = int(first_frame_time * 1000)
                        start_frame_idx = data.get("source_frame_idx", None)
                        print(f"[Optimized] Primeiro frame: {latency_ms}ms, source_frame={start_frame_idx}")
                        await self.send_json("first_frame", latency_ms=latency_ms, start_frame_idx=start_frame_idx)

                elif msg_type == "full_audio":
                    # Audio is streamed from Orpheus instead.
                    print(f"[Optimized] Ignorando audio Wav2Lip (usando Orpheus streaming)")

                elif msg_type == "first_chunk":
                    print(f"[Wav2Lip] eSpeak: {data.get('latency_ms', 0)}ms")

                elif msg_type == "status":
                    print(f"[Wav2Lip] {data.get('message')}")

                elif msg_type == "error":
                    await self.send_json("error", message=data.get("message", "Erro"))
                    break

                elif msg_type == "done":
                    self.end_video_time_ms = data.get("end_video_time_ms")
                    # Prefer the index seen on the first frame; fall back to
                    # the one reported in the final message.
                    if start_frame_idx is None:
                        start_frame_idx = data.get("start_frame_idx")
                    end_frame_idx = data.get("end_frame_idx")
                    self.start_frame_idx = start_frame_idx
                    self.end_frame_idx = end_frame_idx
                    print(f"[Optimized] Wav2Lip done: {self.total_frames} frames, start_frame={start_frame_idx}, end_frame={end_frame_idx}, end_time_ms={self.end_video_time_ms}")
                    break

            elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                break

    async def _wait_for_audio(self, audio_task):
        """Wait for the Orpheus relay task and return its success flag.

        Waits at most ``AUDIO_DRAIN_TIMEOUT_S`` seconds. Returns False when
        the task timed out or was cancelled.
        """
        if not audio_task.done():
            print(f"[Optimized] Aguardando término do streaming Orpheus...")
            try:
                await asyncio.wait_for(audio_task, timeout=self.AUDIO_DRAIN_TIMEOUT_S)
            except asyncio.TimeoutError:
                print(f"[Optimized] ERRO: Timeout aguardando Orpheus")
                audio_task.cancel()

        # wait_for() cancels the task on timeout; calling result() on a
        # cancelled task raises CancelledError, which is not an Exception
        # subclass and would escape every handler up the stack.
        if audio_task.cancelled() or not audio_task.done():
            return False
        return audio_task.result()

    async def run(self, text: str, voice: str, jpeg_quality: int = 95, idle_video_time_ms: int = 0):
        """Run one generation: stream audio and frames to the client in parallel.

        Starts the Orpheus relay as a background task, asks Wav2Lip to
        generate video for *text*, forwards the frames, waits for the audio
        relay and finally sends a ``done`` JSON summary. On failure an
        ``error`` JSON message is sent instead and no ``done`` follows.

        Args:
            text: Text to synthesize.
            voice: Voice name forwarded to both upstream services.
            jpeg_quality: Forwarded to Wav2Lip unchanged.
            idle_video_time_ms: Forwarded to Wav2Lip unchanged.
        """
        self.is_running = True
        self.start_time = time.time()
        self.total_bytes = 0
        self.total_frames = 0

        await self.send_json("status", message="Conectando...")

        audio_task = None
        try:
            async with aiohttp.ClientSession() as session:
                # Started first so audio flows while Wav2Lip is connecting.
                audio_task = asyncio.create_task(
                    self.stream_audio_from_orpheus(text, voice)
                )

                async with session.ws_connect(
                    WAV2LIP_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=300),
                    max_msg_size=50 * 1024 * 1024,
                ) as wav2lip_ws:
                    await self.send_json("status", message="Gerando...")

                    await wav2lip_ws.send_json({
                        "action": "generate",
                        "text": text,
                        "voice": voice,
                        "idle_video_time_ms": idle_video_time_ms,
                        "jpeg_quality": jpeg_quality
                    })

                    print(f"[Optimized] Texto: '{text[:50]}...'")

                    await self._relay_frames(wav2lip_ws)

                audio_success = await self._wait_for_audio(audio_task)
                print(f"[Optimized] Concluído: {self.total_frames} frames, audio_streaming={audio_success}")

        except Exception as e:
            print(f"Erro: {e}")
            import traceback
            traceback.print_exc()
            await self.send_json("error", message=str(e))
            return
        finally:
            # Never leave the audio relay running once the session has
            # failed or been cancelled.
            if audio_task is not None and not audio_task.done():
                audio_task.cancel()

        elapsed = time.time() - self.start_time
        print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames, {self.total_bytes/1024:.1f}KB binário")

        await self.send_json(
            "done",
            total_frames=self.total_frames,
            elapsed_ms=int(elapsed * 1000),
            bytes_sent=self.total_bytes,
            start_frame_idx=self.start_frame_idx,
            end_frame_idx=self.end_frame_idx,
            end_video_time_ms=self.end_video_time_ms
        )

    def stop(self):
        """Ask the relay loops to stop; each checks the flag on its next message."""
        self.is_running = False
|
|
|
|
@routes.get("/ws")
async def websocket_handler(request):
    """Serve one client WebSocket connection.

    Accepts JSON text messages with an ``action`` field:

    * ``generate`` -- start a new ``OptimizedSession`` for ``text`` (optional
      ``voice``, ``jpeg_quality``, ``idle_video_time_ms``), replacing any
      session that is still running.
    * ``stop`` -- stop the running session and reply ``{"type": "stopped"}``.
    * ``ping`` -- reply ``{"type": "pong"}``.

    The session runs as a background task so that this loop keeps reading
    while frames are being streamed. Awaiting ``session.run()`` inline would
    leave "stop" and "ping" unread until generation had already finished.
    A session cancelled this way does not send its own ``done`` message.
    """
    ws = web.WebSocketResponse(max_msg_size=50 * 1024 * 1024)
    await ws.prepare(request)

    print("Cliente conectado (modo binário)")
    session = None
    run_task = None

    async def halt_current():
        """Stop the active session, if any, and wait until its task has ended."""
        if session:
            session.stop()
        if run_task is not None and not run_task.done():
            run_task.cancel()
            # asyncio.wait() does not re-raise the task's CancelledError, so
            # only a cancellation of this handler itself propagates here.
            await asyncio.wait({run_task})

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue

                # Valid JSON that is not an object has no "action" to read.
                if not isinstance(data, dict):
                    await ws.send_json({"type": "error", "message": "JSON object required"})
                    continue

                action = data.get("action", "")

                if action == "generate":
                    raw_text = data.get("text", "")
                    # A non-string "text" is treated like a missing one.
                    text = raw_text.strip() if isinstance(raw_text, str) else ""
                    voice = data.get("voice", "tara")
                    jpeg_quality = data.get("jpeg_quality", 95)
                    idle_video_time_ms = data.get("idle_video_time_ms", 0)

                    if not text:
                        await ws.send_json({"type": "error", "message": "Text required"})
                        continue

                    print(f"Gerando: '{text[:50]}...' (quality={jpeg_quality}, idle_time={idle_video_time_ms}ms)")

                    # Only one session may write to this socket at a time.
                    await halt_current()

                    session = OptimizedSession(ws)
                    run_task = asyncio.create_task(
                        session.run(text, voice, jpeg_quality, idle_video_time_ms)
                    )

                elif action == "stop":
                    await halt_current()
                    await ws.send_json({"type": "stopped"})

                elif action == "ping":
                    await ws.send_json({"type": "pong"})

            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break

    except Exception as e:
        print(f"Erro: {e}")
    finally:
        try:
            # Also reached on disconnect: do not keep streaming to a dead socket.
            await halt_current()
        finally:
            print("Cliente desconectado")

    return ws
|
|
|
|
@routes.get("/")
async def index(request):
    """Serve ``index_optimized.html`` from the directory of this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "index_optimized.html")
    return web.FileResponse(page_path)
|
|
|
|
@routes.get("/idle.mp4")
async def idle_video(request):
    """Serve ``idle.mp4`` from the directory of this module."""
    module_dir = os.path.dirname(__file__)
    video_path = os.path.join(module_dir, "idle.mp4")
    return web.FileResponse(video_path)
|
|
|
|
@routes.get("/health")
async def health(request):
    """Report liveness of this server and reachability of Wav2Lip.

    Returns a JSON object ``{"server": true, "wav2lip": <bool>}``.
    ``wav2lip`` is true when a WebSocket connection to ``WAV2LIP_WS`` could
    be opened. A failed probe is logged and reported as false; it never
    fails the request.
    """
    status = {"server": True, "wav2lip": False}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=2),
            ):
                status["wav2lip"] = True
    except Exception as e:
        # `except Exception` rather than a bare `except:` so task
        # cancellation and KeyboardInterrupt are not swallowed.
        print(f"[Health] Wav2Lip offline: {e}")
    return web.json_response(status)
|
|
|
|
# Application object exposing every handler registered on `routes`.
app = web.Application()
app.add_routes(routes)


if __name__ == "__main__":
    # Start-up banner, then serve on all interfaces.
    rule = "=" * 50
    banner = (
        rule,
        "Interface Server - OPTIMIZED (Binary)",
        rule,
        f"Porta: {PORT}",
        f"Wav2Lip: {WAV2LIP_WS}",
        "Mode: WebSocket Binary (-33% bandwidth)",
        rule,
    )
    for line in banner:
        print(line)
    web.run_app(app, host="0.0.0.0", port=PORT)
|
|