#!/usr/bin/env python3
"""
Test client for progressive streaming.

Connects via WebSocket, receives chunks, assembles them and saves the video.
"""

import asyncio
import aiohttp
import json
import base64
import struct
import os
import sys
import wave
import subprocess
import tempfile
import time

# Configuration (WS_URL, TEXT and VOICE can be overridden via environment variables)
WS_URL = os.getenv("WS_URL", "ws://localhost:8090/ws")
TEXT = os.getenv("TEXT", "Hello! I am testing the progressive streaming avatar system.")
VOICE = os.getenv("VOICE", "tara")
OUTPUT_DIR = "/tmp/streaming_test"

# Big-endian unsigned 32-bit length prefix used throughout the chunk payload.
_U32 = struct.Struct(">I")


def _parse_chunk(chunk_data):
    """Split one decoded chunk payload into its audio bytes and frame blobs.

    Payload layout as read by this client (all integers big-endian uint32)::

        audio_len | audio[audio_len] | frames_count |
        (frame_len | frame[frame_len]) * frames_count

    Args:
        chunk_data: the base64-decoded ``data`` field of a ``chunk`` message.

    Returns:
        ``(audio_bytes, frames)`` where ``frames`` is a list of ``bytes``.

    Raises:
        ValueError: if the payload is shorter than its length prefixes claim
            (slicing past the end would otherwise silently yield short data).
    """
    view = memoryview(chunk_data)
    offset = 0

    def take(size):
        # Consume `size` bytes from the payload, refusing to read past its end.
        nonlocal offset
        end = offset + size
        if end > len(view):
            raise ValueError(
                f"Chunk truncado: precisa de {end} bytes, recebeu {len(view)}"
            )
        piece = view[offset:end]
        offset = end
        return piece

    audio_len = _U32.unpack(take(_U32.size))[0]
    audio_bytes = bytes(take(audio_len))

    frames_count = _U32.unpack(take(_U32.size))[0]
    frames = []
    for _ in range(frames_count):
        frame_len = _U32.unpack(take(_U32.size))[0]
        frames.append(bytes(take(frame_len)))
    return audio_bytes, frames


async def _receive_stream():
    """Connect, send the generate request and collect every streamed chunk.

    Stops on a ``done`` or ``error`` message, or when the socket reports
    CLOSED/ERROR. The WebSocket and HTTP session are closed on every exit
    path, including exceptions, by their ``async with`` blocks.

    Returns:
        ``(audio, frames, chunks_received, start_time)``: the concatenated
        audio as a ``bytearray``, the list of frame blobs, the number of
        ``chunk`` messages and the ``time.monotonic()`` value taken right
        after the request was sent.
    """
    audio = bytearray()
    frames = []
    chunks_received = 0

    async with aiohttp.ClientSession() as session:
        print("\nConectando ao WebSocket...")
        async with session.ws_connect(
            WS_URL, timeout=aiohttp.ClientWSTimeout(ws_close=120)
        ) as ws:
            print("Conectado!")

            print("\nEnviando requisicao...")
            await ws.send_json({
                "action": "generate",
                "text": TEXT,
                "voice": VOICE,
            })
            start_time = time.monotonic()

            async for msg in ws:
                if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                    print(" WebSocket fechado")
                    break
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue

                data = json.loads(msg.data)
                msg_type = data.get("type", "")

                if msg_type == "status":
                    print(f" Status: {data.get('message')}")
                elif msg_type == "stream_start":
                    ttfb = data.get("ttfb_ms", 0)
                    print(f" Stream iniciado! TTFB: {ttfb}ms")
                elif msg_type == "chunk":
                    chunks_received += 1
                    chunk_index = data.get("chunk_index", 0)
                    audio_size = data.get("audio_size", 0)
                    num_frames = data.get("num_frames", 0)

                    chunk_audio, chunk_frames = _parse_chunk(
                        base64.b64decode(data.get("data", ""))
                    )
                    audio.extend(chunk_audio)
                    frames.extend(chunk_frames)

                    print(f" Chunk {chunk_index}: {audio_size} bytes audio, {num_frames} frames (total: {len(frames)} frames, {len(audio)} bytes audio)")
                elif msg_type == "done":
                    total_chunks = data.get("total_chunks", 0)
                    total_frames = data.get("total_frames", 0)
                    elapsed_ms = data.get("elapsed_ms", 0)
                    print(f"\n Done! {total_chunks} chunks, {total_frames} frames, {elapsed_ms}ms")
                    break
                elif msg_type == "error":
                    print(f" ERRO: {data.get('message')}")
                    break

    return audio, frames, chunks_received, start_time


def _save_frames(frames_dir, frames):
    """Write each frame blob to ``frames_dir/frame_NNNNN.jpg`` (0-based index).

    Any ``frame_*.jpg`` left from a previous run is removed first: ffmpeg's
    ``frame_%05d.jpg`` sequence pattern keeps reading consecutive indices
    until the first gap, so leftovers from a longer earlier run would be
    appended to this run's video.
    """
    os.makedirs(frames_dir, exist_ok=True)
    for name in os.listdir(frames_dir):
        if name.startswith("frame_") and name.endswith(".jpg"):
            os.remove(os.path.join(frames_dir, name))

    for i, frame in enumerate(frames):
        frame_path = os.path.join(frames_dir, f"frame_{i:05d}.jpg")
        with open(frame_path, "wb") as f:
            f.write(frame)


