| """ |
| Interface Server - Streaming em Tempo Real |
| Porta: 8080 |
| |
| VERSAO FUNCIONANDO - 2024-12-27 |
| |
| Fluxo otimizado para baixa latência: |
| 1. Frontend conecta via WebSocket |
| 2. Server envia texto para Wav2Lip (action: speak) |
| 3. Conforme frames chegam, envia direto para frontend (sem ffmpeg) |
| 4. Frontend renderiza em Canvas + Web Audio API |
| |
| Arquitetura: |
| [Frontend] <--WebSocket--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082] |
| | |
| [Orpheus TTS:8000] |
| |
| Latência típica: ~1-2s até primeiro frame |
| Formato: Frames JPEG (base64) + Audio WAV (base64) |
| """ |
| from aiohttp import web |
| import aiohttp |
| import asyncio |
| import json |
| import base64 |
| import os |
| import time |
|
|
| |
# Runtime configuration, overridable through environment variables.
WAV2LIP_WS = os.environ.get("WAV2LIP_WS", "ws://localhost:8082/ws")  # Wav2Lip WebSocket endpoint
PORT = int(os.environ.get("PORT", "8080"))  # port passed to web.run_app in __main__

# Video timing: frames per second and the spacing between frames in milliseconds.
VIDEO_FPS = 25
FRAME_INTERVAL_MS = 1000 / VIDEO_FPS
|
|
# Route table; the handlers below register themselves on it via @routes.get(...),
# and it is attached to the application with add_routes().
routes = web.RouteTableDef()
|
|
|
|
class RealtimeSession:
    """Real-time streaming session: relays Wav2Lip output directly to one client.

    One instance serves a single ``generate`` request. It opens a WebSocket to
    the Wav2Lip server (``WAV2LIP_WS``), sends a ``speak`` action, and forwards
    each ``frame`` / ``full_audio`` message to ``client_ws`` as it arrives.
    """

    def __init__(self, client_ws):
        # WebSocket back to the browser; must provide async send_json() and `closed`.
        self.client_ws = client_ws
        # Cleared by stop() (or when the client goes away) to end the relay loop.
        self.is_running = False
        # time.time() at the start of run(); used for latency/elapsed reporting.
        self.start_time = None
        # Number of frames forwarded to the client so far.
        self.total_frames = 0

    async def send_to_client(self, msg_type: str, **kwargs) -> bool:
        """Send ``{"type": msg_type, **kwargs}`` to the client as JSON.

        Send errors are logged, never raised, so a flaky client cannot crash
        the relay. If the client socket is closed, the session is stopped so
        run() does not keep pulling frames that nobody will receive.

        Returns:
            True if the message was written to the socket, False otherwise.
        """
        if self.client_ws.closed:
            self.is_running = False
            return False
        try:
            await self.client_ws.send_json({"type": msg_type, **kwargs})
        except Exception as e:
            print(f"Erro enviando para cliente: {e}")
            if self.client_ws.closed:
                self.is_running = False
            return False
        return True

    async def run(self, text: str, voice: str):
        """Stream lip-synced frames and audio for *text* to the client.

        Sends ``status`` updates, then forwards Wav2Lip messages until Wav2Lip
        reports ``done``/``error``, its socket closes, or stop() is called.
        Then it sends a ``done`` summary (frame count, elapsed ms). If the
        connection or protocol fails, an ``error`` message is sent instead and
        no ``done`` follows.

        Args:
            text: Text the avatar should speak.
            voice: Voice name, forwarded unchanged to Wav2Lip.
        """
        self.is_running = True
        self.start_time = time.time()

        await self.send_to_client("status", message="Conectando...")
        if not self.is_running:
            # Client already gone: don't start a Wav2Lip job nobody will watch.
            return

        try:
            async with aiohttp.ClientSession() as session:
                # `async with` closes the Wav2Lip socket on every exit path
                # (normal end, exception, or task cancellation).
                async with session.ws_connect(
                    WAV2LIP_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=300),
                ) as wav2lip_ws:
                    await self.send_to_client("status", message="Gerando...")

                    await wav2lip_ws.send_json({
                        "action": "speak",
                        "text": text,
                        "voice": voice,
                        "parallel": True,
                    })
                    print(f"[Wav2Lip] Texto: '{text[:50]}...'")

                    await self._relay(wav2lip_ws)
        except Exception as e:
            print(f"Erro na sessao: {e}")
            import traceback
            traceback.print_exc()
            await self.send_to_client("error", message=str(e))
            return
        finally:
            self.is_running = False

        elapsed = time.time() - self.start_time
        print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames")

        await self.send_to_client(
            "done",
            total_frames=self.total_frames,
            elapsed_ms=int(elapsed * 1000),
        )

    async def _relay(self, wav2lip_ws):
        """Forward Wav2Lip messages to the client until the stream ends.

        Stops on a ``done`` or ``error`` message, when the Wav2Lip socket
        closes or errors, or once ``is_running`` has been cleared.
        """
        first_frame_time = None

        async for msg in wav2lip_ws:
            if not self.is_running:
                break
            if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                break
            if msg.type != aiohttp.WSMsgType.TEXT:
                # Only JSON text messages are handled; ignore anything else.
                continue

            data = json.loads(msg.data)
            msg_type = data.get("type", "")

            if msg_type == "frame":
                # Forward the frame payload untouched (no re-encoding).
                self.total_frames += 1
                await self.send_to_client(
                    "frame",
                    frame=data.get("frame", ""),
                    index=self.total_frames,
                )

                if first_frame_time is None:
                    first_frame_time = time.time() - self.start_time
                    print(f"[Realtime] Primeiro frame: {first_frame_time*1000:.0f}ms")
                    await self.send_to_client(
                        "first_frame",
                        latency_ms=int(first_frame_time * 1000),
                    )

            elif msg_type == "full_audio":
                duration = data.get("duration_ms", 0)
                await self.send_to_client(
                    "audio",
                    audio=data.get("audio", ""),
                    duration_ms=duration,
                    # NOTE(review): hard-coded; confirm it matches the TTS output rate.
                    sample_rate=24000,
                )
                print(f"[Realtime] Audio: {duration}ms")

            elif msg_type == "done":
                print(f"[Realtime] Concluído: {self.total_frames} frames")
                break

            elif msg_type == "first_chunk":
                latency = data.get("latency_ms", 0)
                print(f"[Wav2Lip] eSpeak pronto: {latency}ms")

            elif msg_type == "status":
                print(f"[Wav2Lip] {data.get('message')}")

            elif msg_type == "error":
                error_msg = data.get("message", "Erro")
                print(f"[Wav2Lip] Erro: {error_msg}")
                await self.send_to_client("error", message=error_msg)
                break

    def stop(self):
        """Ask the relay loop to stop before it handles the next Wav2Lip message."""
        self.is_running = False
