| |
| """ |
| Test client for progressive streaming. |
| Connects over WebSocket, receives chunks, then assembles and saves a video. |
| """ |
| import asyncio |
| import aiohttp |
| import json |
| import base64 |
| import struct |
| import os |
| import sys |
| import wave |
| import subprocess |
| import tempfile |
| import time |
|
|
| |
# Runtime configuration. Every value can be overridden through the
# environment variable of the same name; the literals are the defaults.
WS_URL = os.getenv("WS_URL", "ws://localhost:8090/ws")
TEXT = os.getenv("TEXT", "Hello! I am testing the progressive streaming avatar system.")
VOICE = os.getenv("VOICE", "tara")
# Directory that receives the extracted frames, the WAV file and the MP4.
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "/tmp/streaming_test")
|
|
def _parse_chunk(payload):
    """Decode one binary chunk into its audio bytes and its list of frames.

    The payload is read as big-endian, length-prefixed fields: a u32 audio
    length followed by that many audio bytes, a u32 frame count, and then,
    for each frame, a u32 length followed by the frame bytes.

    Args:
        payload: The raw (already base64-decoded) chunk bytes.

    Returns:
        A ``(audio_bytes, frames)`` tuple where ``frames`` is a list of
        ``bytes`` objects, one per frame.

    Raises:
        ValueError: If the payload ends before a declared field is complete.
            Plain slicing would silently return short data instead.
    """
    view = memoryview(payload)
    total = len(view)
    offset = 0

    def read_u32():
        nonlocal offset
        if offset + 4 > total:
            raise ValueError(
                f"chunk truncado: cabecalho de 4 bytes no offset {offset} "
                f"excede o tamanho {total}"
            )
        (value,) = struct.unpack_from(">I", view, offset)
        offset += 4
        return value

    def read_blob(length):
        nonlocal offset
        end = offset + length
        if end > total:
            raise ValueError(
                f"chunk truncado: campo de {length} bytes no offset {offset} "
                f"excede o tamanho {total}"
            )
        blob = bytes(view[offset:end])
        offset = end
        return blob

    audio = read_blob(read_u32())
    frame_count = read_u32()
    frames = [read_blob(read_u32()) for _ in range(frame_count)]
    return audio, frames


def _pick_sample_rate(audio_byte_count, frame_count):
    """Choose the WAV sample rate (24000 or 22050 Hz) for the received audio.

    The audio is treated as 16-bit samples (two bytes each) and every frame
    is assumed to last 40 ms (25 fps). The resulting samples-per-second
    estimate selects 24000 Hz when it is above 20000, otherwise 22050 Hz.
    With no frames the rate defaults to 24000 Hz.
    """
    nominal_duration_ms = frame_count * 40
    if nominal_duration_ms <= 0:
        return 24000
    samples = audio_byte_count // 2
    estimated_sample_rate = int(samples / (nominal_duration_ms / 1000))
    # NOTE(review): the 20000 threshold favours 24000 Hz even for a true
    # 22050 Hz stream; kept as-is because it looks deliberate - confirm.
    return 24000 if estimated_sample_rate > 20000 else 22050


def _save_frames(frames, frames_dir):
    """Write each frame to ``frames_dir`` as ``frame_NNNNN.jpg``.

    Frame files left over from a previous run are deleted first; otherwise
    ffmpeg's ``frame_%05d.jpg`` input pattern would also pick up stale
    higher-numbered files.
    """
    os.makedirs(frames_dir, exist_ok=True)
    for name in os.listdir(frames_dir):
        if name.startswith("frame_") and name.endswith(".jpg"):
            os.remove(os.path.join(frames_dir, name))
    for index, frame in enumerate(frames):
        frame_path = os.path.join(frames_dir, f"frame_{index:05d}.jpg")
        with open(frame_path, "wb") as fh:
            fh.write(frame)


