# TWLab's picture
# Add publication-ready ML project structure with full source code
# e2b220f verified
"""
Model implementations: XGBoost, Neural Network, and Weighted Ensemble.
All models follow a consistent interface for training, prediction,
and cross-validation to enable fair comparison.
"""
from __future__ import annotations
import logging
from copy import deepcopy
from typing import Any, Dict, List, Optional, Tuple
import numpy as np
import torch
import torch.nn as nn
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.model_selection import KFold, StratifiedKFold
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import DataLoader, TensorDataset
import xgboost as xgb
logger = logging.getLogger(__name__)
# =============================================================================
# XGBoost Multi-Output Regressor
# =============================================================================
class XGBoostMultiOutput:
    """
    XGBoost regressor trained independently per target.

    One ``xgb.XGBRegressor`` is fitted for every entry of ``target_names``;
    column ``i`` of the target matrix is paired with ``target_names[i]``.
    This approach (separate models per target) is preferred over
    MultiOutputRegressor wrapper as it allows per-target hyperparameter
    inspection and feature importance analysis.
    """

    def __init__(self, params: Dict[str, Any], target_names: List[str]):
        """
        Parameters
        ----------
        params : dict
            Keyword arguments forwarded unchanged to ``xgb.XGBRegressor``.
        target_names : list of str
            Target names, in the column order of the target matrix.
        """
        self.params = params
        self.target_names = target_names
        # Fitted regressors keyed by target name; populated by ``fit``.
        self.models: Dict[str, xgb.XGBRegressor] = {}

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "XGBoostMultiOutput":
        """
        Train one XGBoost model per target.

        Parameters
        ----------
        X_train : np.ndarray
            Training feature matrix.
        y_train : np.ndarray
            Training target matrix, indexed as ``y_train[:, i]``.
        X_val, y_val : np.ndarray, optional
            Validation data passed to XGBoost as ``eval_set``. It is used
            only when BOTH are given; a lone ``X_val`` is ignored instead of
            failing on ``y_val[:, i]``.

        Returns
        -------
        XGBoostMultiOutput
            ``self``, to allow call chaining.
        """
        use_validation = X_val is not None and y_val is not None
        for i, target in enumerate(self.target_names):
            model = xgb.XGBRegressor(**self.params)
            eval_set = [(X_val, y_val[:, i])] if use_validation else None
            model.fit(
                X_train,
                y_train[:, i],
                eval_set=eval_set,
                verbose=False,
            )
            self.models[target] = model
            logger.debug("XGBoost trained for %s", target)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict all targets.

        Returns
        -------
        np.ndarray
            One column per entry of ``target_names``, in that order.
        """
        return np.column_stack(
            [self.models[target].predict(X) for target in self.target_names]
        )

    def get_feature_importance(
        self, feature_names: List[str]
    ) -> Dict[str, Dict[str, float]]:
        """
        Get feature importance per target.

        Parameters
        ----------
        feature_names : list of str
            Feature names in the column order used for training.

        Returns
        -------
        dict
            ``{target: {feature_name: importance}}`` for every fitted model.

        Raises
        ------
        ValueError
            If ``feature_names`` does not have exactly one name per
            importance value. ``zip`` would otherwise silently truncate and
            return a partial or mislabelled mapping.
        """
        importances: Dict[str, Dict[str, float]] = {}
        for target, model in self.models.items():
            imp = model.feature_importances_
            if len(imp) != len(feature_names):
                raise ValueError(
                    f"Expected {len(imp)} feature names for target "
                    f"{target!r}, got {len(feature_names)}"
                )
            importances[target] = dict(zip(feature_names, imp.tolist()))
        return importances
# =============================================================================
# PyTorch Neural Network
# =============================================================================
class LaserEtchingNet(nn.Module):
    """
    Multi-output regression neural network.

    A stack of ``Linear -> [BatchNorm1d] -> ReLU -> [Dropout]`` stages, one
    per entry of ``hidden_layers``, followed by a final ``Linear`` layer that
    maps to ``n_outputs``.
    Architecture follows Behbahani et al. (2023) with BatchNorm and Dropout
    for regularization, adapted to our multi-target prediction task.
    """

    def __init__(
        self,
        n_features: int,
        n_outputs: int,
        hidden_layers: Optional[List[int]] = None,
        dropout: float = 0.2,
        batch_norm: bool = True,
    ):
        """
        Parameters
        ----------
        n_features : int
            Width of the input layer.
        n_outputs : int
            Width of the output layer.
        hidden_layers : list of int, optional
            Width of each hidden layer. ``None`` selects ``[128, 64, 32]``
            (a ``None`` sentinel is used so no mutable list is shared between
            calls).
        dropout : float
            Dropout rate of the first hidden stage; later stages use a
            linearly reduced rate.
        batch_norm : bool
            Whether to insert ``BatchNorm1d`` after each hidden ``Linear``.
        """
        super().__init__()
        if hidden_layers is None:
            hidden_layers = [128, 64, 32]

        n_hidden = len(hidden_layers)
        layers: List[nn.Module] = []
        prev_size = n_features
        for i, width in enumerate(hidden_layers):
            layers.append(nn.Linear(prev_size, width))
            if batch_norm:
                layers.append(nn.BatchNorm1d(width))
            layers.append(nn.ReLU())
            # Reduce dropout in deeper layers; rates at or below 0.05 are
            # dropped entirely rather than adding a near-no-op layer.
            drop_rate = dropout * (1 - i / n_hidden)
            if drop_rate > 0.05:
                layers.append(nn.Dropout(drop_rate))
            prev_size = width
        layers.append(nn.Linear(prev_size, n_outputs))
        self.network = nn.Sequential(*layers)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Apply the sequential network to ``x`` and return its output."""
        return self.network(x)
