| | import os |
| | import pandas as pd |
| | import numpy as np |
| | from tqdm import tqdm |
| | import joblib |
| | import librosa |
| | import noisereduce as nr |
| | import parselmouth |
| | from parselmouth.praat import call |
| | from concurrent.futures import ProcessPoolExecutor |
| |
|
| | def normalize_volume(audio, target_dBFS=-20): |
| | rms = np.sqrt(np.mean(audio**2)) |
| | gain = 10**((target_dBFS - 20*np.log10(rms))/20) |
| | return audio * gain |
| |
|
def remove_silence(audio, top_db=20):
    """Drop silent stretches, keeping only segments louder than *top_db* below peak.

    Parameters
    ----------
    audio : np.ndarray
        1-D audio signal.
    top_db : float
        Threshold (in dB below peak) under which frames count as silence.

    Returns
    -------
    np.ndarray
        The non-silent segments concatenated in order. If the whole signal is
        classified as silence, the input is returned unchanged — previously
        ``np.concatenate`` raised on the empty interval list.
    """
    intervals = librosa.effects.split(audio, top_db=top_db)
    if len(intervals) == 0:  # nothing above the threshold: keep the signal as-is
        return audio
    return np.concatenate([audio[start:end] for start, end in intervals])
| |
|
def equalize_audio(audio, sr, bass_boost=2, treble_boost=1.5):
    """Apply a crude two-shelf EQ in the STFT domain.

    Bins below 250 Hz are multiplied by *bass_boost* and bins above 4 kHz by
    *treble_boost*; the mid band is left untouched. The boosted spectrogram is
    inverted back to a time-domain signal.

    Parameters
    ----------
    audio : np.ndarray
        1-D audio signal.
    sr : int
        Sample rate, used to map STFT bins to frequencies.
    bass_boost, treble_boost : float
        Linear gain factors for the low and high shelves.
    """
    spectrum = librosa.stft(audio)
    bin_freqs = librosa.fft_frequencies(sr=sr)

    # Scale the low and high shelves in place on the complex spectrogram.
    spectrum[bin_freqs < 250] *= bass_boost
    spectrum[bin_freqs > 4000] *= treble_boost

    return librosa.istft(spectrum)
| |
|
def preprocess_audio(audio, sr, target_sr=16000):
    """Run the cleanup chain: resample -> strip silence -> denoise -> normalize -> EQ.

    Parameters
    ----------
    audio : np.ndarray
        1-D audio signal sampled at *sr*.
    sr : int
        Sample rate of the incoming signal.
    target_sr : int
        Sample rate the downstream steps assume (default 16000).

    Returns
    -------
    np.ndarray
        The fully preprocessed signal at *target_sr*.

    Note: the original ignored *sr* entirely and passed *target_sr* to the
    denoiser and EQ even when the signal was still at *sr*. Resampling first
    makes the rates consistent; behavior is unchanged when sr == target_sr
    (the case at the visible call site).
    """
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)

    audio = remove_silence(audio)

    # Spectral-gating noise reduction at the (now guaranteed) target rate.
    audio = nr.reduce_noise(y=audio, sr=target_sr)

    audio = normalize_volume(audio)

    audio = equalize_audio(audio, target_sr)

    return audio
| |
|
def extract_formants(y, sr):
    """Extract 12 formant statistics via Praat's Burg method.

    For each of F1, F2 and F3 this returns mean, std, median and inter-quartile
    range, in that order — 12 features total. (A previous docstring claimed 20
    features including F2/F1 and F3/F1 ratios; those were never computed.)

    Parameters
    ----------
    y : np.ndarray
        1-D audio signal.
    sr : int
        Sample rate of *y*.

    Returns
    -------
    np.ndarray or None
        Array of 12 floats, or None if parselmouth fails on this signal.
    """

    def _stats(values):
        # mean, std, median, IQR — all zeros when no valid frames were found,
        # matching the original's per-feature fallbacks.
        if not values:
            return [0, 0, 0, 0]
        return [
            np.mean(values),
            np.std(values),
            np.median(values),
            np.percentile(values, 75) - np.percentile(values, 25),
        ]

    try:
        sound = parselmouth.Sound(y, sampling_frequency=sr)
        formant = sound.to_formant_burg(time_step=0.01)

        f1_list, f2_list, f3_list = [], [], []
        # Sample the formant tracks every 10 ms; keep a frame only when all
        # three formants are present and finite, so the lists stay aligned.
        for t in np.arange(0, sound.duration, 0.01):
            try:
                f1 = formant.get_value_at_time(1, t)
                f2 = formant.get_value_at_time(2, t)
                f3 = formant.get_value_at_time(3, t)
            except Exception:
                continue
            if f1 and f2 and f3 and not np.isnan(f1) and not np.isnan(f2) and not np.isnan(f3):
                f1_list.append(f1)
                f2_list.append(f2)
                f3_list.append(f3)

        return np.array(_stats(f1_list) + _stats(f2_list) + _stats(f3_list))

    except Exception:
        # parselmouth can fail on degenerate signals; caller treats None as
        # "skip this file".
        return None
