# Source: speech2speech-interface/interface/server_optimized.py
# Commit e62aafd (uploaded by marcosremar2): "Add WebRTC streaming interface with vast.ai deployment"
# File size: 15.5 kB
"""
Interface Server - Optimized Streaming
Port: 8080
OPTIMIZED VERSION - 2024-12-27

Optimizations implemented:
1. Binary WebSocket (no base64) - reduces traffic by about 33%
2. More efficient frame compression
3. Direct passthrough of bytes

Architecture:
[Frontend] <--WebSocket Binary--> [Interface:8080] <--WebSocket--> [Wav2Lip:8082]
                                         |
                                  [Orpheus TTS:8000]

Note: this server reaches Orpheus through the ORPHEUS_WS setting, whose
default is ws://localhost:8081/ws (not port 8000 as drawn above).
"""
from aiohttp import web
import aiohttp
import asyncio
import json
import base64
import os
import time
import struct
# Configuration.
# The upstream WebSocket endpoints and the listening port can be overridden
# through environment variables of the same name.
WAV2LIP_WS = os.environ.get("WAV2LIP_WS", "ws://localhost:8082/ws")
ORPHEUS_WS = os.environ.get("ORPHEUS_WS", "ws://localhost:8081/ws")
PORT = int(os.environ.get("PORT", "8080"))

