"""
Interface Server - Streaming em Tempo Real
Porta: 8080
VERSAO FUNCIONANDO - 2024-12-27
Fluxo otimizado para baixa latência:
1. Frontend conecta via WebSocket
2. Server envia texto para Wav2Lip (action: speak)
3. Conforme frames chegam, envia direto para frontend (sem ffmpeg)
4. Frontend renderiza em Canvas + Web Audio API
Arquitetura:
[Frontend] <--WebSocket--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082]
                                                                   |
                                                          [Orpheus TTS:8000]
Latência típica: ~1-2s até primeiro frame
Formato: Frames JPEG (base64) + Audio WAV (base64)
"""
from aiohttp import web
import aiohttp
import asyncio
import json
import base64
import os
import time
# Service endpoints (each one can be overridden through the environment)
WAV2LIP_WS = os.environ.get("WAV2LIP_WS", "ws://localhost:8082/ws")
PORT = int(os.environ.get("PORT", "8080"))

# Video settings
VIDEO_FPS = 25
FRAME_INTERVAL_MS = 1000 / VIDEO_FPS  # 40 ms between consecutive frames
# Route table filled in by the @routes.get(...) decorators in this module and
# handed to the aiohttp application through app.add_routes(routes).
routes = web.RouteTableDef()
class RealtimeSession:
    """Real-time streaming session that relays Wav2Lip output to one client.

    Frames received from the Wav2Lip WebSocket are forwarded to the client
    WebSocket as soon as they arrive (no intermediate encoding step); the
    complete audio clip is forwarded when Wav2Lip delivers it.
    """

    def __init__(self, client_ws):
        """Bind the session to the client WebSocket it will send to."""
        self.client_ws = client_ws
        self.is_running = False
        self.start_time = None
        self.total_frames = 0
        # Seconds between run() start and the first relayed frame; None until
        # the first frame of the current run has been forwarded.
        self._first_frame_latency = None

    async def send_to_client(self, msg_type: str, **kwargs):
        """Send the JSON message ``{"type": msg_type, **kwargs}`` to the client.

        Send failures are logged and swallowed so that a broken client
        connection never raises into the relay loop.
        """
        try:
            await self.client_ws.send_json({"type": msg_type, **kwargs})
        except Exception as e:
            print(f"Erro enviando para cliente: {e}")

    async def run(self, text: str, voice: str):
        """Ask Wav2Lip to speak ``text`` and relay its output to the client.

        Args:
            text: Text to synthesize.
            voice: Voice identifier forwarded unchanged to Wav2Lip.

        The relay ends when Wav2Lip reports ``done`` or ``error``, when its
        socket closes, or when :meth:`stop` has been called (checked each
        time a message arrives). A ``done`` message carrying the frame count
        and elapsed time is sent to the client afterwards. If an exception
        occurs, an ``error`` message is sent instead and no ``done`` follows.
        """
        self.is_running = True
        self.start_time = time.time()
        self._first_frame_latency = None
        await self.send_to_client("status", message="Conectando...")
        try:
            async with aiohttp.ClientSession() as session:
                # The context manager closes the Wav2Lip socket on every exit
                # path, including exceptions raised while relaying.
                async with session.ws_connect(
                    WAV2LIP_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=300),
                ) as wav2lip_ws:
                    await self.send_to_client("status", message="Gerando...")
                    # Send the text to Wav2Lip
                    await wav2lip_ws.send_json({
                        "action": "speak",
                        "text": text,
                        "voice": voice,
                        "parallel": True,
                    })
                    print(f"[Wav2Lip] Texto: '{text[:50]}...'")
                    # Receive and forward frames in real time
                    async for msg in wav2lip_ws:
                        if not self.is_running:
                            break
                        if msg.type == aiohttp.WSMsgType.TEXT:
                            keep_going = await self._handle_message(
                                json.loads(msg.data)
                            )
                            if not keep_going:
                                break
                        elif msg.type in (
                            aiohttp.WSMsgType.CLOSED,
                            aiohttp.WSMsgType.ERROR,
                        ):
                            break
        except Exception as e:
            print(f"Erro na sessao: {e}")
            import traceback
            traceback.print_exc()
            await self.send_to_client("error", message=str(e))
            return
        # Finish: report totals to the client
        elapsed = time.time() - self.start_time
        print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames")
        await self.send_to_client(
            "done",
            total_frames=self.total_frames,
            elapsed_ms=int(elapsed * 1000),
        )

    async def _handle_message(self, data) -> bool:
        """Relay one decoded Wav2Lip message; return False to end the stream."""
        msg_type = data.get("type", "")
        if msg_type == "frame":
            # Forward the frame straight to the client
            self.total_frames += 1
            await self.send_to_client(
                "frame",
                frame=data.get("frame", ""),
                index=self.total_frames,
            )
            if self._first_frame_latency is None:
                first_frame_time = time.time() - self.start_time
                self._first_frame_latency = first_frame_time
                print(f"[Realtime] Primeiro frame: {first_frame_time*1000:.0f}ms")
                await self.send_to_client(
                    "first_frame",
                    latency_ms=int(first_frame_time * 1000),
                )
        elif msg_type == "full_audio":
            # Forward the complete audio clip
            duration = data.get("duration_ms", 0)
            await self.send_to_client(
                "audio",
                audio=data.get("audio", ""),
                duration_ms=duration,
                sample_rate=24000,
            )
            print(f"[Realtime] Audio: {duration}ms")
        elif msg_type == "done":
            print(f"[Realtime] Concluído: {self.total_frames} frames")
            return False
        elif msg_type == "first_chunk":
            latency = data.get("latency_ms", 0)
            print(f"[Wav2Lip] eSpeak pronto: {latency}ms")
        elif msg_type == "status":
            print(f"[Wav2Lip] {data.get('message')}")
        elif msg_type == "error":
            error_msg = data.get("message", "Erro")
            print(f"[Wav2Lip] Erro: {error_msg}")
            await self.send_to_client("error", message=error_msg)
            return False
        return True

    def stop(self):
        """Request that the relay loop in :meth:`run` stop at its next message."""
        self.is_running = False
