"""Lazy-loading singleton registry for all ML models used by the service.

Every heavyweight model (embedder, CLIP image classifier, English/Arabic ASR,
ABSA, translator, spaCy) is loaded on first use via a ``get_*`` accessor and
cached in a module-level global, so process startup stays fast.  Imports of
the heavy libraries (torch, transformers, nemo, spacy, ...) are deliberately
deferred into function bodies for the same reason.

NOTE(review): the lazy initializers themselves are not guarded by a lock —
only English transcription takes ``_audio_model_lock``.  If two threads race
through a first call, a model may be loaded twice.  Confirm the server warms
these up single-threaded at startup, or add per-model init locks.
"""
import threading

from utils import get_en_hotwords, get_ar_hotwords

# Lazy-loaded singletons — models are loaded on first use, not at startup
_embedder = None
_image_classifier = None  # {model, processor, text_features, logit_scale, categories}
_audio_model = None
_audio_model_arabic = None
_absa_pipeline = None
_translator_ar_en = None
_spacy_nlp = None
# Serializes Parakeet inference (transcribe + warmup share this lock).
_audio_model_lock = threading.Lock()


def get_embedder():
    """Return the shared sentence-transformers embedder, loading it on first call."""
    global _embedder
    if _embedder is None:
        from langchain_community.embeddings import HuggingFaceEmbeddings
        print("Loading embedder model...")
        _embedder = HuggingFaceEmbeddings(model_name='sentence-transformers/paraphrase-multilingual-MiniLM-L12-v2')
        print("✅ Embedder loaded")
    return _embedder


def get_image_classifier():
    """
    Loads INT8-quantized CLIP vision encoder for CPU inference.

    Strategy:
    - Download model_int8.pt (293 MB) from robotflowlabs/clip-vit-large-patch14-int8
      (the 1.2 GB ONNX was GPU-traced and fails on CPU).
    - Load the full CLIP architecture, patch vision_model with INT8 weights.
    - Compute all category text embeddings once, then delete the text encoder
      to free ~200 MB of RAM.
    - Per-request cost = vision_model + visual_projection only.

    Returns the cached dict {model, processor, text_features, logit_scale, categories}.
    """
    global _image_classifier
    if _image_classifier is None:
        import torch
        from huggingface_hub import hf_hub_download
        from transformers import CLIPModel, CLIPProcessor
        from utils import load_categories

        print("Loading CLIP image model (INT8 vision encoder)...")
        model_name = "openai/clip-vit-large-patch14"
        repo_id = "robotflowlabs/clip-vit-large-patch14-int8"

        # Load base CLIP architecture (fp32)
        processor = CLIPProcessor.from_pretrained(model_name)
        model = CLIPModel.from_pretrained(model_name)
        model.eval()

        # Patch vision_model with INT8 quantized weights (CPU-compatible, 293 MB).
        # strict=False: the INT8 state dict may not cover every key of the fp32
        # vision tower — unmatched keys keep their fp32 weights.
        int8_path = hf_hub_download(repo_id=repo_id, filename="model_int8.pt")
        int8_state = torch.load(int8_path, map_location="cpu", weights_only=True)
        model.vision_model.load_state_dict(int8_state, strict=False)
        print("✅ INT8 vision encoder weights applied")

        # Compute category text embeddings once — this is the only text encoder call
        categories = load_categories()
        text_inputs = processor(
            text=categories, return_tensors="pt", padding=True, truncation=True
        )
        print("Computing category text embeddings (one-time)...")
        with torch.no_grad():
            text_features = model.get_text_features(**text_inputs)
            # L2-normalize so a plain dot product below is cosine similarity.
            text_features = text_features / text_features.norm(dim=-1, keepdim=True)
        # Capture the learned temperature as a plain float before the text side is freed.
        logit_scale = float(model.logit_scale.exp().item())

        # Free text encoder RAM — only vision side needed at inference
        del model.text_model
        del model.text_projection

        _image_classifier = {
            "model": model,  # CLIPModel with INT8 vision_model; text side freed
            "processor": processor,
            "text_features": text_features,  # [N_categories, D] — cached
            "logit_scale": logit_scale,  # learned temperature scalar
            "categories": categories,
        }
        print(f"✅ Image model (INT8 vision encoder) loaded — {len(categories)} category embeddings cached")
    return _image_classifier


