# Jitendra12421's picture
# Upload 5 files
# d3ce5a6 verified
# model.py
import pickle
import lightgbm as lgb
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, f1_score, mean_squared_error
class StockNewsModel:
    """Bundle of three LightGBM models trained on the same feature frame.

    * ``m_dir`` -- binary classifier for the ``y_dir`` target; predictions
      are labelled ``"UP"`` (1) / ``"DOWN"`` (0).
    * ``m_hh``  -- binary classifier for the ``y_hh`` target.
    * ``m_ret`` -- Huber-loss regressor for the ``y_ret`` target.

    Decision thresholds for the two classifiers are tuned on a hold-out tail
    of the training data and stored next to the models.
    """

    # Per-column sample-weight boosts, applied in this order:
    # (column, coefficient, clip lower, clip upper, apply log1p after clipping).
    # Each present column multiplies the weight by ``1 + coefficient * value``.
    _WEIGHT_RULES = (
        ("source_tier", 0.18, 0, 3, False),
        ("event_total", 0.06, 0, None, True),
        ("n_urgent", 0.04, 0, 5, False),
        ("sent_abs", 0.03, 0, 1, False),
        ("signal_intensity", 0.05, 0, 5, False),
        ("is_market_hrs", 0.02, None, None, False),
    )

    def __init__(self):
        self.m_dir = None
        self.m_hh = None
        self.m_ret = None
        # Column order seen at training time; used to align prediction inputs.
        self.feature_names = None
        self.dir_threshold = 0.5
        self.hh_threshold = 0.5

    # ------------------------------------------------------------------
    # Data preparation helpers
    # ------------------------------------------------------------------
    def _build_sample_weights(self, X):
        """Return one training weight per row of ``X``.

        Starts from 1.0, multiplies in a boost for every column listed in
        ``_WEIGHT_RULES`` that is present in ``X`` (NaN treated as 0), then
        multiplies by a linear ramp from 0.9 (first row) to 1.15 (last row)
        and clips the result to ``[0.75, 4.0]``.

        NOTE(review): the ramp favours later rows, which presumably assumes
        the rows are ordered oldest to newest -- confirm against the caller.
        """
        n_rows = len(X)
        weights = np.ones(n_rows, dtype=float)
        if n_rows == 0:
            return weights

        for column, coef, lower, upper, use_log in self._WEIGHT_RULES:
            if column not in X.columns:
                continue
            values = X[column].fillna(0)
            if lower is not None or upper is not None:
                values = values.clip(lower=lower, upper=upper)
            values = values.to_numpy(float)
            if use_log:
                values = np.log1p(values)
            weights *= 1.0 + coef * values

        weights *= np.linspace(0.9, 1.15, n_rows)
        return np.clip(weights, 0.75, 4.0)

    def _tune_threshold(self, y_true, y_prob):
        """Pick the probability cut-off maximising ``0.55*accuracy + 0.45*F1``.

        Scans 37 evenly spaced candidates in ``[0.05, 0.95]``; on ties the
        lowest candidate wins. Returns the default 0.5 for empty input.
        """
        y_true = np.asarray(y_true).astype(int)
        y_prob = np.asarray(y_prob).astype(float)
        best_threshold = 0.5
        if y_true.size == 0:
            # Nothing to score against; keep the neutral default.
            return best_threshold

        best_score = -1.0
        for threshold in np.linspace(0.05, 0.95, 37):
            y_pred = (y_prob >= threshold).astype(int)
            acc = accuracy_score(y_true, y_pred)
            f1 = f1_score(y_true, y_pred, zero_division=0)
            score = (0.55 * acc) + (0.45 * f1)
            if score > best_score:
                best_score = score
                best_threshold = float(threshold)
        return best_threshold

    def _split_fit_val(self, X, *arrays, val_frac=0.2):
        """Split ``X`` and each of ``arrays`` into a head (fit) and tail (val).

        No shuffling is done: the first ``split`` rows are the fit part and
        the remaining rows the validation part.

        * Fewer than 30 rows: the last 5 rows validate (at least 1 row is
          kept for fitting, so the validation part can be empty).
        * Otherwise: roughly ``val_frac`` of the rows validate, with at least
          20 fit rows and at least 5 validation rows.

        Returns ``(X_fit, X_val, (a_fit, a_val), ...)`` with one pair per
        entry of ``arrays``, in the order given.
        """
        n = len(X)
        if n < 30:
            split = max(n - 5, 1)
        else:
            split = int(n * (1.0 - val_frac))
            split = min(max(split, 20), n - 5)
        fit_rows = slice(0, split)
        val_rows = slice(split, n)

        def cut(arr):
            # pandas objects are sliced positionally; anything else via numpy.
            if hasattr(arr, "iloc"):
                return arr.iloc[fit_rows], arr.iloc[val_rows]
            arr = np.asarray(arr)
            return arr[fit_rows], arr[val_rows]

        return (X.iloc[fit_rows], X.iloc[val_rows], *(cut(arr) for arr in arrays))

    @staticmethod
    def _pos_weight(y):
        """Return ``n_negative / n_positive`` for binary labels ``y``.

        Falls back to 1.0 when either class is absent: a ratio of 0 violates
        LightGBM's ``scale_pos_weight > 0`` constraint, and with no positives
        the weight has nothing to act on.
        """
        labels = np.asarray(y)
        n_pos = int(np.sum(labels == 1))
        n_neg = int(np.sum(labels == 0))
        if n_pos == 0 or n_neg == 0:
            return 1.0
        return n_neg / n_pos

    # ------------------------------------------------------------------
    # Training helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _fit_estimator(model, X, y, *, sample_weight, eval_set, eval_metric, patience):
        """Fit ``model``; enable early stopping only for a non-empty eval set.

        ``eval_set`` is an ``(X_val, y_val)`` pair or ``None``. An empty pair
        is treated like ``None`` so the estimator is never handed an empty
        validation set.
        """
        fit_kwargs = {}
        if sample_weight is not None:
            fit_kwargs["sample_weight"] = sample_weight
        if eval_set is not None and len(eval_set[0]) > 0:
            fit_kwargs["eval_set"] = [eval_set]
            fit_kwargs["eval_metric"] = eval_metric
            fit_kwargs["callbacks"] = [lgb.early_stopping(patience, verbose=False)]
        model.fit(X, y, **fit_kwargs)
        return model

    def _train_classifier(self, X, y, extra_params=None, sample_weight=None, eval_set=None):
        """Train and return an ``LGBMClassifier`` on ``(X, y)``.

        ``extra_params`` overrides the defaults below. When ``eval_set``
        (an ``(X_val, y_val)`` pair) is given and non-empty, training uses
        early stopping with a patience of 50 rounds.
        """
        params = {
            "objective": "binary",
            "metric": ["binary_logloss", "auc"],
            "boosting_type": "gbdt",
            "num_leaves": 48,
            "learning_rate": 0.02,
            "feature_fraction": 0.85,
            "bagging_fraction": 0.8,
            "bagging_freq": 1,
            "min_child_samples": 12,
            "reg_alpha": 0.35,
            "reg_lambda": 0.65,
            "n_estimators": 1200,
            "random_state": 42,
            "verbose": -1,
            "n_jobs": -1,
        }
        if extra_params:
            params.update(extra_params)
        return self._fit_estimator(
            lgb.LGBMClassifier(**params),
            X,
            y,
            sample_weight=sample_weight,
            eval_set=eval_set,
            eval_metric="binary_logloss",
            patience=50,
        )

    def _train_regressor(self, X, y, extra_params=None, sample_weight=None, eval_set=None):
        """Train and return an ``LGBMRegressor`` (Huber objective) on ``(X, y)``.

        ``extra_params`` overrides the defaults below. When ``eval_set``
        (an ``(X_val, y_val)`` pair) is given and non-empty, training uses
        early stopping with a patience of 60 rounds.
        """
        params = {
            "objective": "huber",
            "metric": "rmse",
            "boosting_type": "gbdt",
            "num_leaves": 48,
            "learning_rate": 0.02,
            "feature_fraction": 0.85,
            "bagging_fraction": 0.8,
            "bagging_freq": 1,
            "min_child_samples": 12,
            "reg_alpha": 0.2,
            "reg_lambda": 0.6,
            "alpha": 0.9,
            "n_estimators": 1500,
            "random_state": 42,
            "verbose": -1,
            "n_jobs": -1,
        }
        if extra_params:
            params.update(extra_params)
        return self._fit_estimator(
            lgb.LGBMRegressor(**params),
            X,
            y,
            sample_weight=sample_weight,
            eval_set=eval_set,
            eval_metric="rmse",
            patience=60,
        )

    def _best_iteration(self, model, fallback):
        """Return the tree count to reuse when refitting on all data.

        Uses ``model.best_iteration_`` when it is a positive number (never
        fewer than 50); otherwise returns ``fallback``.
        """
        best_iter = getattr(model, "best_iteration_", None)
        if best_iter is None or best_iter <= 0:
            return int(fallback)
        return int(max(best_iter, 50))

    # ------------------------------------------------------------------
    # Prediction helpers
    # ------------------------------------------------------------------
    def _align_features(self, X):
        """Re-order ``X``'s columns to the training order when possible.

        Only acts when ``feature_names`` is known, ``X`` has columns, and
        every training column is present; otherwise ``X`` is returned
        untouched so the underlying model reports any mismatch itself.
        """
        names = self.feature_names
        if not names or not hasattr(X, "columns"):
            return X
        names = list(names)
        if list(X.columns) == names:
            return X
        if set(names).issubset(X.columns):
            return X[names]
        return X

    def _add_predictions(self, res, X, *, with_label):
        """Write all prediction columns for ``X`` into ``res`` and return it.

        ``impact`` blends the three outputs:
        ``0.4 * hh_prob + 0.3 * (20 * |ret|) + 0.3 * (2 * |dir_prob - 0.5|)``.
        """
        X = self._align_features(X)
        dir_prob = self.m_dir.predict_proba(X)[:, 1]
        hh_prob = self.m_hh.predict_proba(X)[:, 1]
        ret_pred = self.m_ret.predict(X)

        res["pred_dir_prob"] = dir_prob
        res["pred_hh_prob"] = hh_prob
        res["pred_ret"] = ret_pred
        res["pred_ret_pct"] = ret_pred * 100
        res["pred_dir"] = (dir_prob >= self.dir_threshold).astype(int)
        res["pred_hh"] = (hh_prob >= self.hh_threshold).astype(int)
        if with_label:
            res["pred_dir_label"] = res["pred_dir"].map({1: "UP", 0: "DOWN"})
        res["impact"] = (
            res["pred_hh_prob"] * 0.4
            + np.abs(res["pred_ret"]) * 20 * 0.3
            + np.abs(res["pred_dir_prob"] - 0.5) * 2 * 0.3
        )
        return res

    def _predict_with_thresholds(self, X_test, test_df):
        """Return a copy of ``test_df`` with predictions, sorted by impact.

        Adds the prediction columns plus ``pred_dir_label`` and orders rows
        by ``impact`` descending. ``test_df`` itself is not modified.
        """
        res = self._add_predictions(test_df.copy(), X_test, with_label=True)
        return res.sort_values("impact", ascending=False)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def train_and_evaluate(self, X_train, y_dir, y_hh, y_ret, X_test, test_df):
        """Train all three models and return predictions for the test rows.

        Steps:
          1. Hold out the tail of the training rows (see ``_split_fit_val``).
          2. Fit provisional models on the head with early stopping on the
             tail, then tune both decision thresholds on the tail. If the
             tail is empty, early stopping and tuning are skipped and the
             thresholds keep their current values.
          3. Refit on all training rows with the tree counts found in step 2.
          4. Score ``X_test`` and return ``test_df`` enriched with the
             predictions, sorted by ``impact`` descending.
        """
        self.feature_names = list(X_train.columns)
        fit_X, val_X, dir_pair, hh_pair, ret_pair = self._split_fit_val(
            X_train, y_dir, y_hh, y_ret
        )
        y_dir_fit, y_dir_val = dir_pair
        y_hh_fit, y_hh_val = hh_pair
        y_ret_fit, y_ret_val = ret_pair
        has_val = len(val_X) > 0

        # The weights depend only on the features, so one vector per split
        # serves all three targets.
        fit_w = self._build_sample_weights(fit_X)
        full_w = self._build_sample_weights(X_train)

        val_dir_model = self._train_classifier(
            fit_X,
            y_dir_fit,
            extra_params={"scale_pos_weight": self._pos_weight(y_dir_fit)},
            sample_weight=fit_w,
            eval_set=(val_X, y_dir_val) if has_val else None,
        )
        val_hh_model = self._train_classifier(
            fit_X,
            y_hh_fit,
            extra_params={"scale_pos_weight": self._pos_weight(y_hh_fit)},
            sample_weight=fit_w,
            eval_set=(val_X, y_hh_val) if has_val else None,
        )
        val_ret_model = self._train_regressor(
            fit_X,
            y_ret_fit,
            sample_weight=fit_w,
            eval_set=(val_X, y_ret_val) if has_val else None,
        )

        if has_val:
            val_dir_prob = val_dir_model.predict_proba(val_X)[:, 1]
            val_hh_prob = val_hh_model.predict_proba(val_X)[:, 1]
            self.dir_threshold = self._tune_threshold(y_dir_val, val_dir_prob)
            self.hh_threshold = self._tune_threshold(y_hh_val, val_hh_prob)

        dir_trees = self._best_iteration(val_dir_model, 900)
        hh_trees = self._best_iteration(val_hh_model, 900)
        ret_trees = self._best_iteration(val_ret_model, 1000)

        # Final models: all training rows, fixed tree counts, no early stop.
        self.m_dir = self._train_classifier(
            X_train,
            y_dir,
            extra_params={
                "scale_pos_weight": self._pos_weight(y_dir),
                "n_estimators": dir_trees,
            },
            sample_weight=full_w,
        )
        self.m_hh = self._train_classifier(
            X_train,
            y_hh,
            extra_params={
                "scale_pos_weight": self._pos_weight(y_hh),
                "n_estimators": hh_trees,
            },
            sample_weight=full_w,
        )
        self.m_ret = self._train_regressor(
            X_train,
            y_ret,
            extra_params={"n_estimators": ret_trees},
            sample_weight=full_w,
        )
        return self._predict_with_thresholds(X_test, test_df)

    def predict_new(self, X):
        """Score new rows and return a frame indexed like ``X``.

        Columns: ``pred_dir_prob``, ``pred_hh_prob``, ``pred_ret``,
        ``pred_ret_pct``, ``pred_dir``, ``pred_hh``, ``impact``. Rows keep
        the order of ``X`` (no sorting).
        """
        return self._add_predictions(pd.DataFrame(index=X.index), X, with_label=False)

    def save(self, path):
        """Pickle the three models, feature names and thresholds to ``path``."""
        payload = {
            "m_dir": self.m_dir,
            "m_hh": self.m_hh,
            "m_ret": self.m_ret,
            "features": self.feature_names,
            "dir_threshold": self.dir_threshold,
            "hh_threshold": self.hh_threshold,
        }
        with open(path, "wb") as f:
            pickle.dump(payload, f)

    @classmethod
    def load(cls, path):
        """Rebuild a model bundle from a file written by ``save``.

        Files without stored thresholds fall back to 0.5 for both.
        """
        # SECURITY: pickle.load executes arbitrary code embedded in the file.
        # Only load files from a trusted source.
        with open(path, "rb") as f:
            data = pickle.load(f)
        obj = cls()
        obj.m_dir = data["m_dir"]
        obj.m_hh = data["m_hh"]
        obj.m_ret = data["m_ret"]
        obj.feature_names = data["features"]
        obj.dir_threshold = data.get("dir_threshold", 0.5)
        obj.hh_threshold = data.get("hh_threshold", 0.5)
        return obj