# File size: 10,693 Bytes
# 4a0e21d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# pylint: disable=missing-function-docstring, missing-class-docstring, missing-module-docstring, redefined-outer-name, unused-argument, unused-import, singleton-comparison, invalid-name, wrong-import-position, too-many-arguments, too-many-locals, too-many-statements, wrong-import-order
"""
preprocessing_fixed.py

Data leakage-free preprocessing pipeline for polymer aging classification.

This module ensures that preprocessing transformations (normalization, scaling, etc.)
are fitted only on training data within each cross-validation fold.

CRITICAL: This fixes the data leakage issue where preprocessing was applied
to the entire dataset before cross-validation splits.
"""

import os
import sys
import numpy as np
from typing import Tuple, Optional, Dict, Any

# Add parent directory to path for imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from .raman_util import list_txt_files, label_file, load_spectrum
from backend.utils.preprocessing import preprocess_spectrum, TARGET_LENGTH

class SpectrumPreprocessor:
    """
    Data leakage-free preprocessing pipeline for spectral data.

    Per-sample steps (resampling, baseline correction, smoothing) are delegated
    to ``preprocess_spectrum``. Dataset-level min-max normalization statistics
    are fitted on the training portion of a CV fold only and are then reused,
    unchanged, for the validation / hold-out portion.
    """

    def __init__(
        self,
        target_len: int = TARGET_LENGTH,
        do_baseline: bool = True,
        do_smooth: bool = True,
        do_normalize: bool = True,
        modality: str = "raman"
    ):
        """
        Initialize the preprocessor with configuration.

        Args:
            target_len (int): Target length for resampling
            do_baseline (bool): Whether to apply baseline correction
            do_smooth (bool): Whether to apply smoothing
            do_normalize (bool): Whether to apply normalization
            modality (str): Spectroscopy modality ('raman' or 'ftir')
        """
        self.target_len = target_len
        self.do_baseline = do_baseline
        self.do_smooth = do_smooth
        self.do_normalize = do_normalize
        self.modality = modality

        # Stats fitted on training data only (keys: 'mean', 'std', 'min', 'max')
        self.normalization_stats: Optional[Dict[str, float]] = None
        self.is_fitted = False

    def load_raw_data(self, dataset_dir: str) -> Tuple[np.ndarray, np.ndarray, list]:
        """
        Load raw spectrum data without preprocessing.

        Files without a label, files with fewer than 10 points, and files that
        fail to load (IOError / ValueError) are skipped.

        Args:
            dataset_dir (str): Path to dataset directory

        Returns:
            tuple: (raw_spectra, labels, file_paths) where ``raw_spectra`` is a
            1-D object array whose elements are ``(x_raw, y_raw)`` tuples,
            ``labels`` is an integer array, and ``file_paths`` lists the files
            that were kept, in the same order.
        """
        raw_spectra = []
        labels = []
        valid_files = []

        for path in list_txt_files(dataset_dir):
            label = label_file(path)
            if label is None:
                continue

            try:
                x_raw, y_raw = load_spectrum(path)
                # Convert the label before appending anything so a bad label
                # cannot leave the three output lists out of alignment.
                label_id = int(label)
            except (IOError, ValueError) as e:
                print(f"⚠️ Warning: Failed to load {path}: {e}")
                continue

            if len(x_raw) < 10:
                continue  # Skip files with too few points

            raw_spectra.append((x_raw, y_raw))
            labels.append(label_id)
            valid_files.append(path)

        # Fill a 1-D object array element by element. np.array(..., dtype=object)
        # would instead produce an (N, 2, L) element-wise object array whenever
        # all spectra happen to share the same length.
        spectra_array = np.empty(len(raw_spectra), dtype=object)
        for idx, pair in enumerate(raw_spectra):
            spectra_array[idx] = pair

        # An explicit integer dtype keeps an empty result usable with np.bincount.
        return spectra_array, np.array(labels, dtype=int), valid_files

    def _preprocess_without_normalization(
        self,
        x_raw: np.ndarray,
        y_raw: np.ndarray
    ) -> np.ndarray:
        """
        Run the per-sample steps (resample / baseline / smooth) with no normalization.

        Args:
            x_raw (np.ndarray): Raw wavenumber values
            y_raw (np.ndarray): Raw intensity values

        Returns:
            np.ndarray: Un-normalized processed intensities
        """
        x_arr = np.asarray(x_raw)
        y_arr = np.asarray(y_raw)
        # Spectra sliced out of an element-wise object array arrive with
        # dtype=object; convert them to plain floats before numeric processing.
        if x_arr.dtype == object:
            x_arr = x_arr.astype(np.float64)
        if y_arr.dtype == object:
            y_arr = y_arr.astype(np.float64)

        # These steps don't cause data leakage as they're applied per-sample
        _, y_processed = preprocess_spectrum(
            x_arr,
            y_arr,
            target_len=self.target_len,
            modality=self.modality,
            do_baseline=self.do_baseline,
            do_smooth=self.do_smooth,
            do_normalize=False,  # Normalization is handled by this class
            out_dtype=np.float32
        )
        return y_processed

    def preprocess_single_spectrum(
        self,
        x_raw: np.ndarray,
        y_raw: np.ndarray,
        use_fitted_stats: bool = False
    ) -> np.ndarray:
        """
        Preprocess a single spectrum.

        Args:
            x_raw (np.ndarray): Raw wavenumber values
            y_raw (np.ndarray): Raw intensity values
            use_fitted_stats (bool): If True, normalize with the statistics
                fitted on training data; if False, apply per-sample min-max
                normalization. Ignored when ``do_normalize`` is False.

        Returns:
            np.ndarray: Preprocessed spectrum

        Raises:
            ValueError: If fitted statistics are requested before
                ``fit_normalization_stats`` has been called.
        """
        y_processed = self._preprocess_without_normalization(x_raw, y_raw)

        if not self.do_normalize:
            return y_processed

        if use_fitted_stats:
            # Raises when not fitted rather than silently returning
            # un-normalized data alongside normalized data.
            return self._apply_fitted_normalization(y_processed)

        # Per-sample min-max normalization; a flat spectrum is left untouched.
        y_min, y_max = y_processed.min(), y_processed.max()
        if y_max > y_min:
            y_processed = (y_processed - y_min) / (y_max - y_min)

        return y_processed

    def fit_normalization_stats(self, train_spectra: list) -> None:
        """
        Fit normalization statistics on training data only.

        Does nothing when ``do_normalize`` is False.

        Args:
            train_spectra (list): List of (x_raw, y_raw) tuples for training

        Raises:
            ValueError: If ``train_spectra`` is empty.
        """
        if not self.do_normalize:
            return

        # The statistics must describe the *un-normalized* spectra. Fitting on
        # per-sample min-max output would always give min=0 / max=1 and turn
        # the fitted normalization into an identity.
        processed_spectra = [
            self._preprocess_without_normalization(x_raw, y_raw)
            for x_raw, y_raw in train_spectra
        ]
        if not processed_spectra:
            raise ValueError("Cannot fit normalization stats on an empty training set.")

        # Calculate global statistics from training data
        all_values = np.concatenate(processed_spectra)
        self.normalization_stats = {
            'mean': float(np.mean(all_values)),
            'std': float(np.std(all_values)),
            'min': float(np.min(all_values)),
            'max': float(np.max(all_values))
        }

        self.is_fitted = True
        print("✅ Fitted normalization statistics on training data")

    def _apply_fitted_normalization(self, spectrum: np.ndarray) -> np.ndarray:
        """
        Apply fitted normalization to a spectrum.

        Args:
            spectrum (np.ndarray): Preprocessed spectrum

        Returns:
            np.ndarray: Normalized spectrum (returned unchanged if the fitted
            range is degenerate, i.e. max <= min)

        Raises:
            ValueError: If normalization statistics have not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Normalization stats not fitted. Call fit_normalization_stats first.")

        # Use min-max normalization based on training data
        stats = self.normalization_stats
        if stats is not None and stats['max'] > stats['min']:
            spectrum = (spectrum - stats['min']) / (stats['max'] - stats['min'])

        return spectrum

    def transform_fold(
        self,
        raw_spectra: np.ndarray,
        train_indices: np.ndarray,
        val_indices: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Transform data for a single CV fold without data leakage.

        Normalization statistics are (re)fitted on the training rows of this
        fold and then applied to both the training and validation rows.

        Args:
            raw_spectra (np.ndarray): Array of (x_raw, y_raw) tuples
            train_indices (np.ndarray): Training indices for this fold
            val_indices (np.ndarray): Validation indices for this fold

        Returns:
            tuple: (X_train, X_val) preprocessed data
        """
        # Get training and validation raw data
        train_raw = raw_spectra[train_indices]
        val_raw = raw_spectra[val_indices]

        # Fit normalization stats on training data only
        self.fit_normalization_stats(train_raw.tolist())

        # Both splits are normalized with the training-fitted statistics
        X_train = [
            self.preprocess_single_spectrum(x_raw, y_raw, use_fitted_stats=True)
            for x_raw, y_raw in train_raw
        ]
        X_val = [
            self.preprocess_single_spectrum(x_raw, y_raw, use_fitted_stats=True)
            for x_raw, y_raw in val_raw
        ]

        return np.array(X_train), np.array(X_val)