def classify_image(img):
    """
    Classify a single PIL image. Only the image encoder runs per request.
    Returns (predicted_category, confidence_score).
    """
    # Delegates to the batch path so single- and multi-image calls share code.
    results = classify_images_batch([img])
    return results[0]


def classify_images_batch(images):
    """
    Classify a batch of PIL images in one forward pass.
    Uses INT8 vision encoder (model.vision_model patched with robotflowlabs weights).
    Returns list of (predicted_category, confidence_score).
    """
    import torch

    c = get_image_classifier()
    image_inputs = c["processor"](images=images, return_tensors="pt")
    with torch.no_grad():
        image_features = c["model"].get_image_features(**image_inputs)
        # Normalize so similarity against the cached unit-norm text features is cosine.
        image_features = image_features / image_features.norm(dim=-1, keepdim=True)
    # Scaled cosine similarities against all cached category embeddings.
    logits = c["logit_scale"] * (image_features @ c["text_features"].T)  # [B, N]
    probs = logits.softmax(dim=-1)
    results = []
    for i in range(probs.shape[0]):
        best_idx = int(probs[i].argmax())
        results.append((c["categories"][best_idx], float(probs[i][best_idx])))
    return results


def get_audio_model():
    """
    Loads nvidia/parakeet-tdt-0.6b-v2 via NeMo ASR.
    Runs a silent warmup pass so the first real request isn't slow.
    The warmup pre-warms all internal CUDA/JIT compilation graphs:
    model is left in an unfrozen, ready-to-infer state after loading.
    """
    global _audio_model
    if _audio_model is None:
        import os
        import wave
        import tempfile
        import nemo.collections.asr as nemo_asr

        print("Loading audio model (nvidia/parakeet-tdt-0.6b-v2)...")
        model = nemo_asr.models.ASRModel.from_pretrained("nvidia/parakeet-tdt-0.6b-v2")

        # Warmup: transcribe silence to pre-JIT the computation graph so
        # the first real request has the same speed as subsequent ones.
        # delete=False + manual unlink: the path must outlive the context
        # manager because wave.open/transcribe reopen it by name.
        with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
            warmup_path = f.name
        try:
            with wave.open(warmup_path, 'wb') as wf:
                wf.setnchannels(1)
                wf.setsampwidth(2)       # 16-bit PCM
                wf.setframerate(16000)
                wf.writeframes(b'\x00' * 32000)  # 1 s of silence
            model.freeze()
            import torch
            with torch.no_grad():
                model.transcribe([warmup_path])
            # Unfreeze after warmup so the model is ready for real inference
            # without any extra freeze/unfreeze overhead on the first call.
            model.unfreeze()
        except Exception as e:
            print(f"⚠️ Audio model warmup failed (non-fatal): {e}")
        finally:
            try:
                os.unlink(warmup_path)
            except Exception:
                pass
        _audio_model = model
        print("✅ Audio model loaded and warmed up")

        # ── Pass 1: Hotword biasing via NeMo ─────────────────────────────────
        # Mutates the model's decoding config to boost domain keywords, then
        # forces the fast greedy_batch strategy. Any failure falls back to the
        # model's default decoding — biasing is strictly best-effort.
        try:
            from omegaconf import OmegaConf
            hotwords = get_en_hotwords()
            decoding_cfg = OmegaConf.structured(model.cfg.decoding)
            try:
                from nemo.collections.asr.parts.context_biasing import KeyWordBoostingConfig
                boosting_cfg = KeyWordBoostingConfig(key_phrases_list=hotwords)
                if 'greedy' in decoding_cfg.strategy:
                    OmegaConf.update(decoding_cfg, 'greedy.boosting_tree', boosting_cfg)
                    OmegaConf.update(decoding_cfg, 'greedy.boosting_tree_alpha', 1.0)
                else:
                    OmegaConf.update(decoding_cfg, 'beam.boosting_tree', boosting_cfg)
                    OmegaConf.update(decoding_cfg, 'beam.boosting_tree_alpha', 1.0)
            except ImportError:
                # Fallback implementation if KeyWordBoostingConfig does not exist
                if 'beam' in decoding_cfg.strategy and hasattr(decoding_cfg.beam, 'keywords'):
                    OmegaConf.update(decoding_cfg, 'beam.keywords', hotwords)
                elif hasattr(decoding_cfg, 'greedy') and hasattr(decoding_cfg.greedy, 'keywords'):
                    OmegaConf.update(decoding_cfg, 'greedy.keywords', hotwords)
                else:
                    # Fall back to setting strategy='beam' and defining beam.keywords
                    OmegaConf.update(decoding_cfg, 'strategy', 'beam')
                    OmegaConf.update(decoding_cfg, 'beam.keywords', hotwords)
            # Force greedy_batch strategy for speed
            # NOTE(review): this overrides any strategy='beam' chosen above, so
            # beam-side keyword settings from the fallback path may go unused —
            # confirm this is the intended precedence.
            OmegaConf.update(decoding_cfg, 'strategy', 'greedy_batch')
            if hasattr(decoding_cfg, 'greedy'):
                OmegaConf.update(decoding_cfg, 'greedy.max_symbols', 5)
            try:
                model.change_decoding_strategy(decoding_cfg, verbose=False)
            except Exception as strat_e:
                print(f"⚠️ Failed to apply greedy_batch ({strat_e}), falling back to beam search")
                OmegaConf.update(decoding_cfg, 'strategy', 'beam')
                if hasattr(decoding_cfg, 'beam'):
                    OmegaConf.update(decoding_cfg, 'beam.beam_size', 4)
                model.change_decoding_strategy(decoding_cfg, verbose=False)
            print(f"✅ English hotword biasing active ({len(hotwords)} keywords)")
        except Exception as e:
            print(f"⚠️ NeMo hotword biasing setup failed (non-fatal, falling back to default greedy): {e}")
    return _audio_model


