""" Interface Server - Streaming com Crossfade Suave Faz transicao suave entre idle e fala usando blending de frames """ from aiohttp import web import aiohttp import asyncio import json import base64 import os import time import cv2 import numpy as np WAV2LIP_WS = os.getenv("WAV2LIP_WS", "ws://localhost:8082/ws") PORT = int(os.getenv("PORT", "8000")) IDLE_VIDEO = os.path.join(os.path.dirname(__file__), "idle.mp4") # Configuracao de crossfade CROSSFADE_FRAMES = 5 # Numero de frames para transicao (200ms @ 25fps) routes = web.RouteTableDef() # Cache de frames idle idle_frames = [] idle_frame_count = 0 idle_resolution = (1920, 1080) # Resolucao do idle video (width, height) # Regiao da boca/queixo (em ratio do frame) # Regiao mais focada para evitar "pulos" na transicao # Apenas boca e queixo, sem incluir muito do rosto MOUTH_REGION = { 'top': 0.50, # 50% do topo (comeca abaixo do nariz) 'bottom': 0.80, # ate 80% (apenas queixo) 'left': 0.32, # 32% da esquerda 'right': 0.68 # ate 68% (mais estreito) } def load_idle_frames(): """Carrega frames do idle.mp4 e obtem resolucao""" global idle_frames, idle_frame_count, idle_resolution if idle_frames: return if not os.path.exists(IDLE_VIDEO): print(f"[AVISO] Idle video nao encontrado: {IDLE_VIDEO}") return print(f"Carregando idle frames de {IDLE_VIDEO}...") cap = cv2.VideoCapture(IDLE_VIDEO) # Obter resolucao do video width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) idle_resolution = (width, height) print(f"Resolucao idle: {width}x{height}") while True: ret, frame = cap.read() if not ret: break idle_frames.append(frame) cap.release() idle_frame_count = len(idle_frames) print(f"Carregados {idle_frame_count} frames idle em full resolution") def frame_to_jpeg_base64(frame, quality=85): """Converte frame numpy para JPEG base64""" encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] _, buffer = cv2.imencode('.jpg', frame, encode_param) return base64.b64encode(buffer).decode('utf-8') def jpeg_base64_to_frame(b64_data): """Converte JPEG base64 para frame numpy""" jpeg_data = base64.b64decode(b64_data) nparr = np.frombuffer(jpeg_data, np.uint8) return cv2.imdecode(nparr, cv2.IMREAD_COLOR) def upscale_frame(frame, target_size): """ Upscale frame para a resolucao alvo usando LANCZOS4 (alta qualidade). target_size: (width, height) """ if frame is None: return frame current_h, current_w = frame.shape[:2] target_w, target_h = target_size # Se ja esta na resolucao correta, retornar if current_w == target_w and current_h == target_h: return frame # Upscale usando LANCZOS4 (melhor qualidade para upscaling) upscaled = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) return upscaled def match_histogram(source, reference): """ Ajusta o histograma da source para corresponder ao da reference. Isso corrige diferencas de brilho/cor entre Wav2Lip e idle frames. Usa o espaco de cor LAB para melhor correspondencia perceptual. """ # Converter para LAB (melhor para correspondencia de cor) source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype(np.float32) reference_lab = cv2.cvtColor(reference, cv2.COLOR_BGR2LAB).astype(np.float32) # Para cada canal, ajustar media e desvio padrao for i in range(3): src_mean, src_std = source_lab[:, :, i].mean(), source_lab[:, :, i].std() ref_mean, ref_std = reference_lab[:, :, i].mean(), reference_lab[:, :, i].std() # Evitar divisao por zero if src_std < 1e-6: src_std = 1e-6 # Normalizar e reescalar source_lab[:, :, i] = (source_lab[:, :, i] - src_mean) * (ref_std / src_std) + ref_mean # Clipar valores validos e converter de volta source_lab = np.clip(source_lab, 0, 255).astype(np.uint8) result = cv2.cvtColor(source_lab, cv2.COLOR_LAB2BGR) return result def extract_mouth_region(frame, region=MOUTH_REGION): """ Extrai apenas a regiao da boca/queixo do frame. Retorna (regiao_cortada, coordenadas) para posterior blending. """ h, w = frame.shape[:2] y1 = int(h * region['top']) y2 = int(h * region['bottom']) x1 = int(w * region['left']) x2 = int(w * region['right']) mouth_crop = frame[y1:y2, x1:x2].copy() return mouth_crop, (x1, y1, x2, y2) def create_feathered_mask(shape, feather_pixels=15): """ Cria mascara com bordas suavizadas (feathered) para blending seamless. Usa gradiente suave (ease-in-out) para transicao mais natural. """ h, w = shape[:2] mask = np.ones((h, w), dtype=np.float32) # Criar gradiente nas bordas usando curva suave (ease-in-out) for i in range(feather_pixels): # Curva suave: smoothstep para transicao mais natural t = i / feather_pixels alpha = t * t * (3 - 2 * t) # smoothstep # Top mask[i, :] = np.minimum(mask[i, :], alpha) # Bottom mask[h - 1 - i, :] = np.minimum(mask[h - 1 - i, :], alpha) # Left mask[:, i] = np.minimum(mask[:, i], alpha) # Right mask[:, w - 1 - i] = np.minimum(mask[:, w - 1 - i], alpha) return mask def blend_mouth_region_only(wav2lip_frame, idle_frame): """ Nova estrategia: Manter idle em full resolution, substituir APENAS a boca. 1. Extrai regiao da boca do frame Wav2Lip (853x480) 2. Upscala APENAS essa regiao para a escala do idle (1920x1080) 3. Aplica Poisson Blending apenas na regiao da boca 4. Retorna o frame idle com apenas a boca substituida Isso preserva toda a qualidade do idle (cabelo, fundo, roupa) e so substitui a pequena regiao da boca. """ if wav2lip_frame is None or idle_frame is None: return wav2lip_frame if wav2lip_frame is not None else idle_frame # Dimensoes idle_h, idle_w = idle_frame.shape[:2] w2l_h, w2l_w = wav2lip_frame.shape[:2] # Calcular escala entre frames scale_x = idle_w / w2l_w scale_y = idle_h / w2l_h # 1. Extrair regiao da boca do Wav2Lip mouth_crop, (x1_w2l, y1_w2l, x2_w2l, y2_w2l) = extract_mouth_region(wav2lip_frame) # 2. Calcular coordenadas equivalentes no idle (full res) x1_idle = int(x1_w2l * scale_x) y1_idle = int(y1_w2l * scale_y) x2_idle = int(x2_w2l * scale_x) y2_idle = int(y2_w2l * scale_y) # Dimensao da regiao no idle region_w = x2_idle - x1_idle region_h = y2_idle - y1_idle # 3. Upscale apenas a regiao da boca para a resolucao do idle mouth_upscaled = cv2.resize(mouth_crop, (region_w, region_h), interpolation=cv2.INTER_LANCZOS4) # 3.5 Histogram matching: ajustar cor/brilho do mouth para corresponder ao idle idle_region = idle_frame[y1_idle:y2_idle, x1_idle:x2_idle] mouth_upscaled = match_histogram(mouth_upscaled, idle_region) # 4. Criar mascara com bordas suavizadas # Usar 25% da menor dimensao para feathering bem suave feather = max(30, min(region_w, region_h) // 4) # ~25% da menor dimensao mask = create_feathered_mask((region_h, region_w), feather_pixels=feather) mask_3ch = np.dstack([mask, mask, mask]) # 5. Fazer copia do idle e aplicar blending na regiao result = idle_frame.copy() # Regiao do idle onde vai o mouth idle_region = result[y1_idle:y2_idle, x1_idle:x2_idle] # Blending com mascara feathered blended_region = (mouth_upscaled * mask_3ch + idle_region * (1 - mask_3ch)).astype(np.uint8) # Substituir regiao result[y1_idle:y2_idle, x1_idle:x2_idle] = blended_region return result def blend_with_poisson(wav2lip_frame, idle_frame): """ Estrategia alternativa: Poisson Blending apenas na regiao da boca. Mais lento mas com transicao mais suave nos bordos. """ if wav2lip_frame is None or idle_frame is None: return wav2lip_frame if wav2lip_frame is not None else idle_frame idle_h, idle_w = idle_frame.shape[:2] w2l_h, w2l_w = wav2lip_frame.shape[:2] scale_x = idle_w / w2l_w scale_y = idle_h / w2l_h # Extrair e upscalar boca mouth_crop, (x1_w2l, y1_w2l, x2_w2l, y2_w2l) = extract_mouth_region(wav2lip_frame) x1_idle = int(x1_w2l * scale_x) y1_idle = int(y1_w2l * scale_y) x2_idle = int(x2_w2l * scale_x) y2_idle = int(y2_w2l * scale_y) region_w = x2_idle - x1_idle region_h = y2_idle - y1_idle mouth_upscaled = cv2.resize(mouth_crop, (region_w, region_h), interpolation=cv2.INTER_LANCZOS4) # Criar imagem source do tamanho do idle (preta com boca no lugar certo) source = np.zeros_like(idle_frame) source[y1_idle:y2_idle, x1_idle:x2_idle] = mouth_upscaled # Criar mascara eliptica para a regiao mask = np.zeros((idle_h, idle_w), dtype=np.uint8) center_x = (x1_idle + x2_idle) // 2 center_y = (y1_idle + y2_idle) // 2 axes_x = region_w // 2 - 10 # Um pouco menor para evitar bordas axes_y = region_h // 2 - 10 cv2.ellipse(mask, (center_x, center_y), (axes_x, axes_y), 0, 0, 360, 255, -1) try: result = cv2.seamlessClone( source, idle_frame, mask, (center_x, center_y), cv2.NORMAL_CLONE ) return result except Exception as e: print(f"[Poisson] Erro: {e}, usando feathered blend") return blend_mouth_region_only(wav2lip_frame, idle_frame) def calculate_frame_difference(frame1, frame2): """ Calcula a diferenca entre dois frames. Retorna um valor de 0-100 indicando quanta diferenca ha. """ if frame1 is None or frame2 is None: return 0 # Converter para grayscale gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) # Calcular diferenca absoluta diff = cv2.absdiff(gray1, gray2) # Valor medio da diferenca (0-255) mean_diff = np.mean(diff) # Normalizar para 0-100 return (mean_diff / 255.0) * 100 def calculate_sharpness(frame): """ Calcula a nitidez de um frame usando variância do Laplaciano. Quanto maior o valor, mais nítido o frame. """ gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame laplacian = cv2.Laplacian(gray, cv2.CV_64F) return laplacian.var() def find_best_matching_idle_frame(target_frame, idle_frames, sample_step=5, sharpness_weight=0.3): """ Encontra o frame do idle mais similar ao target_frame. Considera tanto similaridade quanto nitidez para evitar frames desfocados. Args: target_frame: Frame para comparar (último frame da fala) idle_frames: Lista de frames idle sample_step: Passo de amostragem (5 = compara 1 a cada 5 frames) sharpness_weight: Peso da nitidez no score (0-1) Returns: Índice do frame idle mais similar e nítido """ if not idle_frames or target_frame is None: return 0, 0 # Converter target para grayscale uma vez target_gray = cv2.cvtColor(target_frame, cv2.COLOR_BGR2GRAY) # Primeira fase: encontrar os N melhores candidatos por similaridade candidates = [] for i in range(0, len(idle_frames), sample_step): idle_gray = cv2.cvtColor(idle_frames[i], cv2.COLOR_BGR2GRAY) diff = np.mean(cv2.absdiff(target_gray, idle_gray)) candidates.append((i, diff)) # Ordenar por diferença (menor = mais similar) candidates.sort(key=lambda x: x[1]) # Pegar os top 20 candidatos mais similares top_candidates = candidates[:20] # Segunda fase: refinar busca na vizinhança dos top candidatos refined_candidates = [] for idx, _ in top_candidates: start = max(0, idx - sample_step) end = min(len(idle_frames), idx + sample_step + 1) for i in range(start, end): idle_frame = idle_frames[i] idle_gray = cv2.cvtColor(idle_frame, cv2.COLOR_BGR2GRAY) # Calcular diferença diff = np.mean(cv2.absdiff(target_gray, idle_gray)) # Calcular nitidez sharpness = calculate_sharpness(idle_frame) refined_candidates.append((i, diff, sharpness)) if not refined_candidates: return 0, 0 # Normalizar valores para scoring diffs = [c[1] for c in refined_candidates] sharpnesses = [c[2] for c in refined_candidates] min_diff, max_diff = min(diffs), max(diffs) min_sharp, max_sharp = min(sharpnesses), max(sharpnesses) # Evitar divisão por zero diff_range = max_diff - min_diff if max_diff > min_diff else 1 sharp_range = max_sharp - min_sharp if max_sharp > min_sharp else 1 # Calcular score combinado (menor = melhor) # diff_score: 0 = mais similar, 1 = menos similar # sharp_score: 0 = mais nítido, 1 = menos nítido (invertido) best_idx = 0 best_score = float('inf') best_diff = 0 for i, diff, sharpness in refined_candidates: diff_score = (diff - min_diff) / diff_range sharp_score = 1 - (sharpness - min_sharp) / sharp_range # Invertido: maior nitidez = menor score # Score combinado combined_score = (1 - sharpness_weight) * diff_score + sharpness_weight * sharp_score if combined_score < best_score: best_score = combined_score best_idx = i best_diff = diff return best_idx, best_diff def trim_high_motion_frames(frames, threshold_multiplier=1.0, max_trim=20): """ Remove frames do final que tem movimento muito alto (saltos). Isso elimina os frames problemáticos que causam "travamento". Versão mais agressiva: usa threshold menor e remove mais frames. Args: frames: Lista de frames threshold_multiplier: Multiplicador do threshold (media + multiplier * std) max_trim: Maximo de frames a remover Returns: Lista de frames com os problematicos removidos """ if len(frames) < 20: return frames # Calcular diferenças entre frames consecutivos (últimos 20) last_n = min(20, len(frames) - 1) differences = [] for i in range(len(frames) - last_n, len(frames)): if i > 0: diff = calculate_frame_difference(frames[i-1], frames[i]) differences.append((i, diff)) if not differences: return frames # Calcular média e desvio padrão diffs = [d[1] for d in differences] mean_diff = np.mean(diffs) std_diff = np.std(diffs) # Threshold mais agressivo: média + 1.0*std (antes era 1.5) threshold = mean_diff + threshold_multiplier * std_diff # Threshold mínimo absoluto para evitar frames com muito movimento min_threshold = 0.7 # Frames com diff > 0.7 são sempre problemáticos if threshold > min_threshold: threshold = min_threshold # Encontrar onde começam os frames problemáticos (do fim para o início) trim_from = len(frames) frames_removed = 0 # Abordagem mais agressiva: remove todos os frames problemáticos do final for i in range(len(differences) - 1, -1, -1): idx, diff = differences[i] if diff > threshold: trim_from = idx frames_removed += 1 if frames_removed >= max_trim: break else: # Para no primeiro frame bom encontrado break # Calcular quantos frames remover frames_to_trim = len(frames) - trim_from if frames_to_trim > 0 and frames_to_trim <= max_trim: print(f"[Trim] Removendo {frames_to_trim} frames problemáticos (threshold: {threshold:.2f}, mean: {mean_diff:.2f})") return frames[:trim_from] return frames def blend_frames(frame1, frame2, alpha): """Blend entre dois frames. alpha=0 -> frame1, alpha=1 -> frame2""" # Garantir que ambos frames tem o mesmo tamanho if frame1.shape != frame2.shape: frame2 = cv2.resize(frame2, (frame1.shape[1], frame1.shape[0])) return cv2.addWeighted(frame1, 1 - alpha, frame2, alpha, 0) def create_crossfade_frames(from_frame, to_frame, num_frames): """Cria frames de transicao suave entre dois frames""" frames = [] for i in range(num_frames): alpha = (i + 1) / (num_frames + 1) # 0.16, 0.33, 0.5, 0.66, 0.83 para 5 frames blended = blend_frames(from_frame, to_frame, alpha) frames.append(blended) return frames @routes.get("/ws") async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) print("Cliente conectado") # Posicao atual no idle loop (para continuidade) idle_position = 0 try: async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = json.loads(msg.data) action = data.get("action", "") if action == "generate": text = data.get("text", "").strip() voice = data.get("voice", "tara") idle_video_time_ms = data.get("idle_video_time_ms", 0) if not text: await ws.send_json({"type": "error", "message": "Text required"}) continue print(f"Gerando: {text[:50]}... (idle_time: {idle_video_time_ms}ms)") start_time = time.time() try: async with aiohttp.ClientSession() as session: wav2lip_ws = await session.ws_connect( WAV2LIP_WS, timeout=aiohttp.ClientWSTimeout(ws_close=120) ) await wav2lip_ws.send_json({ "action": "generate", "text": text, "voice": voice, "idle_video_time_ms": idle_video_time_ms }) # Coletar todos os frames speaking_frames = [] audio_data = None audio_duration = 0 end_video_time_ms = 0 # Calcular posicao inicial no idle baseado no tempo # idle_video_time_ms em ms, video @ 25fps = 40ms/frame fps = 25 frame_duration_ms = 1000 / fps start_idle_idx = int(idle_video_time_ms / frame_duration_ms) % idle_frame_count if idle_frame_count > 0 else 0 current_idle_idx = start_idle_idx async for w2l_msg in wav2lip_ws: if w2l_msg.type == aiohttp.WSMsgType.TEXT: w2l_data = json.loads(w2l_msg.data) msg_type = w2l_data.get("type", "") if msg_type == "status": await ws.send_json(w2l_data) elif msg_type == "frame": frame_b64 = w2l_data.get("frame", "") if frame_b64: frame = jpeg_base64_to_frame(frame_b64) # Pegar frame idle full-res correspondente para histogram matching idle_ref = None if idle_frames and idle_frame_count > 0: idle_ref = idle_frames[current_idle_idx] current_idle_idx = (current_idle_idx + 1) % idle_frame_count # Upscale frame inteiro do Wav2Lip frame = upscale_frame(frame, idle_resolution) # Histogram matching para consistencia de cor if idle_ref is not None: frame = match_histogram(frame, idle_ref) speaking_frames.append(frame) elif msg_type == "full_audio": audio_data = w2l_data.get("audio", "") audio_duration = w2l_data.get("duration_ms", 0) elif msg_type == "done": # Capturar end_video_time_ms para sincronizar idle end_video_time_ms = w2l_data.get("end_video_time_ms", 0) break elif msg_type == "error": await ws.send_json(w2l_data) break elif w2l_msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): break await wav2lip_ws.close() # Enviar frames SEM crossfade - transicao e feita no cliente if speaking_frames: # 1. Primeiro, remover frames problemáticos do final (alto movimento) original_count = len(speaking_frames) speaking_frames = trim_high_motion_frames(speaking_frames) if len(speaking_frames) < original_count: print(f"[Motion Trim] {original_count} -> {len(speaking_frames)} frames") # 2. Depois, trim para match audio duration (se ainda houver excesso) fps = 25 if audio_duration > 0: expected_frames = int(audio_duration / 1000 * fps) if len(speaking_frames) > expected_frames: trimmed = len(speaking_frames) - expected_frames print(f"[Duration Trim] {trimmed} extra frames ({len(speaking_frames)} -> {expected_frames})") speaking_frames = speaking_frames[:expected_frames] # 3. Encontrar o frame idle mais similar ao último frame de fala # Isso minimiza o "salto" visual na transição speak->idle best_idle_idx = 0 if idle_frames and speaking_frames: last_speak_frame = speaking_frames[-1] best_idle_idx, best_diff = find_best_matching_idle_frame( last_speak_frame, idle_frames, sample_step=10 ) # Converter índice para tempo em ms (25fps = 40ms/frame) end_video_time_ms = int(best_idle_idx * 40) print(f"[Best Match] Idle frame {best_idle_idx} (diff: {best_diff:.2f}) -> {end_video_time_ms}ms") # Atualizar posicao do idle para continuidade apos fala if idle_frames: idle_position = best_idle_idx # Enviar stream_start ttfb = int((time.time() - start_time) * 1000) await ws.send_json({"type": "stream_start", "ttfb_ms": ttfb}) # Enviar apenas os frames de fala (sem crossfade) # Usar qualidade JPEG alta (95) para minimizar artefatos for idx, frame in enumerate(speaking_frames): frame_b64 = frame_to_jpeg_base64(frame, quality=95) await ws.send_json({ "type": "frame", "frame": frame_b64, "index": idx }) # Enviar audio if audio_data: await ws.send_json({ "type": "audio", "audio": audio_data, "duration_ms": audio_duration }) # Enviar done com end_video_time_ms para sincronizar idle elapsed = int((time.time() - start_time) * 1000) await ws.send_json({ "type": "done", "frames": len(speaking_frames), "elapsed_ms": elapsed, "end_video_time_ms": end_video_time_ms }) print(f"Enviados {len(speaking_frames)} frames (Poisson Blending)") except Exception as e: print(f"Erro: {e}") import traceback traceback.print_exc() await ws.send_json({"type": "error", "message": str(e)}) elif action == "generate_complete": # Proxy para generate_complete do Wav2Lip text = data.get("text", "").strip() voice = data.get("voice", "tara") idle_before_frames = data.get("idle_before_frames", 0) idle_after_frames = data.get("idle_after_frames", 0) crossfade_frames = data.get("crossfade_frames", 0) jpeg_quality = data.get("jpeg_quality", 95) if not text: await ws.send_json({"type": "error", "message": "Text required"}) continue print(f"Generate Complete: {text[:50]}...") try: async with aiohttp.ClientSession() as session: wav2lip_ws = await session.ws_connect( WAV2LIP_WS, timeout=aiohttp.ClientWSTimeout(ws_close=120) ) await wav2lip_ws.send_json({ "action": "generate_complete", "text": text, "voice": voice, "idle_before_frames": idle_before_frames, "idle_after_frames": idle_after_frames, "crossfade_frames": crossfade_frames, "jpeg_quality": jpeg_quality }) # Repassar todas as mensagens async for w2l_msg in wav2lip_ws: if w2l_msg.type == aiohttp.WSMsgType.TEXT: await ws.send_str(w2l_msg.data) w2l_data = json.loads(w2l_msg.data) if w2l_data.get("type") in ("done", "error"): break elif w2l_msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): break await wav2lip_ws.close() except Exception as e: print(f"Erro generate_complete: {e}") await ws.send_json({"type": "error", "message": str(e)}) elif action == "ping": await ws.send_json({"type": "pong"}) except Exception as e: print(f"WS Error: {e}") finally: print("Cliente desconectado") return ws @routes.get("/") async def index(request): return web.FileResponse(os.path.join(os.path.dirname(__file__), "index_streaming.html")) @routes.get("/{filename}") async def static_file(request): filename = request.match_info["filename"] filepath = os.path.join(os.path.dirname(__file__), filename) if os.path.exists(filepath): return web.FileResponse(filepath) return web.Response(status=404) app = web.Application() app.add_routes(routes) if __name__ == "__main__": print("=" * 50) print("Streaming Server - Porta", PORT) print("Wav2Lip:", WAV2LIP_WS) print("Idle Video:", IDLE_VIDEO) print("=" * 50) # Carregar idle frames load_idle_frames() print(f"Upscaling: ENABLED (target {idle_resolution[0]}x{idle_resolution[1]})") print("Interpolacao: LANCZOS4 (alta qualidade)") print("Color: HISTOGRAM MATCHING (LAB color space)") print("=" * 50) web.run_app(app, host="0.0.0.0", port=PORT)