"""
Data loading and preprocessing module.

Handles loading from HuggingFace Hub or local CSV, data cleaning,
train/val/test splitting with stratification by material type.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import (
    GroupShuffleSplit,
    StratifiedShuffleSplit,
    train_test_split,
)

logger = logging.getLogger(__name__)

# Columns that must be strictly positive for a row to be kept.
_POSITIVE_COLUMNS = (
    "etch_depth_um",
    "etch_width_um",
    "surface_roughness_Sa_um",
    "power_mW",
    "scan_speed_mm_s",
)

# Target columns subject to sigma-based outlier removal (skipped if absent).
_TARGET_COLUMNS = (
    "etch_depth_um",
    "etch_width_um",
    "surface_roughness_Sa_um",
    "aspect_ratio",
    "side_wall_angle_deg",
)


def load_dataset(
    source: str,
    local_path: Optional[str] = None,
    random_state: int = 42,
) -> pd.DataFrame:
    """
    Load the raw dataset from a local CSV or from the HuggingFace Hub.

    The local file is preferred when ``local_path`` is given and exists;
    otherwise the ``train`` split of ``source`` is downloaded from the Hub.
    No cleaning is performed here.

    Parameters
    ----------
    source : str
        HuggingFace dataset ID
        (e.g., 'TWLAb/femtosecond-laser-hydrogel-etching-data')
    local_path : str, optional
        Path to local CSV file (used if available, avoids re-downloading)
    random_state : int
        Not used by this function; kept so existing callers keep working.

    Returns
    -------
    pd.DataFrame
        Dataset exactly as loaded from the CSV or the Hub.

    Raises
    ------
    Exception
        Any error raised while importing ``datasets`` or loading from the
        Hub is logged and re-raised unchanged.
    """
    if local_path and Path(local_path).exists():
        logger.info(f"Loading dataset from local path: {local_path}")
        df = pd.read_csv(local_path)
    else:
        if local_path:
            # Make the fallback visible instead of silently downloading.
            logger.warning(
                f"Local path not found: {local_path}. Falling back to HuggingFace Hub."
            )
        logger.info(f"Loading dataset from HuggingFace Hub: {source}")
        try:
            # Imported lazily so the dependency is only needed for Hub loads.
            from datasets import load_dataset as hf_load

            ds = hf_load(source, split="train")
            df = ds.to_pandas()
        except Exception as e:
            logger.error(f"Failed to load from Hub: {e}")
            raise

    logger.info(f"Dataset loaded: {df.shape[0]} samples, {df.shape[1]} columns")
    return df


def clean_dataset(df: pd.DataFrame, outlier_sigma: float = 5.0) -> pd.DataFrame:
    """
    Clean dataset: drop missing values, impossible values and outliers.

    Steps, in order:

    1. Drop rows with NaN in any numeric column.
    2. Keep only rows where every column in ``_POSITIVE_COLUMNS`` is > 0.
    3. For each column of ``_TARGET_COLUMNS`` present in ``df``, keep rows
       within ``outlier_sigma`` standard deviations of that column's mean.
       Columns are filtered one after another, so each mean/std is computed
       on the rows that survived the previous filters.

    Parameters
    ----------
    df : pd.DataFrame
        Raw dataset
    outlier_sigma : float
        Half-width of the accepted band around the mean, in standard
        deviations. Defaults to 5.

    Returns
    -------
    pd.DataFrame
        Cleaned dataset with a fresh 0..n-1 index.

    Raises
    ------
    KeyError
        If any column listed in ``_POSITIVE_COLUMNS`` is missing.
    """
    initial_size = len(df)

    # Remove rows with any NaN in numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df = df.dropna(subset=numeric_cols)

    # Remove physically impossible values
    for col in _POSITIVE_COLUMNS:
        df = df[df[col] > 0]

    # Remove extreme outliers (> outlier_sigma from mean for targets)
    for col in _TARGET_COLUMNS:
        if col not in df.columns:
            continue
        mean, std = df[col].mean(), df[col].std()
        if pd.isna(std):
            # The sample std is NaN when fewer than two rows remain; comparing
            # against NaN bounds is always False and would drop every row.
            continue
        lower, upper = mean - outlier_sigma * std, mean + outlier_sigma * std
        df = df[(df[col] >= lower) & (df[col] <= upper)]

    removed = initial_size - len(df)
    if removed > 0:
        logger.info(f"Removed {removed} rows during cleaning ({removed/initial_size*100:.1f}%)")

    return df.reset_index(drop=True)


def _split_once(
    df: pd.DataFrame,
    held_out_fraction: float,
    random_state: int,
    stratify_column: Optional[str] = None,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split ``df`` once into (kept, held_out), stratified if a column is given."""
    if stratify_column is None:
        kept, held_out = train_test_split(
            df, test_size=held_out_fraction, random_state=random_state
        )
        return kept, held_out

    splitter = StratifiedShuffleSplit(
        n_splits=1, test_size=held_out_fraction, random_state=random_state
    )
    kept_idx, held_out_idx = next(splitter.split(df, df[stratify_column]))
    return df.iloc[kept_idx], df.iloc[held_out_idx]


