Spaces:
Runtime error
Runtime error
| import os | |
| import numpy as np | |
| import librosa | |
| import noisereduce as nr | |
| import parselmouth | |
| from parselmouth.praat import call | |
| import joblib | |
| from typing import Dict, Optional | |
class SpeakerClassifier:
    """Predict speaker gender and age bracket from a single audio file.

    Wraps two pre-trained stacked models (loaded once via joblib) and a
    librosa/Praat feature-extraction pipeline.
    """

    def __init__(self,
                 gender_model_path: str = "stacked_gender_model.joblib",
                 age_model_path: str = "stacked_age_model.joblib"):
        """Load both models once so repeated predictions reuse them.

        Args:
            gender_model_path: Path to the serialized gender model.
            age_model_path: Path to the serialized age model.
        """
        # Paths are parameters (with the original defaults) so deployments
        # can relocate the model files without editing this class.
        self.gender_model = joblib.load(gender_model_path)
        self.age_model = joblib.load(age_model_path)
def predict(self, audio_path: str) -> Dict[str, str]:
    """
    Predict gender and age from an audio file.
    Returns: {'gender': 'male/female', 'age': '20s/50s'}
    """
    feature_vector = self._extract_features(audio_path)
    if feature_vector is None:
        return {"error": "Feature extraction failed"}
    # Both models expect a batch, so wrap the single feature vector.
    gender_code = self.gender_model.predict([feature_vector])[0]
    age_code = self.age_model.predict([feature_vector])[0]
    # Decode numeric classes (0/1) into the human-readable labels.
    return {
        "gender": "female" if gender_code != 0 else "male",
        "age": "50s" if age_code != 0 else "20s",
    }
| # --- Your Feature Extraction Functions (adapted) --- | |
| def _normalize_volume(audio, target_dBFS=-20): | |
| rms = np.sqrt(np.mean(audio**2)) | |
| gain = 10**((target_dBFS - 20*np.log10(rms))/20) | |
| return audio * gain | |
def _remove_silence(self, audio, top_db=20):
    """Concatenate only the non-silent spans of *audio*.

    Args:
        audio: 1-D float sample array.
        top_db: Threshold (dB below peak) under which frames count as silence.
    Returns:
        Samples with silent stretches removed; the input unchanged if
        librosa finds no voiced interval at all.
    """
    # FIX: was defined without `self` but called as self._remove_silence(audio).
    intervals = librosa.effects.split(audio, top_db=top_db)
    if len(intervals) == 0:
        # All-silent input: np.concatenate([]) would raise; keep the signal
        # so downstream RMS/STFT steps still get a non-empty array.
        return audio
    return np.concatenate([audio[start:end] for start, end in intervals])
def _equalize_audio(self, audio, sr, bass_boost=2, treble_boost=1.5):
    """Apply a crude shelving EQ: boost bins below 250 Hz and above 4 kHz.

    Args:
        audio: 1-D float sample array.
        sr: Sample rate of *audio* in Hz.
        bass_boost: Multiplier for STFT bins under 250 Hz.
        treble_boost: Multiplier for STFT bins over 4000 Hz.
    Returns:
        The equalized signal, same length as the input.
    """
    # FIX: was defined without `self` but called as self._equalize_audio(...).
    S = librosa.stft(audio)
    freqs = librosa.fft_frequencies(sr=sr)
    # Boolean masks select whole frequency rows of the spectrogram.
    S[freqs < 250] *= bass_boost
    S[freqs > 4000] *= treble_boost
    # length= keeps the output exactly as long as the input; the bare
    # istft can otherwise drop trailing samples.
    return librosa.istft(S, length=len(audio))