def transcribe_audio_batch(audio_paths: list[str]) -> list[str]:
    """
    Batch transcription for English audio (Parakeet).
    NeMo's model.transcribe() already handles native batching internally
    when given a list of paths — it pads and runs a single forward pass.
    Returns list of transcription strings.
    """
    import torch
    model = get_audio_model()
    # Lock: Parakeet inference must not overlap with the warmup in
    # warmup_parakeet() (or another concurrent transcription).
    with _audio_model_lock:
        with torch.no_grad():
            transcriptions = model.transcribe(audio_paths)
    # Some NeMo versions return (hypotheses, all_hypotheses) tuples.
    if isinstance(transcriptions, tuple):
        transcriptions = transcriptions[0]
    results = []
    for t in transcriptions:
        # Newer NeMo returns Hypothesis objects with .text; older returns str.
        text = t.text if hasattr(t, 'text') else str(t)
        text = text.strip().rstrip('.')
        results.append(text)
    return results


def get_audio_model_arabic():
    """
    Loads IbrahimAmin/egyptian-arabic-wav2vec2-xlsr-53 for Arabic ASR.
    (Native PyTorch)

    Returns the cached dict {model, processor, use_onnx, ctc_decoder,
    compiled, model_name, [ctc_pool]}.
    """
    global _audio_model_arabic
    if _audio_model_arabic is None:
        import os
        import torch
        import numpy as np
        from transformers import Wav2Vec2ForCTC, AutoProcessor

        model_name = "IbrahimAmin/egyptian-arabic-wav2vec2-xlsr-53"
        processor = AutoProcessor.from_pretrained(model_name)
        print("Loading audio model (IbrahimAmin/egyptian-arabic-wav2vec2-xlsr-53)...")
        model = Wav2Vec2ForCTC.from_pretrained(model_name)
        model.eval()

        compiled = False
        # torch.compile is opt-in via env var; compiled wav2vec2 can hit
        # Dynamo/FX runtime errors (handled in transcribe_audio_arabic_batch).
        use_compile = os.getenv("ARABIC_ASR_USE_COMPILE", "0") == "1"
        if use_compile:
            try:
                model = torch.compile(model, mode="default")
                compiled = True
                print("✅ Arabic wav2vec2 loaded via PyTorch + torch.compile (FP32)")
            except Exception as compile_err:
                print(f"⚠️ torch.compile failed (non-fatal): {compile_err}")
        else:
            print("ℹ️ Arabic ASR torch.compile disabled for runtime stability")
            print("✅ Arabic wav2vec2 loaded via PyTorch (FP32)")
        _audio_model_arabic = {
            "model": model,
            "processor": processor,
            "use_onnx": False,
            "ctc_decoder": None,  # built lazily on first transcription call
            "compiled": compiled,
            "model_name": model_name,  # kept so the eager fallback can reload
        }

        # One-time warmup inference to avoid first-request latency spikes.
        try:
            dummy = np.zeros(8000, dtype=np.float32)  # 0.5 s of silence @16 kHz
            warmup_inputs = processor([dummy], sampling_rate=16000, return_tensors="pt", padding=True)
            with torch.no_grad():
                model(**warmup_inputs)
        except Exception as e:
            print(f"⚠️ Arabic model warmup failed (non-fatal): {e}")
        print("✅ Arabic audio model ready")

        # ── Pass 1: Build pyctcdecode CTC decoder with hotword biasing ─────
        # Best-effort: on any failure transcription falls back to greedy decode.
        try:
            from pyctcdecode import build_ctcdecoder
            # Extract vocab from the Wav2Vec2 tokenizer in the correct label order
            vocab_dict = processor.tokenizer.get_vocab()
            # Sort by token_id so the label list aligns with logit columns
            labels = [tok for tok, _ in sorted(vocab_dict.items(), key=lambda x: x[1])]
            ar_hotwords = get_ar_hotwords()
            ctc_decoder = build_ctcdecoder(labels)  # no LM — hotwords injected at decode time
            _audio_model_arabic["ctc_decoder"] = (ctc_decoder, ar_hotwords)
            # Create a persistent pool once
            # NOTE(review): forking after torch/model load is fragile on some
            # platforms, and if Pool() raises here "ctc_decoder" is already set
            # while "ctc_pool" stays absent (decode_batch then gets pool=None) —
            # confirm pyctcdecode handles pool=None serially as intended.
            from multiprocessing import get_context
            ctc_pool = get_context("fork").Pool(processes=4)
            _audio_model_arabic["ctc_pool"] = ctc_pool
            print(f"✅ Arabic pyctcdecode CTC decoder ready ({len(ar_hotwords)} hotwords)")
        except Exception as e:
            print(f"⚠️ Arabic CTC decoder setup failed (non-fatal, falling back to greedy): {e}")
    return _audio_model_arabic