def split_dataset(
    df: pd.DataFrame,
    test_size: float = 0.15,
    val_size: float = 0.15,
    group_column: str = "material_type",
    random_state: int = 42,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split dataset into train/validation/test with stratification by material type.

    Uses StratifiedShuffleSplit on ``group_column`` for both splits so each
    subset keeps proportional representation of the groups. If
    ``group_column`` is not in ``df``, a warning is logged and plain random
    splits are used instead.

    Parameters
    ----------
    df : pd.DataFrame
        Full dataset
    test_size : float
        Fraction of the full dataset for the test set, in (0, 1)
    val_size : float
        Fraction of the full dataset for the validation set, in (0, 1).
        It is rescaled to ``val_size / (1 - test_size)`` when splitting the
        rows left after the test split.
    group_column : str
        Column to stratify by
    random_state : int
        Random seed, used for both splits

    Returns
    -------
    tuple of (train_df, val_df, test_df)
        Each with a fresh 0..n-1 index.

    Raises
    ------
    ValueError
        If the fractions are outside (0, 1) or leave no room for training
        data, or if scikit-learn rejects the split.
    """
    if not (0.0 < test_size < 1.0 and 0.0 < val_size < 1.0):
        raise ValueError(
            f"test_size and val_size must be in (0, 1); got "
            f"test_size={test_size}, val_size={val_size}"
        )
    if test_size + val_size >= 1.0:
        raise ValueError(
            f"test_size + val_size must be < 1 to leave training data; got "
            f"{test_size} + {val_size}"
        )

    stratify_column: Optional[str] = group_column
    if group_column not in df.columns:
        logger.warning(f"Group column '{group_column}' not found. Using random split.")
        stratify_column = None

    # First carve out the test set, then split the remainder into train/val.
    train_val, test = _split_once(df, test_size, random_state, stratify_column)

    # val_size is relative to the full dataset; rescale it to the remainder.
    val_frac = val_size / (1 - test_size)
    train, val = _split_once(train_val, val_frac, random_state, stratify_column)

    logger.info(
        f"Split sizes - Train: {len(train)} ({len(train)/len(df)*100:.1f}%), "
        f"Val: {len(val)} ({len(val)/len(df)*100:.1f}%), "
        f"Test: {len(test)} ({len(test)/len(df)*100:.1f}%)"
    )

    # Verify material distribution
    if stratify_column is not None:
        for name, subset in [("Train", train), ("Val", val), ("Test", test)]:
            dist = subset[group_column].value_counts(normalize=True)
            logger.debug(f"{name} material distribution:\n{dist}")

    return (
        train.reset_index(drop=True),
        val.reset_index(drop=True),
        test.reset_index(drop=True),
    )


def get_feature_target_arrays(
    df: pd.DataFrame,
    feature_columns: list[str],
    target_columns: list[str],
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Extract feature and target arrays from DataFrame.

    Parameters
    ----------
    df : pd.DataFrame
    feature_columns : list of str
        Columns selected, in this order, for X
    target_columns : list of str
        Columns selected, in this order, for y

    Returns
    -------
    tuple of (X, y) as float32 numpy arrays
    """
    # astype always returns a new array, so the results never alias df's data.
    X = df[feature_columns].to_numpy().astype(np.float32)
    y = df[target_columns].to_numpy().astype(np.float32)
    return X, y