""" XGBoost Busy Detector - Hugging Face Inference Endpoint Handler Custom handler for HF Inference Endpoints. Loads XGBoost model, applies normalization, runs evidence accumulation scoring, and returns busy_score + confidence + recommendation. Derived from: src/normalization.py, src/scoring_engine.py, src/model.py """ from typing import Dict, Any, Tuple import json import math import numpy as np import pickle from pathlib import Path class EndpointHandler: """HF Inference Endpoint handler for XGBoost busy detection.""" def __init__(self, path: str = "."): model_dir = Path(path) # --- Load XGBoost model --- model_path = None for candidate in [ model_dir / "model.pkl", model_dir / "busy_detector_v1.pkl", model_dir / "busy_detector_5k.pkl", ]: if candidate.exists(): model_path = candidate break if model_path is None: raise FileNotFoundError( f"No model file found in {model_dir}. " "Expected model.pkl, busy_detector_v1.pkl, or busy_detector_5k.pkl" ) with open(model_path, "rb") as f: saved = pickle.load(f) # Handle both raw model and dict-wrapped model if isinstance(saved, dict): self.model = saved.get("model") or saved.get("booster") self.feature_names = saved.get("feature_names") else: self.model = saved self.feature_names = None print(f"✓ XGBoost model loaded from {model_path}") # --- Load feature ranges --- ranges_path = model_dir / "feature_ranges.json" with open(ranges_path) as f: ranges_data = json.load(f) self.voice_ranges = ranges_data["voice_ranges"] self.text_ranges = ranges_data["text_ranges"] self.voice_order = ranges_data["voice_feature_order"] self.text_order = ranges_data["text_feature_order"] # --- Load scoring rules --- rules_path = model_dir / "scoring_rules.json" with open(rules_path) as f: self.scoring = json.load(f) self.weights = self.scoring["weights"] self.thresholds = self.scoring["thresholds"] print("✓ Feature ranges and scoring rules loaded") # --------------------------------------------------------------------- # # Public interface # --------------------------------------------------------------------- # def __call__(self, data: Dict[str, Any]) -> Dict[str, Any]: """ Entrypoint for HF Inference Endpoints. Expected input (JSON): { "inputs": { "audio_features": { "v1_snr": 15.0, ... }, "text_features": { "t1_explicit_busy": 0.8, ... } } } Returns: { "busy_score": 0.72, "confidence": 0.85, "recommendation": "EXIT", "ml_probability": 0.65, "evidence_details": [...] } """ # HF wraps payload in "inputs" inputs = data.get("inputs", data) audio_features = inputs.get("audio_features", {}) text_features = inputs.get("text_features", {}) # 1. Normalize normalized = self._normalize_features(audio_features, text_features) # 2. XGBoost inference import xgboost as xgb dmatrix = xgb.DMatrix(normalized.reshape(1, -1)) ml_prob = float(self.model.predict(dmatrix)[0]) # 3. Evidence accumulation scoring final_score, confidence, details = self._score_with_evidence( ml_prob, audio_features, text_features ) # 4. Recommendation recommendation = self._get_recommendation(final_score) return { "busy_score": round(final_score, 4), "confidence": round(confidence, 4), "recommendation": recommendation, "ml_probability": round(ml_prob, 4), "evidence_details": details, } # --------------------------------------------------------------------- # # Normalization (mirrors src/normalization.py FeatureNormalizer) # --------------------------------------------------------------------- # def _normalize_value(self, value: float, min_val: float, max_val: float) -> float: if max_val == min_val: return 0.0 value = max(min_val, min(max_val, value)) return (value - min_val) / (max_val - min_val) def _normalize_features( self, audio_features: Dict[str, float], text_features: Dict[str, float], ) -> np.ndarray: """Min-max normalize all 26 features and concatenate.""" voice_norm = [] for feat in self.voice_order: val = audio_features.get(feat, 0.0) lo, hi = self.voice_ranges[feat] voice_norm.append(self._normalize_value(val, lo, hi)) text_norm = [] for feat in self.text_order: val = text_features.get(feat, 0.0) lo, hi = self.text_ranges[feat] text_norm.append(self._normalize_value(val, lo, hi)) return np.array(voice_norm + text_norm, dtype=np.float32) # --------------------------------------------------------------------- # # Evidence scoring (mirrors src/scoring_engine.py ScoringEngine) # --------------------------------------------------------------------- # @staticmethod def _sigmoid(x: float) -> float: return 1.0 / (1.0 + math.exp(-x)) @staticmethod def _logit(p: float) -> float: p = max(0.01, min(0.99, p)) return math.log(p / (1.0 - p)) def _score_with_evidence( self, ml_prob: float, audio_features: Dict[str, float], text_features: Dict[str, float], ) -> Tuple[float, float, list]: """Evidence accumulation scoring exactly matching ScoringEngine.calculate_score.""" evidence = 0.0 details = [] # --- Text evidence --- explicit = text_features.get("t1_explicit_busy", 0.0) if explicit > 0.5: pts = self.weights["explicit_busy"] * explicit evidence += pts details.append(f"Explicit Intent (+{pts:.1f})") explicit_free = text_features.get("t0_explicit_free", 0.0) if explicit_free > 0.5: pts = self.weights["explicit_free"] * explicit_free evidence += pts details.append(f"Explicit Free ({pts:.1f})") short_ratio = text_features.get("t3_short_ratio", 0.0) if short_ratio > 0.3: pts = self.weights["short_answers"] * short_ratio evidence += pts details.append(f"Brief Responses (+{pts:.1f})") deflection = text_features.get("t6_deflection", 0.0) if deflection > 0.1: pts = self.weights["deflection"] * deflection evidence += pts details.append(f"Deflection (+{pts:.1f})") # --- Audio evidence --- traffic = audio_features.get("v2_noise_traffic", 0.0) if traffic > 0.5: pts = self.weights["traffic_noise"] * traffic evidence += pts details.append(f"Traffic Context (+{pts:.1f})") rate = audio_features.get("v3_speech_rate", 0.0) if rate > 3.5: pts = self.weights["rushed_speech"] evidence += pts details.append(f"Rushed Speech (+{pts:.1f})") pitch_std = audio_features.get("v5_pitch_std", 0.0) if pitch_std > 80.0: evidence += 0.5 details.append("Voice Stress (+0.5)") emotion_stress = audio_features.get("v11_emotion_stress", 0.0) if emotion_stress > 0.6: pts = self.weights["emotion_stress"] * emotion_stress evidence += pts details.append(f"Emotional Stress (+{pts:.1f})") emotion_energy = audio_features.get("v12_emotion_energy", 0.0) if emotion_energy > 0.7: pts = self.weights["emotion_energy"] * emotion_energy evidence += pts details.append(f"High Energy (+{pts:.1f})") # --- ML baseline --- ml_evidence = self._logit(ml_prob) * self.weights["ml_model_factor"] evidence += ml_evidence details.append(f"ML Baseline ({ml_evidence:+.1f})") # --- Final --- final_score = self._sigmoid(evidence) confidence = float(math.tanh(abs(evidence) / 2.0)) return final_score, confidence, details def _get_recommendation(self, score: float) -> str: if score < self.thresholds["continue"]: return "CONTINUE" elif score < self.thresholds["check_in"]: return "CHECK_IN" else: return "EXIT"