| """ |
| Interface Server - Streaming com Crossfade Suave |
| Faz transicao suave entre idle e fala usando blending de frames |
| """ |
| from aiohttp import web |
| import aiohttp |
| import asyncio |
| import json |
| import base64 |
| import os |
| import time |
| import cv2 |
| import numpy as np |
|
|
| WAV2LIP_WS = os.getenv("WAV2LIP_WS", "ws://localhost:8082/ws") |
| PORT = int(os.getenv("PORT", "8000")) |
| IDLE_VIDEO = os.path.join(os.path.dirname(__file__), "idle.mp4") |
|
|
| |
| CROSSFADE_FRAMES = 5 |
|
|
| routes = web.RouteTableDef() |
|
|
| |
| idle_frames = [] |
| idle_frame_count = 0 |
| idle_resolution = (1920, 1080) |
|
|
| |
| |
| |
| MOUTH_REGION = { |
| 'top': 0.50, |
| 'bottom': 0.80, |
| 'left': 0.32, |
| 'right': 0.68 |
| } |
|
|
|
|
| def load_idle_frames(): |
| """Carrega frames do idle.mp4 e obtem resolucao""" |
| global idle_frames, idle_frame_count, idle_resolution |
|
|
| if idle_frames: |
| return |
|
|
| if not os.path.exists(IDLE_VIDEO): |
| print(f"[AVISO] Idle video nao encontrado: {IDLE_VIDEO}") |
| return |
|
|
| print(f"Carregando idle frames de {IDLE_VIDEO}...") |
| cap = cv2.VideoCapture(IDLE_VIDEO) |
|
|
| |
| width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) |
| height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) |
| idle_resolution = (width, height) |
| print(f"Resolucao idle: {width}x{height}") |
|
|
| while True: |
| ret, frame = cap.read() |
| if not ret: |
| break |
| idle_frames.append(frame) |
|
|
| cap.release() |
| idle_frame_count = len(idle_frames) |
| print(f"Carregados {idle_frame_count} frames idle em full resolution") |
|
|
|
|
| def frame_to_jpeg_base64(frame, quality=85): |
| """Converte frame numpy para JPEG base64""" |
| encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), quality] |
| _, buffer = cv2.imencode('.jpg', frame, encode_param) |
| return base64.b64encode(buffer).decode('utf-8') |
|
|
|
|
| def jpeg_base64_to_frame(b64_data): |
| """Converte JPEG base64 para frame numpy""" |
| jpeg_data = base64.b64decode(b64_data) |
| nparr = np.frombuffer(jpeg_data, np.uint8) |
| return cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
|
|
|
|
| def upscale_frame(frame, target_size): |
| """ |
| Upscale frame para a resolucao alvo usando LANCZOS4 (alta qualidade). |
| target_size: (width, height) |
| """ |
| if frame is None: |
| return frame |
|
|
| current_h, current_w = frame.shape[:2] |
| target_w, target_h = target_size |
|
|
| |
| if current_w == target_w and current_h == target_h: |
| return frame |
|
|
| |
| upscaled = cv2.resize(frame, (target_w, target_h), interpolation=cv2.INTER_LANCZOS4) |
| return upscaled |
|
|
|
|
| def match_histogram(source, reference): |
| """ |
| Ajusta o histograma da source para corresponder ao da reference. |
| Isso corrige diferencas de brilho/cor entre Wav2Lip e idle frames. |
| Usa o espaco de cor LAB para melhor correspondencia perceptual. |
| """ |
| |
| source_lab = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype(np.float32) |
| reference_lab = cv2.cvtColor(reference, cv2.COLOR_BGR2LAB).astype(np.float32) |
|
|
| |
| for i in range(3): |
| src_mean, src_std = source_lab[:, :, i].mean(), source_lab[:, :, i].std() |
| ref_mean, ref_std = reference_lab[:, :, i].mean(), reference_lab[:, :, i].std() |
|
|
| |
| if src_std < 1e-6: |
| src_std = 1e-6 |
|
|
| |
| source_lab[:, :, i] = (source_lab[:, :, i] - src_mean) * (ref_std / src_std) + ref_mean |
|
|
| |
| source_lab = np.clip(source_lab, 0, 255).astype(np.uint8) |
| result = cv2.cvtColor(source_lab, cv2.COLOR_LAB2BGR) |
|
|
| return result |
|
|
|
|
| def extract_mouth_region(frame, region=MOUTH_REGION): |
| """ |
| Extrai apenas a regiao da boca/queixo do frame. |
| Retorna (regiao_cortada, coordenadas) para posterior blending. |
| """ |
| h, w = frame.shape[:2] |
|
|
| y1 = int(h * region['top']) |
| y2 = int(h * region['bottom']) |
| x1 = int(w * region['left']) |
| x2 = int(w * region['right']) |
|
|
| mouth_crop = frame[y1:y2, x1:x2].copy() |
| return mouth_crop, (x1, y1, x2, y2) |
|
|
|
|
| def create_feathered_mask(shape, feather_pixels=15): |
| """ |
| Cria mascara com bordas suavizadas (feathered) para blending seamless. |
| Usa gradiente suave (ease-in-out) para transicao mais natural. |
| """ |
| h, w = shape[:2] |
| mask = np.ones((h, w), dtype=np.float32) |
|
|
| |
| for i in range(feather_pixels): |
| |
| t = i / feather_pixels |
| alpha = t * t * (3 - 2 * t) |
|
|
| |
| mask[i, :] = np.minimum(mask[i, :], alpha) |
| |
| mask[h - 1 - i, :] = np.minimum(mask[h - 1 - i, :], alpha) |
| |
| mask[:, i] = np.minimum(mask[:, i], alpha) |
| |
| mask[:, w - 1 - i] = np.minimum(mask[:, w - 1 - i], alpha) |
|
|
| return mask |
|
|
|
|
| def blend_mouth_region_only(wav2lip_frame, idle_frame): |
| """ |
| Nova estrategia: Manter idle em full resolution, substituir APENAS a boca. |
| |
| 1. Extrai regiao da boca do frame Wav2Lip (853x480) |
| 2. Upscala APENAS essa regiao para a escala do idle (1920x1080) |
| 3. Aplica Poisson Blending apenas na regiao da boca |
| 4. Retorna o frame idle com apenas a boca substituida |
| |
| Isso preserva toda a qualidade do idle (cabelo, fundo, roupa) e |
| so substitui a pequena regiao da boca. |
| """ |
| if wav2lip_frame is None or idle_frame is None: |
| return wav2lip_frame if wav2lip_frame is not None else idle_frame |
|
|
| |
| idle_h, idle_w = idle_frame.shape[:2] |
| w2l_h, w2l_w = wav2lip_frame.shape[:2] |
|
|
| |
| scale_x = idle_w / w2l_w |
| scale_y = idle_h / w2l_h |
|
|
| |
| mouth_crop, (x1_w2l, y1_w2l, x2_w2l, y2_w2l) = extract_mouth_region(wav2lip_frame) |
|
|
| |
| x1_idle = int(x1_w2l * scale_x) |
| y1_idle = int(y1_w2l * scale_y) |
| x2_idle = int(x2_w2l * scale_x) |
| y2_idle = int(y2_w2l * scale_y) |
|
|
| |
| region_w = x2_idle - x1_idle |
| region_h = y2_idle - y1_idle |
|
|
| |
| mouth_upscaled = cv2.resize(mouth_crop, (region_w, region_h), interpolation=cv2.INTER_LANCZOS4) |
|
|
| |
| idle_region = idle_frame[y1_idle:y2_idle, x1_idle:x2_idle] |
| mouth_upscaled = match_histogram(mouth_upscaled, idle_region) |
|
|
| |
| |
| feather = max(30, min(region_w, region_h) // 4) |
| mask = create_feathered_mask((region_h, region_w), feather_pixels=feather) |
| mask_3ch = np.dstack([mask, mask, mask]) |
|
|
| |
| result = idle_frame.copy() |
|
|
| |
| idle_region = result[y1_idle:y2_idle, x1_idle:x2_idle] |
|
|
| |
| blended_region = (mouth_upscaled * mask_3ch + idle_region * (1 - mask_3ch)).astype(np.uint8) |
|
|
| |
| result[y1_idle:y2_idle, x1_idle:x2_idle] = blended_region |
|
|
| return result |
|
|
|
|
| def blend_with_poisson(wav2lip_frame, idle_frame): |
| """ |
| Estrategia alternativa: Poisson Blending apenas na regiao da boca. |
| Mais lento mas com transicao mais suave nos bordos. |
| """ |
| if wav2lip_frame is None or idle_frame is None: |
| return wav2lip_frame if wav2lip_frame is not None else idle_frame |
|
|
| idle_h, idle_w = idle_frame.shape[:2] |
| w2l_h, w2l_w = wav2lip_frame.shape[:2] |
|
|
| scale_x = idle_w / w2l_w |
| scale_y = idle_h / w2l_h |
|
|
| |
| mouth_crop, (x1_w2l, y1_w2l, x2_w2l, y2_w2l) = extract_mouth_region(wav2lip_frame) |
|
|
| x1_idle = int(x1_w2l * scale_x) |
| y1_idle = int(y1_w2l * scale_y) |
| x2_idle = int(x2_w2l * scale_x) |
| y2_idle = int(y2_w2l * scale_y) |
|
|
| region_w = x2_idle - x1_idle |
| region_h = y2_idle - y1_idle |
|
|
| mouth_upscaled = cv2.resize(mouth_crop, (region_w, region_h), interpolation=cv2.INTER_LANCZOS4) |
|
|
| |
| source = np.zeros_like(idle_frame) |
| source[y1_idle:y2_idle, x1_idle:x2_idle] = mouth_upscaled |
|
|
| |
| mask = np.zeros((idle_h, idle_w), dtype=np.uint8) |
| center_x = (x1_idle + x2_idle) // 2 |
| center_y = (y1_idle + y2_idle) // 2 |
| axes_x = region_w // 2 - 10 |
| axes_y = region_h // 2 - 10 |
| cv2.ellipse(mask, (center_x, center_y), (axes_x, axes_y), 0, 0, 360, 255, -1) |
|
|
| try: |
| result = cv2.seamlessClone( |
| source, |
| idle_frame, |
| mask, |
| (center_x, center_y), |
| cv2.NORMAL_CLONE |
| ) |
| return result |
| except Exception as e: |
| print(f"[Poisson] Erro: {e}, usando feathered blend") |
| return blend_mouth_region_only(wav2lip_frame, idle_frame) |
|
|
|
|
| def calculate_frame_difference(frame1, frame2): |
| """ |
| Calcula a diferenca entre dois frames. |
| Retorna um valor de 0-100 indicando quanta diferenca ha. |
| """ |
| if frame1 is None or frame2 is None: |
| return 0 |
|
|
| |
| gray1 = cv2.cvtColor(frame1, cv2.COLOR_BGR2GRAY) |
| gray2 = cv2.cvtColor(frame2, cv2.COLOR_BGR2GRAY) |
|
|
| |
| diff = cv2.absdiff(gray1, gray2) |
|
|
| |
| mean_diff = np.mean(diff) |
|
|
| |
| return (mean_diff / 255.0) * 100 |
|
|
|
|
| def calculate_sharpness(frame): |
| """ |
| Calcula a nitidez de um frame usando variância do Laplaciano. |
| Quanto maior o valor, mais nítido o frame. |
| """ |
| gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) if len(frame.shape) == 3 else frame |
| laplacian = cv2.Laplacian(gray, cv2.CV_64F) |
| return laplacian.var() |
|
|
|
|
| def find_best_matching_idle_frame(target_frame, idle_frames, sample_step=5, sharpness_weight=0.3): |
| """ |
| Encontra o frame do idle mais similar ao target_frame. |
| Considera tanto similaridade quanto nitidez para evitar frames desfocados. |
| |
| Args: |
| target_frame: Frame para comparar (último frame da fala) |
| idle_frames: Lista de frames idle |
| sample_step: Passo de amostragem (5 = compara 1 a cada 5 frames) |
| sharpness_weight: Peso da nitidez no score (0-1) |
| |
| Returns: |
| Índice do frame idle mais similar e nítido |
| """ |
| if not idle_frames or target_frame is None: |
| return 0, 0 |
|
|
| |
| target_gray = cv2.cvtColor(target_frame, cv2.COLOR_BGR2GRAY) |
|
|
| |
| candidates = [] |
|
|
| for i in range(0, len(idle_frames), sample_step): |
| idle_gray = cv2.cvtColor(idle_frames[i], cv2.COLOR_BGR2GRAY) |
| diff = np.mean(cv2.absdiff(target_gray, idle_gray)) |
| candidates.append((i, diff)) |
|
|
| |
| candidates.sort(key=lambda x: x[1]) |
|
|
| |
| top_candidates = candidates[:20] |
|
|
| |
| refined_candidates = [] |
|
|
| for idx, _ in top_candidates: |
| start = max(0, idx - sample_step) |
| end = min(len(idle_frames), idx + sample_step + 1) |
|
|
| for i in range(start, end): |
| idle_frame = idle_frames[i] |
| idle_gray = cv2.cvtColor(idle_frame, cv2.COLOR_BGR2GRAY) |
|
|
| |
| diff = np.mean(cv2.absdiff(target_gray, idle_gray)) |
|
|
| |
| sharpness = calculate_sharpness(idle_frame) |
|
|
| refined_candidates.append((i, diff, sharpness)) |
|
|
| if not refined_candidates: |
| return 0, 0 |
|
|
| |
| diffs = [c[1] for c in refined_candidates] |
| sharpnesses = [c[2] for c in refined_candidates] |
|
|
| min_diff, max_diff = min(diffs), max(diffs) |
| min_sharp, max_sharp = min(sharpnesses), max(sharpnesses) |
|
|
| |
| diff_range = max_diff - min_diff if max_diff > min_diff else 1 |
| sharp_range = max_sharp - min_sharp if max_sharp > min_sharp else 1 |
|
|
| |
| |
| |
| best_idx = 0 |
| best_score = float('inf') |
| best_diff = 0 |
|
|
| for i, diff, sharpness in refined_candidates: |
| diff_score = (diff - min_diff) / diff_range |
| sharp_score = 1 - (sharpness - min_sharp) / sharp_range |
|
|
| |
| combined_score = (1 - sharpness_weight) * diff_score + sharpness_weight * sharp_score |
|
|
| if combined_score < best_score: |
| best_score = combined_score |
| best_idx = i |
| best_diff = diff |
|
|
| return best_idx, best_diff |
|
|
|
|
| def trim_high_motion_frames(frames, threshold_multiplier=1.0, max_trim=20): |
| """ |
| Remove frames do final que tem movimento muito alto (saltos). |
| Isso elimina os frames problemáticos que causam "travamento". |
| |
| Versão mais agressiva: usa threshold menor e remove mais frames. |
| |
| Args: |
| frames: Lista de frames |
| threshold_multiplier: Multiplicador do threshold (media + multiplier * std) |
| max_trim: Maximo de frames a remover |
| |
| Returns: |
| Lista de frames com os problematicos removidos |
| """ |
| if len(frames) < 20: |
| return frames |
|
|
| |
| last_n = min(20, len(frames) - 1) |
| differences = [] |
| for i in range(len(frames) - last_n, len(frames)): |
| if i > 0: |
| diff = calculate_frame_difference(frames[i-1], frames[i]) |
| differences.append((i, diff)) |
|
|
| if not differences: |
| return frames |
|
|
| |
| diffs = [d[1] for d in differences] |
| mean_diff = np.mean(diffs) |
| std_diff = np.std(diffs) |
|
|
| |
| threshold = mean_diff + threshold_multiplier * std_diff |
|
|
| |
| min_threshold = 0.7 |
| if threshold > min_threshold: |
| threshold = min_threshold |
|
|
| |
| trim_from = len(frames) |
| frames_removed = 0 |
|
|
| |
| for i in range(len(differences) - 1, -1, -1): |
| idx, diff = differences[i] |
| if diff > threshold: |
| trim_from = idx |
| frames_removed += 1 |
| if frames_removed >= max_trim: |
| break |
| else: |
| |
| break |
|
|
| |
| frames_to_trim = len(frames) - trim_from |
|
|
| if frames_to_trim > 0 and frames_to_trim <= max_trim: |
| print(f"[Trim] Removendo {frames_to_trim} frames problemáticos (threshold: {threshold:.2f}, mean: {mean_diff:.2f})") |
| return frames[:trim_from] |
|
|
| return frames |
|
|
|
|
| def blend_frames(frame1, frame2, alpha): |
| """Blend entre dois frames. alpha=0 -> frame1, alpha=1 -> frame2""" |
| |
| if frame1.shape != frame2.shape: |
| frame2 = cv2.resize(frame2, (frame1.shape[1], frame1.shape[0])) |
|
|
| return cv2.addWeighted(frame1, 1 - alpha, frame2, alpha, 0) |
|
|
|
|
| def create_crossfade_frames(from_frame, to_frame, num_frames): |
| """Cria frames de transicao suave entre dois frames""" |
| frames = [] |
| for i in range(num_frames): |
| alpha = (i + 1) / (num_frames + 1) |
| blended = blend_frames(from_frame, to_frame, alpha) |
| frames.append(blended) |
| return frames |
|
|
|
|
| @routes.get("/ws") |
| async def websocket_handler(request): |
| ws = web.WebSocketResponse() |
| await ws.prepare(request) |
| print("Cliente conectado") |
|
|
| |
| idle_position = 0 |
|
|
| try: |
| async for msg in ws: |
| if msg.type == aiohttp.WSMsgType.TEXT: |
| data = json.loads(msg.data) |
| action = data.get("action", "") |
|
|
| if action == "generate": |
| text = data.get("text", "").strip() |
| voice = data.get("voice", "tara") |
| idle_video_time_ms = data.get("idle_video_time_ms", 0) |
|
|
| if not text: |
| await ws.send_json({"type": "error", "message": "Text required"}) |
| continue |
|
|
| print(f"Gerando: {text[:50]}... (idle_time: {idle_video_time_ms}ms)") |
| start_time = time.time() |
|
|
| try: |
| async with aiohttp.ClientSession() as session: |
| wav2lip_ws = await session.ws_connect( |
| WAV2LIP_WS, |
| timeout=aiohttp.ClientWSTimeout(ws_close=120) |
| ) |
|
|
| await wav2lip_ws.send_json({ |
| "action": "generate", |
| "text": text, |
| "voice": voice, |
| "idle_video_time_ms": idle_video_time_ms |
| }) |
|
|
| |
| speaking_frames = [] |
| audio_data = None |
| audio_duration = 0 |
| end_video_time_ms = 0 |
|
|
| |
| |
| fps = 25 |
| frame_duration_ms = 1000 / fps |
| start_idle_idx = int(idle_video_time_ms / frame_duration_ms) % idle_frame_count if idle_frame_count > 0 else 0 |
| current_idle_idx = start_idle_idx |
|
|
| async for w2l_msg in wav2lip_ws: |
| if w2l_msg.type == aiohttp.WSMsgType.TEXT: |
| w2l_data = json.loads(w2l_msg.data) |
| msg_type = w2l_data.get("type", "") |
|
|
| if msg_type == "status": |
| await ws.send_json(w2l_data) |
|
|
| elif msg_type == "frame": |
| frame_b64 = w2l_data.get("frame", "") |
| if frame_b64: |
| frame = jpeg_base64_to_frame(frame_b64) |
|
|
| |
| idle_ref = None |
| if idle_frames and idle_frame_count > 0: |
| idle_ref = idle_frames[current_idle_idx] |
| current_idle_idx = (current_idle_idx + 1) % idle_frame_count |
|
|
| |
| frame = upscale_frame(frame, idle_resolution) |
|
|
| |
| if idle_ref is not None: |
| frame = match_histogram(frame, idle_ref) |
|
|
| speaking_frames.append(frame) |
|
|
| elif msg_type == "full_audio": |
| audio_data = w2l_data.get("audio", "") |
| audio_duration = w2l_data.get("duration_ms", 0) |
|
|
| elif msg_type == "done": |
| |
| end_video_time_ms = w2l_data.get("end_video_time_ms", 0) |
| break |
|
|
| elif msg_type == "error": |
| await ws.send_json(w2l_data) |
| break |
|
|
| elif w2l_msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): |
| break |
|
|
| await wav2lip_ws.close() |
|
|
| |
| if speaking_frames: |
| |
| original_count = len(speaking_frames) |
| speaking_frames = trim_high_motion_frames(speaking_frames) |
| if len(speaking_frames) < original_count: |
| print(f"[Motion Trim] {original_count} -> {len(speaking_frames)} frames") |
|
|
| |
| fps = 25 |
| if audio_duration > 0: |
| expected_frames = int(audio_duration / 1000 * fps) |
| if len(speaking_frames) > expected_frames: |
| trimmed = len(speaking_frames) - expected_frames |
| print(f"[Duration Trim] {trimmed} extra frames ({len(speaking_frames)} -> {expected_frames})") |
| speaking_frames = speaking_frames[:expected_frames] |
|
|
| |
| |
| best_idle_idx = 0 |
| if idle_frames and speaking_frames: |
| last_speak_frame = speaking_frames[-1] |
| best_idle_idx, best_diff = find_best_matching_idle_frame( |
| last_speak_frame, idle_frames, sample_step=10 |
| ) |
| |
| end_video_time_ms = int(best_idle_idx * 40) |
| print(f"[Best Match] Idle frame {best_idle_idx} (diff: {best_diff:.2f}) -> {end_video_time_ms}ms") |
|
|
| |
| if idle_frames: |
| idle_position = best_idle_idx |
|
|
| |
| ttfb = int((time.time() - start_time) * 1000) |
| await ws.send_json({"type": "stream_start", "ttfb_ms": ttfb}) |
|
|
| |
| |
| for idx, frame in enumerate(speaking_frames): |
| frame_b64 = frame_to_jpeg_base64(frame, quality=95) |
| await ws.send_json({ |
| "type": "frame", |
| "frame": frame_b64, |
| "index": idx |
| }) |
|
|
| |
| if audio_data: |
| await ws.send_json({ |
| "type": "audio", |
| "audio": audio_data, |
| "duration_ms": audio_duration |
| }) |
|
|
| |
| elapsed = int((time.time() - start_time) * 1000) |
| await ws.send_json({ |
| "type": "done", |
| "frames": len(speaking_frames), |
| "elapsed_ms": elapsed, |
| "end_video_time_ms": end_video_time_ms |
| }) |
|
|
| print(f"Enviados {len(speaking_frames)} frames (Poisson Blending)") |
|
|
| except Exception as e: |
| print(f"Erro: {e}") |
| import traceback |
| traceback.print_exc() |
| await ws.send_json({"type": "error", "message": str(e)}) |
|
|
| elif action == "generate_complete": |
| |
| text = data.get("text", "").strip() |
| voice = data.get("voice", "tara") |
| idle_before_frames = data.get("idle_before_frames", 0) |
| idle_after_frames = data.get("idle_after_frames", 0) |
| crossfade_frames = data.get("crossfade_frames", 0) |
| jpeg_quality = data.get("jpeg_quality", 95) |
|
|
| if not text: |
| await ws.send_json({"type": "error", "message": "Text required"}) |
| continue |
|
|
| print(f"Generate Complete: {text[:50]}...") |
|
|
| try: |
| async with aiohttp.ClientSession() as session: |
| wav2lip_ws = await session.ws_connect( |
| WAV2LIP_WS, |
| timeout=aiohttp.ClientWSTimeout(ws_close=120) |
| ) |
|
|
| await wav2lip_ws.send_json({ |
| "action": "generate_complete", |
| "text": text, |
| "voice": voice, |
| "idle_before_frames": idle_before_frames, |
| "idle_after_frames": idle_after_frames, |
| "crossfade_frames": crossfade_frames, |
| "jpeg_quality": jpeg_quality |
| }) |
|
|
| |
| async for w2l_msg in wav2lip_ws: |
| if w2l_msg.type == aiohttp.WSMsgType.TEXT: |
| await ws.send_str(w2l_msg.data) |
| w2l_data = json.loads(w2l_msg.data) |
| if w2l_data.get("type") in ("done", "error"): |
| break |
| elif w2l_msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR): |
| break |
|
|
| await wav2lip_ws.close() |
|
|
| except Exception as e: |
| print(f"Erro generate_complete: {e}") |
| await ws.send_json({"type": "error", "message": str(e)}) |
|
|
| elif action == "ping": |
| await ws.send_json({"type": "pong"}) |
|
|
| except Exception as e: |
| print(f"WS Error: {e}") |
| finally: |
| print("Cliente desconectado") |
|
|
| return ws |
|
|
|
|
| @routes.get("/") |
| async def index(request): |
| return web.FileResponse(os.path.join(os.path.dirname(__file__), "index_streaming.html")) |
|
|
|
|
| @routes.get("/{filename}") |
| async def static_file(request): |
| filename = request.match_info["filename"] |
| filepath = os.path.join(os.path.dirname(__file__), filename) |
| if os.path.exists(filepath): |
| return web.FileResponse(filepath) |
| return web.Response(status=404) |
|
|
|
|
| app = web.Application() |
| app.add_routes(routes) |
|
|
| if __name__ == "__main__": |
| print("=" * 50) |
| print("Streaming Server - Porta", PORT) |
| print("Wav2Lip:", WAV2LIP_WS) |
| print("Idle Video:", IDLE_VIDEO) |
| print("=" * 50) |
|
|
| |
| load_idle_frames() |
|
|
| print(f"Upscaling: ENABLED (target {idle_resolution[0]}x{idle_resolution[1]})") |
| print("Interpolacao: LANCZOS4 (alta qualidade)") |
| print("Color: HISTOGRAM MATCHING (LAB color space)") |
| print("=" * 50) |
| web.run_app(app, host="0.0.0.0", port=PORT) |
|
|