# ser-analyzer / app/main.py
# Esca01 — sync: from Esca01/AInterviewer@c78429518beb4796b649902bde8f73588c887231 (8d71630, verified)
import os
import threading
import uuid
from typing import Optional # HF Space container runs Python 3.9 β†’ PEP 604 (str | None) not supported
import numpy as np
from fastapi import FastAPI, UploadFile, File, HTTPException, Header
# ── Model identifiers and label vocabulary ─────────────────────────────────────
_SVM_VERSION = "ser_colombian_v1"  # "model"/"version" value when the SVM serves
_FALLBACK_VERSION = "emotion2vec_plus_base"  # value reported in fallback mode
# NOTE(review): _load_embedding_stack loads "iic/emotion2vec_plus_large", so this
# label may not name the checkpoint that is actually serving — confirm.

# Canonical emotion set. Reference only — never use it to order predict_proba
# output; the fitted SVM's own `classes_` attribute defines that order.
_CLASSES = ["happy", "sad", "angry", "neutral"]
# Short emotion tags → canonical class names.
_LABEL_MAP = dict(ang="angry", hap="happy", neu="neutral", sad="sad")

# ── Upload limits ──────────────────────────────────────────────────────────────
_MAX_UPLOAD_BYTES = 10 << 20  # 10 MiB
_SUPPORTED_AUDIO_TYPES = set(
    (
        "audio/wav",
        "audio/x-wav",
        "audio/wave",
        "audio/vnd.wave",
        "application/octet-stream",
    )
)

# ── Lazily populated model state (filled in by the _load_* helpers) ───────────
svm_pipeline = _emb_processor = _emb_model = _fallback_classifier = None
# One lock per lazy loader; each loader re-checks its state after acquiring it.
_embedding_lock, _svm_lock = threading.Lock(), threading.Lock()
def _load_embedding_stack() -> None:
    """Lazily load the emotion2vec+ processor, encoder and classifier pipeline.

    Populates ``_emb_processor``, ``_emb_model`` and ``_fallback_classifier``.
    The readiness check runs before and again after acquiring
    ``_embedding_lock``, so concurrent callers share a single load. On any
    failure the error is printed (not raised) and all three globals are left
    as None; a later call will attempt the load again.

    NOTE(review): this loads ``iic/emotion2vec_plus_large`` while
    ``_FALLBACK_VERSION`` reports ``emotion2vec_plus_base`` — confirm which
    checkpoint is intended.
    """
    global _emb_processor, _emb_model, _fallback_classifier

    def _all_loaded() -> bool:
        return all(
            obj is not None
            for obj in (_emb_processor, _emb_model, _fallback_classifier)
        )

    if _all_loaded():
        return
    with _embedding_lock:
        if _all_loaded():
            return
        checkpoint = "iic/emotion2vec_plus_large"
        print("Loading emotion2vec_plus_large via transformers...")
        try:
            from transformers import AutoProcessor, AutoModel, pipeline as hf_pipeline

            processor = AutoProcessor.from_pretrained(checkpoint, trust_remote_code=True)
            encoder = AutoModel.from_pretrained(checkpoint, trust_remote_code=True)
            encoder.eval()  # inference only
            audio_classifier = hf_pipeline(
                "audio-classification",
                model=checkpoint,
                trust_remote_code=True,
            )
            # Publish only after every component has loaded successfully.
            _emb_processor, _emb_model, _fallback_classifier = (
                processor,
                encoder,
                audio_classifier,
            )
            print("emotion2vec_plus_large loaded.")
        except Exception as e:
            print(f"emotion2vec_plus_large load failed: {e}")
            _emb_processor = _emb_model = _fallback_classifier = None
_svm_revision: Optional[str] = None  # commit SHA / revision of the currently loaded SVM


