Spaces:
Sleeping
Sleeping
| import os | |
| import threading | |
| import uuid | |
| from typing import Optional # HF Space container runs Python 3.9 β PEP 604 (str | None) not supported | |
| import numpy as np | |
| from fastapi import FastAPI, UploadFile, File, HTTPException, Header | |
# Identifiers reported to clients: the first names the custom SVM head, the
# second the stock classifier used when no SVM is loaded.
# NOTE(review): the fallback id says "base" while the embedding loader pulls
# "iic/emotion2vec_plus_large" — confirm which one is intended.
_SVM_VERSION = "ser_colombian_v1"
_FALLBACK_VERSION = "emotion2vec_plus_base"

# Reference only — do not use for predict_proba ordering.
_CLASSES = ["happy", "sad", "angry", "neutral"]

# Short classifier labels mapped onto the class names above.
_LABEL_MAP = {
    "ang": "angry",
    "hap": "happy",
    "neu": "neutral",
    "sad": "sad",
}

# Upload guard rails: a 10 MiB size cap and the accepted MIME types.
_MAX_UPLOAD_BYTES = 10 * 1024 ** 2
_SUPPORTED_AUDIO_TYPES = {
    "application/octet-stream",
    "audio/vnd.wave",
    "audio/wav",
    "audio/wave",
    "audio/x-wav",
}

# Lazily populated model handles; each stays None until its loader succeeds.
svm_pipeline = None
_emb_processor = _emb_model = _fallback_classifier = None

# One lock per loader so concurrent requests do not load the same model twice.
_embedding_lock = threading.Lock()
_svm_lock = threading.Lock()
def _load_embedding_stack() -> None:
    """Lazily load the emotion2vec+ processor, encoder and fallback classifier.

    Does nothing when all three module-level handles are already populated.
    Otherwise takes ``_embedding_lock``, re-checks, and loads the three
    objects from ``iic/emotion2vec_plus_large``. The handles are published
    only after all three loads succeed; on any exception the error is printed
    and all three handles are set to ``None`` (no exception propagates).

    NOTE(review): ``trust_remote_code=True`` is passed to every loader, which
    lets the model repository supply its own Python code — confirm the repo
    is trusted.
    """
    global _emb_processor, _emb_model, _fallback_classifier

    def _ready() -> bool:
        # True only when every component has been loaded.
        return not (
            _emb_processor is None
            or _emb_model is None
            or _fallback_classifier is None
        )

    # Cheap check without the lock for the common already-loaded case.
    if _ready():
        return

    with _embedding_lock:
        # Another thread may have finished loading while we waited.
        if _ready():
            return

        print("Loading emotion2vec_plus_large via transformers...")
        try:
            from transformers import AutoProcessor, AutoModel, pipeline as hf_pipeline

            model_id = "iic/emotion2vec_plus_large"
            loaded_processor = AutoProcessor.from_pretrained(
                model_id, trust_remote_code=True
            )
            loaded_model = AutoModel.from_pretrained(
                model_id, trust_remote_code=True
            )
            loaded_model.eval()
            loaded_classifier = hf_pipeline(
                "audio-classification",
                model=model_id,
                trust_remote_code=True,
            )

            # Publish all three together so readers never see a partial stack.
            _emb_processor, _emb_model, _fallback_classifier = (
                loaded_processor,
                loaded_model,
                loaded_classifier,
            )
            print("emotion2vec_plus_large loaded.")
        except Exception as exc:
            print(f"emotion2vec_plus_large load failed: {exc}")
            _emb_processor = _emb_model = _fallback_classifier = None
_svm_revision: Optional[str] = None  # commit SHA / revision of the currently loaded SVM


