# erinkhoo's picture
# Upload hyperopt_gbt/core.py
# afa90e5 verified
"""
Core implementation of HyperOptimized Gradient Boosted Trees.
Combines innovations from:
- CatBoost: Ordered boosting, ordered target statistics, oblivious trees
- LightGBM: Histogram-based splits, GOSS, EFB, leaf-wise growth
- XGBoost: Weighted quantile sketch, cache-aware column blocks, sparsity-aware
- YDF: Inference engine compilation, modularity
"""
import numpy as np
import numba
from numba import njit, prange, int64, float64, boolean
from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin
from sklearn.utils.validation import check_X_y, check_array, check_is_fitted
from sklearn.utils.multiclass import type_of_target
import warnings
# =============================================================================
# NUMBA-ACCELERATED CORE FUNCTIONS
# =============================================================================
@njit(cache=True, fastmath=True)
def _build_histogram(feature_values, gradients, hessians, n_bins, min_val, max_val):
    """Build a uniform-width histogram for a single feature (LightGBM-style).

    Each value is mapped to one of ``n_bins`` equal-width bins spanning
    ``[min_val, max_val]`` and its gradient/hessian are accumulated there.

    Args:
        feature_values: 1-D array of raw feature values.
        gradients: per-sample gradients, aligned with ``feature_values``.
        hessians: per-sample hessians, aligned with ``feature_values``.
        n_bins: number of histogram bins.
        min_val: lower end of the binning range.
        max_val: upper end of the binning range.

    Returns:
        Tuple ``(grad_sums, hess_sums, counts, bin_edges)`` where the first
        three have length ``n_bins`` and ``bin_edges`` has ``n_bins + 1``.
    """
    bin_edges = np.linspace(min_val, max_val, n_bins + 1)
    grad_sums = np.zeros(n_bins, dtype=np.float64)
    hess_sums = np.zeros(n_bins, dtype=np.float64)
    counts = np.zeros(n_bins, dtype=np.int64)

    span = max_val - min_val
    last_bin = n_bins - 1
    for i in range(len(feature_values)):
        if span > 0.0:
            bin_idx = int((feature_values[i] - min_val) / span * n_bins)
            # Clamp on both sides: values equal to max_val land one past the
            # end, and values below min_val would otherwise index backwards.
            if bin_idx > last_bin:
                bin_idx = last_bin
            elif bin_idx < 0:
                bin_idx = 0
        else:
            # Degenerate range (constant feature): avoid dividing by zero and
            # put every sample into the first bin.
            bin_idx = 0
        grad_sums[bin_idx] += gradients[i]
        hess_sums[bin_idx] += hessians[i]
        counts[bin_idx] += 1
    return grad_sums, hess_sums, counts, bin_edges
@njit(cache=True, fastmath=True)
def _find_best_split_histogram(grad_sums, hess_sums, counts, l2_reg, min_child_weight):
    """Find the best split point in a gradient/hessian histogram.

    Scans every boundary between adjacent bins; a split at bin ``i`` sends
    bins ``0..i`` left and the rest right.  The gain is

        GL^2 / (HL + lambda) + GR^2 / (HR + lambda) - G^2 / (H + lambda)

    Args:
        grad_sums: per-bin gradient sums.
        hess_sums: per-bin hessian sums.
        counts: per-bin sample counts (accepted for interface symmetry; the
            scan itself does not read it).
        l2_reg: L2 regularisation term (lambda).
        min_child_weight: minimum hessian sum required on each side.

    Returns:
        Tuple ``(best_bin, best_gain, left_grad, left_hess)``.  ``left_grad``
        and ``left_hess`` are the sums on the left side *of the best split*.
        When no boundary satisfies ``min_child_weight`` the result is
        ``(-1, -inf, 0.0, 0.0)``.
    """
    n_bins = len(grad_sums)
    total_grad = np.sum(grad_sums)
    total_hess = np.sum(hess_sums)
    parent_score = total_grad ** 2 / (total_hess + l2_reg)

    best_gain = -np.inf
    best_bin = -1
    best_left_grad = 0.0
    best_left_hess = 0.0

    left_grad = 0.0
    left_hess = 0.0
    for i in range(n_bins - 1):
        left_grad += grad_sums[i]
        left_hess += hess_sums[i]
        right_grad = total_grad - left_grad
        right_hess = total_hess - left_hess
        # Skip if either side has insufficient weight
        if left_hess < min_child_weight or right_hess < min_child_weight:
            continue
        gain = (
            left_grad ** 2 / (left_hess + l2_reg) +
            right_grad ** 2 / (right_hess + l2_reg) -
            parent_score
        )
        if gain > best_gain:
            best_gain = gain
            best_bin = i
            # Remember the statistics of the winning split; the running sums
            # keep growing after this point and must not be returned.
            best_left_grad = left_grad
            best_left_hess = left_hess
    return best_bin, best_gain, best_left_grad, best_left_hess
