File size: 14,666 Bytes
da63a34
 
 
 
f724af4
 
 
 
 
da63a34
c764ad5
da63a34
 
 
 
 
f724af4
da63a34
 
f724af4
6dbcce8
f724af4
 
da63a34
 
f724af4
 
 
a2ce242
c5831b3
f724af4
58904c6
f724af4
 
da63a34
58904c6
da63a34
f724af4
da63a34
f724af4
da63a34
f724af4
c764ad5
f724af4
da63a34
 
a2ce242
 
 
 
 
 
 
da63a34
c764ad5
 
 
da63a34
 
 
 
 
 
 
c764ad5
da63a34
6dbcce8
da63a34
 
 
f724af4
a2ce242
f724af4
 
c764ad5
da63a34
f724af4
c764ad5
 
58904c6
 
 
c764ad5
a2ce242
f724af4
a2ce242
f724af4
da63a34
0c397a9
a2ce242
f724af4
c764ad5
da63a34
c5831b3
f724af4
 
c764ad5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
da63a34
 
 
f724af4
d953431
da63a34
 
 
c764ad5
 
 
 
 
 
 
 
 
 
 
 
f724af4
da63a34
c764ad5
f724af4
da63a34
f724af4
 
 
 
 
 
 
 
 
a2ce242
da63a34
c764ad5
da63a34
 
f724af4
 
 
 
0c397a9
f724af4
 
da63a34
f724af4
 
 
 
c764ad5
 
 
c5831b3
c764ad5
c5831b3
 
c764ad5
c5831b3
 
 
c764ad5
 
c5831b3
c764ad5
d953431
c764ad5
 
 
 
c5831b3
c764ad5
 
c5831b3
 
c764ad5
 
 
 
 
 
 
 
 
 
 
c5831b3
 
 
 
 
 
 
c764ad5
c5831b3
 
 
c764ad5
 
 
c5831b3
c764ad5
 
 
 
 
c5831b3
c764ad5
 
 
 
c5831b3
c764ad5
da63a34
 
c5831b3
c764ad5
58904c6
f724af4
 
 
 
c764ad5
da63a34
a2ce242
 
 
 
 
 
 
 
 
da63a34
a2ce242
 
 
 
 
 
 
 
 
58904c6
c764ad5
c5831b3
 
 
a2ce242
 
 
 
 
 
 
 
 
58904c6
c764ad5
 
c5831b3
 
 
 
 
 
 
 
 
 
f724af4
 
 
 
 
c764ad5
 
 
 
a2ce242
 
f724af4
da63a34
a2ce242
f724af4
a2ce242
f724af4
 
 
c764ad5
 
 
 
 
a2ce242
c5831b3
c764ad5
 
 
 
 
 
c5831b3
c764ad5
 
 
a2ce242
c764ad5
f724af4
c764ad5
f724af4
a2ce242
da63a34
f724af4
da63a34
f724af4
 
c764ad5
 
 
58904c6
 
a2ce242
58904c6
a2ce242
58904c6
 
 
a2ce242
58904c6
a2ce242
58904c6
f724af4
c764ad5
da63a34
 
 
 
 
 
 
 
 
f724af4
 
0c397a9
da63a34
 
 
 
 
 
 
 
6dbcce8
 
da63a34
 
 
 
 
 
 
f724af4
 
 
 
 
a2ce242
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
#!/usr/bin/env python3
# License: CC-BY-NC-ND-4.0
# Created by: Patrick Lumbantobing, Vertox-AI
# Copyright (c) 2025-2026 Vertox-AI. All rights reserved.
#
# This work is licensed under the Creative Commons
# Attribution-NonCommercial-NoDerivatives 4.0 International License.
# To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-nd/4.0/
"""
Streaming speech translation client (Python CLI).

- Captures microphone audio (mono, float32) via `sounddevice`.
- Streams audio as PCM16 binary frames to the WebSocket server.
- Receives translated PCM16 audio at 24 kHz and plays it back.
- Prints incremental ASR transcript and NMT translation to stdout.

Usage
-----

    python clients/python_client.py --uri ws://localhost:8765 --sample-rate 16000
"""

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import queue
import sys
import time

import numpy as np
import numpy.typing as npt
import soundfile as sf
import websockets

try:  # sounddevice is optional (for headless environments)
    import sounddevice as sd

    HAS_SOUNDDEVICE = True
except (ImportError, OSError):
    # OSError is raised by sounddevice when the PortAudio shared library is
    # missing (common on headless/CI hosts), so it is caught alongside
    # ImportError.  `main()` checks HAS_SOUNDDEVICE and exits cleanly.
    HAS_SOUNDDEVICE = False
    sd = None  # type: ignore[assignment]

