| from __future__ import annotations |
|
|
| import argparse |
| import json |
| import sys |
| from dataclasses import asdict, dataclass |
| from pathlib import Path |
|
|
# Put the parent of this script's directory at the front of the import path.
# NOTE(review): presumably this lets the `nifty_backend` import below resolve
# when the script is run directly - confirm against the repository layout.
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
|
|
| import joblib |
| import numpy as np |
| import pandas as pd |
| from sklearn.ensemble import ExtraTreesClassifier |
| from sklearn.impute import SimpleImputer |
| from sklearn.linear_model import LogisticRegression |
| from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, roc_auc_score |
| from sklearn.pipeline import make_pipeline |
| from sklearn.preprocessing import StandardScaler |
|
|
| from nifty_backend.runtime import ( |
| DECISION_OVERLAYS, |
| MODEL_DIR, |
| MODEL_PATH, |
| OPENING_DATASET_PATH, |
| ProbabilityBlend, |
| apply_decision_overlays, |
| directional_confidence, |
| predict_proba_up, |
| ) |
|
|
|
|
# Default for --train-end: rows dated on or before this day form the training window.
DEFAULT_TRAIN_END = pd.Timestamp("2023-12-31")
# Default for --valid-end: rows after the training cutoff up to and including this
# day form the validation window; anything later is the test window.
DEFAULT_VALID_END = pd.Timestamp("2025-08-17")
# Base seed: used as-is by the logistic regression and offset by 13 for the
# extra-trees classifier.
RANDOM_SEED = 42
|
|
|
|
@dataclass
class RetrainSummary:
    """Headline numbers from one retraining run.

    ``main`` builds a single instance per run and serializes it with
    ``asdict`` to ``summary.json``, ``retrain_summary.csv`` and stdout, so the
    field order below is also the key/column order of those outputs.
    """

    # Copied from the "model_name" entry of the saved payload.
    model_name: str
    # Probability cutoff selected on the validation window.
    threshold: float
    # Row counts of the three chronological windows.
    train_rows: int
    valid_rows: int
    test_rows: int
    # Accuracy of the final calls (thresholded, then passed through the
    # decision overlays) on each evaluation window.
    validation_accuracy: float
    test_accuracy: float
    # ROC AUC of the raw blended probabilities; NaN when a window contains a
    # single class.
    validation_auc: float
    test_auc: float
    # Brier score of the test probabilities after clipping to [1e-6, 1 - 1e-6].
    test_brier: float
    # "UP" or "DOWN" for the row treated as the latest one in the dataset.
    latest_prediction: str
    # Blended probability of an up day for that row.
    latest_prob_up: float
    # Output of ``directional_confidence`` for that row.
    latest_confidence: float
    # Number of feature columns fed to the model.
    feature_count: int
|
|
|
|
def feature_columns(frame: pd.DataFrame) -> list[str]:
    """Return the names of the columns in *frame* that are usable as features.

    A column is kept, in its original position order, when all of these hold:

    * it is not one of the bookkeeping / outcome columns listed below,
    * its dtype is numeric,
    * at least 40% of its values are present,
    * it has more than one distinct non-missing value.
    """
    non_features = frozenset(
        (
            "date",
            "first5_start",
            "first5_end",
            "target",
            "day_open",
            "day_high",
            "day_low",
            "day_close",
            "day_volume",
            "day_return",
        )
    )

    def is_usable(series: pd.Series) -> bool:
        # Checks short-circuit in order: dtype, then coverage, then variety.
        return bool(
            pd.api.types.is_numeric_dtype(series)
            and series.notna().mean() >= 0.40
            and series.nunique(dropna=True) > 1
        )

    return [
        name
        for name in frame.columns
        if name not in non_features and is_usable(frame[name])
    ]
