Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| NBA Quant AI β Neural Network Models (2025-2026 SOTA) | |
| ====================================================== | |
| Real, production-grade neural architectures for NBA game prediction. | |
| Models implemented: | |
| 1. LSTMSequenceModel β Bidirectional LSTM over last N games | |
| 2. TransformerAttentionModel β Self-attention over game history | |
| 3. TabNetModel β Attention-based tabular learning (Arik & Pfister 2021) | |
| 4. FTTransformerModel β Feature Tokenizer + Transformer (Gorishniy et al. 2021) | |
| 5. DeepEnsemble β N independent nets, averaged predictions | |
| 6. ConformalPredictionWrapper β Calibrated prediction intervals (any base model) | |
| 7. AutoGluonEnsemble β Auto-stacking over hundreds of configs | |
| All models: | |
| - Handle NaN gracefully (median imputation) | |
| - Work with 6000+ features | |
| - Use early stopping | |
| - CPU-only PyTorch (no CUDA needed) | |
| - Fit in 16 GB RAM (HF Spaces free tier) | |
| THIS RUNS ON HF SPACES ONLY β NOT ON VM. | |
| """ | |
| from __future__ import annotations | |
| import copy | |
| import json | |
| import math | |
| import os | |
| import pickle | |
| import warnings | |
| from abc import ABC, abstractmethod | |
| from pathlib import Path | |
| from typing import Any, Dict, List, Optional, Tuple, Union | |
| import numpy as np | |
| from sklearn.model_selection import train_test_split | |
| from sklearn.preprocessing import StandardScaler | |
| warnings.filterwarnings("ignore", category=UserWarning) | |
| # --------------------------------------------------------------------------- | |
| # Lazy imports β heavy libraries loaded only when a model is instantiated | |
| # --------------------------------------------------------------------------- | |
| def _import_torch(): | |
| """Import torch lazily to avoid startup cost.""" | |
| import torch | |
| import torch.nn as nn | |
| import torch.optim as optim | |
| from torch.utils.data import DataLoader, TensorDataset | |
| return torch, nn, optim, DataLoader, TensorDataset | |
| # --------------------------------------------------------------------------- | |
| # Base class β common interface for all models | |
| # --------------------------------------------------------------------------- | |
class BaseNBAModel(ABC):
    """Abstract base for all NBA prediction models.

    Shared responsibilities:
      * NaN/Inf handling via column-median imputation (:meth:`_impute`)
      * feature standardisation (:meth:`_scale`)
      * pickle persistence (:meth:`save` / :meth:`load`)
      * automatic time-ordered validation split (:meth:`_auto_val_split`)

    Subclasses must implement :meth:`fit` and :meth:`predict_proba`.
    """

    def __init__(self, **params):
        # Raw hyperparameter dict, kept around for get_params() serialisation.
        self.params = params
        self._scaler: Optional[StandardScaler] = None
        self._feature_medians: Optional[np.ndarray] = None
        self._is_fitted = False

    # --- public interface ---------------------------------------------------
    # FIX: these were plain stub methods; `abstractmethod` was imported but
    # never applied, so an incomplete subclass could be instantiated silently.
    @abstractmethod
    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "BaseNBAModel":
        """Train the model. Returns self."""
        ...

    @abstractmethod
    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return P(home_win) for each row — shape (n,)."""
        ...

    def get_params(self) -> Dict[str, Any]:
        """Return the hyperparameter dict, filtered to JSON-serialisable values."""
        return {k: v for k, v in self.params.items() if _is_jsonable(v)}

    def save(self, path: Union[str, Path]) -> None:
        """Persist the whole model (scaler, medians, weights) to *path*."""
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)
        with open(path, "wb") as f:
            pickle.dump(self, f, protocol=pickle.HIGHEST_PROTOCOL)

    @classmethod
    def load(cls, path: Union[str, Path]) -> "BaseNBAModel":
        """Load a model previously written by :meth:`save`.

        FIX: the original took ``cls`` but lacked ``@classmethod``, so
        ``Model.load(path)`` bound the path to ``cls`` and failed.

        NOTE(security): ``pickle.load`` executes arbitrary code — only load
        files this application itself has written.
        """
        with open(path, "rb") as f:
            obj = pickle.load(f)
        return obj

    # --- NaN handling & scaling --------------------------------------------
    def _impute(self, X: np.ndarray, fit: bool = False) -> np.ndarray:
        """Replace NaN/Inf with column medians. If *fit*, compute medians first."""
        X = np.array(X, dtype=np.float32)
        # Collapse +/-Inf into NaN so a single fill pass handles both.
        X = np.where(np.isfinite(X), X, np.nan)
        if fit:
            self._feature_medians = np.nanmedian(X, axis=0)
            # All-NaN columns yield NaN medians — fall back to 0.0.
            self._feature_medians = np.where(
                np.isfinite(self._feature_medians), self._feature_medians, 0.0
            )
        medians = self._feature_medians if self._feature_medians is not None else np.zeros(X.shape[1])
        inds = np.where(np.isnan(X))
        X[inds] = np.take(medians, inds[1])
        return X

    def _scale(self, X: np.ndarray, fit: bool = False) -> np.ndarray:
        """Standard-scale features (fit the scaler when *fit* is True)."""
        if fit:
            self._scaler = StandardScaler()
            return self._scaler.fit_transform(X).astype(np.float32)
        if self._scaler is not None:
            return self._scaler.transform(X).astype(np.float32)
        return X.astype(np.float32)

    def _prepare(self, X: np.ndarray, fit: bool = False) -> np.ndarray:
        """Impute then scale. NOT idempotent: scaling twice double-transforms."""
        X = self._impute(X, fit=fit)
        X = self._scale(X, fit=fit)
        return X

    def _auto_val_split(
        self,
        X: np.ndarray,
        y: np.ndarray,
        X_val: Optional[np.ndarray],
        y_val: Optional[np.ndarray],
        val_frac: float = 0.15,
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """If no validation set provided, carve one from the tail (time-ordered)."""
        if X_val is not None and y_val is not None:
            return X, y, X_val, y_val
        split = int(len(X) * (1 - val_frac))
        return X[:split], y[:split], X[split:], y[split:]
| # =========================================================================== | |
| # 1. LSTM Game Sequence Model | |
| # =========================================================================== | |
class LSTMSequenceModel(BaseNBAModel):
    """
    Bidirectional LSTM over the last *seq_len* games of features per team.

    Input shape: (batch, seq_len, n_features)
    Architecture: BiLSTM(128) -> BiLSTM(64) -> Dense(32) -> Sigmoid

    For flat input (n_samples, n_features), the model internally reshapes
    using a sliding window of *seq_len* rows, treating consecutive games as
    the sequence dimension. For true per-team sequences, pass 3-D arrays
    directly (NOTE: imputation/scaling stats assume 2-D input; 3-D arrays
    should be pre-cleaned by the caller — TODO confirm upstream).
    """

    def __init__(
        self,
        seq_len: int = 10,
        hidden1: int = 128,
        hidden2: int = 64,
        dense_dim: int = 32,
        dropout: float = 0.3,
        lr: float = 1e-3,
        weight_decay: float = 1e-5,
        batch_size: int = 256,
        epochs: int = 120,
        patience: int = 15,
        **kw,
    ):
        super().__init__(
            seq_len=seq_len, hidden1=hidden1, hidden2=hidden2,
            dense_dim=dense_dim, dropout=dropout, lr=lr,
            weight_decay=weight_decay, batch_size=batch_size,
            epochs=epochs, patience=patience, **kw,
        )
        self.seq_len = seq_len
        self.hidden1 = hidden1
        self.hidden2 = hidden2
        self.dense_dim = dense_dim
        self.dropout = dropout
        self.lr = lr
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.epochs = epochs
        self.patience = patience
        self._net = None

    # --- PyTorch module (defined inside method to keep torch lazy) ----------
    @staticmethod
    def _build_net(n_features: int, cfg: dict):
        """Construct the BiLSTM network.

        FIX: now a @staticmethod — the original had no ``self`` parameter
        yet was invoked as ``self._build_net(...)``, which raised
        ``TypeError`` (3 args passed to a 2-arg function) at fit time.
        """
        torch, nn, _, _, _ = _import_torch()

        class BiLSTMNet(nn.Module):
            def __init__(self):
                super().__init__()
                # FIX: dropped the `dropout=` kwarg here — PyTorch ignores it
                # (with a warning) on a single-layer LSTM; inter-layer dropout
                # is applied explicitly in forward() instead.
                self.lstm1 = nn.LSTM(
                    input_size=n_features,
                    hidden_size=cfg["hidden1"],
                    batch_first=True,
                    bidirectional=True,
                )
                self.lstm2 = nn.LSTM(
                    input_size=cfg["hidden1"] * 2,  # bidirectional doubles width
                    hidden_size=cfg["hidden2"],
                    batch_first=True,
                    bidirectional=True,
                )
                self.dropout = nn.Dropout(cfg["dropout"])
                self.fc1 = nn.Linear(cfg["hidden2"] * 2, cfg["dense_dim"])
                self.relu = nn.ReLU()
                self.fc2 = nn.Linear(cfg["dense_dim"], 1)

            def forward(self, x):
                # x: (batch, seq_len, features)
                out, _ = self.lstm1(x)
                out = self.dropout(out)
                out, _ = self.lstm2(out)
                # Keep only the representation of the most recent game.
                out = out[:, -1, :]
                out = self.dropout(out)
                out = self.relu(self.fc1(out))
                out = self.dropout(out)
                return torch.sigmoid(self.fc2(out)).squeeze(-1)

        return BiLSTMNet()

    # --- Sequence construction from flat arrays ----------------------------
    def _make_sequences(
        self, X: np.ndarray, y: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Convert flat (n_games, n_features) into (n_sequences, seq_len, n_features).

        Uses a sliding window — game i maps to window [i-seq_len+1 .. i].
        The first seq_len-1 games are dropped (not enough history).
        """
        if X.ndim == 3:
            return X, y  # already sequential
        seqs, labels = [], []
        for i in range(self.seq_len - 1, len(X)):
            seqs.append(X[i - self.seq_len + 1 : i + 1])
            labels.append(y[i])
        return np.array(seqs, dtype=np.float32), np.array(labels, dtype=np.float32)

    # --- fit / predict -----------------------------------------------------
    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "LSTMSequenceModel":
        """Train with AdamW + ReduceLROnPlateau and early stopping on val BCE."""
        torch, nn, optim, DataLoader, TensorDataset = _import_torch()
        # Impute + scale; statistics fitted on the training data.
        X_train = self._prepare(X_train, fit=True)
        # FIX: transform an *external* validation set BEFORE the auto split.
        # Previously a val set carved out of the already-prepared training
        # data was passed through _prepare again and got double-scaled.
        if X_val is not None and y_val is not None:
            X_val = self._prepare(X_val)
        X_train, y_train, X_val, y_val = self._auto_val_split(
            X_train, y_train, X_val, y_val
        )
        # Build sliding-window sequences.
        X_tr_seq, y_tr_seq = self._make_sequences(X_train, y_train)
        X_va_seq, y_va_seq = self._make_sequences(X_val, y_val)
        n_features = X_tr_seq.shape[2]
        self._net = self._build_net(n_features, {
            "hidden1": self.hidden1, "hidden2": self.hidden2,
            "dense_dim": self.dense_dim, "dropout": self.dropout,
        })
        optimizer = optim.AdamW(
            self._net.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )
        scheduler = optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode="min", factor=0.5, patience=5, min_lr=1e-6
        )
        criterion = nn.BCELoss()
        train_ds = TensorDataset(
            torch.from_numpy(X_tr_seq), torch.from_numpy(y_tr_seq)
        )
        train_dl = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True)
        val_X_t = torch.from_numpy(X_va_seq)
        val_y_t = torch.from_numpy(y_va_seq)
        best_val_loss = float("inf")
        best_state = None
        wait = 0
        self._net.train()
        for epoch in range(self.epochs):
            epoch_loss = 0.0
            for xb, yb in train_dl:
                optimizer.zero_grad()
                preds = self._net(xb)
                loss = criterion(preds, yb)
                loss.backward()
                # Clip to stabilise BPTT through long sequences.
                torch.nn.utils.clip_grad_norm_(self._net.parameters(), 1.0)
                optimizer.step()
                epoch_loss += loss.item() * len(xb)
            epoch_loss /= len(train_ds)
            # Validation pass (dropout off).
            self._net.eval()
            with torch.no_grad():
                val_preds = self._net(val_X_t)
                val_loss = criterion(val_preds, val_y_t).item()
            self._net.train()
            scheduler.step(val_loss)
            # Early stopping: keep the best weights seen so far.
            if val_loss < best_val_loss - 1e-6:
                best_val_loss = val_loss
                best_state = copy.deepcopy(self._net.state_dict())
                wait = 0
            else:
                wait += 1
                if wait >= self.patience:
                    break
        if best_state is not None:
            self._net.load_state_dict(best_state)
        self._net.eval()
        self._is_fitted = True
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return P(home_win); early rows with short history are zero-padded."""
        torch, _, _, _, _ = _import_torch()
        assert self._is_fitted, "Model not fitted yet"
        X = self._prepare(X)
        # If flat, create sequences with padding for early games.
        if X.ndim == 2:
            seqs = []
            for i in range(len(X)):
                start = max(0, i - self.seq_len + 1)
                seq = X[start : i + 1]
                if len(seq) < self.seq_len:
                    pad = np.zeros((self.seq_len - len(seq), X.shape[1]), dtype=np.float32)
                    seq = np.concatenate([pad, seq], axis=0)
                seqs.append(seq)
            X_seq = np.array(seqs, dtype=np.float32)
        else:
            X_seq = X.astype(np.float32)
        self._net.eval()
        with torch.no_grad():
            preds = self._net(torch.from_numpy(X_seq))
        return preds.numpy()
| # =========================================================================== | |
| # 2. Transformer Attention Model | |
| # =========================================================================== | |
class TransformerAttentionModel(BaseNBAModel):
    """
    Self-attention over team performance history.

    Architecture:
        Linear projection -> Positional encoding ->
        TransformerEncoder (2 layers, 4 heads) ->
        Global average pool -> Dense -> Sigmoid

    For flat input the model treats each game as one token in a
    sequence of *seq_len* tokens (same sliding-window as the LSTM model).
    """

    def __init__(
        self,
        seq_len: int = 10,
        d_model: int = 128,
        n_heads: int = 4,
        n_layers: int = 2,
        dim_ff: int = 256,
        dropout: float = 0.2,
        lr: float = 5e-4,
        weight_decay: float = 1e-4,
        batch_size: int = 256,
        epochs: int = 120,
        patience: int = 15,
        **kw,
    ):
        super().__init__(
            seq_len=seq_len, d_model=d_model, n_heads=n_heads,
            n_layers=n_layers, dim_ff=dim_ff, dropout=dropout,
            lr=lr, weight_decay=weight_decay, batch_size=batch_size,
            epochs=epochs, patience=patience, **kw,
        )
        self.seq_len = seq_len
        self.d_model = d_model
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.dim_ff = dim_ff
        self.dropout = dropout
        self.lr = lr
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.epochs = epochs
        self.patience = patience
        self._net = None

    @staticmethod
    def _build_net(n_features: int, cfg: dict):
        """Construct the transformer network.

        FIX: now a @staticmethod — the original had no ``self`` parameter
        yet was invoked as ``self._build_net(...)``, raising ``TypeError``
        at fit time.
        """
        torch, nn, _, _, _ = _import_torch()

        class PositionalEncoding(nn.Module):
            """Sinusoidal positional encoding for game order."""

            def __init__(self, d_model: int, max_len: int = 200):
                super().__init__()
                pe = torch.zeros(max_len, d_model)
                position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
                div_term = torch.exp(
                    torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)
                )
                pe[:, 0::2] = torch.sin(position * div_term)
                # Slice div_term so odd d_model doesn't overflow the cos columns.
                pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
                pe = pe.unsqueeze(0)  # (1, max_len, d_model)
                self.register_buffer("pe", pe)

            def forward(self, x):
                return x + self.pe[:, : x.size(1), :]

        class TransformerNet(nn.Module):
            def __init__(self):
                super().__init__()
                self.input_proj = nn.Linear(n_features, cfg["d_model"])
                self.pos_enc = PositionalEncoding(cfg["d_model"], max_len=cfg["seq_len"] + 10)
                self.layer_norm_in = nn.LayerNorm(cfg["d_model"])
                encoder_layer = nn.TransformerEncoderLayer(
                    d_model=cfg["d_model"],
                    nhead=cfg["n_heads"],
                    dim_feedforward=cfg["dim_ff"],
                    dropout=cfg["dropout"],
                    batch_first=True,
                    activation="gelu",
                )
                self.encoder = nn.TransformerEncoder(
                    encoder_layer, num_layers=cfg["n_layers"]
                )
                self.dropout = nn.Dropout(cfg["dropout"])
                self.fc1 = nn.Linear(cfg["d_model"], cfg["d_model"] // 2)
                self.gelu = nn.GELU()
                self.fc2 = nn.Linear(cfg["d_model"] // 2, 1)

            def forward(self, x):
                # x: (batch, seq_len, n_features)
                x = self.input_proj(x)
                x = self.pos_enc(x)
                x = self.layer_norm_in(x)
                x = self.encoder(x)
                # Global average pooling across the sequence dimension.
                x = x.mean(dim=1)
                x = self.dropout(x)
                x = self.gelu(self.fc1(x))
                x = self.dropout(x)
                return torch.sigmoid(self.fc2(x)).squeeze(-1)

        return TransformerNet()

    def _make_sequences(self, X: np.ndarray, y: np.ndarray):
        """Sliding-window (n_games, f) -> (n_seq, seq_len, f); 3-D passes through."""
        if X.ndim == 3:
            return X, y
        seqs, labels = [], []
        for i in range(self.seq_len - 1, len(X)):
            seqs.append(X[i - self.seq_len + 1 : i + 1])
            labels.append(y[i])
        return np.array(seqs, dtype=np.float32), np.array(labels, dtype=np.float32)

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "TransformerAttentionModel":
        """Train with AdamW + cosine warm restarts and early stopping on val BCE."""
        torch, nn, optim, DataLoader, TensorDataset = _import_torch()
        X_train = self._prepare(X_train, fit=True)
        # FIX: transform an *external* validation set BEFORE the auto split;
        # a val set carved from the already-prepared training data was
        # previously double-scaled by a second _prepare call.
        if X_val is not None and y_val is not None:
            X_val = self._prepare(X_val)
        X_train, y_train, X_val, y_val = self._auto_val_split(
            X_train, y_train, X_val, y_val
        )
        X_tr_seq, y_tr_seq = self._make_sequences(X_train, y_train)
        X_va_seq, y_va_seq = self._make_sequences(X_val, y_val)
        n_features = X_tr_seq.shape[2]
        self._net = self._build_net(n_features, {
            "d_model": self.d_model, "n_heads": self.n_heads,
            "n_layers": self.n_layers, "dim_ff": self.dim_ff,
            "dropout": self.dropout, "seq_len": self.seq_len,
        })
        optimizer = optim.AdamW(
            self._net.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )
        scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(
            optimizer, T_0=10, T_mult=2, eta_min=1e-6
        )
        criterion = nn.BCELoss()
        train_ds = TensorDataset(
            torch.from_numpy(X_tr_seq), torch.from_numpy(y_tr_seq)
        )
        train_dl = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True)
        val_X_t = torch.from_numpy(X_va_seq)
        val_y_t = torch.from_numpy(y_va_seq)
        best_val_loss = float("inf")
        best_state = None
        wait = 0
        self._net.train()
        for epoch in range(self.epochs):
            epoch_loss = 0.0
            for xb, yb in train_dl:
                optimizer.zero_grad()
                preds = self._net(xb)
                loss = criterion(preds, yb)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self._net.parameters(), 1.0)
                optimizer.step()
                epoch_loss += loss.item() * len(xb)
            epoch_loss /= len(train_ds)
            # FIX: step once per epoch. The original passed
            # `epoch + epoch_loss`, injecting the *loss value* into the
            # scheduler's epoch position and corrupting the restart cycle.
            scheduler.step()
            self._net.eval()
            with torch.no_grad():
                val_preds = self._net(val_X_t)
                val_loss = criterion(val_preds, val_y_t).item()
            self._net.train()
            if val_loss < best_val_loss - 1e-6:
                best_val_loss = val_loss
                best_state = copy.deepcopy(self._net.state_dict())
                wait = 0
            else:
                wait += 1
                if wait >= self.patience:
                    break
        if best_state is not None:
            self._net.load_state_dict(best_state)
        self._net.eval()
        self._is_fitted = True
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return P(home_win); early rows with short history are zero-padded."""
        torch, _, _, _, _ = _import_torch()
        assert self._is_fitted, "Model not fitted yet"
        X = self._prepare(X)
        if X.ndim == 2:
            seqs = []
            for i in range(len(X)):
                start = max(0, i - self.seq_len + 1)
                seq = X[start : i + 1]
                if len(seq) < self.seq_len:
                    pad = np.zeros((self.seq_len - len(seq), X.shape[1]), dtype=np.float32)
                    seq = np.concatenate([pad, seq], axis=0)
                seqs.append(seq)
            X_seq = np.array(seqs, dtype=np.float32)
        else:
            X_seq = X.astype(np.float32)
        self._net.eval()
        with torch.no_grad():
            preds = self._net(torch.from_numpy(X_seq))
        return preds.numpy()
| # =========================================================================== | |
| # 3. TabNet β Attention-based Tabular Model | |
| # =========================================================================== | |
class TabNetModel(BaseNBAModel):
    """
    TabNet (Arik & Pfister 2021) — attention-based tabular learning.

    Uses sequential attention to select features at each decision step,
    providing built-in interpretability via attention masks.

    Wraps pytorch_tabnet.TabNetClassifier with NaN handling (median
    imputation, no scaling — TabNet normalises internally via BN) and
    early stopping on validation logloss.
    """

    def __init__(
        self,
        n_d: int = 32,
        n_a: int = 32,
        n_steps: int = 5,
        gamma: float = 1.5,
        lambda_sparse: float = 1e-4,
        n_independent: int = 2,
        n_shared: int = 2,
        lr: float = 2e-2,
        batch_size: int = 1024,
        virtual_batch_size: int = 256,
        epochs: int = 200,
        patience: int = 20,
        mask_type: str = "entmax",
        **kw,
    ):
        super().__init__(
            n_d=n_d, n_a=n_a, n_steps=n_steps, gamma=gamma,
            lambda_sparse=lambda_sparse, n_independent=n_independent,
            n_shared=n_shared, lr=lr, batch_size=batch_size,
            virtual_batch_size=virtual_batch_size, epochs=epochs,
            patience=patience, mask_type=mask_type, **kw,
        )
        self.n_d = n_d
        self.n_a = n_a
        self.n_steps = n_steps
        self.gamma = gamma
        self.lambda_sparse = lambda_sparse
        self.n_independent = n_independent
        self.n_shared = n_shared
        self.lr = lr
        self.batch_size = batch_size
        self.virtual_batch_size = virtual_batch_size
        self.epochs = epochs
        self.patience = patience
        self.mask_type = mask_type
        self._clf = None
        self._feature_importances: Optional[np.ndarray] = None

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "TabNetModel":
        """Train TabNet with early stopping on validation logloss."""
        import torch  # lazy — pytorch_tabnet requires torch anyway
        from pytorch_tabnet.tab_model import TabNetClassifier

        X_train = self._impute(X_train, fit=True)
        # Impute an *external* validation set before the auto split
        # (a carved-off tail is already imputed).
        if X_val is not None and y_val is not None:
            X_val = self._impute(X_val)
        X_train, y_train, X_val, y_val = self._auto_val_split(
            X_train, y_train, X_val, y_val
        )
        # TabNetClassifier expects integer class labels.
        y_train = y_train.astype(np.int64)
        y_val = y_val.astype(np.int64)
        self._clf = TabNetClassifier(
            n_d=self.n_d,
            n_a=self.n_a,
            n_steps=self.n_steps,
            gamma=self.gamma,
            lambda_sparse=self.lambda_sparse,
            n_independent=self.n_independent,
            n_shared=self.n_shared,
            # FIX: was `optimizer_fn=None` — TabNet calls optimizer_fn
            # directly, so None crashed at fit time. Pass Adam explicitly
            # (the library default) so optimizer_params is honoured.
            optimizer_fn=torch.optim.Adam,
            optimizer_params={"lr": self.lr},
            mask_type=self.mask_type,
            # No LR scheduler: leave scheduler_fn/scheduler_params at their
            # library defaults rather than overriding params with None.
            verbose=0,
            device_name="cpu",
        )
        self._clf.fit(
            X_train=X_train,
            y_train=y_train,
            eval_set=[(X_val, y_val)],
            eval_name=["val"],
            eval_metric=["logloss"],
            max_epochs=self.epochs,
            patience=self.patience,
            batch_size=self.batch_size,
            virtual_batch_size=min(self.virtual_batch_size, self.batch_size),
            drop_last=False,
        )
        self._feature_importances = self._clf.feature_importances_
        self._is_fitted = True
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return P(home_win) — positive-class column of TabNet's (n, 2) output."""
        assert self._is_fitted, "Model not fitted yet"
        X = self._impute(X)
        proba = self._clf.predict_proba(X)  # shape (n, 2)
        return proba[:, 1]

    def get_feature_importances(self) -> Optional[np.ndarray]:
        """Return TabNet attention-based feature importances (set by fit)."""
        return self._feature_importances

    def explain(self, X: np.ndarray) -> np.ndarray:
        """Return per-sample feature attention masks."""
        assert self._is_fitted, "Model not fitted yet"
        X = self._impute(X)
        masks, _ = self._clf.explain(X)
        return masks
| # =========================================================================== | |
| # 4. FT-Transformer (Feature Tokenizer + Transformer) | |
| # =========================================================================== | |
class FTTransformerModel(BaseNBAModel):
    """
    FT-Transformer (Gorishniy et al. 2021) for tabular data.

    Each numerical feature is projected into a *d_token*-dimensional
    embedding. A [CLS] token is prepended. Self-attention across all
    feature tokens captures cross-feature interactions, and the [CLS]
    representation feeds a classification head.

    Because the full 6000+ features would create 6000+ tokens (too large
    for self-attention on CPU), a learned linear bottleneck first reduces
    the input to *n_tokens* feature groups.
    """

    def __init__(
        self,
        n_tokens: int = 128,
        d_token: int = 64,
        n_heads: int = 4,
        n_layers: int = 3,
        dim_ff: int = 256,
        dropout: float = 0.2,
        attention_dropout: float = 0.1,
        lr: float = 1e-4,
        weight_decay: float = 1e-5,
        batch_size: int = 512,
        epochs: int = 120,
        patience: int = 15,
        **kw,
    ):
        super().__init__(
            n_tokens=n_tokens, d_token=d_token, n_heads=n_heads,
            n_layers=n_layers, dim_ff=dim_ff, dropout=dropout,
            attention_dropout=attention_dropout, lr=lr,
            weight_decay=weight_decay, batch_size=batch_size,
            epochs=epochs, patience=patience, **kw,
        )
        self.n_tokens = n_tokens
        self.d_token = d_token
        self.n_heads = n_heads
        self.n_layers = n_layers
        self.dim_ff = dim_ff
        self.dropout = dropout
        # NOTE(review): attention_dropout is stored but not applied —
        # nn.TransformerEncoderLayer exposes a single dropout rate. Kept
        # for API compatibility; wire up a custom layer to honour it.
        self.attention_dropout = attention_dropout
        self.lr = lr
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.epochs = epochs
        self.patience = patience
        self._net = None

    @staticmethod
    def _build_net(n_features: int, cfg: dict):
        """Construct the FT-Transformer network.

        FIX: now a @staticmethod — the original had no ``self`` parameter
        yet was invoked as ``self._build_net(...)``, raising ``TypeError``
        at fit time.
        """
        torch, nn, _, _, _ = _import_torch()

        class FTTransformerNet(nn.Module):
            """
            Feature Tokenizer + Transformer.

            1) Bottleneck: Linear(n_features -> n_tokens) — group features
            2) Token embed: each of *n_tokens* scalars -> d_token vector
            3) Prepend [CLS] token
            4) TransformerEncoder
            5) [CLS] output -> classification head
            """

            def __init__(self):
                super().__init__()
                n_tok = cfg["n_tokens"]
                d_tok = cfg["d_token"]
                # Bottleneck projection: reduce raw features to n_tokens groups.
                self.bottleneck = nn.Linear(n_features, n_tok)
                self.bn_norm = nn.LayerNorm(n_tok)
                # Per-token embedding: scalar_i -> d_token vector via
                # per-token weight and bias.
                self.token_weight = nn.Parameter(torch.randn(n_tok, d_tok) * 0.02)
                self.token_bias = nn.Parameter(torch.zeros(n_tok, d_tok))
                # Learnable [CLS] token.
                self.cls_token = nn.Parameter(torch.randn(1, 1, d_tok) * 0.02)
                # Transformer encoder stack.
                self.layer_norm = nn.LayerNorm(d_tok)
                encoder_layer = nn.TransformerEncoderLayer(
                    d_model=d_tok,
                    nhead=cfg["n_heads"],
                    dim_feedforward=cfg["dim_ff"],
                    dropout=cfg["dropout"],
                    batch_first=True,
                    activation="gelu",
                )
                self.encoder = nn.TransformerEncoder(
                    encoder_layer, num_layers=cfg["n_layers"]
                )
                # Classification head on the [CLS] representation.
                self.head = nn.Sequential(
                    nn.LayerNorm(d_tok),
                    nn.Linear(d_tok, d_tok // 2),
                    nn.GELU(),
                    nn.Dropout(cfg["dropout"]),
                    nn.Linear(d_tok // 2, 1),
                )

            def forward(self, x):
                # x: (batch, n_features)
                batch_size = x.size(0)
                # Bottleneck: (batch, n_features) -> (batch, n_tokens)
                x = self.bn_norm(self.bottleneck(x))
                # Token embedding: (batch, n_tokens) -> (batch, n_tokens, d_token)
                # x_i * weight_i + bias_i for each token.
                x = x.unsqueeze(-1) * self.token_weight.unsqueeze(0) + self.token_bias.unsqueeze(0)
                # Prepend [CLS].
                cls = self.cls_token.expand(batch_size, -1, -1)
                x = torch.cat([cls, x], dim=1)  # (batch, 1 + n_tokens, d_token)
                x = self.layer_norm(x)
                x = self.encoder(x)
                # Extract the [CLS] output.
                cls_out = x[:, 0, :]
                return torch.sigmoid(self.head(cls_out)).squeeze(-1)

        return FTTransformerNet()

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "FTTransformerModel":
        """Train with AdamW + OneCycleLR and early stopping on val BCE."""
        torch, nn, optim, DataLoader, TensorDataset = _import_torch()
        X_train = self._prepare(X_train, fit=True)
        # FIX: transform an *external* validation set BEFORE the auto split;
        # a val set carved from the already-prepared training data was
        # previously double-scaled by a second _prepare call.
        if X_val is not None and y_val is not None:
            X_val = self._prepare(X_val)
        X_train, y_train, X_val, y_val = self._auto_val_split(
            X_train, y_train, X_val, y_val
        )
        y_train = y_train.astype(np.float32)
        y_val = y_val.astype(np.float32)
        n_features = X_train.shape[1]
        self._net = self._build_net(n_features, {
            # Never request more token groups than there are input features.
            "n_tokens": min(self.n_tokens, n_features),
            "d_token": self.d_token,
            "n_heads": self.n_heads,
            "n_layers": self.n_layers,
            "dim_ff": self.dim_ff,
            "dropout": self.dropout,
        })
        optimizer = optim.AdamW(
            self._net.parameters(), lr=self.lr, weight_decay=self.weight_decay
        )
        # One scheduler step per epoch, so total_steps == epochs.
        scheduler = optim.lr_scheduler.OneCycleLR(
            optimizer, max_lr=self.lr * 10, total_steps=self.epochs,
            pct_start=0.1, anneal_strategy="cos",
        )
        criterion = nn.BCELoss()
        train_ds = TensorDataset(
            torch.from_numpy(X_train), torch.from_numpy(y_train)
        )
        train_dl = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True)
        val_X_t = torch.from_numpy(X_val)
        val_y_t = torch.from_numpy(y_val)
        best_val_loss = float("inf")
        best_state = None
        wait = 0
        self._net.train()
        for epoch in range(self.epochs):
            epoch_loss = 0.0
            for xb, yb in train_dl:
                optimizer.zero_grad()
                preds = self._net(xb)
                loss = criterion(preds, yb)
                loss.backward()
                torch.nn.utils.clip_grad_norm_(self._net.parameters(), 1.0)
                optimizer.step()
                epoch_loss += loss.item() * len(xb)
            epoch_loss /= len(train_ds)
            scheduler.step()
            self._net.eval()
            with torch.no_grad():
                val_preds = self._net(val_X_t)
                val_loss = criterion(val_preds, val_y_t).item()
            self._net.train()
            if val_loss < best_val_loss - 1e-6:
                best_val_loss = val_loss
                best_state = copy.deepcopy(self._net.state_dict())
                wait = 0
            else:
                wait += 1
                if wait >= self.patience:
                    break
        if best_state is not None:
            self._net.load_state_dict(best_state)
        self._net.eval()
        self._is_fitted = True
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return P(home_win), evaluated in batches to bound memory use."""
        torch, _, _, _, _ = _import_torch()
        assert self._is_fitted, "Model not fitted yet"
        X = self._prepare(X)
        X_t = torch.from_numpy(X)
        self._net.eval()
        # Batch to avoid OOM on large inputs.
        preds_list = []
        bs = self.batch_size
        for i in range(0, len(X_t), bs):
            with torch.no_grad():
                p = self._net(X_t[i : i + bs])
            preds_list.append(p.numpy())
        return np.concatenate(preds_list)
| # =========================================================================== | |
| # 5. Deep Ensemble | |
| # =========================================================================== | |
| class DeepEnsemble(BaseNBAModel): | |
| """ | |
| Train N independent neural networks with different random seeds. | |
| Average their predictions for: | |
| - Better calibration (ensemble smoothing) | |
| - Uncertainty estimation (prediction variance) | |
| Each member is a simple but effective MLP with skip connections (ResNet-style), | |
| which is the 2025 consensus best architecture for tabular deep learning | |
| when ensembled (Kadra et al. 2021 "Well-Tuned Simple Nets"). | |
| """ | |
    def __init__(
        self,
        n_members: int = 10,
        hidden_dims: Tuple[int, ...] = (512, 256, 128),
        dropout: float = 0.3,
        lr: float = 1e-3,
        weight_decay: float = 1e-4,
        batch_size: int = 512,
        epochs: int = 100,
        patience: int = 12,
        **kw,
    ):
        """Configure the deep ensemble.

        Args:
            n_members: number of independently seeded member MLPs.
            hidden_dims: widths of each member's hidden layers.
            dropout: dropout probability used throughout each member.
            lr: learning rate for each member's optimizer.
            weight_decay: L2 regularisation strength.
            batch_size: minibatch size during training.
            epochs: maximum training epochs per member.
            patience: early-stopping patience (epochs without improvement).
            **kw: extra hyperparameters forwarded to the base class.
        """
        # hidden_dims is serialised as a list so get_params() stays
        # JSON-friendly; the tuple itself is kept on the instance.
        super().__init__(
            n_members=n_members, hidden_dims=list(hidden_dims),
            dropout=dropout, lr=lr, weight_decay=weight_decay,
            batch_size=batch_size, epochs=epochs, patience=patience, **kw,
        )
        self.n_members = n_members
        self.hidden_dims = hidden_dims
        self.dropout = dropout
        self.lr = lr
        self.weight_decay = weight_decay
        self.batch_size = batch_size
        self.epochs = epochs
        self.patience = patience
        # Trained member networks, populated by fit().
        self._members: List = []
| def _build_mlp(n_features: int, hidden_dims: Tuple[int, ...], dropout: float, seed: int): | |
| """Build one ResNet-style MLP member.""" | |
| torch, nn, _, _, _ = _import_torch() | |
| torch.manual_seed(seed) | |
| class ResBlock(nn.Module): | |
| """Pre-activation residual block.""" | |
| def __init__(self, dim: int, drop: float): | |
| super().__init__() | |
| self.net = nn.Sequential( | |
| nn.LayerNorm(dim), | |
| nn.GELU(), | |
| nn.Linear(dim, dim), | |
| nn.Dropout(drop), | |
| nn.LayerNorm(dim), | |
| nn.GELU(), | |
| nn.Linear(dim, dim), | |
| nn.Dropout(drop), | |
| ) | |
| def forward(self, x): | |
| return x + self.net(x) | |
| layers = [] | |
| in_dim = n_features | |
| for h_dim in hidden_dims: | |
| layers.append(nn.Linear(in_dim, h_dim)) | |
| layers.append(nn.GELU()) | |
| layers.append(nn.Dropout(dropout)) | |
| # Add residual block at each hidden layer | |
| layers.append(ResBlock(h_dim, dropout)) | |
| in_dim = h_dim | |
| layers.append(nn.Linear(in_dim, 1)) | |
| class EnsembleMLP(nn.Module): | |
| def __init__(self, layer_list): | |
| super().__init__() | |
| self.net = nn.Sequential(*layer_list) | |
| def forward(self, x): | |
| return torch.sigmoid(self.net(x)).squeeze(-1) | |
| return EnsembleMLP(layers) | |
| def fit( | |
| self, | |
| X_train: np.ndarray, | |
| y_train: np.ndarray, | |
| X_val: Optional[np.ndarray] = None, | |
| y_val: Optional[np.ndarray] = None, | |
| ) -> "DeepEnsemble": | |
| torch, nn, optim, DataLoader, TensorDataset = _import_torch() | |
| X_train = self._prepare(X_train, fit=True) | |
| X_train, y_train, X_val, y_val = self._auto_val_split(X_train, y_train, X_val, y_val) | |
| if X_val is not None: | |
| X_val = self._prepare(X_val) | |
| y_train = y_train.astype(np.float32) | |
| y_val = y_val.astype(np.float32) | |
| n_features = X_train.shape[1] | |
| val_X_t = torch.from_numpy(X_val) | |
| val_y_t = torch.from_numpy(y_val) | |
| criterion = nn.BCELoss() | |
| self._members = [] | |
| for member_idx in range(self.n_members): | |
| seed = 42 + member_idx * 1337 | |
| net = self._build_mlp(n_features, self.hidden_dims, self.dropout, seed) | |
| # Each member gets a different random seed for data shuffling too | |
| torch.manual_seed(seed) | |
| np.random.seed(seed) | |
| optimizer = optim.AdamW( | |
| net.parameters(), lr=self.lr, weight_decay=self.weight_decay | |
| ) | |
| scheduler = optim.lr_scheduler.ReduceLROnPlateau( | |
| optimizer, mode="min", factor=0.5, patience=5, min_lr=1e-6 | |
| ) | |
| train_ds = TensorDataset( | |
| torch.from_numpy(X_train), torch.from_numpy(y_train) | |
| ) | |
| train_dl = DataLoader(train_ds, batch_size=self.batch_size, shuffle=True) | |
| best_val_loss = float("inf") | |
| best_state = None | |
| wait = 0 | |
| net.train() | |
| for epoch in range(self.epochs): | |
| for xb, yb in train_dl: | |
| optimizer.zero_grad() | |
| preds = net(xb) | |
| loss = criterion(preds, yb) | |
| loss.backward() | |
| torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0) | |
| optimizer.step() | |
| net.eval() | |
| with torch.no_grad(): | |
| vp = net(val_X_t) | |
| vl = criterion(vp, val_y_t).item() | |
| net.train() | |
| scheduler.step(vl) | |
| if vl < best_val_loss - 1e-6: | |
| best_val_loss = vl | |
| best_state = copy.deepcopy(net.state_dict()) | |
| wait = 0 | |
| else: | |
| wait += 1 | |
| if wait >= self.patience: | |
| break | |
| if best_state is not None: | |
| net.load_state_dict(best_state) | |
| net.eval() | |
| self._members.append(net) | |
| self._is_fitted = True | |
| return self | |
| def predict_proba(self, X: np.ndarray) -> np.ndarray: | |
| """Return mean prediction across ensemble members.""" | |
| torch, _, _, _, _ = _import_torch() | |
| assert self._is_fitted and self._members, "Model not fitted yet" | |
| X = self._prepare(X) | |
| X_t = torch.from_numpy(X) | |
| all_preds = [] | |
| for net in self._members: | |
| net.eval() | |
| with torch.no_grad(): | |
| p = net(X_t).numpy() | |
| all_preds.append(p) | |
| return np.mean(all_preds, axis=0) | |
| def predict_uncertainty(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]: | |
| """ | |
| Return (mean_prediction, std_prediction) across ensemble members. | |
| High std = high model uncertainty = less confident prediction. | |
| """ | |
| torch, _, _, _, _ = _import_torch() | |
| assert self._is_fitted and self._members, "Model not fitted yet" | |
| X = self._prepare(X) | |
| X_t = torch.from_numpy(X) | |
| all_preds = [] | |
| for net in self._members: | |
| net.eval() | |
| with torch.no_grad(): | |
| p = net(X_t).numpy() | |
| all_preds.append(p) | |
| stacked = np.array(all_preds) # (n_members, n_samples) | |
| return stacked.mean(axis=0), stacked.std(axis=0) | |
| # =========================================================================== | |
| # 6. Conformal Prediction Wrapper | |
| # =========================================================================== | |
class ConformalPredictionWrapper(BaseNBAModel):
    """
    Wraps ANY model to provide calibrated prediction intervals with
    guaranteed coverage.

    Uses split conformal prediction:
        1. Train base model on training set
        2. Compute non-conformity scores on calibration holdout
        3. At inference, use quantile of scores to produce prediction sets

    For binary classification:
        - Returns P(home_win) from base model (point prediction)
        - Also provides prediction sets that are {0}, {1}, or {0, 1}
          with guaranteed marginal coverage >= (1 - alpha)
    """
    def __init__(
        self,
        base_model: BaseNBAModel,
        alpha: float = 0.10,
        cal_fraction: float = 0.20,
        **kw,
    ):
        super().__init__(alpha=alpha, cal_fraction=cal_fraction, **kw)
        self.base_model = base_model
        # Target miscoverage rate: prediction sets aim for >= (1 - alpha) coverage.
        self.alpha = alpha
        # Fraction of the training data held out for conformal calibration.
        self.cal_fraction = cal_fraction
        # Conformal quantile threshold q-hat, computed by fit().
        self._qhat: Optional[float] = None
        # Sorted non-conformity scores from the calibration split.
        self._cal_scores: Optional[np.ndarray] = None
    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "ConformalPredictionWrapper":
        """
        Split data into proper-training and calibration sets.
        Train base model on proper-training, compute conformal scores on calibration.

        NOTE(review): assumes rows of X_train are time-ordered so that the
        tail slice is the most recent data — confirm against the caller.
        """
        n = len(X_train)
        cal_size = int(n * self.cal_fraction)
        # Use the LAST cal_size samples for calibration (time-ordered)
        X_proper = X_train[: n - cal_size]
        y_proper = y_train[: n - cal_size]
        X_cal = X_train[n - cal_size :]
        y_cal = y_train[n - cal_size :]
        # Train base model
        self.base_model.fit(X_proper, y_proper, X_val, y_val)
        # Compute non-conformity scores on calibration set
        cal_probs = self.base_model.predict_proba(X_cal)
        # Score = 1 - P(true_class)
        scores = np.where(y_cal == 1, 1.0 - cal_probs, cal_probs)
        self._cal_scores = np.sort(scores)
        # Quantile for desired coverage, with the finite-sample correction
        # ceil((1-alpha)(n+1))/n used by split conformal prediction.
        n_cal = len(self._cal_scores)
        level = np.ceil((1.0 - self.alpha) * (n_cal + 1)) / n_cal
        level = min(level, 1.0)
        # method="higher" picks the next-highest score (conservative);
        # the `method` kwarg requires numpy >= 1.22.
        self._qhat = np.quantile(self._cal_scores, level, method="higher")
        self._is_fitted = True
        return self
    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return point predictions from base model."""
        assert self._is_fitted, "Model not fitted yet"
        return self.base_model.predict_proba(X)
    def predict_sets(self, X: np.ndarray) -> List[set]:
        """
        Return prediction sets with guaranteed (1-alpha) coverage.
        Each set is one of:
            - {1} : confident home win
            - {0} : confident away win
            - {0, 1} : uncertain (both plausible)
        """
        assert self._is_fitted, "Model not fitted yet"
        probs = self.base_model.predict_proba(X)
        sets = []
        for p in probs:
            s = set()
            # Include class 1 if its score (1 - p) would be <= qhat
            if 1.0 - p <= self._qhat:
                s.add(1)
            # Include class 0 if its score (p) would be <= qhat
            if p <= self._qhat:
                s.add(0)
            if not s:
                # Shouldn't happen, but include most likely class as fallback
                s.add(1 if p >= 0.5 else 0)
            sets.append(s)
        return sets
    def predict_intervals(self, X: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Return (lower_bound, upper_bound) calibrated probability intervals.
        Width of interval reflects model uncertainty after conformal calibration.

        Implemented as a symmetric +/- qhat band around the point prediction,
        clipped to [0, 1].
        """
        assert self._is_fitted, "Model not fitted yet"
        probs = self.base_model.predict_proba(X)
        lower = np.clip(probs - self._qhat, 0.0, 1.0)
        upper = np.clip(probs + self._qhat, 0.0, 1.0)
        return lower, upper
    def get_params(self) -> Dict[str, Any]:
        """Return wrapper params plus the wrapped base model's params."""
        base_params = self.base_model.get_params()
        return {
            "wrapper": "conformal",
            "alpha": self.alpha,
            "cal_fraction": self.cal_fraction,
            "qhat": float(self._qhat) if self._qhat is not None else None,
            "base_model": base_params,
        }
| # =========================================================================== | |
| # 7. AutoGluon Ensemble | |
| # =========================================================================== | |
class AutoGluonEnsemble(BaseNBAModel):
    """
    AutoGluon Tabular — auto-search and stack hundreds of model configurations.

    Time-budgeted: runs for *max_time* seconds, tries GBMs, neural nets,
    linear models, k-NN, then stacks the best ones.

    Presets: "best_quality" = maximum stacking/bagging (slow but best),
             "good_quality" = reasonable speed/quality trade-off,
             "medium_quality" = fastest.
    """

    def __init__(
        self,
        max_time: int = 3600,
        preset: str = "best_quality",
        eval_metric: str = "log_loss",
        num_bag_folds: int = 5,
        num_stack_levels: int = 1,
        verbosity: int = 1,
        **kw,
    ):
        """
        Args:
            max_time: Total training time budget in seconds.
            preset: AutoGluon quality preset (see class docstring).
            eval_metric: Metric AutoGluon optimises and reports.
            num_bag_folds: Bagging folds per model.
            num_stack_levels: Stacking depth on top of base models.
            verbosity: AutoGluon log verbosity (0 = silent).
        """
        super().__init__(
            max_time=max_time, preset=preset, eval_metric=eval_metric,
            num_bag_folds=num_bag_folds, num_stack_levels=num_stack_levels,
            verbosity=verbosity, **kw,
        )
        self.max_time = max_time
        self.preset = preset
        self.eval_metric = eval_metric
        self.num_bag_folds = num_bag_folds
        self.num_stack_levels = num_stack_levels
        self.verbosity = verbosity
        self._predictor = None  # TabularPredictor instance, set by fit()/load()

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: Optional[np.ndarray] = None,
        y_val: Optional[np.ndarray] = None,
    ) -> "AutoGluonEnsemble":
        """Impute, wrap arrays into DataFrames, and launch AutoGluon training.

        Raises:
            ImportError: if autogluon.tabular is not installed.
        """
        try:
            from autogluon.tabular import TabularPredictor
            import pandas as pd
        except ImportError:
            raise ImportError(
                "autogluon.tabular not installed. Install with: "
                "pip install autogluon.tabular"
            )
        X_train = self._impute(X_train, fit=True)
        # Build DataFrame with synthetic feature column names + label
        n_features = X_train.shape[1]
        col_names = [f"f_{i}" for i in range(n_features)]
        df_train = pd.DataFrame(X_train, columns=col_names)
        df_train["label"] = y_train.astype(int)
        # Validation data (optional tuning set)
        df_val = None
        if X_val is not None and y_val is not None:
            X_val = self._impute(X_val)
            df_val = pd.DataFrame(X_val, columns=col_names)
            df_val["label"] = y_val.astype(int)
        self._col_names = col_names
        self._predictor = TabularPredictor(
            label="label",
            eval_metric=self.eval_metric,
            problem_type="binary",
            verbosity=self.verbosity,
        )
        fit_kwargs = {
            "train_data": df_train,
            "time_limit": self.max_time,
            "presets": self.preset,
            "num_bag_folds": self.num_bag_folds,
            "num_stack_levels": self.num_stack_levels,
        }
        if df_val is not None:
            fit_kwargs["tuning_data"] = df_val
        self._predictor.fit(**fit_kwargs)
        self._is_fitted = True
        return self

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return P(class=1) for each row of X."""
        import pandas as pd
        assert self._is_fitted, "Model not fitted yet"
        X = self._impute(X)
        df = pd.DataFrame(X, columns=self._col_names)
        proba = self._predictor.predict_proba(df)
        # Returns DataFrame with columns 0, 1 — we want P(class=1)
        if isinstance(proba, pd.DataFrame):
            return proba[1].values
        return proba

    def leaderboard(self):
        """Return AutoGluon model leaderboard."""
        assert self._is_fitted, "Model not fitted yet"
        return self._predictor.leaderboard(silent=True)

    def feature_importance(self, X: np.ndarray, y: np.ndarray) -> "pd.DataFrame":
        """Return permutation feature importance."""
        import pandas as pd
        X = self._impute(X)
        df = pd.DataFrame(X, columns=self._col_names)
        df["label"] = y.astype(int)
        return self._predictor.feature_importance(df)

    def save(self, path: Union[str, Path]) -> None:
        """Save predictor + wrapper state under *path* (AutoGluon has its own mechanism)."""
        path = Path(path)
        path.mkdir(parents=True, exist_ok=True)
        if self._predictor is not None:
            # NOTE(review): some AutoGluon versions save to the path given at
            # TabularPredictor construction and take no argument here —
            # verify against the pinned autogluon version.
            self._predictor.save(str(path / "autogluon_predictor"))
        # Save wrapper state separately (column names, imputation medians)
        state = {
            "params": self.params,
            "_col_names": getattr(self, "_col_names", None),
            "_feature_medians": self._feature_medians.tolist() if self._feature_medians is not None else None,
            "_is_fitted": self._is_fitted,
        }
        with open(path / "wrapper_state.json", "w") as f:
            json.dump(state, f)

    # BUG FIX: `load` was missing the @classmethod decorator, so calling
    # AutoGluonEnsemble.load(path) bound `path` to `cls` and failed.
    @classmethod
    def load(cls, path: Union[str, Path]) -> "AutoGluonEnsemble":
        """Restore a saved AutoGluonEnsemble from *path* (counterpart of save())."""
        from autogluon.tabular import TabularPredictor
        path = Path(path)
        with open(path / "wrapper_state.json") as f:
            state = json.load(f)
        obj = cls(**state["params"])
        obj._col_names = state["_col_names"]
        if state["_feature_medians"] is not None:
            obj._feature_medians = np.array(state["_feature_medians"], dtype=np.float32)
        obj._predictor = TabularPredictor.load(str(path / "autogluon_predictor"))
        obj._is_fitted = state["_is_fitted"]
        return obj
| # =========================================================================== | |
| # Utilities | |
| # =========================================================================== | |
| def _is_jsonable(v: Any) -> bool: | |
| """Check if a value is JSON serialisable.""" | |
| try: | |
| json.dumps(v) | |
| return True | |
| except (TypeError, OverflowError, ValueError): | |
| return False | |
| # =========================================================================== | |
| # Model Registry β maps names to classes for the genetic algorithm | |
| # =========================================================================== | |
# Maps short model-type names to their implementing classes. The keys are the
# accepted values of build_neural_model()'s ``model_type`` argument.
NEURAL_MODEL_REGISTRY: Dict[str, type] = {
    "lstm": LSTMSequenceModel,                 # LSTM over last N games
    "transformer": TransformerAttentionModel,  # self-attention over game history
    "tabnet": TabNetModel,                     # attention-based tabular learning
    "ft_transformer": FTTransformerModel,      # feature tokenizer + transformer
    "deep_ensemble": DeepEnsemble,             # N independent MLPs, averaged
    "conformal": ConformalPredictionWrapper,   # handled specially by the factory
    "autogluon": AutoGluonEnsemble,            # auto-stacked model search
}
def build_neural_model(model_type: str, **params) -> BaseNBAModel:
    """
    Factory: construct a neural model from its registry name.

    Usage:
        model = build_neural_model("ft_transformer", n_tokens=128, d_token=64)
        model.fit(X_train, y_train)
        probs = model.predict_proba(X_test)

    The conformal wrapper is built recursively — describe the wrapped model
    via ``base_model_type`` and ``base_model_params``:
        model = build_neural_model(
            "conformal",
            base_model_type="deep_ensemble",
            base_model_params={"n_members": 5},
            alpha=0.1,
        )

    Raises:
        ValueError: if *model_type* is not a known registry key.
    """
    if model_type == "conformal":
        # Build the wrapped model first, then hand it to the wrapper.
        inner_type = params.pop("base_model_type", "deep_ensemble")
        inner_params = params.pop("base_model_params", {})
        wrapped = build_neural_model(inner_type, **inner_params)
        return ConformalPredictionWrapper(base_model=wrapped, **params)
    model_cls = NEURAL_MODEL_REGISTRY.get(model_type)
    if model_cls is None:
        raise ValueError(
            f"Unknown model type '{model_type}'. "
            f"Available: {list(NEURAL_MODEL_REGISTRY.keys())}"
        )
    return model_cls(**params)
| # =========================================================================== | |
| # Quick smoke test (runs if executed directly) | |
| # =========================================================================== | |
if __name__ == "__main__":
    # Smoke test: tiny synthetic data + tiny model configs, just to verify
    # that every model class fits and predicts without crashing.
    print("=" * 60)
    print("NBA Quant AI β Neural Models Smoke Test")
    print("=" * 60)
    np.random.seed(42)
    N_TRAIN, N_TEST, N_FEAT = 500, 100, 200
    X_train = np.random.randn(N_TRAIN, N_FEAT).astype(np.float32)
    # Inject some NaNs to test imputation
    mask = np.random.random(X_train.shape) < 0.05
    X_train[mask] = np.nan
    # Labels are random — we only check that the pipeline runs, not accuracy.
    y_train = (np.random.random(N_TRAIN) > 0.5).astype(np.float32)
    X_test = np.random.randn(N_TEST, N_FEAT).astype(np.float32)
    y_test = (np.random.random(N_TEST) > 0.5).astype(np.float32)
    # Test each model (with small configs for speed)
    tests = [
        ("FT-Transformer", FTTransformerModel(
            n_tokens=32, d_token=16, n_heads=2, n_layers=1,
            epochs=5, patience=3, batch_size=128,
        )),
        ("Deep Ensemble (3 members)", DeepEnsemble(
            n_members=3, hidden_dims=(64, 32),
            epochs=5, patience=3, batch_size=128,
        )),
        ("LSTM Sequence", LSTMSequenceModel(
            seq_len=5, hidden1=32, hidden2=16, dense_dim=16,
            epochs=5, patience=3, batch_size=128,
        )),
        ("Transformer Attention", TransformerAttentionModel(
            seq_len=5, d_model=32, n_heads=2, n_layers=1,
            dim_ff=64, epochs=5, patience=3, batch_size=128,
        )),
    ]
    for name, model in tests:
        print(f"\n--- {name} ---")
        try:
            model.fit(X_train, y_train)
            probs = model.predict_proba(X_test)
            print(f" Predictions shape: {probs.shape}")
            print(f" Mean pred: {probs.mean():.4f}, Std: {probs.std():.4f}")
            print(f" Min: {probs.min():.4f}, Max: {probs.max():.4f}")
            print(f" Params: {list(model.get_params().keys())}")
        except Exception as e:
            # Smoke test keeps going even if one model fails
            print(f" ERROR: {e}")
    # Test conformal wrapper (point preds, sets, and intervals)
    print("\n--- Conformal Prediction Wrapper ---")
    try:
        base = DeepEnsemble(
            n_members=2, hidden_dims=(64, 32),
            epochs=5, patience=3, batch_size=128,
        )
        conformal = ConformalPredictionWrapper(base_model=base, alpha=0.1)
        conformal.fit(X_train, y_train)
        probs = conformal.predict_proba(X_test)
        sets = conformal.predict_sets(X_test)
        lower, upper = conformal.predict_intervals(X_test)
        print(f" Point preds shape: {probs.shape}")
        print(f" Prediction sets (first 5): {sets[:5]}")
        print(f" Intervals: [{lower[:3]}] - [{upper[:3]}]")
        print(f" Avg interval width: {(upper - lower).mean():.4f}")
    except Exception as e:
        print(f" ERROR: {e}")
    # Test TabNet (may fail if pytorch_tabnet not installed)
    print("\n--- TabNet ---")
    try:
        tab = TabNetModel(
            n_d=8, n_a=8, n_steps=3, epochs=5, patience=3, batch_size=128,
        )
        tab.fit(X_train, y_train)
        probs = tab.predict_proba(X_test)
        print(f" Predictions shape: {probs.shape}")
        print(f" Mean pred: {probs.mean():.4f}")
        fi = tab.get_feature_importances()
        if fi is not None:
            print(f" Feature importances shape: {fi.shape}")
    except ImportError:
        # Optional dependency — skip rather than fail
        print(" SKIPPED (pytorch_tabnet not installed)")
    except Exception as e:
        print(f" ERROR: {e}")
    # Test factory entry point
    print("\n--- Factory: build_neural_model ---")
    try:
        m = build_neural_model("ft_transformer", n_tokens=32, d_token=16,
                               n_heads=2, n_layers=1, epochs=3, batch_size=128)
        m.fit(X_train, y_train)
        print(f" Factory FT-Transformer OK, preds mean: {m.predict_proba(X_test).mean():.4f}")
    except Exception as e:
        print(f" ERROR: {e}")
    print("\n" + "=" * 60)
    print("Smoke test complete.")
    print("=" * 60)