File size: 6,014 Bytes
e2b220f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
"""
Data loading and preprocessing module.

Handles loading from HuggingFace Hub or local CSV, data cleaning,
train/val/test splitting with stratification by material type.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Optional, Tuple

import numpy as np
import pandas as pd
from sklearn.model_selection import (
    GroupShuffleSplit,
    StratifiedShuffleSplit,
    train_test_split,
)

logger = logging.getLogger(__name__)


def load_dataset(
    source: str,
    local_path: Optional[str] = None,
    random_state: int = 42,
) -> pd.DataFrame:
    """
    Read the raw dataset from a local CSV if present, else from HuggingFace Hub.

    Parameters
    ----------
    source : str
        HuggingFace dataset ID (e.g., 'TWLAb/femtosecond-laser-hydrogel-etching-data').
        Only consulted when the local CSV is not used.
    local_path : str, optional
        Path to a local CSV file. It is read with ``pd.read_csv`` when it is
        given and exists on disk; otherwise the Hub download is used.
    random_state : int
        Accepted as part of the signature; this function does not read it.

    Returns
    -------
    pd.DataFrame
        The dataset as loaded (no cleaning is applied here).

    Raises
    ------
    Exception
        Any error raised while importing ``datasets`` or downloading from the
        Hub is logged and then re-raised unchanged.
    """
    cached_csv = Path(local_path) if local_path else None

    if cached_csv is not None and cached_csv.exists():
        logger.info(f"Loading dataset from local path: {local_path}")
        frame = pd.read_csv(local_path)
    else:
        logger.info(f"Loading dataset from HuggingFace Hub: {source}")
        try:
            # Imported lazily so the local-CSV path works without `datasets`.
            from datasets import load_dataset as hf_load
            frame = hf_load(source, split="train").to_pandas()
        except Exception as e:
            logger.error(f"Failed to load from Hub: {e}")
            raise

    n_rows, n_cols = frame.shape
    logger.info(f"Dataset loaded: {n_rows} samples, {n_cols} columns")
    return frame


def clean_dataset(df: pd.DataFrame) -> pd.DataFrame:
    """
    Clean dataset: drop missing values, impossible values and extreme outliers.

    Filters are applied in this order:

    1. Rows with NaN in any numeric column are dropped.
    2. Rows where ``etch_depth_um``, ``etch_width_um``,
       ``surface_roughness_Sa_um``, ``power_mW`` or ``scan_speed_mm_s`` is
       not strictly positive are dropped.
    3. For each target column that is present, rows further than 5 standard
       deviations from that column's mean are dropped. The mean and standard
       deviation are recomputed for each column on the rows that survived
       the previous filters.

    Parameters
    ----------
    df : pd.DataFrame
        Raw dataset. The input frame itself is not modified.

    Returns
    -------
    pd.DataFrame
        Cleaned dataset with a fresh ``0..n-1`` index.

    Raises
    ------
    KeyError
        If one of the five strictly-positive columns listed above is missing.
    """
    initial_size = len(df)

    # Remove rows with any NaN in numeric columns
    numeric_cols = df.select_dtypes(include=[np.number]).columns
    df = df.dropna(subset=numeric_cols)

    # Remove physically impossible values
    positive_cols = (
        "etch_depth_um",
        "etch_width_um",
        "surface_roughness_Sa_um",
        "power_mW",
        "scan_speed_mm_s",
    )
    for col in positive_cols:
        df = df[df[col] > 0]

    # Remove extreme outliers (>5 sigma from mean for targets)
    target_cols = (
        "etch_depth_um",
        "etch_width_um",
        "surface_roughness_Sa_um",
        "aspect_ratio",
        "side_wall_angle_deg",
    )
    for col in target_cols:
        if col not in df.columns:
            continue
        mean, std = df[col].mean(), df[col].std()
        # The sample std is NaN when fewer than two rows remain. Comparing
        # against NaN bounds is always False and would wipe out the remaining
        # data, so with no defined spread nothing is treated as an outlier.
        if pd.isna(std):
            continue
        df = df[(df[col] >= mean - 5 * std) & (df[col] <= mean + 5 * std)]

    removed = initial_size - len(df)
    if removed > 0:
        logger.info(f"Removed {removed} rows during cleaning ({removed/initial_size*100:.1f}%)")

    return df.reset_index(drop=True)


def _holdout_split(
    frame: pd.DataFrame,
    holdout_frac: float,
    stratify_column: Optional[str],
    random_state: int,
) -> Tuple[pd.DataFrame, pd.DataFrame]:
    """Split *frame* into (kept, held_out), stratifying by a column when feasible."""
    if stratify_column is not None:
        splitter = StratifiedShuffleSplit(
            n_splits=1, test_size=holdout_frac, random_state=random_state
        )
        try:
            kept_idx, held_idx = next(splitter.split(frame, frame[stratify_column]))
        except ValueError as exc:
            # Raised e.g. when a class has fewer than 2 members or there are
            # more classes than rows in one side of the split. Degrade to a
            # random split, mirroring the missing-column behaviour.
            logger.warning(
                f"Stratified split by '{stratify_column}' not possible ({exc}). "
                "Using random split."
            )
        else:
            return frame.iloc[kept_idx], frame.iloc[held_idx]

    kept, held_out = train_test_split(
        frame, test_size=holdout_frac, random_state=random_state
    )
    return kept, held_out


def split_dataset(
    df: pd.DataFrame,
    test_size: float = 0.15,
    val_size: float = 0.15,
    group_column: str = "material_type",
    random_state: int = 42,
) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Split dataset into train/validation/test with stratification by material type.

    The test set is carved out first, then the validation set is carved out of
    the remainder. Each stage uses StratifiedShuffleSplit on ``group_column``
    so that every split keeps proportional material representation. If the
    column is missing, or a stage cannot be stratified (for example a material
    with fewer than two rows), that stage falls back to a plain random split
    and a warning is logged.

    Parameters
    ----------
    df : pd.DataFrame
        Full dataset
    test_size : float
        Fraction of the full dataset for the test set, in (0, 1)
    val_size : float
        Fraction of the full dataset for the validation set, in (0, 1).
        Internally rescaled to ``val_size / (1 - test_size)`` of the rows
        remaining after the test split.
    group_column : str
        Column to stratify by
    random_state : int
        Random seed (the same seed is used for both stages)

    Returns
    -------
    tuple of (train_df, val_df, test_df)
        Each with a fresh ``0..n-1`` index.

    Raises
    ------
    ValueError
        If the fractions are not in (0, 1), if ``test_size + val_size >= 1``,
        or if the underlying splitter rejects the request (e.g. empty data).
    """
    if not (0 < test_size < 1 and 0 < val_size < 1 and test_size + val_size < 1):
        raise ValueError(
            "test_size and val_size must be fractions in (0, 1) with "
            f"test_size + val_size < 1; got test_size={test_size}, val_size={val_size}"
        )

    has_group_column = group_column in df.columns
    if not has_group_column:
        logger.warning(f"Group column '{group_column}' not found. Using random split.")
    stratify_column = group_column if has_group_column else None

    # Stage 1: hold out the test set.
    train_val, test = _holdout_split(df, test_size, stratify_column, random_state)

    # Stage 2: hold out validation from what is left. val_size is expressed
    # relative to the full dataset, so rescale it to the remaining rows.
    val_frac = val_size / (1 - test_size)
    train, val = _holdout_split(train_val, val_frac, stratify_column, random_state)

    logger.info(
        f"Split sizes - Train: {len(train)} ({len(train)/len(df)*100:.1f}%), "
        f"Val: {len(val)} ({len(val)/len(df)*100:.1f}%), "
        f"Test: {len(test)} ({len(test)/len(df)*100:.1f}%)"
    )

    # Verify material distribution
    if has_group_column:
        for name, subset in [("Train", train), ("Val", val), ("Test", test)]:
            dist = subset[group_column].value_counts(normalize=True)
            logger.debug(f"{name} material distribution:\n{dist}")

    return train.reset_index(drop=True), val.reset_index(drop=True), test.reset_index(drop=True)


def get_feature_target_arrays(
    df: pd.DataFrame,
    feature_columns: list[str],
    target_columns: list[str],
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Pull the model inputs and outputs out of a DataFrame as float32 arrays.

    Parameters
    ----------
    df : pd.DataFrame
        Frame containing every requested column.
    feature_columns : list of str
        Columns that make up X, in the order they should appear.
    target_columns : list of str
        Columns that make up y, in the order they should appear.

    Returns
    -------
    tuple of (X, y) as numpy arrays
        Both are 2-D (rows x selected columns) with dtype ``np.float32``.
    """

    def _as_float32(columns: list[str]) -> np.ndarray:
        # Column selection with a list always yields a 2-D block.
        return df[columns].values.astype(np.float32)

    features = _as_float32(feature_columns)
    targets = _as_float32(target_columns)
    return features, targets