Spaces:
Running
Running
| import whisper | |
| import os | |
| import torch | |
| import warnings | |
| import gc # Garbage Collector for memory management | |
| import tempfile | |
| from pydub import AudioSegment # 🟢 NEW: Add pydub | |
# Silence the "FP16 is not supported on CPU" warning so it does not clutter
# the logs. The message is matched against the start of the warning text.
warnings.filterwarnings(
    action="ignore",
    message="FP16 is not supported on CPU; using FP32 instead",
)
class AgentInput:
    """Speech-to-text input agent backed by a Whisper model.

    The agent loads a Whisper model once at construction time and exposes
    :meth:`transcribe`, which normalises an audio file to WAV (via pydub)
    and returns the recognised text as a single-speaker segment list.
    """

    def __init__(self, device="cpu"):
        """Load the Whisper model onto *device*.

        Args:
            device: Device string forwarded to ``whisper.load_model``.

        Raises:
            Exception: Whatever ``whisper.load_model`` raises if the
                emergency reload also fails.
        """
        print(f"👂 Agent 1 (Input) Online: Preparing Whisper on {device}...")
        # Use 'tiny' on cloud hosts to prevent exit code 137 (OOM).
        # Use 'base' only on a machine with 16GB+ RAM.
        self.model_name = "tiny"
        # Define the attribute up front so it always exists on the instance.
        self.model = None
        try:
            # Load the model and immediately collect garbage to free RAM.
            self.model = whisper.load_model(self.model_name, device=device)
            gc.collect()
            print(f"✅ Whisper '{self.model_name}' model loaded. RAM optimized.")
        except Exception as e:
            print(f"⚠️ Load failed: {e}. Attempting emergency load...")
            # Emergency fallback: always retry with 'tiny'. While
            # model_name is itself 'tiny' this is a plain second attempt.
            # A failure here propagates to the caller.
            self.model = whisper.load_model("tiny", device=device)

    def _sanitize_audio(self, audio_path):
        """Re-encode *audio_path* as a standard WAV file in the temp directory.

        Args:
            audio_path: Path of the audio file to normalise (any format
                pydub can decode).

        Returns:
            Path of a newly created temporary WAV file, which the caller is
            responsible for deleting. If decoding or exporting fails, the
            original *audio_path* is returned unchanged as a best-effort
            fallback.
        """
        temp_path = None
        try:
            # Try to load it regardless of format.
            audio = AudioSegment.from_file(audio_path)
            # mkstemp gives a unique name, so two requests with the same
            # upload basename cannot overwrite each other's cleaned file.
            fd, temp_path = tempfile.mkstemp(prefix="clean_audio_", suffix=".wav")
            os.close(fd)
            # export() hands back the file object it opened; close it so the
            # handle is not left open while the file is used and deleted.
            exported = audio.export(temp_path, format="wav")
            if exported is not None:
                exported.close()
            return temp_path
        except Exception as e:
            print(f"⚠️ Audio Sanitization Warning: {e}")
            # Do not leave a half-written temp file behind.
            if temp_path and os.path.exists(temp_path):
                try:
                    os.remove(temp_path)
                except OSError:
                    pass  # Best effort; the fallback below still applies.
            return audio_path  # Fall back to the original if pydub fails.

    def transcribe(self, audio_path, language=None):
        """Transcribe an audio file to text.

        Args:
            audio_path: Path of the audio file to transcribe.
            language: Optional language hint forwarded to the model's
                ``transcribe``; ``None`` leaves the choice to the model.

        Returns:
            A one-element list holding a dict with ``"text"`` and
            ``"speaker"`` keys. The speaker is ``"Speaker 1"`` on success,
            ``"SYSTEM"`` (with empty text) when no model or no path is
            available, and ``"ERROR"`` (with empty text) when transcription
            raises.
        """
        model = getattr(self, "model", None)
        if not model or not audio_path:
            return [{"text": "", "speaker": "SYSTEM"}]

        clean_path = audio_path
        try:
            # Clean the audio first, then transcribe the CLEANED file.
            clean_path = self._sanitize_audio(audio_path)
            # fp16=False: run in FP32, which is what CPU inference needs.
            result = model.transcribe(clean_path, language=language, fp16=False)
            transcription_text = result["text"].strip()
            del result  # Drop the raw result so it can be reclaimed below.
            return [{"text": transcription_text, "speaker": "Speaker 1"}]
        except Exception as e:
            print(f"❌ Transcription Error: {e}")
            return [{"text": "", "speaker": "ERROR"}]
        finally:
            # Runs on success and on failure: remove the temp WAV (never the
            # caller's original file) and release memory.
            if clean_path != audio_path and os.path.exists(clean_path):
                try:
                    os.remove(clean_path)
                except OSError as e:
                    print(f"⚠️ Temp file cleanup failed: {e}")
            gc.collect()