#!/usr/bin/env python3 # License: CC-BY-NC-ND-4.0 # Created by: Patrick Lumbantobing, Vertox-AI # Copyright (c) 2025-2026 Vertox-AI. All rights reserved. # # This work is licensed under the Creative Commons # Attribution-NonCommercial-NoDerivatives 4.0 International License. # To view a copy of this license, visit # http://creativecommons.org/licenses/by-nc-nd/4.0/ """ Streaming speech translation client (Python CLI). - Captures microphone audio (mono, float32) via `sounddevice`. - Streams audio as PCM16 binary frames to the WebSocket server. - Receives translated PCM16 audio at 24 kHz and plays it back. - Prints incremental ASR transcript and NMT translation to stdout. Usage ----- python clients/python_client.py --uri ws://localhost:8765 --sample-rate 16000 """ from __future__ import annotations import argparse import asyncio import json import logging import queue import sys import time import numpy as np import numpy.typing as npt import soundfile as sf import websockets try: # sounddevice is optional (for headless environments) import sounddevice as sd HAS_SOUNDDEVICE = True except (ImportError, OSError): HAS_SOUNDDEVICE = False sd = None # type: ignore[assignment] logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%Y-%m-%d %H:%M:%S", ) log = logging.getLogger(__name__) global full_transcript async def main(uri: str, sample_rate: int, chunk_duration_ms: int) -> None: """ Run the streaming translation client. Parameters ---------- uri : WebSocket server URI, e.g. "ws://localhost:8765". sample_rate : Microphone capture sample rate (Hz), e.g. 16000. chunk_duration_ms : Audio chunk duration in milliseconds, e.g. 10. """ if not HAS_SOUNDDEVICE: log.error("ERROR: sounddevice is required. Install with: pip install sounddevice") sys.exit(1) # Mic callback block size (what sounddevice pulls from the OS). chunk_size: int = int(sample_rate * chunk_duration_ms / 1000) global full_transcript full_transcript = "" global all_audio_pre, all_audio_post all_audio_pre = [] all_audio_post = [] log.info(f"Connecting to {uri}...") async with websockets.connect(uri, max_size=2**20) as ws: log.info("Connected. Starting session...") # Send start control message. await ws.send(json.dumps({"action": "start", "samplerate": sample_rate}, ensure_ascii=False)) log.info("Waiting for start confirmation...") # Local state. audio_send_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=256) # playback_queue: asyncio.Queue[np.ndarray] = asyncio.Queue(maxsize=256) running = True # ---------- TX: microphone -> websocket ---------- # Target WS chunk size (independent of mic block size). # Example: send ~20 ms of audio per WS frame. target_ws_chunk_ms = 20 target_ws_chunk_bytes = int(sample_rate * target_ws_chunk_ms / 1000) * 2 # int16 -> 2 bytes send_accumulator = bytearray() def audio_callback( indata: npt.NDArray[np.float32], frames, time_info, status, ) -> None: """Convert float32 mic buffer to PCM16 bytes and enqueue buffered chunks for sending.""" nonlocal running if not running: return if status: log.warning(f"Audio status: {status}") # indata shape: (frames, channels); we use channel 0 audio = indata[:, 0] audio_int16 = np.clip(audio * 32767.0, -32768, 32767).astype(np.int16) chunk = audio_int16.tobytes() # Accumulate and slice into larger WS chunks. send_accumulator.extend(chunk) while len(send_accumulator) >= target_ws_chunk_bytes: to_send = bytes(send_accumulator[:target_ws_chunk_bytes]) del send_accumulator[:target_ws_chunk_bytes] try: audio_send_queue.put_nowait(to_send) except asyncio.QueueFull: # Drop newest chunk on overload. pass async def send_audio() -> None: """Capture microphone audio and send buffered PCM16 chunks to server.""" nonlocal running stream = sd.InputStream( channels=1, samplerate=sample_rate, blocksize=chunk_size, callback=audio_callback, latency="low", dtype="float32", ) stream.start() log.info( f"Microphone capture started: {sample_rate} Hz, {chunk_size} samples/chunk.\n" f"Sending ~{target_ws_chunk_ms} ms per WebSocket frame.\n" "Speak into the microphone... Press Ctrl+C to stop." ) try: while running: try: audio_bytes = await asyncio.wait_for(audio_send_queue.get(), timeout=0.5) except asyncio.TimeoutError: continue await ws.send(audio_bytes) finally: stream.stop() stream.close() # ---------- RX: websocket -> FIFO playback with cross-fade ---------- playback_samplerate = 24000 blocksize = 480 # ~20 ms at 24 kHz limiter_threshold = 0.99 fixed_gain = 0.9 # keep a bit below full-scale playback_buffer = queue.Queue(maxsize=256) playback_chunk = np.zeros(0, dtype=np.float32) was_underrun = False # track whether last callback had no data def playback_callback(outdata, frames, time_info, status): nonlocal playback_chunk, was_underrun if status: log.warning(f"Playback status: {status}") needed = frames out = np.zeros(needed, dtype=np.float32) # Refill from thread-safe queue while len(playback_chunk) < needed: try: new = playback_buffer.get_nowait() except queue.Empty: break new = np.asarray(new, dtype=np.float32).flatten() if playback_chunk.size == 0: playback_chunk = new else: playback_chunk = np.concatenate([playback_chunk, new]) take = min(needed, len(playback_chunk)) if take > 0: out[:take] = playback_chunk[:take] playback_chunk = playback_chunk[take:] # Optional: fade-in if we had an underrun just before if was_underrun: fade_len = min(240, take) # ~10 ms at 24 kHz fade = np.linspace(0.0, 1.0, fade_len, endpoint=False).astype(np.float32) out[:fade_len] *= fade was_underrun = False else: # No data: output silence (required by sounddevice docs) # log.debug("Playback underrun: outputting silence") was_underrun = True outdata[:, 0] = out def prebuffer_playback_blocking(min_ms: int = 160): """Pre-buffer a small amount of audio to reduce underflows.""" min_samples = int(playback_samplerate * (min_ms / 1000.0)) buffered = [] total = 0 while total < min_samples: audio = playback_buffer.get() buffered.append(audio) total += len(audio) # Put everything back so the callback can consume it. for a in buffered: playback_buffer.put(a) async def receive_messages() -> None: """Receive audio and text messages from server and handle them.""" nonlocal running global full_transcript global all_audio_pre, all_audio_post try: async for message in ws: if isinstance(message, bytes): # Raw PCM16 audio at 24 kHz – queue for playback as float32. pcm16 = np.frombuffer(message, dtype=np.int16) # log.info( # "recv pcm16 %s mean=%s std=%s min=%s max=%s shape=%s", # pcm16, # np.mean(pcm16), # np.std(pcm16), # np.min(pcm16), # np.max(pcm16), # pcm16.shape, # ) audio = pcm16.astype(np.float32) / 32768.0 # log.info( # "recv audio-pre %s mean=%s std=%s min=%s max=%s shape=%s", # audio, # np.mean(audio), # np.std(audio), # np.min(audio), # np.max(audio), # audio.shape, # ) all_audio_pre.append(audio) # No DC removal, no client cross-fade, no LUFS; just fixed gain + limiter. audio = audio * fixed_gain np.clip(audio, -limiter_threshold, limiter_threshold, out=audio) # log.info( # "recv audio-post %s mean=%s std=%s min=%s max=%s shape=%s", # audio, # np.mean(audio), # np.std(audio), # np.min(audio), # np.max(audio), # audio.shape, # ) all_audio_post.append(audio) # Rolling buffer: if full, drop oldest before adding new. # while playback_queue.full(): # _ = await playback_queue.get() # await playback_queue.put(audio) # instead of playback_queue.put(...) while playback_buffer.full(): try: _ = playback_buffer.get_nowait() except queue.Empty: break playback_buffer.put(audio) else: data = json.loads(message) msg_type = data.get("type") if msg_type == "transcript": transcript_text_delta = data.get("text", "") full_transcript = ( full_transcript + transcript_text_delta if full_transcript else transcript_text_delta ) log.info(f"ASR delta: {transcript_text_delta}") log.info(f"ASR full: {full_transcript}") elif msg_type == "translation": translation_text = data.get("text", "") log.info(f"NMT: {translation_text}") elif msg_type == "status": log.info(f"STATUS: {data.get('status')}") except websockets.exceptions.ConnectionClosed: running = False # Start RX and TX tasks. receive_task = asyncio.create_task(receive_messages()) send_task = asyncio.create_task(send_audio()) # Pre-buffer some audio for smoother playback. log.info("Pre-buffering playback audio...") await asyncio.to_thread(prebuffer_playback_blocking, 160) playback_stream = sd.OutputStream( channels=1, samplerate=playback_samplerate, callback=playback_callback, dtype="float32", latency=0.08, # ~80 ms output latency blocksize=blocksize, ) playback_stream.start() log.info("Playback started.") try: await asyncio.gather(send_task, receive_task) except KeyboardInterrupt: log.info("Stopping client...") running = False try: await ws.send(json.dumps({"action": "stop"})) except Exception: pass finally: playback_stream.stop() playback_stream.close() if all_audio_pre: full_wav_pre = np.concatenate(all_audio_pre, axis=0) out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-pre.wav' sf.write(out_wav_file, full_wav_pre, 24000) log.info(f"TTS pre-output saved: {out_wav_file} ({full_wav_pre.shape[0]/24000:.2f}s audio)") all_audio_pre = [] if all_audio_post: full_wav_post = np.concatenate(all_audio_post, axis=0) out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-post.wav' sf.write(out_wav_file, full_wav_post, 24000) log.info(f"TTS post-output saved: {out_wav_file} ({full_wav_post.shape[0]/24000:.2f}s audio)") all_audio_post = [] # Optional: allow server to flush final messages if protocol supports it. try: async for message in ws: if isinstance(message, str): data = json.loads(message) if data.get("type") == "status" and data.get("status") == "stopped": break except Exception: pass if __name__ == "__main__": parser = argparse.ArgumentParser(description="Streaming speech translation client (Python CLI)") parser.add_argument( "--uri", default="ws://localhost:8765", help="WebSocket server URI (default: ws://localhost:8765)", ) parser.add_argument( "--sample-rate", type=int, default=16000, help="Microphone sample rate in Hz (default: 16000)", ) parser.add_argument( "--chunk-ms", type=int, default=10, help="Audio chunk duration in milliseconds (default: 10 ms)", ) args = parser.parse_args() try: asyncio.run(main(args.uri, args.sample_rate, args.chunk_ms)) except KeyboardInterrupt: log.info("Client stopped.")