personabot-api / app /services /transcriber.py
GitHub Actions
Deploy 8e14626
8da917e
import asyncio
import re
import httpx
from groq import AsyncGroq
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed
from app.core.exceptions import GenerationError
_FILLER_PREFIX_RE = re.compile(r"^\s*(uh+|um+|erm+|like|you know|please|hey)\s+", re.IGNORECASE)
_MULTISPACE_RE = re.compile(r"\s+")
_TRANSCRIPT_REPLACEMENTS: tuple[tuple[re.Pattern[str], str], ...] = (
(re.compile(r"\bwalk experience\b", re.IGNORECASE), "work experience"),
(re.compile(r"\btext stack\b", re.IGNORECASE), "tech stack"),
(re.compile(r"\bprofessional sitting\b", re.IGNORECASE), "professional setting"),
(re.compile(r"\btech stocks\b", re.IGNORECASE), "tech stack"),
(re.compile(r"\bwhat tech stack does he\s+used\b", re.IGNORECASE), "what tech stack does he use"),
)
def _normalise_transcript_text(text: str) -> str:
cleaned = text.strip()
cleaned = _FILLER_PREFIX_RE.sub("", cleaned)
for pattern, replacement in _TRANSCRIPT_REPLACEMENTS:
cleaned = pattern.sub(replacement, cleaned)
cleaned = _MULTISPACE_RE.sub(" ", cleaned)
return cleaned.strip()
class GroqTranscriber:
def __init__(
self,
api_key: str,
model: str,
timeout_seconds: float,
) -> None:
self._client = AsyncGroq(api_key=api_key) if api_key else None
self._model = model
self._timeout_seconds = timeout_seconds
@property
def is_configured(self) -> bool:
return self._client is not None
@retry(
stop=stop_after_attempt(2),
wait=wait_fixed(0.8),
retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)),
)
async def transcribe(
self,
filename: str,
content_type: str,
audio_bytes: bytes,
language: str | None = None,
) -> str:
if not self._client:
raise GenerationError("Transcriber is not configured with GROQ_API_KEY")
async def _call() -> str:
response = await self._client.audio.transcriptions.create(
file=(filename, audio_bytes, content_type),
model=self._model,
temperature=0,
language=language,
)
text = getattr(response, "text", None)
if isinstance(text, str) and text.strip():
return _normalise_transcript_text(text)
if isinstance(response, dict):
value = response.get("text")
if isinstance(value, str) and value.strip():
return _normalise_transcript_text(value)
raise GenerationError("Transcription response did not contain text")
try:
return await asyncio.wait_for(_call(), timeout=self._timeout_seconds)
except TimeoutError as exc:
raise GenerationError("Transcription timed out") from exc
except GenerationError:
raise
except Exception as exc:
raise GenerationError("Transcription failed", context={"error": str(exc)}) from exc