File size: 8,001 Bytes
eeae125
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
#!/usr/bin/env python3
"""
Test client for progressive streaming.
Connects over WebSocket, receives chunks, then assembles and saves a video.
"""
import asyncio
import aiohttp
import json
import base64
import struct
import os
import sys
import wave
import subprocess
import tempfile
import time

# Configuration: endpoint, text and voice can be overridden through
# environment variables of the same name; the output directory is fixed.
WS_URL = os.environ.get("WS_URL", "ws://localhost:8090/ws")
TEXT = os.environ.get(
    "TEXT",
    "Hello! I am testing the progressive streaming avatar system.",
)
VOICE = os.environ.get("VOICE", "tara")
OUTPUT_DIR = "/tmp/streaming_test"

def _parse_chunk(payload):
    """Split one decoded chunk payload into its audio bytes and frames.

    The layout read here is: a big-endian uint32 audio length, the audio
    bytes, a big-endian uint32 frame count, then for every frame a
    big-endian uint32 length followed by the frame bytes.

    Args:
        payload: The raw (already base64-decoded) chunk bytes.

    Returns:
        A ``(audio_bytes, frames)`` tuple where ``frames`` is a list of
        ``bytes`` objects, one per frame.

    Raises:
        ValueError: If the payload ends before a declared field is complete.
    """
    view = memoryview(payload)
    total = len(view)
    offset = 0

    def read_u32(what):
        nonlocal offset
        if offset + 4 > total:
            raise ValueError(f"truncated chunk: missing {what} at offset {offset}")
        (value,) = struct.unpack_from(">I", view, offset)
        offset += 4
        return value

    def read_bytes(length, what):
        nonlocal offset
        end = offset + length
        # A plain slice would silently return fewer bytes than declared.
        if end > total:
            raise ValueError(
                f"truncated chunk: {what} needs {length} bytes, "
                f"only {total - offset} available"
            )
        data = bytes(view[offset:end])
        offset = end
        return data

    audio = read_bytes(read_u32("audio length"), "audio data")
    frame_count = read_u32("frame count")
    frames = [
        read_bytes(read_u32("frame length"), "frame data")
        for _ in range(frame_count)
    ]
    return audio, frames


def _estimate_sample_rate(frame_count, audio_byte_count):
    """Guess the audio sample rate from the frame count and audio size.

    Assumes 25 fps video (40 ms per frame) and 16-bit samples, as the
    surrounding code does. The estimate is snapped to 24000 Hz when above
    20000 Hz and to 22050 Hz otherwise; 24000 Hz is used when there are no
    frames to derive a duration from.
    """
    duration_ms = frame_count * 40
    if duration_ms <= 0:
        return 24000
    samples = audio_byte_count // 2  # 16-bit = 2 bytes per sample
    estimated = int(samples / (duration_ms / 1000))
    return 24000 if estimated > 20000 else 22050


