Demo-Voice-Agent-Service / src /stt /assemblyai_client.py
ishaq101's picture
[KM-501, KM-498, KM-499] Web Socket, STS and TTS
226ff5d
import asyncio
import logging
from typing import Callable, Awaitable
import assemblyai as aai
from src.config import ASSEMBLYAI_API_KEY, SAMPLE_RATE
# Module-level logger, named after this module so records can be filtered per module.
logger = logging.getLogger(__name__)
# Signature of the transcript handler: takes one transcript string and returns
# an awaitable that yields nothing.
OnTranscriptCallback = Callable[[str], Awaitable[None]]
class AssemblyAIStreamer:
    """
    Wraps AssemblyAI real-time streaming STT.

    Audio chunks (PCM 16-bit mono bytes; the session is opened with
    `SAMPLE_RATE`, documented by the original author as 16kHz) are fed via
    `send_audio()`. When a final transcript arrives, the
    `on_final_transcript` callback is scheduled on the supplied event loop
    and awaited there.

    Lifecycle: `start()` -> any number of `send_audio()` calls -> `stop()`.
    """

    def __init__(self, on_final_transcript: OnTranscriptCallback, loop: asyncio.AbstractEventLoop):
        """
        Store the callback and the event loop; no connection is opened here.

        Args:
            on_final_transcript: Async callable invoked with the stripped text
                of every non-empty final transcript.
            loop: Event loop on which `on_final_transcript` is scheduled.
        """
        self._on_final_transcript = on_final_transcript
        self._loop = loop
        # None means "no active session"; set by start(), cleared by stop().
        self._transcriber: aai.RealtimeTranscriber | None = None

    def start(self) -> None:
        """
        Open a real-time transcription session.

        If a session is already open it is closed first, so calling
        `start()` twice does not leak the earlier connection.

        Raises:
            Exception: Whatever `connect()` raises is propagated; in that
                case no session is left registered on this instance.
        """
        if self._transcriber is not None:
            self.stop()

        aai.settings.api_key = ASSEMBLYAI_API_KEY
        transcriber = aai.RealtimeTranscriber(
            sample_rate=SAMPLE_RATE,
            on_data=self._on_data,
            on_error=self._on_error,
        )
        # Registered before connect() (as in the original ordering) so that
        # audio arriving while the connection is being set up still reaches
        # the transcriber object.
        self._transcriber = transcriber
        try:
            transcriber.connect()
        except Exception:
            # Do not keep a session that never connected: later send_audio()
            # and stop() calls would otherwise operate on it.
            self._transcriber = None
            raise
        logger.info("AssemblyAI STT connected")

    def _on_data(self, transcript: aai.RealtimeTranscript) -> None:
        """
        Handle a transcript event from the SDK.

        Only final transcripts with non-blank text are forwarded; everything
        else is ignored. The callback is handed to the event loop with
        `run_coroutine_threadsafe` -- presumably because the SDK invokes
        this method outside the event-loop thread (confirm against the SDK).
        """
        if not isinstance(transcript, aai.RealtimeFinalTranscript):
            return
        text = transcript.text.strip()
        if not text:
            return
        logger.info("Final transcript: %s", text)
        future = asyncio.run_coroutine_threadsafe(self._on_final_transcript(text), self._loop)
        # Without this, an exception raised inside the callback would stay
        # hidden in the discarded future and never be reported.
        future.add_done_callback(self._log_callback_failure)

    @staticmethod
    def _log_callback_failure(future) -> None:
        """
        Log the exception, if any, raised by a scheduled transcript callback.

        Args:
            future: The future returned by `asyncio.run_coroutine_threadsafe`.
        """
        # exception() raises CancelledError on a cancelled future, so check first.
        if future.cancelled():
            return
        exc = future.exception()
        if exc is not None:
            logger.error("on_final_transcript callback failed", exc_info=exc)

    def _on_error(self, error: aai.RealtimeError) -> None:
        """Log an error reported by the AssemblyAI session."""
        logger.error("AssemblyAI error: %s", error)

    def send_audio(self, chunk: bytes) -> None:
        """
        Forward one audio chunk to the active session.

        The chunk is dropped silently when no session is active (before
        `start()` or after `stop()`).
        """
        # Read the attribute once so a stop() between the check and the call
        # cannot turn this into a call on None.
        transcriber = self._transcriber
        if transcriber is not None:
            transcriber.stream(chunk)

    def stop(self) -> None:
        """
        Close the active session, if any. Calling it with no session is a no-op.

        Raises:
            Exception: Whatever `close()` raises is propagated; the session
                is still deregistered from this instance.
        """
        # Detach first: the instance is back in the "no session" state even
        # if close() fails, and send_audio() stops feeding a closing session.
        transcriber, self._transcriber = self._transcriber, None
        if transcriber is None:
            return
        transcriber.close()
        logger.info("AssemblyAI STT closed")