| """ |
| Interface Server - WebRTC Streaming com VP9 |
| Porta: 8080 |
| |
| Arquitetura: |
| 1. Cliente conecta via WebRTC (signaling por WebSocket) |
| 2. Servidor envia stream de video VP9 + audio Opus |
| 3. Fusao idle/lip-sync feita no backend |
| 4. Frontend apenas renderiza o <video> |
| |
| Framework: aiortc (https://github.com/aiortc/aiortc) |
| """ |
| from aiohttp import web |
| import aiohttp |
| import asyncio |
| import json |
| import base64 |
| import os |
| import sys |
| import time |
| import uuid |
| import fractions |
| import numpy as np |
| from av import VideoFrame, AudioFrame |
| from aiortc import RTCPeerConnection, RTCSessionDescription, MediaStreamTrack, RTCConfiguration, RTCIceServer |
| from aiortc.contrib.media import MediaRelay |
| import cv2 |
| import subprocess |
| import tempfile |
| from scipy import signal |
|
|
| |
# Put the sibling ../video directory first on the import path (presumably
# where avatar_api lives -- it is imported right below).
sys.path.insert(
    0,
    os.path.join(os.path.dirname(__file__), "..", "video"),
)
|
|
| |
# Optional import: AVATAR_API_AVAILABLE records whether AvatarAPI could be
# imported, and the outcome is logged either way.
try:
    from avatar_api import AvatarAPI
except ImportError as e:
    AVATAR_API_AVAILABLE = False
    print(f"[Import] WARNING: Could not import AvatarAPI: {e}")
    print("[Import] Falling back to WebSocket mode")
else:
    AVATAR_API_AVAILABLE = True
    print("[Import] AvatarAPI imported successfully")
|
|
| |
# Server settings and asset locations.
PORT = int(os.getenv("PORT", "8080"))
IDLE_VIDEO = os.path.join(os.path.dirname(__file__), "idle.mp4")
AVATAR_VIDEO = os.path.join(os.path.dirname(__file__), '..', 'video', 'avatar_parado.mp4')
WAV2LIP_CHECKPOINT = os.path.join(os.path.dirname(__file__), '..', 'video', 'checkpoints', 'wav2lip_gan.pth')
TTS_URL = os.getenv("TTS_URL", "http://localhost:8081")
# WebSocket endpoint of the Wav2Lip service; read by AvatarSession.generate()
# and printed in the startup banner. Override with the WAV2LIP_WS env var.
# NOTE(review): the default below is a placeholder -- confirm the real
# host/port/path of the Wav2Lip server.
WAV2LIP_WS = os.getenv("WAV2LIP_WS", "ws://localhost:8082/ws")
|
|
| |
# Outgoing video: 25 fps at a fixed 854x480.
VIDEO_FPS = 25
VIDEO_WIDTH, VIDEO_HEIGHT = 854, 480

# Audio: rate of the WebRTC track vs. rate of the incoming Orpheus PCM.
AUDIO_SAMPLE_RATE = 48_000
ORPHEUS_SAMPLE_RATE = 24_000

# Time bases attached to outgoing frames (pts are counted in these units).
VIDEO_TIME_BASE = fractions.Fraction(1, VIDEO_FPS)
AUDIO_TIME_BASE = fractions.Fraction(1, AUDIO_SAMPLE_RATE)
|
|
| |
routes = web.RouteTableDef()  # handlers below register via @routes.<method>

# Process-wide shared state.
idle_frames_cache: list = []  # RGB idle frames, filled by load_idle_frames()
pcs: set = set()  # live RTCPeerConnection objects, closed by on_shutdown
relay = MediaRelay()  # NOTE(review): not referenced anywhere in this file
avatar_api = None  # NOTE(review): never reassigned in this file
|
|
|
|
def calculate_frame_difference(frame1, frame2):
    """Return how different two frames look, from 0.0 (identical) to 1.0.

    Both frames are converted to grayscale (3-dimensional arrays are treated
    as RGB). If the grayscale sizes differ, ``frame2`` is resized to
    ``frame1``'s size. The score is the mean absolute pixel difference
    divided by 255. A missing frame (``None``) scores 1.0.
    """
    if frame1 is None or frame2 is None:
        return 1.0

    def _to_gray(img):
        # 2-D input is assumed to already be single-channel.
        return cv2.cvtColor(img, cv2.COLOR_RGB2GRAY) if len(img.shape) == 3 else img

    gray_a = _to_gray(frame1)
    gray_b = _to_gray(frame2)

    if gray_b.shape != gray_a.shape:
        height, width = gray_a.shape[:2]
        gray_b = cv2.resize(gray_b, (width, height))

    return np.mean(cv2.absdiff(gray_a, gray_b)) / 255.0