|
|
|
|
def best_threshold(y_true: np.ndarray, prob_up: np.ndarray) -> tuple[float, float]:
    """Pick the probability cutoff that maximizes accuracy on the given labels.

    301 evenly spaced candidates between 0.35 and 0.65 (inclusive) are
    evaluated; a sample is called "up" when its probability is greater than
    or equal to the candidate. Returns ``(cutoff, accuracy)``; when several
    candidates tie, the lowest one wins.
    """
    candidates = np.linspace(0.35, 0.65, 301)
    # Rows are samples, columns are candidate cutoffs.
    calls = np.greater_equal(prob_up[:, np.newaxis], candidates[np.newaxis, :])
    hit_rate = np.equal(calls, y_true[:, np.newaxis]).mean(axis=0)
    winner = int(hit_rate.argmax())
    return float(candidates[winner]), float(hit_rate[winner])
|
|
|
|
def score_auc(y_true: np.ndarray, prob_up: np.ndarray) -> float:
    """Return the ROC AUC of *prob_up* against *y_true*.

    The result is NaN when *y_true* holds fewer than two distinct labels,
    in which case ``roc_auc_score`` is not called at all.
    """
    distinct_labels = np.unique(y_true)
    if distinct_labels.size >= 2:
        return float(roc_auc_score(y_true, prob_up))
    return float("nan")
|
|
|
|
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the retraining script.

    Args:
        argv: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]``, so existing ``parse_args()`` callers are
            unaffected; passing a list allows programmatic use.

    Returns:
        Namespace with ``train_end`` and ``valid_end`` as ISO date strings.
    """
    parser = argparse.ArgumentParser(description="Retrain the compact NIFTY opening-direction model from Parquet data.")
    parser.add_argument(
        "--train-end",
        default=DEFAULT_TRAIN_END.date().isoformat(),
        metavar="YYYY-MM-DD",
        help="Last date (inclusive) of the training window (default: %(default)s).",
    )
    parser.add_argument(
        "--valid-end",
        default=DEFAULT_VALID_END.date().isoformat(),
        metavar="YYYY-MM-DD",
        help="Last date (inclusive) of the validation window; later rows are the test window (default: %(default)s).",
    )
    return parser.parse_args(argv)
|
|
|
|
def _split_by_date(
    model_frame: pd.DataFrame, train_end: pd.Timestamp, valid_end: pd.Timestamp
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Cut *model_frame* into chronological train / validation / test frames."""
    dates = model_frame["date"]
    train_df = model_frame[dates <= train_end]
    valid_df = model_frame[(dates > train_end) & (dates <= valid_end)]
    test_df = model_frame[dates > valid_end]
    if train_df.empty or valid_df.empty or test_df.empty:
        raise RuntimeError("Training, validation, and test windows must all contain rows.")
    return train_df, valid_df, test_df


def _fit_blend(x_train: pd.DataFrame, y_train: np.ndarray) -> ProbabilityBlend:
    """Fit the extra-trees and logistic pipelines and combine them 0.75 / 0.25."""
    extra_trees = make_pipeline(
        SimpleImputer(strategy="median"),
        ExtraTreesClassifier(
            n_estimators=800,
            max_depth=4,
            min_samples_leaf=28,
            max_features=0.60,
            class_weight="balanced_subsample",
            random_state=RANDOM_SEED + 13,
            n_jobs=-1,
        ),
    )
    logit = make_pipeline(
        SimpleImputer(strategy="median"),
        StandardScaler(),
        LogisticRegression(C=0.25, class_weight="balanced", max_iter=2000, random_state=RANDOM_SEED),
    )
    extra_trees.fit(x_train, y_train)
    logit.fit(x_train, y_train)
    return ProbabilityBlend([extra_trees, logit], np.array([0.75, 0.25]))


