# TWLab's picture
# Add publication-ready ML project structure with full source code
# e2b220f verified
"""
Data loading and preprocessing module.
Handles loading from the HuggingFace Hub or a local CSV, data cleaning, and
train/val/test splitting with stratification by material type.
"""
from __future__ import annotations
import logging
from pathlib import Path
from typing import Optional, Tuple
import numpy as np
import pandas as pd
from sklearn.model_selection import (
GroupShuffleSplit,
StratifiedShuffleSplit,
train_test_split,
)
# Module-level logger, named after this module; used by every function below.
logger = logging.getLogger(__name__)
def load_dataset(
    source: str,
    local_path: Optional[str] = None,
    random_state: int = 42,
) -> pd.DataFrame:
    """
    Load the raw dataset from a local CSV file or the HuggingFace Hub.

    A local CSV takes precedence when ``local_path`` points to an existing
    path; otherwise the ``train`` split of ``source`` is fetched from the Hub.
    No cleaning is performed here.

    Parameters
    ----------
    source : str
        HuggingFace dataset ID (e.g., 'TWLAb/femtosecond-laser-hydrogel-etching-data')
    local_path : str, optional
        Path to local CSV file (used if available, avoids re-downloading).
        A leading ``~`` is expanded to the user's home directory.
    random_state : int
        Currently unused by this function; kept so existing callers that
        pass it keep working.

    Returns
    -------
    pd.DataFrame
        Loaded dataset (not cleaned)

    Raises
    ------
    Exception
        Any error raised while importing ``datasets`` or loading from the
        Hub is logged and re-raised unchanged.
    """
    # Expand "~" up front: Path.exists() does not do it, so "~/data.csv"
    # would otherwise be reported as missing and trigger a Hub download.
    csv_path = Path(local_path).expanduser() if local_path else None

    if csv_path is not None and csv_path.exists():
        logger.info(f"Loading dataset from local path: {local_path}")
        df = pd.read_csv(csv_path)
    else:
        if csv_path is not None:
            # Make the fallback visible instead of silently ignoring the path.
            logger.warning(
                f"Local path not found: {local_path}. Falling back to HuggingFace Hub."
            )
        logger.info(f"Loading dataset from HuggingFace Hub: {source}")
        try:
            # Imported lazily so the local-CSV path works without `datasets`.
            from datasets import load_dataset as hf_load

            ds = hf_load(source, split="train")
            df = ds.to_pandas()
        except Exception as e:
            logger.error(f"Failed to load from Hub: {e}")
            raise

    logger.info(f"Dataset loaded: {df.shape[0]} samples, {df.shape[1]} columns")
    return df
def clean_dataset(df: pd.DataFrame, outlier_sigma: float = 5.0) -> pd.DataFrame:
    """
    Clean dataset: drop missing values, impossible values and extreme outliers.

    Steps, in order:

    1. Drop rows with NaN in any numeric column.
    2. Drop rows where a quantity that must be strictly positive is <= 0.
    3. For each target column that is present, drop rows further than
       ``outlier_sigma`` standard deviations from that column's mean. The
       mean and standard deviation are recomputed per column on the rows
       that survived the previous steps.

    Parameters
    ----------
    df : pd.DataFrame
        Raw dataset. Must contain the columns 'etch_depth_um',
        'etch_width_um', 'surface_roughness_Sa_um', 'power_mW' and
        'scan_speed_mm_s'.
    outlier_sigma : float
        Half-width of the accepted band around the mean, in standard
        deviations (default 5.0).

    Returns
    -------
    pd.DataFrame
        Cleaned dataset with a fresh 0..n-1 index

    Raises
    ------
    KeyError
        If one of the required positive-valued columns is missing.
    """
    positive_cols = (
        "etch_depth_um",
        "etch_width_um",
        "surface_roughness_Sa_um",
        "power_mW",
        "scan_speed_mm_s",
    )
    target_cols = (
        "etch_depth_um",
        "etch_width_um",
        "surface_roughness_Sa_um",
        "aspect_ratio",
        "side_wall_angle_deg",
    )

    initial_size = len(df)

    # Remove rows with any NaN in numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df = df.dropna(subset=numeric_cols)

    # Remove physically impossible values
    for col in positive_cols:
        df = df[df[col] > 0]

    # Remove extreme outliers (> outlier_sigma from the mean) for targets
    for col in target_cols:
        if col not in df.columns:
            continue
        mean, std = df[col].mean(), df[col].std()
        # std is NaN with fewer than two rows and 0 for a constant column.
        # In both cases there is no spread to judge outliers against, and
        # comparing against NaN (or against a mean that differs from the
        # values by rounding) would wrongly discard every remaining row.
        if not np.isfinite(std) or std == 0:
            continue
        lower = mean - outlier_sigma * std
        upper = mean + outlier_sigma * std
        df = df[(df[col] >= lower) & (df[col] <= upper)]

    removed = initial_size - len(df)
    if removed > 0:
        logger.info(f"Removed {removed} rows during cleaning ({removed/initial_size*100:.1f}%)")
    return df.reset_index(drop=True)
