"""Retrain the compact NIFTY opening-direction model from Parquet data.

The script loads the opening dataset, splits it chronologically into
train / validation / test windows, fits an ExtraTrees + logistic-regression
blend, tunes the decision threshold on the validation window, persists the
model payload and writes a JSON / CSV summary of the run.
"""

from __future__ import annotations

import argparse
import json
import sys
from dataclasses import asdict, dataclass
from pathlib import Path

# Put the parent of this file's directory on the import path before the
# ``nifty_backend`` import below (presumably the project root - confirm layout).
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))

import joblib
import numpy as np
import pandas as pd
from sklearn.ensemble import ExtraTreesClassifier
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, roc_auc_score
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

from nifty_backend.runtime import (
    DECISION_OVERLAYS,
    MODEL_DIR,
    MODEL_PATH,
    OPENING_DATASET_PATH,
    ProbabilityBlend,
    apply_decision_overlays,
    directional_confidence,
    predict_proba_up,
)

# Default chronological split boundaries (inclusive upper bounds).
DEFAULT_TRAIN_END = pd.Timestamp("2023-12-31")
DEFAULT_VALID_END = pd.Timestamp("2025-08-17")
RANDOM_SEED = 42


@dataclass
class RetrainSummary:
    """Flat record of one retraining run, written to JSON and CSV by ``main``."""

    model_name: str  # value of the payload's "model_name" entry
    threshold: float  # P(up) cut-off selected on the validation window
    train_rows: int
    valid_rows: int
    test_rows: int
    validation_accuracy: float  # accuracy of overlay-adjusted validation predictions
    test_accuracy: float  # accuracy of overlay-adjusted test predictions
    validation_auc: float  # ROC AUC of raw validation probabilities (NaN if one class)
    test_auc: float  # ROC AUC of raw test probabilities (NaN if one class)
    test_brier: float  # Brier score of test probabilities clipped to [1e-6, 1 - 1e-6]
    latest_prediction: str  # "UP" or "DOWN" for the most recent dataset row
    latest_prob_up: float
    latest_confidence: float
    feature_count: int


def feature_columns(frame: pd.DataFrame) -> list[str]:
    """Return the columns of *frame* that are usable as model features.

    A column is kept when it is not in the fixed exclusion set, has a numeric
    dtype, is at least 40% non-null and has more than one distinct non-null
    value. Column order follows ``frame.columns``.
    """
    excluded = {
        "date",
        "first5_start",
        "first5_end",
        "target",
        "day_open",
        "day_high",
        "day_low",
        "day_close",
        "day_volume",
        "day_return",
    }
    cols: list[str] = []
    for col in frame.columns:
        if col in excluded:
            continue
        series = frame[col]
        if not pd.api.types.is_numeric_dtype(series):
            continue
        if series.notna().mean() < 0.40:
            continue
        # Constant columns carry no signal for the classifiers.
        if series.nunique(dropna=True) > 1:
            cols.append(col)
    return cols


def best_threshold(y_true: np.ndarray, prob_up: np.ndarray) -> tuple[float, float]:
    """Pick the probability cut-off that maximises accuracy against *y_true*.

    Scans 301 evenly spaced thresholds in ``[0.35, 0.65]`` and scores each as
    the mean agreement between ``prob_up >= threshold`` and ``y_true``.

    Returns:
        ``(threshold, accuracy)`` for the best cut-off. Ties resolve to the
        lowest threshold because ``np.argmax`` returns the first maximum.
    """
    thresholds = np.linspace(0.35, 0.65, 301)
    # Broadcast to a (samples, thresholds) grid of correct/incorrect flags.
    hits = (prob_up[:, None] >= thresholds[None, :]) == y_true[:, None]
    scores = hits.mean(axis=0)
    idx = int(np.argmax(scores))
    return float(thresholds[idx]), float(scores[idx])


def score_auc(y_true: np.ndarray, prob_up: np.ndarray) -> float:
    """Return the ROC AUC, or NaN when *y_true* has fewer than two distinct values."""
    if len(np.unique(y_true)) < 2:
        return float("nan")
    return float(roc_auc_score(y_true, prob_up))


def parse_args() -> argparse.Namespace:
    """Parse ``--train-end`` and ``--valid-end`` (ISO dates) from the command line."""
    parser = argparse.ArgumentParser(
        description="Retrain the compact NIFTY opening-direction model from Parquet data."
    )
    parser.add_argument("--train-end", default=DEFAULT_TRAIN_END.date().isoformat())
    parser.add_argument("--valid-end", default=DEFAULT_VALID_END.date().isoformat())
    return parser.parse_args()


