import os
import sys
from concurrent.futures import ProcessPoolExecutor

import joblib
import librosa
import noisereduce as nr
import numpy as np
import pandas as pd
import parselmouth
from parselmouth.praat import call
from tqdm import tqdm


def normalize_volume(audio, target_dBFS=-20):
    """Scale `audio` so its RMS level sits at `target_dBFS` dB full scale.

    Silent input (RMS == 0) is returned unchanged to avoid log10(0) /
    divide-by-zero in the gain computation.
    """
    rms = np.sqrt(np.mean(audio ** 2))
    if rms == 0:
        return audio
    # Gain (in dB) needed to move the current RMS level to the target level.
    gain_db = target_dBFS - 20 * np.log10(rms)
    return audio * 10 ** (gain_db / 20)


def remove_silence(audio, top_db=20):
    """Concatenate only the non-silent intervals of `audio`.

    Intervals quieter than `top_db` dB below the signal peak are dropped.
    If librosa finds no non-silent interval, the input is returned
    unchanged (the original code would crash concatenating an empty list).
    """
    intervals = librosa.effects.split(audio, top_db=top_db)
    if len(intervals) == 0:
        return audio
    return np.concatenate([audio[start:end] for start, end in intervals])


def equalize_audio(audio, sr, bass_boost=2, treble_boost=1.5):
    """Apply a crude two-band EQ in the STFT domain.

    Frequency bins below 250 Hz are multiplied by `bass_boost` and bins
    above 4 kHz by `treble_boost`; the signal is then reconstructed with
    the inverse STFT. Note the output length may differ slightly from the
    input due to STFT framing.
    """
    S = librosa.stft(audio)
    freqs = librosa.fft_frequencies(sr=sr)
    S[freqs < 250] *= bass_boost       # bass band
    S[freqs > 4000] *= treble_boost    # treble band
    return librosa.istft(S)


def preprocess_audio(audio, sr, target_sr=16000):
    """Run the preprocessing chain: silence removal, denoise, normalize, EQ.

    NOTE(review): no resampling is performed here, so `audio` is assumed
    to already be sampled at `target_sr` (extract_features loads at
    sr=16000, matching the default) — confirm if callers change `sr`.
    """
    audio = remove_silence(audio)
    audio = nr.reduce_noise(y=audio, sr=target_sr)
    audio = normalize_volume(audio)
    audio = equalize_audio(audio, target_sr)
    return audio


def extract_formants(y, sr):
    """Extract 12 formant statistics via Praat's Burg method.

    For each of F1, F2, F3 the mean, std, median and inter-quartile range
    are computed over 10 ms frames, giving 12 features total. Returns a
    length-12 numpy array, or None if Praat analysis fails entirely.
    """
    try:
        sound = parselmouth.Sound(y, sampling_frequency=sr)
        formant = sound.to_formant_burg(time_step=0.01)

        f1_list, f2_list, f3_list = [], [], []
        for t in np.arange(0, sound.duration, 0.01):
            try:
                f1 = formant.get_value_at_time(1, t)
                f2 = formant.get_value_at_time(2, t)
                f3 = formant.get_value_at_time(3, t)
                # Keep the frame only when all three formants were tracked.
                if f1 and f2 and f3 and not (np.isnan(f1) or np.isnan(f2) or np.isnan(f3)):
                    f1_list.append(f1)
                    f2_list.append(f2)
                    f3_list.append(f3)
            except Exception:
                # Praat can fail on individual frames; skip them.
                continue

        def _stats(values):
            """Mean, std, median and IQR of `values` (zeros when empty)."""
            if not values:
                return [0, 0, 0, 0]
            return [
                np.mean(values),
                np.std(values),
                np.median(values),
                np.percentile(values, 75) - np.percentile(values, 25),
            ]

        return np.array(_stats(f1_list) + _stats(f2_list) + _stats(f3_list))
    except Exception:
        # Callers treat None as "formant extraction failed".
        return None


def calculate_jitter(y, sr, file_path):
    """Compute 7 voice-quality metrics (HNR, jitter and shimmer variants).

    `file_path` is unused but kept for interface compatibility with
    existing callers. Returns a length-7 numpy array
    [hnr, local jitter, local absolute jitter, rap jitter, ddp jitter,
    local shimmer, local shimmer dB], or None on Praat failure.
    """
    try:
        sound = parselmouth.Sound(y, sampling_frequency=sr)
        harmonicity = call(sound, "To Harmonicity (cc)", 0.01, 75, 0.1, 1.0)
        hnr = call(harmonicity, "Get mean", 0, 0)

        point_process = call(sound, "To PointProcess (periodic, cc)", 75, 500)
        local_jitter = call(point_process, "Get jitter (local)", 0, 0, 0.0001, 0.02, 1.3)
        local_abs_jitter = call(point_process, "Get jitter (local, absolute)", 0, 0, 0.0001, 0.02, 1.3)
        rap_jitter = call(point_process, "Get jitter (rap)", 0, 0, 0.0001, 0.02, 1.3)
        ddp_jitter = call(point_process, "Get jitter (ddp)", 0, 0, 0.0001, 0.02, 1.3)
        local_shimmer = call([sound, point_process], "Get shimmer (local)", 0, 0, 0.0001, 0.02, 1.3, 1.6)
        local_db_shimmer = call([sound, point_process], "Get shimmer (local_dB)", 0, 0, 0.0001, 0.02, 1.3, 1.6)

        return np.array([
            hnr,               # Harmonic-to-Noise Ratio (dB)
            local_jitter,      # Local jitter (%)
            local_abs_jitter,  # Local absolute jitter (seconds)
            rap_jitter,        # RAP jitter (%)
            ddp_jitter,        # DDP jitter (%)
            local_shimmer,     # Local shimmer (%)
            local_db_shimmer,  # Local shimmer (dB)
        ])
    except Exception:
        # Callers treat None as "jitter extraction failed".
        return None