|
|
|
|
def find_best_matching_idle_frame(last_speak_frame, idle_frames, sample_step=10):
    """Find the idle frame that looks most like ``last_speak_frame``.

    Coarse-to-fine search:
    1. Score every ``sample_step``-th idle frame.
    2. Rescan the window from ``best - sample_step`` up to (but excluding)
       ``best + sample_step`` around the coarse winner.

    Only strictly better scores replace the current best.

    Returns:
        ``(index, difference)``; ``(0, inf)`` if there is nothing to compare.
    """
    if not idle_frames or last_speak_frame is None:
        return 0, float('inf')

    best_idx, best_diff = 0, float('inf')

    def consider(indices):
        nonlocal best_idx, best_diff
        for idx in indices:
            score = calculate_frame_difference(last_speak_frame, idle_frames[idx])
            if score < best_diff:
                best_idx, best_diff = idx, score

    total = len(idle_frames)
    consider(range(0, total, sample_step))  # coarse pass
    consider(range(max(0, best_idx - sample_step),
                   min(total, best_idx + sample_step)))  # refinement pass

    return best_idx, best_diff
|
|
|
|
def trim_high_motion_frames(frames, threshold_multiplier=1.0, max_trim=20,
                            window=20, max_threshold=0.7):
    """Drop a trailing burst of high-motion ("jump") frames from a clip.

    The cut-off is computed from frame-to-frame differences over the last
    ``window`` frames:

        cut-off = mean + threshold_multiplier * std   (capped at max_threshold)

    Trailing frames whose difference from their predecessor exceeds the
    cut-off are removed, at most ``max_trim`` of them.

    Args:
        frames: sequence of RGB frames.
        threshold_multiplier: standard deviations above the mean that count
            as a jump.
        max_trim: maximum number of trailing frames to remove.
        window: number of trailing frames analysed. Clips shorter than this
            are returned untrimmed.
        max_threshold: upper cap on the cut-off. Differences from
            calculate_frame_difference lie in [0, 1].

    Returns:
        ``(kept_frames, last_kept_frame)``. ``last_kept_frame`` is ``None``
        only when ``frames`` is empty.
    """
    if len(frames) == 0:
        return frames, None
    # Too short for motion statistics: keep everything, but still report the
    # last frame so the caller can pick a matching idle frame for it.
    if len(frames) < window:
        return frames, frames[-1]

    first = max(1, len(frames) - min(window, len(frames) - 1))
    differences = [
        (i, calculate_frame_difference(frames[i - 1], frames[i]))
        for i in range(first, len(frames))
    ]
    if not differences:
        return frames, frames[-1]

    diffs = np.array([diff for _, diff in differences])
    threshold = min(diffs.mean() + threshold_multiplier * diffs.std(), max_threshold)

    # Walk backwards while frames keep jumping; stop at the first calm frame
    # or once max_trim frames are marked.
    trim_from = len(frames)
    for idx, diff in reversed(differences):
        if not diff > threshold or len(frames) - trim_from >= max_trim:
            break
        trim_from = idx

    removed = len(frames) - trim_from
    if 0 < removed <= max_trim:
        print(f"[Trim] Removendo {removed} frames problematicos")
        frames = frames[:trim_from]

    return frames, frames[-1] if len(frames) else None
