"""Speech-emotion-recognition (SER) HTTP service.

Exposes a FastAPI app with four endpoints:

* ``GET  /health``  – readiness / loaded-model report.
* ``GET  /version`` – version string of the model currently serving requests.
* ``POST /reload``  – force a re-download of the SVM pipeline from HF Hub.
* ``POST /analyze`` – classify the emotion of an uploaded WAV file.

Inference is two-stage when the SVM pipeline is available (embedding model →
SVM ``predict_proba``); otherwise the service falls back to the
audio-classification pipeline loaded alongside the embedding model.
"""

import hmac
import os
import threading
import uuid
from typing import Optional  # HF Space container runs Python 3.9 → PEP 604 (str | None) not supported

import numpy as np
from fastapi import FastAPI, UploadFile, File, HTTPException, Header

_SVM_VERSION = "ser_colombian_v1"
# NOTE(review): this string says "base" while _load_embedding_stack loads
# "iic/emotion2vec_plus_large". Left unchanged because clients may depend on
# the reported value — confirm which one is intended.
_FALLBACK_VERSION = "emotion2vec_plus_base"
_CLASSES = ["happy", "sad", "angry", "neutral"]  # reference only — do not use for predict_proba ordering
_LABEL_MAP = {"ang": "angry", "hap": "happy", "neu": "neutral", "sad": "sad"}
_MAX_UPLOAD_BYTES = 10 * 1024 * 1024
_SUPPORTED_AUDIO_TYPES = {
    "audio/wav",
    "audio/x-wav",
    "audio/wave",
    "audio/vnd.wave",
    "application/octet-stream",
}

# Lazily populated model handles; None means "not loaded (yet)".
svm_pipeline = None
_emb_processor = None
_emb_model = None
_fallback_classifier = None

# One lock per lazily loaded resource so concurrent requests load it only once.
_embedding_lock = threading.Lock()
_svm_lock = threading.Lock()


def _load_embedding_stack() -> None:
    """Load the embedding processor, embedding model and fallback classifier.

    Does nothing when all three handles are already set. On any failure the
    error is printed and all three handles are left as ``None``; callers must
    check the handles themselves after calling this function.
    """
    global _emb_processor, _emb_model, _fallback_classifier
    if _emb_processor is not None and _emb_model is not None and _fallback_classifier is not None:
        return
    with _embedding_lock:
        # Re-check under the lock: another thread may have finished loading
        # while this one was waiting.
        if _emb_processor is not None and _emb_model is not None and _fallback_classifier is not None:
            return
        print("Loading emotion2vec_plus_large via transformers...")
        try:
            from transformers import AutoProcessor, AutoModel, pipeline as hf_pipeline

            processor = AutoProcessor.from_pretrained(
                "iic/emotion2vec_plus_large", trust_remote_code=True
            )
            model = AutoModel.from_pretrained(
                "iic/emotion2vec_plus_large", trust_remote_code=True
            )
            model.eval()
            classifier = hf_pipeline(
                "audio-classification",
                model="iic/emotion2vec_plus_large",
                trust_remote_code=True,
            )
            # Publish the handles only after every component loaded, so the
            # module never exposes a half-initialised stack.
            _emb_processor = processor
            _emb_model = model
            _fallback_classifier = classifier
            print("emotion2vec_plus_large loaded.")
        except Exception as e:
            print(f"emotion2vec_plus_large load failed: {e}")
            _emb_processor = None
            _emb_model = None
            _fallback_classifier = None


_svm_revision: Optional[str] = None  # commit SHA / revision of the currently loaded SVM