def transcribe_audio_arabic_batch(audio_paths: list[str]) -> list[str]:
    """
    Batch transcription for Arabic audio (wav2vec2).
    Loads audio files, pads to same length, runs one forward pass.
    Returns list of transcription strings.

    Output order matches `audio_paths`; entries whose audio failed to decode
    are returned as empty strings.
    """
    import torch
    import numpy as np
    from utils import load_audio_bytes_ffmpeg

    ar = get_audio_model_arabic()
    model = ar["model"]
    processor = ar["processor"]

    # Load all audio files as numpy arrays (16kHz mono float32)
    from concurrent.futures import ThreadPoolExecutor

    def _load_audio(path):
        # Read raw bytes and let ffmpeg decode/resample to 16 kHz mono float32.
        with open(path, "rb") as f:
            audio_bytes = f.read()
        return load_audio_bytes_ffmpeg(audio_bytes)

    # Load all audio files as numpy arrays (16kHz mono float32) concurrently
    with ThreadPoolExecutor(max_workers=min(len(audio_paths), 8)) as executor:
        waveforms_raw = list(executor.map(_load_audio, audio_paths))

    # Guard against decode failures that produce zero-length waveforms.
    valid_indices: list[int] = []
    valid_waveforms: list[np.ndarray] = []
    final_texts = [""] * len(audio_paths)
    for idx, wav in enumerate(waveforms_raw):
        arr = np.asarray(wav, dtype=np.float32).reshape(-1)
        if arr.size == 0:
            print(f"⚠️ Arabic ASR: empty waveform at batch index {idx}; skipping")
            continue
        valid_indices.append(idx)
        valid_waveforms.append(arr)
    if not valid_waveforms:
        # Nothing decodable — return the all-empty result list.
        return final_texts

    # Pad to same length and batch
    inputs = processor(
        valid_waveforms,
        sampling_rate=16000,
        return_tensors="pt",
        padding=True,
    )
    with torch.no_grad():
        try:
            outputs = model(**inputs)
        except RuntimeError as model_err:
            # torch.compile'd models can fail at runtime with Dynamo/FX errors;
            # detect those and permanently swap back to an eager model.
            err_text = str(model_err)
            is_dynamo_fx_issue = (
                "dynamo-optimized function" in err_text
                or "symbolically trace" in err_text
                or "fake tensor" in err_text.lower()
            )
            if ar.get("compiled") and is_dynamo_fx_issue:
                print("⚠️ Arabic ASR compiled model hit FX/Dynamo runtime error; retrying with eager model")
                from transformers import Wav2Vec2ForCTC
                eager_model = Wav2Vec2ForCTC.from_pretrained(ar.get("model_name", "IbrahimAmin/egyptian-arabic-wav2vec2-xlsr-53"))
                eager_model.eval()
                # Replace the cached model so future calls skip the compiled one.
                ar["model"] = eager_model
                ar["compiled"] = False
                model = eager_model
                outputs = model(**inputs)
            else:
                raise
    logits = outputs.logits
    predicted_ids = torch.argmax(logits, dim=-1)

    ctc_info = ar.get("ctc_decoder")
    if ctc_info is not None:
        # Pass 1 — pyctcdecode beam search with Arabic hotword biasing
        # Uses a persistent pool created at model load time
        try:
            ctc_decoder, ar_hotwords = ctc_info
            ctc_pool = ar.get("ctc_pool")  # reuse persistent pool — no fork overhead
            logits_np = logits.numpy()  # [B, T, V]
            transcriptions = ctc_decoder.decode_batch(
                ctc_pool,
                [logits_np[i] for i in range(logits_np.shape[0])],
                hotwords=ar_hotwords,
                hotword_weight=12.0,
                beam_width=15,
            )
        except Exception as e:
            print(f"⚠️ pyctcdecode decoding failed (falling back to greedy): {e}")
            transcriptions = processor.batch_decode(predicted_ids)
    else:
        # Fallback: standard greedy decoding
        transcriptions = processor.batch_decode(predicted_ids)

    cleaned = [t.strip().rstrip('.') for t in transcriptions]
    # Scatter results back to their original positions; skipped (empty-audio)
    # slots keep the "" placeholder.
    for local_i, text in enumerate(cleaned):
        original_i = valid_indices[local_i]
        final_texts[original_i] = text
    return final_texts