def load_data_for_cv(
    dataset_dir: str,
    preprocessor_config: Optional[Dict[str, Any]] = None
) -> Tuple[np.ndarray, np.ndarray, SpectrumPreprocessor]:
    """
    Load raw data for cross-validation without data leakage.

    Only raw spectra are loaded here; no preprocessing is applied, so
    transformations can be fitted later inside each CV fold.

    Args:
        dataset_dir (str): Path to dataset directory
        preprocessor_config (dict): Keyword arguments forwarded to
            ``SpectrumPreprocessor``; ``None`` means use its defaults

    Returns:
        tuple: (raw_spectra, labels, preprocessor)
    """
    config = preprocessor_config or {}
    preprocessor = SpectrumPreprocessor(**config)

    raw_spectra, labels, _ = preprocessor.load_raw_data(dataset_dir)

    print(f"✅ Loaded {len(raw_spectra)} raw spectra for CV")
    # np.bincount rejects the float64 array that results from an empty label
    # list, so cast to int first; an empty dataset then reports "[]".
    class_counts = np.bincount(np.asarray(labels, dtype=int))
    print(f"Class distribution: {class_counts}")

    return raw_spectra, labels, preprocessor

def preprocess_holdout_test_set(
    test_spectra: np.ndarray,
    fitted_preprocessor: SpectrumPreprocessor
) -> np.ndarray:
    """
    Preprocess hold-out test set using fitted preprocessor.

    Args:
        test_spectra (np.ndarray): Raw test spectra as (x_raw, y_raw) pairs
        fitted_preprocessor (SpectrumPreprocessor): Preprocessor fitted on training data

    Returns:
        np.ndarray: Preprocessed test data

    Raises:
        ValueError: If normalization is enabled on the preprocessor but its
            statistics have not been fitted yet.
    """
    # Fitted statistics are only needed when normalization is enabled. With
    # do_normalize=False the preprocessor never becomes "fitted", so demanding
    # it would make hold-out preprocessing impossible in that configuration.
    if fitted_preprocessor.do_normalize and not fitted_preprocessor.is_fitted:
        raise ValueError("Preprocessor must be fitted on training data first")

    X_test = [
        fitted_preprocessor.preprocess_single_spectrum(
            x_raw, y_raw, use_fitted_stats=True
        )
        for x_raw, y_raw in test_spectra
    ]

    return np.array(X_test)

if __name__ == "__main__":
    # Smoke test: load the sample dataset and push one stratified fold
    # through the leakage-free pipeline, reporting the resulting shapes.
    print("Testing data leakage-free preprocessing pipeline...")

    # Raw (unprocessed) spectra plus a default-configured preprocessor
    spectra, targets, preprocessor = load_data_for_cv("sample_data")

    # Imported here so scikit-learn is only needed when run as a script
    from sklearn.model_selection import StratifiedKFold

    # A 2-way split needs at least two samples
    if len(spectra) >= 2:
        splitter = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)
        fold_train, fold_val = next(splitter.split(spectra, targets))

        # Statistics are fitted on the training rows of the fold only
        X_train, X_val = preprocessor.transform_fold(spectra, fold_train, fold_val)

        print("✅ Fold transformation completed")
        print(f"   Train: {X_train.shape}")
        print(f"   Val: {X_val.shape}")
        print(f"   Normalization fitted: {preprocessor.is_fitted}")

    print("✅ Data leakage-free preprocessing test completed!")