def _load_svm_pipeline(force: bool = False) -> None:
    """Download + load the SVM pipeline from HF Hub.

    When force=True (called via POST /reload), bypass the cache and re-download
    the latest revision so the Space picks up freshly retrained models without
    needing a Space rebuild.

    The repository is read from the ``HF_SER_MODEL_REPO`` environment variable
    and defaults to ``ainterviewer/colombian-ser-model``. Failures are printed,
    never raised: on a failed forced reload the previously loaded pipeline is
    kept, otherwise ``svm_pipeline`` stays ``None`` (fallback mode).
    """
    global svm_pipeline, _svm_revision
    if svm_pipeline is not None and not force:
        return
    with _svm_lock:
        # Re-check under the lock: another thread may have loaded the pipeline
        # while this one was waiting.
        if svm_pipeline is not None and not force:
            return
        repo_id = os.environ.get("HF_SER_MODEL_REPO", "ainterviewer/colombian-ser-model")
        # Log the repository actually used, not the hard-coded default.
        print(f"Downloading SVM pipeline from {repo_id}... (force={force})")
        try:
            from huggingface_hub import hf_hub_download
            import joblib

            # When forcing, skip the local cache so we get the latest revision
            kwargs = {"force_download": True} if force else {}
            svm_path = hf_hub_download(
                repo_id=repo_id,
                filename="emotion_classifier_svm.joblib",
                **kwargs,
            )
            # SECURITY NOTE(review): joblib.load unpickles the downloaded file,
            # which can execute arbitrary code. Only point HF_SER_MODEL_REPO at
            # a repository you trust.
            svm_pipeline = joblib.load(svm_path)
            try:
                from huggingface_hub import model_info

                _svm_revision = model_info(repo_id).sha
            except Exception:
                # The revision is informational only; a lookup failure must
                # not invalidate a pipeline that loaded successfully.
                _svm_revision = None
            print(f"SVM pipeline loaded. Classes: {svm_pipeline.classes_} · rev={_svm_revision}")
        except Exception as e:
            print(f"SVM load failed — fallback mode active: {e}")
            if not force:
                svm_pipeline = None  # only reset cache when not forcing


# ── Helper functions (named for test patching) ────────────────────────────────


def extract_embedding(wav_path: str) -> np.ndarray:
    """Load WAV, resample to 16kHz, run through emotion2vec+ model, mean-pool.

    Multi-channel audio is downmixed to mono first so the processor always
    receives a single 1-D waveform and the result is one feature vector.

    Args:
        wav_path: Path of the audio file to embed.

    Returns:
        The time-averaged ``last_hidden_state`` as a 1-D NumPy array.

    Raises:
        RuntimeError: If the embedding processor or model could not be loaded.
    """
    _load_embedding_stack()
    if _emb_processor is None or _emb_model is None:
        raise RuntimeError("Embedding model not loaded")

    import torch
    import torchaudio

    waveform, sr = torchaudio.load(wav_path)  # (channels, samples)
    if waveform.shape[0] > 1:
        # Average the channels; otherwise a stereo file would yield a
        # (2, samples) input and a feature vector of the wrong size.
        waveform = waveform.mean(dim=0, keepdim=True)
    if sr != 16000:
        waveform = torchaudio.functional.resample(waveform, sr, 16000)
    inputs = _emb_processor(
        waveform.squeeze(0).numpy(), sampling_rate=16000, return_tensors="pt"
    )
    with torch.no_grad():
        outputs = _emb_model(**inputs)
    emb = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
    # Originally documented as shape (768,); the real size depends on the
    # model's hidden dimension — TODO confirm.
    return emb


def classify_fallback(wav_path: str) -> dict:
    """Run emotion2vec+ audio-classification pipeline, map labels, return dominant + distribution.

    Labels missing from ``_LABEL_MAP`` are counted as ``"neutral"``. Scores are
    normalised to sum to 1 (rounded to 4 decimals) unless every score is zero.

    Args:
        wav_path: Path of the audio file to classify.

    Returns:
        ``{"dominant": <class name>, "distribution": {<class name>: <score>}}``
        with one entry per name in ``_CLASSES``.

    Raises:
        RuntimeError: If the fallback classifier could not be loaded.
    """
    _load_embedding_stack()
    if _fallback_classifier is None:
        raise RuntimeError("Fallback classifier not loaded")

    results = _fallback_classifier(wav_path)
    distribution = {cls: 0.0 for cls in _CLASSES}
    for r in results:
        label = _LABEL_MAP.get(r["label"].lower(), "neutral")
        distribution[label] += float(r["score"])

    total = sum(distribution.values())
    if total > 0:
        distribution = {k: round(v / total, 4) for k, v in distribution.items()}

    dominant = max(distribution, key=distribution.get)
    return {"dominant": dominant, "distribution": distribution}


# ── FastAPI app and endpoints ──────────────────────────────────────────────────

app = FastAPI(title="AInterviewer SER Space", version=_SVM_VERSION)


def _runtime_mode() -> str:
    """Return "svm", "fallback" or "cold" depending on which model is loaded."""
    if svm_pipeline is not None:
        return "svm"
    if _fallback_classifier is not None:
        return "fallback"
    return "cold"


def _validate_upload(file: UploadFile, size_bytes: int) -> None:
    """Reject empty, oversized or wrongly typed uploads.

    Args:
        file: The upload; only its ``content_type`` is inspected. A missing or
            empty content type is accepted.
        size_bytes: Number of payload bytes read from the upload.

    Raises:
        HTTPException: 400 for an empty payload, 413 when the payload exceeds
            ``_MAX_UPLOAD_BYTES``, 415 for an unsupported content type.
    """
    if size_bytes <= 0:
        raise HTTPException(status_code=400, detail="Uploaded file is empty.")
    if size_bytes > _MAX_UPLOAD_BYTES:
        raise HTTPException(status_code=413, detail="Uploaded file is too large.")
    if file.content_type and file.content_type not in _SUPPORTED_AUDIO_TYPES:
        raise HTTPException(status_code=415, detail=f"Unsupported content type: {file.content_type}")