def split_dataset(
    df: pd.DataFrame,
    test_size: float = 0.15,
    val_size: float = 0.15,
    group_column: str = "material_type",
    random_state: int = 42,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split dataset into train/validation/test with stratification by material type.

    Uses StratifiedShuffleSplit so each split keeps roughly the same
    proportion of every ``group_column`` value. If ``group_column`` is not
    in ``df``, a warning is logged and a plain random split is used instead.

    The split is done in two stages: the test set is carved out first, then
    the remainder is split into train and validation. ``val_size`` is
    rescaled for the second stage so that it is a fraction of the FULL
    dataset, not of the remainder.

    Parameters
    ----------
    df : pd.DataFrame
        Full dataset
    test_size : float
        Fraction of the full dataset for the test set, in (0, 1)
    val_size : float
        Fraction of the full dataset for the validation set; must be
        positive and satisfy ``test_size + val_size < 1``
    group_column : str
        Column to stratify by
    random_state : int
        Random seed (used for both split stages)

    Returns
    -------
    tuple of (train_df, val_df, test_df)
        Each with a fresh 0..n-1 index

    Raises
    ------
    ValueError
        If the size fractions are out of range, or if scikit-learn cannot
        perform the requested split.
    """
    if not 0 < test_size < 1:
        raise ValueError(f"test_size must be in (0, 1), got {test_size}")

    # Validation fraction relative to what is left after removing the test
    # set, so the final validation set is `val_size` of the full dataset.
    val_frac = val_size / (1 - test_size)
    if not 0 < val_frac < 1:
        raise ValueError(
            "val_size must be positive and test_size + val_size must be < 1, "
            f"got test_size={test_size}, val_size={val_size}"
        )

    stratify = group_column in df.columns

    if not stratify:
        logger.warning(f"Group column '{group_column}' not found. Using random split.")
        train_val, test = train_test_split(df, test_size=test_size, random_state=random_state)
        train, val = train_test_split(train_val, test_size=val_frac, random_state=random_state)
    else:
        # Stage 1: stratified hold-out of the test set
        sss_test = StratifiedShuffleSplit(n_splits=1, test_size=test_size, random_state=random_state)
        train_val_idx, test_idx = next(sss_test.split(df, df[group_column]))
        train_val = df.iloc[train_val_idx]
        test = df.iloc[test_idx]

        # Stage 2: stratified split of the remainder into train and validation
        sss_val = StratifiedShuffleSplit(n_splits=1, test_size=val_frac, random_state=random_state)
        train_idx, val_idx = next(sss_val.split(train_val, train_val[group_column]))
        train = train_val.iloc[train_idx]
        val = train_val.iloc[val_idx]

    logger.info(
        f"Split sizes - Train: {len(train)} ({len(train)/len(df)*100:.1f}%), "
        f"Val: {len(val)} ({len(val)/len(df)*100:.1f}%), "
        f"Test: {len(test)} ({len(test)/len(df)*100:.1f}%)"
    )

    # Verify material distribution
    if stratify:
        for name, subset in [("Train", train), ("Val", val), ("Test", test)]:
            dist = subset[group_column].value_counts(normalize=True)
            logger.debug(f"{name} material distribution:\n{dist}")

    return train.reset_index(drop=True), val.reset_index(drop=True), test.reset_index(drop=True)
def get_feature_target_arrays(
    df: pd.DataFrame,
    feature_columns: list[str],
    target_columns: list[str],
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Pull the model inputs and outputs out of a DataFrame as float32 arrays.

    Parameters
    ----------
    df : pd.DataFrame
        Table holding both the feature and the target columns
    feature_columns : list of str
        Columns that make up X, in the order given
    target_columns : list of str
        Columns that make up y, in the order given

    Returns
    -------
    tuple of (X, y) as numpy arrays
        Both 2-D (one row per DataFrame row, one column per requested
        column) with dtype float32
    """

    def _as_float32(columns: list[str]) -> np.ndarray:
        # Select the columns, then cast the resulting array to float32.
        return df[columns].values.astype(np.float32)

    return _as_float32(feature_columns), _as_float32(target_columns)