def _estimate_sample_rate(audio_len, num_frames):
    """Guess the sample rate of 16-bit mono PCM from the number of video frames.

    Each frame is assumed to span 40 ms (25 fps), so the stream lasts
    ``num_frames * 40`` ms. The raw estimate ``samples / duration`` is snapped
    to 24000 Hz when above 20000 and to 22050 Hz otherwise. With no frames,
    24000 Hz is returned.

    Args:
        audio_len: audio size in bytes (2 bytes per sample).
        num_frames: number of video frames received.
    """
    audio_duration_ms = num_frames * 40
    samples = audio_len // 2  # 16-bit = 2 bytes per sample

    if audio_duration_ms <= 0:
        return 24000
    estimated_sample_rate = int(samples / (audio_duration_ms / 1000))
    # Round to the nearest of the two rates this client expects.
    return 24000 if estimated_sample_rate > 20000 else 22050


def _save_wav(audio_path, audio, sample_rate):
    """Write ``audio`` as a mono, 16-bit WAV file at ``sample_rate`` Hz."""
    with wave.open(audio_path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit
        wf.setframerate(sample_rate)
        wf.writeframes(audio)


def _video_fps(audio_len, sample_rate, num_frames):
    """Return the frame rate that makes ``num_frames`` span the audio duration.

    Falls back to 25 when the audio is empty.
    """
    audio_duration_s = audio_len / 2 / sample_rate
    if audio_duration_s > 0:
        return num_frames / audio_duration_s
    return 25


def _run_ffmpeg(frames_dir, audio_path, video_path, video_fps):
    """Mux the JPEG sequence and WAV into an H.264/AAC MP4.

    Returns:
        True if ffmpeg succeeded, False otherwise (the error is printed).
    """
    cmd = [
        "ffmpeg", "-y",
        "-framerate", str(video_fps),
        "-i", os.path.join(frames_dir, "frame_%05d.jpg"),
        "-i", audio_path,
        "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",  # libx264 + yuv420p need even dimensions
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        "-shortest",
        video_path,
    ]
    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        print("FFmpeg erro: executavel 'ffmpeg' nao encontrado no PATH")
        return False

    if result.returncode != 0:
        print(f"FFmpeg erro: {result.stderr[-500:]}")
        return False
    return True


def _print_video_info(video_path):
    """Print duration and size of ``video_path`` using ffprobe, if available.

    Silently skips when ffprobe fails or reports values that cannot be
    parsed. Prints a note and skips when ffprobe is not installed.
    """
    try:
        probe = subprocess.run(
            ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", video_path],
            capture_output=True, text=True,
        )
    except FileNotFoundError:
        print("ffprobe nao encontrado; pulando info do video")
        return
    if probe.returncode != 0:
        return

    try:
        fmt = json.loads(probe.stdout).get("format", {})
        duration = float(fmt.get("duration", 0))
        size = int(fmt.get("size", 0))
    except ValueError:  # covers JSONDecodeError and fields such as "N/A"
        return
    print(f"Duracao: {duration:.2f}s")
    print(f"Tamanho: {size / 1024:.1f} KB")


async def test_streaming():
    """Run one end-to-end streaming session and save frames, audio and video.

    Requests generation of TEXT with VOICE from WS_URL and collects the
    streamed chunks. Then writes the JPEG frames, a WAV file and an ffmpeg
    MP4 under OUTPUT_DIR. Connection or parsing errors are printed with a
    traceback and abort the run before anything is written.
    """
    print("=" * 60)
    print("Test Streaming Client")
    print("=" * 60)
    print(f"WebSocket: {WS_URL}")
    print(f"Text: {TEXT[:50]}...")
    print(f"Voice: {VOICE}")
    print(f"Output: {OUTPUT_DIR}")
    print("=" * 60)

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    try:
        all_audio, all_frames, chunks_received, start_time = await _receive_stream()
    except Exception as e:
        print(f"Erro: {e}")
        import traceback
        traceback.print_exc()
        return

    total_time = time.monotonic() - start_time

    print("\n" + "=" * 60)
    print("Resultados:")
    print("=" * 60)
    print(f"Chunks recebidos: {chunks_received}")
    print(f"Frames recebidos: {len(all_frames)}")
    print(f"Audio recebido: {len(all_audio)} bytes")
    print(f"Tempo total: {total_time:.2f}s")

    if not all_frames or not all_audio:
        print("\nNenhum dado recebido!")
        return

    # Save frames as JPEGs
    print(f"\nSalvando {len(all_frames)} frames...")
    frames_dir = os.path.join(OUTPUT_DIR, "frames")
    _save_frames(frames_dir, all_frames)

    # Save audio as WAV
    print(f"Salvando audio ({len(all_audio)} bytes)...")
    audio_path = os.path.join(OUTPUT_DIR, "audio.wav")
    sample_rate = _estimate_sample_rate(len(all_audio), len(all_frames))
    print(f"Sample rate estimado: {sample_rate} Hz")
    _save_wav(audio_path, all_audio, sample_rate)

    # Build the video with ffmpeg, choosing fps so frames match audio length
    print("\nCriando video com ffmpeg...")
    video_path = os.path.join(OUTPUT_DIR, "output.mp4")
    video_fps = _video_fps(len(all_audio), sample_rate, len(all_frames))
    print(f"Video FPS: {video_fps:.2f}")

    if _run_ffmpeg(frames_dir, audio_path, video_path, video_fps):
        print(f"\nVideo salvo em: {video_path}")
        _print_video_info(video_path)

    print("\n" + "=" * 60)
    print("Arquivos gerados:")
    print("=" * 60)
    print(f" Frames: {frames_dir}/")
    print(f" Audio: {audio_path}")
    print(f" Video: {video_path}")


if __name__ == "__main__":
    asyncio.run(test_streaming())