import asyncio import logging from typing import Callable, Awaitable import assemblyai as aai from src.config import ASSEMBLYAI_API_KEY, SAMPLE_RATE logger = logging.getLogger(__name__) OnTranscriptCallback = Callable[[str], Awaitable[None]] class AssemblyAIStreamer: """ Wraps AssemblyAI real-time streaming STT. Audio chunks (PCM 16kHz 16-bit mono bytes) are fed via `send_audio()`. When a final transcript arrives, `on_final_transcript` callback is awaited. """ def __init__(self, on_final_transcript: OnTranscriptCallback, loop: asyncio.AbstractEventLoop): self._on_final_transcript = on_final_transcript self._loop = loop self._transcriber: aai.RealtimeTranscriber | None = None def start(self) -> None: aai.settings.api_key = ASSEMBLYAI_API_KEY self._transcriber = aai.RealtimeTranscriber( sample_rate=SAMPLE_RATE, on_data=self._on_data, on_error=self._on_error, ) self._transcriber.connect() logger.info("AssemblyAI STT connected") def _on_data(self, transcript: aai.RealtimeTranscript) -> None: if not isinstance(transcript, aai.RealtimeFinalTranscript): return text = transcript.text.strip() if not text: return logger.info("Final transcript: %s", text) asyncio.run_coroutine_threadsafe(self._on_final_transcript(text), self._loop) def _on_error(self, error: aai.RealtimeError) -> None: logger.error("AssemblyAI error: %s", error) def send_audio(self, chunk: bytes) -> None: if self._transcriber: self._transcriber.stream(chunk) def stop(self) -> None: if self._transcriber: self._transcriber.close() self._transcriber = None logger.info("AssemblyAI STT closed")