maya-voice-agent / src /vad_engine.py
rudyByte
fix: implement debounced barge-in check, greeting isolation, and remove aggressive dynamic VAD fast-tracking to eliminate premature filler interruptions and cut-off greetings
61cd1af
"""
vad_engine.py — Neural Voice Activity Detection for Maya
Replaces the old RMS energy-based VAD (SPEECH_THRESHOLD = 400)
with Silero VAD, a 1MB neural model that:
  - Processes one audio chunk in under 1ms on CPU
  - Was trained on 6000+ languages including Hindi and Gujarati
  - Handles background noise, phone audio quality, and soft speech
  - Zero false positives from fans, AC units, or background TV
  - MIT licensed, no API key needed, runs fully offline
Architecture:
  Each incoming Twilio audio chunk (8kHz µ-law) passes through:
  1. Audio format conversion: µ-law 8kHz → PCM 16kHz float32
  2. Silero VAD inference: returns speech probability 0.0–1.0
  3. Hangover logic: keeps speech "open" for language-appropriate
     silence duration before declaring end-of-turn
  4. Pre-roll buffer: keeps 200ms before speech onset so first
     syllable is never clipped (preserves existing Maya behavior)
Language-aware silence thresholds (research-backed):
  Gujarati: 1000ms — Gujarati speakers pause longer between phrases
  Hindi:     800ms — Standard South Asian speech rhythm
  English:   600ms — Standard conversational English pause
Usage:
  vad = SileroVADEngine()
  result = vad.process_chunk(raw_mulaw_bytes, current_language)
  if result.end_of_turn:
      # User has finished speaking — send to STT
      audio_for_stt = result.speech_audio
"""
import numpy as np
import audioop
from dataclasses import dataclass, field
from typing import Optional
from silero_vad import load_silero_vad
import torch
# ── LANGUAGE-AWARE SILENCE DURATIONS ────────────────────────────────────────
# Values in milliseconds. Based on cross-linguistic speech research showing
# Gujarati and Hindi speakers have longer natural inter-phrase pauses
# compared to English. Using a single threshold cuts them off mid-sentence.
SILENCE_DURATION_MS = {
"gujarati": 1000, # 1.0s β€” Gujarati has the longest natural pauses
"hindi": 800, # 0.8s β€” Hindi, slightly faster rhythm than Gujarati
"english": 600, # 0.6s β€” Standard conversational English
"default": 800, # fallback
}
# VAD inference threshold: 0.5 is standard. Raising to 0.75 for Maya
# to prevent false-positives from phone line noise/echo.
SPEECH_PROBABILITY_THRESHOLD = 0.75
# Audio sample rate constants
INPUT_SAMPLE_RATE = 8000 # Twilio sends 8kHz Β΅-law (G.711 PCMU)
TARGET_SAMPLE_RATE = 16000 # Silero VAD and Whisper both work at 16kHz
CHUNK_DURATION_MS = 32 # Process audio in 32ms chunks (Silero requirement: min 512 samples)
CHUNK_SIZE_SAMPLES = 512 # 16000 * 0.032 = 512
# Pre-roll buffer: keep 200ms before speech onset
# Ensures first syllable is never clipped (existing Maya behavior preserved)
PRE_ROLL_MS = 200
PRE_ROLL_SAMPLES = int(TARGET_SAMPLE_RATE * PRE_ROLL_MS / 1000) # 3200 samples
# Safety cap: max audio before force-flush (preserves existing 15s cap)
MAX_SPEECH_DURATION_MS = 15000
MAX_SPEECH_SAMPLES = int(TARGET_SAMPLE_RATE * MAX_SPEECH_DURATION_MS / 1000)
@dataclass
class VADResult:
    """Outcome of running one audio chunk through Silero VAD.

    Every field has a default, so ``VADResult()`` describes a chunk that was
    not speech and did not end a turn.
    """

    # True when this chunk was classified as speech.
    is_speech: bool = False
    # True once the user has finished their turn.
    end_of_turn: bool = False
    # Complete speech audio ready for STT (PCM 16kHz); empty until a turn ends.
    speech_audio: bytes = b""
    # Raw Silero probability, kept for debugging.
    speech_prob: float = 0.0
