| """
|
| Audio Feature Extraction — Hugging Face Inference Endpoint Handler
|
|
|
| Extracts all 17 voice features from uploaded audio:
|
| v1_snr, v2_noise_* (5), v3_speech_rate, v4/v5_pitch, v6/v7_energy,
|
| v8/v9/v10_pause, v11/v12/v13_emotion
|
|
|
| Derived from: src/audio_features.py, src/emotion_features.py
|
| """
|
|
|
| import io
|
| import numpy as np
|
| import librosa
|
| from scipy import signal as scipy_signal
|
| from typing import Dict
|
| import torch
|
| import torch.nn as nn
|
| from torchvision import models
|
| import warnings
|
|
|
# Suppress every warning category process-wide (keeps endpoint logs quiet).
# NOTE(review): this blanket filter also hides numpy RuntimeWarnings such as
# "Mean of empty slice", which can mask NaN feature values — consider
# narrowing it to specific categories.
warnings.simplefilter("ignore")
|
|
|
|
|
|
|
|
|
|
|
|
|
class EmotionCNN:
    """Lightweight CNN for emotion embedding from spectrograms (MobileNetV3).

    The pretrained MobileNetV3-Small network has its classifier replaced by an
    identity layer, so a forward pass returns the features that would have
    been fed to the classifier instead of class logits.
    """

    def __init__(self):
        """Load the pretrained backbone, strip its classifier and pick a device."""
        # torchvision >= 0.13 deprecates ``pretrained=True`` in favour of the
        # ``weights`` enum; IMAGENET1K_V1 is the checkpoint the legacy flag
        # maps to.  Older torchvision releases only know the legacy keyword.
        weights_enum = getattr(models, "MobileNet_V3_Small_Weights", None)
        if weights_enum is not None:
            self.model = models.mobilenet_v3_small(weights=weights_enum.IMAGENET1K_V1)
        else:
            self.model = models.mobilenet_v3_small(pretrained=True)

        self.model.classifier = nn.Identity()
        self.model.eval()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        if self.device == "cuda":
            self.model = self.model.cuda()

    @staticmethod
    def _jet_colormap():
        """Return matplotlib's "jet" colormap on both new and old matplotlib.

        ``matplotlib.cm.get_cmap`` was removed in matplotlib 3.9; the
        ``matplotlib.colormaps`` registry (3.5+) is the supported lookup.
        """
        import matplotlib

        registry = getattr(matplotlib, "colormaps", None)
        if registry is not None:
            return registry["jet"]

        from matplotlib import cm

        return cm.get_cmap("jet")

    def audio_to_spectrogram(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
        """Render audio as a jet-coloured mel spectrogram image.

        Args:
            audio: Mono waveform samples.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            float32 array of shape (3, 224, 224): RGB channels first.
        """
        # Imported lazily so a missing scikit-image only disables the emotion
        # features instead of breaking module import.
        from skimage.transform import resize

        mel_spec = librosa.feature.melspectrogram(
            y=audio, sr=sr, n_fft=512, hop_length=64, n_mels=128, fmin=0, fmax=sr / 2
        )
        # dB relative to the loudest bin, limited to an 80 dB range ...
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        mel_spec_db = np.clip(mel_spec_db, -80, 0)
        # ... then mapped linearly onto [0, 1] for the colormap.
        mel_spec_norm = (mel_spec_db + 80) / 80

        mel_resized = resize(mel_spec_norm, (224, 224), mode="constant")

        # The colormap yields RGBA; keep RGB only and move channels first.
        rgb = self._jet_colormap()(mel_resized)[:, :, :3]
        return np.transpose(rgb, (2, 0, 1)).astype(np.float32)

    def extract_embedding(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
        """Run the spectrogram image through the network.

        Args:
            audio: Mono waveform samples.
            sr: Sample rate of ``audio`` in Hz.

        Returns:
            Flattened 1-D numpy array holding the model output for a batch of
            one.  NOTE(review): for mobilenet_v3_small with the classifier
            removed this is expected to be 576 values — confirm against the
            installed torchvision before indexing beyond that.
        """
        spec_rgb = self.audio_to_spectrogram(audio, sr)
        tensor = torch.from_numpy(spec_rgb).unsqueeze(0)  # add batch dimension
        if self.device == "cuda":
            tensor = tensor.cuda()
        with torch.no_grad():
            emb = self.model(tensor)
        return emb.cpu().numpy().flatten()
|
|
|
|
|
|
|
|
|
|
|
|
|
class AudioFeatureExtractorEndpoint:
    """Stateless audio feature extraction for HF endpoint.

    Bundles the signal-level heuristics (SNR, noise type, pitch, energy), the
    VAD-based pause statistics and the CNN-based emotion scores, and exposes
    them together through :meth:`extract_all`.  All methods expect mono audio
    sampled at ``self.sr`` Hz.
    """

    def __init__(self):
        """Set the working sample rate and load the emotion CNN and Silero VAD."""
        self.sr = 16000
        self.emotion_cnn = EmotionCNN()

        # Defined up front so both attributes exist even if loading fails.
        self.vad_model = None
        self.get_speech_timestamps = None
        try:
            self.vad_model, self.vad_utils = torch.hub.load(
                repo_or_dir="snakers4/silero-vad", model="silero_vad", trust_repo=True
            )
            self.get_speech_timestamps = self.vad_utils[0]
            print("β Silero VAD loaded")
        except Exception as e:
            # Best effort: without VAD the pause features fall back to defaults.
            print(f"β Silero VAD failed: {e}")
            self.vad_model = None
            self.get_speech_timestamps = None

    @staticmethod
    def _safe_mean(values: np.ndarray) -> float:
        """Return the mean of ``values``, or 0.0 if it is empty or not finite.

        ``np.mean`` of an empty slice is NaN, and NaN cannot be serialised by
        a standards-compliant JSON response.
        """
        if values.size == 0:
            return 0.0
        mean = float(np.mean(values))
        return mean if np.isfinite(mean) else 0.0

    def extract_snr(self, audio: np.ndarray) -> float:
        """Estimate the signal-to-noise ratio in dB, clipped to [-10, 40].

        The noise floor is the mean energy of the quietest fifth of the
        half-overlapping frames; the signal power is the mean over all frames.

        Returns:
            0.0 when the audio is too short to yield two frames, 40.0 when the
            noise floor is zero, otherwise the clipped estimate.
        """
        # With fewer than two samples the hop (frame_length // 2) would be 0,
        # which librosa rejects.
        if len(audio) < 2:
            return 0.0
        frame_length = min(2048, len(audio))
        frames = librosa.util.frame(audio, frame_length=frame_length, hop_length=frame_length // 2)
        frame_energy = np.sum(frames ** 2, axis=0)
        if len(frame_energy) < 2:
            return 0.0

        sorted_energy = np.sort(frame_energy)
        n_noise = max(1, len(sorted_energy) // 5)
        noise_floor = np.mean(sorted_energy[:n_noise])
        signal_power = np.mean(sorted_energy)
        if noise_floor <= 0:
            return 40.0
        snr = 10 * np.log10(signal_power / noise_floor + 1e-10)
        return float(np.clip(snr, -10, 40))

    def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
        """Score the audio against four heuristic noise profiles.

        Scores are derived from the share of spectral magnitude in a low
        (50-500 Hz), mid (500-2000 Hz) and high (2000-6000 Hz) band, gated by
        spectral centroid and flatness.  ``v2_noise_clean`` is one minus the
        largest of the other four scores.

        Returns:
            Dict with the five ``v2_noise_*`` keys, each in [0, 1].  Audio
            shorter than one 2048-sample FFT window is reported as fully clean.
        """
        if len(audio) < 2048:
            return {
                "v2_noise_traffic": 0.0,
                "v2_noise_office": 0.0,
                "v2_noise_crowd": 0.0,
                "v2_noise_wind": 0.0,
                "v2_noise_clean": 1.0,
            }

        spec = np.abs(librosa.stft(audio, n_fft=2048))
        freq_bins = librosa.fft_frequencies(sr=self.sr, n_fft=2048)

        low = np.mean(spec[(freq_bins >= 50) & (freq_bins <= 500)])
        mid = np.mean(spec[(freq_bins >= 500) & (freq_bins <= 2000)])
        high = np.mean(spec[(freq_bins >= 2000) & (freq_bins <= 6000)])
        total = low + mid + high + 1e-10  # epsilon avoids division by zero

        low_r, mid_r, high_r = low / total, mid / total, high / total
        spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sr)))
        spectral_flatness = float(np.mean(librosa.feature.spectral_flatness(y=audio)))

        noise = {
            "v2_noise_traffic": float(np.clip(low_r * 2 - 0.3, 0, 1)),
            "v2_noise_office": float(np.clip(mid_r * 1.5 - 0.2, 0, 1) if spectral_flatness > 0.01 else 0),
            "v2_noise_crowd": float(np.clip(mid_r * 2 - 0.5, 0, 1) if spectral_centroid > 1500 else 0),
            "v2_noise_wind": float(np.clip(low_r * 3 - 0.8, 0, 1) if spectral_flatness > 0.1 else 0),
        }
        noise["v2_noise_clean"] = float(np.clip(1 - max(noise.values()), 0, 1))
        return noise

    def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
        """Return words per second: whitespace-separated words / audio duration.

        Returns 0.0 when the transcript is empty or the audio has no samples.
        """
        if not transcript:
            return 0.0
        word_count = len(transcript.split())
        duration = len(audio) / self.sr
        if duration == 0:
            return 0.0
        return float(word_count / duration)

    def extract_pitch_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Return mean and standard deviation of the tracked pitch values.

        Only pitch candidates whose magnitude exceeds the median magnitude and
        whose frequency is positive are used.  Any failure, or no surviving
        candidate, yields zeros for both features.
        """
        try:
            pitches, magnitudes = librosa.piptrack(y=audio, sr=self.sr)
            pitch_values = pitches[magnitudes > np.median(magnitudes)]
            pitch_values = pitch_values[pitch_values > 0]
            if len(pitch_values) == 0:
                return {"v4_pitch_mean": 0.0, "v5_pitch_std": 0.0}
            return {
                "v4_pitch_mean": float(np.mean(pitch_values)),
                "v5_pitch_std": float(np.std(pitch_values)),
            }
        except Exception:
            # Deliberate best effort: pitch is optional, never fail the request.
            return {"v4_pitch_mean": 0.0, "v5_pitch_std": 0.0}

    def extract_energy_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Return mean and standard deviation of the frame-wise RMS energy.

        Empty audio yields zeros instead of reaching librosa with no samples.
        """
        if len(audio) == 0:
            return {"v6_energy_mean": 0.0, "v7_energy_std": 0.0}
        rms = librosa.feature.rms(y=audio)[0]
        return {"v6_energy_mean": float(np.mean(rms)), "v7_energy_std": float(np.std(rms))}

    def extract_pause_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Derive pause statistics from the VAD speech segments.

        Returns:
            ``v8_pause_ratio``: fraction of samples outside speech segments.
            ``v9_avg_pause_dur``: mean length in seconds of the gaps between
            consecutive speech segments that are longer than 0.1 s.
            ``v10_mid_pause_cnt``: number of those gaps between 0.3 s and 2 s.
            Defaults (all zero) are returned when VAD is unavailable, the
            audio is shorter than one second, or VAD raises.
        """
        defaults = {"v8_pause_ratio": 0.0, "v9_avg_pause_dur": 0.0, "v10_mid_pause_cnt": 0}
        if self.vad_model is None or len(audio) < self.sr:
            return defaults
        try:
            audio_tensor = torch.as_tensor(audio, dtype=torch.float32)
            timestamps = self.get_speech_timestamps(audio_tensor, self.vad_model, sampling_rate=self.sr)
            if not timestamps:
                # No speech at all: the whole clip counts as a single pause.
                return {"v8_pause_ratio": 1.0, "v9_avg_pause_dur": len(audio) / self.sr, "v10_mid_pause_cnt": 0}

            total_speech = sum(t["end"] - t["start"] for t in timestamps)
            pause_ratio = 1.0 - (total_speech / len(audio))

            # Gaps between consecutive segments, in seconds; very short gaps
            # (<= 0.1 s) are not counted as pauses.
            gaps = (
                (following["start"] - previous["end"]) / self.sr
                for previous, following in zip(timestamps, timestamps[1:])
            )
            pauses = [gap for gap in gaps if gap > 0.1]

            return {
                "v8_pause_ratio": float(np.clip(pause_ratio, 0, 1)),
                "v9_avg_pause_dur": float(np.mean(pauses)) if pauses else 0.0,
                "v10_mid_pause_cnt": sum(1 for p in pauses if 0.3 < p < 2.0),
            }
        except Exception:
            # Deliberate best effort: a VAD failure must not fail the request.
            return defaults

    def extract_emotion_features(self, audio: np.ndarray) -> Dict[str, float]:
        """Derive three heuristic emotion scores from the CNN embedding.

        ``v11_emotion_stress`` is the mean absolute value at five fixed
        embedding positions, clipped to [0, 1]; ``v12_emotion_energy`` is the
        mean absolute value of positions 500-599; ``v13_emotion_valence`` is
        the mean of positions 700-799.

        A window that lies (partly) beyond the end of the embedding is
        averaged over whatever part exists; a completely empty window or a
        non-finite mean yields 0.0 rather than NaN, which would break JSON
        serialisation of the response.  Any failure yields zeros for all
        three features.
        """
        neutral = {"v11_emotion_stress": 0.0, "v12_emotion_energy": 0.0, "v13_emotion_valence": 0.0}
        try:
            embedding = self.emotion_cnn.extract_embedding(audio, self.sr)
            stress_indices = [0, 100, 200, 300, 400]
            stress_values = embedding[stress_indices]
            stress_score = float(np.clip(self._safe_mean(np.abs(stress_values)), 0, 1))
            return {
                "v11_emotion_stress": stress_score,
                "v12_emotion_energy": self._safe_mean(np.abs(embedding[500:600])),
                "v13_emotion_valence": self._safe_mean(embedding[700:800]),
            }
        except Exception:
            # Deliberate best effort: emotion scores are optional.
            return neutral

    def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
        """Run every extractor and merge the results into one flat dict.

        Args:
            audio: Mono waveform sampled at ``self.sr`` Hz.
            transcript: Optional transcript used for the speech-rate feature.

        Returns:
            Dict keyed ``v1_snr`` through ``v13_emotion_valence`` (17 entries,
            as ``v2`` contributes five noise scores).
        """
        features = {}
        features["v1_snr"] = self.extract_snr(audio)
        features.update(self.classify_noise_type(audio))
        features["v3_speech_rate"] = self.extract_speech_rate(audio, transcript)
        features.update(self.extract_pitch_features(audio))
        features.update(self.extract_energy_features(audio))
        features.update(self.extract_pause_features(audio))
        features.update(self.extract_emotion_features(audio))
        return features
|
|
|
|
|
|
|
|
|
|
|
|
|
| from fastapi import FastAPI, File, UploadFile, Form
|
| from fastapi.middleware.cors import CORSMiddleware
|
| import base64
|
|
|
app = FastAPI(title="Audio Feature Extraction API", version="1.0.0")

# CORS: the API is open to any origin.  A wildcard origin must not be combined
# with credentialed requests (the CORS spec forbids it and the FastAPI docs
# require explicit origins when allow_credentials=True), so credentials are
# disabled.  List explicit origins here if cookies/auth headers are ever needed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single module-level extractor shared by all requests; constructing it loads
# the emotion CNN and attempts to load the Silero VAD model at import time.
extractor = AudioFeatureExtractorEndpoint()
|
|
|
|
|
@app.get("/health")
async def health():
    """Report service liveness and whether the VAD model was loaded."""
    vad_ready = extractor.vad_model is not None
    return {"status": "healthy", "vad_loaded": vad_ready}
|
|
|
|
|
@app.post("/extract-audio-features")
async def extract_audio_features(audio: UploadFile = File(...), transcript: str = Form("")):
    """Extract all 17 voice features from uploaded audio file.

    Args:
        audio: Uploaded audio file; decoded to mono at 16 kHz.
        transcript: Optional transcript used for the speech-rate feature.

    Returns:
        Flat dict of feature name to value.

    Raises:
        HTTPException: 400 when the upload is empty or cannot be decoded.
    """
    from fastapi import HTTPException

    audio_bytes = await audio.read()
    if not audio_bytes:
        raise HTTPException(status_code=400, detail="Uploaded audio file is empty.")

    # Decoding errors are the client's problem (bad/unsupported file), so they
    # are reported as 400 instead of surfacing as an unhandled 500.
    try:
        samples, _ = librosa.load(io.BytesIO(audio_bytes), sr=16000, mono=True)
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Could not decode audio: {exc}") from exc
    if samples.size == 0:
        raise HTTPException(status_code=400, detail="Audio contains no samples.")

    return extractor.extract_all(samples, transcript)
|
|
|
|
|
@app.post("/extract-audio-features-base64")
async def extract_audio_features_base64(data: dict):
    """Extract features from base64-encoded audio (for Vercel serverless calls).

    Args:
        data: JSON body with ``audio_base64`` (required, base64 of an audio
            file readable by soundfile) and ``transcript`` (optional).

    Returns:
        Flat dict of feature name to value.

    Raises:
        HTTPException: 400 when ``audio_base64`` is missing, is not valid
            base64, or does not decode to readable, non-empty audio.
    """
    import binascii

    import soundfile as sf
    from fastapi import HTTPException

    # ``or ""`` also covers an explicit JSON null for either field.
    audio_b64 = data.get("audio_base64") or ""
    transcript = data.get("transcript") or ""
    if not audio_b64:
        raise HTTPException(status_code=400, detail="Field 'audio_base64' is required.")

    try:
        audio_bytes = base64.b64decode(audio_b64)
    except (binascii.Error, ValueError, TypeError) as exc:
        raise HTTPException(status_code=400, detail=f"Invalid base64 audio: {exc}") from exc

    # Unreadable audio is a client error, not a server fault.
    try:
        y, sr = sf.read(io.BytesIO(audio_bytes))
    except Exception as exc:
        raise HTTPException(status_code=400, detail=f"Could not decode audio: {exc}") from exc

    if len(y.shape) > 1:
        y = np.mean(y, axis=1)  # down-mix multi-channel audio to mono
    if y.size == 0:
        raise HTTPException(status_code=400, detail="Audio contains no samples.")
    if sr != 16000:
        y = librosa.resample(y, orig_sr=sr, target_sr=16000)
    y = y.astype(np.float32)

    return extractor.extract_all(y, transcript)
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 7860.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)
|
|
|