# Demo-Voice-Agent-Service / src/stt/deepgram_client.py
# Last commit by ishaq101 (226ff5d): [KM-501, KM-498, KM-499] Web Socket, STS and TTS
import asyncio
import logging
import threading
from typing import Callable, Awaitable
from deepgram import DeepgramClient
from deepgram.core.events import EventType
from src.config import DEEPGRAM_API_KEY, SAMPLE_RATE, DEEPGRAM_LANGUAGE, DEEPGRAM_ENDPOINTING_MS, DEEPGRAM_UTTERANCE_END_MS
logger = logging.getLogger(__name__)

# Async callback that receives one complete utterance (the joined final segments).
OnTranscriptCallback = Callable[[str], Awaitable[None]]


class DeepgramStreamer:
    """
    Wraps Deepgram real-time streaming STT.

    Audio chunks (PCM 16kHz 16-bit mono bytes) are fed via `send_audio()`.
    The connection is opened with encoding "linear16" at ``SAMPLE_RATE``;
    keep the two in sync.

    Final transcript segments are buffered. A flush timer (DEEPGRAM_UTTERANCE_END_MS)
    is reset on each new segment. When the timer fires, all buffered segments are
    joined and emitted as one complete utterance through ``on_final_transcript``,
    scheduled on ``loop``.

    Threading: the Deepgram event handlers (``_on_open``, ``_on_message``,
    ``_on_error``) are presumably invoked from the background thread that runs
    ``start_listening()``. Anything that touches the flush timer is marshalled
    onto ``loop`` with ``call_soon_threadsafe``.
    """

    def __init__(self, on_final_transcript: OnTranscriptCallback, loop: asyncio.AbstractEventLoop):
        self._on_final_transcript = on_final_transcript
        self._loop = loop
        self._connection = None  # object returned by entering the connect() context manager
        self._cm = None  # the connect() context manager itself; exited in stop()
        self._ready = threading.Event()  # set by the OPEN event handler
        self._transcript_buffer: list[str] = []
        self._buffer_lock = threading.Lock()  # guards _transcript_buffer across threads
        self._flush_handle: asyncio.TimerHandle | None = None  # only touched on the loop thread
        # Strong references to in-flight transcript callbacks (see _flush_buffer).
        self._pending_tasks: set[asyncio.Future] = set()

    def start(self, *, timeout: float = 5.0) -> None:
        """Open the Deepgram live connection and start listening for events.

        Blocks for up to ``timeout`` seconds waiting for the OPEN event. A
        timeout is logged as a warning rather than raised, and the connection
        is left in place. Callers that never expected an exception from
        start() therefore keep working.

        Args:
            timeout: Seconds to wait for the OPEN event (default 5, as before).
        """
        client = DeepgramClient(api_key=DEEPGRAM_API_KEY)
        connect_kwargs: dict = dict(
            model="nova-2",
            encoding="linear16",
            sample_rate=SAMPLE_RATE,
            endpointing=DEEPGRAM_ENDPOINTING_MS,
        )
        if DEEPGRAM_LANGUAGE:
            connect_kwargs["language"] = DEEPGRAM_LANGUAGE

        # Drop any OPEN signal left over from a previous start()/stop() cycle;
        # otherwise the wait below would return immediately.
        self._ready.clear()

        # The connection must outlive this method, so the context manager is
        # entered by hand here and exited in stop().
        self._cm = client.listen.v1.connect(**connect_kwargs)
        self._connection = self._cm.__enter__()
        self._connection.on(EventType.OPEN, self._on_open)
        self._connection.on(EventType.MESSAGE, self._on_message)
        self._connection.on(EventType.ERROR, self._on_error)

        # start_listening() is blocking — run it in a daemon background thread.
        thread = threading.Thread(target=self._connection.start_listening, daemon=True)
        thread.start()

        # Event.wait() returns False on timeout; only report success when OPEN arrived.
        if self._ready.wait(timeout=timeout):
            logger.info("Deepgram STT connected")
        else:
            logger.warning("Deepgram STT did not signal open within %.1fs", timeout)

    def _on_open(self, _result) -> None:
        """Signal start() that the connection is open."""
        self._ready.set()

    def _on_message(self, result) -> None:
        """Buffer a final, non-empty transcript segment and restart the flush timer.

        Messages without ``channel.alternatives[0].transcript`` (presumably
        non-result events) are ignored. Interim (non-final) results and empty
        transcripts are ignored too.
        """
        try:
            transcript = result.channel.alternatives[0].transcript.strip()
        except (AttributeError, IndexError, TypeError):
            return
        if not transcript or not getattr(result, "is_final", False):
            return

        with self._buffer_lock:
            self._transcript_buffer.append(transcript)
        logger.debug("Buffered segment: %s", transcript)

        # This handler may run off the event-loop thread; the timer lives on the loop.
        self._loop.call_soon_threadsafe(self._reset_flush_timer)

    def _reset_flush_timer(self) -> None:
        """(Re)arm the utterance-end timer. Must run on the event loop thread."""
        if self._flush_handle is not None:
            self._flush_handle.cancel()
        delay = DEEPGRAM_UTTERANCE_END_MS / 1000.0
        self._flush_handle = self._loop.call_later(delay, self._flush_buffer)

    def _flush_buffer(self) -> None:
        """Join the buffered segments and schedule ``on_final_transcript`` with them.

        Runs on the event loop thread (it is scheduled via ``call_later``).
        Does nothing when the buffer is empty.
        """
        # The timer that scheduled us has fired; forget it in every case.
        self._flush_handle = None
        with self._buffer_lock:
            if not self._transcript_buffer:
                return
            full_text = " ".join(self._transcript_buffer)
            self._transcript_buffer.clear()

        logger.info("Final transcript: %s", full_text)
        task = asyncio.ensure_future(self._on_final_transcript(full_text), loop=self._loop)
        # The event loop holds only weak references to tasks, so keep a strong
        # one until completion. The done-callback also surfaces exceptions that
        # would otherwise go unretrieved.
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_transcript_task_done)

    def _on_transcript_task_done(self, task: asyncio.Future) -> None:
        """Release a finished callback task and log any exception it raised."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Transcript callback failed", exc_info=exc)

    def _on_error(self, error) -> None:
        """Log errors reported by the Deepgram connection."""
        logger.error("Deepgram error: %s", error)

    def send_audio(self, chunk: bytes) -> None:
        """Forward one audio chunk to Deepgram; a no-op when not connected."""
        # Take a local copy: stop() may clear the attribute from another thread.
        connection = self._connection
        if connection:
            connection.send_media(chunk)

    def stop(self) -> None:
        """Close the Deepgram connection (best effort) and reset connection state.

        Close errors are logged, not raised. NOTE(review): a pending flush timer
        is not cancelled here, so a buffered utterance may still be emitted after
        stop(). Confirm whether that is desired.
        """
        cm = self._cm
        # Clear the connection first so concurrent send_audio() calls become no-ops
        # instead of writing to a socket that is being closed.
        self._connection = None
        self._cm = None
        if cm is not None:
            try:
                cm.__exit__(None, None, None)
            except Exception:
                # Shutdown is best effort, but don't hide the failure completely.
                logger.warning("Error while closing Deepgram connection", exc_info=True)
        logger.info("Deepgram STT closed")