Spaces:
Sleeping
Sleeping
| import asyncio | |
| import logging | |
| import threading | |
| from typing import Callable, Awaitable | |
| from deepgram import DeepgramClient | |
| from deepgram.core.events import EventType | |
| from src.config import DEEPGRAM_API_KEY, SAMPLE_RATE, DEEPGRAM_LANGUAGE, DEEPGRAM_ENDPOINTING_MS, DEEPGRAM_UTTERANCE_END_MS | |
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Shape of the callback that receives each completed utterance: it takes the
# joined transcript text and returns an awaitable.
OnTranscriptCallback = Callable[[str], Awaitable[None]]
class DeepgramStreamer:
    """Wrap a Deepgram real-time streaming speech-to-text connection.

    Audio chunks (raw ``linear16`` PCM at ``SAMPLE_RATE``) are fed in through
    :meth:`send_audio`. Final transcript segments are buffered, and a flush
    timer of ``DEEPGRAM_UTTERANCE_END_MS`` milliseconds is restarted on every
    new segment. When the timer fires, the buffered segments are joined with
    single spaces and passed to ``on_final_transcript`` as one utterance.

    Threading model: the blocking Deepgram listener runs in a daemon thread
    started by :meth:`start`, and the event handlers are expected to be
    invoked from that thread. The flush timer and the transcript callback
    always run on the event loop given to the constructor.
    """

    # Seconds start() waits for the OPEN event before giving up on it.
    _CONNECT_TIMEOUT_S = 5

    def __init__(
        self,
        on_final_transcript: "OnTranscriptCallback",
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        """Store the callback and event loop; no connection is opened yet.

        Args:
            on_final_transcript: Called with the joined utterance text; the
                awaitable it returns is scheduled on ``loop``.
            loop: Event loop on which the flush timer and callback run.
        """
        self._on_final_transcript = on_final_transcript
        self._loop = loop
        self._connection = None
        self._cm = None
        self._ready = threading.Event()
        self._transcript_buffer: list[str] = []
        self._buffer_lock = threading.Lock()
        self._flush_handle: asyncio.TimerHandle | None = None
        # Strong references to in-flight callback tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected
        # before it finishes.
        self._pending_tasks: set[asyncio.Future] = set()

    def start(self) -> None:
        """Open the Deepgram connection and start listening in the background.

        Blocks for up to ``_CONNECT_TIMEOUT_S`` seconds waiting for the OPEN
        event. If it does not arrive in time a warning is logged and the
        method returns anyway (no exception is raised).
        """
        if self._cm is not None:
            # Do not orphan an already-open connection.
            logger.warning("Deepgram STT already started; closing previous connection")
            self.stop()
        # A previous session may have left the event set; without clearing it
        # the wait below would return before the new connection is open.
        self._ready.clear()

        client = DeepgramClient(api_key=DEEPGRAM_API_KEY)
        connect_kwargs: dict = {
            "model": "nova-2",
            "encoding": "linear16",
            "sample_rate": SAMPLE_RATE,
            "endpointing": DEEPGRAM_ENDPOINTING_MS,
        }
        if DEEPGRAM_LANGUAGE:
            connect_kwargs["language"] = DEEPGRAM_LANGUAGE

        self._cm = client.listen.v1.connect(**connect_kwargs)
        connection = self._cm.__enter__()
        self._connection = connection
        connection.on(EventType.OPEN, self._on_open)
        connection.on(EventType.MESSAGE, self._on_message)
        connection.on(EventType.ERROR, self._on_error)

        # start_listening() is blocking, so run it in a background thread.
        listener = threading.Thread(target=connection.start_listening, daemon=True)
        listener.start()

        # Only report success if the OPEN event was actually observed.
        if self._ready.wait(timeout=self._CONNECT_TIMEOUT_S):
            logger.info("Deepgram STT connected")
        else:
            logger.warning(
                "Deepgram STT did not signal an open connection within %s s",
                self._CONNECT_TIMEOUT_S,
            )

    def _on_open(self, _result) -> None:
        """Mark the connection as open so that start() can return."""
        self._ready.set()

    def _on_message(self, result) -> None:
        """Buffer a final transcript segment and restart the flush timer.

        Messages without a transcript, with an empty transcript, or that are
        not marked final are ignored.
        """
        try:
            transcript = result.channel.alternatives[0].transcript.strip()
        except (AttributeError, IndexError, TypeError):
            # Not a transcript-bearing message (or one with no alternatives).
            return
        if not transcript or not getattr(result, "is_final", False):
            return

        with self._buffer_lock:
            self._transcript_buffer.append(transcript)
        logger.debug("Buffered segment: %s", transcript)

        # The timer must be manipulated on the event loop thread.
        try:
            self._loop.call_soon_threadsafe(self._reset_flush_timer)
        except RuntimeError:
            # Raised when the loop is already closed; do not let that
            # propagate into the listener thread.
            logger.warning("Event loop is closed; buffered segment will not be flushed")

    def _reset_flush_timer(self) -> None:
        """(Re)arm the flush timer. Must run on the event loop thread."""
        if self._flush_handle is not None:
            self._flush_handle.cancel()
        delay = DEEPGRAM_UTTERANCE_END_MS / 1000.0
        self._flush_handle = self._loop.call_later(delay, self._flush_buffer)

    def _flush_buffer(self) -> None:
        """Emit all buffered segments as one utterance via the callback."""
        # The timer has fired (or we were called directly); either way the
        # stored handle is no longer pending.
        self._flush_handle = None
        with self._buffer_lock:
            if not self._transcript_buffer:
                return
            full_text = " ".join(self._transcript_buffer)
            self._transcript_buffer.clear()

        logger.info("Final transcript: %s", full_text)
        task = asyncio.ensure_future(
            self._on_final_transcript(full_text), loop=self._loop
        )
        self._pending_tasks.add(task)
        task.add_done_callback(self._on_task_done)

    def _on_task_done(self, task: asyncio.Future) -> None:
        """Release a finished callback task and log any exception it raised."""
        self._pending_tasks.discard(task)
        if task.cancelled():
            return
        exc = task.exception()
        if exc is not None:
            logger.error("Transcript callback failed: %s", exc, exc_info=exc)

    def _on_error(self, error) -> None:
        """Log an error reported by the Deepgram connection."""
        logger.error("Deepgram error: %s", error)

    def send_audio(self, chunk: bytes) -> None:
        """Forward one audio chunk to Deepgram; a no-op when not connected."""
        # Read the attribute once so a concurrent stop() cannot turn it into
        # None between the check and the call.
        connection = self._connection
        if connection is not None:
            connection.send_media(chunk)

    def stop(self) -> None:
        """Close the Deepgram connection if one is open.

        Errors raised while closing are logged rather than propagated. A
        pending flush timer is left untouched, so segments that were already
        buffered are still emitted when it fires.
        """
        cm = self._cm
        # Detach first so send_audio() stops using a connection that is
        # being torn down.
        self._connection = None
        self._cm = None
        if cm is None:
            return
        try:
            cm.__exit__(None, None, None)
        except Exception:
            # Best-effort close: report the problem but never raise.
            logger.warning("Error while closing Deepgram STT connection", exc_info=True)
        logger.info("Deepgram STT closed")