import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import joblib
import librosa
import noisereduce as nr
import parselmouth
from parselmouth.praat import call
from concurrent.futures import ProcessPoolExecutor
def normalize_volume(audio, target_dBFS=-20):
    """Scale the signal so its RMS level sits at `target_dBFS`."""
    rms = np.sqrt(np.mean(audio**2))
    if rms == 0:  # silent input: nothing to normalize
        return audio
    gain = 10 ** ((target_dBFS - 20 * np.log10(rms)) / 20)
    return audio * gain
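# Usage sketch (illustrative; "sample.wav" is a placeholder path):
#
#   y, sr = librosa.load("sample.wav", sr=16000)
#   y = normalize_volume(y)
#   print(20 * np.log10(np.sqrt(np.mean(y**2))))  # ~ -20.0 dBFS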
def remove_silence(audio, top_db=20):
    """Drop segments more than `top_db` dB below the signal peak."""
    intervals = librosa.effects.split(audio, top_db=top_db)
    if len(intervals) == 0:  # all-silent input: keep the original signal
        return audio
    return np.concatenate([audio[start:end] for start, end in intervals])
def equalize_audio(audio, sr, bass_boost=2, treble_boost=1.5):
    """Crude two-band EQ: scale STFT magnitudes below 250 Hz and above 4 kHz."""
    S = librosa.stft(audio)
    freqs = librosa.fft_frequencies(sr=sr)
    # Bass boost (STFT rows below 250 Hz)
    bass_mask = freqs < 250
    S[bass_mask] *= bass_boost
    # Treble boost (STFT rows above 4 kHz)
    treble_mask = freqs > 4000
    S[treble_mask] *= treble_boost
    return librosa.istft(S)
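# Sanity-check sketch (illustrative; assumes sr=16000 and librosa's default
# n_fft=2048, so STFT rows [:32] sit below 250 Hz):
#
#   y_eq = equalize_audio(y, sr)
#   low = np.abs(librosa.stft(y))[:32].mean()
#   low_eq = np.abs(librosa.stft(y_eq))[:32].mean()
#   print(low_eq / low)  # roughly 2.0 with the default bass_boost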
def preprocess_audio(audio, sr):
    # Remove silence
    audio = remove_silence(audio)
    # Reduce noise at the signal's actual sample rate
    audio = nr.reduce_noise(y=audio, sr=sr)
    # Normalize volume
    audio = normalize_volume(audio)
    # Equalize frequency response
    audio = equalize_audio(audio, sr)
    return audio
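# End-to-end preprocessing sketch (illustrative; "sample.wav" is a placeholder):
#
#   y, sr = librosa.load("sample.wav", sr=16000)
#   y_clean = preprocess_audio(y, sr)
#   # y_clean is shorter (silence removed), denoised, EQ'd, at ~-20 dBFS RMS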
def extract_formants(y, sr):
    """
    Formant extraction via Praat (parselmouth).
    Returns 12 features: mean, std, median, and IQR for each of F1, F2, F3.
    """
    try:
        sound = parselmouth.Sound(y, sampling_frequency=sr)
        # Use Praat's Burg-method formant tracker
        formant = sound.to_formant_burg(time_step=0.01)
        # Sample F1-F3 every 10 ms, keeping frames where all three are defined
        f1_list, f2_list, f3_list = [], [], []
        for t in np.arange(0, sound.duration, 0.01):
            try:
                f1 = formant.get_value_at_time(1, t)
                f2 = formant.get_value_at_time(2, t)
                f3 = formant.get_value_at_time(3, t)
                if f1 and f2 and f3 and not np.isnan(f1) and not np.isnan(f2) and not np.isnan(f3):
                    f1_list.append(f1)
                    f2_list.append(f2)
                    f3_list.append(f3)
            except Exception:
                continue
        # Aggregate each formant track: mean, std, median, interquartile range
        features = []
        for track in (f1_list, f2_list, f3_list):
            if track:
                features += [
                    np.mean(track),
                    np.std(track),
                    np.median(track),
                    np.percentile(track, 75) - np.percentile(track, 25),  # IQR
                ]
            else:
                features += [0, 0, 0, 0]
        return np.array(features)
    except Exception:
        return None
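# Usage sketch (illustrative; "sample.wav" is a placeholder path):
#
#   y, sr = librosa.load("sample.wav", sr=16000)
#   formants = extract_formants(y, sr)
#   if formants is not None:
#       assert formants.shape == (12,)  # 4 stats x 3 formants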
def calculate_jitter(y, sr):
    """Voice-quality metrics from Praat: HNR plus four jitter and two shimmer measures."""
    try:
        sound = parselmouth.Sound(y, sampling_frequency=sr)
        harmonicity = call(sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)
        hnr = call(harmonicity, "Get mean", 0, 0)
        pointProcess = call(sound, "To PointProcess (periodic, cc)", 75, 500)
        localJitter = call(pointProcess, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
        localabsoluteJitter = call(pointProcess, "Get jitter (local, absolute)", 0, 0, 0.0001, 0.02, 1.3)
        rapJitter = call(pointProcess, "Get jitter (rap)", 0, 0, 0.0001, 0.02, 1.3)
        ddpJitter = call(pointProcess, "Get jitter (ddp)", 0, 0, 0.0001, 0.02, 1.3)
        localShimmer = call([sound, pointProcess], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
        localdbShimmer = call([sound, pointProcess], "Get shimmer (local_dB)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
        metrics = np.array([
            hnr,                  # Harmonics-to-Noise Ratio (dB)
            localJitter,          # local jitter (ratio)
            localabsoluteJitter,  # local absolute jitter (seconds)
            rapJitter,            # RAP jitter (ratio)
            ddpJitter,            # DDP jitter (ratio)
            localShimmer,         # local shimmer (ratio)
            localdbShimmer,       # local shimmer (dB)
        ])
        return metrics
    except Exception:
        return None
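# Usage sketch (illustrative): the returned length-7 array is ordered
# [hnr, local_jitter, local_abs_jitter, rap_jitter, ddp_jitter,
#  local_shimmer, local_shimmer_dB]
#
#   metrics = calculate_jitter(y, sr)
#   if metrics is not None:
#       print(dict(zip(["hnr", "jit_loc", "jit_abs", "rap", "ddp",
#                       "shim_loc", "shim_db"], metrics)))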
def extract_features(file_path, n_mfcc=13, sr=16000, duration=7):
    """Extract the full feature vector for one clip: spectral tilt, CPP,
    speaking rate, MFCC statistics, formants, jitter/shimmer, and F0 stats."""
try:
# Load audio (resampled to `sr` Hz)
y, sr = librosa.load(file_path, sr=sr, duration=duration)
y = preprocess_audio(y, sr)
        jitter_features = calculate_jitter(y, sr)
        if jitter_features is None:
            return None
        # Fundamental frequency (F0) statistics via probabilistic YIN (pyin),
        # with neutral fallback values used when no voiced frames are found
        f0_mean = 150.0     # neutral speech pitch (Hz)
        f0_std = 20.0       # moderate variability
        f0_median = 150.0
        f0_range = 100.0    # max - min
        f0_norm_diff = 0.1  # normalized mean absolute frame-to-frame difference
        is_distorted = 1    # flag cleared below once voiced frames are found
f0, _, _ = librosa.pyin(y, sr=sr, fmin=75, fmax=500, frame_length=1024)
f0 = f0[~np.isnan(f0)]
if len(f0) > 0:
is_distorted = 0
            f0_diff = np.diff(f0)
            # Cast to plain floats so every feature is a scalar
            f0_mean = float(np.mean(f0))
            f0_std = float(np.std(f0))
            f0_median = float(np.median(f0))
            f0_range = float(np.max(f0) - np.min(f0))
            f0_norm_diff = float(np.mean(np.abs(f0_diff)) / f0_mean) if f0_mean > 0 else 0.0
# Create the feature array ensuring all elements are scalars
f0_features = np.array([
float(is_distorted),
float(f0_mean),
float(f0_std),
float(f0_median),
float(f0_range),
float(f0_norm_diff)
])
        formant_features = extract_formants(y, sr)
        if formant_features is None:
            return None
# Extract MFCCs (shape: [n_mfcc, time_frames])
mfccs = librosa.feature.mfcc(
y=y, sr=sr, n_mfcc=n_mfcc,
n_fft=512, hop_length=256
)
        # Aggregate statistics over time (mean + std per coefficient)
        mfcc_features = np.concatenate([np.mean(mfccs, axis=1), np.std(mfccs, axis=1)])
        # --- New Feature 2: spectral tilt (crude H1-H2 proxy) ---
        def compute_spectral_tilt(y, sr):
            S = np.abs(librosa.stft(y))
            h1 = np.max(S[1:10])   # strongest low-frequency bin (DC excluded)
            h2 = np.max(S[10:20])  # strongest bin in the next band up
            return h1 - h2
spectral_tilt = compute_spectral_tilt(y, sr)
        # --- New Feature 4: Cepstral Peak Prominence (CPP) ---
        def compute_cpp(y, sr):
            spectrum = np.abs(np.fft.rfft(y))
            cepstrum = np.abs(np.fft.irfft(np.log(spectrum + 1e-10)))  # epsilon avoids log(0)
            return np.max(cepstrum[10:60])  # peak in a typical F0 quefrency range
cpp = compute_cpp(y, sr)
# --- New Feature 5: Speaking Rate (Syllables per Second) ---
def compute_speaking_rate(y, sr):
onset_env = librosa.onset.onset_strength(y=y, sr=sr)
peaks = librosa.util.peak_pick(onset_env, pre_max=3, post_max=3, pre_avg=3, post_avg=3, delta=0.5, wait=10)
return len(peaks) / (len(y) / sr)
speaking_rate = compute_speaking_rate(y, sr)
        # Concatenate all feature groups into a single vector
        features = np.concatenate([
[spectral_tilt, cpp, speaking_rate],
mfcc_features,
formant_features,
jitter_features,
f0_features
])
if (np.any(np.isnan(features)) or
np.any(np.isinf(features))):
return None
return features
    except Exception:
        return None
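# Usage sketch (illustrative; "sample.wav" is a placeholder). With the defaults
# here the vector has 54 entries: 3 (tilt/CPP/rate) + 26 MFCC stats
# + 12 formant stats + 7 jitter/shimmer/HNR + 6 F0 stats.
#
#   feats = extract_features("sample.wav")
#   if feats is not None:
#       print(feats.shape)  # (54,)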
def process_file(file_path):
    """Extract features for one .wav/.mp3 file; returns (path, features) or None."""
    if file_path.lower().endswith(('.wav', '.mp3')):
        features = extract_features(file_path)
        return (file_path, features)
    return None
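# The ProcessPoolExecutor/tqdm/os imports above suggest batch extraction was
# intended; a minimal sketch of what that could look like (`process_folder`
# is a hypothetical helper, not part of the original pipeline):
def process_folder(folder_path, max_workers=4):
    paths = [os.path.join(folder_path, f) for f in os.listdir(folder_path)]
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        results = list(tqdm(executor.map(process_file, paths), total=len(paths)))
    # Keep only files that yielded a feature vector
    return {path: feats
            for path, feats in (r for r in results if r is not None)
            if feats is not None}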
def testing_pipeline(file_path):
    # Load the trained models
    model_gender = joblib.load("stacked_gender_model.joblib")
    model_age = joblib.load("stacked_age_model.joblib")
    result = process_file(file_path)
    if result is None:
        raise ValueError(f"Unsupported file type: {file_path}")
    _, features = result
    if features is None:
        raise ValueError(f"Feature extraction failed for: {file_path}")
    # Single-row feature matrix for the sklearn-style models
    X = pd.DataFrame([features])
    # Predict, then pack both labels into one: combined = (age << 1) + gender
    y_pred_age = model_age.predict(X)
    y_pred_gender = model_gender.predict(X)
    y_pred_combined = (y_pred_age << 1) + y_pred_gender
    return y_pred_combined[0]
if __name__ == "__main__":
import sys
testing_pipeline(sys.argv[1])