@njit(cache=True, fastmath=True)
def _weighted_quantile_sketch(values, weights, n_bins):
    """Compute weighted-quantile bin boundaries (XGBoost-style).

    Boundaries are placed so that each bin holds roughly
    ``sum(weights) / n_bins`` of the total weight, which copes better with
    skewed distributions than uniform-width binning.

    Args:
        values: 1-D float array of feature values.
        weights: per-value weights, aligned with ``values``.
        n_bins: target number of bins.

    Returns:
        1-D array of strictly increasing boundaries starting at the minimum
        and ending at the maximum value (a constant input yields the value
        twice so that at least two edges are returned).  ``[0.0, 1.0]`` is
        returned for empty input or ``n_bins < 1``.
    """
    n = len(values)
    if n == 0 or n_bins < 1:
        return np.array([0.0, 1.0])

    order = np.argsort(values)
    sorted_vals = values[order]
    sorted_weights = weights[order]
    target_weight = np.sum(sorted_weights) / n_bins

    boundaries = [sorted_vals[0]]
    cum_weight = 0.0
    for i in range(n):
        cum_weight += sorted_weights[i]
        if cum_weight >= target_weight:
            # Carry the overshoot into the next bin instead of discarding it,
            # otherwise every bin ends up heavier than the target.
            cum_weight -= target_weight
            # Repeated values must not create duplicate (empty-bin) edges.
            if sorted_vals[i] > boundaries[-1]:
                boundaries.append(sorted_vals[i])
                if len(boundaries) >= n_bins:
                    break

    if sorted_vals[-1] > boundaries[-1] or len(boundaries) < 2:
        boundaries.append(sorted_vals[-1])
    return np.array(boundaries)
