import asyncio
import logging
import threading
from typing import Callable, Awaitable

from deepgram import DeepgramClient
from deepgram.core.events import EventType

from src.config import (
    DEEPGRAM_API_KEY,
    SAMPLE_RATE,
    DEEPGRAM_LANGUAGE,
    DEEPGRAM_ENDPOINTING_MS,
    DEEPGRAM_UTTERANCE_END_MS,
)

logger = logging.getLogger(__name__)

OnTranscriptCallback = Callable[[str], Awaitable[None]]


class DeepgramStreamer:
    """
    Wraps Deepgram real-time streaming STT.

    Audio chunks (linear16 PCM bytes at SAMPLE_RATE) are fed via
    `send_audio()`. Final transcript segments are buffered. A flush timer
    (DEEPGRAM_UTTERANCE_END_MS) is reset on each new segment. When the timer
    fires, all buffered segments are joined with spaces and passed to
    `on_final_transcript` as one complete utterance, scheduled on `loop`.

    Threading: Deepgram callbacks run on the background listener thread
    started by `start()`; the flush timer and the transcript callback run on
    the asyncio event loop passed to the constructor.
    """

    # Seconds `start()` waits for the OPEN event before giving up waiting.
    _CONNECT_TIMEOUT_S = 5

    def __init__(self, on_final_transcript: OnTranscriptCallback, loop: asyncio.AbstractEventLoop):
        """
        Args:
            on_final_transcript: Awaitable-returning callback invoked with the
                joined utterance text each time the flush timer fires.
            loop: Event loop on which the flush timer and the callback run.
        """
        self._on_final_transcript = on_final_transcript
        self._loop = loop
        self._connection = None
        self._cm = None
        self._ready = threading.Event()
        self._transcript_buffer: list[str] = []
        self._buffer_lock = threading.Lock()
        self._flush_handle: asyncio.TimerHandle | None = None
        # Strong references to in-flight callback tasks. The event loop only
        # keeps weak references, so an unreferenced task could be garbage
        # collected before it finishes.
        self._pending_tasks: set = set()

    def start(self) -> None:
        """
        Open the Deepgram connection and start the background listener thread.

        Blocks for up to `_CONNECT_TIMEOUT_S` seconds waiting for the OPEN
        event. If it does not arrive in time a warning is logged and the
        method returns anyway (no exception is raised for the timeout).
        """
        client = DeepgramClient(api_key=DEEPGRAM_API_KEY)
        connect_kwargs: dict = dict(
            model="nova-2",
            encoding="linear16",
            sample_rate=SAMPLE_RATE,
            endpointing=DEEPGRAM_ENDPOINTING_MS,
        )
        if DEEPGRAM_LANGUAGE:
            connect_kwargs["language"] = DEEPGRAM_LANGUAGE

        # Forget any OPEN signal from a previous session so the wait below
        # reflects this connection only.
        self._ready.clear()

        self._cm = client.listen.v1.connect(**connect_kwargs)
        self._connection = self._cm.__enter__()

        self._connection.on(EventType.OPEN, self._on_open)
        self._connection.on(EventType.MESSAGE, self._on_message)
        self._connection.on(EventType.ERROR, self._on_error)

        # start_listening() is blocking — run in background thread
        thread = threading.Thread(target=self._connection.start_listening, daemon=True)
        thread.start()

        # Wait until connection is open before returning
        if self._ready.wait(timeout=self._CONNECT_TIMEOUT_S):
            logger.info("Deepgram STT connected")
        else:
            logger.warning(
                "Deepgram STT did not report an open connection within %s s; continuing anyway",
                self._CONNECT_TIMEOUT_S,
            )

    def _on_open(self, _result) -> None:
        """Signal `start()` that the connection is open."""
        self._ready.set()

    def _on_message(self, result) -> None:
        """
        Buffer a final transcript segment and (re)arm the flush timer.

        Messages without a usable transcript, with an empty transcript, or
        that are not marked final are ignored.
        """
        try:
            transcript = result.channel.alternatives[0].transcript.strip()
        except (AttributeError, IndexError, TypeError):
            # Message carries no transcript in the expected shape
            # (presumably a non-transcript event type — nothing to buffer).
            return

        # A missing `is_final` attribute is treated as "not final" rather
        # than letting AttributeError escape into the listener thread.
        if not transcript or not getattr(result, "is_final", False):
            return

        with self._buffer_lock:
            self._transcript_buffer.append(transcript)
        logger.debug("Buffered segment: %s", transcript)

        # Reset flush timer on the event loop thread
        self._loop.call_soon_threadsafe(self._reset_flush_timer)

    def _reset_flush_timer(self) -> None:
        """Cancel any pending flush and schedule a new one. Runs on the loop thread."""
        if self._flush_handle is not None:
            self._flush_handle.cancel()
        delay = DEEPGRAM_UTTERANCE_END_MS / 1000.0
        self._flush_handle = self._loop.call_later(delay, self._flush_buffer)

    def _flush_buffer(self) -> None:
        """
        Join buffered segments and schedule the transcript callback.

        Invoked by the flush timer on the loop thread. Does nothing beyond
        clearing the timer handle when the buffer is empty.
        """
        # The timer that invoked us has fired; drop the stale handle on
        # every path, including the empty-buffer one.
        self._flush_handle = None

        with self._buffer_lock:
            if not self._transcript_buffer:
                return
            full_text = " ".join(self._transcript_buffer)
            self._transcript_buffer.clear()

        logger.info("Final transcript: %s", full_text)
        task = asyncio.ensure_future(self._on_final_transcript(full_text), loop=self._loop)
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_callback_done)

    def _on_callback_done(self, task) -> None:
        """Release a finished callback task and log any exception it raised."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Transcript callback failed", exc_info=exc)

    def _on_error(self, error) -> None:
        """Log an error reported by the Deepgram connection."""
        logger.error("Deepgram error: %s", error)

    def send_audio(self, chunk: bytes) -> None:
        """Forward one audio chunk to Deepgram; silently dropped when not connected."""
        # Read the attribute once so a concurrent stop() cannot null it
        # between the check and the call.
        connection = self._connection
        if connection:
            connection.send_media(chunk)

    def stop(self) -> None:
        """
        Close the Deepgram connection if one is open.

        Closing is best-effort: an error raised while exiting the connection
        context is logged and otherwise ignored. A flush timer that is
        already scheduled is left untouched.
        """
        if not self._cm:
            return
        try:
            self._cm.__exit__(None, None, None)
        except Exception:
            logger.warning("Error while closing Deepgram connection", exc_info=True)
        self._connection = None
        self._cm = None
        logger.info("Deepgram STT closed")