# Goshawk_Hedge_Pro / model_backend.py
# Author: GoshawkVortexAI (commit 0fd33e0, "Create model_backend.py")
"""
model_backend.py β€” Gradient boosting abstraction for LightGBM / sklearn HGBM.
LightGBM (preferred):
pip install lightgbm
Set USE_LIGHTGBM = True below.
Fallback: sklearn HistGradientBoostingClassifier.
Same algorithm family, native NaN support, comparable speed.
Feature importances use permutation importance (val set).
Interface is identical regardless of backend:
.fit() β†’ trains + calibrates
.predict_win_prob() β†’ P(win) per row
.feature_importances_ β†’ normalized importance array
"""
import numpy as np
try:
import lightgbm as lgb
_LGBM_AVAILABLE = True
except ImportError:
_LGBM_AVAILABLE = False
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.inspection import permutation_importance
USE_LIGHTGBM = False # Set True after: pip install lightgbm
def _build_lgbm(p: dict):
    """Build an LGBMClassifier from the shared hyperparameter dict *p*.

    Keys mirror the sklearn HGBM names (see _build_hgbm) so one params dict
    drives both backends; missing keys fall back to the project defaults.

    Uses the sklearn wrapper's canonical parameter name ``colsample_bytree``
    instead of the booster alias ``feature_fraction``: the alias travels via
    **kwargs into the booster params while ``colsample_bytree`` stays at its
    constructor default, causing LightGBM alias-conflict warnings.
    """
    return lgb.LGBMClassifier(
        n_estimators      = p.get("n_estimators", 400),
        learning_rate     = p.get("learning_rate", 0.03),
        max_depth         = p.get("max_depth", 5),
        min_child_samples = p.get("min_samples_leaf", 40),
        reg_lambda        = p.get("l2_regularization", 2.0),
        colsample_bytree  = p.get("max_features", 0.70),  # fraction of features per tree
        subsample         = 0.80,  # bagging fraction
        subsample_freq    = 1,     # re-bag every iteration
        n_jobs            = -1,
        random_state      = p.get("random_state", 42),
        verbosity         = -1,
        objective         = "binary",
        metric            = "binary_logloss",
        # NOTE(review): forwarded through **kwargs into booster params; it
        # only takes effect when fit() receives an eval_set — confirm callers
        # always pass one when early stopping is expected.
        early_stopping_rounds = p.get("early_stopping_rounds", 30),
    )
def _build_hgbm(p: dict):
    """Construct the sklearn HistGradientBoostingClassifier fallback backend.

    *p* is the shared hyperparameter dict; any key not present falls back to
    the project default shown below. Early stopping is always enabled and
    carves its own validation split out of the training data.
    """
    cfg = {
        "max_iter":            p.get("n_estimators", 400),
        "learning_rate":       p.get("learning_rate", 0.03),
        "max_depth":           p.get("max_depth", 5),
        "min_samples_leaf":    p.get("min_samples_leaf", 40),
        "l2_regularization":   p.get("l2_regularization", 2.0),
        "max_features":        p.get("max_features", 0.70),
        "early_stopping":      True,
        "validation_fraction": p.get("validation_fraction", 0.15),
        "n_iter_no_change":    p.get("n_iter_no_change", 30),
        "random_state":        p.get("random_state", 42),
        "verbose":             0,
    }
    return HistGradientBoostingClassifier(**cfg)
