import logging
import os
from typing import Dict, Optional

import joblib
import librosa
import noisereduce as nr
import numpy as np
import parselmouth
from parselmouth.praat import call

logger = logging.getLogger(__name__)


class SpeakerClassifier:
    """Predict a speaker's gender and age group from a short audio clip.

    Two pre-trained models are loaded once at construction time. Every call
    to :meth:`predict` extracts one acoustic feature vector from the audio
    file and feeds the same vector to both models.
    """

    # Sample rate (Hz) the audio is loaded at and processed in.
    SAMPLE_RATE = 16000
    # Only the first MAX_DURATION seconds of a file are analysed.
    MAX_DURATION = 7

    def __init__(
        self,
        gender_model_path: str = "stacked_gender_model.joblib",
        age_model_path: str = "stacked_age_model.joblib",
    ):
        """Load both models from disk so they are only loaded once.

        Args:
            gender_model_path: Path to the persisted gender model.
            age_model_path: Path to the persisted age model.
        """
        # SECURITY: joblib.load unpickles arbitrary objects; only point these
        # paths at model files that come from a trusted source.
        self.gender_model = joblib.load(gender_model_path)
        self.age_model = joblib.load(age_model_path)

    def predict(self, audio_path: str) -> Dict[str, str]:
        """Predict gender and age group for an audio file.

        Args:
            audio_path: Path of the audio file to analyse.

        Returns:
            ``{"gender": "male" | "female", "age": "20s" | "50s"}`` on
            success, or ``{"error": "Feature extraction failed"}`` when no
            usable feature vector could be computed.
        """
        features = self._extract_features(audio_path)
        if features is None:
            return {"error": "Feature extraction failed"}

        # Both models receive a single-sample batch.
        gender_num = self.gender_model.predict([features])[0]
        age_num = self.age_model.predict([features])[0]

        # Class 0 maps to "male" / "20s"; any other class to "female" / "50s".
        gender = "male" if gender_num == 0 else "female"
        age = "20s" if age_num == 0 else "50s"
        return {"gender": gender, "age": age}

    # --- Audio preprocessing -------------------------------------------

    @staticmethod
    def _normalize_volume(audio, target_dBFS=-20):
        """Scale ``audio`` so that its RMS level equals ``target_dBFS``.

        Empty or completely silent input is returned unchanged, because its
        RMS of zero would otherwise produce an infinite gain and NaN samples.
        """
        if np.size(audio) == 0:
            return audio
        rms = np.sqrt(np.mean(audio ** 2))
        if rms == 0 or not np.isfinite(rms):
            return audio
        gain = 10 ** ((target_dBFS - 20 * np.log10(rms)) / 20)
        return audio * gain

    @staticmethod
    def _remove_silence(audio, top_db=20):
        """Return ``audio`` with the silent stretches cut out.

        Segments are located with ``librosa.effects.split`` using ``top_db``
        as the silence threshold. When no non-silent segment is found an
        empty array is returned instead of letting ``np.concatenate`` fail
        on an empty list.
        """
        intervals = librosa.effects.split(audio, top_db=top_db)
        if len(intervals) == 0:
            return audio[:0]
        return np.concatenate([audio[start:end] for start, end in intervals])

    @staticmethod
    def _equalize_audio(audio, sr, bass_boost=2, treble_boost=1.5):
        """Boost the low and high ends of the spectrum.

        STFT bins below 250 Hz are multiplied by ``bass_boost`` and bins
        above 4000 Hz by ``treble_boost``; the signal is then rebuilt with
        the inverse STFT.
        """
        S = librosa.stft(audio)
        # Default n_fft of fft_frequencies matches the default used by stft,
        # so there is exactly one frequency per row of S.
        freqs = librosa.fft_frequencies(sr=sr)
        S[freqs < 250] *= bass_boost
        S[freqs > 4000] *= treble_boost
        return librosa.istft(S)

    def _preprocess_audio(self, audio, sr, target_sr=16000):
        """Clean up raw audio before feature extraction.

        Steps: resample to ``target_sr`` if needed, remove silence, reduce
        noise, normalise volume, equalise.

        Args:
            audio: Mono waveform.
            sr: Sample rate of ``audio``.
            target_sr: Sample rate the processing is carried out in.

        Returns:
            The processed waveform, sampled at ``target_sr``.

        Raises:
            ValueError: If nothing is left after silence removal.
        """
        # Previously ``sr`` was ignored and ``target_sr`` was handed to the
        # processing steps even when the audio had a different rate.
        if sr != target_sr:
            audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
        audio = self._remove_silence(audio)
        if np.size(audio) == 0:
            raise ValueError("No non-silent audio found")
        audio = nr.reduce_noise(y=audio, sr=target_sr)
        audio = self._normalize_volume(audio)
        audio = self._equalize_audio(audio, target_sr)
        return audio

    # --- Feature extraction --------------------------------------------

    @staticmethod
    def _summary_stats(values):
        """Return ``[mean, std, median, IQR]`` of ``values`` (zeros if empty)."""
        if not values:
            return [0.0, 0.0, 0.0, 0.0]
        q25, q75 = np.percentile(values, [25, 75])
        return [np.mean(values), np.std(values), np.median(values), q75 - q25]

    def _extract_formants(self, y, sr):
        """Compute summary statistics of the first three formants.

        Formants are sampled every 10 ms; a frame is used only when F1, F2
        and F3 are all non-zero and not NaN.

        Returns:
            Array of 12 values (mean, std, median, IQR for F1, then F2, then
            F3), or ``None`` if the analysis fails.
        """
        try:
            sound = parselmouth.Sound(y, sampling_frequency=sr)
            formant = sound.to_formant_burg(time_step=0.01)
            tracks = ([], [], [])  # F1, F2, F3
            for t in np.arange(0, sound.duration, 0.01):
                try:
                    frame = [
                        formant.get_value_at_time(number, t)
                        for number in (1, 2, 3)
                    ]
                except Exception:
                    # Best effort: a frame that cannot be queried is skipped.
                    continue
                if all(v and not np.isnan(v) for v in frame):
                    for track, value in zip(tracks, frame):
                        track.append(value)

            features = []
            for track in tracks:
                features.extend(self._summary_stats(track))
            return np.array(features, dtype=float)
        except Exception:
            logger.exception("Formant extraction failed")
            return None

    def _calculate_jitter(self, y, sr):
        """Compute harmonicity, jitter and shimmer measures through Praat.

        Returns:
            Array of 11 values in this order: HNR mean, local jitter,
            HNR mean, local jitter, absolute local jitter, RAP, PPQ5, DDP,
            local shimmer, local shimmer in dB, APQ3 shimmer.
            ``None`` if the analysis fails.
        """
        try:
            sound = parselmouth.Sound(y, sampling_frequency=sr)
            point_process = call(
                sound, "To PointProcess (periodic, cc)", 75, 500
            )
            harmonicity = call(
                sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0
            )

            jitter_args = (0, 0, 0.0001, 0.02, 1.3)
            shimmer_args = jitter_args + (1.6,)
            shimmer_source = [sound, point_process]

            # These two values occupy two slots each in the output. They are
            # computed once; the duplicated layout is kept because the
            # feature order is presumably what the models were trained on.
            hnr_mean = call(harmonicity, "Get mean", 0, 0)
            jitter_local = call(
                point_process, "Get jitter (local)", *jitter_args
            )

            return np.array([
                hnr_mean,
                jitter_local,
                hnr_mean,
                jitter_local,
                call(point_process, "Get jitter (local, absolute)",
                     *jitter_args),
                call(point_process, "Get jitter (rap)", *jitter_args),
                call(point_process, "Get jitter (ppq5)", *jitter_args),
                call(point_process, "Get jitter (ddp)", *jitter_args),
                call(shimmer_source, "Get shimmer (local)", *shimmer_args),
                call(shimmer_source, "Get shimmer (local_dB)",
                     *shimmer_args),
                call(shimmer_source, "Get shimmer (apq3)", *shimmer_args),
            ])
        except Exception:
            logger.exception("Jitter/shimmer extraction failed")
            return None

    def _extract_features(self, audio_path: str) -> Optional[np.ndarray]:
        """Run the full feature extraction pipeline on one audio file.

        The resulting vector is the concatenation, in this order, of:
        spectral tilt, CPP, speaking rate, MFCC means and stds, formant
        statistics, jitter/shimmer measures and F0 statistics.

        Returns:
            The feature vector, or ``None`` if any step fails or the vector
            contains NaN or infinite values.
        """
        try:
            y, sr = librosa.load(
                audio_path, sr=self.SAMPLE_RATE, duration=self.MAX_DURATION
            )
            y = self._preprocess_audio(y, sr, target_sr=self.SAMPLE_RATE)

            jitter_features = self._calculate_jitter(y, sr)
            formant_features = self._extract_formants(y, sr)
            # Fail with a clear message instead of a concatenate error.
            if jitter_features is None or formant_features is None:
                logger.error(
                    "Feature extraction error: voice quality or formant "
                    "features unavailable for %s", audio_path
                )
                return None

            # F0 statistics over voiced frames only (pyin marks unvoiced
            # frames with NaN).
            f0, _, _ = librosa.pyin(
                y, sr=sr, fmin=75, fmax=500, frame_length=1024
            )
            f0 = f0[~np.isnan(f0)]
            if len(f0) > 0:
                f0_features = self._get_f0_features(f0)
            else:
                f0_features = self._get_default_f0_features()

            # MFCC means followed by MFCC standard deviations.
            mfccs = librosa.feature.mfcc(
                y=y, sr=sr, n_mfcc=13, n_fft=512, hop_length=256
            )
            mfcc_features = np.concatenate(
                [np.mean(mfccs, axis=1), np.std(mfccs, axis=1)]
            )

            spectral_tilt = self._compute_spectral_tilt(y, sr)
            cpp = self._compute_cpp(y, sr)
            speaking_rate = self._compute_speaking_rate(y, sr)

            features = np.concatenate([
                [spectral_tilt, cpp, speaking_rate],
                mfcc_features,
                formant_features,
                jitter_features,
                f0_features,
            ])
            if not np.all(np.isfinite(features)):
                logger.warning(
                    "Feature vector for %s contains NaN or infinite values",
                    audio_path,
                )
                return None
            return features
        except Exception as exc:
            logger.exception("Feature extraction error: %s", exc)
            return None

    # --- Helper methods for feature extraction -------------------------

    @staticmethod
    def _get_f0_features(f0):
        """Summarise a non-empty array of voiced F0 values.

        Returns:
            ``[is_distorted (always 0), mean, std, median, range,
            mean absolute frame-to-frame change / mean]``.
        """
        mean_f0 = float(np.mean(f0))
        f0_diff = np.diff(f0)
        # A single voiced frame has no frame-to-frame change; without this
        # guard the mean of an empty array would be NaN.
        if f0_diff.size > 0 and mean_f0 > 0:
            variability = float(np.mean(np.abs(f0_diff)) / mean_f0)
        else:
            variability = 0.0
        return np.array([
            0,  # is_distorted=False
            mean_f0,
            float(np.std(f0)),
            float(np.median(f0)),
            float(np.max(f0) - np.min(f0)),
            variability,
        ])

    @staticmethod
    def _get_default_f0_features():
        """Return fallback F0 features used when no voiced frame was found.

        The leading 1 sets the is_distorted flag.
        """
        return np.array([1, 150.0, 20.0, 150.0, 100.0, 0.1])

    @staticmethod
    def _compute_spectral_tilt(y, sr):
        """Return peak magnitude of STFT rows 1-9 minus that of rows 10-19."""
        S = np.abs(librosa.stft(y))
        return np.max(S[1:10]) - np.max(S[10:20])

    @staticmethod
    def _compute_cpp(y, sr):
        """Return the largest cepstral magnitude between quefrency bins 10-59."""
        magnitude = np.abs(np.fft.rfft(y))
        # Floor exact zeros so the log stays finite; other bins are untouched.
        magnitude = np.maximum(magnitude, np.finfo(magnitude.dtype).tiny)
        cepstrum = np.abs(np.fft.irfft(np.log(magnitude)))
        return np.max(cepstrum[10:60])

    @staticmethod
    def _compute_speaking_rate(y, sr):
        """Return the number of onset-strength peaks per second of audio."""
        duration = len(y) / sr
        if duration <= 0:
            return 0.0
        onset_env = librosa.onset.onset_strength(y=y, sr=sr)
        # Keyword arguments are mandatory in librosa >= 0.10 and accepted by
        # older releases as well.
        peaks = librosa.util.peak_pick(
            onset_env,
            pre_max=3,
            post_max=3,
            pre_avg=3,
            post_avg=3,
            delta=0.5,
            wait=10,
        )
        return len(peaks) / duration