def get_absa_pipeline():
    """DeBERTa-v3 fine-tuned for Aspect-Based Sentiment Analysis.
    Runs a tiny warmup inference after loading so the first real request is fast.
    """
    global _absa_pipeline
    if _absa_pipeline is None:
        from transformers import pipeline
        print("Loading ABSA model...")
        _absa_pipeline = pipeline(
            "text-classification",
            model="yangheng/deberta-v3-base-absa-v1.1",
            device=-1,  # CPU
        )
        # Warmup: run a tiny dummy classification to pre-warm tokenizer + model graph
        try:
            _absa_pipeline("[CLS] This is great. [SEP] quality [SEP]")
        except Exception as e:
            print(f"⚠️ ABSA warmup failed (non-fatal): {e}")
        print("✅ ABSA model loaded and warmed up")
    return _absa_pipeline


def get_translator_ar_en():
    """Helsinki-NLP Arabic → English translation pipeline."""
    global _translator_ar_en
    if _translator_ar_en is None:
        from transformers import pipeline
        print("Loading Arabic→English translator (Helsinki-NLP/opus-mt-ar-en)...")
        _translator_ar_en = pipeline(
            "translation",
            model="Helsinki-NLP/opus-mt-ar-en",
            device=-1,  # CPU
        )
        print("✅ Arabic→English translator loaded")
    return _translator_ar_en


