Spaces:
Sleeping
Sleeping
| import numpy as np | |
| import librosa | |
| from config import PREPROCESSED_CACHE | |
| import noisereduce as nr | |
| from sklearn.model_selection import train_test_split | |
| from typing import Optional | |
| from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift | |
| from imblearn.combine import SMOTETomek | |
| import random | |
| from collections import Counter | |
# === Preprocessing ===
class AudioPreprocessor:
    """Audio loading, cleaning, augmentation, caching, and dataset splitting.

    Pipeline: load -> silence trim -> volume normalize -> denoise ->
    (probabilistic, class-conditional) augmentation -> optional fixed-length
    padding. Also provides a .npy disk cache and a stratified
    train/val/test splitter with optional SMOTETomek rebalancing.
    """

    def __init__(self):
        # Each transform has p=1.0: whether augmentation happens at all is
        # decided once per sample in preprocess() via augment_prob_by_class.
        self.augment_pipeline = Compose([
            AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=1.0),
            TimeStretch(min_rate=0.9, max_rate=1.1, p=1.0),
            PitchShift(min_semitones=-2, max_semitones=2, p=1.0),
            Shift(min_shift=-0.2, max_shift=0.2, p=1.0),
        ])
        # Per-class probability of augmenting a sample — presumably the rarer
        # classes get higher probabilities. Unknown labels fall back to 0.5
        # (see preprocess()). Set your probabilities here.
        self.augment_prob_by_class = {
            0: 0.01,
            1: 0.8,
            2: 0.9,
            3: 0.95,
        }

    def load_audio(self, path: str, sr: int = 16000) -> Optional[np.ndarray]:
        """Load an audio file resampled to `sr` Hz; return None on failure."""
        try:
            y, _ = librosa.load(path, sr=sr)
            return y
        except Exception as e:
            # Best-effort loading: log and signal failure to the caller.
            print(f"[ERROR] {path}: {e}")
            return None

    def preprocess(self, y: Optional[np.ndarray], sr: int = 16000,
                   padding: bool = False,
                   label: Optional[int] = None) -> Optional[np.ndarray]:
        """Clean one waveform and optionally augment / pad it.

        Args:
            y: raw mono waveform, or None (propagated as None).
            sr: sample rate of `y` in Hz.
            padding: if True, truncate or zero-pad to exactly 5 seconds.
            label: class label used to pick the augmentation probability;
                None disables augmentation entirely.

        Returns:
            The processed waveform, or None if `y` is None or all silence.
        """
        if y is None:
            return None
        # Remove silence: keep segments louder than -20 dB relative to peak.
        intervals = librosa.effects.split(y, top_db=20)
        if len(intervals) == 0:
            # BUGFIX: np.concatenate([]) raises ValueError when the clip is
            # entirely below the silence threshold; treat it as unusable.
            print("[WARN] Audio is all silence; skipping sample.")
            return None
        y_trimmed = np.concatenate([y[start:end] for start, end in intervals])
        # Normalize volume: compensates for volume variations and
        # different microphone quality.
        y_norm = librosa.util.normalize(y_trimmed)
        # Noise reduction
        y_denoised = nr.reduce_noise(y=y_norm, sr=sr, n_jobs=-1)
        # Conditional augmentation with a class-dependent probability
        # (0.5 for labels not listed in augment_prob_by_class).
        if label is not None and random.random() < self.augment_prob_by_class.get(label, 0.5):
            y_augmented = self.augment_pipeline(samples=y_denoised, sample_rate=sr)
        else:
            y_augmented = y_denoised
        # Pad/truncate to a fixed 5-second window when requested.
        if padding:
            desired_len = sr * 5
            if len(y_augmented) > desired_len:
                y_augmented = y_augmented[:desired_len]
            else:
                y_augmented = np.pad(y_augmented, (0, max(0, desired_len - len(y_augmented))))
        return y_augmented

    def cache_preprocessed(self, idx: str, y: np.ndarray,
                           force_update: bool = False) -> None:
        """Persist `y` as <PREPROCESSED_CACHE>/<idx>.npy (skip if present)."""
        path = PREPROCESSED_CACHE / f"{idx}.npy"
        if force_update or not path.exists():
            np.save(path, y)

    def load_cached_preprocessed(self, idx: str) -> Optional[np.ndarray]:
        """Load a cached waveform by id; return None if missing or unreadable."""
        # BUGFIX: build the path outside the try so the except handler can
        # never reference an unbound `path`.
        path = PREPROCESSED_CACHE / f"{idx}.npy"
        try:
            return np.load(path) if path.exists() else None
        except Exception as e:
            print(f"[ERROR] {path}: {e}")
            return None

    def split_data(self, X, y, train_size: float = 0.75, val_size: float = 0.1,
                   random_state: int = 42, stratify: bool = True,
                   apply_smote: bool = False, smote_percentage: float = 0.7,
                   verbose: bool = True) -> tuple:
        """Split into train/val/test, optionally rebalancing train with SMOTETomek.

        Args:
            X, y: features and labels (array-likes accepted by sklearn).
            train_size: fraction of all data used for training.
            val_size: fraction of all data used for validation; test gets
                the remaining 1 - train_size - val_size.
            random_state: seed shared by both splits and the resampler.
            stratify: preserve class proportions in each split.
            apply_smote: rebalance the TRAIN split only with SMOTETomek.
            smote_percentage: target minority size as a fraction of the
                majority-class count.
            verbose: print class distributions around resampling.

        Returns:
            (X_train, y_train, X_val, y_val, X_test, y_test)
        """
        # First split: train vs (val + test).
        stratify_option = y if stratify else None
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, train_size=train_size, random_state=random_state, stratify=stratify_option
        )
        # Second split: carve validation out of the held-out remainder;
        # val_size is rescaled because X_temp is only (1 - train_size) of the data.
        stratify_temp = y_temp if stratify else None
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, train_size=val_size / (1 - train_size),
            random_state=random_state, stratify=stratify_temp
        )
        if apply_smote:
            class_counts = Counter(y_train)
            if verbose:
                print(f"[INFO] Class distribution before SMOTE: {class_counts}")
            majority_class_count = max(class_counts.values())
            target = int(majority_class_count * smote_percentage)
            # BUGFIX: SMOTE can only oversample — a per-class target below the
            # class's current count makes fit_resample raise ValueError, so
            # clamp each target to at least the existing count.
            sampling_strategy = {
                cls: max(count, target) for cls, count in class_counts.items()
            }
            # Class 0 is pinned to the full majority count (it is also the
            # least-augmented class above). Guarded: imbalanced-learn rejects
            # strategy keys for classes absent from y_train.
            if 0 in class_counts:
                sampling_strategy[0] = majority_class_count
            resampler = SMOTETomek(
                random_state=random_state,
                n_jobs=-1,
                sampling_strategy=sampling_strategy,  # per-class target counts
            )
            X_train, y_train = resampler.fit_resample(X_train, y_train)
            if verbose:
                print(f"[INFO] Class distribution after SMOTE: {Counter(y_train)}")
        return X_train, y_train, X_val, y_val, X_test, y_test