File size: 3,949 Bytes
226ff5d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import asyncio
import logging
import threading
from typing import Callable, Awaitable

from deepgram import DeepgramClient
from deepgram.core.events import EventType
from src.config import DEEPGRAM_API_KEY, SAMPLE_RATE, DEEPGRAM_LANGUAGE, DEEPGRAM_ENDPOINTING_MS, DEEPGRAM_UTTERANCE_END_MS

# Signature of the async consumer that receives each joined utterance text.
OnTranscriptCallback = Callable[[str], Awaitable[None]]

# Module-level logger named after this module.
logger = logging.getLogger(__name__)


class DeepgramStreamer:
    """
    Wraps Deepgram real-time streaming STT.
    Audio chunks (PCM 16kHz 16-bit mono bytes) are fed via `send_audio()`.

    Final transcript segments are buffered. A flush timer (DEEPGRAM_UTTERANCE_END_MS)
    is reset on each new segment. When the timer fires, all buffered segments are
    joined and emitted as one complete utterance.

    Threading: Deepgram event handlers are expected to run on the background
    listener thread started in `start()`. Timer manipulation and callback
    scheduling are therefore marshalled onto `loop` via `call_soon_threadsafe`.
    """

    def __init__(self, on_final_transcript: OnTranscriptCallback, loop: asyncio.AbstractEventLoop):
        """
        Args:
            on_final_transcript: async callback invoked with each joined utterance.
            loop: event loop on which the flush timer and callback run.
        """
        self._on_final_transcript = on_final_transcript
        self._loop = loop
        self._connection = None
        self._cm = None
        self._ready = threading.Event()
        self._transcript_buffer: list[str] = []
        self._buffer_lock = threading.Lock()
        self._flush_handle: asyncio.TimerHandle | None = None
        # Strong references to in-flight callback futures. The event loop keeps
        # only weak references to tasks, so an unreferenced task may be
        # garbage-collected before it finishes.
        self._pending_tasks: set[asyncio.Future] = set()

    def start(self) -> None:
        """Open the Deepgram live connection and start the listener thread.

        Blocks for up to 5 seconds waiting for the OPEN event. If OPEN does not
        arrive in time, a warning is logged and the method returns anyway
        (the original behavior of not raising is preserved for callers).
        """
        client = DeepgramClient(api_key=DEEPGRAM_API_KEY)

        connect_kwargs: dict = dict(
            model="nova-2",
            encoding="linear16",
            sample_rate=SAMPLE_RATE,
            endpointing=DEEPGRAM_ENDPOINTING_MS,
        )
        if DEEPGRAM_LANGUAGE:
            connect_kwargs["language"] = DEEPGRAM_LANGUAGE

        # Re-arm the readiness flag so a restart after stop() really waits for
        # the new connection instead of returning immediately.
        self._ready.clear()

        # The SDK hands back a context manager. It is entered manually so the
        # connection outlives this method; stop() performs the matching __exit__.
        self._cm = client.listen.v1.connect(**connect_kwargs)
        self._connection = self._cm.__enter__()

        self._connection.on(EventType.OPEN, self._on_open)
        self._connection.on(EventType.MESSAGE, self._on_message)
        self._connection.on(EventType.ERROR, self._on_error)

        # start_listening() is blocking — run in background thread
        thread = threading.Thread(target=self._connection.start_listening, daemon=True)
        thread.start()

        # Wait until connection is open before returning; report honestly if not.
        if self._ready.wait(timeout=5):
            logger.info("Deepgram STT connected")
        else:
            logger.warning("Deepgram STT did not report OPEN within 5s; continuing anyway")

    def _on_open(self, _result) -> None:
        """Signal `start()` that the connection is open."""
        self._ready.set()

    def _on_message(self, result) -> None:
        """Buffer a final, non-empty transcript segment and re-arm the flush timer.

        Messages without a transcript, and interim (non-final) results, are ignored.
        """
        try:
            transcript = result.channel.alternatives[0].transcript.strip()
        except (AttributeError, IndexError):
            return
        # getattr: tolerate message objects that carry a transcript but no is_final.
        if not transcript or not getattr(result, "is_final", False):
            return
        with self._buffer_lock:
            self._transcript_buffer.append(transcript)
        logger.debug("Buffered segment: %s", transcript)
        # Reset flush timer on the event loop thread
        self._loop.call_soon_threadsafe(self._reset_flush_timer)

    def _reset_flush_timer(self) -> None:
        """(Re)start the utterance-end timer. Must run on the event loop thread."""
        if self._flush_handle is not None:
            self._flush_handle.cancel()
        delay = DEEPGRAM_UTTERANCE_END_MS / 1000.0
        self._flush_handle = self._loop.call_later(delay, self._flush_buffer)

    def _flush_buffer(self) -> None:
        """Join buffered segments into one utterance and schedule the callback.

        Runs on the event loop when the flush timer fires.
        """
        # The timer that invoked us has fired; clear it even if nothing is buffered.
        self._flush_handle = None
        with self._buffer_lock:
            if not self._transcript_buffer:
                return
            full_text = " ".join(self._transcript_buffer)
            self._transcript_buffer.clear()
        logger.info("Final transcript: %s", full_text)
        future = asyncio.ensure_future(self._on_final_transcript(full_text), loop=self._loop)
        # Keep the future alive until it completes, and surface its errors.
        self._pending_tasks.add(future)
        future.add_done_callback(self._on_transcript_task_done)

    def _on_transcript_task_done(self, future: asyncio.Future) -> None:
        """Release the strong reference to a finished callback and log its failure."""
        self._pending_tasks.discard(future)
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("on_final_transcript callback failed", exc_info=exc)

    def _on_error(self, error) -> None:
        """Log errors reported by the Deepgram connection."""
        logger.error("Deepgram error: %s", error)

    def send_audio(self, chunk: bytes) -> None:
        """Forward one raw audio chunk to Deepgram; no-op when not connected."""
        # Snapshot the attribute once in case stop() clears it concurrently
        # from another thread between the check and the call.
        connection = self._connection
        if connection:
            connection.send_media(chunk)

    def stop(self) -> None:
        """Close the Deepgram connection (best effort).

        NOTE: a pending flush timer is not cancelled here, so segments that are
        already buffered are still emitted when it fires.
        """
        if self._cm:
            try:
                self._cm.__exit__(None, None, None)
            except Exception:
                # Best-effort shutdown: don't propagate, but don't hide it either.
                logger.warning("Error while closing Deepgram connection", exc_info=True)
            self._connection = None
            self._cm = None
            logger.info("Deepgram STT closed")