""" Audio Feature Extraction — Hugging Face Inference Endpoint Handler Extracts all 17 voice features from uploaded audio: v1_snr, v2_noise_* (5), v3_speech_rate, v4/v5_pitch, v6/v7_energy, v8/v9/v10_pause, v11/v12/v13_emotion Derived from: src/audio_features.py, src/emotion_features.py """ import io import numpy as np import librosa from scipy import signal as scipy_signal from typing import Dict import torch import torch.nn as nn from torchvision import models import warnings warnings.filterwarnings("ignore") # ──────────────────────────────────────────────────────────────────────── # # Emotion CNN (mirrors src/emotion_features.py EmotionCNN) # ──────────────────────────────────────────────────────────────────────── # class EmotionCNN: """Lightweight CNN for emotion embedding from spectrograms (MobileNetV3).""" def __init__(self): self.model = models.mobilenet_v3_small(pretrained=True) self.model.classifier = nn.Identity() self.model.eval() self.device = "cuda" if torch.cuda.is_available() else "cpu" if self.device == "cuda": self.model = self.model.cuda() def audio_to_spectrogram(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray: mel_spec = librosa.feature.melspectrogram( y=audio, sr=sr, n_fft=512, hop_length=64, n_mels=128, fmin=0, fmax=sr / 2 ) mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max) mel_spec_db = np.clip(mel_spec_db, -80, 0) mel_spec_norm = (mel_spec_db + 80) / 80 from skimage.transform import resize mel_resized = resize(mel_spec_norm, (224, 224), mode="constant") from matplotlib import cm colormap = cm.get_cmap("jet") rgb = colormap(mel_resized)[:, :, :3] return np.transpose(rgb, (2, 0, 1)).astype(np.float32) def extract_embedding(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray: spec_rgb = self.audio_to_spectrogram(audio, sr) tensor = torch.from_numpy(spec_rgb).unsqueeze(0) if self.device == "cuda": tensor = tensor.cuda() with torch.no_grad(): emb = self.model(tensor) return emb.cpu().numpy().flatten() # 
# ──────────────────────────────────────────────────────────────────────── #
# Audio Feature Extractor (mirrors src/audio_features.py)                  #
# ──────────────────────────────────────────────────────────────────────── #
class AudioFeatureExtractorEndpoint:
    """Stateless audio feature extraction for HF endpoint.

    Computes the 17 voice features (v1..v13 groups listed in the module
    docstring) from a mono 16 kHz float waveform.  Heavy resources
    (Silero VAD, the emotion CNN) are loaded once at construction time;
    each extractor method is pure with respect to instance state.
    """

    def __init__(self):
        # All extractors assume 16 kHz mono input; callers resample first.
        self.sr = 16000
        self.emotion_cnn = EmotionCNN()
        # Load Silero VAD
        try:
            self.vad_model, self.vad_utils = torch.hub.load(
                repo_or_dir="snakers4/silero-vad", model="silero_vad", trust_repo=True
            )
            # Silero's utils tuple: element 0 is get_speech_timestamps.
            self.get_speech_timestamps = self.vad_utils[0]
            print("✓ Silero VAD loaded")
        except Exception as e:
            # Degrade gracefully: pause features then return their defaults.
            print(f"⚠ Silero VAD failed: {e}")
            self.vad_model = None

    # -------- V1: SNR --------
    def extract_snr(self, audio: np.ndarray) -> float:
        """Estimate signal-to-noise ratio in dB, clipped to [-10, 40].

        The quietest 20% of frame energies approximate the noise floor;
        SNR is mean frame energy over that floor.
        """
        if len(audio) == 0:
            return 0.0
        frame_length = min(2048, len(audio))
        # NOTE(review): for len(audio) == 1 the hop is 0 and librosa would
        # reject it — assumed not to occur for real uploads; confirm upstream.
        frames = librosa.util.frame(audio, frame_length=frame_length, hop_length=frame_length // 2)
        frame_energy = np.sum(frames ** 2, axis=0)
        if len(frame_energy) < 2:
            return 0.0
        sorted_energy = np.sort(frame_energy)
        n_noise = max(1, len(sorted_energy) // 5)  # quietest 20% of frames
        noise_floor = np.mean(sorted_energy[:n_noise])
        signal_power = np.mean(sorted_energy)
        if noise_floor <= 0:
            # Digital silence in the floor frames: treat as maximally clean.
            return 40.0
        snr = 10 * np.log10(signal_power / noise_floor + 1e-10)
        return float(np.clip(snr, -10, 40))

    # -------- V2: Noise classification --------
    def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
        """Heuristic soft scores (0..1) for five background-noise categories.

        Band-energy ratios (low/mid/high) are mapped through hand-tuned
        linear functions, gated by spectral centroid/flatness; coefficients
        mirror src/audio_features.py.
        """
        if len(audio) < 2048:
            # Too short for a 2048-point STFT; report "clean" by default.
            return {
                "v2_noise_traffic": 0.0,
                "v2_noise_office": 0.0,
                "v2_noise_crowd": 0.0,
                "v2_noise_wind": 0.0,
                "v2_noise_clean": 1.0,
            }
        spec = np.abs(librosa.stft(audio, n_fft=2048))
        freq_bins = librosa.fft_frequencies(sr=self.sr, n_fft=2048)
        # Mean magnitude in three bands: low (traffic/wind), mid, high.
        low = np.mean(spec[(freq_bins >= 50) & (freq_bins <= 500)])
        mid = np.mean(spec[(freq_bins >= 500) & (freq_bins <= 2000)])
        high = np.mean(spec[(freq_bins >= 2000) & (freq_bins <= 6000)])
        total = low + mid + high + 1e-10  # epsilon avoids divide-by-zero
        low_r, mid_r, high_r = low / total, mid / total, high / total  # high_r currently unused
        spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sr)))
        spectral_flatness = float(np.mean(librosa.feature.spectral_flatness(y=audio)))
        noise = {
            "v2_noise_traffic": float(np.clip(low_r * 2 - 0.3, 0, 1)),
            "v2_noise_office": float(np.clip(mid_r * 1.5 - 0.2, 0, 1) if spectral_flatness > 0.01 else 0),
            "v2_noise_crowd": float(np.clip(mid_r * 2 - 0.5, 0, 1) if spectral_centroid > 1500 else 0),
            "v2_noise_wind": float(np.clip(low_r * 3 - 0.8, 0, 1) if spectral_flatness > 0.1 else 0),
        }
        # "Clean" is the complement of the strongest detected noise type.
        noise["v2_noise_clean"] = float(np.clip(1 - max(noise.values()), 0, 1))
        return noise

    # -------- V3: Speech rate --------
    def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
        """Words per second, from whitespace-split transcript over duration."""
        if not transcript:
            return 0.0
        word_count = len(transcript.split())
        duration = len(audio) / self.sr
        if duration == 0:
            return 0.0
        return float(word_count / duration)

    # -------- V4-V5: Pitch --------
    def extract_pitch_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Mean and std of pitch estimates (Hz) from librosa.piptrack."""
        try:
            pitches, magnitudes = librosa.piptrack(y=audio, sr=self.sr)
            # Keep only confident (above-median magnitude), nonzero pitches.
            pitch_values = pitches[magnitudes > np.median(magnitudes)]
            pitch_values = pitch_values[pitch_values > 0]
            if len(pitch_values) == 0:
                return {"v4_pitch_mean": 0.0, "v5_pitch_std": 0.0}
            return {
                "v4_pitch_mean": float(np.mean(pitch_values)),
                "v5_pitch_std": float(np.std(pitch_values)),
            }
        except Exception:
            # Pitch tracking is best-effort; zeros signal "unavailable".
            return {"v4_pitch_mean": 0.0, "v5_pitch_std": 0.0}

    # -------- V6-V7: Energy --------
    def extract_energy_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Mean and std of frame-level RMS energy."""
        rms = librosa.feature.rms(y=audio)[0]
        return {"v6_energy_mean": float(np.mean(rms)), "v7_energy_std": float(np.std(rms))}

    # -------- V8-V10: Pause features (Silero VAD) --------
    def extract_pause_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Pause ratio, mean pause duration (s) and mid-length pause count.

        Uses Silero VAD speech timestamps (in samples).  Returns zeros when
        VAD is unavailable or the clip is shorter than one second.
        """
        defaults = {"v8_pause_ratio": 0.0, "v9_avg_pause_dur": 0.0, "v10_mid_pause_cnt": 0}
        if self.vad_model is None or len(audio) < self.sr:
            return defaults
        try:
            audio_tensor = torch.FloatTensor(audio)
            timestamps = self.get_speech_timestamps(audio_tensor, self.vad_model, sampling_rate=self.sr)
            if not timestamps:
                # No speech at all: the entire clip counts as one pause.
                return {"v8_pause_ratio": 1.0, "v9_avg_pause_dur": len(audio) / self.sr, "v10_mid_pause_cnt": 0}
            # Timestamps are sample indices, so speech/total is unitless.
            total_speech = sum(t["end"] - t["start"] for t in timestamps)
            total_samples = len(audio)
            pause_ratio = 1.0 - (total_speech / total_samples)
            pauses = []
            for i in range(1, len(timestamps)):
                # Gap between consecutive speech segments, in seconds.
                gap = (timestamps[i]["start"] - timestamps[i - 1]["end"]) / self.sr
                if gap > 0.1:  # ignore sub-100 ms micro-gaps
                    pauses.append(gap)
            return {
                "v8_pause_ratio": float(np.clip(pause_ratio, 0, 1)),
                "v9_avg_pause_dur": float(np.mean(pauses)) if pauses else 0.0,
                # "Mid" pauses: 0.3-2.0 s, i.e. hesitations rather than breaths
                # or topic breaks — presumably; thresholds are hand-tuned.
                "v10_mid_pause_cnt": len([p for p in pauses if 0.3 < p < 2.0]),
            }
        except Exception:
            return defaults

    # -------- V11-V13: Emotion features --------
    def extract_emotion_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Derive stress/energy/valence scores from fixed embedding slices.

        NOTE(review): the index choices (every 100th of the first 500 dims
        for stress, slices 500:600 and 700:800 for energy/valence) are
        heuristic projections of the CNN embedding, not learned mappings.
        """
        try:
            embedding = self.emotion_cnn.extract_embedding(audio, self.sr)
            stress_indices = [0, 100, 200, 300, 400]
            stress_values = embedding[stress_indices]
            stress_score = float(np.clip(np.mean(np.abs(stress_values)), 0, 1))
            return {
                "v11_emotion_stress": stress_score,
                "v12_emotion_energy": float(np.mean(np.abs(embedding[500:600]))),
                "v13_emotion_valence": float(np.mean(embedding[700:800])),
            }
        except Exception:
            return {"v11_emotion_stress": 0.0, "v12_emotion_energy": 0.0, "v13_emotion_valence": 0.0}

    # -------- Main: extract all --------
    def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """Run every extractor and merge the 17 features into one dict."""
        features = {}
        features["v1_snr"] = self.extract_snr(audio)
        features.update(self.classify_noise_type(audio))
        features["v3_speech_rate"] = self.extract_speech_rate(audio, transcript)
        features.update(self.extract_pitch_features(audio))
        features.update(self.extract_energy_features(audio))
        features.update(self.extract_pause_features(audio))
        features.update(self.extract_emotion_features(audio))
        return features