class ModelBackend:
    """
    Unified classifier over LightGBM / sklearn HGBM. After fit():
        .predict_proba(X)     β†’ (N, 2) array
        .predict_win_prob(X)  β†’ (N,) array of P(win)
        .feature_importances_ β†’ (n_features,) normalized importances
        .n_iter_              β†’ actual boosting rounds used
    """
    def __init__(self, params: dict, calibrate: bool = True):
        # params: shared hyperparameter dict consumed by _build_lgbm/_build_hgbm.
        # calibrate: apply isotonic calibration when a usable val set is given.
        self.params = params
        self.calibrate = calibrate
        self._base = None   # uncalibrated booster
        self._model = None  # estimator used for prediction (possibly calibrated)
        self.feature_importances_: np.ndarray = np.array([])
        self.n_iter_: int = 0
        self._backend_name = "lightgbm" if (USE_LIGHTGBM and _LGBM_AVAILABLE) else "hgbm"

    @property
    def backend_name(self) -> str:
        """Active backend identifier: 'lightgbm' or 'hgbm'."""
        return self._backend_name

    def fit(
        self,
        X_train: np.ndarray,
        y_train: np.ndarray,
        X_val: np.ndarray = None,
        y_val: np.ndarray = None,
        sample_weight: np.ndarray = None,
    ) -> "ModelBackend":
        """Train the backend, optionally calibrate on (X_val, y_val).

        Returns self for chaining. X_val/y_val, when both supplied, drive
        LightGBM early stopping, isotonic calibration, and (for HGBM)
        permutation importances.
        """
        sw = sample_weight
        if self._backend_name == "lightgbm":
            self._base = _build_lgbm(self.params)
            kw = {}
            # eval_set needs labels too — guard y_val, not just X_val.
            if X_val is not None and y_val is not None:
                kw["eval_set"] = [(X_val, y_val)]
            if sw is not None:
                kw["sample_weight"] = sw
            self._base.fit(X_train, y_train, **kw)
            # best_iteration_ is None when early stopping never triggered;
            # int(None) would raise, so fall back to the configured rounds.
            best = getattr(self._base, "best_iteration_", None)
            self.n_iter_ = int(best) if best else int(self.params.get("n_estimators", 400))
        else:
            self._base = _build_hgbm(self.params)
            kw = {}
            if sw is not None:
                kw["sample_weight"] = sw
            self._base.fit(X_train, y_train, **kw)
            self.n_iter_ = int(getattr(self._base, "n_iter_", self.params.get("n_estimators", 400)))
        # Isotonic calibration on val set (improves probability reliability).
        # Requires both classes present and enough rows for isotonic fitting.
        # NOTE(review): cv=5 re-fits clones of the base model on X_val alone —
        # the train-set fit above is not reused by the calibrated predictor;
        # confirm this is intended (FrozenEstimator avoids the refit).
        if (self.calibrate and X_val is not None and y_val is not None and
                len(X_val) >= 50 and len(np.unique(y_val)) == 2):
            cal = CalibratedClassifierCV(self._base, method="isotonic", cv=5)
            cal.fit(X_val, y_val)
            self._model = cal
        else:
            self._model = self._base
        # Feature importances (backend-dependent strategy below).
        self._compute_importances(X_val, y_val)
        return self

    def _compute_importances(self, X_val: np.ndarray = None, y_val: np.ndarray = None):
        """Populate self.feature_importances_, normalized to sum to 1."""
        base = self._base
        if base is None:
            return
        # LightGBM exposes feature_importances_ directly.
        if hasattr(base, "feature_importances_"):
            imp = np.array(base.feature_importances_, dtype=np.float64)
        # HGBM: permutation importance on the val set (needs labels too).
        elif X_val is not None and y_val is not None and len(X_val) >= 20:
            result = permutation_importance(
                base, X_val, y_val,
                n_repeats=5,
                random_state=42,
                n_jobs=-1,
            )
            # Negative means are noise; clamp so normalization stays sane.
            imp = np.maximum(result.importances_mean, 0.0)
        else:
            # Fallback: uniform importances.
            n_feat = getattr(base, "n_features_in_", 1)
            imp = np.ones(n_feat, dtype=np.float64)
        total = imp.sum()
        self.feature_importances_ = imp / total if total > 0 else imp

    def predict_proba(self, X: np.ndarray) -> np.ndarray:
        """Return (N, 2) class-probability array; requires a prior fit()."""
        if self._model is None:
            raise RuntimeError("Call .fit() before .predict_proba().")
        return self._model.predict_proba(X)

    def predict_win_prob(self, X: np.ndarray) -> np.ndarray:
        """Return 1-D array of P(win) for each row."""
        return self.predict_proba(X)[:, 1]