def get_spacy_nlp():
    """spaCy English model for noun-chunk (aspect) extraction."""
    global _spacy_nlp
    if _spacy_nlp is None:
        import spacy
        print("Loading spaCy model (en_core_web_md)...")
        _spacy_nlp = spacy.load("en_core_web_md")
        print("✅ spaCy model loaded")
    return _spacy_nlp


# ═══════════════════════ Lightweight Warmup (keep OpenMP alive) ════════════════


def warmup_clip() -> None:
    """INT8 vision encoder forward pass (~5-15ms). No-op if model not loaded."""
    if _image_classifier is None:
        return
    import torch
    c = _image_classifier
    # Random tensor in the CLIP-L/14 input shape; output is discarded.
    dummy = torch.randn(1, 3, 224, 224)
    with torch.no_grad():
        c["model"].vision_model(pixel_values=dummy)


def warmup_parakeet() -> None:
    """
    Keeps Parakeet's OpenMP threads alive by running a full transcription of
    0.5 s of silence behind _audio_model_lock.

    We deliberately use model.transcribe() (not raw encoder calls) because
    Parakeet-TDT is stateful — bypassing the full pipeline corrupts its internal
    RNNT/TDT decoder cache and causes 500 errors on the next real request.
    The lock ensures this never races with real inference.
    No-op if model not loaded.
    """
    if _audio_model is None:
        return
    import torch
    import tempfile
    import wave
    import os
    model = _audio_model
    with tempfile.NamedTemporaryFile(suffix='.wav', delete=False) as f:
        path = f.name
    try:
        with wave.open(path, 'wb') as wf:
            wf.setnchannels(1)
            wf.setsampwidth(2)       # 16-bit PCM
            wf.setframerate(16000)
            wf.writeframes(b'\x00' * 16000)  # 0.5 s of silence
        with _audio_model_lock:
            model.eval()
            with torch.no_grad():
                model.transcribe([path])
    except Exception as e:
        print(f"⚠️ Parakeet warmup error (non-fatal): {e}")
    finally:
        try:
            os.unlink(path)
        except Exception:
            pass


def warmup_wav2vec2() -> None:
    """Raw wav2vec2 forward pass (~5-15ms). No-op if model not loaded."""
    if _audio_model_arabic is None:
        return
    import torch
    import numpy as np
    ar = _audio_model_arabic
    dummy = np.zeros(8000, dtype=np.float32)  # 0.5 s of silence @16 kHz
    inputs = ar["processor"](
        [dummy], sampling_rate=16000, return_tensors="pt", padding=True
    )
    with torch.no_grad():
        ar["model"](**inputs)


# Keep backward-compatible aliases that load on first attribute access
class _LazyProxy:
    """Defers loading until the wrapped object is first touched.

    Attribute access and calls both trigger the loader exactly once and then
    delegate to the real object.  Uses object.__setattr__/__getattribute__ to
    sidestep its own __getattr__ during bookkeeping.
    """

    def __init__(self, loader):
        # loader: zero-arg callable that builds and returns the real object.
        object.__setattr__(self, '_loader', loader)
        object.__setattr__(self, '_obj', None)

    def _load(self):
        # Lazily instantiate the real object on first use, then cache it.
        if object.__getattribute__(self, '_obj') is None:
            obj = object.__getattribute__(self, '_loader')()
            object.__setattr__(self, '_obj', obj)
        return object.__getattribute__(self, '_obj')

    def __getattr__(self, name):
        # Only called for names not found on the proxy itself.
        return getattr(self._load(), name)

    def __call__(self, *args, **kwargs):
        # Allows the proxy to stand in for callable objects (e.g. pipelines).
        return self._load()(*args, **kwargs)


EMBEDDER = _LazyProxy(get_embedder)
AUDIO_MODEL = _LazyProxy(get_audio_model)
AUDIO_MODEL_ARABIC = _LazyProxy(get_audio_model_arabic)
ABSA_PIPELINE = _LazyProxy(get_absa_pipeline)
TRANSLATOR_AR_EN = _LazyProxy(get_translator_ar_en)
SPACY_NLP = _LazyProxy(get_spacy_nlp)