# Media constants. NOTE(review): neither is referenced in the visible code;
# presumably frames per second and Hz respectively - confirm before relying on them.
VIDEO_FPS = 25
AUDIO_SAMPLE_RATE = 24000
# Route table: the handlers below register themselves on it through the
# @routes.get(...) decorators, and it is attached to the application later.
routes = web.RouteTableDef()
class OptimizedSession:
    """One generation request streamed to a client over a binary WebSocket.

    Video frames (received from Wav2Lip) and audio chunks (received from
    Orpheus) are forwarded to the client as binary messages that start with
    a small little-endian header; status and control messages go out as JSON.
    """

    # Binary message type tags (first byte of every binary message)
    MSG_FRAME = 0x01
    MSG_AUDIO = 0x02
    MSG_AUDIO_CHUNK = 0x03

    def __init__(self, client_ws):
        """Bind the session to the client's WebSocket and reset all counters."""
        self.client_ws = client_ws
        self.is_running = False
        self.start_time = None
        self.total_frames = 0
        self.total_bytes = 0
        self.end_video_time_ms = None  # Video time where playback stopped (for sync)
        self.start_frame_idx = None  # First frame used by Wav2Lip
        self.end_frame_idx = None  # Last frame used by Wav2Lip

    async def send_json(self, msg_type: str, **kwargs):
        """Send a JSON message (status/control): ``{"type": msg_type, **kwargs}``.

        Best-effort: a send failure is printed and not raised.
        """
        try:
            await self.client_ws.send_json({"type": msg_type, **kwargs})
        except Exception as e:
            print(f"Erro enviando JSON: {e}")

    async def send_binary_frame(self, frame_bytes: bytes, frame_index: int):
        """Send a frame as raw binary (no base64).

        Best-effort: a send failure is printed and not raised. On success the
        header and payload sizes are added to ``total_bytes``.
        """
        try:
            # Header: [type:1][index:4][size:4][data], little-endian
            header = struct.pack('<BII', self.MSG_FRAME, frame_index, len(frame_bytes))
            await self.client_ws.send_bytes(header + frame_bytes)
            self.total_bytes += len(header) + len(frame_bytes)
        except Exception as e:
            print(f"Erro enviando frame binário: {e}")

    async def send_binary_audio(self, audio_bytes: bytes, sample_rate: int):
        """Send a complete audio buffer as raw binary.

        Best-effort: a send failure is printed and not raised.
        """
        try:
            # Header: [type:1][sample_rate:4][size:4][data], little-endian
            header = struct.pack('<BII', self.MSG_AUDIO, sample_rate, len(audio_bytes))
            await self.client_ws.send_bytes(header + audio_bytes)
            self.total_bytes += len(header) + len(audio_bytes)
        except Exception as e:
            print(f"Erro enviando audio binário: {e}")

    async def send_binary_audio_chunk(self, chunk_bytes: bytes, chunk_index: int, is_last: bool):
        """Send one audio chunk for streaming playback.

        Best-effort: a send failure is printed and not raised.
        """
        try:
            # Header: [type:1][chunk_index:4][is_last:1][size:4][data], little-endian
            header = struct.pack('<BIBI', self.MSG_AUDIO_CHUNK, chunk_index, 1 if is_last else 0, len(chunk_bytes))
            await self.client_ws.send_bytes(header + chunk_bytes)
            self.total_bytes += len(header) + len(chunk_bytes)
        except Exception as e:
            print(f"Erro enviando audio chunk: {e}")

    async def stream_audio_from_orpheus(self, text: str, voice: str):
        """Fetch audio from Orpheus TTS and forward chunks to the client as they arrive.

        Chunks may arrive as JSON text messages (base64 payload in ``audio``)
        or as raw binary messages. An empty chunk flagged ``is_last`` is sent
        to the client when Orpheus reports ``done``.

        Returns:
            True if at least one audio chunk was forwarded, False otherwise.
            Exceptions are printed and not raised.
        """
        total_chunks = 0
        total_bytes = 0
        first_chunk_sent = False

        try:
            async with aiohttp.ClientSession() as orpheus_session:
                # The context manager closes the socket on every exit path,
                # including errors raised inside the loop.
                async with orpheus_session.ws_connect(
                    ORPHEUS_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=60),
                    max_msg_size=10*1024*1024
                ) as orpheus_ws:
                    await orpheus_ws.send_json({
                        "action": "synthesize",
                        "text": text,
                        "voice": voice,
                        "stream": True
                    })
                    print(f"[Orpheus] Requisição enviada: '{text[:30]}...'")

                    async for msg in orpheus_ws:
                        if not self.is_running:
                            break

                        if msg.type == aiohttp.WSMsgType.TEXT:
                            data = json.loads(msg.data)
                            msg_type = data.get("type", "")

                            if msg_type == "audio_chunk":
                                audio_b64 = data.get("audio", "")
                                chunk_data = base64.b64decode(audio_b64)
                                chunk_index = data.get("chunk_index", total_chunks)
                                total_chunks += 1
                                total_bytes += len(chunk_data)

                                # Forward the chunk to the client immediately
                                await self.send_binary_audio_chunk(chunk_data, chunk_index, is_last=False)

                                if not first_chunk_sent:
                                    first_chunk_sent = True
                                    print(f"[Orpheus] Primeiro chunk enviado: {len(chunk_data)} bytes")
                                else:
                                    print(f"[Orpheus] Chunk {chunk_index}: {len(chunk_data)} bytes")

                            elif msg_type == "done":
                                # Send the end-of-stream marker
                                await self.send_binary_audio_chunk(b'', total_chunks, is_last=True)
                                print(f"[Orpheus] Done: {total_chunks} chunks, {total_bytes} bytes")
                                break

                            elif msg_type == "error":
                                print(f"[Orpheus] Erro: {data.get('message')}")
                                break

                        elif msg.type == aiohttp.WSMsgType.BINARY:
                            chunk_data = msg.data
                            # Number the chunk before counting it, so binary
                            # chunks use the same 0-based numbering as the
                            # text path and never collide with the end marker.
                            chunk_index = total_chunks
                            total_chunks += 1
                            total_bytes += len(chunk_data)
                            await self.send_binary_audio_chunk(chunk_data, chunk_index, is_last=False)
                            print(f"[Orpheus] Chunk binário {chunk_index}: {len(chunk_data)} bytes")

                        elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                            break

        except Exception as e:
            print(f"[Orpheus] Erro: {e}")
            import traceback
            traceback.print_exc()

        return total_chunks > 0

    @staticmethod
    def _audio_result(task) -> bool:
        """Return the audio task's result, or False if it has no usable result.

        A task that is missing, unfinished, cancelled (for example by a
        ``wait_for`` timeout) or that failed yields False instead of raising.
        """
        if task is None or not task.done() or task.cancelled():
            return False
        if task.exception() is not None:
            return False
        return bool(task.result())

    async def run(self, text: str, voice: str, jpeg_quality: int = 95, idle_video_time_ms: int = 0):
        """Stream audio and frames in parallel, forwarding each as it arrives.

        Starts the Orpheus audio stream as a background task, requests frames
        from Wav2Lip, forwards every frame to the client, then waits up to
        30 seconds for the audio stream to finish. Ends with a JSON ``done``
        message, or a JSON ``error`` message if an exception occurred.

        Args:
            text: Text to synthesize.
            voice: Voice name forwarded to both backends.
            jpeg_quality: JPEG quality forwarded to Wav2Lip.
            idle_video_time_ms: Idle-video start time forwarded to Wav2Lip.
        """
        self.is_running = True
        self.start_time = time.time()
        self.total_bytes = 0
        self.total_frames = 0  # reset together with total_bytes so a reused session restarts at frame 1

        await self.send_json("status", message="Conectando...")

        audio_task = None
        try:
            async with aiohttp.ClientSession() as session:
                # Start the Orpheus audio stream in parallel; its chunks are
                # sent straight to the client as they arrive.
                audio_task = asyncio.create_task(
                    self.stream_audio_from_orpheus(text, voice)
                )

                async with session.ws_connect(
                    WAV2LIP_WS,
                    timeout=aiohttp.ClientWSTimeout(ws_close=300),
                    max_msg_size=50*1024*1024
                ) as wav2lip_ws:
                    await self.send_json("status", message="Gerando...")

                    # Send the text with action "generate" so that the final
                    # "done" message carries end_video_time_ms
                    await wav2lip_ws.send_json({
                        "action": "generate",
                        "text": text,
                        "voice": voice,
                        "idle_video_time_ms": idle_video_time_ms,  # Initial idle-video time
                        "jpeg_quality": jpeg_quality  # JPEG quality (50-100)
                    })

                    print(f"[Optimized] Texto: '{text[:50]}...'")

                    first_frame_time = None
                    start_frame_idx = None  # First frame used by Wav2Lip

                    # Receive frames from Wav2Lip
                    async for msg in wav2lip_ws:
                        if not self.is_running:
                            break

                        if msg.type == aiohttp.WSMsgType.TEXT:
                            data = json.loads(msg.data)
                            msg_type = data.get("type", "")

                            if msg_type == "frame":
                                frame_b64 = data.get("frame", "")
                                frame_bytes = base64.b64decode(frame_b64)
                                self.total_frames += 1

                                await self.send_binary_frame(frame_bytes, self.total_frames)

                                if first_frame_time is None:
                                    first_frame_time = time.time() - self.start_time
                                    latency_ms = int(first_frame_time * 1000)
                                    # Capture start_frame_idx from the first frame
                                    start_frame_idx = data.get("source_frame_idx", None)
                                    print(f"[Optimized] Primeiro frame: {latency_ms}ms, source_frame={start_frame_idx}")
                                    await self.send_json("first_frame", latency_ms=latency_ms, start_frame_idx=start_frame_idx)

                            elif msg_type == "full_audio":
                                print(f"[Optimized] Ignorando audio Wav2Lip (usando Orpheus streaming)")

                            elif msg_type == "first_chunk":
                                print(f"[Wav2Lip] eSpeak: {data.get('latency_ms', 0)}ms")

                            elif msg_type == "status":
                                print(f"[Wav2Lip] {data.get('message')}")

                            elif msg_type == "error":
                                await self.send_json("error", message=data.get("message", "Erro"))
                                break

                            elif msg_type == "done":
                                # Capture frame indices for synchronization
                                self.end_video_time_ms = data.get("end_video_time_ms")
                                if start_frame_idx is None:
                                    start_frame_idx = data.get("start_frame_idx")
                                end_frame_idx = data.get("end_frame_idx")
                                self.start_frame_idx = start_frame_idx
                                self.end_frame_idx = end_frame_idx
                                print(f"[Optimized] Wav2Lip done: {self.total_frames} frames, start_frame={start_frame_idx}, end_frame={end_frame_idx}, end_time_ms={self.end_video_time_ms}")
                                break

                        elif msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                            break

                # Wait for the audio stream to finish
                if not audio_task.done():
                    print(f"[Optimized] Aguardando término do streaming Orpheus...")
                    try:
                        await asyncio.wait_for(audio_task, timeout=30.0)
                    except asyncio.TimeoutError:
                        # wait_for has already cancelled the task at this point.
                        print(f"[Optimized] ERRO: Timeout aguardando Orpheus")

                # Calling .result() on a cancelled task raises CancelledError,
                # so go through the guard instead.
                audio_success = self._audio_result(audio_task)
                print(f"[Optimized] Concluído: {self.total_frames} frames, audio_streaming={audio_success}")

        except Exception as e:
            print(f"Erro: {e}")
            import traceback
            traceback.print_exc()
            await self.send_json("error", message=str(e))
            return
        finally:
            # Never leave the audio stream running once this session ends
            # (error, cancellation, or timeout).
            if audio_task is not None and not audio_task.done():
                audio_task.cancel()

        elapsed = time.time() - self.start_time
        print(f"Sessao: {elapsed:.2f}s, {self.total_frames} frames, {self.total_bytes/1024:.1f}KB binário")

        await self.send_json(
            "done",
            total_frames=self.total_frames,
            elapsed_ms=int(elapsed * 1000),
            bytes_sent=self.total_bytes,
            start_frame_idx=self.start_frame_idx,
            end_frame_idx=self.end_frame_idx,
            end_video_time_ms=self.end_video_time_ms
        )

    def stop(self):
        """Ask the streaming loops to stop; they check the flag on their next message."""
        self.is_running = False
