| |
| import pickle |
|
|
| import lightgbm as lgb |
| import numpy as np |
| import pandas as pd |
| from sklearn.metrics import accuracy_score, f1_score, mean_squared_error |
|
|
|
|
class StockNewsModel:
    """Bundle of three LightGBM models trained on one shared feature frame.

    * ``m_dir`` -- binary classifier; predictions are labelled ``"UP"`` (1)
      or ``"DOWN"`` (0).
    * ``m_hh``  -- second binary classifier for the ``hh`` target
      (NOTE(review): the meaning of "hh" is not shown in this file).
    * ``m_ret`` -- Huber-loss regressor for the return target.

    Each classifier has its own decision threshold, tuned on a hold-out tail
    of the training data and persisted together with the models.
    """

    # Per-column sample-weight rules applied in order:
    # (column name, coefficient, clip lower, clip upper, apply log1p).
    # Each present column multiplies the row weight by ``1 + coef * value``.
    _WEIGHT_RULES = (
        ("source_tier", 0.18, 0, 3, False),
        ("event_total", 0.06, 0, None, True),
        ("n_urgent", 0.04, 0, 5, False),
        ("sent_abs", 0.03, 0, 1, False),
        ("signal_intensity", 0.05, 0, 5, False),
        ("is_market_hrs", 0.02, None, None, False),
    )

    # Lower bound on the tree count reused from an early-stopped model.
    _MIN_TREES = 50

    def __init__(self):
        self.m_dir = None
        self.m_hh = None
        self.m_ret = None
        self.feature_names = None
        self.dir_threshold = 0.5
        self.hh_threshold = 0.5

    # ------------------------------------------------------------------
    # Data preparation helpers
    # ------------------------------------------------------------------
    def _build_sample_weights(self, X):
        """Return one training weight per row of ``X``.

        Weights start at 1.0, are scaled by every rule in ``_WEIGHT_RULES``
        whose column exists in ``X`` (missing values count as 0), then by a
        linear ramp from 0.9 (first row) to 1.15 (last row), and are finally
        clipped to ``[0.75, 4.0]``.

        The ramp favours later rows -- presumably rows are in chronological
        order; confirm against the caller.
        """
        n_rows = len(X)
        weights = np.ones(n_rows, dtype=float)
        if n_rows == 0:
            return weights

        for column, coef, lower, upper, use_log in self._WEIGHT_RULES:
            if column not in X.columns:
                continue
            values = X[column].fillna(0)
            if lower is not None or upper is not None:
                values = values.clip(lower=lower, upper=upper)
            values = values.to_numpy(float)
            if use_log:
                values = np.log1p(values)
            weights *= 1.0 + coef * values

        weights *= np.linspace(0.9, 1.15, n_rows)
        return np.clip(weights, 0.75, 4.0)

    @staticmethod
    def _pos_weight(y):
        """Return ``count(y == 0) / max(count(y == 1), 1)`` for binary labels.

        ``y`` is converted with ``np.asarray`` first so that plain Python
        lists are compared element-wise; comparing a raw list with ``== 0``
        yields a single ``False`` and would silently produce a weight of 0.
        """
        labels = np.asarray(y)
        n_pos = int(np.sum(labels == 1))
        n_neg = int(np.sum(labels == 0))
        return n_neg / max(n_pos, 1)

    def _tune_threshold(self, y_true, y_prob):
        """Pick the probability cut-off maximising ``0.55*accuracy + 0.45*F1``.

        37 evenly spaced candidates in ``[0.05, 0.95]`` are scanned in
        ascending order; on ties the lowest candidate wins because only a
        strictly better score replaces the current best.
        """
        y_true = np.asarray(y_true).astype(int)
        y_prob = np.asarray(y_prob).astype(float)
        best_threshold = 0.5
        best_score = -1.0
        for threshold in np.linspace(0.05, 0.95, 37):
            y_pred = (y_prob >= threshold).astype(int)
            acc = accuracy_score(y_true, y_pred)
            f1 = f1_score(y_true, y_pred, zero_division=0)
            score = (0.55 * acc) + (0.45 * f1)
            if score > best_score:
                best_score = score
                best_threshold = float(threshold)
        return best_threshold

    def _split_fit_val(self, X, *arrays, val_frac=0.2):
        """Split ``X`` and each of ``arrays`` into a head (fit) and tail (val).

        The split is positional, never shuffled. With fewer than 30 rows the
        last 5 rows are held out (keeping at least 1 fit row); otherwise the
        last ``val_frac`` share is held out, with the fit part at least 20
        rows and the validation part at least 5 rows.

        Returns:
            ``(fit_X, val_X, (a1_fit, a1_val), (a2_fit, a2_val), ...)``.
            Objects exposing ``.iloc`` are sliced with it; anything else is
            converted with ``np.asarray`` first.
        """
        n = len(X)
        if n < 30:
            split = max(n - 5, 1)
        else:
            split = int(n * (1.0 - val_frac))
            split = min(max(split, 20), n - 5)

        head = slice(0, split)
        tail = slice(split, n)
        pairs = []
        for arr in arrays:
            if hasattr(arr, "iloc"):
                pairs.append((arr.iloc[head], arr.iloc[tail]))
            else:
                values = np.asarray(arr)
                pairs.append((values[head], values[tail]))
        return (X.iloc[head], X.iloc[tail], *pairs)

    def _align_features(self, X):
        """Return ``X`` with columns in the order used at training time.

        Alignment only happens when ``feature_names`` is known and every
        training column is present in ``X``; otherwise ``X`` is returned
        unchanged so that the underlying model reports the mismatch itself.
        """
        names = self.feature_names
        columns = getattr(X, "columns", None)
        if not names or columns is None:
            return X
        if list(columns) == list(names):
            return X
        if all(name in columns for name in names):
            return X[list(names)]
        return X

    # ------------------------------------------------------------------
    # Model fitting helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fit_kwargs(sample_weight, eval_set, metric, patience):
        """Assemble keyword arguments for an ``LGBM*.fit`` call.

        Early stopping is only enabled when ``eval_set`` is given *and* its
        feature part is non-empty, because an empty validation slice (very
        small training sets) cannot drive early stopping.
        """
        kwargs = {}
        if sample_weight is not None:
            kwargs["sample_weight"] = sample_weight
        if eval_set is not None and len(eval_set[0]) > 0:
            kwargs["eval_set"] = [eval_set]
            kwargs["eval_metric"] = metric
            kwargs["callbacks"] = [lgb.early_stopping(patience, verbose=False)]
        return kwargs

    def _train_classifier(self, X, y, extra_params=None, sample_weight=None, eval_set=None):
        """Fit and return an ``LGBMClassifier``.

        Args:
            X, y: training features and binary labels.
            extra_params: optional overrides merged over the default params.
            sample_weight: optional per-row weights.
            eval_set: optional ``(X_val, y_val)`` tuple; when non-empty it
                enables early stopping with a patience of 50 rounds.
        """
        params = {
            "objective": "binary",
            "metric": ["binary_logloss", "auc"],
            "boosting_type": "gbdt",
            "num_leaves": 48,
            "learning_rate": 0.02,
            "feature_fraction": 0.85,
            "bagging_fraction": 0.8,
            "bagging_freq": 1,
            "min_child_samples": 12,
            "reg_alpha": 0.35,
            "reg_lambda": 0.65,
            "n_estimators": 1200,
            "random_state": 42,
            "verbose": -1,
            "n_jobs": -1,
        }
        if extra_params:
            params.update(extra_params)

        model = lgb.LGBMClassifier(**params)
        model.fit(X, y, **self._fit_kwargs(sample_weight, eval_set, "binary_logloss", 50))
        return model

    def _train_regressor(self, X, y, extra_params=None, sample_weight=None, eval_set=None):
        """Fit and return an ``LGBMRegressor`` with a Huber objective.

        Arguments mirror ``_train_classifier``; a non-empty ``eval_set``
        enables early stopping with a patience of 60 rounds.
        """
        params = {
            "objective": "huber",
            "metric": "rmse",
            "boosting_type": "gbdt",
            "num_leaves": 48,
            "learning_rate": 0.02,
            "feature_fraction": 0.85,
            "bagging_fraction": 0.8,
            "bagging_freq": 1,
            "min_child_samples": 12,
            "reg_alpha": 0.2,
            "reg_lambda": 0.6,
            "alpha": 0.9,
            "n_estimators": 1500,
            "random_state": 42,
            "verbose": -1,
            "n_jobs": -1,
        }
        if extra_params:
            params.update(extra_params)

        model = lgb.LGBMRegressor(**params)
        model.fit(X, y, **self._fit_kwargs(sample_weight, eval_set, "rmse", 60))
        return model

    def _best_iteration(self, model, fallback):
        """Return the tree count to reuse for the final refit.

        Uses ``model.best_iteration_`` floored at ``_MIN_TREES``; when that
        attribute is missing, ``None`` or non-positive (no early stopping
        took place), ``fallback`` is returned instead.
        """
        best_iter = getattr(model, "best_iteration_", None)
        if best_iter is None or best_iter <= 0:
            return int(fallback)
        return int(max(best_iter, self._MIN_TREES))

    # ------------------------------------------------------------------
    # Prediction helpers
    # ------------------------------------------------------------------
    def _add_predictions(self, res, X):
        """Write the raw and thresholded prediction columns into ``res``.

        Values are assigned as NumPy arrays, i.e. positionally: row ``i`` of
        ``res`` receives the prediction for row ``i`` of ``X``.
        """
        X = self._align_features(X)
        dir_prob = self.m_dir.predict_proba(X)[:, 1]
        hh_prob = self.m_hh.predict_proba(X)[:, 1]
        ret_pred = self.m_ret.predict(X)

        res["pred_dir_prob"] = dir_prob
        res["pred_hh_prob"] = hh_prob
        res["pred_ret"] = ret_pred
        res["pred_ret_pct"] = ret_pred * 100
        res["pred_dir"] = (dir_prob >= self.dir_threshold).astype(int)
        res["pred_hh"] = (hh_prob >= self.hh_threshold).astype(int)

    @staticmethod
    def _impact(res):
        """Return the ``impact`` score for a frame holding prediction columns.

        ``0.4 * pred_hh_prob + 0.3 * 20 * |pred_ret|
        + 0.3 * 2 * |pred_dir_prob - 0.5|``.
        """
        return (
            res["pred_hh_prob"] * 0.4
            + np.abs(res["pred_ret"]) * 20 * 0.3
            + np.abs(res["pred_dir_prob"] - 0.5) * 2 * 0.3
        )

    def _predict_with_thresholds(self, X_test, test_df):
        """Return a copy of ``test_df`` with predictions, sorted by impact.

        Adds the columns written by ``_add_predictions`` plus
        ``pred_dir_label`` (``"UP"``/``"DOWN"``) and ``impact``; rows are
        ordered by ``impact`` descending.
        """
        res = test_df.copy()
        self._add_predictions(res, X_test)
        res["pred_dir_label"] = res["pred_dir"].map({1: "UP", 0: "DOWN"})
        res["impact"] = self._impact(res)
        return res.sort_values("impact", ascending=False)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def train_and_evaluate(self, X_train, y_dir, y_hh, y_ret, X_test, test_df):
        """Train all three models and score ``X_test``.

        Steps:
            1. Hold out the tail of the training data as a validation set.
            2. Fit validation models with early stopping to find the tree
               count per task and tune both classification thresholds.
            3. Refit each model on the full training data with that tree
               count and store it on ``self``.
            4. Return ``_predict_with_thresholds(X_test, test_df)``.

        When the validation part is empty (very small training sets) early
        stopping and threshold tuning are skipped; tree counts fall back to
        900/900/1000 and the current thresholds are kept.
        """
        self.feature_names = list(X_train.columns)

        (
            fit_X,
            val_X,
            (y_dir_fit, y_dir_val),
            (y_hh_fit, y_hh_val),
            (y_ret_fit, y_ret_val),
        ) = self._split_fit_val(X_train, y_dir, y_hh, y_ret)

        # Weights depend on the features only, so one array per dataset is
        # shared by all three tasks.
        fit_w = self._build_sample_weights(fit_X)
        full_w = self._build_sample_weights(X_train)

        val_dir_model = self._train_classifier(
            fit_X,
            y_dir_fit,
            extra_params={"scale_pos_weight": self._pos_weight(y_dir_fit)},
            sample_weight=fit_w,
            eval_set=(val_X, y_dir_val),
        )
        val_hh_model = self._train_classifier(
            fit_X,
            y_hh_fit,
            extra_params={"scale_pos_weight": self._pos_weight(y_hh_fit)},
            sample_weight=fit_w,
            eval_set=(val_X, y_hh_val),
        )
        val_ret_model = self._train_regressor(
            fit_X,
            y_ret_fit,
            sample_weight=fit_w,
            eval_set=(val_X, y_ret_val),
        )

        if len(val_X) > 0:
            val_dir_prob = val_dir_model.predict_proba(val_X)[:, 1]
            val_hh_prob = val_hh_model.predict_proba(val_X)[:, 1]
            self.dir_threshold = self._tune_threshold(y_dir_val, val_dir_prob)
            self.hh_threshold = self._tune_threshold(y_hh_val, val_hh_prob)

        dir_trees = self._best_iteration(val_dir_model, 900)
        hh_trees = self._best_iteration(val_hh_model, 900)
        ret_trees = self._best_iteration(val_ret_model, 1000)

        # Final refit on all training rows; no eval_set, so exactly the
        # selected number of trees is built.
        self.m_dir = self._train_classifier(
            X_train,
            y_dir,
            extra_params={
                "scale_pos_weight": self._pos_weight(y_dir),
                "n_estimators": dir_trees,
            },
            sample_weight=full_w,
        )
        self.m_hh = self._train_classifier(
            X_train,
            y_hh,
            extra_params={
                "scale_pos_weight": self._pos_weight(y_hh),
                "n_estimators": hh_trees,
            },
            sample_weight=full_w,
        )
        self.m_ret = self._train_regressor(
            X_train,
            y_ret,
            extra_params={"n_estimators": ret_trees},
            sample_weight=full_w,
        )

        return self._predict_with_thresholds(X_test, test_df)

    def predict_new(self, X):
        """Score ``X`` with the stored models.

        Returns a new DataFrame indexed like ``X`` with the columns
        ``pred_dir_prob``, ``pred_hh_prob``, ``pred_ret``, ``pred_ret_pct``,
        ``pred_dir``, ``pred_hh`` and ``impact``. Rows keep the order of
        ``X`` (no sorting, no ``pred_dir_label``).
        """
        res = pd.DataFrame(index=X.index)
        self._add_predictions(res, X)
        res["impact"] = self._impact(res)
        return res

    def save(self, path):
        """Pickle the models, feature names and thresholds to ``path``."""
        payload = {
            "m_dir": self.m_dir,
            "m_hh": self.m_hh,
            "m_ret": self.m_ret,
            "features": self.feature_names,
            "dir_threshold": self.dir_threshold,
            "hh_threshold": self.hh_threshold,
        }
        with open(path, "wb") as f:
            pickle.dump(payload, f)

    @classmethod
    def load(cls, path):
        """Rebuild a model from a file written by ``save``.

        Thresholds default to 0.5 when the file does not contain them.

        SECURITY: ``pickle.load`` can execute arbitrary code while
        deserialising. Only load files from a trusted source.
        """
        with open(path, "rb") as f:
            data = pickle.load(f)
        obj = cls()
        obj.m_dir = data["m_dir"]
        obj.m_hh = data["m_hh"]
        obj.m_ret = data["m_ret"]
        obj.feature_names = data["features"]
        obj.dir_threshold = data.get("dir_threshold", 0.5)
        obj.hh_threshold = data.get("hh_threshold", 0.5)
        return obj
|
|