def main() -> None:
    """Retrain the blended model, persist it, and report summary metrics.

    Steps:
        1. Load the dataset at ``OPENING_DATASET_PATH`` and split the labelled
           rows by date into train / validation / test windows.
        2. Fit the two pipelines on the training window and blend them.
        3. Choose the decision threshold on the validation window, then apply
           the decision overlays to the thresholded calls.
        4. Score the chronologically latest row of the full dataset.
        5. Dump the model payload to ``MODEL_PATH`` and write the summary to
           ``summary.json`` and ``retrain_summary.csv`` in ``MODEL_DIR``,
           and print it.

    Raises:
        RuntimeError: If any of the three date windows contains no rows.
    """
    args = parse_args()
    train_end = pd.Timestamp(args.train_end)
    valid_end = pd.Timestamp(args.valid_end)

    frame = pd.read_parquet(OPENING_DATASET_PATH)
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    # Only labelled rows can be used for fitting and evaluation.
    model_frame = frame.dropna(subset=["target"]).sort_values("date").reset_index(drop=True)
    features = feature_columns(model_frame)
    train_df, valid_df, test_df = _split_by_date(model_frame, train_end, valid_end)

    x_train = train_df[features]
    y_train = train_df["target"].to_numpy(dtype="int64")
    x_valid = valid_df[features]
    y_valid = valid_df["target"].to_numpy(dtype="int64")
    x_test = test_df[features]
    y_test = test_df["target"].to_numpy(dtype="int64")

    model = _fit_blend(x_train, y_train)
    valid_prob = predict_proba_up(model, x_valid)
    test_prob = predict_proba_up(model, x_test)
    # The threshold is tuned on raw validation probabilities; the overlays are
    # applied afterwards to the thresholded calls.
    threshold, _ = best_threshold(y_valid, valid_prob)
    valid_pred = apply_decision_overlays((valid_prob >= threshold).astype("int64"), valid_df, DECISION_OVERLAYS)
    test_pred = apply_decision_overlays((test_prob >= threshold).astype("int64"), test_df, DECISION_OVERLAYS)

    # Take the latest row by date rather than by file position: the raw frame
    # is not sorted here and may hold rows whose date failed to parse (NaT).
    # The row is drawn from the full frame, so it may be unlabelled.
    # A stable sort keeps file order among rows sharing the same date.
    dated = frame.dropna(subset=["date"])
    latest = dated.sort_values("date", kind="mergesort").iloc[[-1]].copy()
    latest_prob = predict_proba_up(model, latest[features])
    latest_pred = apply_decision_overlays((latest_prob >= threshold).astype("int64"), latest, DECISION_OVERLAYS)
    latest_conf = directional_confidence(latest_prob, latest_pred, threshold)

    payload = {
        "model": model,
        "features": features,
        "threshold": threshold,
        "target": "same-day NIFTY 50 close > same-day NIFTY 50 open after first five 1-minute bars",
        "model_name": "compact_extra_trees_logit_overlay",
        "decision_overlays": DECISION_OVERLAYS,
    }
    # Create the output locations up front so a fresh checkout does not fail
    # at write time after the (slow) fit has already completed.
    Path(MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)
    Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
    joblib.dump(payload, MODEL_PATH)

    summary = RetrainSummary(
        model_name=payload["model_name"],
        threshold=float(threshold),
        train_rows=int(len(train_df)),
        valid_rows=int(len(valid_df)),
        test_rows=int(len(test_df)),
        validation_accuracy=float(accuracy_score(y_valid, valid_pred)),
        test_accuracy=float(accuracy_score(y_test, test_pred)),
        validation_auc=score_auc(y_valid, valid_prob),
        test_auc=score_auc(y_test, test_prob),
        test_brier=float(brier_score_loss(y_test, np.clip(test_prob, 1e-6, 1 - 1e-6))),
        latest_prediction="UP" if int(latest_pred[0]) == 1 else "DOWN",
        latest_prob_up=float(latest_prob[0]),
        latest_confidence=float(latest_conf[0]),
        feature_count=int(len(features)),
    )
    # Serialize once and reuse the same mapping for every output.
    summary_dict = asdict(summary)
    (MODEL_DIR / "summary.json").write_text(json.dumps(summary_dict, indent=2), encoding="utf-8")
    pd.DataFrame([summary_dict]).to_csv(MODEL_DIR / "retrain_summary.csv", index=False)
    print(summary_dict)
|
|
|
|
# Run the retraining pipeline only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|
|