class SileroVADEngine:
    """
    Drop-in replacement for Maya's RMS energy VAD.

    The Silero model is loaded ONCE as a class-level singleton and reused
    across all sessions. Each instance maintains its own per-call buffers
    (speech buffer, silence counter, pre-roll buffer, resampler state).

    NOTE(review): ``reset()`` calls ``reset_states()`` on the shared model,
    which suggests the model itself carries per-stream state. If so,
    concurrent calls still share that state even though the buffers here are
    per-instance — confirm, and consider one model per engine if it matters.
    """

    # ── Class-level singleton so the model is loaded only once ───────────────
    _shared_model = None

    @classmethod
    def _ensure_model_loaded(cls):
        """Load the Silero model on first use; later calls are no-ops."""
        if cls._shared_model is None:
            print("[VAD] Loading Silero VAD model...")
            cls._shared_model = load_silero_vad()
            cls._shared_model.eval()
            # Process-wide setting: restrict torch to one intra-op thread.
            torch.set_num_threads(1)
            print("[VAD] ✅ Silero VAD loaded — 1ms per chunk on CPU")

    def __init__(self):
        SileroVADEngine._ensure_model_loaded()
        self.model = SileroVADEngine._shared_model
        # Per-call state — reset on each new call via reset()
        self._reset_state()

    def _reset_state(self):
        """Reset all per-call buffers and counters to their initial values."""
        self._pre_roll_buffer: list = []     # most recent non-speech chunks
        self._speech_buffer: list = []       # accumulating speech audio
        self._silence_frames: int = 0        # consecutive silent frames count
        self._in_speech: bool = False
        self._audio_remainder: bytes = b""   # leftover float32 bytes < one chunk
        # audioop.ratecv filter state. It must be carried from one packet to
        # the next, otherwise every packet boundary restarts the resampler.
        self._ratecv_state = None

    def reset(self):
        """Public reset — call at start of each new phone call."""
        self._reset_state()
        # Also reset Silero's internal state machine (on the shared model).
        self.model.reset_states()
        print("[VAD] State reset for new call")

    def _convert_mulaw_to_pcm16k(self, mulaw_bytes: bytes) -> np.ndarray:
        """
        Convert G.711 µ-law 8kHz audio to a 16kHz float32 array in [-1.0, 1.0).

        Steps:
          1. µ-law → 16-bit linear PCM (audioop.ulaw2lin)
          2. 8kHz → 16kHz resample (audioop.ratecv), continuing from the
             resampler state left by the previous packet
          3. int16 → float32, scaled by 1/32768

        Args:
            mulaw_bytes: Raw µ-law payload; may be empty.

        Returns:
            1-D float32 numpy array (empty when the input is empty).
        """
        if not mulaw_bytes:
            return np.zeros(0, dtype=np.float32)

        # Step 1: µ-law → 16-bit PCM at 8kHz
        pcm_8k = audioop.ulaw2lin(mulaw_bytes, 2)

        # Step 2: Resample 8kHz → 16kHz, threading the converter state through
        pcm_16k, self._ratecv_state = audioop.ratecv(
            pcm_8k, 2, 1, INPUT_SAMPLE_RATE, TARGET_SAMPLE_RATE,
            self._ratecv_state,
        )

        # Step 3: Convert to float32 numpy array
        audio_int16 = np.frombuffer(pcm_16k, dtype=np.int16)
        return audio_int16.astype(np.float32) / 32768.0

    def _get_silence_frames_threshold(self, language: str) -> int:
        """
        Return how many consecutive silent frames mark the end of a turn.

        Each frame lasts CHUNK_DURATION_MS (32ms), so e.g. 1000ms // 32ms
        gives 31 frames. The lookup ignores case and surrounding whitespace;
        unknown languages use the "default" entry.
        """
        key = language.strip().lower() if isinstance(language, str) else language
        silence_ms = SILENCE_DURATION_MS.get(key, SILENCE_DURATION_MS["default"])
        return silence_ms // CHUNK_DURATION_MS

    def _begin_speech(self, onset_chunk) -> None:
        """Open a new utterance: buffered pre-roll followed by the onset chunk."""
        self._in_speech = True
        self._silence_frames = 0
        # The onset chunk is added exactly once, after the pre-roll.
        self._speech_buffer = list(self._pre_roll_buffer) + [onset_chunk]
        # This pre-roll has been consumed; drop it so it cannot be prepended
        # again to a later utterance.
        self._pre_roll_buffer = []

    def _flush_turn(self, result) -> None:
        """Mark `result` as end-of-turn, hand over the audio, and go idle."""
        result.end_of_turn = True
        result.speech_audio = self._build_speech_bytes()
        self._in_speech = False
        self._speech_buffer = []
        self._silence_frames = 0

    def process_chunk(self, mulaw_bytes: bytes, language: str = "gujarati") -> "VADResult":
        """
        Process one Twilio audio chunk through Silero VAD.

        The audio is converted, joined with any leftover samples from the
        previous call, and consumed in CHUNK_SIZE_SAMPLES (512-sample)
        windows. Samples that do not fill a window are kept for the next call.

        Args:
            mulaw_bytes: Raw G.711 µ-law bytes from Twilio 'media' event
            language: Current detected language of the call session

        Returns:
            VADResult — check .end_of_turn to know when to send to STT.
            ``is_speech`` / ``speech_prob`` describe the last window processed
            in this call and keep their defaults if no full window was ready.
        """
        result = VADResult()

        # Convert incoming audio to Silero's required format
        audio_float32 = self._convert_mulaw_to_pcm16k(mulaw_bytes)

        # Combine with any leftover samples from the previous call
        if self._audio_remainder:
            leftover = np.frombuffer(self._audio_remainder, dtype=np.float32)
            combined = np.concatenate([leftover, audio_float32])
        else:
            combined = audio_float32

        # Loop invariants
        silence_threshold = self._get_silence_frames_threshold(language)
        max_pre_roll_chunks = PRE_ROLL_SAMPLES // CHUNK_SIZE_SAMPLES

        idx = 0
        while idx + CHUNK_SIZE_SAMPLES <= len(combined):
            chunk = combined[idx: idx + CHUNK_SIZE_SAMPLES]
            idx += CHUNK_SIZE_SAMPLES

            # Run Silero VAD inference on this window
            chunk_tensor = torch.FloatTensor(chunk)
            speech_prob = self.model(chunk_tensor, TARGET_SAMPLE_RATE).item()
            is_speech = speech_prob >= SPEECH_PROBABILITY_THRESHOLD
            result.speech_prob = speech_prob
            result.is_speech = is_speech

            # ── STATE MACHINE ────────────────────────────────────────────────
            if not self._in_speech:
                if is_speech:
                    # Speech onset — start accumulating, pre-roll first
                    self._begin_speech(chunk)
                    print(f"[VAD] Speech start detected (prob={speech_prob:.2f},"
                          f" lang={language})")
                else:
                    # Still idle: remember this chunk as pre-roll, bounded to
                    # roughly PRE_ROLL_MS worth of audio.
                    self._pre_roll_buffer.append(chunk)
                    if len(self._pre_roll_buffer) > max_pre_roll_chunks:
                        self._pre_roll_buffer.pop(0)
                continue

            # We are currently in speech
            self._speech_buffer.append(chunk)

            # Safety cap: force flush if speech too long
            if len(self._speech_buffer) * CHUNK_SIZE_SAMPLES >= MAX_SPEECH_SAMPLES:
                print("[VAD] Safety cap reached — force flushing")
                self._flush_turn(result)
                break

            if is_speech:
                # Speech resumed — reset silence counter
                self._silence_frames = 0
                continue

            # Silent frame inside an utterance. The full language-specific
            # threshold is used so a natural pause or breath does not end
            # the turn early.
            self._silence_frames += 1
            if self._silence_frames >= silence_threshold:
                print(f"[VAD] End of turn detected after "
                      f"{self._silence_frames * CHUNK_DURATION_MS}ms silence"
                      f" (threshold={silence_threshold * CHUNK_DURATION_MS}ms,"
                      f" lang={language})")
                self._flush_turn(result)
                break

        # Keep whatever was not consumed (including windows skipped by an
        # early break) for the next call.
        if len(combined) - idx > 0:
            self._audio_remainder = combined[idx:].tobytes()
        else:
            self._audio_remainder = b""

        return result

    def _build_speech_bytes(self) -> bytes:
        """
        Concatenate all buffered speech chunks into a single bytes object
        (16kHz float32 samples). Returns b"" when nothing is buffered.
        """
        if not self._speech_buffer:
            return b""
        return np.concatenate(self._speech_buffer).tobytes()

    def get_debug_stats(self) -> dict:
        """Return a snapshot of the current VAD state for logging/debugging."""
        return {
            "in_speech": self._in_speech,
            "silence_frames": self._silence_frames,
            "speech_buf_secs": len(self._speech_buffer) * CHUNK_DURATION_MS / 1000,
            "pre_roll_frames": len(self._pre_roll_buffer),
        }