@routes.get("/ws")
async def websocket_handler(request):
    """Client WebSocket endpoint: accepts JSON commands and streams results back.

    Supported actions (field ``action`` of a JSON object):
      * ``generate`` - start a generation for ``text`` (optional ``voice``,
        ``jpeg_quality``, ``idle_video_time_ms``); replaces any active one.
      * ``stop`` - flag the active session to stop; replies ``stopped``.
      * ``ping`` - replies ``pong``.

    The generation runs as a background task so that this loop keeps reading
    messages; otherwise ``stop`` and ``ping`` would only be seen after the
    generation had already finished.
    """
    ws = web.WebSocketResponse(max_msg_size=50*1024*1024)
    await ws.prepare(request)

    print("Cliente conectado (modo binário)")

    session = None
    run_task = None

    async def abort_current_run():
        """Stop the active session, cancel its task and wait for it to unwind."""
        if session:
            session.stop()
        if run_task is not None and not run_task.done():
            run_task.cancel()
            # return_exceptions=True absorbs the task's own CancelledError
            # while a cancellation of this handler still propagates.
            await asyncio.gather(run_task, return_exceptions=True)

    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                try:
                    data = json.loads(msg.data)
                except json.JSONDecodeError:
                    await ws.send_json({"type": "error", "message": "Invalid JSON"})
                    continue

                # Valid JSON that is not an object (list, number, ...) has no .get()
                if not isinstance(data, dict):
                    await ws.send_json({"type": "error", "message": "Invalid message: JSON object expected"})
                    continue

                action = data.get("action", "")

                if action == "generate":
                    raw_text = data.get("text", "")
                    # A non-string "text" is treated like a missing one
                    text = raw_text.strip() if isinstance(raw_text, str) else ""
                    voice = data.get("voice", "tara")
                    jpeg_quality = data.get("jpeg_quality", 95)  # JPEG quality (50-100)
                    idle_video_time_ms = data.get("idle_video_time_ms", 0)  # Idle-video time

                    if not text:
                        await ws.send_json({"type": "error", "message": "Text required"})
                        continue

                    print(f"Gerando: '{text[:50]}...' (quality={jpeg_quality}, idle_time={idle_video_time_ms}ms)")

                    # Only one generation at a time per connection
                    await abort_current_run()

                    session = OptimizedSession(ws)
                    run_task = asyncio.create_task(
                        session.run(text, voice, jpeg_quality, idle_video_time_ms)
                    )

                elif action == "stop":
                    if session:
                        session.stop()
                    await ws.send_json({"type": "stopped"})

                elif action == "ping":
                    await ws.send_json({"type": "pong"})

            elif msg.type == aiohttp.WSMsgType.ERROR:
                print(f"WebSocket error: {ws.exception()}")
                break

    except Exception as e:
        print(f"Erro: {e}")
    finally:
        # Do not leave a generation writing to a socket that is going away
        await abort_current_run()
        print("Cliente desconectado")

    return ws
