# Source: PREDICTIONSITE_backup/scripts/retrain_opening_model.py
# Hugging Face file-viewer residue, kept for provenance:
#   uploader: Jitendra12421 | commit message: "Upload 20 files" | commit: bf4b95a (verified)
#   file size: 6.47 kB
from __future__ import annotations
import argparse
import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, roc_auc_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler
from nifty_backend.runtime import (
DECISION_OVERLAYS,
MODEL_DIR,
MODEL_PATH,
OPENING_DATASET_PATH,
ProbabilityBlend,
apply_decision_overlays,
directional_confidence,
predict_proba_up,
)
# Default for --train-end: rows dated on or before this day form the training split.
DEFAULT_TRAIN_END = pd.Timestamp("2023-12-31")
# Default for --valid-end: rows after the training cut-off up to and including
# this day form the validation split; anything later is the test split.
DEFAULT_VALID_END = pd.Timestamp("2025-08-17")
# Base seed for the estimators; the logistic regression uses it as-is and the
# extra-trees classifier uses RANDOM_SEED + 13.
RANDOM_SEED = 42
@dataclass
class RetrainSummary:
    """Flat, serialisable record describing the outcome of one retraining run.

    The declaration order of the fields is the key order produced by
    ``dataclasses.asdict`` and therefore the layout of any JSON/CSV dump
    built from it, so do not reorder them casually.
    """

    # Identity of the fitted model and the probability cut-off used for UP calls.
    model_name: str
    threshold: float

    # Number of rows in each of the three splits.
    train_rows: int
    valid_rows: int
    test_rows: int

    # Hold-out quality metrics.
    validation_accuracy: float
    test_accuracy: float
    validation_auc: float
    test_auc: float
    test_brier: float

    # Prediction for the most recent row of the dataset.
    latest_prediction: str
    latest_prob_up: float
    latest_confidence: float

    # Width of the feature matrix fed to the model.
    feature_count: int
def feature_columns(frame: pd.DataFrame) -> list[str]:
    """Return the names of the columns of *frame* that qualify as model inputs.

    A column qualifies when it is not one of the bookkeeping / outcome
    columns listed below, has a numeric dtype, is non-null in at least 40%
    of the rows, and takes more than one distinct non-null value.  The
    result preserves the column order of *frame*.
    """
    non_features = frozenset(
        (
            "date",
            "first5_start",
            "first5_end",
            "target",
            "day_open",
            "day_high",
            "day_low",
            "day_close",
            "day_volume",
            "day_return",
        )
    )

    def is_usable(name: str) -> bool:
        series = frame[name]
        if not pd.api.types.is_numeric_dtype(series):
            return False
        # Require at least 40% coverage before looking at variability.
        if not (series.notna().mean() >= 0.40):
            return False
        # A column with a single distinct value carries no signal.
        return series.nunique(dropna=True) > 1

    return [name for name in frame.columns if name not in non_features and is_usable(name)]
def best_threshold(y_true: np.ndarray, prob_up: np.ndarray) -> tuple[float, float]:
    """Grid-search the probability cut-off that maximises accuracy.

    301 evenly spaced candidates between 0.35 and 0.65 (inclusive) are
    evaluated; a row is called "up" when its probability is greater than or
    equal to the candidate.

    Returns:
        ``(threshold, accuracy)`` for the best candidate.  ``argmax`` keeps
        the first maximum, so ties go to the lowest threshold.
    """
    candidates = np.linspace(0.35, 0.65, 301)
    # Row i / column j: the call made for sample i under candidate j.
    calls = prob_up[:, np.newaxis] >= candidates[np.newaxis, :]
    accuracy = (calls == y_true[:, np.newaxis]).mean(axis=0)
    winner = int(accuracy.argmax())
    return float(candidates[winner]), float(accuracy[winner])
def score_auc(y_true: np.ndarray, prob_up: np.ndarray) -> float:
    """Return the ROC AUC of *prob_up* against *y_true*, or NaN if undefined.

    The score is only computed when *y_true* contains at least two distinct
    labels; otherwise NaN is returned without calling ``roc_auc_score``.
    """
    distinct_labels = np.unique(y_true)
    if len(distinct_labels) >= 2:
        return float(roc_auc_score(y_true, prob_up))
    return float("nan")
def parse_args() -> argparse.Namespace:
    """Parse the command line of the retraining script.

    Two optional flags are accepted, ``--train-end`` and ``--valid-end``.
    Each defaults to the ISO date string of the matching module-level
    ``DEFAULT_*`` timestamp.
    """
    cli = argparse.ArgumentParser(
        description="Retrain the compact NIFTY opening-direction model from Parquet data."
    )
    split_flags = (
        ("--train-end", DEFAULT_TRAIN_END),
        ("--valid-end", DEFAULT_VALID_END),
    )
    for flag, fallback in split_flags:
        cli.add_argument(flag, default=fallback.date().isoformat())
    return cli.parse_args()