|
|
|
|
def load_idle_frames():
    """Load idle.mp4 as RGB frames resized to VIDEO_WIDTH x VIDEO_HEIGHT.

    The result is cached in the module-level ``idle_frames_cache`` list.
    Later calls return that list without re-reading the file.

    Returns:
        list of RGB numpy frames. Returns ``[]`` if the file is missing or
        cannot be opened; the cache then stays empty, so a later call retries.
    """
    if idle_frames_cache:
        return idle_frames_cache

    if not os.path.exists(IDLE_VIDEO):
        print(f"[Idle] Arquivo nao encontrado: {IDLE_VIDEO}")
        return []

    print(f"[Idle] Carregando frames de {IDLE_VIDEO}...")

    cap = cv2.VideoCapture(IDLE_VIDEO)
    frames = []
    try:
        if not cap.isOpened():
            print(f"[Idle] Nao foi possivel abrir: {IDLE_VIDEO}")
            return []

        original_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        original_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        print(f"[Idle] Resolucao original: {original_width}x{original_height}")
        print(f"[Idle] Redimensionando para: {VIDEO_WIDTH}x{VIDEO_HEIGHT} (480p)")

        target_size = (VIDEO_WIDTH, VIDEO_HEIGHT)
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
            # Decide on the decoded size, not container metadata, which may be
            # missing (reported as 0) or disagree with the actual frames.
            if frame_rgb.shape[1] != VIDEO_WIDTH or frame_rgb.shape[0] != VIDEO_HEIGHT:
                frame_rgb = cv2.resize(frame_rgb, target_size,
                                       interpolation=cv2.INTER_LANCZOS4)
            frames.append(frame_rgb)
    finally:
        cap.release()

    # Publish only a fully decoded list; the cache object's identity is kept.
    idle_frames_cache[:] = frames

    print(f"[Idle] Carregados {len(idle_frames_cache)} frames @ {VIDEO_WIDTH}x{VIDEO_HEIGHT}")
    return idle_frames_cache
|
|
|
|
class AvatarVideoTrack(MediaStreamTrack):
    """Outgoing video track for the avatar.

    Behaviour:
    - By default, loops the cached idle frames.
    - After set_speaking_frames(), plays the lip-sync frames once, then
      resumes the idle loop at the idle frame that best matches the last
      spoken frame.
    - Emits black frames when no idle frames are available.
    """
    kind = "video"

    def __init__(self):
        super().__init__()
        self.idle_frames = load_idle_frames()
        self.current_idx = 0  # next idle frame to emit (wraps around)
        self.frame_count = 0  # frames emitted so far; drives pts and pacing
        self.start_time = None  # monotonic timestamp of the first recv()

        # Lip-sync playback state.
        self.is_speaking = False
        self.speaking_frames = []
        self.speaking_idx = 0
        self.best_idle_idx = None  # idle frame to resume at after speaking

        self.width = VIDEO_WIDTH
        self.height = VIDEO_HEIGHT

        print(f"[VideoTrack] Inicializado: {self.width}x{self.height} @ {VIDEO_FPS}fps (480p fixo)")

    async def recv(self):
        """Return the next video frame, sleeping as needed to hold VIDEO_FPS."""
        if self.start_time is None:
            self.start_time = time.monotonic()

        # pts in VIDEO_TIME_BASE units (assumes a 1/N time base).
        pts = int(self.frame_count * VIDEO_TIME_BASE.denominator / VIDEO_FPS)
        self.frame_count += 1

        if self.is_speaking and self.speaking_idx < len(self.speaking_frames):
            frame_data = self.speaking_frames[self.speaking_idx]
            self.speaking_idx += 1
        else:
            if self.is_speaking:
                self._end_speaking()
            frame_data = self._next_idle_frame()

        frame = VideoFrame.from_ndarray(frame_data, format="rgb24")
        frame.pts = pts
        frame.time_base = VIDEO_TIME_BASE

        # Real-time pacing; the monotonic clock is immune to wall-clock jumps.
        elapsed = time.monotonic() - self.start_time
        expected = self.frame_count / VIDEO_FPS
        if expected > elapsed:
            await asyncio.sleep(expected - elapsed)

        return frame

    def _end_speaking(self):
        """Leave lip-sync mode and jump the idle loop to the best-matching frame."""
        self.is_speaking = False
        self.speaking_frames = []
        self.speaking_idx = 0
        if self.best_idle_idx is not None and self.idle_frames:
            self.current_idx = self.best_idle_idx
            print(f"[VideoTrack] Transicao suave -> idle frame {self.current_idx}")
        self.best_idle_idx = None

    def _next_idle_frame(self):
        """Return the next idle-loop frame, or a black frame if none are loaded."""
        if not self.idle_frames:
            # No idle video: keep the track alive with black frames.
            return np.zeros((self.height, self.width, 3), dtype=np.uint8)
        frame_data = self.idle_frames[self.current_idx % len(self.idle_frames)]
        self.current_idx += 1
        return frame_data

    def set_speaking_frames(self, frames):
        """Queue lip-sync frames for playback with a smooth return to idle.

        Trailing high-motion frames are trimmed first. The idle frame closest
        to the last kept frame is then remembered as the resume point.
        """
        trimmed_frames, last_frame = trim_high_motion_frames(frames)

        if last_frame is not None and self.idle_frames:
            best_idx, best_diff = find_best_matching_idle_frame(
                last_frame, self.idle_frames, sample_step=10
            )
            self.best_idle_idx = best_idx
            print(f"[VideoTrack] Best match: idle frame {best_idx} (diff: {best_diff:.2f})")
        else:
            self.best_idle_idx = None

        self.speaking_frames = trimmed_frames
        self.speaking_idx = 0
        self.is_speaking = True
        print(f"[VideoTrack] Speaking: {len(trimmed_frames)} frames (original: {len(frames)})")
