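# Source: speech2speech-interface / interface/server_streaming.py
# Repository viewer metadata (kept as comments so the module parses):
#   author avatar caption: marcosremar2
#   commit message: "Add WebRTC streaming interface with vast.ai deployment"
#   commit: e62aafd
#   file size: 29.5 kB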
"""
Interface Server - Streaming com Crossfade Suave
Faz transicao suave entre idle e fala usando blending de frames
"""
from aiohttp import web
import aiohttp
import asyncio
import json
import base64
import os
import time
import cv2
import numpy as np
# Upstream Wav2Lip WebSocket endpoint and the port this server listens on;
# both can be overridden through environment variables.
WAV2LIP_WS = os.environ.get("WAV2LIP_WS", "ws://localhost:8082/ws")
PORT = int(os.environ.get("PORT", "8000"))

# The idle loop video is looked up next to this module.
IDLE_VIDEO = os.path.join(os.path.dirname(__file__), "idle.mp4")

# Crossfade configuration: number of transition frames (200 ms at 25 fps).
CROSSFADE_FRAMES = 5

routes = web.RouteTableDef()

# Idle-frame cache, filled by load_idle_frames().
idle_frames = []
idle_frame_count = 0
# Resolution of the idle video as (width, height); replaced once the video
# has been loaded.
idle_resolution = (1920, 1080)

# Mouth/chin region expressed as fractions of the frame size. It is kept
# tight (mouth and chin only, not much of the face) to avoid visible "jumps"
# during the transition.
MOUTH_REGION = {
    'top': 0.50,     # starts at 50% from the top (below the nose)
    'bottom': 0.80,  # ends at 80% (chin only)
    'left': 0.32,    # starts at 32% from the left
    'right': 0.68,   # ends at 68% (narrower)
}
def load_idle_frames():
    """Load every frame of the idle video into the module-level cache.

    Fills the globals ``idle_frames`` and ``idle_frame_count`` and updates
    ``idle_resolution`` with the (width, height) reported by the video.

    The function is a no-op when frames are already cached, when the video
    file does not exist, or when OpenCV cannot open it. In the last case the
    previous ``idle_resolution`` is kept, so later resizes never target a
    0x0 size.
    """
    global idle_frames, idle_frame_count, idle_resolution
    if idle_frames:
        return
    if not os.path.exists(IDLE_VIDEO):
        print(f"[AVISO] Idle video nao encontrado: {IDLE_VIDEO}")
        return
    print(f"Carregando idle frames de {IDLE_VIDEO}...")
    cap = cv2.VideoCapture(IDLE_VIDEO)
    try:
        if not cap.isOpened():
            print(f"[AVISO] Nao foi possivel abrir o idle video: {IDLE_VIDEO}")
            return
        # Read the resolution reported by the container.
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Only trust a usable size; a failed property read yields 0.
        if width > 0 and height > 0:
            idle_resolution = (width, height)
        print(f"Resolucao idle: {width}x{height}")
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            idle_frames.append(frame)
    finally:
        # Release the capture even if decoding raised.
        cap.release()
    idle_frame_count = len(idle_frames)
    print(f"Carregados {idle_frame_count} frames idle em full resolution")
def frame_to_jpeg_base64(frame, quality=85):
    """Encode a frame (numpy image) as JPEG and return it as base64 text.

    ``quality`` is passed to OpenCV as the JPEG quality setting.
    """
    jpeg_options = [int(cv2.IMWRITE_JPEG_QUALITY), quality]
    encoded = cv2.imencode('.jpg', frame, jpeg_options)[1]
    return base64.b64encode(encoded).decode('utf-8')
def jpeg_base64_to_frame(b64_data):
    """Decode base64-encoded JPEG data into a colour frame (numpy image).

    The result is whatever ``cv2.imdecode`` returns for the decoded bytes.
    """
    raw_bytes = np.frombuffer(base64.b64decode(b64_data), np.uint8)
    return cv2.imdecode(raw_bytes, cv2.IMREAD_COLOR)
def upscale_frame(frame, target_size):
    """Resize ``frame`` to ``target_size`` using LANCZOS4 interpolation.

    ``target_size`` is ``(width, height)``. ``None`` is passed through, and a
    frame that already has the requested size is returned unchanged (same
    object, no copy).
    """
    if frame is None:
        return frame
    height, width = frame.shape[:2]
    wanted_w, wanted_h = target_size
    # Nothing to do when the frame is already at the target resolution.
    if (width, height) == (wanted_w, wanted_h):
        return frame
    return cv2.resize(frame, (wanted_w, wanted_h), interpolation=cv2.INTER_LANCZOS4)
def match_histogram(source, reference):
    """Shift the colour statistics of ``source`` towards ``reference``.

    Both images are converted from BGR to LAB. For each of the three
    channels the source is normalised by its own mean / standard deviation
    and rescaled to the mean / standard deviation of the reference. The
    result is clipped to 0-255 and converted back to BGR. This is used to
    reduce brightness/colour differences between Wav2Lip and idle frames.
    """
    src = cv2.cvtColor(source, cv2.COLOR_BGR2LAB).astype(np.float32)
    ref = cv2.cvtColor(reference, cv2.COLOR_BGR2LAB).astype(np.float32)
    for channel in range(3):
        src_plane = src[:, :, channel]
        ref_plane = ref[:, :, channel]
        src_mean, src_std = src_plane.mean(), src_plane.std()
        ref_mean, ref_std = ref_plane.mean(), ref_plane.std()
        # A flat source channel would otherwise divide by zero.
        if src_std < 1e-6:
            src_std = 1e-6
        src[:, :, channel] = (src_plane - src_mean) * (ref_std / src_std) + ref_mean
    # Clip to the valid range before converting back to 8-bit BGR.
    adjusted = np.clip(src, 0, 255).astype(np.uint8)
    return cv2.cvtColor(adjusted, cv2.COLOR_LAB2BGR)
def extract_mouth_region(frame, region=MOUTH_REGION):
    """Crop the mouth/chin area out of ``frame``.

    ``region`` holds the 'top', 'bottom', 'left' and 'right' bounds as
    fractions of the frame size. Returns ``(crop, (x1, y1, x2, y2))`` where
    the crop is a copy and the coordinates are in pixels, for later blending.
    """
    height, width = frame.shape[:2]
    top, bottom = int(height * region['top']), int(height * region['bottom'])
    left, right = int(width * region['left']), int(width * region['right'])
    crop = frame[top:bottom, left:right].copy()
    return crop, (left, top, right, bottom)
def create_feathered_mask(shape, feather_pixels=15):
    """Build a float32 blending mask with smoothly feathered borders.

    Args:
        shape: ``(height, width, ...)``; only the first two entries are used.
        feather_pixels: width of the border ramp in pixels.

    Returns:
        A ``(height, width)`` float32 array. A pixel at distance ``d`` from
        the nearest edge (0 for the outermost row/column) gets
        ``smoothstep(min(d / feather_pixels, 1))``, so the border is 0 and
        everything at least ``feather_pixels`` away from all edges is 1.

    The ramp is computed from the edge distance instead of by indexing rows
    and columns one by one, so a ``feather_pixels`` larger than the mask no
    longer raises ``IndexError``; in that case the centre simply never
    reaches 1. A non-positive ``feather_pixels`` yields an all-ones mask.
    """
    h, w = shape[:2]
    if feather_pixels <= 0:
        return np.ones((h, w), dtype=np.float32)

    def edge_ramp(length):
        # Distance of each index to the nearest end of the axis.
        dist = np.arange(length)
        dist = np.minimum(dist, length - 1 - dist)
        t = np.minimum(dist / feather_pixels, 1.0)
        return t * t * (3 - 2 * t)  # smoothstep (ease-in-out)

    # A pixel takes the smaller of its vertical and horizontal ramp values.
    mask = np.minimum.outer(edge_ramp(h), edge_ramp(w))
    return mask.astype(np.float32)
def blend_mouth_region_only(wav2lip_frame, idle_frame):
    """Paste only the mouth region of a Wav2Lip frame onto the idle frame.

    Steps:
      1. Crop the mouth region from the Wav2Lip frame.
      2. Resize only that crop to the matching area of the idle frame.
      3. Colour-match the crop to the idle area it will cover.
      4. Alpha-blend it into a copy of the idle frame through a feathered
         mask (this function does not use Poisson blending).

    Everything outside the mouth region keeps the idle frame's pixels.

    If either frame is ``None`` the other one is returned as-is. If the
    mouth region collapses to zero pixels, an unmodified copy of the idle
    frame is returned.
    """
    if wav2lip_frame is None or idle_frame is None:
        return wav2lip_frame if wav2lip_frame is not None else idle_frame
    idle_h, idle_w = idle_frame.shape[:2]
    w2l_h, w2l_w = wav2lip_frame.shape[:2]
    # Scale factors from Wav2Lip coordinates to idle coordinates.
    scale_x = idle_w / w2l_w
    scale_y = idle_h / w2l_h
    # 1. Mouth crop in Wav2Lip coordinates.
    mouth_crop, (x1_w2l, y1_w2l, x2_w2l, y2_w2l) = extract_mouth_region(wav2lip_frame)
    # 2. Equivalent rectangle in the idle frame.
    x1_idle = int(x1_w2l * scale_x)
    y1_idle = int(y1_w2l * scale_y)
    x2_idle = int(x2_w2l * scale_x)
    y2_idle = int(y2_w2l * scale_y)
    region_w = x2_idle - x1_idle
    region_h = y2_idle - y1_idle
    # Degenerate region (tiny frames): there is nothing to blend.
    if region_w <= 0 or region_h <= 0 or mouth_crop.size == 0:
        return idle_frame.copy()
    # 3. Resize only the mouth crop, then match its colour to the idle area.
    mouth_upscaled = cv2.resize(mouth_crop, (region_w, region_h), interpolation=cv2.INTER_LANCZOS4)
    idle_region = idle_frame[y1_idle:y2_idle, x1_idle:x2_idle]
    mouth_upscaled = match_histogram(mouth_upscaled, idle_region)
    # 4. Feathered mask: about 25% of the smaller dimension, at least 30 px,
    #    but never wider than the region itself.
    feather = max(30, min(region_w, region_h) // 4)
    feather = min(feather, region_w, region_h)
    mask = create_feathered_mask((region_h, region_w), feather_pixels=feather)
    mask_3ch = np.dstack([mask, mask, mask])
    # 5. Blend into a copy so the cached idle frame is never modified.
    result = idle_frame.copy()
    blended_region = (mouth_upscaled * mask_3ch + idle_region * (1 - mask_3ch)).astype(np.uint8)
    result[y1_idle:y2_idle, x1_idle:x2_idle] = blended_region
    return result
def blend_with_poisson(wav2lip_frame, idle_frame):
    """Alternative strategy: Poisson-blend the mouth region into the idle frame.

    The mouth crop of the Wav2Lip frame is resized to the matching area of
    the idle frame and cloned through an elliptical mask with
    ``cv2.seamlessClone``. This is slower than the feathered blend but gives
    smoother borders.

    If either frame is ``None`` the other one is returned as-is. When the
    region is too small for the ellipse (axes would be non-positive), or
    when ``seamlessClone`` fails, the result of ``blend_mouth_region_only``
    is returned instead.
    """
    if wav2lip_frame is None or idle_frame is None:
        return wav2lip_frame if wav2lip_frame is not None else idle_frame
    idle_h, idle_w = idle_frame.shape[:2]
    w2l_h, w2l_w = wav2lip_frame.shape[:2]
    scale_x = idle_w / w2l_w
    scale_y = idle_h / w2l_h
    # Mouth crop and its rectangle mapped into idle coordinates.
    mouth_crop, (x1_w2l, y1_w2l, x2_w2l, y2_w2l) = extract_mouth_region(wav2lip_frame)
    x1_idle = int(x1_w2l * scale_x)
    y1_idle = int(y1_w2l * scale_y)
    x2_idle = int(x2_w2l * scale_x)
    y2_idle = int(y2_w2l * scale_y)
    region_w = x2_idle - x1_idle
    region_h = y2_idle - y1_idle
    # Ellipse slightly smaller than the region so the borders are excluded.
    axes_x = region_w // 2 - 10
    axes_y = region_h // 2 - 10
    if axes_x <= 0 or axes_y <= 0:
        # Region too small for the elliptical mask; use the feathered blend.
        return blend_mouth_region_only(wav2lip_frame, idle_frame)
    mouth_upscaled = cv2.resize(mouth_crop, (region_w, region_h), interpolation=cv2.INTER_LANCZOS4)
    # Source image the size of the idle frame: black, with the mouth in place.
    source = np.zeros_like(idle_frame)
    source[y1_idle:y2_idle, x1_idle:x2_idle] = mouth_upscaled
    # Elliptical mask centred on the region.
    mask = np.zeros((idle_h, idle_w), dtype=np.uint8)
    center_x = (x1_idle + x2_idle) // 2
    center_y = (y1_idle + y2_idle) // 2
    cv2.ellipse(mask, (center_x, center_y), (axes_x, axes_y), 0, 0, 360, 255, -1)
    try:
        return cv2.seamlessClone(
            source,
            idle_frame,
            mask,
            (center_x, center_y),
            cv2.NORMAL_CLONE,
        )
    except Exception as e:
        # Best effort: any failure falls back to the feathered blend.
        print(f"[Poisson] Erro: {e}, usando feathered blend")
        return blend_mouth_region_only(wav2lip_frame, idle_frame)
def calculate_frame_difference(frame1, frame2):
    """Measure how different two frames are on a 0-100 scale.

    Both frames are converted to grayscale and the mean absolute pixel
    difference (0-255) is rescaled to 0-100. Returns 0 when either frame is
    ``None``.
    """
    if frame1 is None or frame2 is None:
        return 0
    gray_a, gray_b = (
        cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) for image in (frame1, frame2)
    )
    mean_abs_diff = np.mean(cv2.absdiff(gray_a, gray_b))
    # Normalise from the 0-255 pixel range to 0-100.
    return (mean_abs_diff / 255.0) * 100
def calculate_sharpness(frame):
    """Return the variance of the Laplacian of ``frame`` as a sharpness score.

    A higher value means a sharper frame. Three-dimensional (colour) input
    is converted to grayscale first; any other input is used as-is.
    """
    if len(frame.shape) == 3:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    else:
        gray = frame
    return cv2.Laplacian(gray, cv2.CV_64F).var()
def find_best_matching_idle_frame(target_frame, idle_frames, sample_step=5, sharpness_weight=0.3):
    """Find the idle frame that best continues from ``target_frame``.

    The score combines similarity to the target with sharpness, so blurry
    idle frames are avoided.

    Args:
        target_frame: frame to compare against (the last speaking frame).
        idle_frames: list of idle frames.
        sample_step: stride of the coarse search (5 = every 5th frame);
            values below 1 are treated as 1.
        sharpness_weight: weight of sharpness in the combined score (0-1).

    Returns:
        ``(best_index, best_diff)``: the index of the chosen idle frame and
        its mean absolute grayscale difference to the target. Returns
        ``(0, 0)`` when there are no idle frames or no target.

    The search runs in two phases: a coarse pass over every
    ``sample_step``-th frame, then a refinement over the neighbourhood of
    the 20 most similar coarse candidates. Neighbourhoods overlap, so each
    frame is measured once and its difference cached; this only removes
    repeated work and does not change which frame wins.
    """
    if not idle_frames or target_frame is None:
        return 0, 0
    sample_step = max(1, sample_step)
    total = len(idle_frames)
    # Convert the target to grayscale once.
    target_gray = cv2.cvtColor(target_frame, cv2.COLOR_BGR2GRAY)
    diff_cache = {}

    def diff_to_target(index):
        # Mean absolute grayscale difference, computed at most once per frame.
        if index not in diff_cache:
            idle_gray = cv2.cvtColor(idle_frames[index], cv2.COLOR_BGR2GRAY)
            diff_cache[index] = np.mean(cv2.absdiff(target_gray, idle_gray))
        return diff_cache[index]

    # Phase 1: coarse candidates ordered by similarity (smaller = closer).
    candidates = [(i, diff_to_target(i)) for i in range(0, total, sample_step)]
    candidates.sort(key=lambda x: x[1])
    top_candidates = candidates[:20]

    # Phase 2: refine around the best candidates, visiting each frame once.
    refined_candidates = []
    visited = set()
    for idx, _ in top_candidates:
        start = max(0, idx - sample_step)
        end = min(total, idx + sample_step + 1)
        for i in range(start, end):
            if i in visited:
                continue
            visited.add(i)
            refined_candidates.append(
                (i, diff_to_target(i), calculate_sharpness(idle_frames[i]))
            )
    if not refined_candidates:
        return 0, 0

    # Normalise both measures to 0-1 for scoring.
    diffs = [c[1] for c in refined_candidates]
    sharpnesses = [c[2] for c in refined_candidates]
    min_diff, max_diff = min(diffs), max(diffs)
    min_sharp, max_sharp = min(sharpnesses), max(sharpnesses)
    # Avoid division by zero when all candidates share the same value.
    diff_range = max_diff - min_diff if max_diff > min_diff else 1
    sharp_range = max_sharp - min_sharp if max_sharp > min_sharp else 1

    # Combined score, smaller is better:
    #   diff_score:  0 = most similar, 1 = least similar
    #   sharp_score: 0 = sharpest, 1 = blurriest (inverted)
    best_idx = 0
    best_score = float('inf')
    best_diff = 0
    for i, diff, sharpness in refined_candidates:
        diff_score = (diff - min_diff) / diff_range
        sharp_score = 1 - (sharpness - min_sharp) / sharp_range
        combined_score = (1 - sharpness_weight) * diff_score + sharpness_weight * sharp_score
        if combined_score < best_score:
            best_score = combined_score
            best_idx = i
            best_diff = diff
    return best_idx, best_diff
def trim_high_motion_frames(frames, threshold_multiplier=1.0, max_trim=20):
    """Drop trailing frames whose frame-to-frame motion is too high.

    Only the transitions among the last (up to) 20 frames are inspected.
    Walking backwards from the end, frames are dropped while their
    difference to the previous frame exceeds the threshold; the walk stops
    at the first acceptable frame or after ``max_trim`` removals.

    Args:
        frames: list of frames.
        threshold_multiplier: the threshold is ``mean + multiplier * std``
            of the inspected differences, capped at 0.7.
        max_trim: maximum number of frames that may be removed.

    Returns:
        The same list when it has fewer than 20 frames or nothing needs
        trimming, otherwise a shortened copy.
    """
    total = len(frames)
    if total < 20:
        return frames
    # Differences between consecutive frames in the tail window.
    window = min(20, total - 1)
    motion = [
        (pos, calculate_frame_difference(frames[pos - 1], frames[pos]))
        for pos in range(total - window, total)
        if pos > 0
    ]
    if not motion:
        return frames
    scores = [score for _, score in motion]
    mean_diff = np.mean(scores)
    std_diff = np.std(scores)
    # Statistical threshold, capped so that any diff above 0.7 always counts
    # as problematic.
    threshold = mean_diff + threshold_multiplier * std_diff
    min_threshold = 0.7
    threshold = min(threshold, min_threshold)
    # Walk backwards from the last frame, extending the cut while frames are bad.
    cut_at = total
    removed = 0
    for pos, score in reversed(motion):
        if not (score > threshold):
            # First good frame found: stop here.
            break
        cut_at = pos
        removed += 1
        if removed >= max_trim:
            break
    excess = total - cut_at
    if 0 < excess <= max_trim:
        print(f"[Trim] Removendo {excess} frames problemáticos (threshold: {threshold:.2f}, mean: {mean_diff:.2f})")
        return frames[:cut_at]
    return frames
def blend_frames(frame1, frame2, alpha):
    """Linearly blend two frames: alpha=0 gives frame1, alpha=1 gives frame2.

    When the shapes differ, ``frame2`` is first resized to ``frame1``'s
    width and height.
    """
    if frame2.shape != frame1.shape:
        height, width = frame1.shape[:2]
        frame2 = cv2.resize(frame2, (width, height))
    return cv2.addWeighted(frame1, 1 - alpha, frame2, alpha, 0)
def create_crossfade_frames(from_frame, to_frame, num_frames):
    """Build ``num_frames`` intermediate frames fading from one frame to another.

    The blend weights are evenly spaced strictly between 0 and 1, e.g.
    1/6, 2/6, ... 5/6 for five frames, so neither endpoint is repeated.
    """
    steps = num_frames + 1
    return [
        blend_frames(from_frame, to_frame, k / steps)
        for k in range(1, steps)
    ]
async def _handle_generate(ws, data):
    """Serve a "generate" request: collect Wav2Lip output, post-process, stream it.

    All frames are first collected from the Wav2Lip WebSocket, upscaled to
    the idle resolution and colour-matched to the idle loop. The tail is
    then trimmed (high motion, then audio duration), the best matching idle
    frame is looked up, and finally ``stream_start``, the frames, the audio
    and ``done`` are sent to the client. Failures are reported to the client
    as an ``error`` message.
    """
    text = data.get("text", "").strip()
    voice = data.get("voice", "tara")
    idle_video_time_ms = data.get("idle_video_time_ms", 0)
    if not text:
        await ws.send_json({"type": "error", "message": "Text required"})
        return
    print(f"Gerando: {text[:50]}... (idle_time: {idle_video_time_ms}ms)")
    start_time = time.time()
    try:
        speaking_frames = []
        audio_data = None
        audio_duration = 0
        end_video_time_ms = 0
        # Starting position in the idle loop, derived from the client's idle
        # playback time (25 fps = 40 ms per frame).
        fps = 25
        frame_duration_ms = 1000 / fps
        if idle_frame_count > 0:
            current_idle_idx = int(idle_video_time_ms / frame_duration_ms) % idle_frame_count
        else:
            current_idle_idx = 0
        async with aiohttp.ClientSession() as session:
            # The context manager closes the upstream socket on every exit
            # path, including exceptions.
            async with session.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=120),
            ) as wav2lip_ws:
                await wav2lip_ws.send_json({
                    "action": "generate",
                    "text": text,
                    "voice": voice,
                    "idle_video_time_ms": idle_video_time_ms
                })
                # Collect every frame before sending anything to the client.
                async for w2l_msg in wav2lip_ws:
                    if w2l_msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                        break
                    if w2l_msg.type != aiohttp.WSMsgType.TEXT:
                        continue
                    w2l_data = json.loads(w2l_msg.data)
                    msg_type = w2l_data.get("type", "")
                    if msg_type == "status":
                        await ws.send_json(w2l_data)
                    elif msg_type == "frame":
                        frame_b64 = w2l_data.get("frame", "")
                        if not frame_b64:
                            continue
                        frame = jpeg_base64_to_frame(frame_b64)
                        if frame is None:
                            # An undecodable frame must not abort the whole
                            # generation; skip it.
                            print("[AVISO] Frame invalido recebido do Wav2Lip, ignorando")
                            continue
                        # Matching full-resolution idle frame, used as the
                        # colour reference.
                        idle_ref = None
                        if idle_frames and idle_frame_count > 0:
                            idle_ref = idle_frames[current_idle_idx]
                            current_idle_idx = (current_idle_idx + 1) % idle_frame_count
                        # Upscale the whole Wav2Lip frame, then match colours.
                        frame = upscale_frame(frame, idle_resolution)
                        if idle_ref is not None:
                            frame = match_histogram(frame, idle_ref)
                        speaking_frames.append(frame)
                    elif msg_type == "full_audio":
                        audio_data = w2l_data.get("audio", "")
                        audio_duration = w2l_data.get("duration_ms", 0)
                    elif msg_type == "done":
                        # Upstream suggestion for where the idle loop resumes.
                        end_video_time_ms = w2l_data.get("end_video_time_ms", 0)
                        break
                    elif msg_type == "error":
                        await ws.send_json(w2l_data)
                        break
            # Frames are sent WITHOUT crossfade; the client does the transition.
            # NOTE(review): the nesting below was reconstructed from a paste
            # without indentation; confirm the whole send phase belongs under
            # this guard.
            if speaking_frames:
                # 1. Drop problematic (high-motion) frames from the end.
                original_count = len(speaking_frames)
                speaking_frames = trim_high_motion_frames(speaking_frames)
                if len(speaking_frames) < original_count:
                    print(f"[Motion Trim] {original_count} -> {len(speaking_frames)} frames")
                # 2. Trim any remaining excess so the video matches the audio.
                if audio_duration > 0:
                    expected_frames = int(audio_duration / 1000 * fps)
                    if len(speaking_frames) > expected_frames:
                        trimmed = len(speaking_frames) - expected_frames
                        print(f"[Duration Trim] {trimmed} extra frames ({len(speaking_frames)} -> {expected_frames})")
                        speaking_frames = speaking_frames[:expected_frames]
                # 3. Pick the idle frame closest to the last speaking frame to
                #    minimise the visual jump on the speak -> idle transition.
                if idle_frames and speaking_frames:
                    last_speak_frame = speaking_frames[-1]
                    best_idle_idx, best_diff = find_best_matching_idle_frame(
                        last_speak_frame, idle_frames, sample_step=10
                    )
                    # Index to milliseconds (25 fps = 40 ms per frame).
                    end_video_time_ms = int(best_idle_idx * 40)
                    print(f"[Best Match] Idle frame {best_idle_idx} (diff: {best_diff:.2f}) -> {end_video_time_ms}ms")
                # Announce the stream.
                ttfb = int((time.time() - start_time) * 1000)
                await ws.send_json({"type": "stream_start", "ttfb_ms": ttfb})
                # Send the speaking frames with high JPEG quality (95) to
                # minimise artifacts.
                for idx, frame in enumerate(speaking_frames):
                    frame_b64 = frame_to_jpeg_base64(frame, quality=95)
                    await ws.send_json({
                        "type": "frame",
                        "frame": frame_b64,
                        "index": idx
                    })
                if audio_data:
                    await ws.send_json({
                        "type": "audio",
                        "audio": audio_data,
                        "duration_ms": audio_duration
                    })
                # "done" carries end_video_time_ms so the client can
                # re-synchronise its idle loop.
                elapsed = int((time.time() - start_time) * 1000)
                await ws.send_json({
                    "type": "done",
                    "frames": len(speaking_frames),
                    "elapsed_ms": elapsed,
                    "end_video_time_ms": end_video_time_ms
                })
                # NOTE: the log text is historical; frames on this path are
                # upscaled and colour-matched, not Poisson-blended.
                print(f"Enviados {len(speaking_frames)} frames (Poisson Blending)")
    except Exception as e:
        print(f"Erro: {e}")
        import traceback
        traceback.print_exc()
        await ws.send_json({"type": "error", "message": str(e)})


async def _handle_generate_complete(ws, data):
    """Proxy a "generate_complete" request to Wav2Lip and relay every text reply.

    Relaying stops after a ``done`` or ``error`` message or when the
    upstream socket closes. Failures are reported to the client as an
    ``error`` message.
    """
    text = data.get("text", "").strip()
    voice = data.get("voice", "tara")
    idle_before_frames = data.get("idle_before_frames", 0)
    idle_after_frames = data.get("idle_after_frames", 0)
    crossfade_frames = data.get("crossfade_frames", 0)
    jpeg_quality = data.get("jpeg_quality", 95)
    if not text:
        await ws.send_json({"type": "error", "message": "Text required"})
        return
    print(f"Generate Complete: {text[:50]}...")
    try:
        async with aiohttp.ClientSession() as session:
            async with session.ws_connect(
                WAV2LIP_WS,
                timeout=aiohttp.ClientWSTimeout(ws_close=120),
            ) as wav2lip_ws:
                await wav2lip_ws.send_json({
                    "action": "generate_complete",
                    "text": text,
                    "voice": voice,
                    "idle_before_frames": idle_before_frames,
                    "idle_after_frames": idle_after_frames,
                    "crossfade_frames": crossfade_frames,
                    "jpeg_quality": jpeg_quality
                })
                # Forward every text message unchanged.
                async for w2l_msg in wav2lip_ws:
                    if w2l_msg.type == aiohttp.WSMsgType.TEXT:
                        await ws.send_str(w2l_msg.data)
                        w2l_data = json.loads(w2l_msg.data)
                        if w2l_data.get("type") in ("done", "error"):
                            break
                    elif w2l_msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                        break
    except Exception as e:
        print(f"Erro generate_complete: {e}")
        await ws.send_json({"type": "error", "message": str(e)})


@routes.get("/ws")
async def websocket_handler(request):
    """Client-facing WebSocket endpoint.

    Each text message must be a JSON object with an ``action`` field:
      - ``generate``: collect, post-process and stream speaking frames.
      - ``generate_complete``: transparent proxy to the Wav2Lip service.
      - ``ping``: answered with ``{"type": "pong"}``.

    Messages that are not valid JSON objects are answered with an ``error``
    message and the connection stays open. Unknown actions are ignored.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    print("Cliente conectado")
    try:
        async for msg in ws:
            if msg.type != aiohttp.WSMsgType.TEXT:
                continue
            try:
                data = json.loads(msg.data)
            except json.JSONDecodeError:
                await ws.send_json({"type": "error", "message": "Invalid JSON"})
                continue
            if not isinstance(data, dict):
                await ws.send_json({"type": "error", "message": "Invalid request"})
                continue
            action = data.get("action", "")
            if action == "generate":
                await _handle_generate(ws, data)
            elif action == "generate_complete":
                await _handle_generate_complete(ws, data)
            elif action == "ping":
                await ws.send_json({"type": "pong"})
    except Exception as e:
        print(f"WS Error: {e}")
    finally:
        print("Cliente desconectado")
    return ws
@routes.get("/")
async def index(request):
    """Serve the streaming client page stored next to this module."""
    module_dir = os.path.dirname(__file__)
    page_path = os.path.join(module_dir, "index_streaming.html")
    return web.FileResponse(page_path)
@routes.get("/{filename}")
async def static_file(request):
    """Serve a regular file that lives directly in this module's directory.

    The file name comes from the request and is therefore untrusted. The
    resolved path must be a regular file whose parent is exactly the module
    directory; anything else (``..``, an absolute path, a decoded path
    separator, a directory) gets a 404.
    """
    filename = request.match_info["filename"]
    base_dir = os.path.abspath(os.path.dirname(__file__))
    # abspath normalises ".." segments; an absolute filename would replace
    # base_dir in the join and is rejected by the parent check below.
    filepath = os.path.abspath(os.path.join(base_dir, filename))
    if os.path.dirname(filepath) == base_dir and os.path.isfile(filepath):
        return web.FileResponse(filepath)
    return web.Response(status=404)
# Application object with every route registered above.
app = web.Application()
app.add_routes(routes)

if __name__ == "__main__":
    separator = "=" * 50
    # Startup banner with the effective configuration.
    print(separator)
    print("Streaming Server - Porta", PORT)
    print("Wav2Lip:", WAV2LIP_WS)
    print("Idle Video:", IDLE_VIDEO)
    print(separator)
    # Load the idle frames before serving so idle_resolution is known.
    load_idle_frames()
    target_w, target_h = idle_resolution[0], idle_resolution[1]
    print(f"Upscaling: ENABLED (target {target_w}x{target_h})")
    print("Interpolacao: LANCZOS4 (alta qualidade)")
    print("Color: HISTOGRAM MATCHING (LAB color space)")
    print(separator)
    web.run_app(app, host="0.0.0.0", port=PORT)