@app.get("/health")
def healthcheck():
    """Report which models are loaded; ``ready`` is true once either one is."""
    return {
        "status": "ok",
        "svm_loaded": svm_pipeline is not None,
        "svm_revision": _svm_revision,
        "fallback_loaded": _fallback_classifier is not None,
        "mode": _runtime_mode(),
        "ready": svm_pipeline is not None or _fallback_classifier is not None,
    }


@app.get("/version")
def version():
    """Return the version string of the model that would serve a request now."""
    v = _SVM_VERSION if svm_pipeline is not None else _FALLBACK_VERSION
    return {"version": v, "mode": _runtime_mode(), "svm_revision": _svm_revision}


@app.post("/reload")
def reload_svm(authorization: Optional[str] = Header(default=None)):
    """Force the analyzer to re-download the SVM from HF Hub.

    Called by retrain_pipeline.py after a successful publish so the Space
    picks up the new model without a manual restart.

    Optional token auth via `RELOAD_TOKEN` env var — if set, the request
    must carry `Authorization: Bearer <RELOAD_TOKEN>`. If unset, the endpoint
    is open (the operation is idempotent and only touches model weights, not
    user data).

    `reloaded` in the response is always true: it means a reload was
    attempted. Load failures are swallowed by `_load_svm_pipeline`, so compare
    `previous_revision` / `current_revision` and check `svm_loaded` to see
    what is actually being served.

    Raises:
        HTTPException: 401 when `RELOAD_TOKEN` is set and the bearer token is
            missing or does not match.
    """
    expected = os.environ.get("RELOAD_TOKEN", "")
    if expected:
        provided = authorization or ""
        # Constant-time comparison so response timing does not reveal how much
        # of the token matched. Compared as bytes because compare_digest
        # rejects non-ASCII str arguments.
        token_ok = provided.startswith("Bearer ") and hmac.compare_digest(
            provided[7:].encode("utf-8"), expected.encode("utf-8")
        )
        if not token_ok:
            raise HTTPException(status_code=401, detail="Invalid RELOAD_TOKEN")

    prev_rev = _svm_revision
    _load_svm_pipeline(force=True)
    return {
        "reloaded": True,
        "previous_revision": prev_rev,
        "current_revision": _svm_revision,
        "svm_loaded": svm_pipeline is not None,
        "mode": _runtime_mode(),
    }


@app.post("/analyze")
async def analyze_audio(file: UploadFile = File(...)):
    """Classify the emotion of an uploaded audio file.

    The payload is validated, written to a temporary WAV file, and classified
    by the SVM pipeline when available, otherwise by the fallback classifier.
    The temporary file is always removed.

    Returns:
        ``{"dominant": <label>, "distribution": {<label>: <probability>},
        "model": <version string>}``.

    Raises:
        HTTPException: 400 / 413 / 415 from upload validation; 500 with the
            error text for any other failure.
    """
    temp_path = f"/tmp/{uuid.uuid4()}.wav"
    try:
        # Read one byte past the limit: enough to detect an oversized upload
        # without pulling an arbitrarily large body into memory.
        payload = await file.read(_MAX_UPLOAD_BYTES + 1)
        _validate_upload(file, len(payload))
        with open(temp_path, "wb") as buf:
            buf.write(payload)

        if svm_pipeline is None:
            _load_svm_pipeline()

        if svm_pipeline is not None:
            # Primary path: two-stage inference
            emb = extract_embedding(temp_path)
            proba = svm_pipeline.predict_proba(emb.reshape(1, -1))[0]
            # CRITICAL: use svm_pipeline.classes_ — sklearn sorts alphabetically
            distribution = {
                cls: round(float(p), 4)
                for cls, p in zip(svm_pipeline.classes_, proba)
            }
            dominant = max(distribution, key=distribution.get)
            model_name = _SVM_VERSION
        else:
            # Fallback path: emotion2vec+ direct classification
            result = classify_fallback(temp_path)
            dominant = result["dominant"]
            distribution = result["distribution"]
            model_name = _FALLBACK_VERSION

        return {"dominant": dominant, "distribution": distribution, "model": model_name}

    except HTTPException:
        # Validation errors already carry the right status code.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)