|
|
|
|
class AvatarAudioTrack(MediaStreamTrack):
    """Outgoing audio track: plays queued speech PCM, otherwise silence.

    Emits mono signed 16-bit frames of ``samples_per_frame`` samples at
    AUDIO_SAMPLE_RATE, paced to real time.
    """
    kind = "audio"

    def __init__(self):
        super().__init__()
        self.sample_rate = AUDIO_SAMPLE_RATE
        self.samples_per_frame = 1920  # 40 ms at 48 kHz
        self.frame_count = 0  # frames emitted so far; drives pts and pacing
        self.start_time = None  # monotonic timestamp of the first recv()

        # Queued speech, split into int16 chunks of samples_per_frame samples.
        self.audio_buffer = []
        self.buffer_idx = 0

        print(f"[AudioTrack] Inicializado: {self.sample_rate}Hz, {self.samples_per_frame} samples/frame (40ms)")

    async def recv(self):
        """Return the next audio frame (queued speech, or silence when empty)."""
        if self.start_time is None:
            self.start_time = time.monotonic()

        # AUDIO_TIME_BASE is 1/sample_rate, so pts counts samples.
        pts = self.frame_count * self.samples_per_frame
        self.frame_count += 1

        if self.buffer_idx < len(self.audio_buffer):
            samples = self.audio_buffer[self.buffer_idx]
            self.buffer_idx += 1
        else:
            samples = np.zeros(self.samples_per_frame, dtype=np.int16)

        frame = AudioFrame(format="s16", layout="mono", samples=len(samples))
        frame.sample_rate = self.sample_rate
        frame.pts = pts
        frame.time_base = AUDIO_TIME_BASE
        frame.planes[0].update(samples.tobytes())

        # Real-time pacing; the monotonic clock is immune to wall-clock jumps.
        elapsed = time.monotonic() - self.start_time
        expected = self.frame_count * self.samples_per_frame / self.sample_rate
        if expected > elapsed:
            await asyncio.sleep(expected - elapsed)

        return frame

    def set_audio(self, pcm_data):
        """Replace the playback queue with new speech audio.

        Args:
            pcm_data: raw int16 PCM bytes at ORPHEUS_SAMPLE_RATE, treated as
                mono in native byte order.
        """
        # count= ignores a stray trailing byte instead of raising ValueError.
        samples = np.frombuffer(pcm_data, dtype=np.int16, count=len(pcm_data) // 2)

        print(f"[AudioTrack] Recebido: {len(samples)} samples @ {ORPHEUS_SAMPLE_RATE}Hz")

        if ORPHEUS_SAMPLE_RATE != AUDIO_SAMPLE_RATE and len(samples) > 0:
            # Polyphase resampling does not treat the clip as periodic, so it
            # avoids the edge ringing of FFT-based signal.resample.
            gcd = int(np.gcd(AUDIO_SAMPLE_RATE, ORPHEUS_SAMPLE_RATE))
            resampled = signal.resample_poly(
                samples.astype(np.float64),
                AUDIO_SAMPLE_RATE // gcd,
                ORPHEUS_SAMPLE_RATE // gcd,
            )
            # Filter overshoot can leave the int16 range: clip instead of
            # letting astype wrap around (audible clicks).
            samples = np.clip(np.round(resampled), -32768, 32767).astype(np.int16)
            print(f"[AudioTrack] Upsampled: {len(samples)} samples @ {AUDIO_SAMPLE_RATE}Hz (scipy high-quality)")

        # Zero-pad to a whole number of frames, then split into per-frame rows.
        pad = -len(samples) % self.samples_per_frame
        if pad:
            samples = np.pad(samples, (0, pad))
        self.audio_buffer = list(samples.reshape(-1, self.samples_per_frame))

        self.buffer_idx = 0
        print(f"[AudioTrack] Buffer: {len(self.audio_buffer)} frames x {self.samples_per_frame} samples (40ms cada)")
|
|
|
|
class AvatarSession:
    """State for one WebRTC client.

    Holds the client's peer connection, its outgoing tracks, and the Wav2Lip
    WebSocket used while generating speech.
    """

    def __init__(self, pc, video_track, audio_track):
        self.pc = pc
        self.video_track = video_track
        self.audio_track = audio_track
        # Connection of the most recent generate() call. close() uses these
        # to abort an in-flight generation when the peer goes away.
        self.wav2lip_ws = None
        self.wav2lip_session = None

    async def generate(self, text: str, voice: str):
        """Request speech + lip-sync for ``text`` from Wav2Lip and queue it.

        Steps:
        1. Send ``{"action": "generate", "text", "voice"}`` to WAV2LIP_WS.
        2. Collect JPEG frames and the full audio until "done", "error" or
           the socket closes.
        3. Hand the results to the video/audio tracks.

        Errors are logged, not raised. The WebSocket and HTTP session are
        always closed before returning.
        """
        print(f"[Session] Gerando: '{text[:50]}...'")

        http = None
        ws = None
        try:
            http = aiohttp.ClientSession()
            self.wav2lip_session = http
            ws = await http.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=120)
            )
            self.wav2lip_ws = ws

            await ws.send_json({
                "action": "generate",
                "text": text,
                "voice": voice
            })

            speaking_frames, audio_data = await self._collect_results(ws)

            if speaking_frames:
                self.video_track.set_speaking_frames(speaking_frames)
            if audio_data:
                self.audio_track.set_audio(audio_data)

            print(f"[Session] Gerado: {len(speaking_frames)} frames, {len(audio_data)} bytes audio")

        except Exception as e:
            print(f"[Session] Erro: {e}")
            import traceback
            traceback.print_exc()

        finally:
            # Release the connection on every path, including failures.
            if ws is not None and not ws.closed:
                await ws.close()
            if http is not None:
                await http.close()
            if self.wav2lip_ws is ws:
                self.wav2lip_ws = None
            if self.wav2lip_session is http:
                self.wav2lip_session = None

    @staticmethod
    async def _collect_results(ws):
        """Read Wav2Lip messages until "done", "error" or the socket closes.

        Returns:
            ``(frames, audio)``. ``frames`` is the list of decoded RGB frames.
            ``audio`` holds the bytes of the last "full_audio" message, or
            ``b''`` if none arrived.
        """
        frames = []
        audio = b''
        async for msg in ws:
            if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                break
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue

            data = json.loads(msg.data)
            msg_type = data.get("type", "")

            if msg_type == "frame":
                frame_b64 = data.get("frame", "")
                if not frame_b64:
                    continue
                jpeg = np.frombuffer(base64.b64decode(frame_b64), np.uint8)
                bgr = cv2.imdecode(jpeg, cv2.IMREAD_COLOR)
                if bgr is None:
                    # Skip one undecodable frame instead of losing the utterance.
                    print("[Session] Frame JPEG invalido ignorado")
                    continue
                frames.append(cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB))

            elif msg_type == "full_audio":
                audio_b64 = data.get("audio", "")
                if audio_b64:
                    audio = base64.b64decode(audio_b64)

            elif msg_type == "done":
                break

            elif msg_type == "error":
                print(f"[Session] Erro Wav2Lip: {data.get('message')}")
                break

        return frames, audio

    async def close(self):
        """Close the Wav2Lip WebSocket and HTTP session of an in-flight generate()."""
        if self.wav2lip_ws and not self.wav2lip_ws.closed:
            await self.wav2lip_ws.close()
        if self.wav2lip_session:
            await self.wav2lip_session.close()