def main() -> None:
    """Run the retraining pipeline end to end.

    Raises:
        RuntimeError: if ``--train-end`` is not strictly before ``--valid-end``
            or if any of the train / validation / test windows is empty.
    """
    args = parse_args()
    train_end = pd.Timestamp(args.train_end)
    valid_end = pd.Timestamp(args.valid_end)
    # Fail fast with a specific message instead of the generic empty-window
    # error that an inverted pair of boundaries would otherwise produce.
    if train_end >= valid_end:
        raise RuntimeError("--train-end must be strictly earlier than --valid-end.")

    frame = pd.read_parquet(OPENING_DATASET_PATH)
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    model_frame = frame.dropna(subset=["target"]).sort_values("date").reset_index(drop=True)
    features = feature_columns(model_frame)

    # Chronological split; rows whose date failed to parse (NaT) match no window.
    dates = model_frame["date"]
    train_df = model_frame[dates <= train_end]
    valid_df = model_frame[(dates > train_end) & (dates <= valid_end)]
    test_df = model_frame[dates > valid_end]
    if train_df.empty or valid_df.empty or test_df.empty:
        raise RuntimeError("Training, validation, and test windows must all contain rows.")

    x_train = train_df[features]
    y_train = train_df["target"].to_numpy(dtype="int64")
    x_valid = valid_df[features]
    y_valid = valid_df["target"].to_numpy(dtype="int64")
    x_test = test_df[features]
    y_test = test_df["target"].to_numpy(dtype="int64")

    extra_trees = make_pipeline(
        SimpleImputer(strategy="median"),
        ExtraTreesClassifier(
            n_estimators=800,
            max_depth=4,
            min_samples_leaf=28,
            max_features=0.60,
            class_weight="balanced_subsample",
            random_state=RANDOM_SEED + 13,
            n_jobs=-1,
        ),
    )
    logit = make_pipeline(
        SimpleImputer(strategy="median"),
        StandardScaler(),
        LogisticRegression(
            C=0.25, class_weight="balanced", max_iter=2000, random_state=RANDOM_SEED
        ),
    )
    extra_trees.fit(x_train, y_train)
    logit.fit(x_train, y_train)
    # Blend semantics live in nifty_backend.runtime; presumably the array holds
    # per-model weights in the same order as the model list - confirm there.
    model = ProbabilityBlend([extra_trees, logit], np.array([0.75, 0.25]))

    valid_prob = predict_proba_up(model, x_valid)
    test_prob = predict_proba_up(model, x_test)
    # The threshold is tuned on raw validation probabilities, before overlays.
    threshold, _ = best_threshold(y_valid, valid_prob)
    valid_pred = apply_decision_overlays(
        (valid_prob >= threshold).astype("int64"), valid_df, DECISION_OVERLAYS
    )
    test_pred = apply_decision_overlays(
        (test_prob >= threshold).astype("int64"), test_df, DECISION_OVERLAYS
    )

    # The raw frame is not guaranteed to be in date order, so take the most
    # recent row from a date-sorted view that excludes unparseable dates.
    # Rows without a target are deliberately kept: the newest session may not
    # have a label yet. The stable sort keeps file order among equal dates.
    dated = frame.dropna(subset=["date"]).sort_values("date", kind="stable")
    latest = dated.iloc[[-1]].copy()
    latest_prob = predict_proba_up(model, latest[features])
    latest_pred = apply_decision_overlays(
        (latest_prob >= threshold).astype("int64"), latest, DECISION_OVERLAYS
    )
    latest_conf = directional_confidence(latest_prob, latest_pred, threshold)

    payload = {
        "model": model,
        "features": features,
        "threshold": threshold,
        "target": "same-day NIFTY 50 close > same-day NIFTY 50 open after first five 1-minute bars",
        "model_name": "compact_extra_trees_logit_overlay",
        "decision_overlays": DECISION_OVERLAYS,
    }

    # Make sure the output locations exist so a first run on a clean checkout
    # does not fail when writing the artefacts.
    Path(MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)
    Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
    joblib.dump(payload, MODEL_PATH)

    summary = RetrainSummary(
        model_name=payload["model_name"],
        threshold=float(threshold),
        train_rows=int(len(train_df)),
        valid_rows=int(len(valid_df)),
        test_rows=int(len(test_df)),
        validation_accuracy=float(accuracy_score(y_valid, valid_pred)),
        test_accuracy=float(accuracy_score(y_test, test_pred)),
        validation_auc=score_auc(y_valid, valid_prob),
        test_auc=score_auc(y_test, test_prob),
        test_brier=float(brier_score_loss(y_test, np.clip(test_prob, 1e-6, 1 - 1e-6))),
        latest_prediction="UP" if int(latest_pred[0]) == 1 else "DOWN",
        latest_prob_up=float(latest_prob[0]),
        latest_confidence=float(latest_conf[0]),
        feature_count=int(len(features)),
    )
    summary_dict = asdict(summary)
    # NOTE: a NaN AUC is serialised by json.dumps as the non-standard token NaN.
    (MODEL_DIR / "summary.json").write_text(json.dumps(summary_dict, indent=2), encoding="utf-8")
    pd.DataFrame([summary_dict]).to_csv(MODEL_DIR / "retrain_summary.csv", index=False)
    print(summary_dict)


if __name__ == "__main__":
    main()