def _goss_sample(gradients, a_ratio, b_ratio, random_state):
"""Gradient-based One-Side Sampling (LightGBM GOSS).
1. Sort instances by absolute gradient
2. Keep top-a% instances with largest gradients
3. Randomly sample b% from remaining
4. Amplify sampled small-gradient instances
Args:
gradients: gradient values for all instances
a_ratio: keep top a% by gradient magnitude (e.g., 0.2)
b_ratio: sample b% from rest (e.g., 0.1)
random_state: random seed
Returns:
sampled_indices: indices to keep
amplification: amplification factor for sampled small gradients
"""
n = len(gradients)
abs_grad = np.abs(gradients)
# Sort by absolute gradient descending
sorted_idx = np.argsort(-abs_grad)
# Keep top a%
n_top = int(n * a_ratio)
top_indices = sorted_idx[:n_top]
# Sample b% from remaining
remaining = sorted_idx[n_top:]
n_sample = int(len(remaining) * b_ratio)
if n_sample > 0 and len(remaining) > 0:
# Simple deterministic sampling
step = max(1, len(remaining) // n_sample)
sampled_remaining = remaining[::step][:n_sample]
else:
sampled_remaining = np.empty(0, dtype=np.int64)
# Combine
sampled_indices = np.concatenate((top_indices, sampled_remaining))
# Amplification factor for sampled instances
# The small-gradient samples represent (1-a) of data but we only keep b of them
# So amplify by (1-a)/b
amplification = (1.0 - a_ratio) / b_ratio if b_ratio > 0 else 1.0
return sampled_indices, amplification
@njit(cache=True, fastmath=True)
def _compute_gradients_logloss(y_true, y_pred):
    """Return first and second derivatives of binary log loss.

    ``y_pred`` holds raw scores; each is passed through a sigmoid to get a
    probability ``p`` that is clipped to ``[1e-15, 1 - 1e-15]``.  The
    gradient is ``p - y`` and the hessian is ``p * (1 - p)``.

    Returns:
        Tuple ``(grad, hess)`` of float64 arrays with one entry per sample.
    """
    count = len(y_true)
    hess = np.empty(count, dtype=np.float64)
    grad = np.empty(count, dtype=np.float64)
    lower = 1e-15
    upper = 1.0 - 1e-15
    for k in range(count):
        prob = 1.0 / (1.0 + np.exp(-y_pred[k]))
        # Keep the probability away from exactly 0 and 1.
        if prob > upper:
            prob = upper
        if prob < lower:
            prob = lower
        grad[k] = prob - y_true[k]
        hess[k] = prob * (1.0 - prob)
    return grad, hess
@njit(cache=True, fastmath=True)
def _compute_gradients_mse(y_true, y_pred):
    """Return first and second derivatives of squared error (regression).

    The gradient is the residual ``y_pred - y_true``; the hessian is the
    constant 1 for every sample.

    Returns:
        Tuple ``(grad, hess)`` of float64 arrays with one entry per sample.
    """
    size = len(y_true)
    hess = np.ones(size, dtype=np.float64)
    grad = np.empty(size, dtype=np.float64)
    for k in range(size):
        grad[k] = y_pred[k] - y_true[k]
    return grad, hess
@njit(cache=True, fastmath=True)
def _ordered_target_statistics(categorical_values, target, permutation, prior, prior_weight):
    """Compute ordered target statistics for a categorical feature (CatBoost).

    For each example the statistic is computed from ONLY the examples that
    precede it in ``permutation``, which prevents target leakage:

        TS_k = (sum_{j before k, x_j = x_k} y_j + a * prior) / (count + a)

    Args:
        categorical_values: integer category codes, indexed by sample; they
            are used directly as array indices, so they must be
            non-negative.
        target: target values, indexed by sample.
        permutation: visiting order (array of sample indices).
        prior: value used when a category has no preceding examples, and
            blended in otherwise.
        prior_weight: weight ``a`` of the prior in the blend.

    Returns:
        float64 array with one target statistic per sample.
    """
    n = len(categorical_values)
    ts = np.empty(n, dtype=np.float64)
    if n == 0:
        # np.max below would fail on empty input.
        return ts

    # Dense per-category accumulators; memory grows with the largest category
    # code, so this suits low/medium cardinality features.
    max_cat = np.max(categorical_values) + 1
    running_sum = np.zeros(max_cat, dtype=np.float64)
    running_count = np.zeros(max_cat, dtype=np.int64)

    for pos in range(n):
        idx = permutation[pos]
        cat = categorical_values[idx]
        # Compute TS using only preceding examples
        if running_count[cat] > 0:
            ts[idx] = (running_sum[cat] + prior_weight * prior) / (running_count[cat] + prior_weight)
        else:
            ts[idx] = prior  # No preceding examples with same category
        # Update running statistics only after the statistic has been read.
        running_sum[cat] += target[idx]
        running_count[cat] += 1
    return ts
@njit(cache=True, fastmath=True)
def _find_best_oblivious_split(X_binned, gradients, hessians, n_bins, l2_reg, min_child_weight,
                               depth, used_features):
    """Pick one (feature, bin) split for a whole tree level (CatBoost-style).

    Oblivious trees apply the SAME split to every node of a level.  As a
    simplification, this routine builds each feature's histogram over all
    samples and scores it with ``_find_best_split_histogram``; features
    flagged in ``used_features`` are skipped.  ``depth`` is accepted but not
    read.

    Returns:
        Tuple ``(best_feature, best_bin, best_gain)``; the feature and bin
        are ``-1`` and the gain ``-inf`` when no candidate was found.
    """
    n_rows, n_cols = X_binned.shape
    winner_feature = -1
    winner_bin = -1
    winner_gain = -np.inf

    for col in range(n_cols):
        if not used_features[col]:
            # Accumulate this feature's histogram across every sample.
            g_hist = np.zeros(n_bins, dtype=np.float64)
            h_hist = np.zeros(n_bins, dtype=np.float64)
            for row in range(n_rows):
                slot = X_binned[row, col]
                g_hist[slot] += gradients[row]
                h_hist[slot] += hessians[row]

            # Sample counts are not tracked here; a zero array is passed.
            placeholder_counts = np.zeros(n_bins, dtype=np.int64)
            split_bin, split_gain, _, _ = _find_best_split_histogram(
                g_hist, h_hist, placeholder_counts, l2_reg, min_child_weight
            )
            if split_gain > winner_gain:
                winner_gain = split_gain
                winner_feature = col
                winner_bin = split_bin
    return winner_feature, winner_bin, winner_gain
# =============================================================================
# TREE NODE AND TREE STRUCTURE
# =============================================================================
class TreeNode:
    """One node of a decision tree; freshly created nodes are leaves."""

    __slots__ = ['is_leaf', 'feature', 'threshold', 'left_child', 'right_child',
                 'value', 'n_samples', 'depth']

    def __init__(self):
        # Bookkeeping.
        self.depth = 0
        self.n_samples = 0
        # Leaf payload.
        self.is_leaf = True
        self.value = 0.0
        # Split description; only meaningful once the node becomes internal.
        self.feature = -1
        self.threshold = 0.0
        self.left_child = self.right_child = None
class DecisionTree:
    """A single regression tree fitted to gradients/hessians for boosting.

    The tree works on pre-binned features: every split has the form
    ``X_binned[:, feature] <= threshold`` where ``threshold`` is a bin index.
    """

    def __init__(self, max_depth=6, min_child_weight=1.0, l2_reg=1.0,
                 min_split_gain=0.0, learning_rate=1.0):
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.l2_reg = l2_reg
        self.min_split_gain = min_split_gain
        self.learning_rate = learning_rate
        self.root = None

    def fit(self, X_binned, gradients, hessians, n_bins):
        """Build the tree from all rows of ``X_binned``; returns ``self``."""
        self.root = self._build_tree(X_binned, gradients, hessians,
                                     np.arange(len(gradients)), n_bins, 0)
        return self

    def _build_tree(self, X_binned, gradients, hessians, indices, n_bins, depth):
        """Recursively (depth-first) grow the subtree for the rows in ``indices``.

        Every node gets the regularised Newton leaf value
        ``-G / (H + l2_reg) * learning_rate``; it stays a leaf when the depth
        limit is hit, too few rows remain, or no split beats
        ``min_split_gain``.
        """
        node = TreeNode()
        node.n_samples = len(indices)
        node.depth = depth

        grad_sum = np.sum(gradients[indices])
        hess_sum = np.sum(hessians[indices])
        node.value = -grad_sum / (hess_sum + self.l2_reg) * self.learning_rate

        # Stop conditions
        if depth >= self.max_depth or len(indices) < 2 * self.min_child_weight:
            return node

        best_feature, best_bin, best_gain = self._find_best_split(
            X_binned, gradients, hessians, indices, n_bins
        )
        if best_gain <= self.min_split_gain:
            return node

        left_mask = X_binned[indices, best_feature] <= best_bin
        left_indices = indices[left_mask]
        right_indices = indices[~left_mask]
        if len(left_indices) == 0 or len(right_indices) == 0:
            return node

        node.is_leaf = False
        node.feature = best_feature
        node.threshold = best_bin
        node.left_child = self._build_tree(
            X_binned, gradients, hessians, left_indices, n_bins, depth + 1
        )
        node.right_child = self._build_tree(
            X_binned, gradients, hessians, right_indices, n_bins, depth + 1
        )
        return node

    def _find_best_split(self, X_binned, gradients, hessians, indices, n_bins):
        """Return ``(feature, bin, gain)`` of the best histogram split.

        Histograms are built with ``np.bincount`` instead of a per-sample
        Python loop.  Bin ids outside ``[0, n_bins)`` are ignored.
        """
        n_features = X_binned.shape[1]
        node_bins = X_binned[indices]
        node_grad = gradients[indices]
        node_hess = hessians[indices]

        best_gain = -np.inf
        best_feature = -1
        best_bin = -1
        for feat in range(n_features):
            bins = node_bins[:, feat]
            grad, hess = node_grad, node_hess
            in_range = (bins >= 0) & (bins < n_bins)
            if not in_range.all():
                bins, grad, hess = bins[in_range], grad[in_range], hess[in_range]

            grad_sums = np.bincount(bins, weights=grad, minlength=n_bins)
            hess_sums = np.bincount(bins, weights=hess, minlength=n_bins)
            counts = np.bincount(bins, minlength=n_bins).astype(np.int64)

            bin_idx, gain, _, _ = _find_best_split_histogram(
                grad_sums, hess_sums, counts,
                self.l2_reg, self.min_child_weight
            )
            if gain > best_gain:
                best_gain = gain
                best_feature = feat
                best_bin = bin_idx
        return best_feature, best_bin, best_gain

    def predict(self, X_binned):
        """Return the leaf value reached by every row of ``X_binned``.

        Rows are routed through the tree in groups, so each node is visited
        once with a vectorised comparison rather than once per sample.
        """
        n_samples = X_binned.shape[0]
        predictions = np.empty(n_samples, dtype=np.float64)
        pending = [(self.root, np.arange(n_samples))]
        while pending:
            node, rows = pending.pop()
            if node.is_leaf:
                predictions[rows] = node.value
                continue
            goes_left = X_binned[rows, node.feature] <= node.threshold
            pending.append((node.left_child, rows[goes_left]))
            pending.append((node.right_child, rows[~goes_left]))
        return predictions

    def _predict_single(self, x):
        """Traverse the tree for one binned sample and return its leaf value."""
        node = self.root
        while not node.is_leaf:
            if x[node.feature] <= node.threshold:
                node = node.left_child
            else:
                node = node.right_child
        return node.value
# =============================================================================
# MAIN ESTIMATOR CLASS
# =============================================================================
class HyperOptGradientBoostedClassifier(BaseEstimator, ClassifierMixin):
    """HyperOptimized Gradient Boosted Tree Classifier.

    Combines ideas from CatBoost, XGBoost, LightGBM, and YDF.

    Parameters
    ----------
    n_estimators : int, default=100
        Number of boosting rounds.
    learning_rate : float, default=0.1
        Learning rate (shrinkage).
    max_depth : int, default=6
        Maximum tree depth.
    n_bins : int, default=255
        Number of histogram bins for split finding.
    l2_reg : float, default=1.0
        L2 regularization on leaf weights.
    min_child_weight : float, default=1.0
        Minimum sum of hessian in leaf.
    min_split_gain : float, default=0.0
        Minimum gain for split.
    subsample : float, default=1.0
        Row subsampling ratio. When below 1.0, each tree is fitted on a
        uniform random subset of rows and GOSS is not used for that tree.
    colsample_bytree : float, default=1.0
        Column subsampling ratio (at least one column is always kept).
    use_goss : bool, default=True
        Use Gradient-based One-Side Sampling (LightGBM) when
        ``subsample`` is 1.0.
    goss_a : float, default=0.2
        GOSS: fraction of rows kept by gradient magnitude.
    goss_b : float, default=0.1
        GOSS: fraction of the remaining rows that is sampled.
    ordered_boosting : bool, default=False
        When True, permutations and supporting-model buffers are allocated
        during ``fit``; the training loop in this class does not read them.
    n_permutations : int, default=4
        Number of permutations for ordered boosting.
    oblivious_trees : bool, default=False
        Stored only; not read by the training loop in this class.
    binning : str, default='uniform'
        Binning strategy: 'uniform' or 'quantile_sketch' (XGBoost).
    n_jobs : int, default=-1
        Stored only; not read by the training loop in this class.
    random_state : int, default=None
        Random seed.
    verbose : int, default=0
        Verbosity level.
    """

    def __init__(self,
                 n_estimators=100,
                 learning_rate=0.1,
                 max_depth=6,
                 n_bins=255,
                 l2_reg=1.0,
                 min_child_weight=1.0,
                 min_split_gain=0.0,
                 subsample=1.0,
                 colsample_bytree=1.0,
                 use_goss=True,
                 goss_a=0.2,
                 goss_b=0.1,
                 ordered_boosting=False,
                 n_permutations=4,
                 oblivious_trees=False,
                 binning='uniform',
                 n_jobs=-1,
                 random_state=None,
                 verbose=0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.n_bins = n_bins
        self.l2_reg = l2_reg
        self.min_child_weight = min_child_weight
        self.min_split_gain = min_split_gain
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.use_goss = use_goss
        self.goss_a = goss_a
        self.goss_b = goss_b
        self.ordered_boosting = ordered_boosting
        self.n_permutations = n_permutations
        self.oblivious_trees = oblivious_trees
        self.binning = binning
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def fit(self, X, y, categorical_features=None, sample_weight=None):
        """Fit the gradient boosted tree classifier.

        Binary problems train one tree per round on log-loss gradients;
        problems with more classes train one tree per class per round on
        softmax gradients.  ``categorical_features`` is only stored and
        ``sample_weight`` is not supported (a warning is issued if given).
        Returns ``self``.
        """
        # Column names must be read before validation turns X into an ndarray.
        columns = getattr(X, 'columns', None)
        X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64)
        if sample_weight is not None:
            warnings.warn(
                "sample_weight is not supported yet and will be ignored.",
                UserWarning,
            )

        self.n_features_in_ = X.shape[1]
        self.n_samples_ = X.shape[0]
        if columns is not None:
            self.feature_names_ = list(columns)
        else:
            self.feature_names_ = [f"feature_{i}" for i in range(self.n_features_in_)]

        # Encode labels
        self.classes_ = np.unique(y)
        self.n_classes_ = len(self.classes_)
        is_binary = self.n_classes_ == 2
        if is_binary:
            y_encoded = (y == self.classes_[1]).astype(np.float64)
        else:
            # Multi-class: one indicator column per class
            y_encoded = np.zeros((len(y), self.n_classes_), dtype=np.float64)
            for i, cls in enumerate(self.classes_):
                y_encoded[:, i] = (y == cls).astype(np.float64)

        rng = np.random.RandomState(self.random_state)
        self.categorical_features_ = (
            [] if categorical_features is None else list(categorical_features)
        )

        # Bin features (histogram binning)
        self._bin_features(X, rng)

        # Initial raw scores: log-odds of the class prior(s)
        if is_binary:
            p = np.clip(np.mean(y_encoded), 1e-7, 1 - 1e-7)
            self.init_prediction_ = np.log(p / (1 - p))
            current_pred = np.full(self.n_samples_, self.init_prediction_)
        else:
            self.init_prediction_ = np.zeros(self.n_classes_)
            for i in range(self.n_classes_):
                p = np.clip(np.mean(y_encoded[:, i]), 1e-7, 1 - 1e-7)
                self.init_prediction_[i] = np.log(p / (1 - p))
            current_pred = np.tile(self.init_prediction_, (self.n_samples_, 1))

        self.trees_ = []

        # Buffers for ordered boosting (allocated only; see class docstring)
        if self.ordered_boosting:
            self.permutations_ = [rng.permutation(self.n_samples_)
                                  for _ in range(self.n_permutations)]
            if is_binary:
                self.supporting_models_ = np.zeros(
                    (self.n_permutations, self.n_samples_)
                )
            else:
                self.supporting_models_ = np.zeros(
                    (self.n_permutations, self.n_samples_, self.n_classes_)
                )

        # Boosting iterations
        for iteration in range(self.n_estimators):
            if self.verbose > 0 and iteration % 10 == 0:
                print(f"Iteration {iteration}/{self.n_estimators}")
            if is_binary:
                tree = self._fit_one_tree(
                    self.X_binned_, y_encoded, current_pred,
                    rng, iteration
                )
                self.trees_.append(tree)
                current_pred += tree.predict(self.X_binned_)
            else:
                # Multi-class: train one tree per class
                class_trees = []
                for k in range(self.n_classes_):
                    tree = self._fit_one_tree(
                        self.X_binned_, y_encoded[:, k], current_pred[:, k],
                        rng, iteration, class_idx=k, full_pred=current_pred
                    )
                    class_trees.append(tree)
                    current_pred[:, k] += tree.predict(self.X_binned_)
                self.trees_.append(class_trees)

        self.is_fitted_ = True
        return self

    def _bin_features(self, X, rng):
        """Learn per-feature bin edges from ``X`` and bin the training data.

        Sets ``bin_edges_`` (list with one edge array per feature) and
        ``X_binned_``.  ``rng`` is accepted but not used.
        """
        self.bin_edges_ = []
        for feat in range(X.shape[1]):
            values = X[:, feat]
            min_val = np.min(values)
            max_val = np.max(values)
            if min_val == max_val:
                # Constant feature: a single bin, every value maps to bin 0.
                edges = np.array([min_val, max_val + 1e-8])
            elif self.binning == 'quantile_sketch':
                # Weighted quantile sketch (XGBoost-style), uniform weights
                edges = _weighted_quantile_sketch(
                    values, np.ones(len(values)), self.n_bins
                )
            else:
                # Uniform-width binning
                edges = np.linspace(min_val, max_val, self.n_bins + 1)
            self.bin_edges_.append(edges)
        # Bin the training data with exactly the code path used at predict time.
        self.X_binned_ = self._transform_to_bins(X)

    def _fit_one_tree(self, X_binned, y_true, current_pred, rng, iteration,
                      class_idx=None, full_pred=None):
        """Fit a single tree for one boosting iteration.

        For multi-class, pass ``full_pred`` as the full
        (n_samples, n_classes) raw-score matrix and ``class_idx`` as the
        class whose gradients are wanted.  Row sampling (``subsample`` or
        GOSS) and column sampling (``colsample_bytree``) are applied to the
        data the tree is fitted on; the returned tree always refers to
        original feature indices.
        """
        # Gradients and hessians
        if full_pred is not None and class_idx is not None:
            shifted = full_pred - np.max(full_pred, axis=1, keepdims=True)
            exp_pred = np.exp(shifted)
            probs = exp_pred / np.sum(exp_pred, axis=1, keepdims=True)
            grad = probs[:, class_idx] - y_true
            hess = probs[:, class_idx] * (1.0 - probs[:, class_idx])
        elif self.n_classes_ == 2:
            grad, hess = _compute_gradients_logloss(y_true, current_pred)
        else:
            # Fallback for multi-class with single class prediction
            grad = current_pred - y_true
            hess = np.ones_like(grad)

        # Row sampling
        n_rows = X_binned.shape[0]
        rows = None
        if self.subsample < 1.0:
            n_keep = max(1, int(self.subsample * n_rows))
            rows = rng.choice(n_rows, n_keep, replace=False)
        elif self.use_goss:
            rows, amplification = _goss_sample(
                grad, self.goss_a, self.goss_b, rng.randint(0, 2**31)
            )
            # _goss_sample lists the int(n * a) large-gradient rows first;
            # only the sampled small-gradient rows after them are amplified.
            small_rows = rows[int(n_rows * self.goss_a):]
            grad[small_rows] *= amplification
            hess[small_rows] *= amplification
            if len(rows) == 0:
                # Tiny data sets can round both GOSS parts down to zero rows.
                rows = None

        if rows is None:
            X_fit, grad_fit, hess_fit = X_binned, grad, hess
        else:
            X_fit, grad_fit, hess_fit = X_binned[rows], grad[rows], hess[rows]

        # Column sampling
        cols = None
        if self.colsample_bytree < 1.0:
            n_features = X_binned.shape[1]
            n_cols = max(1, int(self.colsample_bytree * n_features))
            cols = np.sort(rng.choice(n_features, n_cols, replace=False))
            X_fit = X_fit[:, cols]

        tree = DecisionTree(
            max_depth=self.max_depth,
            min_child_weight=self.min_child_weight,
            l2_reg=self.l2_reg,
            min_split_gain=self.min_split_gain,
            learning_rate=self.learning_rate
        )
        tree.fit(X_fit, grad_fit, hess_fit, self.n_bins)
        if cols is not None:
            # The tree saw column positions within the subset; translate them
            # back so it can predict on the full feature matrix.
            self._remap_tree_features(tree.root, cols)
        return tree

    @staticmethod
    def _remap_tree_features(root, feature_map):
        """Rewrite each internal node's ``feature`` through ``feature_map``."""
        pending = [root]
        while pending:
            node = pending.pop()
            if node is None or node.is_leaf:
                continue
            node.feature = int(feature_map[node.feature])
            pending.append(node.left_child)
            pending.append(node.right_child)

    def predict(self, X):
        """Predict class labels."""
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)
        proba = self.predict_proba(X)
        if self.n_classes_ == 2:
            return self.classes_[(proba[:, 1] > 0.5).astype(int)]
        return self.classes_[np.argmax(proba, axis=1)]

    def predict_proba(self, X):
        """Predict class probabilities, one column per entry of ``classes_``."""
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)
        X_binned = self._transform_to_bins(X)

        if self.n_classes_ == 2:
            raw_pred = np.full(X.shape[0], self.init_prediction_)
            for tree in self.trees_:
                raw_pred += tree.predict(X_binned)
            proba = 1.0 / (1.0 + np.exp(-raw_pred))
            return np.column_stack([1 - proba, proba])

        raw_pred = np.tile(self.init_prediction_, (X.shape[0], 1))
        for class_trees in self.trees_:
            for k, tree in enumerate(class_trees):
                raw_pred[:, k] += tree.predict(X_binned)
        # Softmax (shifted by the row maximum for numerical stability)
        exp_pred = np.exp(raw_pred - np.max(raw_pred, axis=1, keepdims=True))
        return exp_pred / np.sum(exp_pred, axis=1, keepdims=True)

    def _transform_to_bins(self, X):
        """Map raw feature values to bin indices using the fitted edges.

        Raises:
            ValueError: if ``X`` does not have one column per fitted feature.
        """
        n_features = X.shape[1]
        if n_features != len(self.bin_edges_):
            raise ValueError(
                f"X has {n_features} features, but the model was fitted "
                f"with {len(self.bin_edges_)}."
            )
        X_binned = np.zeros_like(X, dtype=np.int32)
        for feat in range(n_features):
            edges = self.bin_edges_[feat]
            # Interior edges only: values at/below the first edge go to bin 0
            # and values beyond the last interior edge to the last bin.
            X_binned[:, feat] = np.clip(
                np.digitize(X[:, feat], edges[1:-1]), 0, self.n_bins - 1
            )
        return X_binned

    def predict_fast(self, X, batch_size=10000):
        """Predict class labels in batches of ``batch_size`` rows.

        This is a placeholder for a compiled inference engine (YDF-style);
        for now it calls ``predict`` on each batch and concatenates the
        results.  A ``batch_size`` of ``None`` or a non-positive value
        disables batching.
        """
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)
        n_rows = X.shape[0]
        if batch_size is None or batch_size <= 0 or n_rows <= batch_size:
            return self.predict(X)
        parts = [self.predict(X[start:start + batch_size])
                 for start in range(0, n_rows, batch_size)]
        return np.concatenate(parts)