def extract_features(file_path, n_mfcc=13, sr=16000, duration=7):
    """Build the full feature vector for one audio file.

    The vector concatenates: 3 spectral/prosodic scalars (spectral tilt,
    cepstral peak prominence, speaking rate), 2*n_mfcc MFCC statistics,
    12 formant statistics, 7 jitter/shimmer metrics, and 6 f0 statistics.
    Returns None when loading/analysis fails or any value is NaN/inf.
    """
    try:
        # Load audio resampled to `sr` Hz, truncated to `duration` seconds.
        y, sr = librosa.load(file_path, sr=sr, duration=duration)
        y = preprocess_audio(y, sr)

        jitter_features = calculate_jitter(y, sr, file_path)

        # Fundamental-frequency statistics. Defaults describe a "neutral"
        # voice and are used when pyin tracks no voiced frames at all;
        # `is_distorted` flags that fallback explicitly.
        f0_mean = 150.0
        f0_std = 20.0
        f0_median = 150.0
        f0_range = 100.0
        f0_norm_diff = 0.1
        is_distorted = 1

        f0, _, _ = librosa.pyin(y, sr=sr, fmin=75, fmax=500, frame_length=1024)
        f0 = f0[~np.isnan(f0)]
        if len(f0) > 0:
            is_distorted = 0
            f0_diff = np.diff(f0)
            f0_mean = float(np.mean(f0))
            f0_std = float(np.std(f0))
            f0_median = float(np.median(f0))
            f0_range = float(np.max(f0) - np.min(f0))
            f0_norm_diff = float(np.mean(np.abs(f0_diff)) / f0_mean) if f0_mean > 0 else 0.0

        f0_features = np.array([
            float(is_distorted),
            f0_mean,
            f0_std,
            f0_median,
            f0_range,
            f0_norm_diff,
        ])

        formant_features = extract_formants(y, sr)

        # MFCCs aggregated over time as per-coefficient mean and std.
        mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc, n_fft=512, hop_length=256)
        mfcc_features = np.concatenate([np.mean(mfccs, axis=1), np.std(mfccs, axis=1)])

        def compute_spectral_tilt(y, sr):
            """Approximate H1-H2 as the max-magnitude difference of two low-bin bands."""
            S = np.abs(librosa.stft(y))
            h1 = np.max(S[1:10])    # first-harmonic region (skip DC bin)
            h2 = np.max(S[10:20])   # second-harmonic region
            return h1 - h2

        spectral_tilt = compute_spectral_tilt(y, sr)

        def compute_cpp(y, sr):
            """Cepstral Peak Prominence: max cepstrum value in the typical F0 quefrency range."""
            cepstrum = np.abs(np.fft.irfft(np.log(np.abs(np.fft.rfft(y)))))
            return np.max(cepstrum[10:60])

        cpp = compute_cpp(y, sr)

        def compute_speaking_rate(y, sr):
            """Onset peaks per second as a proxy for syllables per second."""
            onset_env = librosa.onset.onset_strength(y=y, sr=sr)
            peaks = librosa.util.peak_pick(
                onset_env, pre_max=3, post_max=3, pre_avg=3, post_avg=3, delta=0.5, wait=10
            )
            return len(peaks) / (len(y) / sr)

        speaking_rate = compute_speaking_rate(y, sr)

        features = np.concatenate([
            [spectral_tilt, cpp, speaking_rate],
            mfcc_features,
            formant_features,
            jitter_features,
            f0_features,
        ])

        # Reject vectors with any non-finite entry.
        if np.any(np.isnan(features)) or np.any(np.isinf(features)):
            return None
        return features
    except Exception:
        # Any failure (load, Praat, None sub-vectors) yields None.
        return None


def process_file(file_path):
    """Extract features for a .wav/.mp3 path; returns (path, features) or None."""
    if file_path.lower().endswith(('.wav', '.mp3')):
        return (file_path, extract_features(file_path))
    return None


def testing_pipeline(folder_path):
    """Predict the combined age/gender class for a single audio file.

    Loads the pre-trained stacked models, extracts features for
    `folder_path`, and returns (age_prediction << 1) + gender_prediction.

    Raises:
        ValueError: if the file is not a supported audio type or feature
            extraction fails.
    """
    # BUG FIX: the original code loaded the age model into `model_gender`
    # and the gender model into `model_age`; filenames now match variables.
    model_gender = joblib.load("stacked_gender_model.joblib")
    model_age = joblib.load("stacked_age_model.joblib")

    result = process_file(folder_path)
    if result is None:
        raise ValueError(f"Unsupported file type: {folder_path}")
    _, features = result
    if features is None:
        raise ValueError(f"Feature extraction failed for {folder_path}")

    # BUG FIX: `features` is a 1-D numpy array, not a dict, so
    # DataFrame.from_dict(..., orient='index') raised; build one row instead.
    features_df = pd.DataFrame([features])
    X = features_df.dropna()

    y_pred_age = model_age.predict(X)
    y_pred_gender = model_gender.predict(X)
    # Combined label encoding: age in the high bit, gender in the low bit.
    y_pred_combined = (y_pred_age << 1) + y_pred_gender
    return y_pred_combined[0]


if __name__ == "__main__":
    testing_pipeline(sys.argv[1])