def main() -> None:
    """Retrain the blended opening-direction model and persist it with a summary.

    Workflow:
      1. Load the opening dataset and split the labelled rows by date into
         train / validation / test windows (boundaries come from the CLI).
      2. Fit an extra-trees pipeline and a logistic-regression pipeline on the
         training window and blend them 75% / 25%.
      3. Choose the decision threshold on the validation window, then score
         validation and test predictions after the decision overlays.
      4. Predict the most recent dated row of the dataset (labelled or not).
      5. Dump the model payload to ``MODEL_PATH`` and the run summary to
         ``summary.json`` / ``retrain_summary.csv`` under ``MODEL_DIR``.

    Raises:
        RuntimeError: if no usable feature column exists, or if any of the
            three date windows contains no rows.
    """
    args = parse_args()
    train_end = pd.Timestamp(args.train_end)
    valid_end = pd.Timestamp(args.valid_end)

    frame = pd.read_parquet(OPENING_DATASET_PATH)
    # Unparseable dates become NaT; such rows match none of the windows below.
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    model_frame = frame.dropna(subset=["target"]).sort_values("date").reset_index(drop=True)

    features = feature_columns(model_frame)
    if not features:
        # Fail early with a clear message instead of an opaque estimator error.
        raise RuntimeError("No usable numeric feature columns were found in the dataset.")

    dates = model_frame["date"]
    train_df = model_frame[dates <= train_end]
    valid_df = model_frame[(dates > train_end) & (dates <= valid_end)]
    test_df = model_frame[dates > valid_end]
    if train_df.empty or valid_df.empty or test_df.empty:
        raise RuntimeError("Training, validation, and test windows must all contain rows.")

    x_train = train_df[features]
    y_train = train_df["target"].to_numpy(dtype="int64")
    x_valid = valid_df[features]
    y_valid = valid_df["target"].to_numpy(dtype="int64")
    x_test = test_df[features]
    y_test = test_df["target"].to_numpy(dtype="int64")

    extra_trees = make_pipeline(
        SimpleImputer(strategy="median"),
        ExtraTreesClassifier(
            n_estimators=800,
            max_depth=4,
            min_samples_leaf=28,
            max_features=0.60,
            class_weight="balanced_subsample",
            random_state=RANDOM_SEED + 13,
            n_jobs=-1,
        ),
    )
    logit = make_pipeline(
        SimpleImputer(strategy="median"),
        StandardScaler(),
        LogisticRegression(C=0.25, class_weight="balanced", max_iter=2000, random_state=RANDOM_SEED),
    )
    extra_trees.fit(x_train, y_train)
    logit.fit(x_train, y_train)
    model = ProbabilityBlend([extra_trees, logit], np.array([0.75, 0.25]))

    valid_prob = predict_proba_up(model, x_valid)
    test_prob = predict_proba_up(model, x_test)
    # The threshold is tuned on the raw validation probabilities only; the
    # overlays are applied afterwards to both hold-out windows.
    threshold, _ = best_threshold(y_valid, valid_prob)
    valid_pred = apply_decision_overlays((valid_prob >= threshold).astype("int64"), valid_df, DECISION_OVERLAYS)
    test_pred = apply_decision_overlays((test_prob >= threshold).astype("int64"), test_df, DECISION_OVERLAYS)

    # The latest row is taken from the full frame (its target may still be
    # missing), but it must be the row with the greatest valid date rather
    # than whatever happens to be last in file order.  A stable sort keeps
    # file order among rows sharing the same date.  At least one dated row
    # exists here because the training window is non-empty.
    latest = frame.dropna(subset=["date"]).sort_values("date", kind="mergesort").iloc[[-1]].copy()
    latest_prob = predict_proba_up(model, latest[features])
    latest_pred = apply_decision_overlays((latest_prob >= threshold).astype("int64"), latest, DECISION_OVERLAYS)
    latest_conf = directional_confidence(latest_prob, latest_pred, threshold)

    payload = {
        "model": model,
        "features": features,
        "threshold": threshold,
        "target": "same-day NIFTY 50 close > same-day NIFTY 50 open after first five 1-minute bars",
        "model_name": "compact_extra_trees_logit_overlay",
        "decision_overlays": DECISION_OVERLAYS,
    }
    # Create the output locations up front so a missing directory does not
    # throw away a finished training run.
    Path(MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)
    Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
    joblib.dump(payload, MODEL_PATH)

    summary = RetrainSummary(
        model_name=payload["model_name"],
        threshold=float(threshold),
        train_rows=int(len(train_df)),
        valid_rows=int(len(valid_df)),
        test_rows=int(len(test_df)),
        validation_accuracy=float(accuracy_score(y_valid, valid_pred)),
        test_accuracy=float(accuracy_score(y_test, test_pred)),
        validation_auc=score_auc(y_valid, valid_prob),
        test_auc=score_auc(y_test, test_prob),
        test_brier=float(brier_score_loss(y_test, np.clip(test_prob, 1e-6, 1 - 1e-6))),
        latest_prediction="UP" if int(latest_pred[0]) == 1 else "DOWN",
        latest_prob_up=float(latest_prob[0]),
        latest_confidence=float(latest_conf[0]),
        feature_count=int(len(features)),
    )
    summary_dict = asdict(summary)
    # NOTE(review): score_auc may return NaN, which json.dumps writes as the
    # non-standard token NaN; strict JSON consumers should be checked.
    (MODEL_DIR / "summary.json").write_text(json.dumps(summary_dict, indent=2), encoding="utf-8")
    pd.DataFrame([summary_dict]).to_csv(MODEL_DIR / "retrain_summary.csv", index=False)
    print(summary_dict)
# Run the retraining workflow only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()