# Module-wide logging: timestamped, level-tagged lines on stderr.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)

# NOTE: `full_transcript` (and `all_audio_pre` / `all_audio_post`) are
# created at runtime inside main() via `global` declarations.  A bare
# `global full_transcript` statement at module level is a no-op in Python
# (it only has meaning inside a function body), so it was removed.

async def main(uri: str, sample_rate: int, chunk_duration_ms: int) -> None:
    """
    Run the streaming translation client.

    Captures microphone audio and streams it to the server as PCM16
    WebSocket frames, while playing back the translated 24 kHz PCM16 audio
    the server returns and logging incremental ASR/NMT text messages.
    On shutdown, all received audio (pre- and post-gain) is written to
    timestamped WAV files in the current working directory.

    Parameters
    ----------
    uri :
        WebSocket server URI, e.g. "ws://localhost:8765".
    sample_rate :
        Microphone capture sample rate (Hz), e.g. 16000.
    chunk_duration_ms :
        Audio chunk duration in milliseconds, e.g. 10.

    Raises
    ------
    SystemExit
        If `sounddevice` is not available (exit code 1).
    """
    if not HAS_SOUNDDEVICE:
        log.error("ERROR: sounddevice is required. Install with: pip install sounddevice")
        sys.exit(1)

    # Mic callback block size (what sounddevice pulls from the OS).
    chunk_size: int = int(sample_rate * chunk_duration_ms / 1000)

    # Session-wide accumulators; bound at module scope via `global` so the
    # WAV dumps in the `finally` block below see everything received.
    global full_transcript
    full_transcript = ""
    global all_audio_pre, all_audio_post
    all_audio_pre = []
    all_audio_post = []

    log.info(f"Connecting to {uri}...")
    # max_size=2**20 caps inbound WebSocket frames at 1 MiB.
    async with websockets.connect(uri, max_size=2**20) as ws:
        log.info("Connected. Starting session...")

        # Send start control message.
        await ws.send(json.dumps({"action": "start", "samplerate": sample_rate}, ensure_ascii=False))
        log.info("Waiting for start confirmation...")

        # Local state.
        audio_send_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=256)
        # playback_queue: asyncio.Queue[np.ndarray] = asyncio.Queue(maxsize=256)
        running = True

        # ---------- TX: microphone -> websocket ----------

        # Target WS chunk size (independent of mic block size).
        # Example: send ~20 ms of audio per WS frame.
        target_ws_chunk_ms = 20
        target_ws_chunk_bytes = int(sample_rate * target_ws_chunk_ms / 1000) * 2  # int16 -> 2 bytes
        send_accumulator = bytearray()

        def audio_callback(
            indata: npt.NDArray[np.float32],
            frames,
            time_info,
            status,
        ) -> None:
            """Convert float32 mic buffer to PCM16 bytes and enqueue buffered chunks for sending."""
            # NOTE(review): sounddevice invokes this callback on PortAudio's
            # audio thread, not the asyncio event-loop thread, and
            # asyncio.Queue.put_nowait is not documented as thread-safe.
            # It appears to work in practice; confirm, or route through
            # loop.call_soon_threadsafe.
            nonlocal running
            if not running:
                return
            if status:
                log.warning(f"Audio status: {status}")
            # indata shape: (frames, channels); we use channel 0
            audio = indata[:, 0]
            audio_int16 = np.clip(audio * 32767.0, -32768, 32767).astype(np.int16)
            chunk = audio_int16.tobytes()

            # Accumulate and slice into larger WS chunks.
            send_accumulator.extend(chunk)
            while len(send_accumulator) >= target_ws_chunk_bytes:
                to_send = bytes(send_accumulator[:target_ws_chunk_bytes])
                del send_accumulator[:target_ws_chunk_bytes]
                try:
                    audio_send_queue.put_nowait(to_send)
                except asyncio.QueueFull:
                    # Drop newest chunk on overload.
                    pass

        async def send_audio() -> None:
            """Capture microphone audio and send buffered PCM16 chunks to server."""
            nonlocal running

            stream = sd.InputStream(
                channels=1,
                samplerate=sample_rate,
                blocksize=chunk_size,
                callback=audio_callback,
                latency="low",
                dtype="float32",
            )
            stream.start()
            log.info(
                f"Microphone capture started: {sample_rate} Hz, {chunk_size} samples/chunk.\n"
                f"Sending ~{target_ws_chunk_ms} ms per WebSocket frame.\n"
                "Speak into the microphone... Press Ctrl+C to stop."
            )

            try:
                while running:
                    try:
                        # Short timeout so the loop re-checks `running`
                        # periodically instead of blocking forever on get().
                        audio_bytes = await asyncio.wait_for(audio_send_queue.get(), timeout=0.5)
                    except asyncio.TimeoutError:
                        continue
                    await ws.send(audio_bytes)
            finally:
                # Always release the audio device, even on cancellation.
                stream.stop()
                stream.close()

        # ---------- RX: websocket -> FIFO playback with cross-fade ----------

        playback_samplerate = 24000
        blocksize = 480  # ~20 ms at 24 kHz

        limiter_threshold = 0.99
        fixed_gain = 0.9  # keep a bit below full-scale

        # Thread-safe FIFO feeding the PortAudio output callback; a plain
        # queue.Queue (not asyncio.Queue) because the consumer is the
        # PortAudio playback thread.
        playback_buffer = queue.Queue(maxsize=256)
        playback_chunk = np.zeros(0, dtype=np.float32)
        was_underrun = False  # track whether last callback had no data

        def playback_callback(outdata, frames, time_info, status):
            """Fill `outdata` with queued audio; output silence on underrun."""
            nonlocal playback_chunk, was_underrun
            if status:
                log.warning(f"Playback status: {status}")

            needed = frames
            out = np.zeros(needed, dtype=np.float32)

            # Refill from thread-safe queue
            while len(playback_chunk) < needed:
                try:
                    new = playback_buffer.get_nowait()
                except queue.Empty:
                    break
                new = np.asarray(new, dtype=np.float32).flatten()
                if playback_chunk.size == 0:
                    playback_chunk = new
                else:
                    playback_chunk = np.concatenate([playback_chunk, new])

            take = min(needed, len(playback_chunk))
            if take > 0:
                out[:take] = playback_chunk[:take]
                playback_chunk = playback_chunk[take:]

                # Optional: fade-in if we had an underrun just before
                if was_underrun:
                    fade_len = min(240, take)  # ~10 ms at 24 kHz
                    fade = np.linspace(0.0, 1.0, fade_len, endpoint=False).astype(np.float32)
                    out[:fade_len] *= fade
                was_underrun = False
            else:
                # No data: output silence (required by sounddevice docs)
                # log.debug("Playback underrun: outputting silence")
                was_underrun = True

            outdata[:, 0] = out

        def prebuffer_playback_blocking(min_ms: int = 160):
            """Pre-buffer a small amount of audio to reduce underflows."""
            # NOTE(review): this drains chunks and then re-queues them; if
            # the receive task enqueues new audio between the put() calls
            # below, chunk order could interleave.  The playback stream has
            # not started yet at this point, so only ordering vs. the
            # receiver matters — confirm this is acceptable.
            min_samples = int(playback_samplerate * (min_ms / 1000.0))
            buffered = []
            total = 0
            while total < min_samples:
                # Blocking get(); this function runs in a worker thread via
                # asyncio.to_thread, so it does not block the event loop.
                audio = playback_buffer.get()
                buffered.append(audio)
                total += len(audio)
            # Put everything back so the callback can consume it.
            for a in buffered:
                playback_buffer.put(a)

        async def receive_messages() -> None:
            """Receive audio and text messages from server and handle them."""
            nonlocal running
            global full_transcript
            global all_audio_pre, all_audio_post

            try:
                async for message in ws:
                    if isinstance(message, bytes):
                        # Raw PCM16 audio at 24 kHz – queue for playback as float32.
                        pcm16 = np.frombuffer(message, dtype=np.int16)
                        # log.info(
                        #     "recv pcm16 %s mean=%s std=%s min=%s max=%s shape=%s",
                        #     pcm16,
                        #     np.mean(pcm16),
                        #     np.std(pcm16),
                        #     np.min(pcm16),
                        #     np.max(pcm16),
                        #     pcm16.shape,
                        # )
                        audio = pcm16.astype(np.float32) / 32768.0
                        # log.info(
                        #     "recv audio-pre %s mean=%s std=%s min=%s max=%s shape=%s",
                        #     audio,
                        #     np.mean(audio),
                        #     np.std(audio),
                        #     np.min(audio),
                        #     np.max(audio),
                        #     audio.shape,
                        # )
                        all_audio_pre.append(audio)

                        # No DC removal, no client cross-fade, no LUFS; just fixed gain + limiter.
                        audio = audio * fixed_gain
                        np.clip(audio, -limiter_threshold, limiter_threshold, out=audio)
                        # log.info(
                        #     "recv audio-post %s mean=%s std=%s min=%s max=%s shape=%s",
                        #     audio,
                        #     np.mean(audio),
                        #     np.std(audio),
                        #     np.min(audio),
                        #     np.max(audio),
                        #     audio.shape,
                        # )
                        all_audio_post.append(audio)

                        # Rolling buffer: if full, drop oldest before adding new.
                        # while playback_queue.full():
                        #     _ = await playback_queue.get()
                        # await playback_queue.put(audio)
                        # instead of playback_queue.put(...)
                        while playback_buffer.full():
                            try:
                                _ = playback_buffer.get_nowait()
                            except queue.Empty:
                                break
                        playback_buffer.put(audio)
                    else:
                        # Text frame: JSON control/transcript/translation message.
                        data = json.loads(message)
                        msg_type = data.get("type")

                        if msg_type == "transcript":
                            transcript_text_delta = data.get("text", "")
                            full_transcript = (
                                full_transcript + transcript_text_delta if full_transcript else transcript_text_delta
                            )
                            log.info(f"ASR delta: {transcript_text_delta}")
                            log.info(f"ASR full: {full_transcript}")
                        elif msg_type == "translation":
                            translation_text = data.get("text", "")
                            log.info(f"NMT: {translation_text}")
                        elif msg_type == "status":
                            log.info(f"STATUS: {data.get('status')}")
            except websockets.exceptions.ConnectionClosed:
                # Server hung up: signal the TX loop to stop as well.
                running = False

        # Start RX and TX tasks.
        receive_task = asyncio.create_task(receive_messages())
        send_task = asyncio.create_task(send_audio())

        # Pre-buffer some audio for smoother playback.
        # NOTE(review): this blocks until ~160 ms of audio has arrived; if
        # the server never sends audio, startup hangs here — confirm a
        # timeout is not needed.
        log.info("Pre-buffering playback audio...")
        await asyncio.to_thread(prebuffer_playback_blocking, 160)

        playback_stream = sd.OutputStream(
            channels=1,
            samplerate=playback_samplerate,
            callback=playback_callback,
            dtype="float32",
            latency=0.08,  # ~80 ms output latency
            blocksize=blocksize,
        )
        playback_stream.start()
        log.info("Playback started.")

        try:
            await asyncio.gather(send_task, receive_task)
        except KeyboardInterrupt:
            # NOTE(review): inside a running event loop, Ctrl+C usually
            # surfaces as KeyboardInterrupt in asyncio.run(), not here —
            # verify this branch is actually reachable.
            log.info("Stopping client...")
            running = False
            try:
                await ws.send(json.dumps({"action": "stop"}))
            except Exception:
                pass
        finally:
            playback_stream.stop()
            playback_stream.close()
            # Dump everything received this session to timestamped WAVs:
            # "-pre" is the raw decoded audio, "-post" is after gain+limiter.
            if all_audio_pre:
                full_wav_pre = np.concatenate(all_audio_pre, axis=0)
                out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-pre.wav'
                sf.write(out_wav_file, full_wav_pre, 24000)
                log.info(f"TTS pre-output saved: {out_wav_file} ({full_wav_pre.shape[0]/24000:.2f}s audio)")
                all_audio_pre = []
            if all_audio_post:
                full_wav_post = np.concatenate(all_audio_post, axis=0)
                out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-post.wav'
                sf.write(out_wav_file, full_wav_post, 24000)
                log.info(f"TTS post-output saved: {out_wav_file} ({full_wav_post.shape[0]/24000:.2f}s audio)")
                all_audio_post = []

        # Optional: allow server to flush final messages if protocol supports it.
        try:
            async for message in ws:
                if isinstance(message, str):
                    data = json.loads(message)
                    if data.get("type") == "status" and data.get("status") == "stopped":
                        break
        except Exception:
            pass


if __name__ == "__main__":
    # Command-line entry point: parse options, then run the async client
    # until the connection closes or the user presses Ctrl+C.
    cli = argparse.ArgumentParser(description="Streaming speech translation client (Python CLI)")
    cli.add_argument(
        "--uri",
        default="ws://localhost:8765",
        help="WebSocket server URI (default: ws://localhost:8765)",
    )
    cli.add_argument(
        "--sample-rate",
        type=int,
        default=16000,
        help="Microphone sample rate in Hz (default: 16000)",
    )
    cli.add_argument(
        "--chunk-ms",
        type=int,
        default=10,
        help="Audio chunk duration in milliseconds (default: 10 ms)",
    )
    opts = cli.parse_args()

    try:
        asyncio.run(main(opts.uri, opts.sample_rate, opts.chunk_ms))
    except KeyboardInterrupt:
        log.info("Client stopped.")