def _load_svm_pipeline(force: bool = False) -> None:
    """Download and load the SVM pipeline from the Hugging Face Hub.

    The repository is taken from the ``HF_SER_MODEL_REPO`` environment
    variable, defaulting to ``ainterviewer/colombian-ser-model``.

    Args:
        force: When True, bypass the local cache (``force_download=True``) and
            reload even if a pipeline is already in memory, so freshly
            retrained models are picked up without rebuilding the Space.

    The new pipeline is loaded into a local variable and checked for a
    ``classes_`` attribute before it replaces the global, so a failed or
    invalid download never displaces a pipeline that is already serving.
    No exception propagates: failures are printed and ``svm_pipeline`` keeps
    its previous value (``None`` on a first load).
    """
    global svm_pipeline, _svm_revision
    if svm_pipeline is not None and not force:
        return
    with _svm_lock:
        # Re-check under the lock: another thread may have loaded it meanwhile.
        if svm_pipeline is not None and not force:
            return

        # Resolve the repo first so the log names the repo actually used.
        repo_id = os.environ.get("HF_SER_MODEL_REPO", "ainterviewer/colombian-ser-model")
        print(f"Downloading SVM pipeline from {repo_id}... (force={force})")
        try:
            from huggingface_hub import hf_hub_download
            import joblib

            # When forcing, skip the local cache so we get the latest revision.
            download_kwargs = {"force_download": True} if force else {}
            svm_path = hf_hub_download(
                repo_id=repo_id,
                filename="emotion_classifier_svm.joblib",
                **download_kwargs,
            )
            # SECURITY: joblib.load unpickles the file and can execute
            # arbitrary code; only point HF_SER_MODEL_REPO at a trusted repo.
            candidate = joblib.load(svm_path)
            # Fail here, before publishing, if the artifact is not a fitted
            # classifier.
            classes = candidate.classes_
        except Exception as e:
            if force and svm_pipeline is not None:
                print(f"SVM reload failed — keeping previously loaded pipeline: {e}")
            else:
                print(f"SVM load failed — fallback mode active: {e}")
            return

        svm_pipeline = candidate

        # Best effort: the revision is informational only. It is queried after
        # the download, so it can be newer than the file just fetched if the
        # repo was updated in between.
        try:
            from huggingface_hub import model_info

            _svm_revision = model_info(repo_id).sha
        except Exception as e:
            print(f"Could not resolve SVM revision: {e}")
            _svm_revision = None
        print(f"SVM pipeline loaded. Classes: {classes} · rev={_svm_revision}")
# ── Helper functions (named for test patching) ────────────────────────────────
def extract_embedding(wav_path: str) -> np.ndarray:
    """Turn an audio file into a single utterance-level embedding vector.

    Loads the file, downmixes multi-channel audio to mono, resamples to
    16 kHz when needed, runs the emotion2vec+ encoder, and averages
    ``last_hidden_state`` over dim 1 (presumably the frame axis — confirm
    against the model's output layout).

    Args:
        wav_path: Path to an audio file readable by ``torchaudio.load``.

    Returns:
        A 1-D float array. The original code documents the shape as (768,);
        the actual width is the encoder's hidden size — TODO confirm for the
        "large" checkpoint.

    Raises:
        RuntimeError: If the embedding processor or model is not loaded.
    """
    _load_embedding_stack()
    if _emb_processor is None or _emb_model is None:
        raise RuntimeError("Embedding model not loaded")

    import torch
    import torchaudio

    waveform, sr = torchaudio.load(wav_path)  # (channels, frames)

    # Stereo / multi-channel input must be collapsed to one channel; otherwise
    # the extra channel axis survives and the embedding no longer has the
    # single-vector shape the SVM expects.
    if waveform.shape[0] > 1:
        waveform = waveform.mean(dim=0, keepdim=True)

    if sr != 16000:
        waveform = torchaudio.functional.resample(waveform, sr, 16000)

    # squeeze(0) drops only the channel axis, never a length-1 sample axis.
    inputs = _emb_processor(
        waveform.squeeze(0).numpy(), sampling_rate=16000, return_tensors="pt"
    )
    with torch.no_grad():
        outputs = _emb_model(**inputs)

    # Average over dim 1, then drop the batch axis.
    emb = outputs.last_hidden_state.mean(dim=1).squeeze(0).numpy()
    return emb
