# pylint: disable=missing-function-docstring, missing-class-docstring, missing-module-docstring, redefined-outer-name, unused-argument, unused-import, singleton-comparison, invalid-name, wrong-import-position, too-many-arguments, too-many-locals, too-many-statements, wrong-import-order
"""
preprocessing_fixed.py

Data leakage-free preprocessing pipeline for polymer aging classification.

This module ensures that preprocessing transformations (normalization,
scaling, etc.) are fitted only on training data within each
cross-validation fold.

CRITICAL: This fixes the data leakage issue where preprocessing was applied
to the entire dataset before cross-validation splits.
"""

import os
import sys
import numpy as np
from typing import Tuple, Optional, Dict, Any

# Add parent directory to path for imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))

from .raman_util import list_txt_files, label_file, load_spectrum
from backend.utils.preprocessing import preprocess_spectrum, TARGET_LENGTH


class SpectrumPreprocessor:
    """
    Data leakage-free preprocessing pipeline for spectral data.

    Per-sample steps (resampling, baseline correction, smoothing) are
    delegated to ``preprocess_spectrum``. Normalization statistics are fitted
    on training spectra only (see ``fit_normalization_stats``) and then
    re-used for validation / test spectra.
    """

    def __init__(
        self,
        target_len: int = TARGET_LENGTH,
        do_baseline: bool = True,
        do_smooth: bool = True,
        do_normalize: bool = True,
        modality: str = "raman"
    ):
        """
        Initialize the preprocessor with configuration.

        Args:
            target_len (int): Target length for resampling
            do_baseline (bool): Whether to apply baseline correction
            do_smooth (bool): Whether to apply smoothing
            do_normalize (bool): Whether to apply normalization
            modality (str): Spectroscopy modality ('raman' or 'ftir')
        """
        self.target_len = target_len
        self.do_baseline = do_baseline
        self.do_smooth = do_smooth
        self.do_normalize = do_normalize
        self.modality = modality

        # Stats fitted on training data only
        self.normalization_stats = None
        self.is_fitted = False

    def load_raw_data(self, dataset_dir: str) -> Tuple[np.ndarray, np.ndarray, list]:
        """
        Load raw spectrum data without preprocessing.

        Files without a label, files with fewer than 10 points, and files
        that fail to load (IOError / ValueError) are skipped.

        Args:
            dataset_dir (str): Path to dataset directory

        Returns:
            tuple: (raw_spectra, labels, file_paths) where ``raw_spectra`` is
            an object array of (x_raw, y_raw) pairs, ``labels`` is an integer
            array and ``file_paths`` lists the files that were kept.
        """
        txt_paths = list_txt_files(dataset_dir)

        raw_spectra = []
        labels = []
        valid_files = []

        for path in txt_paths:
            label = label_file(path)
            if label is None:
                continue

            try:
                x_raw, y_raw = load_spectrum(path)
                if len(x_raw) < 10:
                    continue  # Skip files with too few points

                raw_spectra.append((x_raw, y_raw))
                labels.append(int(label))
                valid_files.append(path)
            except (IOError, ValueError) as e:
                print(f"⚠️ Warning: Failed to load {path}: {e}")
                continue

        # NOTE(review): the shape of this object array presumably depends on
        # whether all spectra share the same length -- confirm that callers
        # only index along the first axis.
        # dtype=int keeps the label array integer-typed even when no file was
        # loaded (an empty default array would be float64, which is not a
        # valid input for np.bincount).
        return (
            np.array(raw_spectra, dtype=object),
            np.array(labels, dtype=int),
            valid_files,
        )

    def _preprocess_without_normalization(
        self, x_raw: np.ndarray, y_raw: np.ndarray
    ) -> np.ndarray:
        """Run the per-sample steps (resample/baseline/smooth) with no normalization."""
        _, y_processed = preprocess_spectrum(
            np.asarray(x_raw),
            np.asarray(y_raw),
            target_len=self.target_len,
            modality=self.modality,
            do_baseline=self.do_baseline,
            do_smooth=self.do_smooth,
            do_normalize=False,  # We handle normalization separately
            out_dtype=np.float32
        )
        return y_processed

    def preprocess_single_spectrum(
        self,
        x_raw: np.ndarray,
        y_raw: np.ndarray,
        use_fitted_stats: bool = False
    ) -> np.ndarray:
        """
        Preprocess a single spectrum.

        Normalization behaviour (only when ``do_normalize`` is True):
          * ``use_fitted_stats=True`` and stats are fitted: min-max scaling
            with the training-set statistics.
          * ``use_fitted_stats=True`` but nothing fitted yet: the spectrum is
            returned un-normalized.
          * ``use_fitted_stats=False``: per-sample min-max scaling.

        Args:
            x_raw (np.ndarray): Raw wavenumber values
            y_raw (np.ndarray): Raw intensity values
            use_fitted_stats (bool): Whether to use fitted normalization stats

        Returns:
            np.ndarray: Preprocessed spectrum
        """
        # Resampling, baseline correction and smoothing are applied
        # per-sample, so they cannot leak information between samples.
        y_processed = self._preprocess_without_normalization(x_raw, y_raw)

        if not self.do_normalize:
            return y_processed

        if use_fitted_stats:
            if self.is_fitted:
                y_processed = self._apply_fitted_normalization(y_processed)
            return y_processed

        # Apply per-sample normalization (min-max)
        y_min, y_max = y_processed.min(), y_processed.max()
        if y_max > y_min:
            y_processed = (y_processed - y_min) / (y_max - y_min)
        return y_processed

    def fit_normalization_stats(self, train_spectra: list) -> None:
        """
        Fit normalization statistics on training data only.

        The statistics are computed on spectra that have NOT been normalized.
        Computing them on per-sample min-max-scaled spectra would always give
        min=0 / max=1 and make the fitted normalization a no-op.

        Args:
            train_spectra (list): List of (x_raw, y_raw) tuples for training

        Raises:
            ValueError: If normalization is enabled and ``train_spectra`` is empty.
        """
        if not self.do_normalize:
            return

        # Preprocess training spectra without normalization
        processed_spectra = [
            self._preprocess_without_normalization(x_raw, y_raw)
            for x_raw, y_raw in train_spectra
        ]
        if not processed_spectra:
            raise ValueError(
                "Cannot fit normalization stats: no training spectra provided."
            )

        # Calculate global statistics from training data
        all_values = np.concatenate(processed_spectra)
        self.normalization_stats = {
            'mean': np.mean(all_values),
            'std': np.std(all_values),
            'min': np.min(all_values),
            'max': np.max(all_values)
        }
        self.is_fitted = True

        print("✅ Fitted normalization statistics on training data")

    def _apply_fitted_normalization(self, spectrum: np.ndarray) -> np.ndarray:
        """
        Apply fitted normalization to a spectrum.

        Args:
            spectrum (np.ndarray): Preprocessed spectrum

        Returns:
            np.ndarray: Normalized spectrum (returned unchanged when the
            fitted min and max are equal)

        Raises:
            ValueError: If the normalization stats have not been fitted.
        """
        if not self.is_fitted:
            raise ValueError("Normalization stats not fitted. Call fit_normalization_stats first.")

        # Use min-max normalization based on training data
        stats = self.normalization_stats
        if stats is not None and stats['max'] > stats['min']:
            spectrum = (spectrum - stats['min']) / (stats['max'] - stats['min'])

        return spectrum

    def transform_fold(
        self,
        raw_spectra: np.ndarray,
        train_indices: np.ndarray,
        val_indices: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Transform data for a single CV fold without data leakage.

        Normalization statistics are (re)fitted on the training portion of
        the fold and then applied to both the training and validation
        portions.

        Args:
            raw_spectra (np.ndarray): Array of (x_raw, y_raw) tuples
            train_indices (np.ndarray): Training indices for this fold
            val_indices (np.ndarray): Validation indices for this fold

        Returns:
            tuple: (X_train, X_val) preprocessed data
        """
        # Get training and validation raw data
        train_raw = raw_spectra[train_indices]
        val_raw = raw_spectra[val_indices]

        # Fit normalization stats on training data only
        self.fit_normalization_stats(train_raw.tolist())

        # Preprocess training data
        X_train = [
            self.preprocess_single_spectrum(x_raw, y_raw, use_fitted_stats=True)
            for x_raw, y_raw in train_raw
        ]

        # Preprocess validation data using fitted stats
        X_val = [
            self.preprocess_single_spectrum(x_raw, y_raw, use_fitted_stats=True)
            for x_raw, y_raw in val_raw
        ]

        return np.array(X_train), np.array(X_val)


def load_data_for_cv(
    dataset_dir: str,
    preprocessor_config: Optional[Dict[str, Any]] = None
) -> Tuple[np.ndarray, np.ndarray, SpectrumPreprocessor]:
    """
    Load raw data for cross-validation without data leakage.

    Args:
        dataset_dir (str): Path to dataset directory
        preprocessor_config (dict): Keyword arguments forwarded to
            ``SpectrumPreprocessor``; defaults are used when omitted

    Returns:
        tuple: (raw_spectra, labels, preprocessor)
    """
    config = preprocessor_config or {}
    preprocessor = SpectrumPreprocessor(**config)

    raw_spectra, labels, _ = preprocessor.load_raw_data(dataset_dir)

    print(f"✅ Loaded {len(raw_spectra)} raw spectra for CV")
    print(f"Class distribution: {np.bincount(labels)}")

    return raw_spectra, labels, preprocessor


def preprocess_holdout_test_set(
    test_spectra: np.ndarray,
    fitted_preprocessor: SpectrumPreprocessor
) -> np.ndarray:
    """
    Preprocess hold-out test set using fitted preprocessor.

    Args:
        test_spectra (np.ndarray): Raw test spectra
        fitted_preprocessor (SpectrumPreprocessor): Preprocessor fitted on training data

    Returns:
        np.ndarray: Preprocessed test data

    Raises:
        ValueError: If normalization is enabled but the preprocessor has not
            been fitted on training data.
    """
    # A preprocessor with normalization disabled has nothing to fit (its
    # ``is_fitted`` flag is never set), so only insist on fitting when
    # normalization is actually going to be applied.
    if fitted_preprocessor.do_normalize and not fitted_preprocessor.is_fitted:
        raise ValueError("Preprocessor must be fitted on training data first")

    X_test = [
        fitted_preprocessor.preprocess_single_spectrum(
            x_raw, y_raw, use_fitted_stats=True
        )
        for x_raw, y_raw in test_spectra
    ]

    return np.array(X_test)


if __name__ == "__main__":
    # Test the data leakage-free preprocessing pipeline
    print("Testing data leakage-free preprocessing pipeline...")

    # Test with sample data
    dataset_dir = "sample_data"

    # Load raw data
    raw_spectra, labels, preprocessor = load_data_for_cv(dataset_dir)

    # Simulate a single CV fold
    from sklearn.model_selection import StratifiedKFold

    if len(raw_spectra) >= 2:
        cv = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)
        train_idx, val_idx = next(cv.split(raw_spectra, labels))

        # Transform without data leakage
        X_train, X_val = preprocessor.transform_fold(raw_spectra, train_idx, val_idx)

        print("✅ Fold transformation completed")
        print(f" Train: {X_train.shape}")
        print(f" Val: {X_val.shape}")
        print(f" Normalization fitted: {preprocessor.is_fitted}")

    print("✅ Data leakage-free preprocessing test completed!")