def _write_wav(audio_path, pcm_bytes, sample_rate):
    """Write the PCM bytes as a mono, 16-bit WAV file at ``sample_rate``."""
    with wave.open(audio_path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(pcm_bytes)


def _run_tool(cmd):
    """Run an external command, capturing its output as text.

    Returns the ``subprocess.CompletedProcess``, or ``None`` when the
    executable cannot be found.
    """
    try:
        return subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        return None


def _build_video(frames_dir, audio_path, video_path, video_fps):
    """Mux the saved frames and the WAV file into an MP4 using ffmpeg.

    On success the output's duration and size are printed via ffprobe when
    it is available.

    Returns:
        True when ffmpeg produced the video, False otherwise.
    """
    cmd = [
        "ffmpeg", "-y",
        "-framerate", str(video_fps),
        "-i", os.path.join(frames_dir, "frame_%05d.jpg"),
        "-i", audio_path,
        # libx264 with yuv420p needs even dimensions; pad odd sizes by one pixel.
        "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        "-shortest",
        video_path,
    ]

    result = _run_tool(cmd)
    if result is None:
        print("FFmpeg erro: executavel 'ffmpeg' nao encontrado no PATH")
        return False
    if result.returncode != 0:
        # Only the tail of stderr is shown; ffmpeg puts the actual error last.
        print(f"FFmpeg erro: {result.stderr[-500:]}")
        return False

    print(f"\nVideo salvo em: {video_path}")

    probe = _run_tool(
        ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", video_path]
    )
    if probe is not None and probe.returncode == 0:
        fmt = json.loads(probe.stdout).get("format", {})
        duration = float(fmt.get("duration", 0))
        size = int(fmt.get("size", 0))
        print(f"Duracao: {duration:.2f}s")
        print(f"Tamanho: {size / 1024:.1f} KB")
    return True


async def test_streaming():
    """Run one end-to-end streaming request and save the result to disk.

    Connects to ``WS_URL``, sends a ``generate`` request for ``TEXT`` and
    ``VOICE``, and consumes JSON messages of type ``status``,
    ``stream_start``, ``chunk``, ``done`` and ``error`` until the stream ends.
    The audio and frames carried by the chunks are accumulated, then written
    under ``OUTPUT_DIR`` as individual JPEG frames, a WAV file and (via
    ffmpeg) an MP4 video.

    Any exception raised while talking to the server is printed with its
    traceback and aborts the run; nothing is written in that case.
    """
    banner = "=" * 60
    print(banner)
    print("Test Streaming Client")
    print(banner)
    print(f"WebSocket: {WS_URL}")
    print(f"Text: {TEXT[:50]}...")
    print(f"Voice: {VOICE}")
    print(f"Output: {OUTPUT_DIR}")
    print(banner)

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    all_audio = bytearray()
    all_frames = []
    chunks_received = 0
    start_time = None

    try:
        async with aiohttp.ClientSession() as session:
            print("\nConectando ao WebSocket...")
            # The context manager closes the socket on every exit path,
            # including exceptions raised while processing messages.
            async with session.ws_connect(
                WS_URL, timeout=aiohttp.ClientWSTimeout(ws_close=120)
            ) as ws:
                print("Conectado!")

                print("\nEnviando requisicao...")
                await ws.send_json({
                    "action": "generate",
                    "text": TEXT,
                    "voice": VOICE,
                })
                start_time = time.time()

                async for msg in ws:
                    if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                        print(" WebSocket fechado")
                        break
                    if msg.type != aiohttp.WSMsgType.TEXT:
                        continue

                    data = json.loads(msg.data)
                    msg_type = data.get("type", "")

                    if msg_type == "status":
                        print(f" Status: {data.get('message')}")

                    elif msg_type == "stream_start":
                        ttfb = data.get("ttfb_ms", 0)
                        print(f" Stream iniciado! TTFB: {ttfb}ms")

                    elif msg_type == "chunk":
                        chunks_received += 1
                        chunk_index = data.get("chunk_index", 0)
                        audio_size = data.get("audio_size", 0)
                        num_frames = data.get("num_frames", 0)

                        audio_bytes, frames = _parse_chunk(
                            base64.b64decode(data.get("data", ""))
                        )
                        all_audio.extend(audio_bytes)
                        all_frames.extend(frames)

                        print(f" Chunk {chunk_index}: {audio_size} bytes audio, {num_frames} frames (total: {len(all_frames)} frames, {len(all_audio)} bytes audio)")

                    elif msg_type == "done":
                        total_chunks = data.get("total_chunks", 0)
                        total_frames = data.get("total_frames", 0)
                        elapsed_ms = data.get("elapsed_ms", 0)
                        print(f"\n Done! {total_chunks} chunks, {total_frames} frames, {elapsed_ms}ms")
                        break

                    elif msg_type == "error":
                        print(f" ERRO: {data.get('message')}")
                        break

    except Exception as e:
        # Top-level boundary of the script: report the failure and stop.
        print(f"Erro: {e}")
        import traceback
        traceback.print_exc()
        return

    total_time = time.time() - start_time if start_time else 0

    print("\n" + banner)
    print("Resultados:")
    print(banner)
    print(f"Chunks recebidos: {chunks_received}")
    print(f"Frames recebidos: {len(all_frames)}")
    print(f"Audio recebido: {len(all_audio)} bytes")
    print(f"Tempo total: {total_time:.2f}s")

    if not all_frames or not all_audio:
        print("\nNenhum dado recebido!")
        return

    print(f"\nSalvando {len(all_frames)} frames...")
    frames_dir = os.path.join(OUTPUT_DIR, "frames")
    _save_frames(all_frames, frames_dir)

    print(f"Salvando audio ({len(all_audio)} bytes)...")
    audio_path = os.path.join(OUTPUT_DIR, "audio.wav")
    sample_rate = _pick_sample_rate(len(all_audio), len(all_frames))
    print(f"Sample rate estimado: {sample_rate} Hz")
    _write_wav(audio_path, all_audio, sample_rate)

    print("\nCriando video com ffmpeg...")
    video_path = os.path.join(OUTPUT_DIR, "output.mp4")

    # Derive the frame rate from the audio length so both streams end together.
    audio_duration_s = len(all_audio) / 2 / sample_rate
    if audio_duration_s > 0:
        video_fps = len(all_frames) / audio_duration_s
    else:
        video_fps = 25
    print(f"Video FPS: {video_fps:.2f}")

    video_created = _build_video(frames_dir, audio_path, video_path, video_fps)

    print("\n" + banner)
    print("Arquivos gerados:")
    print(banner)
    print(f" Frames: {frames_dir}/")
    print(f" Audio: {audio_path}")
    if video_created:
        # Do not advertise a video file that ffmpeg failed to produce.
        print(f" Video: {video_path}")
|
|
|
|
# Script entry point: when executed directly (not imported), run the
# test_streaming coroutine to completion via asyncio.run.
if __name__ == "__main__":
    asyncio.run(test_streaming())
|
|