Spaces:
Running
Running
| import asyncio | |
| import re | |
| import httpx | |
| from groq import AsyncGroq | |
| from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_fixed | |
| from app.core.exceptions import GenerationError | |
| _FILLER_PREFIX_RE = re.compile(r"^\s*(uh+|um+|erm+|like|you know|please|hey)\s+", re.IGNORECASE) | |
| _MULTISPACE_RE = re.compile(r"\s+") | |
| _TRANSCRIPT_REPLACEMENTS: tuple[tuple[re.Pattern[str], str], ...] = ( | |
| (re.compile(r"\bwalk experience\b", re.IGNORECASE), "work experience"), | |
| (re.compile(r"\btext stack\b", re.IGNORECASE), "tech stack"), | |
| (re.compile(r"\bprofessional sitting\b", re.IGNORECASE), "professional setting"), | |
| (re.compile(r"\btech stocks\b", re.IGNORECASE), "tech stack"), | |
| (re.compile(r"\bwhat tech stack does he\s+used\b", re.IGNORECASE), "what tech stack does he use"), | |
| ) | |
| def _normalise_transcript_text(text: str) -> str: | |
| cleaned = text.strip() | |
| cleaned = _FILLER_PREFIX_RE.sub("", cleaned) | |
| for pattern, replacement in _TRANSCRIPT_REPLACEMENTS: | |
| cleaned = pattern.sub(replacement, cleaned) | |
| cleaned = _MULTISPACE_RE.sub(" ", cleaned) | |
| return cleaned.strip() | |
| class GroqTranscriber: | |
| def __init__( | |
| self, | |
| api_key: str, | |
| model: str, | |
| timeout_seconds: float, | |
| ) -> None: | |
| self._client = AsyncGroq(api_key=api_key) if api_key else None | |
| self._model = model | |
| self._timeout_seconds = timeout_seconds | |
| def is_configured(self) -> bool: | |
| return self._client is not None | |
| async def transcribe( | |
| self, | |
| filename: str, | |
| content_type: str, | |
| audio_bytes: bytes, | |
| language: str | None = None, | |
| ) -> str: | |
| if not self._client: | |
| raise GenerationError("Transcriber is not configured with GROQ_API_KEY") | |
| async def _call() -> str: | |
| response = await self._client.audio.transcriptions.create( | |
| file=(filename, audio_bytes, content_type), | |
| model=self._model, | |
| temperature=0, | |
| language=language, | |
| ) | |
| text = getattr(response, "text", None) | |
| if isinstance(text, str) and text.strip(): | |
| return _normalise_transcript_text(text) | |
| if isinstance(response, dict): | |
| value = response.get("text") | |
| if isinstance(value, str) and value.strip(): | |
| return _normalise_transcript_text(value) | |
| raise GenerationError("Transcription response did not contain text") | |
| try: | |
| return await asyncio.wait_for(_call(), timeout=self._timeout_seconds) | |
| except TimeoutError as exc: | |
| raise GenerationError("Transcription timed out") from exc | |
| except GenerationError: | |
| raise | |
| except Exception as exc: | |
| raise GenerationError("Transcription failed", context={"error": str(exc)}) from exc | |