class HyperOptGradientBoostedRegressor(BaseEstimator, RegressorMixin):
    """HyperOptimized Gradient Boosted Tree Regressor.

    Takes the same constructor parameters as the classifier and boosts
    trees on squared-error gradients.  ``subsample < 1.0`` selects uniform
    row subsampling; otherwise GOSS is used when ``use_goss`` is True.
    ``ordered_boosting``, ``n_permutations``, ``oblivious_trees`` and
    ``n_jobs`` are stored but not read by the training loop in this class.
    """

    def __init__(self,
                 n_estimators=100,
                 learning_rate=0.1,
                 max_depth=6,
                 n_bins=255,
                 l2_reg=1.0,
                 min_child_weight=1.0,
                 min_split_gain=0.0,
                 subsample=1.0,
                 colsample_bytree=1.0,
                 use_goss=True,
                 goss_a=0.2,
                 goss_b=0.1,
                 ordered_boosting=False,
                 n_permutations=4,
                 oblivious_trees=False,
                 binning='uniform',
                 n_jobs=-1,
                 random_state=None,
                 verbose=0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.n_bins = n_bins
        self.l2_reg = l2_reg
        self.min_child_weight = min_child_weight
        self.min_split_gain = min_split_gain
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.use_goss = use_goss
        self.goss_a = goss_a
        self.goss_b = goss_b
        self.ordered_boosting = ordered_boosting
        self.n_permutations = n_permutations
        self.oblivious_trees = oblivious_trees
        self.binning = binning
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def fit(self, X, y, categorical_features=None, sample_weight=None):
        """Fit the gradient boosted tree regressor.

        Predictions start at the mean of ``y`` and each round adds one tree
        fitted to the squared-error gradients.  ``categorical_features`` is
        only stored and ``sample_weight`` is not supported (a warning is
        issued if given).  Returns ``self``.
        """
        # Column names must be read before validation turns X into an ndarray.
        columns = getattr(X, 'columns', None)
        X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64)
        if sample_weight is not None:
            warnings.warn(
                "sample_weight is not supported yet and will be ignored.",
                UserWarning,
            )

        self.n_features_in_ = X.shape[1]
        self.n_samples_ = X.shape[0]
        if columns is not None:
            self.feature_names_ = list(columns)
        else:
            self.feature_names_ = [f"feature_{i}" for i in range(self.n_features_in_)]

        rng = np.random.RandomState(self.random_state)
        self.categorical_features_ = (
            [] if categorical_features is None else list(categorical_features)
        )

        # Bin features
        self._bin_features(X, rng)

        # Initialize predictions with mean
        self.init_prediction_ = np.mean(y)
        self.y_mean_ = self.init_prediction_
        current_pred = np.full(self.n_samples_, self.init_prediction_)
        self.trees_ = []

        for iteration in range(self.n_estimators):
            if self.verbose > 0 and iteration % 10 == 0:
                print(f"Iteration {iteration}/{self.n_estimators}")
            tree = self._fit_one_tree(
                self.X_binned_, y, current_pred, rng, iteration
            )
            self.trees_.append(tree)
            current_pred += tree.predict(self.X_binned_)

        self.is_fitted_ = True
        return self

    def _bin_features(self, X, rng):
        """Learn per-feature bin edges from ``X`` and bin the training data.

        Sets ``bin_edges_`` (list with one edge array per feature) and
        ``X_binned_``.  ``rng`` is accepted but not used.
        """
        self.bin_edges_ = []
        for feat in range(X.shape[1]):
            values = X[:, feat]
            min_val = np.min(values)
            max_val = np.max(values)
            if min_val == max_val:
                # Constant feature: a single bin, every value maps to bin 0.
                edges = np.array([min_val, max_val + 1e-8])
            elif self.binning == 'quantile_sketch':
                edges = _weighted_quantile_sketch(
                    values, np.ones(len(values)), self.n_bins
                )
            else:
                edges = np.linspace(min_val, max_val, self.n_bins + 1)
            self.bin_edges_.append(edges)
        # Bin the training data with exactly the code path used at predict time.
        self.X_binned_ = self._transform_to_bins(X)

    def _fit_one_tree(self, X_binned, y_true, current_pred, rng, iteration):
        """Fit a single regression tree for one boosting iteration.

        Row sampling (``subsample`` or GOSS) and column sampling
        (``colsample_bytree``) are applied to the data the tree is fitted
        on; the returned tree always refers to original feature indices.
        """
        grad, hess = _compute_gradients_mse(y_true, current_pred)

        # Row sampling
        n_rows = X_binned.shape[0]
        rows = None
        if self.subsample < 1.0:
            n_keep = max(1, int(self.subsample * n_rows))
            rows = rng.choice(n_rows, n_keep, replace=False)
        elif self.use_goss:
            rows, amplification = _goss_sample(
                grad, self.goss_a, self.goss_b, rng.randint(0, 2**31)
            )
            # _goss_sample lists the int(n * a) large-gradient rows first;
            # only the sampled small-gradient rows after them are amplified.
            small_rows = rows[int(n_rows * self.goss_a):]
            grad[small_rows] *= amplification
            hess[small_rows] *= amplification
            if len(rows) == 0:
                # Tiny data sets can round both GOSS parts down to zero rows.
                rows = None

        if rows is None:
            X_fit, grad_fit, hess_fit = X_binned, grad, hess
        else:
            X_fit, grad_fit, hess_fit = X_binned[rows], grad[rows], hess[rows]

        # Column sampling
        cols = None
        if self.colsample_bytree < 1.0:
            n_features = X_binned.shape[1]
            n_cols = max(1, int(self.colsample_bytree * n_features))
            cols = np.sort(rng.choice(n_features, n_cols, replace=False))
            X_fit = X_fit[:, cols]

        tree = DecisionTree(
            max_depth=self.max_depth,
            min_child_weight=self.min_child_weight,
            l2_reg=self.l2_reg,
            min_split_gain=self.min_split_gain,
            learning_rate=self.learning_rate
        )
        tree.fit(X_fit, grad_fit, hess_fit, self.n_bins)
        if cols is not None:
            # The tree saw column positions within the subset; translate them
            # back so it can predict on the full feature matrix.
            self._remap_tree_features(tree.root, cols)
        return tree

    @staticmethod
    def _remap_tree_features(root, feature_map):
        """Rewrite each internal node's ``feature`` through ``feature_map``."""
        pending = [root]
        while pending:
            node = pending.pop()
            if node is None or node.is_leaf:
                continue
            node.feature = int(feature_map[node.feature])
            pending.append(node.left_child)
            pending.append(node.right_child)

    def predict(self, X):
        """Predict regression target."""
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)
        X_binned = self._transform_to_bins(X)
        predictions = np.full(X.shape[0], self.init_prediction_)
        for tree in self.trees_:
            predictions += tree.predict(X_binned)
        return predictions

    def _transform_to_bins(self, X):
        """Map raw feature values to bin indices using the fitted edges.

        Raises:
            ValueError: if ``X`` does not have one column per fitted feature.
        """
        n_features = X.shape[1]
        if n_features != len(self.bin_edges_):
            raise ValueError(
                f"X has {n_features} features, but the model was fitted "
                f"with {len(self.bin_edges_)}."
            )
        X_binned = np.zeros_like(X, dtype=np.int32)
        for feat in range(n_features):
            edges = self.bin_edges_[feat]
            # Interior edges only: values at/below the first edge go to bin 0
            # and values beyond the last interior edge to the last bin.
            X_binned[:, feat] = np.clip(
                np.digitize(X[:, feat], edges[1:-1]), 0, self.n_bins - 1
            )
        return X_binned