Spaces:
Runtime error
Runtime error
| from configs import get_settings | |
| import asyncio | |
| # import librosa | |
| # import numpy as np | |
| from stores.sttremotes import STTRemoteManager | |
| from faster_whisper.audio import decode_audio # handles webm natively | |
| class TranscriptionController: | |
| def __init__(self,models,logger,remotename): | |
| self.settings = get_settings() | |
| self.models = models | |
| self.logger = logger | |
| self.remote_max_request_rate=60 | |
| self.remotename=remotename | |
| self.remote=STTRemoteManager(default_provider=remotename) if remotename else None | |
| async def transcribe_audio(self,audio_path: str): | |
| if self.settings.INFERENCE_TYPE == "local": | |
| return await self.transcribe_local(audio_path) | |
| elif self.settings.INFERENCE_TYPE == "remote": | |
| return await self.transcribe_remote(audio_path) | |
| else: | |
| raise ValueError(f"Unsupported INFERENCE_TYPE: {self.settings.INFERENCE_TYPE}") | |
| async def language_detection(self, audio_path: str): | |
| if self.settings.INFERENCE_TYPE == "local": | |
| model_size = self.settings.LOCAL_INFERENCE_MODEL_SIZE | |
| model = self.models.get(f"{model_size}_english") | |
| if not model: | |
| raise ValueError(f"Model {model_size}_language_detection not available") | |
| print(f"Detecting language for {audio_path} with {model_size} model...") | |
| def process(): | |
| waveform = decode_audio(audio_path) | |
| language, probability,_ = model.detect_language(waveform) | |
| return language, probability | |
| loop = asyncio.get_event_loop() | |
| language, language_probability = await loop.run_in_executor(None, process) | |
| return language, language_probability | |
| async def transcribe_local(self,audio_path: str): | |
| language, probability = await self.language_detection(audio_path) | |
| if language == "ar": | |
| self.logger.info(f"Processing Arabic audio with probability {probability:.2f}") | |
| return await self.transcribe_local_arabic(audio_path) | |
| elif language == "en": | |
| self.logger.info(f"Processing English audio with probability {probability:.2f}") | |
| return await self.transcribe_local_english(audio_path) | |
| else: | |
| self.logger.warning(f"Unsupported language detected: {language}. Skipping transcription.") | |
| return None, language | |
| async def transcribe_local_arabic(self,audio_path: str): | |
| if self.settings.INFERENCE_TYPE == "local": | |
| model_size = self.settings.LOCAL_INFERENCE_MODEL_SIZE | |
| model=self.models.get(f"{model_size}_arabic") | |
| if not model: | |
| raise ValueError(f"Model {model_size}_arabic not available") | |
| print(f"Transcribing {audio_path} with {model_size} model...") | |
| ALLOWED_LANGUAGES = ['ar'] | |
| def process_with_filter(): | |
| segments, info = model.transcribe( | |
| audio_path, | |
| beam_size=5, | |
| best_of=5, | |
| language="ar", | |
| vad_filter=True, | |
| vad_parameters=dict(min_silence_duration_ms=500,threshold=0.3) | |
| ) | |
| if info.language not in ALLOWED_LANGUAGES: | |
| self.logger.info(f"Skipping: Detected {info.language} with prob {info.language_probability:.2f}") | |
| return None, info.language | |
| full_text = "" | |
| for segment in segments: | |
| full_text += segment.text + " " | |
| return full_text.strip(), info.language | |
| loop = asyncio.get_event_loop() | |
| text, language = await loop.run_in_executor(None, process_with_filter) | |
| return text, language | |
| async def transcribe_local_english(self,audio_path: str): | |
| if self.settings.INFERENCE_TYPE == "local": | |
| model_size = self.settings.LOCAL_INFERENCE_MODEL_SIZE | |
| model=self.models.get(f"{model_size}_english") | |
| if not model: | |
| raise ValueError(f"Model {model_size}_english not available") | |
| print(f"Transcribing {audio_path} with {model_size} model...") | |
| ALLOWED_LANGUAGES = ['en'] | |
| def process_with_filter(): | |
| segments, info = model.transcribe( | |
| audio_path, | |
| beam_size=5, | |
| best_of=5, | |
| language="en", | |
| vad_filter=True, | |
| vad_parameters=dict(min_silence_duration_ms=500,threshold=0.3) | |
| ) | |
| if info.language not in ALLOWED_LANGUAGES: | |
| self.logger.info(f"Skipping: Detected {info.language} with prob {info.language_probability:.2f}") | |
| return None, info.language | |
| full_text = "" | |
| for segment in segments: | |
| full_text += segment.text + " " | |
| return full_text.strip(), info.language | |
| loop = asyncio.get_event_loop() | |
| text, language = await loop.run_in_executor(None, process_with_filter) | |
| return text, language | |
| async def transcribe_remote(self,audio_path: str): | |
| if not self.remote: | |
| raise ValueError("Remote STT provider not configured") | |
| if not hasattr(self, "_last_request_time"): | |
| self._last_request_time = 0 | |
| elapsed = asyncio.get_event_loop().time() - self._last_request_time | |
| if elapsed < 1 / self.remote_max_request_rate: | |
| await asyncio.sleep((1 / self.remote_max_request_rate) - elapsed) | |
| self._last_request_time = asyncio.get_event_loop().time() | |
| return await self.remote.transcribe_remote(audio_path,self.remotename) | |