|
|
|
|
| |
# Open AvatarSession objects, keyed by the session_id that /offer returns.
sessions: dict = {}
|
|
|
|
@routes.post("/offer")
async def offer(request):
    """Accept the client's SDP offer, create a peer connection, return the answer.

    Request JSON: ``{"sdp": str, "type": str}``.
    Response JSON: ``{"sdp", "type", "session_id"}``. ``session_id`` is the
    key that ``/generate`` expects.

    A malformed body or offer is answered with HTTP 400. If negotiation
    fails, the peer connection and session are released before the error
    propagates.
    """
    try:
        params = await request.json()
        remote_offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
    except (ValueError, KeyError, TypeError):
        return web.json_response({"error": "Offer invalido"}, status=400)

    # NOTE(review): TURN credentials are committed to source. The Twilio pair
    # can be overridden via env vars; rotate it and drop the hard-coded
    # defaults.
    ice_servers = [
        RTCIceServer(urls=["stun:stun.l.google.com:19302"]),
        RTCIceServer(
            urls=[
                "turn:openrelay.metered.ca:80",
                "turn:openrelay.metered.ca:443",
                "turn:openrelay.metered.ca:443?transport=tcp"
            ],
            username="openrelayproject",
            credential="openrelayproject"
        ),
        RTCIceServer(
            urls=["turn:global.turn.twilio.com:3478?transport=udp"],
            username=os.getenv(
                "TWILIO_TURN_USERNAME",
                "f4b4035eaa76f4a55de5f4351567653ee4ff6fa97b50b6b334fcc1be9c27212d"
            ),
            credential=os.getenv(
                "TWILIO_TURN_CREDENTIAL",
                "w1uxM55V9yVoqyVFjt+mxDBV0F87AUCemaYVQGxsPLw="
            )
        ),
    ]

    pc = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers))
    pc_id = str(uuid.uuid4())
    pcs.add(pc)

    print(f"[WebRTC] Nova conexao: {pc_id}")

    video_track = AvatarVideoTrack()
    audio_track = AvatarAudioTrack()
    pc.addTrack(video_track)
    pc.addTrack(audio_track)

    sessions[pc_id] = AvatarSession(pc, video_track, audio_track)

    async def release():
        """Forget this peer and close its session; safe to call repeatedly."""
        pcs.discard(pc)
        # pop() avoids a check-then-del race between concurrent callbacks.
        session = sessions.pop(pc_id, None)
        if session is not None:
            await session.close()

    @pc.on("iceconnectionstatechange")
    async def on_ice_state():
        print(f"[ICE] Estado: {pc.iceConnectionState}")

    @pc.on("icegatheringstatechange")
    async def on_ice_gathering():
        print(f"[ICE] Gathering: {pc.iceGatheringState}")

    @pc.on("connectionstatechange")
    async def on_connectionstatechange():
        print(f"[WebRTC] Estado: {pc.connectionState}")
        if pc.connectionState in ("failed", "closed"):
            await pc.close()
            await release()

    try:
        await pc.setRemoteDescription(remote_offer)
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
    except Exception:
        # Don't leave a half-negotiated peer in pcs/sessions.
        await pc.close()
        await release()
        raise

    return web.json_response({
        "sdp": pc.localDescription.sdp,
        "type": pc.localDescription.type,
        "session_id": pc_id
    })
