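# NOTE: the lines below are residue from the web file viewer this script was
# copied from. They are kept as comments so the file parses. The "#!" line
# further down only takes effect once it is the first line of the file.
# speech2speech-interface / interface/test_streaming.py
# marcosremar2's picture
# Implement parallel streaming: Orpheus audio + Wav2Lip frames
# eeae125
# Raw
# History Blame Contribute Delete
# 8 kB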
#!/usr/bin/env python3
"""
Test client para streaming progressivo.
Conecta via WebSocket, recebe chunks, monta e salva vídeo.
"""
import asyncio
import aiohttp
import json
import base64
import struct
import os
import sys
import wave
import subprocess
import tempfile
import time
# Configuration: every setting can be overridden through an environment
# variable of the same name. The defaults target a local test server.
WS_URL = os.getenv("WS_URL", "ws://localhost:8090/ws")
TEXT = os.getenv("TEXT", "Hello! I am testing the progressive streaming avatar system.")
VOICE = os.getenv("VOICE", "tara")
# Directory that receives frames/, audio.wav and output.mp4. It was previously
# hard-coded; the default is unchanged.
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/tmp/streaming_test")
def _parse_chunk(payload):
    """Split one decoded chunk payload into its audio bytes and JPEG frames.

    Layout read here (every length is a 4-byte big-endian unsigned int)::

        audio_len | audio bytes | frames_count | (frame_len | frame bytes) * frames_count

    Args:
        payload: Chunk bytes after base64 decoding.

    Returns:
        ``(audio, frames)``: the audio as ``bytes`` and a list holding one
        ``bytes`` object per frame.

    Raises:
        ValueError: If a length field, or the data it declares, runs past the
            end of the buffer. Without this check a truncated payload would be
            silently kept as a short slice.
    """
    view = memoryview(payload)
    offset = 0

    def take(size):
        # Return the next `size` bytes as a zero-copy view; never read past the end.
        nonlocal offset
        end = offset + size
        if end > len(view):
            raise ValueError(
                f"chunk truncado: {size} bytes esperados no offset {offset}, "
                f"{len(view) - offset} disponiveis"
            )
        part = view[offset:end]
        offset = end
        return part

    def take_u32():
        # Read one big-endian uint32 length/count field.
        return struct.unpack(">I", take(4))[0]

    # take_u32() runs before take(), so each length prefix is consumed first.
    audio = bytes(take(take_u32()))
    frames_count = take_u32()
    frames = [bytes(take(take_u32())) for _ in range(frames_count)]
    return audio, frames


def _guess_sample_rate(audio_num_bytes, frame_count, fps=25):
    """Guess the audio sample rate from how many video frames came with it.

    Assumes the frames span the same time as the audio at *fps* frames per
    second (25 fps = 40 ms per frame) and that the audio is 16-bit (2 bytes per
    sample). The estimate is snapped to 24000 Hz when above 20 kHz, otherwise to
    22050 Hz. Without frames, 24000 Hz is returned.

    NOTE(review): every estimate at or below 20 kHz maps to 22050 Hz, including
    e.g. 16 kHz audio.
    """
    if frame_count <= 0:
        return 24000
    samples = audio_num_bytes // 2  # 16-bit => 2 bytes per sample
    estimated = int(samples / (frame_count / fps))
    return 24000 if estimated > 20000 else 22050


def _write_frames(frames_dir, frames):
    """Write *frames* to *frames_dir* as frame_00000.jpg, frame_00001.jpg, ...

    Any frame_*.jpg left by a previous run is deleted first. ffmpeg's
    numbered-sequence input keeps reading consecutive numbers, so leftovers
    from a longer earlier run could otherwise leak into this run's video.
    """
    os.makedirs(frames_dir, exist_ok=True)
    for name in os.listdir(frames_dir):
        if name.startswith("frame_") and name.endswith(".jpg"):
            os.remove(os.path.join(frames_dir, name))
    for i, frame in enumerate(frames):
        with open(os.path.join(frames_dir, f"frame_{i:05d}.jpg"), "wb") as f:
            f.write(frame)


def _write_wav(path, pcm, sample_rate):
    """Write *pcm* to *path* as a mono, 16-bit WAV at *sample_rate* Hz.

    A trailing odd byte (half a sample) is dropped, so the data chunk always
    holds a whole number of samples.
    """
    usable = len(pcm) - (len(pcm) % 2)
    with wave.open(path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit
        wf.setframerate(sample_rate)
        wf.writeframes(pcm[:usable])


def _build_video(frames_dir, audio_path, video_path, video_fps):
    """Mux the JPEG sequence and the WAV into an H.264/AAC MP4 with ffmpeg.

    Returns True on success. On failure it prints the error (including a
    missing ffmpeg binary) and returns False.
    """
    cmd = [
        "ffmpeg", "-y",
        "-framerate", str(video_fps),
        "-i", os.path.join(frames_dir, "frame_%05d.jpg"),
        "-i", audio_path,
        "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",  # libx264 needs even dimensions
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        "-shortest",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        print("FFmpeg erro: executavel 'ffmpeg' nao encontrado no PATH")
        return False
    if result.returncode != 0:
        print(f"FFmpeg erro: {result.stderr[-500:]}")
        return False
    return True


def _print_video_info(video_path):
    """Print duration and size of *video_path* via ffprobe (best effort, silent on failure)."""
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", video_path],
            capture_output=True, text=True,
        )
    except FileNotFoundError:
        return  # ffprobe is optional; the video itself was already written
    if probe.returncode != 0:
        return
    fmt = json.loads(probe.stdout).get("format", {})
    duration = float(fmt.get("duration", 0))
    size = int(fmt.get("size", 0))
    print(f"Duracao: {duration:.2f}s")
    print(f"Tamanho: {size / 1024:.1f} KB")


async def test_streaming():
    """Run one streaming request end to end and assemble the result into a video.

    Connects to ``WS_URL`` and sends ``{"action": "generate", "text": TEXT,
    "voice": VOICE}``. It then collects the audio and JPEG frames of every
    ``chunk`` message until a ``done``/``error`` message or the socket closes.
    Finally it writes the following files under ``OUTPUT_DIR``:

    * ``frames/frame_NNNNN.jpg``: one file per received frame;
    * ``audio.wav``: the concatenated audio, written as mono 16-bit PCM (the
      server's audio is assumed to already be in that format, TODO confirm);
    * ``output.mp4``: frames and audio muxed with ffmpeg.

    Connection or protocol errors (including malformed chunks) are printed
    with a traceback and end the run before any file is written. Returns None.
    """
    print("=" * 60)
    print("Test Streaming Client")
    print("=" * 60)
    print(f"WebSocket: {WS_URL}")
    print(f"Text: {TEXT[:50]}...")
    print(f"Voice: {VOICE}")
    print(f"Output: {OUTPUT_DIR}")
    print("=" * 60)

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Accumulators for everything received across chunks.
    all_audio = bytearray()
    all_frames = []
    chunks_received = 0
    start_time = None         # set right after the request is sent
    stream_start_time = None  # set when "stream_start" arrives

    try:
        async with aiohttp.ClientSession() as session:
            print("\nConectando ao WebSocket...")
            # `async with` closes the socket even when a chunk fails to parse.
            async with session.ws_connect(
                WS_URL, timeout=aiohttp.ClientWSTimeout(ws_close=120)
            ) as ws:
                print("Conectado!")

                print("\nEnviando requisicao...")
                await ws.send_json({
                    "action": "generate",
                    "text": TEXT,
                    "voice": VOICE,
                })
                start_time = time.time()

                async for msg in ws:
                    if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                        print(" WebSocket fechado")
                        break
                    if msg.type != aiohttp.WSMsgType.TEXT:
                        continue  # only TEXT messages are handled by this client

                    data = json.loads(msg.data)
                    msg_type = data.get("type", "")

                    if msg_type == "status":
                        print(f" Status: {data.get('message')}")
                    elif msg_type == "stream_start":
                        stream_start_time = time.time()
                        print(f" Stream iniciado! TTFB: {data.get('ttfb_ms', 0)}ms")
                    elif msg_type == "chunk":
                        chunks_received += 1
                        audio, frames = _parse_chunk(base64.b64decode(data.get("data", "")))
                        all_audio.extend(audio)
                        all_frames.extend(frames)
                        print(
                            f" Chunk {data.get('chunk_index', 0)}: "
                            f"{data.get('audio_size', 0)} bytes audio, "
                            f"{data.get('num_frames', 0)} frames "
                            f"(total: {len(all_frames)} frames, {len(all_audio)} bytes audio)"
                        )
                    elif msg_type == "done":
                        print(
                            f"\n Done! {data.get('total_chunks', 0)} chunks, "
                            f"{data.get('total_frames', 0)} frames, "
                            f"{data.get('elapsed_ms', 0)}ms"
                        )
                        break
                    elif msg_type == "error":
                        print(f" ERRO: {data.get('message')}")
                        break
    except Exception as e:
        # Top-level boundary of this script: report and stop without writing files.
        print(f"Erro: {e}")
        import traceback
        traceback.print_exc()
        return

    total_time = time.time() - start_time if start_time else 0
    print("\n" + "=" * 60)
    print("Resultados:")
    print("=" * 60)
    print(f"Chunks recebidos: {chunks_received}")
    print(f"Frames recebidos: {len(all_frames)}")
    print(f"Audio recebido: {len(all_audio)} bytes")
    if start_time is not None and stream_start_time is not None:
        # Client-side latency from request sent to "stream_start" received.
        print(f"Inicio do stream (cliente): {(stream_start_time - start_time) * 1000:.0f}ms")
    print(f"Tempo total: {total_time:.2f}s")

    if not all_frames or not all_audio:
        print("\nNenhum dado recebido!")
        return

    print(f"\nSalvando {len(all_frames)} frames...")
    frames_dir = os.path.join(OUTPUT_DIR, "frames")
    _write_frames(frames_dir, all_frames)

    print(f"Salvando audio ({len(all_audio)} bytes)...")
    audio_path = os.path.join(OUTPUT_DIR, "audio.wav")
    sample_rate = _guess_sample_rate(len(all_audio), len(all_frames))
    print(f"Sample rate estimado: {sample_rate} Hz")
    _write_wav(audio_path, all_audio, sample_rate)

    print("\nCriando video com ffmpeg...")
    video_path = os.path.join(OUTPUT_DIR, "output.mp4")
    # Pick the frame rate so the frames span exactly the (whole-sample) audio duration.
    audio_duration_s = (len(all_audio) // 2) / sample_rate
    video_fps = len(all_frames) / audio_duration_s if audio_duration_s > 0 else 25
    print(f"Video FPS: {video_fps:.2f}")

    if _build_video(frames_dir, audio_path, video_path, video_fps):
        print(f"\nVideo salvo em: {video_path}")
        _print_video_info(video_path)

    print("\n" + "=" * 60)
    print("Arquivos gerados:")
    print("=" * 60)
    print(f" Frames: {frames_dir}/")
    print(f" Audio: {audio_path}")
    print(f" Video: {video_path}")
if __name__ == "__main__":
    # Script entry point: build the client coroutine and run it to completion
    # on a fresh event loop.
    main_coroutine = test_streaming()
    asyncio.run(main_coroutine)