
Architecture: Streaming Speech Translation Pipeline

Overview

End-to-end streaming speech translation pipeline:

Audio Input β†’ ASR β†’ NMT β†’ TTS β†’ Audio Output
  (PCM16)   (ONNX) (GGUF) (ONNX)  (PCM16)

Translates spoken English into spoken Russian in real time, with streaming output at every stage.
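At both ends of the pipeline, audio travels as raw PCM16. A minimal sketch of the byte-level framing (illustrative helpers, not the repo's actual audio utilities):

```python
import struct

def pcm16_to_float(raw: bytes) -> list[float]:
    """Decode little-endian PCM16 bytes to float samples in [-1.0, 1.0)."""
    n = len(raw) // 2
    samples = struct.unpack(f"<{n}h", raw)
    return [s / 32768.0 for s in samples]

def float_to_pcm16(samples: list[float]) -> bytes:
    """Encode float samples back to little-endian PCM16, clamping to range."""
    ints = [max(-32768, min(32767, int(s * 32768.0))) for s in samples]
    return struct.pack(f"<{len(ints)}h", *ints)
```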

Pipeline Stages

1. ASR β€” Cache-Aware Streaming ASR (NeMo Conformer RNN-T)

Model: NVIDIA NeMo Conformer RNN-T, exported to ONNX.

Architecture: Three internal threads connected by queue.Queue instances:

Audio β†’ [Preprocess Thread] β†’ [Encoder Thread] β†’ [Decoder Thread] β†’ text deltas
  • Preprocess: Buffers raw audio, extracts mel-spectrogram features, chunks them per CacheAwareStreamingConfig (chunk_size=[49,56], shift_size=[49,56])
  • Encoder: Runs ONNX encoder inference on feature chunks, maintains encoder cache state
  • Decoder: Runs RNN-T decoder (joint network), produces incremental text tokens

Key classes: CacheAwareStreamingAudioBuffer, CacheAwareStreamingASR, ASRModelPackage

Wrapper: StreamingASR — exposes the push_audio_chunk() / get_transcript_chunk() API
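The three-thread layout can be illustrated with a toy stage chain wired by queue.Queue (the stage functions here are stand-ins for the real preprocess/encode/decode steps, which live in CacheAwareStreamingASR):

```python
import queue
import threading

_SENTINEL = object()  # shutdown marker passed down the chain

def stage(fn, inbox: queue.Queue, outbox: queue.Queue) -> None:
    """Consume items from inbox, apply fn, forward results; propagate shutdown."""
    while True:
        item = inbox.get()
        if item is _SENTINEL:
            outbox.put(_SENTINEL)
            return
        outbox.put(fn(item))

def run_chain(items, fns):
    """Wire len(fns) daemon threads in series with bounded queues."""
    queues = [queue.Queue(maxsize=64) for _ in range(len(fns) + 1)]
    for fn, q_in, q_out in zip(fns, queues, queues[1:]):
        threading.Thread(target=stage, args=(fn, q_in, q_out), daemon=True).start()
    for item in items:
        queues[0].put(item)
    queues[0].put(_SENTINEL)
    out = []
    while (item := queues[-1].get()) is not _SENTINEL:
        out.append(item)
    return out
```

For example, `run_chain(range(3), [lambda x: x + 1, lambda x: x * 2, str])` pushes each input through three threads in order, mirroring the preprocess → encoder → decoder hand-off.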

2. NMT β€” Streaming Segmented Translation (TranslateGemma)

Model: TranslateGemma 4B (GGUF, Q8_0) via llama-cpp-python.

Architecture: Single-threaded, three internal components:

text deltas β†’ [Segmenter] β†’ text segments β†’ [Translator] β†’ raw translations β†’ [Merger] β†’ display text
  • StreamingSegmenter: Batches ASR tokens into word-groups (max 5 words + 2 hold-back). Triggers on punctuation, pause (>700ms), or max-token boundaries (min 3 words)
  • StreamingTranslator: Multi-turn translation using init/continuation prompt templates with KV cache warming
  • StreamingTranslationMerger: Handles revision/append/continuation logic for incremental translations. Detects trailing ellipsis (incomplete), leading ellipsis (continuation), and word-level LCP revision

Wrapper: StreamingNMT β€” exposes push_text_chunk() / flush() / check_pause() API
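The word-level LCP revision in the merger can be sketched as follows (a simplification; the actual StreamingTranslationMerger also handles the trailing/leading ellipsis cases):

```python
def merge_revision(shown: str, update: str) -> tuple[str, int]:
    """Merge an updated translation into already-displayed text.

    Returns the new display text plus the index of the first changed word,
    found via the word-level longest common prefix (LCP): everything before
    the divergence point is stable, everything after it is revised.
    """
    old_words, new_words = shown.split(), update.split()
    lcp = 0
    while (lcp < len(old_words) and lcp < len(new_words)
           and old_words[lcp] == new_words[lcp]):
        lcp += 1
    merged = old_words[:lcp] + new_words[lcp:]
    return " ".join(merged), lcp
```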

3. TTS β€” Streaming XTTS v2 (ONNX)

Model: XTTSv2 with ONNX-exported GPT-2 AR model + HiFi-GAN vocoder.

Architecture: Sequential within a single call:

text β†’ [BPE Tokenizer] β†’ [GPT-2 AR Loop] β†’ mel latents β†’ [HiFi-GAN Vocoder] β†’ audio chunks
  • Speaker conditioning: One-time compute from reference audio β†’ gpt_cond_latent [1,32,1024] + speaker_embedding [1,512,1]
  • AR generation: GPT-2 autoregressive loop producing audio token latents. Accumulates stream_chunk_size (default 20) tokens before running vocoder
  • Vocoder: HiFi-GAN converts accumulated latents to waveform
  • Crossfade stitching: Linear fade-in/fade-out between consecutive vocoder output chunks for seamless playback

Output: 24 kHz float32 audio chunks

Wrapper: StreamingTTS — exposes the synthesize_stream() generator API
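The crossfade stitching step can be sketched in pure Python (sample-by-sample for clarity; a real implementation would vectorize this, and the overlap length is an assumed parameter):

```python
def crossfade(prev_tail: list[float], next_head: list[float]) -> list[float]:
    """Linearly fade out prev_tail while fading in next_head over the overlap."""
    n = len(prev_tail)
    assert len(next_head) == n
    out = []
    for i in range(n):
        w = i / n  # fade-in weight ramps 0 -> 1 across the overlap
        out.append(prev_tail[i] * (1.0 - w) + next_head[i] * w)
    return out

def stitch(chunks: list[list[float]], overlap: int) -> list[float]:
    """Concatenate vocoder chunks, crossfading each consecutive pair."""
    if not chunks:
        return []
    out = chunks[0][:]
    for chunk in chunks[1:]:
        faded = crossfade(out[-overlap:], chunk[:overlap])
        out = out[:-overlap] + faded + chunk[overlap:]
    return out
```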

Concurrency Model

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                     asyncio Event Loop                          β”‚
β”‚                                                                 β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚ WebSocket I/O β”‚    β”‚ asr_to_nmt   β”‚    β”‚ tts_synthesis    β”‚  β”‚
β”‚  β”‚   handler     β”‚    β”‚   loop       β”‚    β”‚    loop          β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜    β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β”‚         β”‚                   β”‚                      β”‚            β”‚
β”‚         β”‚ run_in_executor() β”‚ run_in_executor()    β”‚ run_in_ex… β”‚
β”‚         β–Ό                   β–Ό                      β–Ό            β”‚
β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚
β”‚  β”‚              ThreadPoolExecutor (4 workers)               β”‚  β”‚
β”‚  β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β” β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”‚  β”‚
β”‚  β”‚  β”‚ ASR internal  β”‚ β”‚ NMT blocking β”‚ β”‚ TTS blocking     β”‚  β”‚  β”‚
β”‚  β”‚  β”‚ threads (3)   β”‚ β”‚ llama-cpp    β”‚ β”‚ ONNX inference   β”‚  β”‚  β”‚
β”‚  β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜ β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚  β”‚
β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜  β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
  • asyncio loop: Handles WebSocket I/O and coordinates pipeline stages
  • ASR threads: 3 dedicated daemon threads (preprocess, encoder, decoder)
  • NMT: Blocking llama-cpp inference bridged via run_in_executor()
  • TTS: Blocking ONNX inference bridged via run_in_executor()
  • Concurrency: All per-session state (including NMT turn context: prev_translation, prev_query) is held in per-session wrapper objects. The shared model weights (ASRModelPackage, StreamingTranslator, StreamingTTSPipeline) hold no mutable per-session state after initialization, making concurrent sessions safe.
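The executor bridging pattern can be sketched as follows (the blocking function is a stand-in for a llama-cpp or ONNX call; while it runs on a worker thread, the event loop stays free for WebSocket I/O):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_inference(text: str) -> str:
    """Stand-in for a blocking llama-cpp / ONNX inference call."""
    time.sleep(0.05)  # simulated model latency; would otherwise stall the loop
    return text.upper()

async def main() -> list[str]:
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=4) as executor:
        # Each blocking call is shipped to a worker thread and awaited
        # as a future, so the asyncio loop never blocks on inference.
        tasks = [loop.run_in_executor(executor, blocking_inference, t)
                 for t in ("hello", "world")]
        return await asyncio.gather(*tasks)
```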

Data Flow & Queues

WebSocket binary β†’ push_audio()
                      β”‚
                      β–Ό
              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
              β”‚  audio_queue  β”‚ (maxsize=256, queue.Queue)
              β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
                     β–Ό
            [ASR Internal Threads]
                     β”‚
                     β–Ό
              β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
              β”‚ output_queue  β”‚ (maxsize=64, queue.Queue)
              β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
                     β–Ό
         [asr_to_nmt_loop via executor]
                     β”‚
              β”Œβ”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”
              β–Ό              β–Ό
     transcript_queue   tts_text_queue (maxsize=16, asyncio.Queue)
     (β†’ WebSocket)           β”‚
                             β–Ό
              [tts_synthesis_loop via executor]
                             β”‚
                             β–Ό
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚audio_out_queueβ”‚ (maxsize=32, asyncio.Queue)
                    β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
                           β–Ό
                   WebSocket binary

Backpressure Strategy

  • Audio input: put_nowait with drop-on-full (losing frames is preferable to building up latency)
  • ASRβ†’NMT: put_nowait with drop warning on encoder/decoder output queues
  • NMTβ†’TTS: put_nowait with drop warning (translations can be reconstructed from next segment)
  • TTSβ†’Output: put_nowait with drop warning per audio chunk
  • All queue sizes are configurable via PipelineConfig
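The drop-on-full policy can be sketched as a small helper (a minimal version of what the bullets above describe; the warning log is an assumption):

```python
import logging
import queue

log = logging.getLogger("pipeline")

def put_or_drop(q: queue.Queue, item, label: str) -> bool:
    """Enqueue without blocking; drop the item (and warn) if the queue is full.

    Dropping bounds end-to-end latency: a slow consumer sheds load instead of
    letting the backlog (and the user-visible delay) grow without bound.
    """
    try:
        q.put_nowait(item)
        return True
    except queue.Full:
        log.warning("dropping %s item: queue full", label)
        return False
```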

Model Loading & Session Lifecycle

Startup: Models loaded ONCE in TranslationServer._load_models():

  • ASR ONNX sessions (ASRModelPackage)
  • NMT GGUF model (StreamingTranslator) + KV cache warmup
  • TTS ONNX sessions (StreamingTTSPipeline)

Per-session: Each WebSocket connection creates:

  • StreamingASR β€” own audio buffers, streaming state, thread pool
  • StreamingNMT β€” own segmenter, merger, and translation context (prev_translation, prev_query); shares model weights only
  • StreamingTTS β€” own speaker conditioning; shares ONNX sessions

Cleanup: On disconnect, orchestrator flushes remaining NMT text through TTS, then stops all threads and resets state.

WebSocket Protocol

| Direction | Type | Format | Description |
|-----------|------|--------|-------------|
| Client → Server | Binary | PCM16 | Raw audio at the declared sample rate |
| Client → Server | Text | JSON | {"action": "start", "sample_rate": 16000} |
| Client → Server | Text | JSON | {"action": "stop"} |
| Server → Client | Binary | PCM16 | Synthesized audio at 24 kHz |
| Server → Client | Text | JSON | {"type": "transcript", "text": "..."} |
| Server → Client | Text | JSON | {"type": "translation", "text": "..."} |
| Server → Client | Text | JSON | {"type": "status", "status": "started"} |

Configuration

All tunables are in PipelineConfig (dataclass) and exposed as CLI args:

| Parameter | Default | Description |
|-----------|---------|-------------|
| asr_chunk_duration_ms | 10 | Audio chunk duration for ASR (ms) |
| nmt_n_threads | 4 | CPU threads for llama-cpp |
| tts_stream_chunk_size | 20 | AR tokens per vocoder chunk |
| audio_queue_maxsize | 256 | Audio input queue bound |
| tts_queue_maxsize | 16 | NMT → TTS text queue bound |
| audio_out_queue_maxsize | 32 | TTS → output audio queue bound |
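The tunables above suggest a dataclass of roughly this shape, with each field exposed as a CLI flag (a sketch mirroring the defaults in the table, not the actual config.py):

```python
import argparse
from dataclasses import dataclass, fields

@dataclass
class PipelineConfig:
    asr_chunk_duration_ms: int = 10    # audio chunk duration for ASR
    nmt_n_threads: int = 4             # CPU threads for llama-cpp
    tts_stream_chunk_size: int = 20    # AR tokens per vocoder chunk
    audio_queue_maxsize: int = 256     # audio input queue bound
    tts_queue_maxsize: int = 16        # NMT -> TTS text queue bound
    audio_out_queue_maxsize: int = 32  # TTS -> output audio queue bound

def config_from_cli(argv: list[str]) -> PipelineConfig:
    """Expose every dataclass field as a --flag with its default value."""
    parser = argparse.ArgumentParser()
    for f in fields(PipelineConfig):
        parser.add_argument(f"--{f.name.replace('_', '-')}",
                            type=int, default=f.default, dest=f.name)
    return PipelineConfig(**vars(parser.parse_args(argv)))
```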

File Structure

src/
β”œβ”€β”€ asr/
β”‚   β”œβ”€β”€ streaming_asr.py          # StreamingASR wrapper
β”‚   β”œβ”€β”€ pipeline.py               # ThreadedSpeechTranslator (reference)
β”‚   β”œβ”€β”€ cache_aware_modules.py    # Audio buffer + streaming ASR
β”‚   β”œβ”€β”€ modules.py                # ONNX model loading
β”‚   └── utils.py                  # Audio utilities
β”œβ”€β”€ nmt/
β”‚   β”œβ”€β”€ streaming_nmt.py          # StreamingNMT wrapper
β”‚   β”œβ”€β”€ streaming_segmenter.py    # Word-group segmentation
β”‚   β”œβ”€β”€ streaming_translation_merger.py  # Translation merging
β”‚   └── translator_module.py      # TranslateGemma via llama-cpp
β”œβ”€β”€ tts/
β”‚   β”œβ”€β”€ streaming_tts.py          # StreamingTTS wrapper
β”‚   β”œβ”€β”€ xtts_streaming_pipeline.py  # Full TTS pipeline
β”‚   β”œβ”€β”€ xtts_onnx_orchestrator.py   # GPT-2 AR + vocoder
β”‚   β”œβ”€β”€ xtts_tokenizer.py          # BPE tokenizer
β”‚   └── zh_num2words.py            # Chinese text normalization
β”œβ”€β”€ pipeline/
β”‚   β”œβ”€β”€ orchestrator.py           # PipelineOrchestrator
β”‚   └── config.py                 # PipelineConfig
└── server/
    └── websocket_server.py       # WebSocket server