|
|
import os |
|
|
import numpy as np |
|
|
import librosa |
|
|
import noisereduce as nr |
|
|
import parselmouth |
|
|
from parselmouth.praat import call |
|
|
import joblib |
|
|
from typing import Dict, Optional |
|
|
|
|
|
class SpeakerClassifier:
    """Predict a speaker's gender and age bracket from a short audio clip.

    Wraps two pre-trained, joblib-serialized models (gender, age) plus a
    hand-crafted acoustic feature pipeline built on librosa, noisereduce
    and Praat (via parselmouth). The feature vector layout is fixed and
    must match what the models were trained on — see `_extract_features`.
    """

    def __init__(self,
                 gender_model_path: str = "stacked_gender_model.joblib",
                 age_model_path: str = "stacked_age_model.joblib"):
        """Load both models once at construction time.

        Args:
            gender_model_path: Path to the serialized gender model.
            age_model_path: Path to the serialized age model.
        """
        self.gender_model = joblib.load(gender_model_path)
        self.age_model = joblib.load(age_model_path)

    def predict(self, audio_path: str) -> Dict[str, str]:
        """Predict gender and age from an audio file.

        Args:
            audio_path: Path to the audio file to classify.

        Returns:
            {'gender': 'male'|'female', 'age': '20s'|'50s'} on success,
            or {'error': 'Feature extraction failed'} when the feature
            pipeline cannot produce a valid vector.
        """
        features = self._extract_features(audio_path)
        if features is None:
            return {"error": "Feature extraction failed"}

        # Models expect a 2-D sample matrix; wrap the single vector.
        gender_num = self.gender_model.predict([features])[0]
        age_num = self.age_model.predict([features])[0]

        # Label encoding from training time: 0 -> male / 20s, otherwise female / 50s.
        gender = "male" if gender_num == 0 else "female"
        age = "20s" if age_num == 0 else "50s"

        return {"gender": gender, "age": age}

    @staticmethod
    def _normalize_volume(audio, target_dBFS=-20):
        """Scale `audio` so its RMS level matches `target_dBFS`.

        Silent input is returned unchanged: log10(0) would otherwise
        produce a non-finite gain and NaN samples.
        """
        rms = np.sqrt(np.mean(audio**2))
        if rms == 0:
            return audio
        gain = 10**((target_dBFS - 20*np.log10(rms))/20)
        return audio * gain

    @staticmethod
    def _remove_silence(audio, top_db=20):
        """Concatenate only the non-silent intervals of `audio`.

        If no interval rises above `top_db` below the peak, the original
        signal is returned unchanged instead of raising on an empty
        `np.concatenate`.
        """
        intervals = librosa.effects.split(audio, top_db=top_db)
        if len(intervals) == 0:
            return audio
        return np.concatenate([audio[start:end] for start, end in intervals])

    @staticmethod
    def _equalize_audio(audio, sr, bass_boost=2, treble_boost=1.5):
        """Boost low (<250 Hz) and high (>4 kHz) bins in the STFT domain."""
        S = librosa.stft(audio)
        # Default n_fft matches librosa.stft's default, so the frequency
        # axis lines up with S's rows.
        freqs = librosa.fft_frequencies(sr=sr)
        S[freqs < 250] *= bass_boost
        S[freqs > 4000] *= treble_boost
        return librosa.istft(S)

    def _preprocess_audio(self, audio, sr, target_sr=16000):
        """Silence-strip, denoise, level-normalize and equalize `audio`.

        Note: no resampling is performed here; `_extract_features` already
        loads audio at `target_sr`, so `sr` is used directly for the
        sample-rate-dependent steps.
        """
        audio = self._remove_silence(audio)
        audio = nr.reduce_noise(y=audio, sr=sr)
        audio = self._normalize_volume(audio)
        audio = self._equalize_audio(audio, sr)
        return audio

    def _extract_formants(self, y, sr):
        """Extract F1–F3 statistics via Praat's Burg formant tracker.

        Returns:
            A 12-element array — [mean, std, median, IQR] for each of
            F1, F2, F3 in that order — or None on failure.
        """
        try:
            sound = parselmouth.Sound(y, sampling_frequency=sr)
            formant = sound.to_formant_burg(time_step=0.01)

            f1_list, f2_list, f3_list = [], [], []
            for t in np.arange(0, sound.duration, 0.01):
                try:
                    f1 = formant.get_value_at_time(1, t)
                    f2 = formant.get_value_at_time(2, t)
                    f3 = formant.get_value_at_time(3, t)
                    # Keep a frame only when all three formants are defined.
                    if all(v and not np.isnan(v) for v in [f1, f2, f3]):
                        f1_list.append(f1)
                        f2_list.append(f2)
                        f3_list.append(f3)
                except Exception:
                    continue

            def _stats(values):
                # [mean, std, median, interquartile range]; zeros if empty.
                if not values:
                    return [0, 0, 0, 0]
                return [
                    np.mean(values),
                    np.std(values),
                    np.median(values),
                    np.percentile(values, 75) - np.percentile(values, 25),
                ]

            return np.array(_stats(f1_list) + _stats(f2_list) + _stats(f3_list))
        except Exception:
            return None

    def _calculate_jitter(self, y, sr):
        """Compute HNR, jitter and shimmer metrics with Praat.

        Returns:
            An 11-element array or None on failure.

        NOTE: the first four entries repeat the HNR mean and local jitter.
        The duplication is preserved deliberately — the trained models
        expect this exact feature dimensionality and ordering.
        """
        try:
            sound = parselmouth.Sound(y, sampling_frequency=sr)
            pointProcess = call(sound, "To PointProcess (periodic, cc)", 75, 500)
            harmonicity = call(sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)

            # Query once, reuse for the duplicated slots.
            hnr_mean = call(harmonicity, "Get mean", 0, 0)
            jitter_local = call(pointProcess, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)

            metrics = np.array([
                hnr_mean,
                jitter_local,
                hnr_mean,       # duplicated on purpose (see docstring)
                jitter_local,   # duplicated on purpose (see docstring)
                call(pointProcess, "Get jitter (local, absolute)", 0, 0, 0.0001, 0.02, 1.3),
                call(pointProcess, "Get jitter (rap)", 0, 0, 0.0001, 0.02, 1.3),
                call(pointProcess, "Get jitter (ppq5)", 0, 0, 0.0001, 0.02, 1.3),
                call(pointProcess, "Get jitter (ddp)", 0, 0, 0.0001, 0.02, 1.3),
                call([sound, pointProcess], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6),
                call([sound, pointProcess], "Get shimmer (local_dB)", 0, 0, 0.0001, 0.02, 1.3, 1.6),
                call([sound, pointProcess], "Get shimmer (apq3)", 0, 0, 0.0001, 0.02, 1.3, 1.6),
            ])
            return metrics
        except Exception:
            return None

    def _extract_features(self, audio_path: str) -> Optional[np.ndarray]:
        """Main feature extraction pipeline.

        Feature layout (order matters — must match training):
        [spectral_tilt, cpp, speaking_rate] + 26 MFCC stats (13 means,
        13 stds) + 12 formant stats + 11 jitter/shimmer stats + 6 F0 stats.

        Returns:
            The feature vector, or None when any sub-extractor fails or
            produces a NaN/inf value.
        """
        try:
            # 16 kHz mono, at most 7 seconds of audio.
            y, sr = librosa.load(audio_path, sr=16000, duration=7)
            y = self._preprocess_audio(y, sr)

            # These return None on failure; np.concatenate below will then
            # raise, landing in the except branch.
            jitter_features = self._calculate_jitter(y, sr)
            formant_features = self._extract_formants(y, sr)

            # Fundamental frequency restricted to the speech range 75–500 Hz.
            f0, _, _ = librosa.pyin(y, sr=sr, fmin=75, fmax=500, frame_length=1024)
            f0 = f0[~np.isnan(f0)]  # keep voiced frames only
            f0_features = self._get_f0_features(f0) if len(f0) > 0 else self._get_default_f0_features()

            mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13, n_fft=512, hop_length=256)
            mfcc_features = np.concatenate([np.mean(mfccs, axis=1), np.std(mfccs, axis=1)])

            spectral_tilt = self._compute_spectral_tilt(y, sr)
            cpp = self._compute_cpp(y, sr)
            speaking_rate = self._compute_speaking_rate(y, sr)

            features = np.concatenate([
                [spectral_tilt, cpp, speaking_rate],
                mfcc_features,
                formant_features,
                jitter_features,
                f0_features
            ])

            # Reject the whole sample if any metric degenerated.
            return features if not (np.any(np.isnan(features)) or np.any(np.isinf(features))) else None

        except Exception as e:
            print(f"Feature extraction error: {str(e)}")
            return None

    @staticmethod
    def _get_f0_features(f0):
        """Summary statistics of the voiced-frame F0 track.

        Layout: [flag (0 = real F0 data), mean, std, median, range,
        normalized mean absolute frame-to-frame delta]. A single voiced
        frame yields an empty diff; its delta term is 0.0 rather than NaN.
        """
        f0_diff = np.diff(f0)
        mean_f0 = np.mean(f0)
        return np.array([
            0,  # flag: real F0 data was available (the default vector uses 1)
            float(mean_f0),
            float(np.std(f0)),
            float(np.median(f0)),
            float(np.max(f0) - np.min(f0)),
            float(np.mean(np.abs(f0_diff)) / mean_f0) if len(f0_diff) > 0 and mean_f0 > 0 else 0.0
        ])

    @staticmethod
    def _get_default_f0_features():
        """Fallback F0 statistics for clips with no voiced frames (flag = 1)."""
        return np.array([1, 150.0, 20.0, 150.0, 100.0, 0.1])

    @staticmethod
    def _compute_spectral_tilt(y, sr):
        """Crude spectral tilt proxy: peak magnitude of STFT bins 1–9 minus
        the peak of bins 10–19 (positive => low-frequency dominance)."""
        S = np.abs(librosa.stft(y))
        return np.max(S[1:10]) - np.max(S[10:20])

    @staticmethod
    def _compute_cpp(y, sr):
        """Cepstral peak prominence proxy: max of the real cepstrum over a
        fixed quefrency band (bins 10–59).

        log(0) at an exact spectral zero yields -inf here; the resulting
        non-finite feature is rejected by `_extract_features`'s NaN/inf check.
        """
        cepstrum = np.abs(np.fft.irfft(np.log(np.abs(np.fft.rfft(y)))))
        return np.max(cepstrum[10:60])

    @staticmethod
    def _compute_speaking_rate(y, sr):
        """Onset peaks per second as a proxy for speaking rate."""
        onset_env = librosa.onset.onset_strength(y=y, sr=sr)
        # Keyword arguments: librosa >= 0.10 makes peak_pick keyword-only;
        # keywords also work on older versions.
        peaks = librosa.util.peak_pick(onset_env, pre_max=3, post_max=3,
                                       pre_avg=3, post_avg=3, delta=0.5, wait=10)
        return len(peaks) / (len(y) / sr)