def _load_svm_pipeline(force: bool = False) -> None:
    """Download + load the SVM pipeline from HF Hub into ``svm_pipeline``.

    The repo is ``$HF_SER_MODEL_REPO`` (default
    ``ainterviewer/colombian-ser-model``) and the artifact is
    ``emotion_classifier_svm.joblib``. The state is checked before and again
    after acquiring ``_svm_lock``, so concurrent first requests trigger a
    single download.

    When force=True (called via POST /reload), bypass the cache and
    re-download the latest revision so the Space picks up freshly retrained
    models without needing a Space rebuild.

    The commit SHA is resolved first and that exact revision is downloaded,
    so ``_svm_revision`` always describes the weights actually loaded (it is
    None if the SHA could not be resolved). ``svm_pipeline`` and
    ``_svm_revision`` are published together, and only after the artifact
    loads and exposes ``classes_``. Errors are printed, never raised: a
    forced reload keeps any previously loaded model, while a non-forced load
    leaves ``svm_pipeline`` as None (fallback mode).

    NOTE(review): ``joblib.load`` unpickles the artifact — only point
    HF_SER_MODEL_REPO at a repository you trust.
    """
    global svm_pipeline, _svm_revision
    if svm_pipeline is not None and not force:
        return
    with _svm_lock:
        if svm_pipeline is not None and not force:
            return
        repo_id = os.environ.get("HF_SER_MODEL_REPO", "ainterviewer/colombian-ser-model")
        print(f"Downloading SVM pipeline from {repo_id}... (force={force})")
        try:
            from huggingface_hub import hf_hub_download
            import joblib

            revision: Optional[str]
            try:
                from huggingface_hub import model_info

                revision = model_info(repo_id).sha
            except Exception:
                # Offline / API error: download whatever the default ref
                # resolves to and report the revision as unknown.
                revision = None

            # When forcing, skip the local cache so we get the latest revision.
            kwargs: dict = {"force_download": True} if force else {}
            if revision is not None:
                # Pin the download to the SHA we are about to report.
                kwargs["revision"] = revision
            svm_path = hf_hub_download(
                repo_id=repo_id,
                filename="emotion_classifier_svm.joblib",
                **kwargs,
            )
            pipeline = joblib.load(svm_path)
            # Fail here, before publishing, if the artifact is not a fitted classifier.
            classes = pipeline.classes_
        except Exception as e:
            if force and svm_pipeline is not None:
                print(f"SVM reload failed — keeping previously loaded model: {e}")
            else:
                print(f"SVM load failed — fallback mode active: {e}")
                svm_pipeline = None
            return
        svm_pipeline = pipeline
        _svm_revision = revision
        print(f"SVM pipeline loaded. Classes: {classes} · rev={revision}")
# ── Helper functions (named for test patching) ────────────────────────────────
def extract_embedding(wav_path: str) -> np.ndarray:
    """Return a mean-pooled emotion2vec+ utterance embedding for a WAV file.

    The audio is averaged across channels to mono and resampled to 16 kHz.
    It is then passed through the processor/model loaded by
    ``_load_embedding_stack``, and ``last_hidden_state`` is averaged over the
    time axis into a 1-D vector.

    NOTE(review): originally documented as shape (768,); the real width is
    the hidden size of whichever checkpoint is loaded. Confirm it matches the
    features the SVM was trained on.

    Raises:
        RuntimeError: if the embedding model could not be loaded.
    """
    _load_embedding_stack()
    if _emb_processor is None or _emb_model is None:
        raise RuntimeError("Embedding model not loaded")
    import torch
    import torchaudio

    waveform, sr = torchaudio.load(wav_path)  # (channels, frames)
    # Downmix so stereo uploads give one utterance rather than a batch of
    # per-channel sequences, which would yield a (channels, dim) embedding.
    # For mono input this is an exact no-op apart from dropping the channel axis.
    mono = waveform.mean(dim=0)
    if sr != 16000:
        mono = torchaudio.functional.resample(mono, sr, 16000)
    inputs = _emb_processor(mono.numpy(), sampling_rate=16000, return_tensors="pt")
    with torch.no_grad():
        outputs = _emb_model(**inputs)
    emb = outputs.last_hidden_state.mean(dim=1).squeeze().numpy()
    return emb