@routes.get("/")
async def index(request):
    """Serve ``index_optimized.html`` from the directory containing this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "index_optimized.html")
    return web.FileResponse(page_path)
@routes.get("/idle.mp4")
async def idle_video(request):
    """Serve ``idle.mp4`` from the directory containing this module."""
    module_dir = os.path.dirname(__file__)
    video_path = os.path.join(module_dir, "idle.mp4")
    return web.FileResponse(video_path)
@routes.get("/health")
async def health(request):
    """Report server liveness and whether the Wav2Lip WebSocket accepts a connection.

    Returns a JSON object ``{"server": true, "wav2lip": <bool>}``. The Wav2Lip
    probe is best-effort: any failure to connect simply leaves the flag False.
    """
    status = {"server": True, "wav2lip": False}
    try:
        async with aiohttp.ClientSession() as session:
            # Opening the socket is the whole probe; it is closed again on exit.
            async with session.ws_connect(
                WAV2LIP_WS, timeout=aiohttp.ClientWSTimeout(ws_close=2)
            ):
                status["wav2lip"] = True
    except Exception:
        # Deliberately silent: an unreachable backend is a normal probe
        # outcome. `Exception` (not a bare except) lets task cancellation and
        # KeyboardInterrupt propagate instead of being swallowed.
        pass
    return web.json_response(status)
# Build the application and attach every handler registered on `routes`.
app = web.Application()
app.add_routes(routes)

if __name__ == "__main__":
    # Print the startup banner, then serve on all interfaces.
    separator = "=" * 50
    banner_lines = (
        separator,
        "Interface Server - OPTIMIZED (Binary)",
        separator,
        f"Porta: {PORT}",
        f"Wav2Lip: {WAV2LIP_WS}",
        "Mode: WebSocket Binary (-33% bandwidth)",
        separator,
    )
    for banner_line in banner_lines:
        print(banner_line)

    web.run_app(app, host="0.0.0.0", port=PORT)