| | """ |
| | model_backend.py β Gradient boosting abstraction for LightGBM / sklearn HGBM. |
| | |
| | LightGBM (preferred): |
| | pip install lightgbm |
| | Set USE_LIGHTGBM = True below. |
| | |
| | Fallback: sklearn HistGradientBoostingClassifier. |
| | Same algorithm family, native NaN support, comparable speed. |
| | Feature importances use permutation importance (val set). |
| | |
| | Interface is identical regardless of backend: |
| | .fit() β trains + calibrates |
| | .predict_win_prob() β P(win) per row |
| | .feature_importances_ β normalized importance array |
| | """ |
| |
|
| | import numpy as np |
| |
|
| | try: |
| | import lightgbm as lgb |
| | _LGBM_AVAILABLE = True |
| | except ImportError: |
| | _LGBM_AVAILABLE = False |
| |
|
| | from sklearn.ensemble import HistGradientBoostingClassifier |
| | from sklearn.calibration import CalibratedClassifierCV |
| | from sklearn.inspection import permutation_importance |
| |
|
| | USE_LIGHTGBM = False |
| |
|
| |
|
def _build_lgbm(p: dict):
    """Build an (unfitted) LGBMClassifier from the shared hyperparameter dict.

    Keys in *p* follow the sklearn HGBM naming (see _build_hgbm) and are
    translated to their LightGBM equivalents here, so both backends can be
    driven by one config dict.
    """
    # NOTE(review): early_stopping_rounds is baked into the estimator; LightGBM
    # requires an eval_set at fit time when early stopping is enabled — verify
    # the caller always supplies one when this backend is active.
    config = {
        "n_estimators":          p.get("n_estimators", 400),
        "learning_rate":         p.get("learning_rate", 0.03),
        "max_depth":             p.get("max_depth", 5),
        "min_child_samples":     p.get("min_samples_leaf", 40),
        "reg_lambda":            p.get("l2_regularization", 2.0),
        "feature_fraction":      p.get("max_features", 0.70),
        "subsample":             0.80,
        "subsample_freq":        1,
        "n_jobs":                -1,
        "random_state":          p.get("random_state", 42),
        "verbosity":             -1,
        "objective":             "binary",
        "metric":                "binary_logloss",
        "early_stopping_rounds": p.get("early_stopping_rounds", 30),
    }
    return lgb.LGBMClassifier(**config)
| |
|
| |
|
def _build_hgbm(p: dict):
    """Build an (unfitted) HistGradientBoostingClassifier from the shared
    hyperparameter dict *p*, filling in project defaults for missing keys.
    """
    # Defaults for every key this backend reads; overridden by entries in p.
    cfg = {
        "n_estimators":        400,
        "learning_rate":       0.03,
        "max_depth":           5,
        "min_samples_leaf":    40,
        "l2_regularization":   2.0,
        "max_features":        0.70,
        "validation_fraction": 0.15,
        "n_iter_no_change":    30,
        "random_state":        42,
    }
    cfg.update({k: p[k] for k in cfg if k in p})

    return HistGradientBoostingClassifier(
        max_iter=cfg["n_estimators"],
        learning_rate=cfg["learning_rate"],
        max_depth=cfg["max_depth"],
        min_samples_leaf=cfg["min_samples_leaf"],
        l2_regularization=cfg["l2_regularization"],
        max_features=cfg["max_features"],
        early_stopping=True,  # HGBM early-stops on an internal holdout split
        validation_fraction=cfg["validation_fraction"],
        n_iter_no_change=cfg["n_iter_no_change"],
        random_state=cfg["random_state"],
        verbose=0,
    )