def classify_fallback(wav_path: str) -> dict:
    """Classify a WAV with the emotion2vec+ pipeline, collapsed onto ``_CLASSES``.

    Each returned label is stripped, lower-cased and reduced to its last
    "/"-separated segment, then mapped onto the canonical classes:
    - labels already in ``_CLASSES`` (e.g. "angry") are kept as-is;
    - short tags go through ``_LABEL_MAP`` (e.g. "ang" -> "angry");
    - anything else counts toward "neutral".
    Scores are summed per class and renormalised to sum to 1, rounded to 4 dp.

    Returns:
        {"dominant": <class>, "distribution": {class: probability}}. If the
        classifier yields no positive score, every probability is 0.0 and
        "dominant" is "neutral".

    Raises:
        RuntimeError: if the fallback classifier could not be loaded.
    """
    _load_embedding_stack()
    if _fallback_classifier is None:
        raise RuntimeError("Fallback classifier not loaded")
    results = _fallback_classifier(wav_path)
    distribution = {cls: 0.0 for cls in _CLASSES}
    for r in results:
        raw = r["label"].strip().lower()
        # Some checkpoints emit bilingual labels like "<zh>/<en>" (presumably
        # emotion2vec's format — verify); plain labels are unaffected.
        raw = raw.rsplit("/", 1)[-1]
        label = raw if raw in distribution else _LABEL_MAP.get(raw, "neutral")
        distribution[label] += float(r["score"])
    total = sum(distribution.values())
    if total > 0:
        distribution = {k: round(v / total, 4) for k, v in distribution.items()}
        dominant = max(distribution, key=distribution.get)
    else:
        # No evidence for any class: report neutral rather than whichever key is first.
        dominant = "neutral"
    return {"dominant": dominant, "distribution": distribution}
# ── FastAPI app and endpoints ──────────────────────────────────────────────────
# The OpenAPI version string advertises the SVM release tag.
app = FastAPI(version=_SVM_VERSION, title="AInterviewer SER Space")
def _runtime_mode() -> str:
    """Name the inference path currently available.

    Returns "svm" when the SVM pipeline is loaded. Otherwise it returns
    "fallback" when the emotion2vec+ classifier is loaded, else "cold"
    (nothing loaded yet).
    """
    if svm_pipeline is not None:
        return "svm"
    return "cold" if _fallback_classifier is None else "fallback"
def _validate_upload(file: UploadFile, size_bytes: int) -> None:
    """Reject uploads that are empty, oversized, or of an unsupported media type.

    Args:
        file: the incoming upload; only its ``content_type`` is inspected.
        size_bytes: number of payload bytes received.

    Raises:
        HTTPException: 400 if empty, 413 if larger than ``_MAX_UPLOAD_BYTES``,
            415 if a Content-Type is given and its media type is not in
            ``_SUPPORTED_AUDIO_TYPES``. A missing Content-Type is accepted.
    """
    if size_bytes <= 0:
        raise HTTPException(status_code=400, detail="Uploaded file is empty.")
    if size_bytes > _MAX_UPLOAD_BYTES:
        raise HTTPException(status_code=413, detail="Uploaded file is too large.")
    if file.content_type:
        # Media types are case-insensitive and may carry parameters
        # (e.g. "audio/wav; codecs=1"); compare only the bare type/subtype.
        media_type = file.content_type.split(";", 1)[0].strip().lower()
        if media_type not in _SUPPORTED_AUDIO_TYPES:
            raise HTTPException(
                status_code=415,
                detail=f"Unsupported content type: {file.content_type}",
            )
@app.get("/health")
def healthcheck():
    """Report liveness plus which model components are currently loaded.

    "ready" is true once either the SVM or the fallback classifier can serve.
    """
    svm_ready = svm_pipeline is not None
    fallback_ready = _fallback_classifier is not None
    return {
        "status": "ok",
        "svm_loaded": svm_ready,
        "svm_revision": _svm_revision,
        "fallback_loaded": fallback_ready,
        "mode": _runtime_mode(),
        "ready": svm_ready or fallback_ready,
    }
@app.get("/version")
def version():
    """Return the model version label, runtime mode and loaded SVM revision.

    The label is ``_SVM_VERSION`` while the SVM is loaded and
    ``_FALLBACK_VERSION`` otherwise (including the "cold" state).
    """
    if svm_pipeline is None:
        label = _FALLBACK_VERSION
    else:
        label = _SVM_VERSION
    return {"version": label, "mode": _runtime_mode(), "svm_revision": _svm_revision}