|
|
|
|
# The event loop keeps only weak references to tasks, so fire-and-forget
# generation tasks are held here until they complete.
_background_tasks = set()


@routes.post("/generate")
async def generate(request):
    """Start lip-sync generation for an existing WebRTC session.

    Request JSON: ``{"session_id": str, "text": str, "voice": str = "tara"}``.

    Returns ``{"status": "generating"}`` immediately; the media reaches the
    session's tracks when the background generation finishes. Invalid JSON,
    an unknown session or empty text is answered with HTTP 400.
    """
    try:
        params = await request.json()
    except ValueError:
        return web.json_response({"error": "JSON invalido"}, status=400)
    if not isinstance(params, dict):
        return web.json_response({"error": "JSON invalido"}, status=400)

    session_id = params.get("session_id")
    text = params.get("text", "")
    voice = params.get("voice", "tara")

    # isinstance first: an unhashable id would make the `in` test raise.
    if not isinstance(session_id, str) or session_id not in sessions:
        return web.json_response({"error": "Sessao invalida"}, status=400)

    text = text.strip() if isinstance(text, str) else ""
    if not text:
        return web.json_response({"error": "Texto obrigatorio"}, status=400)

    task = asyncio.create_task(sessions[session_id].generate(text, voice))
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)

    return web.json_response({"status": "generating"})