# ──────────────────────────────────────────────────────────────────────── #
# FastAPI handler for deployment (HF Spaces / Cloud Run / Lambda)          #
# ──────────────────────────────────────────────────────────────────────── #
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
import base64

app = FastAPI(title="Audio Feature Extraction API", version="1.0.0")
# Fix: the CORS spec forbids combining the wildcard origin with
# credentials — browsers ignore "Access-Control-Allow-Origin: *" when
# Allow-Credentials is true, so allow_credentials=True silently broke
# wildcard CORS.  No endpoint here uses cookies/auth headers, so
# credential support is disabled to keep "*" effective.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single shared extractor: models load once at startup, and each request
# only reads instance state.
extractor = AudioFeatureExtractorEndpoint()


@app.get("/health")
async def health():
    """Liveness probe; also reports whether Silero VAD loaded."""
    return {"status": "healthy", "vad_loaded": extractor.vad_model is not None}


@app.post("/extract-audio-features")
async def extract_audio_features(audio: UploadFile = File(...), transcript: str = Form("")):
    """Extract all 17 voice features from uploaded audio file."""
    audio_bytes = await audio.read()
    # librosa decodes, downmixes and resamples to 16 kHz mono in one call.
    y, sr = librosa.load(io.BytesIO(audio_bytes), sr=16000, mono=True)
    features = extractor.extract_all(y, transcript)
    return features


@app.post("/extract-audio-features-base64")
async def extract_audio_features_base64(data: dict):
    """Extract features from base64-encoded audio (for Vercel serverless calls).

    Expected JSON body keys: "audio_base64" (required), "transcript"
    (optional) — presumably produced by the upstream ASR step; verify
    against the caller.
    """
    import soundfile as sf

    audio_b64 = data.get("audio_base64", "")
    transcript = data.get("transcript", "")
    audio_bytes = base64.b64decode(audio_b64)
    y, sr = sf.read(io.BytesIO(audio_bytes))
    if len(y.shape) > 1:
        y = np.mean(y, axis=1)  # downmix multichannel to mono
    # Fix: convert to float32 *before* resampling so the resampler sees the
    # dtype the extractors expect (the original cast only after resampling).
    y = np.asarray(y, dtype=np.float32)
    if sr != 16000:
        y = librosa.resample(y, orig_sr=sr, target_sr=16000)
    features = extractor.extract_all(y, transcript)
    return features


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)