@app.post("/reload")
def reload_svm(authorization: Optional[str] = Header(default=None)):
    """Force the analyzer to re-download the SVM from HF Hub.

    Called by retrain_pipeline.py after a successful publish, so the Space
    picks up the new model without a manual restart.

    Optional token auth via the ``RELOAD_TOKEN`` env var. If it is set, the
    request must carry ``Authorization: Bearer <token>``; the scheme is
    matched case-insensitively and the token is compared in constant time.
    If it is unset, the endpoint is open (the operation only touches model
    weights, not user data).

    ``reloaded`` is always True: ``_load_svm_pipeline`` prints and swallows
    download/load errors. Callers should compare ``previous_revision`` /
    ``current_revision`` and check ``svm_loaded`` to confirm the swap.

    Raises:
        HTTPException: 401 when a token is configured and the header is
            missing, uses another scheme, or carries the wrong token.
    """
    expected = os.environ.get("RELOAD_TOKEN", "")
    if expected:
        import hmac

        scheme, _, token = (authorization or "").partition(" ")
        # compare_digest avoids leaking, via timing, how much of the token matched.
        token_ok = hmac.compare_digest(token.encode("utf-8"), expected.encode("utf-8"))
        if scheme.lower() != "bearer" or not token_ok:
            raise HTTPException(status_code=401, detail="Invalid RELOAD_TOKEN")
    prev_rev = _svm_revision
    _load_svm_pipeline(force=True)
    return {
        "reloaded": True,
        "previous_revision": prev_rev,
        "current_revision": _svm_revision,
        "svm_loaded": svm_pipeline is not None,
        "mode": _runtime_mode(),
    }
@app.post("/analyze")
async def analyze_audio(file: UploadFile = File(...)):
    """Classify the emotion in an uploaded WAV file.

    Uses the SVM over emotion2vec+ embeddings when the SVM is available,
    otherwise the emotion2vec+ classifier directly.

    Returns:
        {"dominant": str, "distribution": {class: probability}, "model": str}

    Raises:
        HTTPException: 400/413/415 from ``_validate_upload``; 500 (with the
            error text as detail) for any other failure.
    """
    import tempfile
    from fastapi.concurrency import run_in_threadpool

    temp_path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4()}.wav")
    try:
        # Read at most one byte past the limit: enough to detect an oversized
        # upload without buffering an arbitrarily large body in memory.
        payload = await file.read(_MAX_UPLOAD_BYTES + 1)
        _validate_upload(file, len(payload))
        with open(temp_path, "wb") as buf:
            buf.write(payload)
        # Model download and inference block; keep them off the event loop so
        # /health and other requests stay responsive.
        return await run_in_threadpool(_run_analysis, temp_path)
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        if os.path.exists(temp_path):
            os.remove(temp_path)


def _run_analysis(wav_path: str) -> dict:
    """Run SVM (primary) or emotion2vec+ (fallback) inference on a WAV on disk."""
    if svm_pipeline is None:
        _load_svm_pipeline()
    # Snapshot the global: POST /reload may swap it concurrently, and
    # predict_proba and classes_ must come from the same fitted model.
    pipeline = svm_pipeline
    if pipeline is not None:
        # Primary path: two-stage inference
        emb = extract_embedding(wav_path)
        proba = pipeline.predict_proba(emb.reshape(1, -1))[0]
        # CRITICAL: use the pipeline's classes_ — sklearn sorts them alphabetically
        distribution = {
            cls: round(float(p), 4)
            for cls, p in zip(pipeline.classes_, proba)
        }
        dominant = max(distribution, key=distribution.get)
        model_name = _SVM_VERSION
    else:
        # Fallback path: emotion2vec+ direct classification
        result = classify_fallback(wav_path)
        dominant = result["dominant"]
        distribution = result["distribution"]
        model_name = _FALLBACK_VERSION
    return {"dominant": dominant, "distribution": distribution, "model": model_name}
if __name__ == "__main__":
    # Direct-run entry point: serve on all interfaces, port 7860.
    from uvicorn import run as _serve

    _serve(app, host="0.0.0.0", port=7860)