""" Centralized Spectrum Preprocessing Module Single source of truth for all preprocessing operations across training, validation, testing, and live inference. Ensures no drift between different processing stages. """ import hashlib import json import numpy as np from pathlib import Path from typing import Dict, Any, Tuple, Optional, List from dataclasses import dataclass, asdict from datetime import datetime from backend.utils.preprocessing import ( preprocess_spectrum, validate_spectrum_modality, MODALITY_PARAMS, MODALITY_RANGES, TARGET_LENGTH ) @dataclass class PreprocessingConfig: """Immutable preprocessing configuration.""" target_length: int = TARGET_LENGTH modality: str = "raman" do_baseline: bool = True baseline_degree: Optional[int] = None # Uses modality default if None do_smooth: bool = True smooth_window: Optional[int] = None # Uses modality default if None smooth_polyorder: Optional[int] = None # Uses modality default if None do_normalize: bool = True validate_range: bool = True version: str = "1.0.0" # Version for config compatibility def __post_init__(self): """Validate configuration and set modality defaults.""" if self.modality not in MODALITY_PARAMS: raise ValueError(f"Invalid modality: {self.modality}") # Set modality defaults if None modality_config = MODALITY_PARAMS[self.modality] if self.baseline_degree is None: object.__setattr__(self, 'baseline_degree', modality_config['baseline_degree']) if self.smooth_window is None: object.__setattr__(self, 'smooth_window', modality_config['smooth_window']) if self.smooth_polyorder is None: object.__setattr__(self, 'smooth_polyorder', modality_config['smooth_polyorder']) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization.""" return asdict(self) def get_hash(self) -> str: """Get deterministic hash of configuration.""" config_str = json.dumps(self.to_dict(), sort_keys=True) return hashlib.sha256(config_str.encode()).hexdigest()[:16] @dataclass class PreprocessingResult: """Result of preprocessing with full provenance.""" x_processed: np.ndarray y_processed: np.ndarray x_original: np.ndarray y_original: np.ndarray config: PreprocessingConfig metadata: Dict[str, Any] processing_time: float timestamp: str def get_content_hash(self) -> str: """Get hash of processed content for drift detection.""" # Combine config hash with data hash config_hash = self.config.get_hash() data_hash = hashlib.sha256( np.concatenate([self.x_processed, self.y_processed]).tobytes() ).hexdigest()[:16] return f"{config_hash}_{data_hash}" class SpectrumPreprocessor: """ Centralized spectrum preprocessor ensuring consistent processing across all stages. Single source of truth for preprocessing logic. """ def __init__(self, config: Optional[PreprocessingConfig] = None): """Initialize with preprocessing configuration.""" self.config = config or PreprocessingConfig() self._processing_history: List[Dict[str, Any]] = [] def process( self, x: np.ndarray, y: np.ndarray, metadata: Optional[Dict[str, Any]] = None ) -> PreprocessingResult: """ Process spectrum with full provenance tracking. Args: x: Input wavenumber array y: Input intensity array metadata: Optional metadata to include Returns: PreprocessingResult with full provenance """ import time start_time = time.time() # Store original data x_original = np.array(x, copy=True) y_original = np.array(y, copy=True) # Process using centralized function x_processed, y_processed = preprocess_spectrum( x, y, target_len=self.config.target_length, modality=self.config.modality, do_baseline=self.config.do_baseline, degree=self.config.baseline_degree, do_smooth=self.config.do_smooth, window_length=self.config.smooth_window, polyorder=self.config.smooth_polyorder, do_normalize=self.config.do_normalize, validate_range=self.config.validate_range ) processing_time = time.time() - start_time # Create metadata result_metadata = { "original_length": len(x_original), "processed_length": len(x_processed), "wavenumber_range": [float(x_processed.min()), float(x_processed.max())], "intensity_range": [float(y_processed.min()), float(y_processed.max())], "modality_validated": validate_spectrum_modality(x_original, y_original, self.config.modality)[0], **(metadata or {}) } # Create result result = PreprocessingResult( x_processed=x_processed, y_processed=y_processed, x_original=x_original, y_original=y_original, config=self.config, metadata=result_metadata, processing_time=processing_time, timestamp=datetime.now().isoformat() ) # Track processing self._processing_history.append({ "timestamp": result.timestamp, "config_hash": self.config.get_hash(), "content_hash": result.get_content_hash(), "processing_time": processing_time }) return result def process_batch( self, spectra: List[Tuple[np.ndarray, np.ndarray]], metadata_list: Optional[List[Dict[str, Any]]] = None ) -> List[PreprocessingResult]: """Process multiple spectra with consistent configuration.""" if metadata_list is None: metadata_list = [None] * len(spectra) results = [] for i, (x, y) in enumerate(spectra): metadata = metadata_list[i] if i < len(metadata_list) else None result = self.process(x, y, metadata) results.append(result) return results def get_processing_summary(self) -> Dict[str, Any]: """Get summary of all processing operations.""" if not self._processing_history: return {"total_processed": 0} return { "total_processed": len(self._processing_history), "config_hash": self.config.get_hash(), "config": self.config.to_dict(), "processing_times": { "min": min(h["processing_time"] for h in self._processing_history), "max": max(h["processing_time"] for h in self._processing_history), "mean": np.mean([h["processing_time"] for h in self._processing_history]) }, "first_processed": self._processing_history[0]["timestamp"], "last_processed": self._processing_history[-1]["timestamp"] } # Global preprocessor instances for different stages TRAINING_PREPROCESSOR = SpectrumPreprocessor(PreprocessingConfig(modality="raman")) VALIDATION_PREPROCESSOR = SpectrumPreprocessor(PreprocessingConfig(modality="raman")) INFERENCE_PREPROCESSOR = SpectrumPreprocessor(PreprocessingConfig(modality="raman")) # Factory function for creating stage-specific preprocessors def create_preprocessor(stage: str, modality: str = "raman") -> SpectrumPreprocessor: """ Create preprocessor for specific stage with identical configuration. Args: stage: 'training', 'validation', 'testing', or 'inference' modality: 'raman' or 'ftir' Returns: SpectrumPreprocessor configured for the stage """ config = PreprocessingConfig(modality=modality) return SpectrumPreprocessor(config) # Utility functions for external compatibility def preprocess_for_inference(x: np.ndarray, y: np.ndarray, modality: str = "raman") -> Tuple[np.ndarray, np.ndarray]: """ Process spectrum for inference using centralized preprocessor. Maintains compatibility with existing code. """ preprocessor = create_preprocessor("inference", modality) result = preprocessor.process(x, y) return result.x_processed, result.y_processed def preprocess_for_training(x: np.ndarray, y: np.ndarray, modality: str = "raman") -> Tuple[np.ndarray, np.ndarray]: """ Process spectrum for training using centralized preprocessor. Maintains compatibility with existing code. """ preprocessor = create_preprocessor("training", modality) result = preprocessor.process(x, y) return result.x_processed, result.y_processed