"""Audio transcription through the Groq API, plus light transcript clean-up."""

import asyncio
import re

import httpx
from groq import AsyncGroq
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed

from app.core.exceptions import GenerationError

# Matches a single leading filler word together with the whitespace after it.
_FILLER_PREFIX_RE = re.compile(r"^\s*(uh+|um+|erm+|like|you know|please|hey)\s+", re.IGNORECASE)
_MULTISPACE_RE = re.compile(r"\s+")
# Phrase corrections, applied in order (presumably common mis-transcriptions).
# Order matters: "tech stocks" / "text stack" are rewritten to "tech stack"
# before the final "does he used" pattern, which expects "tech stack".
_TRANSCRIPT_REPLACEMENTS: tuple[tuple[re.Pattern[str], str], ...] = (
    (re.compile(r"\bwalk experience\b", re.IGNORECASE), "work experience"),
    (re.compile(r"\btext stack\b", re.IGNORECASE), "tech stack"),
    (re.compile(r"\bprofessional sitting\b", re.IGNORECASE), "professional setting"),
    (re.compile(r"\btech stocks\b", re.IGNORECASE), "tech stack"),
    (re.compile(r"\bwhat tech stack does he\s+used\b", re.IGNORECASE), "what tech stack does he use"),
)


def _normalise_transcript_text(text: str) -> str:
    """Return *text* cleaned up for downstream use.

    Whitespace runs are collapsed to single spaces, one leading filler word
    (see ``_FILLER_PREFIX_RE``) is dropped, the phrase corrections in
    ``_TRANSCRIPT_REPLACEMENTS`` are applied in order, and the result is
    trimmed.
    """
    # Collapse whitespace *before* the phrase corrections: most patterns
    # contain a literal single space and would otherwise miss phrases split
    # by a newline, a tab or repeated spaces.
    cleaned = _MULTISPACE_RE.sub(" ", text).strip()
    cleaned = _FILLER_PREFIX_RE.sub("", cleaned)
    for pattern, replacement in _TRANSCRIPT_REPLACEMENTS:
        cleaned = pattern.sub(replacement, cleaned)
    return cleaned.strip()


class GroqTranscriber:
    """Thin async wrapper around the Groq audio transcription endpoint."""

    def __init__(
        self,
        api_key: str,
        model: str,
        timeout_seconds: float,
    ) -> None:
        """Store settings and build the client.

        Args:
            api_key: Groq API key. A falsy value leaves the transcriber
                unconfigured (no client is created).
            model: Model identifier passed to every transcription request.
            timeout_seconds: Upper bound, in seconds, applied to each
                transcription call via ``asyncio.wait_for``.
        """
        self._client = AsyncGroq(api_key=api_key) if api_key else None
        self._model = model
        self._timeout_seconds = timeout_seconds

    @property
    def is_configured(self) -> bool:
        """Whether a client was created, i.e. an API key was supplied."""
        return self._client is not None

    # NOTE(review): as written, every ``Exception`` raised in the method body
    # is converted to ``GenerationError`` before it leaves the method, so the
    # httpx exception types listed here cannot reach this decorator (unless
    # ``GenerationError`` derives from them) and no retry is ever triggered.
    # Left unchanged to avoid altering latency for callers; confirm intent.
    @retry(
        stop=stop_after_attempt(2),
        wait=wait_fixed(0.8),
        retry=retry_if_exception_type((httpx.RequestError, httpx.TimeoutException)),
    )
    async def transcribe(
        self,
        filename: str,
        content_type: str,
        audio_bytes: bytes,
        language: str | None = None,
    ) -> str:
        """Transcribe an audio payload and return the normalised text.

        Args:
            filename: File name sent with the upload.
            content_type: MIME type sent with the upload.
            audio_bytes: Raw audio content.
            language: Optional language hint forwarded to the API as-is.

        Returns:
            The transcript after ``_normalise_transcript_text``.

        Raises:
            GenerationError: If no client is configured, the call exceeds
                ``timeout_seconds``, the response carries no non-blank text,
                or any other exception occurs during the request.
        """
        client = self._client
        if client is None:
            raise GenerationError("Transcriber is not configured with GROQ_API_KEY")

        async def _call() -> str:
            response = await client.audio.transcriptions.create(
                file=(filename, audio_bytes, content_type),
                model=self._model,
                temperature=0,
                language=language,
            )
            # Accept either an object exposing ``.text`` or a plain dict.
            text = getattr(response, "text", None)
            if isinstance(text, str) and text.strip():
                return _normalise_transcript_text(text)
            if isinstance(response, dict):
                value = response.get("text")
                if isinstance(value, str) and value.strip():
                    return _normalise_transcript_text(value)
            raise GenerationError("Transcription response did not contain text")

        try:
            return await asyncio.wait_for(_call(), timeout=self._timeout_seconds)
        except (asyncio.TimeoutError, TimeoutError) as exc:
            # ``asyncio.wait_for`` raises ``asyncio.TimeoutError``, which is
            # only an alias of the builtin ``TimeoutError`` from Python 3.11
            # on; catching both keeps the "timed out" message on 3.10 too.
            raise GenerationError("Transcription timed out") from exc
        except GenerationError:
            raise
        except Exception as exc:
            raise GenerationError("Transcription failed", context={"error": str(exc)}) from exc