polymer-aging-with-ml / backend /utils /preprocessing_fixed.py
devjas1
Initial Release: Polymer Aging With ML [Standalone Appliance]
4a0e21d
Raw
History Blame Contribute Delete
10.7 kB
# pylint: disable=missing-function-docstring, missing-class-docstring, missing-module-docstring, redefined-outer-name, unused-argument, unused-import, singleton-comparison, invalid-name, wrong-import-position, too-many-arguments, too-many-locals, too-many-statements, wrong-import-order
"""
preprocessing_fixed.py
Data leakage-free preprocessing pipeline for polymer aging classification.
This module ensures that preprocessing transformations (normalization, scaling, etc.)
are fitted only on training data within each cross-validation fold.
CRITICAL: This fixes the data leakage issue where preprocessing was applied
to the entire dataset before cross-validation splits.
"""
import os
import sys
import numpy as np
from typing import Tuple, Optional, Dict, Any
# Add parent directory to path for imports
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "../..")))
from .raman_util import list_txt_files, label_file, load_spectrum
from backend.utils.preprocessing import preprocess_spectrum, TARGET_LENGTH
class SpectrumPreprocessor:
    """
    Data leakage-free preprocessing pipeline for spectral data.

    Per-sample steps are delegated to ``preprocess_spectrum`` (configured via
    ``target_len``, ``do_baseline``, ``do_smooth`` and ``modality``).
    Dataset-level min-max normalization is fitted on the training spectra of a
    fold only and is then applied unchanged to validation / test spectra.
    """

    def __init__(
        self,
        target_len: int = TARGET_LENGTH,
        do_baseline: bool = True,
        do_smooth: bool = True,
        do_normalize: bool = True,
        modality: str = "raman"
    ):
        """
        Initialize the preprocessor with configuration.

        Args:
            target_len (int): Target length for resampling
            do_baseline (bool): Whether to apply baseline correction
            do_smooth (bool): Whether to apply smoothing
            do_normalize (bool): Whether to apply normalization
            modality (str): Spectroscopy modality ('raman' or 'ftir')
        """
        self.target_len = target_len
        self.do_baseline = do_baseline
        self.do_smooth = do_smooth
        self.do_normalize = do_normalize
        self.modality = modality

        # Statistics fitted on training data only; populated by
        # fit_normalization_stats().
        self.normalization_stats = None
        self.is_fitted = False

    def load_raw_data(self, dataset_dir: str) -> Tuple[np.ndarray, np.ndarray, list]:
        """
        Load raw spectrum data without preprocessing.

        Files are skipped when ``label_file`` returns None, when the spectrum
        has fewer than 10 points, or when loading / label conversion raises
        IOError or ValueError (a warning is printed in that case).

        Args:
            dataset_dir (str): Path to dataset directory

        Returns:
            tuple: (raw_spectra, labels, file_paths), all aligned by index.
        """
        raw_spectra = []
        labels = []
        valid_files = []

        for path in list_txt_files(dataset_dir):
            label = label_file(path)
            if label is None:
                continue

            try:
                x_raw, y_raw = load_spectrum(path)
                if len(x_raw) < 10:
                    continue  # Skip files with too few points
                # Convert the label BEFORE touching any output list: if int()
                # raises, nothing has been appended yet, so the three outputs
                # cannot drift out of alignment.
                label_value = int(label)
            except (IOError, ValueError) as e:
                print(f"⚠️ Warning: Failed to load {path}: {e}")
                continue

            raw_spectra.append((x_raw, y_raw))
            labels.append(label_value)
            valid_files.append(path)

        # dtype=int keeps the label array integer-typed even when no file was
        # loaded (np.array([]) would otherwise default to float64).
        # NOTE(review): the exact shape of the object array depends on whether
        # all spectra have the same length -- confirm against callers.
        return (
            np.array(raw_spectra, dtype=object),
            np.array(labels, dtype=int),
            valid_files,
        )

    def _preprocess_without_normalization(
        self,
        x_raw: np.ndarray,
        y_raw: np.ndarray
    ) -> np.ndarray:
        """Run the per-sample ``preprocess_spectrum`` steps with normalization off."""
        _, y_processed = preprocess_spectrum(
            np.asarray(x_raw),
            np.asarray(y_raw),
            target_len=self.target_len,
            modality=self.modality,
            do_baseline=self.do_baseline,
            do_smooth=self.do_smooth,
            do_normalize=False,  # Normalization is handled by this class
            out_dtype=np.float32
        )
        return y_processed

    def preprocess_single_spectrum(
        self,
        x_raw: np.ndarray,
        y_raw: np.ndarray,
        use_fitted_stats: bool = False
    ) -> np.ndarray:
        """
        Preprocess a single spectrum.

        When ``do_normalize`` is enabled:
        * ``use_fitted_stats=True`` applies the training-set min-max statistics
          if they have been fitted; if they have not, the spectrum is returned
          un-normalized.
        * ``use_fitted_stats=False`` applies per-sample min-max scaling
          (skipped for a constant spectrum).

        Args:
            x_raw (np.ndarray): Raw wavenumber values
            y_raw (np.ndarray): Raw intensity values
            use_fitted_stats (bool): Whether to use fitted normalization stats

        Returns:
            np.ndarray: Preprocessed spectrum
        """
        # Per-sample steps cannot leak information between samples.
        y_processed = self._preprocess_without_normalization(x_raw, y_raw)

        if not self.do_normalize:
            return y_processed

        if use_fitted_stats:
            if self.is_fitted:
                y_processed = self._apply_fitted_normalization(y_processed)
            return y_processed

        # Per-sample min-max normalization
        y_min, y_max = y_processed.min(), y_processed.max()
        if y_max > y_min:
            y_processed = (y_processed - y_min) / (y_max - y_min)
        return y_processed

    def fit_normalization_stats(self, train_spectra: list) -> None:
        """
        Fit normalization statistics on training data only.

        Does nothing when ``do_normalize`` is disabled.

        Args:
            train_spectra (list): List of (x_raw, y_raw) tuples for training

        Raises:
            ValueError: If normalization is enabled and ``train_spectra`` is empty.
        """
        if not self.do_normalize:
            return

        # The statistics must come from spectra that have NOT been normalized.
        # Going through preprocess_single_spectrum(use_fitted_stats=False)
        # would min-max scale every spectrum to [0, 1] first, making the fitted
        # min/max always 0/1 and the fitted normalization a no-op.
        processed_spectra = [
            self._preprocess_without_normalization(x_raw, y_raw)
            for x_raw, y_raw in train_spectra
        ]
        if not processed_spectra:
            raise ValueError("Cannot fit normalization stats on an empty training set.")

        # Global statistics over every point of every training spectrum
        all_values = np.concatenate(processed_spectra)
        self.normalization_stats = {
            'mean': np.mean(all_values),
            'std': np.std(all_values),
            'min': np.min(all_values),
            'max': np.max(all_values)
        }
        self.is_fitted = True
        print("✅ Fitted normalization statistics on training data")

    def _apply_fitted_normalization(self, spectrum: np.ndarray) -> np.ndarray:
        """
        Apply fitted normalization to a spectrum.

        Args:
            spectrum (np.ndarray): Preprocessed spectrum

        Returns:
            np.ndarray: Normalized spectrum (returned unchanged when the fitted
            min and max are equal)

        Raises:
            ValueError: If the statistics have not been fitted yet.
        """
        if not self.is_fitted:
            raise ValueError("Normalization stats not fitted. Call fit_normalization_stats first.")

        # Min-max normalization based on training data; values outside the
        # training range map outside [0, 1].
        stats = self.normalization_stats
        if stats is not None and stats['max'] > stats['min']:
            spectrum = (spectrum - stats['min']) / (stats['max'] - stats['min'])
        return spectrum

    def transform_fold(
        self,
        raw_spectra: np.ndarray,
        train_indices: np.ndarray,
        val_indices: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Transform data for a single CV fold without data leakage.

        Normalization statistics are re-fitted on this fold's training spectra
        (replacing any previously fitted statistics) and then applied to both
        the training and the validation spectra.

        Args:
            raw_spectra (np.ndarray): Array of (x_raw, y_raw) tuples
            train_indices (np.ndarray): Training indices for this fold
            val_indices (np.ndarray): Validation indices for this fold

        Returns:
            tuple: (X_train, X_val) preprocessed data
        """
        train_raw = raw_spectra[train_indices]
        val_raw = raw_spectra[val_indices]

        # Fit on training data only -- validation data never influences stats
        self.fit_normalization_stats(train_raw.tolist())

        X_train = [
            self.preprocess_single_spectrum(x_raw, y_raw, use_fitted_stats=True)
            for x_raw, y_raw in train_raw
        ]
        X_val = [
            self.preprocess_single_spectrum(x_raw, y_raw, use_fitted_stats=True)
            for x_raw, y_raw in val_raw
        ]
        return np.array(X_train), np.array(X_val)
def load_data_for_cv(
    dataset_dir: str,
    preprocessor_config: Optional[Dict[str, Any]] = None
) -> Tuple[np.ndarray, np.ndarray, SpectrumPreprocessor]:
    """
    Load raw data for cross-validation without data leakage.

    Builds a ``SpectrumPreprocessor`` from ``preprocessor_config`` and uses it
    to load the raw (un-preprocessed) spectra; nothing is fitted here.

    Args:
        dataset_dir (str): Path to dataset directory
        preprocessor_config (dict): Keyword arguments for ``SpectrumPreprocessor``;
            None or an empty dict uses the defaults.

    Returns:
        tuple: (raw_spectra, labels, preprocessor)
    """
    config = preprocessor_config or {}
    preprocessor = SpectrumPreprocessor(**config)
    raw_spectra, labels, _ = preprocessor.load_raw_data(dataset_dir)

    print(f"✅ Loaded {len(raw_spectra)} raw spectra for CV")
    # An empty label array may be float64, which np.bincount refuses to cast
    # safely; casting to int first lets an empty dataset report [] instead of
    # raising TypeError.
    class_counts = np.bincount(np.asarray(labels, dtype=int))
    print(f"Class distribution: {class_counts}")
    return raw_spectra, labels, preprocessor
def preprocess_holdout_test_set(
    test_spectra: np.ndarray,
    fitted_preprocessor: "SpectrumPreprocessor"
) -> np.ndarray:
    """
    Preprocess hold-out test set using fitted preprocessor.

    Args:
        test_spectra (np.ndarray): Raw test spectra, iterable of (x_raw, y_raw) pairs
        fitted_preprocessor (SpectrumPreprocessor): Preprocessor fitted on training data

    Returns:
        np.ndarray: Preprocessed test data

    Raises:
        ValueError: If normalization is enabled on the preprocessor but its
            statistics have not been fitted.
    """
    # Fitted statistics are only needed when normalization is enabled. A
    # preprocessor with do_normalize=False never sets is_fitted, so demanding
    # it unconditionally would make such a preprocessor unusable here.
    if fitted_preprocessor.do_normalize and not fitted_preprocessor.is_fitted:
        raise ValueError("Preprocessor must be fitted on training data first")

    return np.array([
        fitted_preprocessor.preprocess_single_spectrum(
            x_raw, y_raw, use_fitted_stats=True
        )
        for x_raw, y_raw in test_spectra
    ])
if __name__ == "__main__":
    # Smoke test for the data leakage-free preprocessing pipeline: load the
    # sample data, build one stratified 2-fold split and transform that fold.
    # NOTE(review): this file uses a relative import (.raman_util), so it
    # presumably has to be run as a module (python -m ...) -- confirm.
    print("Testing data leakage-free preprocessing pipeline...")

    # Test with sample data
    dataset_dir = "sample_data"

    # Load raw data
    raw_spectra, labels, preprocessor = load_data_for_cv(dataset_dir)

    # Simulate a single CV fold
    from sklearn.model_selection import StratifiedKFold

    # StratifiedKFold(n_splits=2) raises ValueError when no class has at least
    # two members, so the total sample count alone is not a sufficient guard.
    can_split = (
        len(raw_spectra) >= 2
        and np.unique(labels, return_counts=True)[1].max() >= 2
    )
    if can_split:
        cv = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)
        train_idx, val_idx = next(cv.split(raw_spectra, labels))

        # Transform without data leakage
        X_train, X_val = preprocessor.transform_fold(raw_spectra, train_idx, val_idx)
        print("✅ Fold transformation completed")
        print(f" Train: {X_train.shape}")
        print(f" Val: {X_val.shape}")
        print(f" Normalization fitted: {preprocessor.is_fitted}")
    else:
        print("⚠️ Not enough samples per class for a 2-fold stratified split; skipping fold test")

    print("✅ Data leakage-free preprocessing test completed!")