def calculate_jitter(y, sr, file_path):
    """Compute HNR plus jitter/shimmer voice-quality metrics via Praat.

    Parameters
    ----------
    y : np.ndarray
        1-D audio signal.
    sr : int
        Sample rate of *y*.
    file_path : str
        Unused; kept for backward compatibility with existing callers.

    Returns
    -------
    np.ndarray or None
        7 metrics in order: HNR, local jitter, local absolute jitter, RAP
        jitter, DDP jitter, local shimmer, local shimmer (dB); None on any
        parselmouth/Praat failure.
    """
    try:
        sound = parselmouth.Sound(y, sampling_frequency=sr)
        # 75-500 Hz pitch search range (typical adult speech).
        # The original built this PointProcess twice; once is sufficient.
        pointProcess = call(sound, "To PointProcess (periodic, cc)", 75, 500)
        harmonicity = call(sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)
        hnr = call(harmonicity, "Get mean", 0, 0)
        localJitter = call(pointProcess, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
        localabsoluteJitter = call(pointProcess, "Get jitter (local, absolute)", 0, 0, 0.0001, 0.02, 1.3)
        rapJitter = call(pointProcess, "Get jitter (rap)", 0, 0, 0.0001, 0.02, 1.3)
        ddpJitter = call(pointProcess, "Get jitter (ddp)", 0, 0, 0.0001, 0.02, 1.3)
        localShimmer = call([sound, pointProcess], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
        localdbShimmer = call([sound, pointProcess], "Get shimmer (local_dB)", 0, 0, 0.0001, 0.02, 1.3, 1.6)

        metrics = np.array([
            hnr,
            localJitter,
            localabsoluteJitter,
            rapJitter,
            ddpJitter,
            localShimmer,
            localdbShimmer,
        ])
        return metrics
    except Exception:
        # Praat raises on unvoiced/degenerate signals; caller treats None as
        # "skip this file".
        return None
| |
|
def _spectral_tilt(y):
    """Crude spectral tilt: peak magnitude in STFT bins 1-9 minus peak in bins 10-19."""
    S = np.abs(librosa.stft(y))
    return np.max(S[1:10]) - np.max(S[10:20])


def _cepstral_peak(y):
    """Rough cepstral-peak proxy: max of the real cepstrum in quefrency bins 10-59."""
    cepstrum = np.abs(np.fft.irfft(np.log(np.abs(np.fft.rfft(y)))))
    return np.max(cepstrum[10:60])


def _speaking_rate(y, sr):
    """Onset-strength peaks per second of audio, used as a speaking-rate estimate."""
    onset_env = librosa.onset.onset_strength(y=y, sr=sr)
    peaks = librosa.util.peak_pick(
        onset_env, pre_max=3, post_max=3, pre_avg=3, post_avg=3, delta=0.5, wait=10
    )
    return len(peaks) / (len(y) / sr)


def extract_features(file_path, n_mfcc=13, sr=16000, duration=7):
    """Extract the full feature vector for one audio file.

    Feature layout (in order): spectral tilt, cepstral peak, speaking rate;
    MFCC means and stds (2 * n_mfcc); 12 formant stats; 7 jitter/shimmer/HNR
    metrics; 6 F0 features (is_distorted flag, mean, std, median, range,
    normalized frame-to-frame difference).

    Parameters
    ----------
    file_path : str
        Path to a .wav/.mp3 file.
    n_mfcc : int
        Number of MFCC coefficients (default 13).
    sr : int
        Sample rate to load/resample the audio at (default 16000).
    duration : float
        Maximum seconds of audio to load (default 7).

    Returns
    -------
    np.ndarray or None
        Feature vector, or None when loading, any sub-extractor, or a
        NaN/inf check fails — callers treat None as "skip this file".
    """
    try:
        y, sr = librosa.load(file_path, sr=sr, duration=duration)
        y = preprocess_audio(y, sr)

        jitter_features = calculate_jitter(y, sr, file_path)
        if jitter_features is None:
            # Previously this only surfaced as a concatenate failure caught by
            # the broad except below; the outcome (None) is unchanged.
            return None

        formant_features = extract_formants(y, sr)
        if formant_features is None:
            return None

        # --- F0 features, with fixed fallbacks if pyin finds no voiced frames ---
        f0_mean = 150.0
        f0_std = 20.0
        f0_median = 150.0
        f0_range = 100.0
        f0_norm_diff = 0.1
        is_distorted = 1

        f0, _, _ = librosa.pyin(y, sr=sr, fmin=75, fmax=500, frame_length=1024)
        f0 = f0[~np.isnan(f0)]

        if len(f0) > 0:
            is_distorted = 0
            f0_diff = np.diff(f0)
            f0_mean = float(np.mean(f0))
            f0_std = float(np.std(f0))
            f0_median = float(np.median(f0))
            f0_range = float(np.max(f0) - np.min(f0))
            f0_norm_diff = float(np.mean(np.abs(f0_diff)) / f0_mean) if f0_mean > 0 else 0.0

        f0_features = np.array([
            float(is_distorted),
            float(f0_mean),
            float(f0_std),
            float(f0_median),
            float(f0_range),
            float(f0_norm_diff),
        ])

        # --- MFCC summary statistics ---
        mfccs = librosa.feature.mfcc(
            y=y, sr=sr, n_mfcc=n_mfcc,
            n_fft=512, hop_length=256
        )
        mfcc_features = np.concatenate([np.mean(mfccs, axis=1), np.std(mfccs, axis=1)])

        features = np.concatenate([
            [_spectral_tilt(y), _cepstral_peak(y), _speaking_rate(y, sr)],
            mfcc_features,
            formant_features,
            jitter_features,
            f0_features,
        ])
        # Reject any vector the downstream models could not consume.
        if (np.any(np.isnan(features)) or
                np.any(np.isinf(features))):
            return None
        return features

    except Exception:
        return None
| |
|
def process_file(file_path):
    """Run feature extraction on one audio file.

    Returns a ``(file_path, features)`` tuple for .wav/.mp3 inputs (where
    ``features`` may be None if extraction failed), and None for any other
    extension.
    """
    if not file_path.lower().endswith(('.wav', '.mp3')):
        return None
    return (file_path, extract_features(file_path))
| |
|
def testing_pipeline(folder_path):
    """Predict the combined age/gender class for a single audio file.

    Parameters
    ----------
    folder_path : str
        Path to one .wav/.mp3 file (parameter name kept for backward
        compatibility with existing callers).

    Returns
    -------
    int or None
        Combined prediction ``(age << 1) + gender``, or None when the file is
        unsupported or feature extraction failed.
    """
    # BUG FIX: the variable/filename pairing was swapped (model_gender loaded
    # the *age* model and vice versa), silently crossing the two predictions.
    model_gender = joblib.load("stacked_gender_model.joblib")
    model_age = joblib.load("stacked_age_model.joblib")

    result = process_file(folder_path)
    if result is None:
        return None
    _, features = result
    if features is None:
        return None

    # features is a 1-D numpy vector: build a one-row frame for the models.
    # (The original called DataFrame.from_dict on the array, which fails —
    # from_dict expects a mapping.)
    features_df = pd.DataFrame([features])
    X = features_df.dropna()
    if X.empty:
        return None

    y_pred_age = model_age.predict(X)
    y_pred_gender = model_gender.predict(X)
    y_pred_combined = (y_pred_age << 1) + y_pred_gender

    # NOTE(review): a misleading, unreachable print ("Predictions written to
    # predictions.txt") after the return was removed — nothing writes a file.
    return y_pred_combined[0]
| |
|
if __name__ == "__main__":
    import sys

    # CLI entry point: score the audio file given as the first argument.
    audio_path = sys.argv[1]
    testing_pipeline(audio_path)