Text-to-Speech
ONNX
GGUF
speech-translation
streaming-speech-translation
speech
audio
speech-recognition
automatic-speech-recognition
streaming-asr
ASR
NeMo
ONNX
cache-aware ASR
FastConformer
RNNT
Parakeet
neural-machine-translation
NMT
gemma3
llama-cpp
GGUF
conversational
TTS
xtts
xttsv2
voice-clone
gpt2
hifigan
multilingual
vq
perceiver-encoder
websocket
#!/usr/bin/env python3
# License: CC-BY-NC-ND-4.0
# Created by: Patrick Lumbantobing, Vertox-AI
# Copyright (c) 2025-2026 Vertox-AI. All rights reserved.
#
# This work is licensed under the Creative Commons
# Attribution-NonCommercial-NoDerivatives 4.0 International License.
# To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-nd/4.0/
| """ | |
| Streaming speech translation client (Python CLI). | |
| - Captures microphone audio (mono, float32) via `sounddevice`. | |
| - Streams audio as PCM16 binary frames to the WebSocket server. | |
| - Receives translated PCM16 audio at 24 kHz and plays it back. | |
| - Prints incremental ASR transcript and NMT translation to stdout. | |
| Usage | |
| ----- | |
| python clients/python_client.py --uri ws://localhost:8765 --sample-rate 16000 | |
| """ | |
| from __future__ import annotations | |
| import argparse | |
| import asyncio | |
| import json | |
| import logging | |
| import queue | |
| import sys | |
| import time | |
| import numpy as np | |
| import numpy.typing as npt | |
| import soundfile as sf | |
| import websockets | |
| try: # sounddevice is optional (for headless environments) | |
| import sounddevice as sd | |
| HAS_SOUNDDEVICE = True | |
| except (ImportError, OSError): | |
| HAS_SOUNDDEVICE = False | |
| sd = None # type: ignore[assignment] | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", | |
| datefmt="%Y-%m-%d %H:%M:%S", | |
| ) | |
| log = logging.getLogger(__name__) | |
| global full_transcript | |
async def main(uri: str, sample_rate: int, chunk_duration_ms: int) -> None:
    """
    Run the streaming translation client.

    Opens a WebSocket session, then runs two concurrent tasks until the
    connection closes or the user interrupts:

    * TX: microphone float32 audio -> PCM16 bytes -> WebSocket binary frames.
    * RX: server PCM16 audio at 24 kHz -> FIFO playback; JSON text messages
      ("transcript", "translation", "status") -> stdout via logging.

    On shutdown, the raw ("pre") and gain-limited ("post") received audio
    streams are written to timestamped WAV files for offline inspection.

    Parameters
    ----------
    uri :
        WebSocket server URI, e.g. "ws://localhost:8765".
    sample_rate :
        Microphone capture sample rate (Hz), e.g. 16000.
    chunk_duration_ms :
        Audio chunk duration in milliseconds, e.g. 10.
    """
    if not HAS_SOUNDDEVICE:
        log.error("ERROR: sounddevice is required. Install with: pip install sounddevice")
        sys.exit(1)
    # Mic callback block size (what sounddevice pulls from the OS).
    chunk_size: int = int(sample_rate * chunk_duration_ms / 1000)
    # Module-level accumulators: transcript text plus received audio, kept
    # global so they survive for the WAV dump in the finally block below.
    global full_transcript
    full_transcript = ""
    global all_audio_pre, all_audio_post
    all_audio_pre = []
    all_audio_post = []
    log.info(f"Connecting to {uri}...")
    async with websockets.connect(uri, max_size=2**20) as ws:
        log.info("Connected. Starting session...")
        # Send start control message.
        await ws.send(json.dumps({"action": "start", "samplerate": sample_rate}, ensure_ascii=False))
        log.info("Waiting for start confirmation...")
        # Local state shared by the nested TX/RX coroutines and callbacks.
        audio_send_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=256)
        running = True
        # ---------- TX: microphone -> websocket ----------
        # Target WS chunk size (independent of mic block size).
        # Example: send ~20 ms of audio per WS frame.
        target_ws_chunk_ms = 20
        target_ws_chunk_bytes = int(sample_rate * target_ws_chunk_ms / 1000) * 2  # int16 -> 2 bytes
        send_accumulator = bytearray()

        def audio_callback(
            indata: npt.NDArray[np.float32],
            frames,
            time_info,
            status,
        ) -> None:
            """Convert float32 mic buffer to PCM16 bytes and enqueue buffered chunks for sending."""
            # NOTE(review): this runs on the PortAudio callback thread, but
            # asyncio.Queue.put_nowait is not documented as thread-safe;
            # loop.call_soon_threadsafe would be the safe pattern — confirm
            # before relying on this under load.
            nonlocal running
            if not running:
                return
            if status:
                log.warning(f"Audio status: {status}")
            # indata shape: (frames, channels); we use channel 0
            audio = indata[:, 0]
            audio_int16 = np.clip(audio * 32767.0, -32768, 32767).astype(np.int16)
            chunk = audio_int16.tobytes()
            # Accumulate and slice into larger WS chunks.
            send_accumulator.extend(chunk)
            while len(send_accumulator) >= target_ws_chunk_bytes:
                to_send = bytes(send_accumulator[:target_ws_chunk_bytes])
                del send_accumulator[:target_ws_chunk_bytes]
                try:
                    audio_send_queue.put_nowait(to_send)
                except asyncio.QueueFull:
                    # Drop newest chunk on overload.
                    pass

        async def send_audio() -> None:
            """Capture microphone audio and send buffered PCM16 chunks to server."""
            nonlocal running
            stream = sd.InputStream(
                channels=1,
                samplerate=sample_rate,
                blocksize=chunk_size,
                callback=audio_callback,
                latency="low",
                dtype="float32",
            )
            stream.start()
            log.info(
                f"Microphone capture started: {sample_rate} Hz, {chunk_size} samples/chunk.\n"
                f"Sending ~{target_ws_chunk_ms} ms per WebSocket frame.\n"
                "Speak into the microphone... Press Ctrl+C to stop."
            )
            try:
                while running:
                    try:
                        # Short timeout so the loop re-checks `running` regularly.
                        audio_bytes = await asyncio.wait_for(audio_send_queue.get(), timeout=0.5)
                    except asyncio.TimeoutError:
                        continue
                    await ws.send(audio_bytes)
            finally:
                # Always release the microphone, even on send errors.
                stream.stop()
                stream.close()

        # ---------- RX: websocket -> FIFO playback with cross-fade ----------
        playback_samplerate = 24000
        blocksize = 480  # ~20 ms at 24 kHz
        limiter_threshold = 0.99
        fixed_gain = 0.9  # keep a bit below full-scale
        # Thread-safe FIFO: filled by receive_messages (event loop), drained
        # by playback_callback (audio thread) — hence queue.Queue, not asyncio.
        playback_buffer = queue.Queue(maxsize=256)
        playback_chunk = np.zeros(0, dtype=np.float32)
        was_underrun = False  # track whether last callback had no data

        def playback_callback(outdata, frames, time_info, status):
            """Fill `outdata` with `frames` samples from the FIFO; silence on underrun."""
            nonlocal playback_chunk, was_underrun
            if status:
                log.warning(f"Playback status: {status}")
            needed = frames
            out = np.zeros(needed, dtype=np.float32)
            # Refill from thread-safe queue
            while len(playback_chunk) < needed:
                try:
                    new = playback_buffer.get_nowait()
                except queue.Empty:
                    break
                new = np.asarray(new, dtype=np.float32).flatten()
                if playback_chunk.size == 0:
                    playback_chunk = new
                else:
                    playback_chunk = np.concatenate([playback_chunk, new])
            take = min(needed, len(playback_chunk))
            if take > 0:
                out[:take] = playback_chunk[:take]
                playback_chunk = playback_chunk[take:]
                # Optional: fade-in if we had an underrun just before
                if was_underrun:
                    fade_len = min(240, take)  # ~10 ms at 24 kHz
                    fade = np.linspace(0.0, 1.0, fade_len, endpoint=False).astype(np.float32)
                    out[:fade_len] *= fade
                    was_underrun = False
            else:
                # No data: output silence (required by sounddevice docs)
                was_underrun = True
            outdata[:, 0] = out

        def prebuffer_playback_blocking(min_ms: int = 160):
            """Pre-buffer a small amount of audio to reduce underflows.

            Blocks (run in a worker thread via asyncio.to_thread) until at
            least `min_ms` of audio has arrived, then re-queues it so the
            playback callback can consume it.
            """
            # NOTE(review): chunks arriving while we hold `buffered` are
            # re-queued BEHIND them by the loop below, which could reorder
            # audio — verify the receiver cannot race this window.
            min_samples = int(playback_samplerate * (min_ms / 1000.0))
            buffered = []
            total = 0
            while total < min_samples:
                audio = playback_buffer.get()
                buffered.append(audio)
                total += len(audio)
            # Put everything back so the callback can consume it.
            for a in buffered:
                playback_buffer.put(a)

        async def receive_messages() -> None:
            """Receive audio and text messages from server and handle them.

            Binary frames are PCM16 audio at 24 kHz; text frames are JSON
            objects dispatched on their "type" field.
            """
            nonlocal running
            global full_transcript
            global all_audio_pre, all_audio_post
            try:
                async for message in ws:
                    if isinstance(message, bytes):
                        # Raw PCM16 audio at 24 kHz – queue for playback as float32.
                        pcm16 = np.frombuffer(message, dtype=np.int16)
                        audio = pcm16.astype(np.float32) / 32768.0
                        all_audio_pre.append(audio)
                        # No DC removal, no client cross-fade, no LUFS; just fixed gain + limiter.
                        audio = audio * fixed_gain
                        np.clip(audio, -limiter_threshold, limiter_threshold, out=audio)
                        all_audio_post.append(audio)
                        # Rolling buffer: if full, drop oldest before adding new.
                        while playback_buffer.full():
                            try:
                                _ = playback_buffer.get_nowait()
                            except queue.Empty:
                                break
                        playback_buffer.put(audio)
                    else:
                        data = json.loads(message)
                        msg_type = data.get("type")
                        if msg_type == "transcript":
                            # Incremental ASR: server sends deltas; we keep the running text.
                            transcript_text_delta = data.get("text", "")
                            full_transcript = (
                                full_transcript + transcript_text_delta if full_transcript else transcript_text_delta
                            )
                            log.info(f"ASR delta: {transcript_text_delta}")
                            log.info(f"ASR full: {full_transcript}")
                        elif msg_type == "translation":
                            translation_text = data.get("text", "")
                            log.info(f"NMT: {translation_text}")
                        elif msg_type == "status":
                            log.info(f"STATUS: {data.get('status')}")
            except websockets.exceptions.ConnectionClosed:
                # Server went away: signal the send loop to wind down.
                running = False

        # Start RX and TX tasks.
        receive_task = asyncio.create_task(receive_messages())
        send_task = asyncio.create_task(send_audio())
        # Pre-buffer some audio for smoother playback.
        log.info("Pre-buffering playback audio...")
        await asyncio.to_thread(prebuffer_playback_blocking, 160)
        playback_stream = sd.OutputStream(
            channels=1,
            samplerate=playback_samplerate,
            callback=playback_callback,
            dtype="float32",
            latency=0.08,  # ~80 ms output latency
            blocksize=blocksize,
        )
        playback_stream.start()
        log.info("Playback started.")
        try:
            await asyncio.gather(send_task, receive_task)
        except KeyboardInterrupt:
            # NOTE(review): under asyncio.run, Ctrl+C normally cancels the
            # task rather than raising here; the handler in __main__ is the
            # one that usually fires — confirm this branch is reachable.
            log.info("Stopping client...")
            running = False
            try:
                await ws.send(json.dumps({"action": "stop"}))
            except Exception:
                pass
        finally:
            playback_stream.stop()
            playback_stream.close()
            # Dump everything received, before and after gain/limiting,
            # to timestamped WAVs for offline listening/debugging.
            if all_audio_pre:
                full_wav_pre = np.concatenate(all_audio_pre, axis=0)
                out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-pre.wav'
                sf.write(out_wav_file, full_wav_pre, 24000)
                log.info(f"TTS pre-output saved: {out_wav_file} ({full_wav_pre.shape[0]/24000:.2f}s audio)")
                all_audio_pre = []
            if all_audio_post:
                full_wav_post = np.concatenate(all_audio_post, axis=0)
                out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-post.wav'
                sf.write(out_wav_file, full_wav_post, 24000)
                log.info(f"TTS post-output saved: {out_wav_file} ({full_wav_post.shape[0]/24000:.2f}s audio)")
                all_audio_post = []
        # Optional: allow server to flush final messages if protocol supports it.
        try:
            async for message in ws:
                if isinstance(message, str):
                    data = json.loads(message)
                    if data.get("type") == "status" and data.get("status") == "stopped":
                        break
        except Exception:
            pass
| if __name__ == "__main__": | |
| parser = argparse.ArgumentParser(description="Streaming speech translation client (Python CLI)") | |
| parser.add_argument( | |
| "--uri", | |
| default="ws://localhost:8765", | |
| help="WebSocket server URI (default: ws://localhost:8765)", | |
| ) | |
| parser.add_argument( | |
| "--sample-rate", | |
| type=int, | |
| default=16000, | |
| help="Microphone sample rate in Hz (default: 16000)", | |
| ) | |
| parser.add_argument( | |
| "--chunk-ms", | |
| type=int, | |
| default=10, | |
| help="Audio chunk duration in milliseconds (default: 10 ms)", | |
| ) | |
| args = parser.parse_args() | |
| try: | |
| asyncio.run(main(args.uri, args.sample_rate, args.chunk_ms)) | |
| except KeyboardInterrupt: | |
| log.info("Client stopped.") | |