# model.py import pickle import lightgbm as lgb import numpy as np import pandas as pd from sklearn.metrics import accuracy_score, f1_score, mean_squared_error class StockNewsModel: def __init__(self): self.m_dir = None self.m_hh = None self.m_ret = None self.feature_names = None self.dir_threshold = 0.5 self.hh_threshold = 0.5 def _build_sample_weights(self, X): weights = np.ones(len(X), dtype=float) if len(X) == 0: return weights if "source_tier" in X.columns: weights *= 1.0 + 0.18 * X["source_tier"].fillna(0).clip(0, 3).to_numpy(float) if "event_total" in X.columns: weights *= 1.0 + 0.06 * np.log1p(X["event_total"].fillna(0).clip(lower=0).to_numpy(float)) if "n_urgent" in X.columns: weights *= 1.0 + 0.04 * X["n_urgent"].fillna(0).clip(0, 5).to_numpy(float) if "sent_abs" in X.columns: weights *= 1.0 + 0.03 * X["sent_abs"].fillna(0).clip(0, 1).to_numpy(float) if "signal_intensity" in X.columns: weights *= 1.0 + 0.05 * X["signal_intensity"].fillna(0).clip(0, 5).to_numpy(float) if "is_market_hrs" in X.columns: weights *= 1.0 + 0.02 * X["is_market_hrs"].fillna(0).to_numpy(float) recency = np.linspace(0.9, 1.15, len(X)) weights *= recency return np.clip(weights, 0.75, 4.0) def _tune_threshold(self, y_true, y_prob): y_true = np.asarray(y_true).astype(int) y_prob = np.asarray(y_prob).astype(float) best_threshold = 0.5 best_score = -1.0 for threshold in np.linspace(0.05, 0.95, 37): y_pred = (y_prob >= threshold).astype(int) acc = accuracy_score(y_true, y_pred) f1 = f1_score(y_true, y_pred, zero_division=0) score = (0.55 * acc) + (0.45 * f1) if score > best_score: best_score = score best_threshold = float(threshold) return best_threshold def _split_fit_val(self, X, *arrays, val_frac=0.2): n = len(X) if n < 30: split = max(n - 5, 1) else: split = int(n * (1.0 - val_frac)) split = min(max(split, 20), n - 5) train_slice = slice(0, split) val_slice = slice(split, n) split_arrays = [] for arr in arrays: if hasattr(arr, "iloc"): split_arrays.append((arr.iloc[train_slice], arr.iloc[val_slice])) else: arr = np.asarray(arr) split_arrays.append((arr[train_slice], arr[val_slice])) return (X.iloc[train_slice], X.iloc[val_slice], *split_arrays) def _train_classifier(self, X, y, extra_params=None, sample_weight=None, eval_set=None): params = { "objective": "binary", "metric": ["binary_logloss", "auc"], "boosting_type": "gbdt", "num_leaves": 48, "learning_rate": 0.02, "feature_fraction": 0.85, "bagging_fraction": 0.8, "bagging_freq": 1, "min_child_samples": 12, "reg_alpha": 0.35, "reg_lambda": 0.65, "n_estimators": 1200, "random_state": 42, "verbose": -1, "n_jobs": -1, } if extra_params: params.update(extra_params) model = lgb.LGBMClassifier(**params) fit_kwargs = {} if sample_weight is not None: fit_kwargs["sample_weight"] = sample_weight if eval_set is not None: fit_kwargs["eval_set"] = [eval_set] fit_kwargs["eval_metric"] = "binary_logloss" fit_kwargs["callbacks"] = [lgb.early_stopping(50, verbose=False)] model.fit(X, y, **fit_kwargs) return model def _train_regressor(self, X, y, extra_params=None, sample_weight=None, eval_set=None): params = { "objective": "huber", "metric": "rmse", "boosting_type": "gbdt", "num_leaves": 48, "learning_rate": 0.02, "feature_fraction": 0.85, "bagging_fraction": 0.8, "bagging_freq": 1, "min_child_samples": 12, "reg_alpha": 0.2, "reg_lambda": 0.6, "alpha": 0.9, "n_estimators": 1500, "random_state": 42, "verbose": -1, "n_jobs": -1, } if extra_params: params.update(extra_params) model = lgb.LGBMRegressor(**params) fit_kwargs = {} if sample_weight is not None: fit_kwargs["sample_weight"] = sample_weight if eval_set is not None: fit_kwargs["eval_set"] = [eval_set] fit_kwargs["eval_metric"] = "rmse" fit_kwargs["callbacks"] = [lgb.early_stopping(60, verbose=False)] model.fit(X, y, **fit_kwargs) return model def _best_iteration(self, model, fallback): best_iter = getattr(model, "best_iteration_", None) if best_iter is None or best_iter <= 0: return int(fallback) return int(max(best_iter, 50)) def _predict_with_thresholds(self, X_test, test_df): res = test_df.copy() dir_prob = self.m_dir.predict_proba(X_test)[:, 1] hh_prob = self.m_hh.predict_proba(X_test)[:, 1] ret_pred = self.m_ret.predict(X_test) res["pred_dir_prob"] = dir_prob res["pred_hh_prob"] = hh_prob res["pred_ret"] = ret_pred res["pred_ret_pct"] = ret_pred * 100 res["pred_dir"] = (dir_prob >= self.dir_threshold).astype(int) res["pred_hh"] = (hh_prob >= self.hh_threshold).astype(int) res["pred_dir_label"] = res["pred_dir"].map({1: "UP", 0: "DOWN"}) res["impact"] = ( res["pred_hh_prob"] * 0.4 + np.abs(res["pred_ret"]) * 20 * 0.3 + np.abs(res["pred_dir_prob"] - 0.5) * 2 * 0.3 ) return res.sort_values("impact", ascending=False) def train_and_evaluate(self, X_train, y_dir, y_hh, y_ret, X_test, test_df): self.feature_names = list(X_train.columns) fit_X, val_X, (y_dir_fit, y_dir_val), (y_hh_fit, y_hh_val), (y_ret_fit, y_ret_val) = self._split_fit_val( X_train, y_dir, y_hh, y_ret ) fit_dir_w = self._build_sample_weights(fit_X) fit_hh_w = self._build_sample_weights(fit_X) fit_ret_w = self._build_sample_weights(fit_X) full_dir_w = self._build_sample_weights(X_train) full_hh_w = self._build_sample_weights(X_train) full_ret_w = self._build_sample_weights(X_train) n_pos_dir = int(np.sum(y_dir_fit)) n_neg_dir = int(len(y_dir_fit) - n_pos_dir) dir_scale = n_neg_dir / max(n_pos_dir, 1) n_pos_hh = int(np.sum(y_hh_fit)) n_neg_hh = int(len(y_hh_fit) - n_pos_hh) hh_scale = n_neg_hh / max(n_pos_hh, 1) val_dir_model = self._train_classifier( fit_X, y_dir_fit, extra_params={"scale_pos_weight": dir_scale}, sample_weight=fit_dir_w, eval_set=(val_X, y_dir_val), ) val_hh_model = self._train_classifier( fit_X, y_hh_fit, extra_params={"scale_pos_weight": hh_scale}, sample_weight=fit_hh_w, eval_set=(val_X, y_hh_val), ) val_ret_model = self._train_regressor( fit_X, y_ret_fit, sample_weight=fit_ret_w, eval_set=(val_X, y_ret_val), ) if len(val_X) > 0: val_dir_prob = val_dir_model.predict_proba(val_X)[:, 1] val_hh_prob = val_hh_model.predict_proba(val_X)[:, 1] self.dir_threshold = self._tune_threshold(y_dir_val, val_dir_prob) self.hh_threshold = self._tune_threshold(y_hh_val, val_hh_prob) dir_trees = self._best_iteration(val_dir_model, 900) hh_trees = self._best_iteration(val_hh_model, 900) ret_trees = self._best_iteration(val_ret_model, 1000) self.m_dir = self._train_classifier( X_train, y_dir, extra_params={ "scale_pos_weight": int(np.sum(y_dir == 0)) / max(int(np.sum(y_dir == 1)), 1), "n_estimators": dir_trees, }, sample_weight=full_dir_w, ) self.m_hh = self._train_classifier( X_train, y_hh, extra_params={ "scale_pos_weight": int(np.sum(y_hh == 0)) / max(int(np.sum(y_hh == 1)), 1), "n_estimators": hh_trees, }, sample_weight=full_hh_w, ) self.m_ret = self._train_regressor( X_train, y_ret, extra_params={"n_estimators": ret_trees}, sample_weight=full_ret_w, ) test_results = self._predict_with_thresholds(X_test, test_df) return test_results def predict_new(self, X): res = pd.DataFrame(index=X.index) dir_prob = self.m_dir.predict_proba(X)[:, 1] hh_prob = self.m_hh.predict_proba(X)[:, 1] ret_pred = self.m_ret.predict(X) res["pred_dir_prob"] = dir_prob res["pred_hh_prob"] = hh_prob res["pred_ret"] = ret_pred res["pred_ret_pct"] = ret_pred * 100 res["pred_dir"] = (dir_prob >= self.dir_threshold).astype(int) res["pred_hh"] = (hh_prob >= self.hh_threshold).astype(int) res["impact"] = ( res["pred_hh_prob"] * 0.4 + np.abs(res["pred_ret"]) * 20 * 0.3 + np.abs(res["pred_dir_prob"] - 0.5) * 2 * 0.3 ) return res def save(self, path): with open(path, "wb") as f: pickle.dump( { "m_dir": self.m_dir, "m_hh": self.m_hh, "m_ret": self.m_ret, "features": self.feature_names, "dir_threshold": self.dir_threshold, "hh_threshold": self.hh_threshold, }, f, ) @classmethod def load(cls, path): with open(path, "rb") as f: data = pickle.load(f) obj = cls() obj.m_dir = data["m_dir"] obj.m_hh = data["m_hh"] obj.m_ret = data["m_ret"] obj.feature_names = data["features"] obj.dir_threshold = data.get("dir_threshold", 0.5) obj.hh_threshold = data.get("hh_threshold", 0.5) return obj