async def test_streaming():
    """Run one streaming request end to end and write the result to disk.

    Connects to ``WS_URL``, sends a ``generate`` request for ``TEXT`` with
    ``VOICE``, accumulates the audio and frames carried by ``chunk``
    messages until ``done``/``error`` or the socket closes, then writes the
    frames as JPEG files, the audio as a mono 16-bit WAV and muxes both into
    ``output.mp4`` under ``OUTPUT_DIR`` using ffmpeg.

    Returns:
        None. Progress and results are printed; any exception raised while
        streaming is printed with its traceback and ends the run early.
    """
    separator = "=" * 60
    print(separator)
    print("Test Streaming Client")
    print(separator)
    print(f"WebSocket: {WS_URL}")
    print(f"Text: {TEXT[:50]}...")
    print(f"Voice: {VOICE}")
    print(f"Output: {OUTPUT_DIR}")
    print(separator)

    os.makedirs(OUTPUT_DIR, exist_ok=True)

    # Buffers that accumulate everything received across chunks.
    all_audio = bytearray()
    all_frames = []
    chunks_received = 0
    start_time = None

    try:
        async with aiohttp.ClientSession() as session:
            print("\nConectando ao WebSocket...")
            # The context manager closes the socket even when an error
            # interrupts the receive loop.
            async with session.ws_connect(
                WS_URL, timeout=aiohttp.ClientWSTimeout(ws_close=120)
            ) as ws:
                print("Conectado!")

                print("\nEnviando requisicao...")
                await ws.send_json({
                    "action": "generate",
                    "text": TEXT,
                    "voice": VOICE,
                })
                # Monotonic clock: only the elapsed interval is needed.
                start_time = time.monotonic()

                async for msg in ws:
                    if msg.type in (aiohttp.WSMsgType.CLOSED, aiohttp.WSMsgType.ERROR):
                        print("  WebSocket fechado")
                        break
                    if msg.type != aiohttp.WSMsgType.TEXT:
                        continue

                    data = json.loads(msg.data)
                    msg_type = data.get("type", "")

                    if msg_type == "status":
                        print(f"  Status: {data.get('message')}")

                    elif msg_type == "stream_start":
                        ttfb = data.get("ttfb_ms", 0)
                        print(f"  Stream iniciado! TTFB: {ttfb}ms")

                    elif msg_type == "chunk":
                        chunks_received += 1
                        chunk_index = data.get("chunk_index", 0)
                        audio_size = data.get("audio_size", 0)
                        num_frames = data.get("num_frames", 0)

                        audio_bytes, frames = _parse_chunk(
                            base64.b64decode(data.get("data", ""))
                        )
                        all_audio.extend(audio_bytes)
                        all_frames.extend(frames)

                        print(f"  Chunk {chunk_index}: {audio_size} bytes audio, {num_frames} frames (total: {len(all_frames)} frames, {len(all_audio)} bytes audio)")

                    elif msg_type == "done":
                        total_chunks = data.get("total_chunks", 0)
                        total_frames = data.get("total_frames", 0)
                        elapsed_ms = data.get("elapsed_ms", 0)
                        print(f"\n  Done! {total_chunks} chunks, {total_frames} frames, {elapsed_ms}ms")
                        break

                    elif msg_type == "error":
                        print(f"  ERRO: {data.get('message')}")
                        break

    except Exception as e:
        # Top-level boundary of a test script: report and stop.
        print(f"Erro: {e}")
        import traceback
        traceback.print_exc()
        return

    total_time = time.monotonic() - start_time if start_time is not None else 0

    print("\n" + separator)
    print("Resultados:")
    print(separator)
    print(f"Chunks recebidos: {chunks_received}")
    print(f"Frames recebidos: {len(all_frames)}")
    print(f"Audio recebido: {len(all_audio)} bytes")
    print(f"Tempo total: {total_time:.2f}s")

    if not all_frames or not all_audio:
        print("\nNenhum dado recebido!")
        return

    # Save frames as JPEG files.
    print(f"\nSalvando {len(all_frames)} frames...")
    frames_dir = os.path.join(OUTPUT_DIR, "frames")
    os.makedirs(frames_dir, exist_ok=True)
    # Drop frames left by a previous run; otherwise a shorter run leaves
    # stale higher-numbered files that extend the sequence ffmpeg reads.
    for name in os.listdir(frames_dir):
        if name.startswith("frame_") and name.endswith(".jpg"):
            os.remove(os.path.join(frames_dir, name))
    for i, frame in enumerate(all_frames):
        frame_path = os.path.join(frames_dir, f"frame_{i:05d}.jpg")
        with open(frame_path, "wb") as f:
            f.write(frame)

    # Save audio as WAV.
    print(f"Salvando audio ({len(all_audio)} bytes)...")
    audio_path = os.path.join(OUTPUT_DIR, "audio.wav")

    sample_rate = _estimate_sample_rate(len(all_frames), len(all_audio))
    print(f"Sample rate estimado: {sample_rate} Hz")

    with wave.open(audio_path, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)  # 16-bit
        wf.setframerate(sample_rate)
        wf.writeframes(all_audio)

    # Build the video with ffmpeg.
    print("\nCriando video com ffmpeg...")
    video_path = os.path.join(OUTPUT_DIR, "output.mp4")

    # Pick the frame rate that makes the video span the audio duration.
    audio_duration_s = len(all_audio) / 2 / sample_rate
    if audio_duration_s > 0:
        video_fps = len(all_frames) / audio_duration_s
    else:
        video_fps = 25

    print(f"Video FPS: {video_fps:.2f}")

    cmd = [
        "ffmpeg", "-y",
        "-framerate", str(video_fps),
        "-i", os.path.join(frames_dir, "frame_%05d.jpg"),
        "-i", audio_path,
        "-vf", "pad=ceil(iw/2)*2:ceil(ih/2)*2",  # Fix odd dimensions for libx264
        "-c:v", "libx264",
        "-preset", "fast",
        "-crf", "23",
        "-c:a", "aac",
        "-b:a", "128k",
        "-pix_fmt", "yuv420p",
        "-shortest",
        video_path,
    ]

    try:
        result = subprocess.run(cmd, capture_output=True, text=True)
    except FileNotFoundError:
        # ffmpeg is not installed / not on PATH: report instead of crashing.
        print("FFmpeg erro: executavel 'ffmpeg' nao encontrado")
        result = None

    if result is not None and result.returncode != 0:
        print(f"FFmpeg erro: {result.stderr[-500:]}")
    elif result is not None:
        print(f"\nVideo salvo em: {video_path}")
        # Show basic information about the generated video.
        try:
            probe = subprocess.run(
                ["ffprobe", "-v", "quiet", "-print_format", "json", "-show_format", video_path],
                capture_output=True, text=True
            )
        except FileNotFoundError:
            probe = None
        if probe is not None and probe.returncode == 0:
            info = json.loads(probe.stdout)
            duration = float(info.get("format", {}).get("duration", 0))
            size = int(info.get("format", {}).get("size", 0))
            print(f"Duracao: {duration:.2f}s")
            print(f"Tamanho: {size / 1024:.1f} KB")

    print("\n" + separator)
    print("Arquivos gerados:")
    print(separator)
    print(f"  Frames: {frames_dir}/")
    print(f"  Audio:  {audio_path}")
    print(f"  Video:  {video_path}")

# Script entry point: run the streaming test coroutine to completion on a
# fresh event loop when this file is executed directly.
if __name__ == "__main__":
    asyncio.run(test_streaming())