|
|
|
|
@routes.get("/ws")
async def websocket_handler(request):
    """Client WebSocket endpoint.

    Accepted JSON actions:
      - ``generate``: start a RealtimeSession for ``text``/``voice`` (voice
        defaults to ``"tara"``), replacing any session still running.
      - ``stop``: stop the current session, then reply ``{"type": "stopped"}``.
      - ``ping``: reply ``{"type": "pong"}``.

    Each session runs as a background task so this loop keeps reading client
    messages (``stop``/``ping``) while frames are being streamed. Previously
    the session was awaited inline, so ``stop`` could never take effect.
    """
    ws = web.WebSocketResponse(max_msg_size=50 * 1024 * 1024)
    await ws.prepare(request)

    print("Cliente conectado")
    session = None        # most recent RealtimeSession (may already be finished)
    session_task = None   # asyncio.Task running session.run(...)

    async def stop_current():
        """Stop the current session, if any, and wait until its task has ended."""
        if session is not None:
            session.stop()
        if session_task is not None and not session_task.done():
            # Cancel instead of only flagging: the flag is checked only when
            # Wav2Lip sends its next message, which may take a while.
            session_task.cancel()
            # asyncio.wait() does not re-raise the task's CancelledError.
            await asyncio.wait({session_task})

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue
                if not isinstance(data, dict):
                    # e.g. a bare list/number: .get() below would crash the handler.
                    await ws.send_json({"type": "error", "message": "JSON object required"})
                    continue

                action = data.get("action", "")

                if action == "generate":
                    raw_text = data.get("text", "")
                    text = raw_text.strip() if isinstance(raw_text, str) else ""
                    voice = data.get("voice", "tara")

                    if not text:
                        await ws.send_json({"type": "error", "message": "Text required"})
                        continue

                    print(f"Gerando: '{text[:50]}...'")

                    await stop_current()
                    session = RealtimeSession(ws)
                    # Keep a reference to the task so it is not garbage-collected.
                    session_task = asyncio.create_task(session.run(text, voice))

                elif action == "stop":
                    await stop_current()
                    await ws.send_json({"type": "stopped"})

                elif action == "ping":
                    await ws.send_json({"type": "pong"})

            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break

    except Exception as e:
        print(f"Erro: {e}")
    finally:
        # Client gone: don't leave a session streaming into a dead socket.
        await stop_current()
        print("Cliente desconectado")

    return ws
|
|
|
|
@routes.get("/")
async def index(request):
    """Serve the real-time front-end page stored next to this module."""
    page_path = os.path.join(os.path.dirname(__file__), "index_realtime.html")
    return web.FileResponse(page_path)
|
|
|
|
@routes.get("/health")
async def health(request):
    """Report liveness of this server and reachability of Wav2Lip.

    Returns JSON ``{"server": true, "wav2lip": <bool>}``. ``wav2lip`` is true
    when a WebSocket handshake with ``WAV2LIP_WS`` succeeds within the
    2-second probe timeout.
    """
    status = {"server": True, "wav2lip": False}
    # ClientWSTimeout(ws_close=...) only bounds the close handshake. Without a
    # session timeout, an unresponsive Wav2Lip host could hold this probe open
    # for aiohttp's default client timeout.
    probe_timeout = aiohttp.ClientTimeout(total=2)
    try:
        async with aiohttp.ClientSession(timeout=probe_timeout) as session:
            async with session.ws_connect(
                WAV2LIP_WS, timeout=aiohttp.ClientWSTimeout(ws_close=2)
            ):
                status["wav2lip"] = True
    except Exception:
        # Best-effort probe: any failure just means "wav2lip unreachable".
        # Unlike a bare `except:`, this does not swallow task cancellation.
        pass
    return web.json_response(status)
|
|
|
|
# Build the aiohttp application and attach every route declared on `routes`.
app = web.Application()
app.router.add_routes(routes)
|
|
|
|
if __name__ == "__main__":
    # Print a startup banner with the effective configuration, then serve.
    separator = "=" * 50
    banner_lines = (
        separator,
        "Interface Server - REALTIME Streaming",
        separator,
        f"Porta: {PORT}",
        f"Wav2Lip: {WAV2LIP_WS}",
        "Mode: Canvas + Web Audio (baixa latencia)",
        separator,
    )
    for banner_line in banner_lines:
        print(banner_line)
    web.run_app(app, host="0.0.0.0", port=PORT)
|
|