def classify_fallback(wav_path: str) -> dict:
    """Classify emotion with the stock emotion2vec+ audio-classification pipeline.

    Each result's label is lower-cased and mapped onto the classes in
    ``_CLASSES``: labels that already are class names are kept, short labels
    go through ``_LABEL_MAP``, and anything else is counted as "neutral".
    Scores are summed per class and normalised to sum to 1 (rounded to 4
    decimals).

    Args:
        wav_path: Path to the audio file handed to the classifier.

    Returns:
        ``{"dominant": <class name>, "distribution": {<class name>: <float>}}``.
        When the classifier yields no positive score, the distribution is all
        zeros and the dominant class is "neutral".

    Raises:
        RuntimeError: If the fallback classifier is not loaded.
    """
    _load_embedding_stack()
    if _fallback_classifier is None:
        raise RuntimeError("Fallback classifier not loaded")

    results = _fallback_classifier(wav_path)
    distribution = dict.fromkeys(_CLASSES, 0.0)
    for r in results:
        raw = str(r["label"]).strip().lower()
        # Keep labels that are already canonical ("happy", "angry", ...);
        # without this they would miss _LABEL_MAP and inflate "neutral".
        label = raw if raw in distribution else _LABEL_MAP.get(raw, "neutral")
        distribution[label] += float(r["score"])

    total = sum(distribution.values())
    if total <= 0:
        # Nothing to rank: report "neutral" rather than whichever class
        # happens to come first in the dict.
        return {"dominant": "neutral", "distribution": distribution}

    distribution = {k: round(v / total, 4) for k, v in distribution.items()}
    dominant = max(distribution, key=distribution.get)
    return {"dominant": dominant, "distribution": distribution}
# ── FastAPI app and endpoints ─────────────────────────────────────────────────
# The ASGI application; its advertised version is the SVM model identifier.
app = FastAPI(title="AInterviewer SER Space", version=_SVM_VERSION)


def _runtime_mode() -> str:
    """Name the inference path currently available.

    Returns:
        "svm" when the SVM pipeline is loaded, "fallback" when only the
        fallback classifier is loaded, and "cold" when neither is.
    """
    if svm_pipeline is None:
        return "cold" if _fallback_classifier is None else "fallback"
    return "svm"
def _validate_upload(file: UploadFile, size_bytes: int) -> None:
    """Reject uploads that are empty, oversized, or of an unsupported type.

    Args:
        file: The uploaded file; only its ``content_type`` is inspected.
        size_bytes: Number of payload bytes read from the upload.

    Raises:
        HTTPException: 400 for an empty payload, 413 when the payload exceeds
            ``_MAX_UPLOAD_BYTES``, 415 when a declared content type is not in
            ``_SUPPORTED_AUDIO_TYPES``. A missing content type is accepted.
    """
    if size_bytes <= 0:
        raise HTTPException(status_code=400, detail="Uploaded file is empty.")
    if size_bytes > _MAX_UPLOAD_BYTES:
        raise HTTPException(status_code=413, detail="Uploaded file is too large.")

    declared = file.content_type
    if declared:
        # Media types are case-insensitive and may carry parameters
        # ("audio/wav; codecs=1"); compare only the normalised type/subtype.
        media_type = declared.split(";", 1)[0].strip().lower()
        if media_type not in _SUPPORTED_AUDIO_TYPES:
            raise HTTPException(
                status_code=415,
                detail=f"Unsupported content type: {file.content_type}",
            )
def healthcheck():
    """Report which models are loaded and whether the service can serve requests.

    Returns:
        A dict with a constant "status" of "ok", load flags for the SVM and
        the fallback classifier, the loaded SVM revision, the current runtime
        mode, and "ready", which is True when either model is loaded.
    """
    svm_ready = svm_pipeline is not None
    fallback_ready = _fallback_classifier is not None
    report = {
        "status": "ok",
        "svm_loaded": svm_ready,
        "svm_revision": _svm_revision,
        "fallback_loaded": fallback_ready,
        "mode": _runtime_mode(),
        "ready": svm_ready or fallback_ready,
    }
    return report
def version():
    """Return the identifier of the model that would serve a request right now.

    Returns:
        A dict with "version" (the SVM identifier when the SVM pipeline is
        loaded, otherwise the fallback identifier), the runtime "mode", and
        the loaded "svm_revision".
    """
    if svm_pipeline is None:
        active = _FALLBACK_VERSION
    else:
        active = _SVM_VERSION
    return {
        "version": active,
        "mode": _runtime_mode(),
        "svm_revision": _svm_revision,
    }
