"""
Audio Feature Extraction - Hugging Face Inference Endpoint Handler
Extracts all 17 voice features from uploaded audio:
v1_snr, v2_noise_* (5), v3_speech_rate, v4/v5_pitch, v6/v7_energy,
v8/v9/v10_pause, v11/v12/v13_emotion
Derived from: src/audio_features.py, src/emotion_features.py
"""
import io
import numpy as np
import librosa
from typing import Dict
import torch
import torch.nn as nn
from torchvision import models
import warnings
warnings.filterwarnings("ignore")
# ──────────────────────────────────────────────────────────────────────── #
# Emotion CNN (mirrors src/emotion_features.py EmotionCNN)
# ──────────────────────────────────────────────────────────────────────── #
class EmotionCNN:
"""Lightweight CNN for emotion embedding from spectrograms (MobileNetV3)."""
    def __init__(self):
        # torchvision >= 0.13 selects pretrained weights via the weights API
        # (pretrained=True is deprecated).
        self.model = models.mobilenet_v3_small(weights=models.MobileNet_V3_Small_Weights.DEFAULT)
        self.model.classifier = nn.Identity()  # expose the 576-dim pooled feature vector
        self.model.eval()
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        if self.device == "cuda":
            self.model = self.model.cuda()
def audio_to_spectrogram(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
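        # Mel spectrogram -> dB -> [0, 1] normalization, resized to 224x224 and mapped
        # through the "jet" colormap to match MobileNetV3's expected RGB input size.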
mel_spec = librosa.feature.melspectrogram(
y=audio, sr=sr, n_fft=512, hop_length=64, n_mels=128, fmin=0, fmax=sr / 2
)
mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
mel_spec_db = np.clip(mel_spec_db, -80, 0)
mel_spec_norm = (mel_spec_db + 80) / 80
from skimage.transform import resize
mel_resized = resize(mel_spec_norm, (224, 224), mode="constant")
        from matplotlib import colormaps
        colormap = colormaps["jet"]  # cm.get_cmap() is deprecated in recent Matplotlib releases
rgb = colormap(mel_resized)[:, :, :3]
return np.transpose(rgb, (2, 0, 1)).astype(np.float32)
def extract_embedding(self, audio: np.ndarray, sr: int = 16000) -> np.ndarray:
spec_rgb = self.audio_to_spectrogram(audio, sr)
tensor = torch.from_numpy(spec_rgb).unsqueeze(0)
if self.device == "cuda":
tensor = tensor.cuda()
with torch.no_grad():
emb = self.model(tensor)
return emb.cpu().numpy().flatten()
# ──────────────────────────────────────────────────────────────────────── #
# Audio Feature Extractor (mirrors src/audio_features.py)
# ──────────────────────────────────────────────────────────────────────── #
class AudioFeatureExtractorEndpoint:
"""Stateless audio feature extraction for HF endpoint."""
def __init__(self):
self.sr = 16000
self.emotion_cnn = EmotionCNN()
# Load Silero VAD
try:
self.vad_model, self.vad_utils = torch.hub.load(
repo_or_dir="snakers4/silero-vad", model="silero_vad", trust_repo=True
)
self.get_speech_timestamps = self.vad_utils[0]
print("βœ“ Silero VAD loaded")
except Exception as e:
print(f"⚠ Silero VAD failed: {e}")
self.vad_model = None
# -------- V1: SNR --------
def extract_snr(self, audio: np.ndarray) -> float:
if len(audio) == 0:
return 0.0
frame_length = min(2048, len(audio))
frames = librosa.util.frame(audio, frame_length=frame_length, hop_length=frame_length // 2)
frame_energy = np.sum(frames ** 2, axis=0)
if len(frame_energy) < 2:
return 0.0
sorted_energy = np.sort(frame_energy)
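        # Heuristic SNR: the quietest 20% of frames approximate the noise floor,
        # while the mean energy over all frames approximates the signal level.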
n_noise = max(1, len(sorted_energy) // 5)
noise_floor = np.mean(sorted_energy[:n_noise])
signal_power = np.mean(sorted_energy)
if noise_floor <= 0:
return 40.0
snr = 10 * np.log10(signal_power / noise_floor + 1e-10)
return float(np.clip(snr, -10, 40))
# -------- V2: Noise classification --------
def classify_noise_type(self, audio: np.ndarray) -> Dict[str, float]:
if len(audio) < 2048:
return {
"v2_noise_traffic": 0.0, "v2_noise_office": 0.0,
"v2_noise_crowd": 0.0, "v2_noise_wind": 0.0, "v2_noise_clean": 1.0,
}
spec = np.abs(librosa.stft(audio, n_fft=2048))
freq_bins = librosa.fft_frequencies(sr=self.sr, n_fft=2048)
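        # Band energies (low 50-500 Hz, mid 500-2000 Hz, high 2000-6000 Hz) plus
        # spectral centroid/flatness drive simple hand-tuned scores per noise class.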
low = np.mean(spec[(freq_bins >= 50) & (freq_bins <= 500)])
mid = np.mean(spec[(freq_bins >= 500) & (freq_bins <= 2000)])
high = np.mean(spec[(freq_bins >= 2000) & (freq_bins <= 6000)])
total = low + mid + high + 1e-10
low_r, mid_r, high_r = low / total, mid / total, high / total
spectral_centroid = float(np.mean(librosa.feature.spectral_centroid(y=audio, sr=self.sr)))
spectral_flatness = float(np.mean(librosa.feature.spectral_flatness(y=audio)))
noise = {
"v2_noise_traffic": float(np.clip(low_r * 2 - 0.3, 0, 1)),
"v2_noise_office": float(np.clip(mid_r * 1.5 - 0.2, 0, 1) if spectral_flatness > 0.01 else 0),
"v2_noise_crowd": float(np.clip(mid_r * 2 - 0.5, 0, 1) if spectral_centroid > 1500 else 0),
"v2_noise_wind": float(np.clip(low_r * 3 - 0.8, 0, 1) if spectral_flatness > 0.1 else 0),
}
noise["v2_noise_clean"] = float(np.clip(1 - max(noise.values()), 0, 1))
return noise
# -------- V3: Speech rate --------
def extract_speech_rate(self, audio: np.ndarray, transcript: str) -> float:
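        # Speech rate is measured as words per second over the full clip duration.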
if not transcript:
return 0.0
word_count = len(transcript.split())
duration = len(audio) / self.sr
if duration == 0:
return 0.0
return float(word_count / duration)
# -------- V4-V5: Pitch --------
def extract_pitch_features(self, audio: np.ndarray) -> Dict[str, float]:
try:
pitches, magnitudes = librosa.piptrack(y=audio, sr=self.sr)
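            # Keep pitch bins with above-median magnitude; zero values correspond to
            # unvoiced frames and are discarded.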
pitch_values = pitches[magnitudes > np.median(magnitudes)]
pitch_values = pitch_values[pitch_values > 0]
if len(pitch_values) == 0:
return {"v4_pitch_mean": 0.0, "v5_pitch_std": 0.0}
return {
"v4_pitch_mean": float(np.mean(pitch_values)),
"v5_pitch_std": float(np.std(pitch_values)),
}
except Exception:
return {"v4_pitch_mean": 0.0, "v5_pitch_std": 0.0}
# -------- V6-V7: Energy --------
def extract_energy_features(self, audio: np.ndarray) -> Dict[str, float]:
rms = librosa.feature.rms(y=audio)[0]
return {"v6_energy_mean": float(np.mean(rms)), "v7_energy_std": float(np.std(rms))}
# -------- V8-V10: Pause features (Silero VAD) --------
def extract_pause_features(self, audio: np.ndarray) -> Dict[str, float]:
defaults = {"v8_pause_ratio": 0.0, "v9_avg_pause_dur": 0.0, "v10_mid_pause_cnt": 0}
if self.vad_model is None or len(audio) < self.sr:
return defaults
try:
audio_tensor = torch.FloatTensor(audio)
timestamps = self.get_speech_timestamps(audio_tensor, self.vad_model, sampling_rate=self.sr)
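            # Silero VAD returns speech segments as sample-index dicts ("start"/"end").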
if not timestamps:
return {"v8_pause_ratio": 1.0, "v9_avg_pause_dur": len(audio) / self.sr, "v10_mid_pause_cnt": 0}
total_speech = sum(t["end"] - t["start"] for t in timestamps)
total_samples = len(audio)
pause_ratio = 1.0 - (total_speech / total_samples)
pauses = []
for i in range(1, len(timestamps)):
gap = (timestamps[i]["start"] - timestamps[i - 1]["end"]) / self.sr
if gap > 0.1:
pauses.append(gap)
return {
"v8_pause_ratio": float(np.clip(pause_ratio, 0, 1)),
"v9_avg_pause_dur": float(np.mean(pauses)) if pauses else 0.0,
"v10_mid_pause_cnt": len([p for p in pauses if 0.3 < p < 2.0]),
}
except Exception:
return defaults
# -------- V11-V13: Emotion features --------
    def extract_emotion_features(self, audio: np.ndarray) -> Dict[str, float]:
        try:
            embedding = self.emotion_cnn.extract_embedding(audio, self.sr)
            # MobileNetV3-Small with an Identity classifier yields a 576-dim embedding,
            # so slices past the end are empty; fall back to 0.0 rather than letting
            # np.mean of an empty slice produce NaN.
            stress_values = np.abs(embedding[[0, 100, 200, 300, 400]])
            energy_values = np.abs(embedding[500:600])
            valence_values = embedding[700:800]
            return {
                "v11_emotion_stress": float(np.clip(np.mean(stress_values), 0, 1)),
                "v12_emotion_energy": float(np.mean(energy_values)) if energy_values.size else 0.0,
                "v13_emotion_valence": float(np.mean(valence_values)) if valence_values.size else 0.0,
            }
        except Exception:
            return {"v11_emotion_stress": 0.0, "v12_emotion_energy": 0.0, "v13_emotion_valence": 0.0}
# -------- Main: extract all --------
def extract_all(self, audio: np.ndarray, transcript: str = "") -> Dict[str, float]:
features = {}
features["v1_snr"] = self.extract_snr(audio)
features.update(self.classify_noise_type(audio))
features["v3_speech_rate"] = self.extract_speech_rate(audio, transcript)
features.update(self.extract_pitch_features(audio))
features.update(self.extract_energy_features(audio))
features.update(self.extract_pause_features(audio))
features.update(self.extract_emotion_features(audio))
return features
# ──────────────────────────────────────────────────────────────────────── #
# FastAPI handler for deployment (HF Spaces / Cloud Run / Lambda)
# ──────────────────────────────────────────────────────────────────────── #
from fastapi import FastAPI, File, UploadFile, Form
from fastapi.middleware.cors import CORSMiddleware
import base64
app = FastAPI(title="Audio Feature Extraction API", version="1.0.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], allow_credentials=True,
allow_methods=["*"], allow_headers=["*"],
)
extractor = AudioFeatureExtractorEndpoint()
@app.get("/health")
async def health():
return {"status": "healthy", "vad_loaded": extractor.vad_model is not None}
@app.post("/extract-audio-features")
async def extract_audio_features(audio: UploadFile = File(...), transcript: str = Form("")):
"""Extract all 17 voice features from uploaded audio file."""
audio_bytes = await audio.read()
y, sr = librosa.load(io.BytesIO(audio_bytes), sr=16000, mono=True)
features = extractor.extract_all(y, transcript)
return features
@app.post("/extract-audio-features-base64")
async def extract_audio_features_base64(data: dict):
"""Extract features from base64-encoded audio (for Vercel serverless calls)."""
import soundfile as sf
audio_b64 = data.get("audio_base64", "")
transcript = data.get("transcript", "")
audio_bytes = base64.b64decode(audio_b64)
y, sr = sf.read(io.BytesIO(audio_bytes))
if len(y.shape) > 1:
y = np.mean(y, axis=1)
if sr != 16000:
y = librosa.resample(y, orig_sr=sr, target_sr=16000)
y = y.astype(np.float32)
features = extractor.extract_all(y, transcript)
return features
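# ──────────────────────────────────────────────────────────────────────── #
# Example client (illustrative sketch only, never called by the service).
# The URL, file name, and transcript below are placeholders; point them at
# wherever this handler is deployed.
# ──────────────────────────────────────────────────────────────────────── #
def _example_client(url: str = "http://localhost:7860") -> dict:
    """Send a local WAV file to the base64 endpoint and return the feature dict."""
    import requests  # assumed to be available in the calling environment
    with open("sample.wav", "rb") as f:  # placeholder file name
        payload = {
            "audio_base64": base64.b64encode(f.read()).decode("ascii"),
            "transcript": "example transcript of the recording",
        }
    resp = requests.post(f"{url}/extract-audio-features-base64", json=payload, timeout=120)
    resp.raise_for_status()
    return resp.json()  # {"v1_snr": ..., "v2_noise_traffic": ..., ..., "v13_emotion_valence": ...}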
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)