class NeuralNetworkRegressor:
    """
    Wrapper for PyTorch neural network with sklearn-like interface.

    Includes MinMaxScaler for input/output normalization and
    early stopping based on validation loss. After ``fit`` the per-epoch
    losses are available in ``train_losses`` and ``val_losses``.
    """

    def __init__(
        self,
        n_features: int,
        n_outputs: int,
        hidden_layers: Optional[List[int]] = None,
        dropout: float = 0.2,
        learning_rate: float = 1e-3,
        weight_decay: float = 1e-4,
        batch_size: int = 64,
        max_epochs: int = 500,
        patience: int = 30,
        device: Optional[str] = None,
    ):
        """
        Parameters
        ----------
        n_features, n_outputs : int
            Input and output widths of the network.
        hidden_layers : list of int, optional
            Hidden layer widths; ``None`` selects ``[128, 64, 32]``.
        dropout : float
            Dropout rate handed to ``LaserEtchingNet``.
        learning_rate, weight_decay : float
            Adam optimizer settings.
        batch_size : int
            Mini-batch size of the training loader.
        max_epochs : int
            Upper bound on training epochs.
        patience : int
            Number of consecutive non-improving validation epochs after
            which training stops.
        device : str, optional
            Torch device; defaults to ``"cuda"`` when available, else ``"cpu"``.
        """
        self.n_features = n_features
        self.n_outputs = n_outputs
        # None sentinel instead of a mutable list default shared across calls.
        self.hidden_layers = [128, 64, 32] if hidden_layers is None else hidden_layers
        self.dropout = dropout
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.max_epochs = max_epochs
        self.patience = patience
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.scaler_X = MinMaxScaler()
        self.scaler_y = MinMaxScaler()
        self.model: Optional[LaserEtchingNet] = None
        self.train_losses: List[float] = []
        self.val_losses: List[float] = []

    def _train_epoch(self, loader, optimizer, criterion) -> float:
        """Run one optimisation pass over ``loader``; return the mean batch loss."""
        self.model.train()
        epoch_loss = 0.0
        for X_batch, y_batch in loader:
            X_batch = X_batch.to(self.device)
            y_batch = y_batch.to(self.device)
            optimizer.zero_grad()
            loss = criterion(self.model(X_batch), y_batch)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        return epoch_loss / len(loader)

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "NeuralNetworkRegressor":
        """
        Train neural network with early stopping.

        Both scalers are re-fitted on the training data and a fresh network
        is created on every call.

        Parameters
        ----------
        X_train, y_train : np.ndarray
            Training features and targets.
        X_val, y_val : np.ndarray, optional
            Validation data. Used (for LR scheduling, early stopping and
            best-weights restoration) only when BOTH are given; otherwise
            training runs for ``max_epochs`` and the last weights are kept.

        Returns
        -------
        NeuralNetworkRegressor
            ``self``, to allow call chaining.
        """
        has_val = X_val is not None and y_val is not None

        # Scale data
        X_train_s = self.scaler_X.fit_transform(X_train)
        y_train_s = self.scaler_y.fit_transform(y_train)

        # Create model
        self.model = LaserEtchingNet(
            n_features=self.n_features,
            n_outputs=self.n_outputs,
            hidden_layers=self.hidden_layers,
            dropout=self.dropout,
        ).to(self.device)
        optimizer = torch.optim.Adam(
            self.model.parameters(),
            lr=self.learning_rate,
            weight_decay=self.weight_decay,
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode="min", factor=0.5, patience=10, min_lr=1e-6
        )
        criterion = nn.MSELoss()

        # DataLoader
        train_ds = TensorDataset(
            torch.FloatTensor(X_train_s), torch.FloatTensor(y_train_s)
        )
        n_train = len(train_ds)
        # BatchNorm1d cannot compute batch statistics from a single sample in
        # training mode, so a trailing batch of exactly one row would abort
        # training. Drop it; with shuffling a different row is left out each
        # epoch, so every sample is still used. (n_train > 1 guarantees at
        # least one full batch remains.)
        drop_singleton = n_train > 1 and n_train % self.batch_size == 1
        train_dl = DataLoader(
            train_ds,
            batch_size=self.batch_size,
            shuffle=True,
            drop_last=drop_singleton,
        )

        if has_val:
            val_X_t = torch.FloatTensor(self.scaler_X.transform(X_val)).to(self.device)
            val_y_t = torch.FloatTensor(self.scaler_y.transform(y_val)).to(self.device)

        # Training loop
        best_val_loss = float("inf")
        patience_counter = 0
        best_state = None
        self.train_losses = []
        self.val_losses = []

        for epoch in range(self.max_epochs):
            self.train_losses.append(
                self._train_epoch(train_dl, optimizer, criterion)
            )
            if not has_val:
                continue

            # Validate
            self.model.eval()
            with torch.no_grad():
                val_loss = criterion(self.model(val_X_t), val_y_t).item()
            self.val_losses.append(val_loss)
            scheduler.step(val_loss)

            if val_loss < best_val_loss:
                best_val_loss = val_loss
                patience_counter = 0
                # Clone so later optimizer steps do not overwrite the snapshot.
                best_state = {
                    k: v.clone() for k, v in self.model.state_dict().items()
                }
            else:
                patience_counter += 1
                if patience_counter >= self.patience:
                    logger.info("Early stopping at epoch %d", epoch + 1)
                    break

        if best_state is not None:
            self.model.load_state_dict(best_state)
        logger.info("NN training complete. Best val loss: %.6f", best_val_loss)
        return self

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Predict with inverse scaling.

        Raises
        ------
        RuntimeError
            If called before ``fit``.
        """
        if self.model is None:
            raise RuntimeError("NeuralNetworkRegressor.predict() called before fit()")
        self.model.eval()
        X_s = self.scaler_X.transform(X)
        with torch.no_grad():
            pred_s = self.model(torch.FloatTensor(X_s).to(self.device)).cpu().numpy()
        return self.scaler_y.inverse_transform(pred_s)
# =============================================================================
# Weighted Ensemble
# =============================================================================
class WeightedEnsemble:
    """
    Weighted ensemble combining XGBoost and Neural Network predictions.

    Default weights (0.6 XGB + 0.4 NN) follow findings from Petit et al. (2025)
    that tree-based methods slightly outperform NNs on tabular laser data,
    but ensemble improves robustness.
    """

    def __init__(
        self,
        xgb_model: "XGBoostMultiOutput",
        nn_model: "NeuralNetworkRegressor",
        xgb_weight: float = 0.6,
        nn_weight: float = 0.4,
    ):
        """
        Parameters
        ----------
        xgb_model, nn_model
            Already-fitted models exposing ``predict(X)``.
        xgb_weight, nn_weight : float
            Blend weights; must sum to 1 (tolerance 1e-6).

        Raises
        ------
        ValueError
            If the weights do not sum to 1. An explicit raise is used
            because ``assert`` statements are removed under ``python -O``.
        """
        # "not (... < tol)" also rejects NaN weights.
        if not abs(xgb_weight + nn_weight - 1.0) < 1e-6:
            raise ValueError("Weights must sum to 1")
        self.xgb_model = xgb_model
        self.nn_model = nn_model
        self.xgb_weight = xgb_weight
        self.nn_weight = nn_weight

    def predict(self, X: np.ndarray) -> np.ndarray:
        """
        Weighted average of XGBoost and NN predictions.

        Raises
        ------
        ValueError
            If the two models return differently shaped predictions, which
            NumPy could otherwise silently broadcast into a wrong result.
        """
        xgb_pred = np.asarray(self.xgb_model.predict(X))
        nn_pred = np.asarray(self.nn_model.predict(X))
        if xgb_pred.shape != nn_pred.shape:
            raise ValueError(
                f"Prediction shapes differ: XGBoost {xgb_pred.shape} "
                f"vs NN {nn_pred.shape}"
            )
        return self.xgb_weight * xgb_pred + self.nn_weight * nn_pred
# =============================================================================
# Cross-Validation Framework
# =============================================================================
def cross_validate_model(
    model_factory,
    X: np.ndarray,
    y: np.ndarray,
    n_folds: int = 5,
    groups: Optional[np.ndarray] = None,
    random_state: int = 42,
) -> Dict[str, np.ndarray]:
    """
    Perform k-fold cross-validation with a model factory.

    Parameters
    ----------
    model_factory : callable
        Function that returns a fresh model instance exposing
        ``fit(X_train, y_train, X_val, y_val)`` and ``predict(X)``.
    X : np.ndarray
        Feature matrix
    y : np.ndarray
        Target matrix ``(n_samples, n_targets)``. A 1-D vector is accepted
        and scored as a single target; it is passed to the model unchanged.
    n_folds : int
        Number of CV folds
    groups : np.ndarray, optional
        Per-sample labels used to stratify the folds (they are passed as the
        stratification target of ``StratifiedKFold``). When omitted, a plain
        shuffled ``KFold`` is used.
    random_state : int
        Seed for the fold shuffling.

    Returns
    -------
    dict
        CV results with keys: 'mae', 'rmse', 'r2', 'mape' — each (n_folds, n_targets)

    Raises
    ------
    ValueError
        If ``X`` and ``y`` have different numbers of rows, or a model's
        predictions do not match the validation targets in shape.

    Notes
    -----
    NOTE(review): each fold's validation split is handed to ``model.fit``
    and is also the data scored afterwards. If the model uses it for early
    stopping or model selection, the reported metrics are optimistic.
    """
    y = np.asarray(y)
    if X.shape[0] != y.shape[0]:
        raise ValueError(
            f"X and y have inconsistent lengths: {X.shape[0]} vs {y.shape[0]}"
        )
    # Column view used only for scoring; the model still receives ``y`` as given.
    y_cols = y[:, None] if y.ndim == 1 else y

    if groups is not None:
        kf = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=random_state)
        split_iter = kf.split(X, groups)
    else:
        kf = KFold(n_splits=n_folds, shuffle=True, random_state=random_state)
        split_iter = kf.split(X)

    n_targets = y_cols.shape[1]
    results = {
        name: np.zeros((n_folds, n_targets))
        for name in ("mae", "rmse", "r2", "mape")
    }

    for fold, (train_idx, val_idx) in enumerate(split_iter):
        X_train, X_val = X[train_idx], X[val_idx]
        y_train, y_val = y[train_idx], y[val_idx]

        model = model_factory()
        model.fit(X_train, y_train, X_val, y_val)

        y_true = y_cols[val_idx]
        y_pred = np.asarray(model.predict(X_val))
        if y_pred.ndim == 1:
            y_pred = y_pred[:, None]
        if y_pred.shape != y_true.shape:
            raise ValueError(
                f"Prediction shape {y_pred.shape} does not match "
                f"target shape {y_true.shape}"
            )

        for t in range(n_targets):
            true_t, pred_t = y_true[:, t], y_pred[:, t]
            results["mae"][fold, t] = mean_absolute_error(true_t, pred_t)
            results["rmse"][fold, t] = np.sqrt(mean_squared_error(true_t, pred_t))
            results["r2"][fold, t] = r2_score(true_t, pred_t)
            # MAPE with epsilon to avoid division by zero
            results["mape"][fold, t] = (
                np.mean(np.abs((true_t - pred_t) / (np.abs(true_t) + 1e-8))) * 100
            )

        logger.info(
            "Fold %d/%d: mean R² = %.4f",
            fold + 1,
            n_folds,
            results["r2"][fold].mean(),
        )
    return results