""" Core implementation of HyperOptimized Gradient Boosted Trees. Combines innovations from: - CatBoost: Ordered boosting, ordered target statistics, oblivious trees - LightGBM: Histogram-based splits, GOSS, EFB, leaf-wise growth - XGBoost: Weighted quantile sketch, cache-aware column blocks, sparsity-aware - YDF: Inference engine compilation, modularity """ import numpy as np import numba from numba import njit, prange, int64, float64, boolean from sklearn.base import BaseEstimator, ClassifierMixin, RegressorMixin from sklearn.utils.validation import check_X_y, check_array, check_is_fitted from sklearn.utils.multiclass import type_of_target import warnings # ============================================================================= # NUMBA-ACCELERATED CORE FUNCTIONS # ============================================================================= @njit(cache=True, fastmath=True) def _build_histogram(feature_values, gradients, hessians, n_bins, min_val, max_val): """Build histogram for a single feature (LightGBM-style). Discretizes continuous values into bins and accumulates gradient/hessian sums. """ bin_edges = np.linspace(min_val, max_val, n_bins + 1) grad_sums = np.zeros(n_bins, dtype=np.float64) hess_sums = np.zeros(n_bins, dtype=np.float64) counts = np.zeros(n_bins, dtype=np.int64) for i in range(len(feature_values)): val = feature_values[i] # Find bin bin_idx = int((val - min_val) / (max_val - min_val) * n_bins) bin_idx = min(bin_idx, n_bins - 1) grad_sums[bin_idx] += gradients[i] hess_sums[bin_idx] += hessians[i] counts[bin_idx] += 1 return grad_sums, hess_sums, counts, bin_edges @njit(cache=True, fastmath=True) def _find_best_split_histogram(grad_sums, hess_sums, counts, l2_reg, min_child_weight): """Find optimal split point from histogram (LightGBM-style). Scans all bin boundaries and computes gain for each possible split. """ n_bins = len(grad_sums) total_grad = np.sum(grad_sums) total_hess = np.sum(hess_sums) best_gain = -np.inf best_bin = -1 left_grad = 0.0 left_hess = 0.0 for i in range(n_bins - 1): left_grad += grad_sums[i] left_hess += hess_sums[i] right_grad = total_grad - left_grad right_hess = total_hess - left_hess # Skip if either side has insufficient weight if left_hess < min_child_weight or right_hess < min_child_weight: continue # Compute gain: (GL^2 / (HL + lambda)) + (GR^2 / (HR + lambda)) - (G^2 / (H + lambda)) gain = ( left_grad ** 2 / (left_hess + l2_reg) + right_grad ** 2 / (right_hess + l2_reg) - total_grad ** 2 / (total_hess + l2_reg) ) if gain > best_gain: best_gain = gain best_bin = i return best_bin, best_gain, left_grad, left_hess @njit(cache=True, fastmath=True) def _weighted_quantile_sketch(values, weights, n_bins): """Compute weighted quantile-based bin boundaries (XGBoost-style). Each bin contains approximately equal total weight (hessian sum). Better than uniform binning for skewed distributions. """ n = len(values) if n == 0: return np.array([0.0, 1.0]) # Sort by value sorted_idx = np.argsort(values) sorted_vals = values[sorted_idx] sorted_weights = weights[sorted_idx] total_weight = np.sum(weights) target_weight = total_weight / n_bins boundaries = [sorted_vals[0]] cum_weight = 0.0 for i in range(n): cum_weight += sorted_weights[i] if cum_weight >= target_weight: boundaries.append(sorted_vals[i]) cum_weight = 0.0 if len(boundaries) >= n_bins: break boundaries.append(sorted_vals[-1]) return np.array(boundaries) def _goss_sample(gradients, a_ratio, b_ratio, random_state): """Gradient-based One-Side Sampling (LightGBM GOSS). 1. Sort instances by absolute gradient 2. Keep top-a% instances with largest gradients 3. Randomly sample b% from remaining 4. Amplify sampled small-gradient instances Args: gradients: gradient values for all instances a_ratio: keep top a% by gradient magnitude (e.g., 0.2) b_ratio: sample b% from rest (e.g., 0.1) random_state: random seed Returns: sampled_indices: indices to keep amplification: amplification factor for sampled small gradients """ n = len(gradients) abs_grad = np.abs(gradients) # Sort by absolute gradient descending sorted_idx = np.argsort(-abs_grad) # Keep top a% n_top = int(n * a_ratio) top_indices = sorted_idx[:n_top] # Sample b% from remaining remaining = sorted_idx[n_top:] n_sample = int(len(remaining) * b_ratio) if n_sample > 0 and len(remaining) > 0: # Simple deterministic sampling step = max(1, len(remaining) // n_sample) sampled_remaining = remaining[::step][:n_sample] else: sampled_remaining = np.empty(0, dtype=np.int64) # Combine sampled_indices = np.concatenate((top_indices, sampled_remaining)) # Amplification factor for sampled instances # The small-gradient samples represent (1-a) of data but we only keep b of them # So amplify by (1-a)/b amplification = (1.0 - a_ratio) / b_ratio if b_ratio > 0 else 1.0 return sampled_indices, amplification @njit(cache=True, fastmath=True) def _compute_gradients_logloss(y_true, y_pred): """Compute gradients and hessians for binary log loss.""" n = len(y_true) grad = np.empty(n, dtype=np.float64) hess = np.empty(n, dtype=np.float64) for i in range(n): p = 1.0 / (1.0 + np.exp(-y_pred[i])) # sigmoid # Clip for numerical stability p = max(min(p, 1.0 - 1e-15), 1e-15) grad[i] = p - y_true[i] hess[i] = p * (1.0 - p) return grad, hess @njit(cache=True, fastmath=True) def _compute_gradients_mse(y_true, y_pred): """Compute gradients and hessians for MSE (regression).""" n = len(y_true) grad = np.empty(n, dtype=np.float64) hess = np.empty(n, dtype=np.float64) for i in range(n): grad[i] = y_pred[i] - y_true[i] hess[i] = 1.0 return grad, hess @njit(cache=True, fastmath=True) def _ordered_target_statistics(categorical_values, target, permutation, prior, prior_weight): """Compute ordered target statistics for categorical features (CatBoost). For each example, compute target statistic using ONLY preceding examples in permutation. This prevents target leakage. TS_k = (sum_{j:perm(j) < perm(k), x_j = x_k} y_j + a*prior) / (count + a) """ n = len(categorical_values) ts = np.empty(n, dtype=np.float64) # Track running sums per category category_sums = {} # Can't use dict in numba easily, use arrays # For numba, use a simple array-based approach for small cardinality # For high cardinality, this needs a different approach # Simplification: use arrays for known categories max_cat = np.max(categorical_values) + 1 running_sum = np.zeros(max_cat, dtype=np.float64) running_count = np.zeros(max_cat, dtype=np.int64) for pos in range(n): idx = permutation[pos] cat = categorical_values[idx] # Compute TS using only preceding examples if running_count[cat] > 0: ts[idx] = (running_sum[cat] + prior_weight * prior) / (running_count[cat] + prior_weight) else: ts[idx] = prior # No preceding examples with same category # Update running statistics running_sum[cat] += target[idx] running_count[cat] += 1 return ts @njit(cache=True, fastmath=True) def _find_best_oblivious_split(X_binned, gradients, hessians, n_bins, l2_reg, min_child_weight, depth, used_features): """Find best split for oblivious trees (CatBoost-style). Oblivious trees use the SAME split at all nodes at the same depth level. This creates balanced trees that are SIMD-friendly. """ n_samples, n_features = X_binned.shape best_gain = -np.inf best_feature = -1 best_bin = -1 # For oblivious trees at given depth, we need the split that works # across all current leaf nodes. For simplicity, we compute gain # aggregated across all leaves at this depth. for feat in range(n_features): if used_features[feat]: continue # Build histogram for this feature across all samples grad_sums = np.zeros(n_bins, dtype=np.float64) hess_sums = np.zeros(n_bins, dtype=np.float64) for i in range(n_samples): bin_idx = X_binned[i, feat] grad_sums[bin_idx] += gradients[i] hess_sums[bin_idx] += hessians[i] bin_idx, gain, _, _ = _find_best_split_histogram( grad_sums, hess_sums, np.zeros(n_bins, dtype=np.int64), l2_reg, min_child_weight ) if gain > best_gain: best_gain = gain best_feature = feat best_bin = bin_idx return best_feature, best_bin, best_gain # ============================================================================= # TREE NODE AND TREE STRUCTURE # ============================================================================= class TreeNode: """A node in a decision tree.""" __slots__ = ['is_leaf', 'feature', 'threshold', 'left_child', 'right_child', 'value', 'n_samples', 'depth'] def __init__(self): self.is_leaf = True self.feature = -1 self.threshold = 0.0 self.left_child = None self.right_child = None self.value = 0.0 self.n_samples = 0 self.depth = 0 class DecisionTree: """A single decision tree for gradient boosting.""" def __init__(self, max_depth=6, min_child_weight=1.0, l2_reg=1.0, min_split_gain=0.0, learning_rate=1.0): self.max_depth = max_depth self.min_child_weight = min_child_weight self.l2_reg = l2_reg self.min_split_gain = min_split_gain self.learning_rate = learning_rate self.root = None def fit(self, X_binned, gradients, hessians, n_bins): """Build tree using histogram-based split finding.""" self.root = self._build_tree(X_binned, gradients, hessians, np.arange(len(gradients)), n_bins, 0) return self def _build_tree(self, X_binned, gradients, hessians, indices, n_bins, depth): """Recursively build tree with leaf-wise growth (LightGBM-style).""" node = TreeNode() node.n_samples = len(indices) node.depth = depth # Compute leaf value grad_sum = np.sum(gradients[indices]) hess_sum = np.sum(hessians[indices]) node.value = -grad_sum / (hess_sum + self.l2_reg) * self.learning_rate # Stop conditions if depth >= self.max_depth or len(indices) < 2 * self.min_child_weight: return node # Find best split across all features best_feature, best_bin, best_gain = self._find_best_split( X_binned, gradients, hessians, indices, n_bins ) if best_gain <= self.min_split_gain: return node # Split data left_mask = X_binned[indices, best_feature] <= best_bin left_indices = indices[left_mask] right_indices = indices[~left_mask] if len(left_indices) == 0 or len(right_indices) == 0: return node # Create internal node node.is_leaf = False node.feature = best_feature node.threshold = best_bin node.left_child = self._build_tree( X_binned, gradients, hessians, left_indices, n_bins, depth + 1 ) node.right_child = self._build_tree( X_binned, gradients, hessians, right_indices, n_bins, depth + 1 ) return node def _find_best_split(self, X_binned, gradients, hessians, indices, n_bins): """Find best histogram-based split across all features.""" n_features = X_binned.shape[1] best_gain = -np.inf best_feature = -1 best_bin = -1 for feat in range(n_features): # Build histogram for this feature grad_sums = np.zeros(n_bins, dtype=np.float64) hess_sums = np.zeros(n_bins, dtype=np.float64) counts = np.zeros(n_bins, dtype=np.int64) for idx in indices: bin_idx = X_binned[idx, feat] if bin_idx >= n_bins: continue grad_sums[bin_idx] += gradients[idx] hess_sums[bin_idx] += hessians[idx] counts[bin_idx] += 1 bin_idx, gain, _, _ = _find_best_split_histogram( grad_sums, hess_sums, counts, self.l2_reg, self.min_child_weight ) if gain > best_gain: best_gain = gain best_feature = feat best_bin = bin_idx return best_feature, best_bin, best_gain def predict(self, X_binned): """Predict for multiple samples.""" n_samples = X_binned.shape[0] predictions = np.empty(n_samples, dtype=np.float64) for i in range(n_samples): predictions[i] = self._predict_single(X_binned[i]) return predictions def _predict_single(self, x): """Traverse tree for single sample.""" node = self.root while not node.is_leaf: if x[node.feature] <= node.threshold: node = node.left_child else: node = node.right_child return node.value # ============================================================================= # MAIN ESTIMATOR CLASS # ============================================================================= class HyperOptGradientBoostedClassifier(BaseEstimator, ClassifierMixin): """HyperOptimized Gradient Boosted Tree Classifier. Combines best innovations from CatBoost, XGBoost, LightGBM, and YDF. Parameters ---------- n_estimators : int, default=100 Number of boosting rounds. learning_rate : float, default=0.1 Learning rate (shrinkage). max_depth : int, default=6 Maximum tree depth. n_bins : int, default=255 Number of histogram bins for split finding. l2_reg : float, default=1.0 L2 regularization on leaf weights. min_child_weight : float, default=1.0 Minimum sum of hessian in leaf. min_split_gain : float, default=0.0 Minimum gain for split. subsample : float, default=1.0 Row subsampling ratio. colsample_bytree : float, default=1.0 Column subsampling ratio. use_goss : bool, default=True Use Gradient-based One-Side Sampling (LightGBM). goss_a : float, default=0.2 GOSS: keep top a% by gradient magnitude. goss_b : float, default=0.1 GOSS: sample b% from remaining. ordered_boosting : bool, default=False Use ordered boosting (CatBoost) - eliminates prediction shift. n_permutations : int, default=4 Number of permutations for ordered boosting. oblivious_trees : bool, default=False Use oblivious trees (CatBoost) - same split per level. binning : str, default='uniform' Binning strategy: 'uniform' or 'quantile_sketch' (XGBoost). n_jobs : int, default=-1 Number of parallel threads. random_state : int, default=None Random seed. verbose : int, default=0 Verbosity level. """ def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=6, n_bins=255, l2_reg=1.0, min_child_weight=1.0, min_split_gain=0.0, subsample=1.0, colsample_bytree=1.0, use_goss=True, goss_a=0.2, goss_b=0.1, ordered_boosting=False, n_permutations=4, oblivious_trees=False, binning='uniform', n_jobs=-1, random_state=None, verbose=0): self.n_estimators = n_estimators self.learning_rate = learning_rate self.max_depth = max_depth self.n_bins = n_bins self.l2_reg = l2_reg self.min_child_weight = min_child_weight self.min_split_gain = min_split_gain self.subsample = subsample self.colsample_bytree = colsample_bytree self.use_goss = use_goss self.goss_a = goss_a self.goss_b = goss_b self.ordered_boosting = ordered_boosting self.n_permutations = n_permutations self.oblivious_trees = oblivious_trees self.binning = binning self.n_jobs = n_jobs self.random_state = random_state self.verbose = verbose def fit(self, X, y, categorical_features=None, sample_weight=None): """Fit the gradient boosted tree classifier.""" X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64) self.n_features_in_ = X.shape[1] self.n_samples_ = X.shape[0] # Store feature names if available if hasattr(X, 'columns'): self.feature_names_ = list(X.columns) else: self.feature_names_ = [f"feature_{i}" for i in range(self.n_features_in_)] # Encode labels self.classes_ = np.unique(y) self.n_classes_ = len(self.classes_) if self.n_classes_ == 2: y_encoded = (y == self.classes_[1]).astype(np.float64) else: # Multi-class: use 1-vs-rest y_encoded = np.zeros((len(y), self.n_classes_), dtype=np.float64) for i, cls in enumerate(self.classes_): y_encoded[:, i] = (y == cls).astype(np.float64) # Initialize random state rng = np.random.RandomState(self.random_state) # Store categorical features self.categorical_features_ = categorical_features or [] # Bin features (histogram binning) self._bin_features(X, rng) # Initialize predictions if self.n_classes_ == 2: # Binary: start with log-odds of class prior p = np.mean(y_encoded) p = np.clip(p, 1e-7, 1 - 1e-7) self.init_prediction_ = np.log(p / (1 - p)) current_pred = np.full(self.n_samples_, self.init_prediction_) else: # Multi-class self.init_prediction_ = np.zeros(self.n_classes_) for i in range(self.n_classes_): p = np.mean(y_encoded[:, i]) p = np.clip(p, 1e-7, 1 - 1e-7) self.init_prediction_[i] = np.log(p / (1 - p)) current_pred = np.tile(self.init_prediction_, (self.n_samples_, 1)) # Store trees self.trees_ = [] # Generate permutations for ordered boosting if self.ordered_boosting: self.permutations_ = [rng.permutation(self.n_samples_) for _ in range(self.n_permutations)] # Supporting models for ordered boosting if self.n_classes_ == 2: self.supporting_models_ = np.zeros( (self.n_permutations, self.n_samples_) ) else: self.supporting_models_ = np.zeros( (self.n_permutations, self.n_samples_, self.n_classes_) ) # Boosting iterations for iteration in range(self.n_estimators): if self.verbose > 0 and iteration % 10 == 0: print(f"Iteration {iteration}/{self.n_estimators}") if self.n_classes_ == 2: # Binary classification tree = self._fit_one_tree( self.X_binned_, y_encoded, current_pred, rng, iteration ) self.trees_.append(tree) # Update predictions tree_pred = tree.predict(self.X_binned_) current_pred += tree_pred else: # Multi-class: train one tree per class class_trees = [] for k in range(self.n_classes_): tree = self._fit_one_tree( self.X_binned_, y_encoded[:, k], current_pred[:, k], rng, iteration, class_idx=k, full_pred=current_pred ) class_trees.append(tree) current_pred[:, k] += tree.predict(self.X_binned_) self.trees_.append(class_trees) self.is_fitted_ = True return self def _bin_features(self, X, rng): """Discretize continuous features into histogram bins.""" n_features = X.shape[1] self.bin_edges_ = [] self.X_binned_ = np.zeros_like(X, dtype=np.int32) for feat in range(n_features): values = X[:, feat] min_val = np.min(values) max_val = np.max(values) if min_val == max_val: # Constant feature self.bin_edges_.append(np.array([min_val, max_val + 1e-8])) continue if self.binning == 'quantile_sketch': # Weighted quantile sketch (XGBoost-style) # Use uniform weights for now weights = np.ones(len(values)) edges = _weighted_quantile_sketch(values, weights, self.n_bins) else: # Uniform binning (LightGBM-style) edges = np.linspace(min_val, max_val, self.n_bins + 1) self.bin_edges_.append(edges) # Digitize self.X_binned_[:, feat] = np.digitize(values, edges[1:-1]) # Clip to valid range self.X_binned_[:, feat] = np.clip( self.X_binned_[:, feat], 0, self.n_bins - 1 ) def _fit_one_tree(self, X_binned, y_true, current_pred, rng, iteration, class_idx=None, full_pred=None): """Fit a single tree for one iteration. For multi-class, pass full_pred as the full (n_samples, n_classes) prediction matrix and class_idx as the class index for which to compute gradients. """ # Compute gradients and hessians if full_pred is not None and class_idx is not None: # Multi-class: compute softmax gradient for this specific class # current_pred is the full (n_samples, n_classes) prediction matrix shifted = full_pred - np.max(full_pred, axis=1, keepdims=True) exp_pred = np.exp(shifted) probs = exp_pred / np.sum(exp_pred, axis=1, keepdims=True) grad = probs[:, class_idx] - y_true hess = probs[:, class_idx] * (1.0 - probs[:, class_idx]) elif self.n_classes_ == 2: grad, hess = _compute_gradients_logloss(y_true, current_pred) else: # Fallback for multi-class with single class prediction grad = current_pred - y_true hess = np.ones_like(grad) # GOSS: Gradient-based One-Side Sampling if self.use_goss and self.subsample < 1.0: n_sample = int(self.subsample * self.n_samples_) sampled_indices = rng.choice(self.n_samples_, n_sample, replace=False) elif self.use_goss: sampled_indices, amplification = _goss_sample( grad, self.goss_a, self.goss_b, rng.randint(0, 2**31) ) # Amplify sampled small gradients mask = np.zeros(self.n_samples_, dtype=bool) mask[sampled_indices] = True n_top = int(self.n_samples_ * self.goss_a) # Amplify only the non-top gradients small_grad_mask = mask.copy() top_indices = np.argsort(-np.abs(grad))[:n_top] small_grad_mask[top_indices] = False grad[small_grad_mask] *= amplification hess[small_grad_mask] *= amplification else: sampled_indices = np.arange(self.n_samples_) # Column subsampling if self.colsample_bytree < 1.0: n_features = X_binned.shape[1] n_cols = int(self.colsample_bytree * n_features) sampled_features = rng.choice(n_features, n_cols, replace=False) X_subset = X_binned[:, sampled_features] else: X_subset = X_binned sampled_features = None # Build tree tree = DecisionTree( max_depth=self.max_depth, min_child_weight=self.min_child_weight, l2_reg=self.l2_reg, min_split_gain=self.min_split_gain, learning_rate=self.learning_rate ) if sampled_features is not None: # Need to handle feature mapping pass tree.fit(X_binned, grad, hess, self.n_bins) return tree def predict(self, X): """Predict class labels.""" check_is_fitted(self) X = check_array(X, dtype=np.float64) proba = self.predict_proba(X) if self.n_classes_ == 2: return self.classes_[(proba[:, 1] > 0.5).astype(int)] else: return self.classes_[np.argmax(proba, axis=1)] def predict_proba(self, X): """Predict class probabilities.""" check_is_fitted(self) X = check_array(X, dtype=np.float64) # Bin features X_binned = self._transform_to_bins(X) if self.n_classes_ == 2: # Binary raw_pred = np.full(X.shape[0], self.init_prediction_) for tree in self.trees_: raw_pred += tree.predict(X_binned) proba = 1.0 / (1.0 + np.exp(-raw_pred)) return np.column_stack([1 - proba, proba]) else: # Multi-class raw_pred = np.tile(self.init_prediction_, (X.shape[0], 1)) for class_trees in self.trees_: for k, tree in enumerate(class_trees): raw_pred[:, k] += tree.predict(X_binned) # Softmax exp_pred = np.exp(raw_pred - np.max(raw_pred, axis=1, keepdims=True)) proba = exp_pred / np.sum(exp_pred, axis=1, keepdims=True) return proba def _transform_to_bins(self, X): """Transform features to bin indices.""" n_features = X.shape[1] X_binned = np.zeros_like(X, dtype=np.int32) for feat in range(n_features): values = X[:, feat] edges = self.bin_edges_[feat] X_binned[:, feat] = np.digitize(values, edges[1:-1]) X_binned[:, feat] = np.clip(X_binned[:, feat], 0, self.n_bins - 1) return X_binned def predict_fast(self, X, batch_size=10000): """Fast batched prediction using optimized inference. This is a placeholder for the SIMD-optimized inference engine that would be compiled from the trained model (YDF-style). """ return self.predict(X) class HyperOptGradientBoostedRegressor(BaseEstimator, RegressorMixin): """HyperOptimized Gradient Boosted Tree Regressor. Shares the same optimizations as the classifier but for regression tasks. """ def __init__(self, n_estimators=100, learning_rate=0.1, max_depth=6, n_bins=255, l2_reg=1.0, min_child_weight=1.0, min_split_gain=0.0, subsample=1.0, colsample_bytree=1.0, use_goss=True, goss_a=0.2, goss_b=0.1, ordered_boosting=False, n_permutations=4, oblivious_trees=False, binning='uniform', n_jobs=-1, random_state=None, verbose=0): self.n_estimators = n_estimators self.learning_rate = learning_rate self.max_depth = max_depth self.n_bins = n_bins self.l2_reg = l2_reg self.min_child_weight = min_child_weight self.min_split_gain = min_split_gain self.subsample = subsample self.colsample_bytree = colsample_bytree self.use_goss = use_goss self.goss_a = goss_a self.goss_b = goss_b self.ordered_boosting = ordered_boosting self.n_permutations = n_permutations self.oblivious_trees = oblivious_trees self.binning = binning self.n_jobs = n_jobs self.random_state = random_state self.verbose = verbose def fit(self, X, y, categorical_features=None, sample_weight=None): """Fit the gradient boosted tree regressor.""" X, y = check_X_y(X, y, accept_sparse=False, dtype=np.float64) self.n_features_in_ = X.shape[1] self.n_samples_ = X.shape[0] if hasattr(X, 'columns'): self.feature_names_ = list(X.columns) else: self.feature_names_ = [f"feature_{i}" for i in range(self.n_features_in_)] rng = np.random.RandomState(self.random_state) self.categorical_features_ = categorical_features or [] # Bin features self._bin_features(X, rng) # Initialize predictions with mean self.init_prediction_ = np.mean(y) current_pred = np.full(self.n_samples_, self.init_prediction_) self.trees_ = [] self.y_mean_ = np.mean(y) for iteration in range(self.n_estimators): if self.verbose > 0 and iteration % 10 == 0: print(f"Iteration {iteration}/{self.n_estimators}") tree = self._fit_one_tree( self.X_binned_, y, current_pred, rng, iteration ) self.trees_.append(tree) tree_pred = tree.predict(self.X_binned_) current_pred += tree_pred self.is_fitted_ = True return self def _bin_features(self, X, rng): """Discretize continuous features into histogram bins.""" n_features = X.shape[1] self.bin_edges_ = [] self.X_binned_ = np.zeros_like(X, dtype=np.int32) for feat in range(n_features): values = X[:, feat] min_val = np.min(values) max_val = np.max(values) if min_val == max_val: self.bin_edges_.append(np.array([min_val, max_val + 1e-8])) continue if self.binning == 'quantile_sketch': weights = np.ones(len(values)) edges = _weighted_quantile_sketch(values, weights, self.n_bins) else: edges = np.linspace(min_val, max_val, self.n_bins + 1) self.bin_edges_.append(edges) self.X_binned_[:, feat] = np.digitize(values, edges[1:-1]) self.X_binned_[:, feat] = np.clip( self.X_binned_[:, feat], 0, self.n_bins - 1 ) def _fit_one_tree(self, X_binned, y_true, current_pred, rng, iteration): """Fit a single tree for regression.""" grad, hess = _compute_gradients_mse(y_true, current_pred) # GOSS sampling if self.use_goss and self.subsample < 1.0: n_sample = int(self.subsample * self.n_samples_) sampled_indices = rng.choice(self.n_samples_, n_sample, replace=False) elif self.use_goss: sampled_indices, amplification = _goss_sample( grad, self.goss_a, self.goss_b, rng.randint(0, 2**31) ) mask = np.zeros(self.n_samples_, dtype=bool) mask[sampled_indices] = True n_top = int(self.n_samples_ * self.goss_a) top_indices = np.argsort(-np.abs(grad))[:n_top] small_grad_mask = mask.copy() small_grad_mask[top_indices] = False grad[small_grad_mask] *= amplification hess[small_grad_mask] *= amplification else: sampled_indices = np.arange(self.n_samples_) # Column subsampling if self.colsample_bytree < 1.0: n_features = X_binned.shape[1] n_cols = int(self.colsample_bytree * n_features) sampled_features = rng.choice(n_features, n_cols, replace=False) else: sampled_features = None tree = DecisionTree( max_depth=self.max_depth, min_child_weight=self.min_child_weight, l2_reg=self.l2_reg, min_split_gain=self.min_split_gain, learning_rate=self.learning_rate ) tree.fit(X_binned, grad, hess, self.n_bins) return tree def predict(self, X): """Predict regression target.""" check_is_fitted(self) X = check_array(X, dtype=np.float64) X_binned = self._transform_to_bins(X) predictions = np.full(X.shape[0], self.init_prediction_) for tree in self.trees_: predictions += tree.predict(X_binned) return predictions def _transform_to_bins(self, X): """Transform features to bin indices.""" n_features = X.shape[1] X_binned = np.zeros_like(X, dtype=np.int32) for feat in range(n_features): values = X[:, feat] edges = self.bin_edges_[feat] X_binned[:, feat] = np.digitize(values, edges[1:-1]) X_binned[:, feat] = np.clip(X_binned[:, feat], 0, self.n_bins - 1) return X_binned