| |
|
| |
|
class ModelBackend:
    """
    Unified binary classifier over LightGBM / sklearn HGBM.

    After fit():
        .predict_proba(X)      -> (N, 2) array
        .predict_win_prob(X)   -> (N,) array of P(win)
        .feature_importances_  -> (n_features,) normalized importances
        .n_iter_               -> actual boosting rounds used
    """

    def __init__(self, params: dict, calibrate: bool = True):
        # params: shared hyperparameter dict consumed by _build_lgbm/_build_hgbm.
        # calibrate: if True and fit() receives a usable validation set,
        # wrap the trained model in an isotonic probability calibrator.
        self.params = params
        self.calibrate = calibrate
        self._base = None    # raw (uncalibrated) booster
        self._model = None   # model used for prediction; may be the calibrator
        self.feature_importances_: np.ndarray = np.array([])
        self.n_iter_: int = 0
        self._backend_name = "lightgbm" if (USE_LIGHTGBM and _LGBM_AVAILABLE) else "hgbm"

    @property
    def backend_name(self) -> str:
        """Active backend identifier: 'lightgbm' or 'hgbm'."""
        return self._backend_name

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray = None,
        y_val: np.ndarray = None,
        sample_weight: np.ndarray = None,
    ) -> "ModelBackend":
        """Train the backend, optionally calibrate on (X_val, y_val), and
        compute feature importances.

        Parameters
        ----------
        X_train, y_train : training features / binary labels.
        X_val, y_val     : optional held-out set; used for LightGBM early
                           stopping, probability calibration, and permutation
                           importance. HGBM early-stops on its own internal
                           split and ignores the eval set for training.
        sample_weight    : optional per-row training weights.

        Returns self (fluent interface).
        """
        if self._backend_name == "lightgbm":
            self._base = _build_lgbm(self.params)
            kw = {}
            if X_val is not None:
                kw["eval_set"] = [(X_val, y_val)]
            if sample_weight is not None:
                kw["sample_weight"] = sample_weight
            self._base.fit(X_train, y_train, **kw)
            # BUG FIX: best_iteration_ is None when early stopping never
            # triggered; int(None) raised TypeError in the original.
            self.n_iter_ = int(getattr(self._base, "best_iteration_", 0) or 0)
        else:
            self._base = _build_hgbm(self.params)
            kw = {}
            if sample_weight is not None:
                kw["sample_weight"] = sample_weight
            self._base.fit(X_train, y_train, **kw)
            self.n_iter_ = int(getattr(self._base, "n_iter_", self.params.get("n_estimators", 400)))

        # Calibrate only when the val set is big enough for isotonic
        # regression and contains both classes.
        can_calibrate = (
            self.calibrate
            and X_val is not None
            and y_val is not None          # BUG FIX: np.unique(None) crashed
            and len(X_val) >= 50
            and len(np.unique(y_val)) == 2
        )
        if can_calibrate:
            # BUG FIX: the original used cv=5, which refits five *clones* of
            # the base estimator on the small validation set and discards the
            # model trained above. The intent is prefit calibration: fit only
            # the isotonic mapping on held-out data.
            try:
                from sklearn.frozen import FrozenEstimator  # sklearn >= 1.6
                cal = CalibratedClassifierCV(FrozenEstimator(self._base), method="isotonic")
            except ImportError:
                # cv="prefit" is the pre-1.6 spelling of the same thing.
                cal = CalibratedClassifierCV(self._base, method="isotonic", cv="prefit")
            cal.fit(X_val, y_val)
            self._model = cal
        else:
            self._model = self._base

        self._compute_importances(X_val, y_val)
        return self

    def _compute_importances(self, X_val: np.ndarray = None, y_val: np.ndarray = None):
        """Populate .feature_importances_, normalized to sum to 1 when possible."""
        base = self._base
        if base is None:
            return

        if hasattr(base, "feature_importances_"):
            # LightGBM exposes native importances.
            imp = np.asarray(base.feature_importances_, dtype=np.float64)
        elif X_val is not None and y_val is not None and len(X_val) >= 20:
            # HGBM has no native importances: permutation importance on val.
            # (BUG FIX: also require y_val, which permutation_importance needs.)
            result = permutation_importance(
                base, X_val, y_val,
                n_repeats=5,
                random_state=42,
                n_jobs=-1,
            )
            # Negative means "shuffling helped" => noise; clamp to zero.
            imp = np.maximum(result.importances_mean, 0.0)
        else:
            # No usable signal: uniform placeholder.
            n_feat = getattr(base, "n_features_in_", 1)
            imp = np.ones(n_feat, dtype=np.float64)

        total = imp.sum()
        self.feature_importances_ = imp / total if total > 0 else imp

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return an (N, 2) array of class probabilities.

        Raises RuntimeError if called before fit().
        """
        if self._model is None:
            raise RuntimeError("Call .fit() before .predict_proba().")
        return self._model.predict_proba(X)

    def predict_win_prob(self, X: np.ndarray) -> np.ndarray:
        """Return 1-D array of P(win) for each row."""
        return self.predict_proba(X)[:, 1]
| |
|