File size: 4,502 Bytes
4c8f740
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
import numpy as np
import librosa
from config import PREPROCESSED_CACHE
import noisereduce as nr
from sklearn.model_selection import train_test_split
from typing import Optional
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift, Shift
from imblearn.combine import SMOTETomek
import random
from collections import Counter


# === Preprocessing ===
class AudioPreprocessor:
    """Audio loading, cleaning, class-aware augmentation, caching, and splitting.

    Pipeline (see preprocess()): silence removal -> volume normalization ->
    noise reduction -> optional augmentation -> optional fixed-length padding.
    """

    def __init__(self):
        # Augmentation chain applied (probabilistically, per class) in preprocess().
        # Each transform fires with p=1.0 here; gating happens via
        # augment_prob_by_class below.
        self.augment_pipeline = Compose([
            AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.015, p=1.0),
            TimeStretch(min_rate=0.9, max_rate=1.1, p=1.0),
            PitchShift(min_semitones=-2, max_semitones=2, p=1.0),
            Shift(min_shift=-0.2, max_shift=0.2, p=1.0),
        ])
        # Probability of augmenting a sample, keyed by class label.
        # Unknown labels fall back to 0.5 (see preprocess()).
        # NOTE(review): class 0 is barely augmented — presumably the
        # majority/"normal" class; split_data() makes the same assumption.
        self.augment_prob_by_class = {  # set your probabilities here
            0: 0.01,
            1: 0.8,
            2: 0.9,
            3: 0.95,
        }

    def load_audio(self, path: str, sr: int = 16000) -> Optional[np.ndarray]:
        """Load an audio file resampled to `sr` Hz; return None on any failure."""
        try:
            y, _ = librosa.load(path, sr=sr)
            return y
        except Exception as e:
            print(f"[ERROR] {path}: {e}")
            return None

    def preprocess(self, y: Optional[np.ndarray], sr: int = 16000, padding: bool = False, label: Optional[int] = None) -> Optional[np.ndarray]:
        """Clean one waveform and optionally augment / pad it.

        Args:
            y: Raw waveform (or None, which is passed through as None).
            sr: Sample rate of `y`.
            padding: If True, truncate/zero-pad the result to exactly 5 seconds.
            label: Class label used to look up the augmentation probability;
                None means the 0.5 default applies.

        Returns:
            The processed waveform, or None if `y` is None or contains no
            audible content.
        """
        if y is None:
            return None

        # Remove silence: keep only segments within 20 dB of the peak.
        intervals = librosa.effects.split(y, top_db=20)
        if len(intervals) == 0:
            # BUG FIX: an entirely silent clip yields no intervals and
            # np.concatenate([]) would raise ValueError. Follow the class
            # convention of returning None for unusable input.
            return None
        y_trimmed = np.concatenate([y[start:end] for start, end in intervals])

        # Normalize volume: Volume variations, Different microphone quality
        y_norm = librosa.util.normalize(y_trimmed)

        # Noise reduction (spectral gating; n_jobs=-1 uses all cores).
        y_denoised = nr.reduce_noise(y=y_norm, sr=sr, n_jobs=-1)

        # Conditional augmentation: minority classes are augmented far more
        # often than class 0 (see augment_prob_by_class).
        if label is not None and random.random() < self.augment_prob_by_class.get(label, 0.5):
            y_augmented = self.augment_pipeline(samples=y_denoised, sample_rate=sr)
        else:
            y_augmented = y_denoised

        # Padding/truncation to a fixed 5-second window for batchable inputs.
        if padding:
            desired_len = sr * 5
            if len(y_augmented) > desired_len:
                y_augmented = y_augmented[:desired_len]
            else:
                y_augmented = np.pad(y_augmented, (0, max(0, desired_len - len(y_augmented))))

        return y_augmented

    def cache_preprocessed(self, idx: str, y: np.ndarray, force_update: bool = False) -> None:
        """Persist a preprocessed waveform as <idx>.npy unless it already exists."""
        path = PREPROCESSED_CACHE / f"{idx}.npy"
        if force_update or not path.exists():
            np.save(path, y)

    def load_cached_preprocessed(self, idx: str) -> Optional[np.ndarray]:
        """Load a cached waveform by id; return None if missing or unreadable."""
        # BUG FIX: build the path before the try block — previously it was
        # created inside, so a failure there made the except handler raise
        # NameError when formatting the message.
        path = PREPROCESSED_CACHE / f"{idx}.npy"
        try:
            return np.load(path) if path.exists() else None
        except Exception as e:
            print(f"[ERROR] {path}: {e}")
            return None

    def split_data(self, X, y, train_size: float = 0.75, val_size: float = 0.1, random_state: int = 42, stratify: bool = True,
                    apply_smote: bool = False, smote_percentage: float = 0.7, verbose = True) -> tuple:
        """Split into train/val/test and optionally rebalance train with SMOTETomek.

        Args:
            X, y: Features and labels.
            train_size: Fraction of the data used for training.
            val_size: Fraction of the *whole* dataset used for validation;
                the remainder (1 - train_size - val_size) becomes the test set.
            random_state: Seed for both splits and the resampler.
            stratify: Preserve class proportions in every split.
            apply_smote: Resample the training set with SMOTETomek.
            smote_percentage: Target minority-class size as a fraction of the
                majority class.
            verbose: Print class distributions around resampling.

        Returns:
            (X_train, y_train, X_val, y_val, X_test, y_test)
        """
        # First split: train vs (val + test)
        stratify_option = y if stratify else None
        X_train, X_temp, y_train, y_temp = train_test_split(
            X, y, train_size=train_size, random_state=random_state, stratify=stratify_option
        )

        # Second split: validation vs test (val_size is rescaled because it is
        # expressed as a fraction of the full dataset, not of the remainder).
        stratify_temp = y_temp if stratify else None
        X_val, X_test, y_val, y_test = train_test_split(
            X_temp, y_temp, train_size=val_size / (1 - train_size), random_state=random_state, stratify=stratify_temp
        )

        if apply_smote:
            if verbose: print(f"[INFO] Class distribution before SMOTE: {Counter(y_train)}")

            class_counts = Counter(y_train)
            majority_class_count = max(class_counts.values())
            # BUG FIX: clamp each target to at least the current count —
            # imblearn raises ValueError if a class's requested sample count
            # is below what it already has (any class sitting between
            # smote_percentage and 100% of the majority would trigger this).
            target = int(majority_class_count * smote_percentage)
            sampling_strategy = {
                cls: max(count, target) for cls, count in class_counts.items()
            }
            # NOTE(review): assumes class 0 is the majority class and pins it
            # at the full majority count — confirm against the label scheme.
            sampling_strategy[0] = majority_class_count

            resampler = SMOTETomek(
                random_state=random_state,
                n_jobs=-1,
                sampling_strategy=sampling_strategy  # Specify sampling strategy as a dictionary
            )
            X_train, y_train = resampler.fit_resample(X_train, y_train)

            if verbose: print(f"[INFO] Class distribution after SMOTE: {Counter(y_train)}")

        return X_train, y_train, X_val, y_val, X_test, y_test