def _preprocess_audio(self, audio, sr, target_sr=16000):
    """Clean *audio*: resample, trim silence, denoise, normalize, equalize.

    Args:
        audio: 1-D float sample array at rate *sr*.
        sr: Actual sample rate of *audio*.
        target_sr: Rate the pipeline operates at (default 16 kHz).
    Returns:
        The preprocessed signal at *target_sr*.
    """
    # FIX: `sr` was previously ignored — audio at `sr` was handed to
    # reduce_noise and the equalizer as if it were already target_sr.
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
    audio = self._remove_silence(audio)
    audio = nr.reduce_noise(y=audio, sr=target_sr)
    audio = self._normalize_volume(audio)
    audio = self._equalize_audio(audio, target_sr)
    return audio
def _extract_formants(self, y, sr):
    """Summarize the F1/F2/F3 formant tracks via Praat.

    Samples the Burg formant object every 10 ms and, for each formant,
    reports mean, std, median, and interquartile range.
    Returns a 12-element numpy vector, or None on any Praat failure.
    """
    try:
        snd = parselmouth.Sound(y, sampling_frequency=sr)
        formant = snd.to_formant_burg(time_step=0.01)
        tracks = ([], [], [])
        for t in np.arange(0, snd.duration, 0.01):
            try:
                vals = [formant.get_value_at_time(k, t) for k in (1, 2, 3)]
            except Exception:
                continue
            # Keep a time point only when all three formants are valid,
            # so the three tracks stay the same length.
            if all(v and not np.isnan(v) for v in vals):
                for track, v in zip(tracks, vals):
                    track.append(v)
        feats = []
        for track in tracks:
            if track:
                arr = np.asarray(track)
                iqr = np.percentile(arr, 75) - np.percentile(arr, 25)
                feats.extend([np.mean(arr), np.std(arr), np.median(arr), iqr])
            else:
                feats.extend([0, 0, 0, 0])
        return np.array(feats)
    except Exception:
        return None
def _calculate_jitter(self, y, sr):
    """Voice-quality metrics (HNR, jitter, shimmer) computed with Praat.

    Returns an 11-element numpy vector, or None on any Praat failure.

    NOTE(review): the HNR mean and local jitter each appear twice in the
    vector; presumably the downstream models were trained on this exact
    layout, so it is preserved verbatim — confirm before deduplicating.
    """
    try:
        snd = parselmouth.Sound(y, sampling_frequency=sr)
        points = call(snd, "To PointProcess (periodic, cc)", 75, 500)
        harm = call(snd, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)
        # Shared Praat argument tuples: (t_start, t_end, floor, ceiling, max_period[, max_amp]).
        jit = (0, 0, 0.0001, 0.02, 1.3)
        shim = (0, 0, 0.0001, 0.02, 1.3, 1.6)
        return np.array([
            call(harm, "Get mean", 0, 0),
            call(points, "Get jitter (local)", *jit),
            call(harm, "Get mean", 0, 0),
            call(points, "Get jitter (local)", *jit),
            call(points, "Get jitter (local, absolute)", *jit),
            call(points, "Get jitter (rap)", *jit),
            call(points, "Get jitter (ppq5)", *jit),
            call(points, "Get jitter (ddp)", *jit),
            call([snd, points], "Get shimmer (local)", *shim),
            call([snd, points], "Get shimmer (local_dB)", *shim),
            call([snd, points], "Get shimmer (apq3)", *shim),
        ])
    except Exception:
        return None
