# NOTE: the three lines below are web-scrape/commit-page residue, kept as
# comments so the file remains valid Python.
# pltobing's picture
# python_client: Avoid async in playback callback, use thread async for prebuffer, reduce blocksize and latency, keep minimal threshold and gain.
# c5831b3
#!/usr/bin/env python3
# License: CC-BY-NC-ND-4.0
# Created by: Patrick Lumbantobing, Vertox-AI
# Copyright (c) 2025-2026 Vertox-AI. All rights reserved.
#
# This work is licensed under the Creative Commons
# Attribution-NonCommercial-NoDerivatives 4.0 International License.
# To view a copy of this license, visit
# http://creativecommons.org/licenses/by-nc-nd/4.0/
"""
Streaming speech translation client (Python CLI).
- Captures microphone audio (mono, float32) via `sounddevice`.
- Streams audio as PCM16 binary frames to the WebSocket server.
- Receives translated PCM16 audio at 24 kHz and plays it back.
- Prints incremental ASR transcript and NMT translation to stdout.
Usage
-----
python clients/python_client.py --uri ws://localhost:8765 --sample-rate 16000
"""
from __future__ import annotations
import argparse
import asyncio
import json
import logging
import queue
import sys
import time
import numpy as np
import numpy.typing as npt
import soundfile as sf
import websockets
try: # sounddevice is optional (for headless environments)
import sounddevice as sd
HAS_SOUNDDEVICE = True
except (ImportError, OSError):
HAS_SOUNDDEVICE = False
sd = None # type: ignore[assignment]
# Process-wide logging configuration for the client.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
log = logging.getLogger(__name__)
# Accumulated ASR transcript, shared with main() / receive_messages() via
# `global`. A bare `global full_transcript` statement at module scope is a
# no-op, so actually initialize the variable instead.
full_transcript = ""
async def main(uri: str, sample_rate: int, chunk_duration_ms: int) -> None:
    """
    Run the streaming translation client.

    Flow: microphone -> PCM16 WebSocket frames -> server; server ->
    24 kHz PCM16 audio frames (binary) plus JSON text messages ->
    local playback and stdout logging. On shutdown the received audio
    (pre- and post-gain) is dumped to WAV files for inspection.

    Parameters
    ----------
    uri :
        WebSocket server URI, e.g. "ws://localhost:8765".
    sample_rate :
        Microphone capture sample rate (Hz), e.g. 16000.
    chunk_duration_ms :
        Audio chunk duration in milliseconds, e.g. 10.
    """
    if not HAS_SOUNDDEVICE:
        log.error("ERROR: sounddevice is required. Install with: pip install sounddevice")
        sys.exit(1)
    # Mic callback block size (what sounddevice pulls from the OS).
    chunk_size: int = int(sample_rate * chunk_duration_ms / 1000)
    # Module-level accumulators: transcript text plus raw (pre) and
    # gain/limited (post) playback audio, saved to WAV on shutdown.
    global full_transcript
    full_transcript = ""
    global all_audio_pre, all_audio_post
    all_audio_pre = []
    all_audio_post = []
    log.info(f"Connecting to {uri}...")
    async with websockets.connect(uri, max_size=2**20) as ws:
        log.info("Connected. Starting session...")
        # Send start control message.
        await ws.send(json.dumps({"action": "start", "samplerate": sample_rate}, ensure_ascii=False))
        log.info("Waiting for start confirmation...")
        # Local state.
        audio_send_queue: asyncio.Queue[bytes] = asyncio.Queue(maxsize=256)
        running = True
        # ---------- TX: microphone -> websocket ----------
        # Target WS chunk size (independent of mic block size).
        # Example: send ~20 ms of audio per WS frame.
        target_ws_chunk_ms = 20
        target_ws_chunk_bytes = int(sample_rate * target_ws_chunk_ms / 1000) * 2  # int16 -> 2 bytes
        send_accumulator = bytearray()

        def audio_callback(
            indata: npt.NDArray[np.float32],
            frames,
            time_info,
            status,
        ) -> None:
            """Convert float32 mic buffer to PCM16 bytes and enqueue buffered chunks for sending."""
            # NOTE(review): sounddevice invokes this on its own audio thread;
            # asyncio.Queue.put_nowait() from a non-event-loop thread is not
            # documented as thread-safe — appears to work here, but verify.
            nonlocal running
            if not running:
                return
            if status:
                log.warning(f"Audio status: {status}")
            # indata shape: (frames, channels); we use channel 0
            audio = indata[:, 0]
            # Scale float32 [-1, 1] to int16 range, clipping to avoid wrap-around.
            audio_int16 = np.clip(audio * 32767.0, -32768, 32767).astype(np.int16)
            chunk = audio_int16.tobytes()
            # Accumulate and slice into larger WS chunks.
            send_accumulator.extend(chunk)
            while len(send_accumulator) >= target_ws_chunk_bytes:
                to_send = bytes(send_accumulator[:target_ws_chunk_bytes])
                del send_accumulator[:target_ws_chunk_bytes]
                try:
                    audio_send_queue.put_nowait(to_send)
                except asyncio.QueueFull:
                    # Drop newest chunk on overload.
                    pass

        async def send_audio() -> None:
            """Capture microphone audio and send buffered PCM16 chunks to server."""
            nonlocal running
            stream = sd.InputStream(
                channels=1,
                samplerate=sample_rate,
                blocksize=chunk_size,
                callback=audio_callback,
                latency="low",
                dtype="float32",
            )
            stream.start()
            log.info(
                f"Microphone capture started: {sample_rate} Hz, {chunk_size} samples/chunk.\n"
                f"Sending ~{target_ws_chunk_ms} ms per WebSocket frame.\n"
                "Speak into the microphone... Press Ctrl+C to stop."
            )
            try:
                while running:
                    try:
                        # Timeout lets the loop re-check `running` even when
                        # the microphone delivers no data.
                        audio_bytes = await asyncio.wait_for(audio_send_queue.get(), timeout=0.5)
                    except asyncio.TimeoutError:
                        continue
                    await ws.send(audio_bytes)
            finally:
                stream.stop()
                stream.close()

        # ---------- RX: websocket -> FIFO playback (fade-in after underrun) ----------
        playback_samplerate = 24000
        blocksize = 480  # ~20 ms at 24 kHz
        limiter_threshold = 0.99
        fixed_gain = 0.9  # keep a bit below full-scale
        # queue.Queue (not asyncio.Queue): the playback callback runs on the
        # audio thread, so a thread-safe, non-async FIFO is required here.
        playback_buffer = queue.Queue(maxsize=256)
        # Leftover samples from the last queue item that didn't fit a block.
        playback_chunk = np.zeros(0, dtype=np.float32)
        was_underrun = False  # track whether last callback had no data

        def playback_callback(outdata, frames, time_info, status):
            """Fill `outdata` with up to `frames` samples from the FIFO; emit silence on underrun."""
            nonlocal playback_chunk, was_underrun
            if status:
                log.warning(f"Playback status: {status}")
            needed = frames
            out = np.zeros(needed, dtype=np.float32)
            # Refill from thread-safe queue
            while len(playback_chunk) < needed:
                try:
                    new = playback_buffer.get_nowait()
                except queue.Empty:
                    break
                new = np.asarray(new, dtype=np.float32).flatten()
                if playback_chunk.size == 0:
                    playback_chunk = new
                else:
                    playback_chunk = np.concatenate([playback_chunk, new])
            take = min(needed, len(playback_chunk))
            if take > 0:
                out[:take] = playback_chunk[:take]
                playback_chunk = playback_chunk[take:]
                # Optional: fade-in if we had an underrun just before
                if was_underrun:
                    fade_len = min(240, take)  # ~10 ms at 24 kHz
                    fade = np.linspace(0.0, 1.0, fade_len, endpoint=False).astype(np.float32)
                    out[:fade_len] *= fade
                    was_underrun = False
            else:
                # No data: output silence (required by sounddevice docs)
                was_underrun = True
            outdata[:, 0] = out

        def prebuffer_playback_blocking(min_ms: int = 160):
            """Pre-buffer a small amount of audio to reduce underflows."""
            # Blocking get() is acceptable: this runs in a worker thread via
            # asyncio.to_thread, before the output stream is started.
            min_samples = int(playback_samplerate * (min_ms / 1000.0))
            buffered = []
            total = 0
            while total < min_samples:
                audio = playback_buffer.get()
                buffered.append(audio)
                total += len(audio)
            # Put everything back so the callback can consume it.
            for a in buffered:
                playback_buffer.put(a)

        async def receive_messages() -> None:
            """Receive audio and text messages from server and handle them."""
            nonlocal running
            global full_transcript
            global all_audio_pre, all_audio_post
            try:
                async for message in ws:
                    if isinstance(message, bytes):
                        # Raw PCM16 audio at 24 kHz – queue for playback as float32.
                        pcm16 = np.frombuffer(message, dtype=np.int16)
                        audio = pcm16.astype(np.float32) / 32768.0
                        all_audio_pre.append(audio)
                        # No DC removal, no client cross-fade, no LUFS; just fixed gain + limiter.
                        audio = audio * fixed_gain
                        np.clip(audio, -limiter_threshold, limiter_threshold, out=audio)
                        all_audio_post.append(audio)
                        # Rolling buffer: if full, drop oldest before adding new.
                        while playback_buffer.full():
                            try:
                                _ = playback_buffer.get_nowait()
                            except queue.Empty:
                                break
                        playback_buffer.put(audio)
                    else:
                        data = json.loads(message)
                        msg_type = data.get("type")
                        if msg_type == "transcript":
                            transcript_text_delta = data.get("text", "")
                            # Server sends incremental deltas; append to the running transcript.
                            full_transcript = (
                                full_transcript + transcript_text_delta if full_transcript else transcript_text_delta
                            )
                            log.info(f"ASR delta: {transcript_text_delta}")
                            log.info(f"ASR full: {full_transcript}")
                        elif msg_type == "translation":
                            translation_text = data.get("text", "")
                            log.info(f"NMT: {translation_text}")
                        elif msg_type == "status":
                            log.info(f"STATUS: {data.get('status')}")
            except websockets.exceptions.ConnectionClosed:
                # Server went away: signal the TX loop to stop too.
                running = False

        # Start RX and TX tasks.
        receive_task = asyncio.create_task(receive_messages())
        send_task = asyncio.create_task(send_audio())
        # Pre-buffer some audio for smoother playback.
        log.info("Pre-buffering playback audio...")
        await asyncio.to_thread(prebuffer_playback_blocking, 160)
        playback_stream = sd.OutputStream(
            channels=1,
            samplerate=playback_samplerate,
            callback=playback_callback,
            dtype="float32",
            latency=0.08,  # ~80 ms output latency
            blocksize=blocksize,
        )
        playback_stream.start()
        log.info("Playback started.")
        try:
            await asyncio.gather(send_task, receive_task)
        except KeyboardInterrupt:
            log.info("Stopping client...")
            running = False
            try:
                # Best-effort stop message; the socket may already be closed.
                await ws.send(json.dumps({"action": "stop"}))
            except Exception:
                pass
        finally:
            playback_stream.stop()
            playback_stream.close()
            # Dump both raw (pre) and gain/limited (post) received audio for debugging.
            if all_audio_pre:
                full_wav_pre = np.concatenate(all_audio_pre, axis=0)
                out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-pre.wav'
                sf.write(out_wav_file, full_wav_pre, 24000)
                log.info(f"TTS pre-output saved: {out_wav_file} ({full_wav_pre.shape[0]/24000:.2f}s audio)")
                all_audio_pre = []
            if all_audio_post:
                full_wav_post = np.concatenate(all_audio_post, axis=0)
                out_wav_file = f'client-synth-{str(time.time()).replace(".", "-")}-post.wav'
                sf.write(out_wav_file, full_wav_post, 24000)
                log.info(f"TTS post-output saved: {out_wav_file} ({full_wav_post.shape[0]/24000:.2f}s audio)")
                all_audio_post = []
        # Optional: allow server to flush final messages if protocol supports it.
        try:
            async for message in ws:
                if isinstance(message, str):
                    data = json.loads(message)
                    if data.get("type") == "status" and data.get("status") == "stopped":
                        break
        except Exception:
            pass
if __name__ == "__main__":
    # Command-line entry point: build the option parser, then run the
    # async client until the connection closes or the user interrupts.
    cli = argparse.ArgumentParser(
        description="Streaming speech translation client (Python CLI)"
    )
    for flag, extra in (
        (
            "--uri",
            {
                "default": "ws://localhost:8765",
                "help": "WebSocket server URI (default: ws://localhost:8765)",
            },
        ),
        (
            "--sample-rate",
            {
                "type": int,
                "default": 16000,
                "help": "Microphone sample rate in Hz (default: 16000)",
            },
        ),
        (
            "--chunk-ms",
            {
                "type": int,
                "default": 10,
                "help": "Audio chunk duration in milliseconds (default: 10 ms)",
            },
        ),
    ):
        cli.add_argument(flag, **extra)
    opts = cli.parse_args()
    try:
        asyncio.run(main(opts.uri, opts.sample_rate, opts.chunk_ms))
    except KeyboardInterrupt:
        log.info("Client stopped.")