|
|
|
|
@routes.get("/")
async def index(request):
    """Serve the browser client, index.html, from this module's directory."""
    page = os.path.join(os.path.dirname(__file__), "index.html")
    return web.FileResponse(page)
|
|
|
|
@routes.get("/health")
async def health(request):
    """Health check: constant status plus the number of tracked peer connections."""
    return web.json_response({
        "status": "ok",
        "mode": "webrtc",
        "connections": len(pcs)
    })


@routes.get("/{filename}")
async def static_file(request):
    """Serve a regular file that sits next to this module (e.g. client assets).

    ``/health`` is registered before this catch-all so it never gets shadowed,
    whatever matching strategy the router uses.

    Some names are refused with 404 because this directory also holds the
    server code:
    - hidden names, which also covers ``.`` and ``..``;
    - Python sources (``.py``, ``.pyc``).
    """
    filename = request.match_info["filename"]
    if filename.startswith(".") or filename.endswith((".py", ".pyc")):
        return web.Response(status=404)

    filepath = os.path.join(os.path.dirname(__file__), filename)
    # isfile (not exists) so directories are never handed to FileResponse.
    if os.path.isfile(filepath):
        return web.FileResponse(filepath)
    return web.Response(status=404)
|
|
|
|
async def on_shutdown(app):
    """Close every peer connection and Wav2Lip session when the app stops.

    Closers run concurrently. ``return_exceptions=True`` keeps one failing
    close from preventing the others.
    """
    peers = list(pcs)  # snapshot: connection-state callbacks mutate pcs
    await asyncio.gather(*(pc.close() for pc in peers), return_exceptions=True)
    pcs.clear()

    open_sessions = list(sessions.values())
    sessions.clear()
    await asyncio.gather(*(s.close() for s in open_sessions), return_exceptions=True)
|
|
|
|
# aiohttp application: mount every @routes handler and close peers on shutdown.
app = web.Application()
app.router.add_routes(routes)
app.on_shutdown.append(on_shutdown)
|
|
|
|
if __name__ == "__main__":
    rule = "=" * 50

    # Startup banner (printed line by line, in order).
    print(rule)
    print("Interface Server - WebRTC VP9 Streaming")
    print(rule)
    print(f"Porta: {PORT}")
    print(f"Idle Video: {IDLE_VIDEO}")
    print(f"Wav2Lip: {WAV2LIP_WS}")
    print(rule)
    for line in ("Endpoints:",
                 " POST /offer - WebRTC signaling",
                 " POST /generate - Gerar fala"):
        print(line)
    print(rule)

    # Fill the idle-frame cache up front so later load_idle_frames() calls
    # (e.g. from AvatarVideoTrack) return the cached list.
    print("Carregando idle frames...")
    load_idle_frames()
    print(rule)

    web.run_app(app, host="0.0.0.0", port=PORT)
|
|