| """ |
| Core implementation of HyperOptimized Gradient Boosted Trees. |
| |
| Combines innovations from: |
| - CatBoost: Ordered boosting, ordered target statistics, oblivious trees |
| - LightGBM: Histogram-based splits, GOSS, EFB, leaf-wise growth |
| - XGBoost: Weighted quantile sketch, cache-aware column blocks, sparsity-aware |
| - YDF: Inference engine compilation, modularity |
| """ |
|
|
| import numpy as np |
| import numba |
| from numba import njit, prange, int64, float64, boolean |
| from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin |
| from sklearn.utils.validation import check_X_y, check_array, check_is_fitted |
| from sklearn.utils.multiclass import type_of_target |
| import warnings |
|
|
|
|
| |
| |
| |
|
|
| @njit(cache=True, fastmath=True) |
| def _build_histogram(feature_values, gradients, hessians, n_bins, min_val, max_val): |
| """Build histogram for a single feature (LightGBM-style). |
| |
| Discretizes continuous values into bins and accumulates gradient/hessian sums. |
| """ |
| bin_edges = np.linspace(min_val, max_val, n_bins + 1) |
| grad_sums = np.zeros(n_bins, dtype=np.float64) |
| hess_sums = np.zeros(n_bins, dtype=np.float64) |
| counts = np.zeros(n_bins, dtype=np.int64) |
| |
| for i in range(len(feature_values)): |
| val = feature_values[i] |
| |
| bin_idx = int((val - min_val) / (max_val - min_val) * n_bins) |
| bin_idx = min(bin_idx, n_bins - 1) |
| grad_sums[bin_idx] += gradients[i] |
| hess_sums[bin_idx] += hessians[i] |
| counts[bin_idx] += 1 |
| |
| return grad_sums, hess_sums, counts, bin_edges |
|
|
|
|
| @njit(cache=True, fastmath=True) |
| def _find_best_split_histogram(grad_sums, hess_sums, counts, l2_reg, min_child_weight): |
| """Find optimal split point from histogram (LightGBM-style). |
| |
| Scans all bin boundaries and computes gain for each possible split. |
| """ |
| n_bins = len(grad_sums) |
| total_grad = np.sum(grad_sums) |
| total_hess = np.sum(hess_sums) |
| |
| best_gain = -np.inf |
| best_bin = -1 |
| |
| left_grad = 0.0 |
| left_hess = 0.0 |
| |
| for i in range(n_bins - 1): |
| left_grad += grad_sums[i] |
| left_hess += hess_sums[i] |
| |
| right_grad = total_grad - left_grad |
| right_hess = total_hess - left_hess |
| |
| |
| if left_hess < min_child_weight or right_hess < min_child_weight: |
| continue |
| |
| |
| gain = ( |
| left_grad ** 2 / (left_hess + l2_reg) + |
| right_grad ** 2 / (right_hess + l2_reg) - |
| total_grad ** 2 / (total_hess + l2_reg) |
| ) |
| |
| if gain > best_gain: |
| best_gain = gain |
| best_bin = i |
| |
| return best_bin, best_gain, left_grad, left_hess |
|
|
|
|
| @njit(cache=True, fastmath=True) |
| def _weighted_quantile_sketch(values, weights, n_bins): |
| """Compute weighted quantile-based bin boundaries (XGBoost-style). |
| |
| Each bin contains approximately equal total weight (hessian sum). |
| Better than uniform binning for skewed distributions. |
| """ |
| n = len(values) |
| if n == 0: |
| return np.array([0.0, 1.0]) |
| |
| |
| sorted_idx = np.argsort(values) |
| sorted_vals = values[sorted_idx] |
| sorted_weights = weights[sorted_idx] |
| |
| total_weight = np.sum(weights) |
| target_weight = total_weight / n_bins |
| |
| boundaries = [sorted_vals[0]] |
| cum_weight = 0.0 |
| |
| for i in range(n): |
| cum_weight += sorted_weights[i] |
| if cum_weight >= target_weight: |
| boundaries.append(sorted_vals[i]) |
| cum_weight = 0.0 |
| if len(boundaries) >= n_bins: |
| break |
| |
| boundaries.append(sorted_vals[-1]) |
| return np.array(boundaries) |
|
|
|
|
| def _goss_sample(gradients, a_ratio, b_ratio, random_state): |
| """Gradient-based One-Side Sampling (LightGBM GOSS). |
| |
| 1. Sort instances by absolute gradient |
| 2. Keep top-a% instances with largest gradients |
| 3. Randomly sample b% from remaining |
| 4. Amplify sampled small-gradient instances |
| |
| Args: |
| gradients: gradient values for all instances |
| a_ratio: keep top a% by gradient magnitude (e.g., 0.2) |
| b_ratio: sample b% from rest (e.g., 0.1) |
| random_state: random seed |
| |
| Returns: |
| sampled_indices: indices to keep |
| amplification: amplification factor for sampled small gradients |
| """ |
| n = len(gradients) |
| abs_grad = np.abs(gradients) |
| |
| |
| sorted_idx = np.argsort(-abs_grad) |
| |
| |
| n_top = int(n * a_ratio) |
| top_indices = sorted_idx[:n_top] |
| |
| |
| remaining = sorted_idx[n_top:] |
| n_sample = int(len(remaining) * b_ratio) |
| |
| if n_sample > 0 and len(remaining) > 0: |
| |
| step = max(1, len(remaining) // n_sample) |
| sampled_remaining = remaining[::step][:n_sample] |
| else: |
| sampled_remaining = np.empty(0, dtype=np.int64) |
| |
| |
| sampled_indices = np.concatenate((top_indices, sampled_remaining)) |
| |
| |
| |
| |
| amplification = (1.0 - a_ratio) / b_ratio if b_ratio > 0 else 1.0 |
| |
| return sampled_indices, amplification |
|
|
|
|
| @njit(cache=True, fastmath=True) |
| def _compute_gradients_logloss(y_true, y_pred): |
| """Compute gradients and hessians for binary log loss.""" |
| n = len(y_true) |
| grad = np.empty(n, dtype=np.float64) |
| hess = np.empty(n, dtype=np.float64) |
| |
| for i in range(n): |
| p = 1.0 / (1.0 + np.exp(-y_pred[i])) |
| |
| p = max(min(p, 1.0 - 1e-15), 1e-15) |
| grad[i] = p - y_true[i] |
| hess[i] = p * (1.0 - p) |
| |
| return grad, hess |
|
|
|
|
| @njit(cache=True, fastmath=True) |
| def _compute_gradients_mse(y_true, y_pred): |
| """Compute gradients and hessians for MSE (regression).""" |
| n = len(y_true) |
| grad = np.empty(n, dtype=np.float64) |
| hess = np.empty(n, dtype=np.float64) |
| |
| for i in range(n): |
| grad[i] = y_pred[i] - y_true[i] |
| hess[i] = 1.0 |
| |
| return grad, hess |
|
|
|
|
| @njit(cache=True, fastmath=True) |
| def _ordered_target_statistics(categorical_values, target, permutation, prior, prior_weight): |
| """Compute ordered target statistics for categorical features (CatBoost). |
| |
| For each example, compute target statistic using ONLY preceding examples in permutation. |
| This prevents target leakage. |
| |
| TS_k = (sum_{j:perm(j) < perm(k), x_j = x_k} y_j + a*prior) / (count + a) |
| """ |
| n = len(categorical_values) |
| ts = np.empty(n, dtype=np.float64) |
| |
| |
| category_sums = {} |
| |
| |
| |
| |
| max_cat = np.max(categorical_values) + 1 |
| running_sum = np.zeros(max_cat, dtype=np.float64) |
| running_count = np.zeros(max_cat, dtype=np.int64) |
| |
| for pos in range(n): |
| idx = permutation[pos] |
| cat = categorical_values[idx] |
| |
| |
| if running_count[cat] > 0: |
| ts[idx] = (running_sum[cat] + prior_weight * prior) / (running_count[cat] + prior_weight) |
| else: |
| ts[idx] = prior |
| |
| |
| running_sum[cat] += target[idx] |
| running_count[cat] += 1 |
| |
| return ts |
|
|
|
|
@njit(cache=True, fastmath=True)
def _find_best_oblivious_split(X_binned, gradients, hessians, n_bins, l2_reg, min_child_weight,
                               depth, used_features):
    """Pick the single (feature, bin) split to apply at one tree level.

    Oblivious trees (CatBoost-style) apply the SAME split to every node of
    a level. For each feature not flagged in ``used_features`` this builds
    a gradient/hessian histogram over all samples and scores it with
    ``_find_best_split_histogram``; the highest-gain candidate wins.

    ``depth`` is accepted but not consulted.

    Returns:
        Tuple ``(best_feature, best_bin, best_gain)``; ``(-1, -1, -inf)``
        when no feature yields a valid split.
    """
    n_rows = X_binned.shape[0]
    n_cols = X_binned.shape[1]

    winner_feature = -1
    winner_bin = -1
    winner_gain = -np.inf

    for col in range(n_cols):
        if not used_features[col]:
            # Accumulate this feature's histogram over every sample.
            g_hist = np.zeros(n_bins, dtype=np.float64)
            h_hist = np.zeros(n_bins, dtype=np.float64)
            for row in range(n_rows):
                slot = X_binned[row, col]
                g_hist[slot] += gradients[row]
                h_hist[slot] += hessians[row]

            # Counts are not tracked here; an all-zero placeholder is passed.
            placeholder_counts = np.zeros(n_bins, dtype=np.int64)
            candidate = _find_best_split_histogram(
                g_hist, h_hist, placeholder_counts, l2_reg, min_child_weight
            )
            candidate_bin = candidate[0]
            candidate_gain = candidate[1]

            if candidate_gain > winner_gain:
                winner_gain = candidate_gain
                winner_feature = col
                winner_bin = candidate_bin

    return winner_feature, winner_bin, winner_gain
|
|
|
|
| |
| |
| |
|
|
class TreeNode:
    """One node of a decision tree.

    A freshly created node is a leaf with no children, no split and a
    prediction value of zero; the tree builder fills the fields in.
    """

    __slots__ = [
        'is_leaf',
        'feature',
        'threshold',
        'left_child',
        'right_child',
        'value',
        'n_samples',
        'depth',
    ]

    def __init__(self):
        # Structure: a node starts out as a childless leaf.
        self.is_leaf = True
        self.left_child = self.right_child = None
        # Split description (placeholders until the node is split).
        self.feature = -1
        self.threshold = 0.0
        # Prediction and bookkeeping.
        self.value = 0.0
        self.n_samples = 0
        self.depth = 0
|
|
|
|
class DecisionTree:
    """A single regression tree used as the weak learner in gradient boosting.

    The tree is fit on pre-binned features: every split has the form
    ``X_binned[:, feature] <= bin`` and is chosen from per-feature
    gradient/hessian histograms.

    Parameters
    ----------
    max_depth : int, default=6
        Maximum depth of the tree.
    min_child_weight : float, default=1.0
        Minimum hessian sum required in each child of a split.
    l2_reg : float, default=1.0
        L2 regularization on leaf values.
    min_split_gain : float, default=0.0
        A split is only made when its gain is strictly greater than this.
    learning_rate : float, default=1.0
        Shrinkage multiplied into every leaf value.
    """

    def __init__(self, max_depth=6, min_child_weight=1.0, l2_reg=1.0,
                 min_split_gain=0.0, learning_rate=1.0):
        self.max_depth = max_depth
        self.min_child_weight = min_child_weight
        self.l2_reg = l2_reg
        self.min_split_gain = min_split_gain
        self.learning_rate = learning_rate
        self.root = None

    def fit(self, X_binned, gradients, hessians, n_bins):
        """Build the tree from binned features and per-sample derivatives.

        Args:
            X_binned: 2-D integer array of bin indices, one row per sample.
            gradients: per-sample gradients.
            hessians: per-sample hessians.
            n_bins: number of histogram bins per feature.

        Returns:
            self
        """
        self.root = self._build_tree(X_binned, gradients, hessians,
                                     np.arange(len(gradients)), n_bins, 0)
        return self

    def _build_tree(self, X_binned, gradients, hessians, indices, n_bins, depth):
        """Recursively grow the subtree for the samples in ``indices``.

        Growth is depth-first: the left subtree is completed before the
        right one, and each node is split greedily on its own best
        histogram split.
        """
        node = TreeNode()
        node.n_samples = len(indices)
        node.depth = depth

        # Regularized Newton step for this node, already shrunk by the
        # learning rate. Set on every node so it is valid if it stays a leaf.
        grad_sum = np.sum(gradients[indices])
        hess_sum = np.sum(hessians[indices])
        node.value = -grad_sum / (hess_sum + self.l2_reg) * self.learning_rate

        # Stop at the depth limit or when the node is too small to split.
        if depth >= self.max_depth or len(indices) < 2 * self.min_child_weight:
            return node

        best_feature, best_bin, best_gain = self._find_best_split(
            X_binned, gradients, hessians, indices, n_bins
        )

        # Also covers "no valid split", which is reported as gain == -inf.
        if best_gain <= self.min_split_gain:
            return node

        left_mask = X_binned[indices, best_feature] <= best_bin
        left_indices = indices[left_mask]
        right_indices = indices[~left_mask]

        if len(left_indices) == 0 or len(right_indices) == 0:
            return node

        node.is_leaf = False
        node.feature = best_feature
        node.threshold = best_bin

        node.left_child = self._build_tree(
            X_binned, gradients, hessians, left_indices, n_bins, depth + 1
        )
        node.right_child = self._build_tree(
            X_binned, gradients, hessians, right_indices, n_bins, depth + 1
        )

        return node

    def _find_best_split(self, X_binned, gradients, hessians, indices, n_bins):
        """Find the best histogram-based split across all features.

        Returns:
            Tuple ``(best_feature, best_bin, best_gain)``; ``(-1, -1, -inf)``
            when no feature offers a valid split.
        """
        n_features = X_binned.shape[1]
        best_gain = -np.inf
        best_feature = -1
        best_bin = -1

        # Gather the node's derivatives once; they are reused per feature.
        node_grad = gradients[indices]
        node_hess = hessians[indices]

        for feat in range(n_features):
            bins = X_binned[indices, feat]

            # Samples whose bin index falls outside [0, n_bins) are left out
            # of the histogram rather than indexing past (or wrapping
            # around) its ends.
            in_range = (bins >= 0) & (bins < n_bins)
            if in_range.all():
                feat_grad, feat_hess = node_grad, node_hess
            else:
                bins = bins[in_range]
                feat_grad = node_grad[in_range]
                feat_hess = node_hess[in_range]

            # np.bincount builds each histogram in one vectorized pass.
            grad_sums = np.bincount(bins, weights=feat_grad, minlength=n_bins)
            hess_sums = np.bincount(bins, weights=feat_hess, minlength=n_bins)
            counts = np.bincount(bins, minlength=n_bins).astype(np.int64)

            bin_idx, gain, _, _ = _find_best_split_histogram(
                grad_sums, hess_sums, counts,
                self.l2_reg, self.min_child_weight
            )

            if gain > best_gain:
                best_gain = gain
                best_feature = feat
                best_bin = bin_idx

        return best_feature, best_bin, best_gain

    def predict(self, X_binned):
        """Return the leaf value reached by each row of ``X_binned``."""
        n_samples = X_binned.shape[0]
        predictions = np.empty(n_samples, dtype=np.float64)

        for i in range(n_samples):
            predictions[i] = self._predict_single(X_binned[i])

        return predictions

    def _predict_single(self, x):
        """Walk from the root to a leaf for one binned sample."""
        node = self.root
        while not node.is_leaf:
            if x[node.feature] <= node.threshold:
                node = node.left_child
            else:
                node = node.right_child
        return node.value
|
|
|
|
| |
| |
| |
|
|
class HyperOptGradientBoostedClassifier(BaseEstimator, ClassifierMixin):
    """HyperOptimized Gradient Boosted Tree Classifier.

    Combines ideas from CatBoost, XGBoost, LightGBM, and YDF. Binary
    problems are fit with log loss on a single score; multi-class problems
    fit one tree per class per round against softmax probabilities.

    Parameters
    ----------
    n_estimators : int, default=100
        Number of boosting rounds.
    learning_rate : float, default=0.1
        Learning rate (shrinkage).
    max_depth : int, default=6
        Maximum tree depth.
    n_bins : int, default=255
        Number of histogram bins for split finding.
    l2_reg : float, default=1.0
        L2 regularization on leaf weights.
    min_child_weight : float, default=1.0
        Minimum sum of hessian in leaf.
    min_split_gain : float, default=0.0
        Minimum gain for split.
    subsample : float, default=1.0
        Row subsampling ratio. When below 1.0, uniform row subsampling is
        used and takes precedence over GOSS.
    colsample_bytree : float, default=1.0
        Column subsampling ratio (at least one column is always kept).
    use_goss : bool, default=True
        Use Gradient-based One-Side Sampling (LightGBM) when ``subsample``
        is 1.0.
    goss_a : float, default=0.2
        GOSS: keep top a% by gradient magnitude.
    goss_b : float, default=0.1
        GOSS: sample b% from remaining.
    ordered_boosting : bool, default=False
        Use ordered boosting (CatBoost). Permutations are allocated, but
        tree building does not use them yet.
    n_permutations : int, default=4
        Number of permutations for ordered boosting.
    oblivious_trees : bool, default=False
        Use oblivious trees (CatBoost). Not used by tree building yet.
    binning : str, default='uniform'
        Binning strategy: 'quantile_sketch' (XGBoost) selects weighted
        quantile edges; any other value uses uniform edges.
    n_jobs : int, default=-1
        Number of parallel threads.
    random_state : int, default=None
        Random seed.
    verbose : int, default=0
        Verbosity level.
    """

    def __init__(self,
                 n_estimators=100,
                 learning_rate=0.1,
                 max_depth=6,
                 n_bins=255,
                 l2_reg=1.0,
                 min_child_weight=1.0,
                 min_split_gain=0.0,
                 subsample=1.0,
                 colsample_bytree=1.0,
                 use_goss=True,
                 goss_a=0.2,
                 goss_b=0.1,
                 ordered_boosting=False,
                 n_permutations=4,
                 oblivious_trees=False,
                 binning='uniform',
                 n_jobs=-1,
                 random_state=None,
                 verbose=0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.n_bins = n_bins
        self.l2_reg = l2_reg
        self.min_child_weight = min_child_weight
        self.min_split_gain = min_split_gain
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.use_goss = use_goss
        self.goss_a = goss_a
        self.goss_b = goss_b
        self.ordered_boosting = ordered_boosting
        self.n_permutations = n_permutations
        self.oblivious_trees = oblivious_trees
        self.binning = binning
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def fit(self, X, y, categorical_features=None, sample_weight=None):
        """Fit the gradient boosted tree classifier.

        Args:
            X: array-like of shape (n_samples, n_features).
            y: array-like of class labels, one per sample.
            categorical_features: stored on the estimator; not used by
                training yet.
            sample_weight: accepted for API compatibility; not used by
                training yet.

        Returns:
            self
        """
        # Column names must be read before validation turns X into a plain
        # ndarray, which has no ``columns`` attribute.
        input_columns = getattr(X, 'columns', None)
        X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64)

        self._warn_ignored_options(categorical_features, sample_weight)

        self.n_features_in_ = X.shape[1]
        self.n_samples_ = X.shape[0]

        if input_columns is not None:
            self.feature_names_ = list(input_columns)
        else:
            self.feature_names_ = [f"feature_{i}" for i in range(self.n_features_in_)]

        self.classes_ = np.unique(y)
        self.n_classes_ = len(self.classes_)

        if self.n_classes_ == 2:
            # Binary: 1.0 for the second (larger) class label, else 0.0.
            y_encoded = (y == self.classes_[1]).astype(np.float64)
        else:
            # Multi-class: one-hot matrix, one column per class.
            y_encoded = np.zeros((len(y), self.n_classes_), dtype=np.float64)
            for i, cls in enumerate(self.classes_):
                y_encoded[:, i] = (y == cls).astype(np.float64)

        rng = np.random.RandomState(self.random_state)

        # ``is not None`` instead of truthiness so array inputs work too.
        self.categorical_features_ = (
            list(categorical_features) if categorical_features is not None else []
        )

        self._bin_features(X, rng)

        # Start every score at the log-odds of the class frequency.
        if self.n_classes_ == 2:
            p = np.mean(y_encoded)
            p = np.clip(p, 1e-7, 1 - 1e-7)
            self.init_prediction_ = np.log(p / (1 - p))
            current_pred = np.full(self.n_samples_, self.init_prediction_)
        else:
            self.init_prediction_ = np.zeros(self.n_classes_)
            for i in range(self.n_classes_):
                p = np.mean(y_encoded[:, i])
                p = np.clip(p, 1e-7, 1 - 1e-7)
                self.init_prediction_[i] = np.log(p / (1 - p))
            current_pred = np.tile(self.init_prediction_, (self.n_samples_, 1))

        self.trees_ = []

        if self.ordered_boosting:
            self.permutations_ = [rng.permutation(self.n_samples_)
                                  for _ in range(self.n_permutations)]

            if self.n_classes_ == 2:
                self.supporting_models_ = np.zeros(
                    (self.n_permutations, self.n_samples_)
                )
            else:
                self.supporting_models_ = np.zeros(
                    (self.n_permutations, self.n_samples_, self.n_classes_)
                )

        for iteration in range(self.n_estimators):
            if self.verbose > 0 and iteration % 10 == 0:
                print(f"Iteration {iteration}/{self.n_estimators}")

            if self.n_classes_ == 2:
                tree = self._fit_one_tree(
                    self.X_binned_, y_encoded, current_pred,
                    rng, iteration
                )
                self.trees_.append(tree)
                current_pred += tree.predict(self.X_binned_)
            else:
                # Every class tree of a round is fit against the scores as
                # they stood at the start of the round; without the
                # snapshot, class k+1 would see class k's fresh update.
                round_pred = current_pred.copy()
                class_trees = []
                for k in range(self.n_classes_):
                    tree = self._fit_one_tree(
                        self.X_binned_, y_encoded[:, k], round_pred[:, k],
                        rng, iteration, class_idx=k, full_pred=round_pred
                    )
                    class_trees.append(tree)
                    current_pred[:, k] += tree.predict(self.X_binned_)
                self.trees_.append(class_trees)

        self.is_fitted_ = True
        return self

    def _warn_ignored_options(self, categorical_features, sample_weight):
        """Warn about inputs that are accepted but not used by training."""
        ignored = []
        if sample_weight is not None:
            ignored.append("sample_weight")
        if categorical_features is not None and len(categorical_features) > 0:
            ignored.append("categorical_features")
        if self.ordered_boosting:
            ignored.append("ordered_boosting")
        if self.oblivious_trees:
            ignored.append("oblivious_trees")
        if ignored:
            warnings.warn(
                "These options are not implemented yet and are ignored by "
                "tree building: " + ", ".join(ignored),
                UserWarning,
            )

    def _compute_bin_edges(self, values):
        """Return the bin edges for one feature column."""
        min_val = np.min(values)
        max_val = np.max(values)

        if min_val == max_val:
            # Constant feature: a single degenerate bin.
            return np.array([min_val, max_val + 1e-8])

        if self.binning == 'quantile_sketch':
            # Unit weights: plain (unweighted) quantile edges.
            weights = np.ones(len(values))
            return _weighted_quantile_sketch(values, weights, self.n_bins)

        return np.linspace(min_val, max_val, self.n_bins + 1)

    def _bin_features(self, X, rng):
        """Discretize continuous features into histogram bins.

        Sets ``bin_edges_`` (one edge array per feature) and ``X_binned_``
        (the training data as int32 bin indices). ``rng`` is not used.
        """
        self.bin_edges_ = [
            self._compute_bin_edges(X[:, feat]) for feat in range(X.shape[1])
        ]
        self.X_binned_ = self._transform_to_bins(X)

    @staticmethod
    def _remap_tree_features(root, feature_map):
        """Rewrite split features from subset positions to original columns."""
        stack = [root]
        while stack:
            node = stack.pop()
            if node is None or node.is_leaf:
                continue
            node.feature = int(feature_map[node.feature])
            stack.append(node.left_child)
            stack.append(node.right_child)

    def _fit_one_tree(self, X_binned, y_true, current_pred, rng, iteration,
                      class_idx=None, full_pred=None):
        """Fit a single tree for one iteration.

        For multi-class, pass full_pred as the full (n_samples, n_classes)
        prediction matrix and class_idx as the class index for which to
        compute gradients.

        Returns:
            The fitted ``DecisionTree``; its split features always refer to
            columns of the full ``X_binned``.
        """
        if full_pred is not None and class_idx is not None:
            # Softmax gradients; subtracting the row max avoids overflow.
            shifted = full_pred - np.max(full_pred, axis=1, keepdims=True)
            exp_pred = np.exp(shifted)
            probs = exp_pred / np.sum(exp_pred, axis=1, keepdims=True)
            grad = probs[:, class_idx] - y_true
            hess = probs[:, class_idx] * (1.0 - probs[:, class_idx])
        elif self.n_classes_ == 2:
            grad, hess = _compute_gradients_logloss(y_true, current_pred)
        else:
            grad = current_pred - y_true
            hess = np.ones_like(grad)

        n_rows = X_binned.shape[0]

        # --- Row sampling -------------------------------------------------
        if self.subsample < 1.0:
            n_keep = max(1, int(self.subsample * n_rows))
            rows = rng.choice(n_rows, n_keep, replace=False)
        elif self.use_goss:
            rows, amplification = _goss_sample(
                grad, self.goss_a, self.goss_b, rng.randint(0, 2**31)
            )
            # _goss_sample lists the large-gradient rows first; everything
            # after them was drawn at random and is re-weighted to
            # compensate for the rows that were dropped.
            n_top = int(n_rows * self.goss_a)
            small_rows = rows[n_top:]
            grad[small_rows] *= amplification
            hess[small_rows] *= amplification
        else:
            rows = None

        if rows is not None and len(rows) == 0:
            # Degenerate sampling settings: fall back to the full data.
            rows = None

        # --- Column sampling ----------------------------------------------
        if self.colsample_bytree < 1.0:
            n_features = X_binned.shape[1]
            n_cols = max(1, int(self.colsample_bytree * n_features))
            sampled_features = rng.choice(n_features, n_cols, replace=False)
        else:
            sampled_features = None

        X_train = X_binned
        if rows is not None:
            X_train = X_train[rows]
            grad = grad[rows]
            hess = hess[rows]
        if sampled_features is not None:
            X_train = X_train[:, sampled_features]

        tree = DecisionTree(
            max_depth=self.max_depth,
            min_child_weight=self.min_child_weight,
            l2_reg=self.l2_reg,
            min_split_gain=self.min_split_gain,
            learning_rate=self.learning_rate
        )
        tree.fit(X_train, grad, hess, self.n_bins)

        if sampled_features is not None:
            # The tree was fit on a column subset; translate its feature
            # indices so it can predict on full-width inputs.
            self._remap_tree_features(tree.root, sampled_features)

        return tree

    def predict(self, X):
        """Predict class labels."""
        check_is_fitted(self)
        proba = self.predict_proba(X)

        if self.n_classes_ == 2:
            return self.classes_[(proba[:, 1] > 0.5).astype(int)]
        return self.classes_[np.argmax(proba, axis=1)]

    def predict_proba(self, X):
        """Predict class probabilities.

        Returns:
            Array of shape (n_samples, n_classes); columns follow
            ``classes_``.
        """
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)

        X_binned = self._transform_to_bins(X)

        if self.n_classes_ == 2:
            raw_pred = np.full(X.shape[0], self.init_prediction_)
            for tree in self.trees_:
                raw_pred += tree.predict(X_binned)

            # tanh form of the sigmoid: identical value, but it cannot
            # overflow for large-magnitude scores.
            proba = 0.5 * (1.0 + np.tanh(0.5 * raw_pred))
            return np.column_stack([1 - proba, proba])

        raw_pred = np.tile(self.init_prediction_, (X.shape[0], 1))
        for class_trees in self.trees_:
            for k, tree in enumerate(class_trees):
                raw_pred[:, k] += tree.predict(X_binned)

        # Softmax with the row max subtracted for numerical stability.
        exp_pred = np.exp(raw_pred - np.max(raw_pred, axis=1, keepdims=True))
        return exp_pred / np.sum(exp_pred, axis=1, keepdims=True)

    def _transform_to_bins(self, X):
        """Transform features to bin indices using the fitted ``bin_edges_``.

        Raises:
            ValueError: if ``X`` does not have one column per fitted feature.
        """
        n_features = X.shape[1]
        if n_features != len(self.bin_edges_):
            raise ValueError(
                f"X has {n_features} features, but {type(self).__name__} "
                f"was fitted with {len(self.bin_edges_)} features."
            )

        X_binned = np.zeros_like(X, dtype=np.int32)

        for feat in range(n_features):
            edges = self.bin_edges_[feat]
            # Only interior edges act as cut points, so out-of-range values
            # fall into the first or last bin.
            X_binned[:, feat] = np.clip(
                np.digitize(X[:, feat], edges[1:-1]), 0, self.n_bins - 1
            )

        return X_binned

    def predict_fast(self, X, batch_size=10000):
        """Batched prediction.

        This is a placeholder for the SIMD-optimized inference engine
        that would be compiled from the trained model (YDF-style). For now
        it runs ``predict`` on chunks of ``batch_size`` rows.
        """
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)

        n_rows = X.shape[0]
        if batch_size is None or batch_size <= 0 or n_rows <= batch_size:
            return self.predict(X)

        return np.concatenate([
            self.predict(X[start:start + batch_size])
            for start in range(0, n_rows, batch_size)
        ])