async def _shutdown_session(session, task, wait_timeout=5.0):
    """Stop ``session`` and wait (bounded) for its background ``task`` to end."""
    if session is not None:
        session.stop()
    if task is not None and not task.done():
        task.cancel()
        # asyncio.wait does not re-raise the task's cancellation or exception,
        # and the timeout keeps a slow upstream close from blocking the caller.
        await asyncio.wait({task}, timeout=wait_timeout)


@routes.get("/ws")
async def websocket_handler(request):
    """Serve one frontend WebSocket connection.

    Accepted JSON messages (selected by their ``action`` key):
        * ``generate`` - requires a non-empty ``text``; optional ``voice``
          (default ``"tara"``). Starts a RealtimeSession for the request.
        * ``stop`` - asks the current session to stop and replies ``stopped``.
        * ``ping`` - replies ``pong``.

    The session runs as a background task so this loop keeps reading client
    messages while frames are streamed; awaiting the session inline would
    delay ``stop`` and ``ping`` until generation had already finished.

    Returns:
        The prepared ``web.WebSocketResponse``.
    """
    ws = web.WebSocketResponse(max_msg_size=50 * 1024 * 1024)
    await ws.prepare(request)
    print("Cliente conectado")
    session = None
    session_task = None
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue
                if not isinstance(data, dict):
                    # Valid JSON, but not an object we can read an action from
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue
                action = data.get("action", "")
                if action == "generate":
                    raw_text = data.get("text", "")
                    text = raw_text.strip() if isinstance(raw_text, str) else ""
                    voice = data.get("voice", "tara")
                    if not text:
                        await ws.send_json({"type": "error", "message": "Text required"})
                        continue
                    print(f"Gerando: '{text[:50]}...'")
                    # Make sure any previous generation has ended before the
                    # new one starts sending frames on the same socket.
                    await _shutdown_session(session, session_task)
                    session = RealtimeSession(ws)
                    session_task = asyncio.create_task(session.run(text, voice))
                elif action == "stop":
                    if session:
                        # Cooperative stop: the session ends at its next
                        # upstream message and still reports its totals.
                        session.stop()
                    await ws.send_json({"type": "stopped"})
                elif action == "ping":
                    await ws.send_json({"type": "pong"})
            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break
    except Exception as e:
        print(f"Erro: {e}")
    finally:
        # Client is gone: stop relaying and release the upstream connection
        await _shutdown_session(session, session_task)
        print("Cliente desconectado")
    return ws
@routes.get("/")
async def index(request):
    """Serve the ``index_realtime.html`` page located next to this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "index_realtime.html")
    return web.FileResponse(page_path)
@routes.get("/health")
async def health(request):
    """Report server liveness and whether Wav2Lip accepts a WebSocket connection.

    Returns:
        JSON response ``{"server": true, "wav2lip": <bool>}`` where
        ``wav2lip`` is true only if a WebSocket connection to ``WAV2LIP_WS``
        could be opened.
    """
    status = {"server": True, "wav2lip": False}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=2),
            ):
                status["wav2lip"] = True
    except Exception as exc:
        # Best-effort probe: any connection failure just means "not healthy".
        # Unlike a bare except, this lets task cancellation propagate.
        print(f"[Health] Wav2Lip indisponivel: {exc}")
    return web.json_response(status)
app = web.Application()
app.add_routes(routes)

if __name__ == "__main__":
    # Print the startup banner, then hand control to the aiohttp server loop.
    separator = "=" * 50
    banner = (
        separator,
        "Interface Server - REALTIME Streaming",
        separator,
        f"Porta: {PORT}",
        f"Wav2Lip: {WAV2LIP_WS}",
        "Mode: Canvas + Web Audio (baixa latencia)",
        separator,
    )
    for line in banner:
        print(line)
    web.run_app(app, host="0.0.0.0", port=PORT)
|