def _extract_features(self, audio_path: str) -> Optional[np.ndarray]:
    """Main feature extraction pipeline with detailed error reporting."""
    print(f"\n=== Processing {audio_path} ===")
    try:
        # 1. Load a fixed-length (7 s), fixed-rate (16 kHz) excerpt.
        print("Loading audio file...")
        signal, rate = librosa.load(audio_path, sr=16000, duration=7)
        print(f"Successfully loaded: {len(signal)} samples at {rate}Hz")

        # 2. Clean the signal before measuring anything.
        print("Preprocessing audio...")
        signal = self._preprocess_audio(signal, rate)
        print("Audio preprocessed successfully")

        # 3. Voice-quality metrics (Praat-based).
        print("Extracting jitter features...")
        jitter_feats = self._calculate_jitter(signal, rate)
        print(f"Jitter features: {jitter_feats is not None}")

        print("Extracting formants...")
        formant_feats = self._extract_formants(signal, rate)
        print(f"Formant features: {formant_feats is not None}")

        # 4. Pitch contour; fall back to defaults when no voiced frames exist.
        print("Extracting F0 features...")
        f0, _, _ = librosa.pyin(signal, sr=rate, fmin=75, fmax=500, frame_length=1024)
        f0 = f0[~np.isnan(f0)]
        print(f"Found {len(f0)} valid F0 values")
        if len(f0) > 0:
            f0_feats = self._get_f0_features(f0)
        else:
            f0_feats = self._get_default_f0_features()

        # 5. Timbre: 13 MFCCs summarized by per-coefficient mean and std.
        print("Extracting MFCCs...")
        mfccs = librosa.feature.mfcc(y=signal, sr=rate, n_mfcc=13, n_fft=512, hop_length=256)
        mfcc_feats = np.concatenate([np.mean(mfccs, axis=1), np.std(mfccs, axis=1)])

        # 6. Assemble the full vector in the order the models expect.
        scalar_feats = [
            self._compute_spectral_tilt(signal, rate),
            self._compute_cpp(signal, rate),
            self._compute_speaking_rate(signal, rate),
        ]
        features = np.concatenate([
            scalar_feats,
            mfcc_feats,
            formant_feats,
            jitter_feats,
            f0_feats,
        ])

        # Reject vectors the models cannot consume.
        if np.any(np.isnan(features)) or np.any(np.isinf(features)):
            print("Warning: Features contain NaN/inf values")
            return None
        print("Feature extraction successful!")
        return features
    except Exception as e:
        print(f"Feature extraction failed: {str(e)}")
        import traceback
        traceback.print_exc()
        return None
| # Helper methods for feature extraction | |
| def _get_f0_features(f0): | |
| f0_diff = np.diff(f0) | |
| return np.array([ | |
| 0, # is_distorted=False | |
| float(np.mean(f0)), | |
| float(np.std(f0)), | |
| float(np.median(f0)), | |
| float(np.max(f0) - np.min(f0)), | |
| float(np.mean(np.abs(f0_diff)) / np.mean(f0)) if np.mean(f0) > 0 else 0.0 | |
| ]) | |
| def _get_default_f0_features(): | |
| return np.array([1, 150.0, 20.0, 150.0, 100.0, 0.1]) # Default values | |
def _compute_spectral_tilt(self, y, sr):
    """Crude spectral tilt: peak magnitude of low STFT bins minus mid bins.

    Args:
        y: 1-D float sample array.
        sr: Sample rate (unused here; kept for a uniform helper signature).
    Returns:
        float: max over bins 1-9 minus max over bins 10-19, all frames.
    """
    # FIX: was defined without `self` but called as self._compute_spectral_tilt(y, sr).
    S = np.abs(librosa.stft(y))
    return np.max(S[1:10]) - np.max(S[10:20])
| def _compute_cpp(y, sr): | |
| cepstrum = np.abs(np.fft.irfft(np.log(np.abs(np.fft.rfft(y))))) | |
| return np.max(cepstrum[10:60]) | |
def _compute_speaking_rate(self, y, sr):
    """Estimate speaking rate as onset peaks per second of audio.

    Args:
        y: 1-D float sample array.
        sr: Sample rate of *y* in Hz.
    Returns:
        float: detected onset-peak count divided by signal duration (s).
    """
    # FIX: was defined without `self` but called as self._compute_speaking_rate(y, sr).
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    # librosa >= 0.10 made peak_pick's parameters keyword-only; the old
    # positional call raised TypeError at runtime.
    peaks = librosa.util.peak_pick(onset_env, pre_max=3, post_max=3,
                                   pre_avg=3, post_avg=3, delta=0.5, wait=10)
    return len(peaks) / (len(y) / sr)