|
|
|
|
class HyperOptGradientBoostedRegressor(BaseEstimator, RegressorMixin):
    """HyperOptimized Gradient Boosted Tree Regressor.

    Boosts histogram-based trees on the squared-error loss, starting from
    the mean of the training targets. Row sampling follows these rules:
    ``subsample < 1.0`` selects uniform row subsampling; otherwise
    ``use_goss`` selects Gradient-based One-Side Sampling; otherwise every
    row is used. ``colsample_bytree < 1.0`` fits each tree on a random
    subset of the columns (at least one).

    ``ordered_boosting``, ``n_permutations``, ``oblivious_trees`` and
    ``n_jobs`` are stored but not used by training.
    """

    def __init__(self,
                 n_estimators=100,
                 learning_rate=0.1,
                 max_depth=6,
                 n_bins=255,
                 l2_reg=1.0,
                 min_child_weight=1.0,
                 min_split_gain=0.0,
                 subsample=1.0,
                 colsample_bytree=1.0,
                 use_goss=True,
                 goss_a=0.2,
                 goss_b=0.1,
                 ordered_boosting=False,
                 n_permutations=4,
                 oblivious_trees=False,
                 binning='uniform',
                 n_jobs=-1,
                 random_state=None,
                 verbose=0):
        self.n_estimators = n_estimators
        self.learning_rate = learning_rate
        self.max_depth = max_depth
        self.n_bins = n_bins
        self.l2_reg = l2_reg
        self.min_child_weight = min_child_weight
        self.min_split_gain = min_split_gain
        self.subsample = subsample
        self.colsample_bytree = colsample_bytree
        self.use_goss = use_goss
        self.goss_a = goss_a
        self.goss_b = goss_b
        self.ordered_boosting = ordered_boosting
        self.n_permutations = n_permutations
        self.oblivious_trees = oblivious_trees
        self.binning = binning
        self.n_jobs = n_jobs
        self.random_state = random_state
        self.verbose = verbose

    def fit(self, X, y, categorical_features=None, sample_weight=None):
        """Fit the gradient boosted tree regressor.

        Args:
            X: array-like of shape (n_samples, n_features).
            y: array-like of regression targets, one per sample.
            categorical_features: stored on the estimator; not used by
                training yet.
            sample_weight: accepted for API compatibility; not used by
                training yet.

        Returns:
            self
        """
        # Column names must be read before validation turns X into a plain
        # ndarray, which has no ``columns`` attribute.
        input_columns = getattr(X, 'columns', None)
        X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64)

        self._warn_ignored_options(categorical_features, sample_weight)

        self.n_features_in_ = X.shape[1]
        self.n_samples_ = X.shape[0]

        if input_columns is not None:
            self.feature_names_ = list(input_columns)
        else:
            self.feature_names_ = [f"feature_{i}" for i in range(self.n_features_in_)]

        rng = np.random.RandomState(self.random_state)
        # ``is not None`` instead of truthiness so array inputs work too.
        self.categorical_features_ = (
            list(categorical_features) if categorical_features is not None else []
        )

        self._bin_features(X, rng)

        # The constant model that minimizes squared error is the mean.
        self.init_prediction_ = np.mean(y)
        current_pred = np.full(self.n_samples_, self.init_prediction_)

        self.trees_ = []
        self.y_mean_ = np.mean(y)

        for iteration in range(self.n_estimators):
            if self.verbose > 0 and iteration % 10 == 0:
                print(f"Iteration {iteration}/{self.n_estimators}")

            tree = self._fit_one_tree(
                self.X_binned_, y, current_pred, rng, iteration
            )
            self.trees_.append(tree)
            current_pred += tree.predict(self.X_binned_)

        self.is_fitted_ = True
        return self

    def _warn_ignored_options(self, categorical_features, sample_weight):
        """Warn about inputs that are accepted but not used by training."""
        ignored = []
        if sample_weight is not None:
            ignored.append("sample_weight")
        if categorical_features is not None and len(categorical_features) > 0:
            ignored.append("categorical_features")
        if self.ordered_boosting:
            ignored.append("ordered_boosting")
        if self.oblivious_trees:
            ignored.append("oblivious_trees")
        if ignored:
            warnings.warn(
                "These options are not implemented yet and are ignored by "
                "tree building: " + ", ".join(ignored),
                UserWarning,
            )

    def _compute_bin_edges(self, values):
        """Return the bin edges for one feature column."""
        min_val = np.min(values)
        max_val = np.max(values)

        if min_val == max_val:
            # Constant feature: a single degenerate bin.
            return np.array([min_val, max_val + 1e-8])

        if self.binning == 'quantile_sketch':
            # Unit weights: plain (unweighted) quantile edges.
            weights = np.ones(len(values))
            return _weighted_quantile_sketch(values, weights, self.n_bins)

        return np.linspace(min_val, max_val, self.n_bins + 1)

    def _bin_features(self, X, rng):
        """Discretize continuous features into histogram bins.

        Sets ``bin_edges_`` (one edge array per feature) and ``X_binned_``
        (the training data as int32 bin indices). ``rng`` is not used.
        """
        self.bin_edges_ = [
            self._compute_bin_edges(X[:, feat]) for feat in range(X.shape[1])
        ]
        self.X_binned_ = self._transform_to_bins(X)

    @staticmethod
    def _remap_tree_features(root, feature_map):
        """Rewrite split features from subset positions to original columns."""
        stack = [root]
        while stack:
            node = stack.pop()
            if node is None or node.is_leaf:
                continue
            node.feature = int(feature_map[node.feature])
            stack.append(node.left_child)
            stack.append(node.right_child)

    def _fit_one_tree(self, X_binned, y_true, current_pred, rng, iteration):
        """Fit a single tree for regression.

        Returns:
            The fitted ``DecisionTree``; its split features always refer to
            columns of the full ``X_binned``.
        """
        grad, hess = _compute_gradients_mse(y_true, current_pred)
        n_rows = X_binned.shape[0]

        # --- Row sampling -------------------------------------------------
        if self.subsample < 1.0:
            n_keep = max(1, int(self.subsample * n_rows))
            rows = rng.choice(n_rows, n_keep, replace=False)
        elif self.use_goss:
            rows, amplification = _goss_sample(
                grad, self.goss_a, self.goss_b, rng.randint(0, 2**31)
            )
            # _goss_sample lists the large-gradient rows first; everything
            # after them was drawn at random and is re-weighted to
            # compensate for the rows that were dropped.
            n_top = int(n_rows * self.goss_a)
            small_rows = rows[n_top:]
            grad[small_rows] *= amplification
            hess[small_rows] *= amplification
        else:
            rows = None

        if rows is not None and len(rows) == 0:
            # Degenerate sampling settings: fall back to the full data.
            rows = None

        # --- Column sampling ----------------------------------------------
        if self.colsample_bytree < 1.0:
            n_features = X_binned.shape[1]
            n_cols = max(1, int(self.colsample_bytree * n_features))
            sampled_features = rng.choice(n_features, n_cols, replace=False)
        else:
            sampled_features = None

        X_train = X_binned
        if rows is not None:
            X_train = X_train[rows]
            grad = grad[rows]
            hess = hess[rows]
        if sampled_features is not None:
            X_train = X_train[:, sampled_features]

        tree = DecisionTree(
            max_depth=self.max_depth,
            min_child_weight=self.min_child_weight,
            l2_reg=self.l2_reg,
            min_split_gain=self.min_split_gain,
            learning_rate=self.learning_rate
        )
        tree.fit(X_train, grad, hess, self.n_bins)

        if sampled_features is not None:
            # The tree was fit on a column subset; translate its feature
            # indices so it can predict on full-width inputs.
            self._remap_tree_features(tree.root, sampled_features)

        return tree

    def predict(self, X):
        """Predict regression target."""
        check_is_fitted(self)
        X = check_array(X, dtype=np.float64)

        X_binned = self._transform_to_bins(X)

        predictions = np.full(X.shape[0], self.init_prediction_)
        for tree in self.trees_:
            predictions += tree.predict(X_binned)

        return predictions

    def _transform_to_bins(self, X):
        """Transform features to bin indices using the fitted ``bin_edges_``.

        Raises:
            ValueError: if ``X`` does not have one column per fitted feature.
        """
        n_features = X.shape[1]
        if n_features != len(self.bin_edges_):
            raise ValueError(
                f"X has {n_features} features, but {type(self).__name__} "
                f"was fitted with {len(self.bin_edges_)} features."
            )

        X_binned = np.zeros_like(X, dtype=np.int32)

        for feat in range(n_features):
            edges = self.bin_edges_[feat]
            # Only interior edges act as cut points, so out-of-range values
            # fall into the first or last bin.
            X_binned[:, feat] = np.clip(
                np.digitize(X[:, feat], edges[1:-1]), 0, self.n_bins - 1
            )

        return X_binned