def reload_svm(authorization: Optional[str] = Header(default=None)):
    """Force the analyzer to re-download the SVM from HF Hub.

    Called by retrain_pipeline.py after a successful publish so the Space
    picks up the new model without a manual restart. Optional token auth via
    the `RELOAD_TOKEN` env var — if set, the request must carry
    `Authorization: Bearer <token>`. If unset, the endpoint is open.

    Args:
        authorization: Value of the ``Authorization`` request header, if any.

    Returns:
        A dict with "reloaded" (True only when a newly loaded pipeline
        replaced the previous one), the previous and current revisions,
        "svm_loaded", and the runtime "mode".

    Raises:
        HTTPException: 401 when `RELOAD_TOKEN` is set and the bearer token is
            missing or does not match.
    """
    import hmac

    expected = os.environ.get("RELOAD_TOKEN", "")
    if expected:
        provided = authorization or ""
        token = provided[len("Bearer "):] if provided.startswith("Bearer ") else ""
        # Constant-time comparison so response timing does not leak how much
        # of the token matched. Bytes are used because compare_digest rejects
        # non-ASCII str arguments.
        if not hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8")):
            raise HTTPException(status_code=401, detail="Invalid RELOAD_TOKEN")

    prev_rev = _svm_revision
    prev_pipeline = svm_pipeline
    _load_svm_pipeline(force=True)

    # The loader swallows its own errors, so detect success by checking that a
    # new pipeline object is now installed.
    reloaded = svm_pipeline is not None and svm_pipeline is not prev_pipeline
    return {
        "reloaded": reloaded,
        "previous_revision": prev_rev,
        "current_revision": _svm_revision,
        "svm_loaded": svm_pipeline is not None,
        "mode": _runtime_mode(),
    }
async def analyze_audio(file: UploadFile = File(...)):
    """Classify the emotion in an uploaded audio file.

    The upload is validated, written to a temporary file under ``/tmp``, and
    classified. The SVM path (embedding + ``predict_proba``) is used when the
    SVM pipeline is available; otherwise the fallback classifier is used.
    The temporary file is always removed.

    Args:
        file: The uploaded audio file.

    Returns:
        ``{"dominant": <label>, "distribution": {<label>: <probability>},
        "model": <model identifier>}``.

    Raises:
        HTTPException: Validation errors (400/413/415) pass through unchanged;
            any other failure becomes a 500 whose detail is the error message.
    """
    temp_path = f"/tmp/{uuid.uuid4()}.wav"
    try:
        # Read one byte past the limit: enough to detect an oversized upload
        # without pulling an arbitrarily large body into memory.
        payload = await file.read(_MAX_UPLOAD_BYTES + 1)
        _validate_upload(file, len(payload))
        with open(temp_path, "wb") as buf:
            buf.write(payload)

        if svm_pipeline is None:
            _load_svm_pipeline()

        # Snapshot the global so a concurrent reload cannot swap the pipeline
        # between predict_proba and the classes_ lookup.
        pipeline = svm_pipeline
        if pipeline is not None:
            # Primary path: two-stage inference
            emb = extract_embedding(temp_path)
            proba = pipeline.predict_proba(emb.reshape(1, -1))[0]
            # CRITICAL: use the pipeline's own classes_ for ordering — sklearn
            # sorts class labels, so _CLASSES order does not apply here.
            distribution = {
                str(cls): round(float(p), 4)
                for cls, p in zip(pipeline.classes_, proba)
            }
            dominant = max(distribution, key=distribution.get)
            model_name = _SVM_VERSION
        else:
            # Fallback path: emotion2vec+ direct classification
            result = classify_fallback(temp_path)
            dominant = result["dominant"]
            distribution = result["distribution"]
            model_name = _FALLBACK_VERSION
        return {"dominant": dominant, "distribution": distribution, "model": model_name}
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # The file may never have been created (e.g. validation failed first).
        try:
            os.remove(temp_path)
        except FileNotFoundError:
            pass
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 7860.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 7860}
    uvicorn.run(app, **server_options)