Spaces:
Running
Running
| from __future__ import annotations | |
| import argparse | |
| import json | |
| import sys | |
| from dataclasses import asdict, dataclass | |
| from pathlib import Path | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from sklearn.ensemble import ExtraTreesClassifier | |
| from sklearn.impute import SimpleImputer | |
| from sklearn.linear_model import LogisticRegression | |
| from sklearn.metrics import accuracy_score, brier_score_loss, log_loss, roc_auc_score | |
| from sklearn.pipeline import make_pipeline | |
| from sklearn.preprocessing import StandardScaler | |
| from nifty_backend.runtime import ( | |
| DECISION_OVERLAYS, | |
| MODEL_DIR, | |
| MODEL_PATH, | |
| OPENING_DATASET_PATH, | |
| ProbabilityBlend, | |
| apply_decision_overlays, | |
| directional_confidence, | |
| predict_proba_up, | |
| ) | |
# Default split boundaries, overridable through --train-end / --valid-end.
# Rows dated on or before DEFAULT_TRAIN_END form the training window.
DEFAULT_TRAIN_END = pd.Timestamp("2023-12-31")
# Rows after the training end up to and including DEFAULT_VALID_END form the
# validation window; anything later is the held-out test window.
DEFAULT_VALID_END = pd.Timestamp("2025-08-17")
# Base seed for estimator random_state values (the ExtraTrees model uses
# RANDOM_SEED + 13, the logistic regression uses RANDOM_SEED as is).
RANDOM_SEED = 42
@dataclass
class RetrainSummary:
    """Flat record of one retraining run, serialised to JSON and CSV by ``main``.

    The class must be a dataclass: ``main`` builds it with keyword arguments
    and converts it with ``dataclasses.asdict``; both fail on a plain class
    that only carries annotations.
    """

    # Identifier stored alongside the model in the saved payload.
    model_name: str
    # Probability cut-off selected on the validation window.
    threshold: float
    # Row counts of the three chronological windows.
    train_rows: int
    valid_rows: int
    test_rows: int
    # Accuracy of the final (overlay-adjusted) predictions.
    validation_accuracy: float
    test_accuracy: float
    # ROC AUC of the raw probabilities; NaN when a window has a single class.
    validation_auc: float
    test_auc: float
    # Brier score of the test-window probabilities.
    test_brier: float
    # "UP" or "DOWN" call for the most recent dataset row.
    latest_prediction: str
    latest_prob_up: float
    latest_confidence: float
    # Number of feature columns the model was trained on.
    feature_count: int
def feature_columns(frame: pd.DataFrame) -> list[str]:
    """Return the names of the columns of *frame* usable as model features.

    A column qualifies when it is not one of the bookkeeping / outcome
    columns listed below, has a numeric dtype, is non-null in at least 40%
    of the rows, and takes more than one distinct non-null value.
    Column order follows ``frame.columns``.
    """
    non_features = frozenset(
        (
            "date",
            "first5_start",
            "first5_end",
            "target",
            "day_open",
            "day_high",
            "day_low",
            "day_close",
            "day_volume",
            "day_return",
        )
    )

    def _is_usable(name) -> bool:
        series = frame[name]
        if not pd.api.types.is_numeric_dtype(series):
            return False
        # Written as "not (>=)" so a NaN coverage (empty frame) is rejected.
        if not (series.notna().mean() >= 0.40):
            return False
        # Constant columns carry no signal.
        return series.nunique(dropna=True) > 1

    return [
        name
        for name in frame.columns
        if name not in non_features and _is_usable(name)
    ]
def best_threshold(y_true: np.ndarray, prob_up: np.ndarray) -> tuple[float, float]:
    """Pick the probability cut-off with the highest accuracy.

    301 evenly spaced candidates between 0.35 and 0.65 (inclusive) are
    evaluated; a sample is predicted "up" when its probability is greater
    than or equal to the candidate.

    Returns:
        ``(threshold, accuracy)`` for the best candidate. On ties the lowest
        candidate wins because ``argmax`` reports the first maximum.
    """
    candidates = np.linspace(0.35, 0.65, 301)
    # Rows are samples, columns are candidate thresholds.
    predicted_up = prob_up[:, np.newaxis] >= candidates[np.newaxis, :]
    hits = predicted_up == y_true[:, np.newaxis]
    accuracy = hits.mean(axis=0)
    best = int(accuracy.argmax())
    return float(candidates[best]), float(accuracy[best])
def score_auc(y_true: np.ndarray, prob_up: np.ndarray) -> float:
    """Return the ROC AUC of *prob_up* against *y_true*.

    The score is only computed when *y_true* contains at least two distinct
    labels; otherwise NaN is returned instead of calling ``roc_auc_score``.
    """
    distinct_labels = np.unique(y_true)
    if distinct_labels.size >= 2:
        return float(roc_auc_score(y_true, prob_up))
    return float("nan")
def parse_args() -> argparse.Namespace:
    """Parse the command-line options of the retraining script.

    Two options are accepted, ``--train-end`` and ``--valid-end``; each
    defaults to the ISO date of the matching module-level timestamp.
    """
    cli = argparse.ArgumentParser(
        description="Retrain the compact NIFTY opening-direction model from Parquet data."
    )
    split_defaults = (
        ("--train-end", DEFAULT_TRAIN_END),
        ("--valid-end", DEFAULT_VALID_END),
    )
    for flag, boundary in split_defaults:
        cli.add_argument(flag, default=boundary.date().isoformat())
    return cli.parse_args()
def main() -> None:
    """Retrain the blended opening-direction model and persist it with a summary.

    Workflow:
      1. Load the opening dataset and split it chronologically into
         train / validation / test windows using ``--train-end`` and
         ``--valid-end``.
      2. Fit an ExtraTrees pipeline and a logistic-regression pipeline on the
         training window and blend them 75% / 25%.
      3. Choose the decision threshold on the validation window, then apply
         the decision overlays to validation, test and the latest row.
      4. Dump the model payload to ``MODEL_PATH`` and write the run summary
         to ``summary.json`` and ``retrain_summary.csv`` under ``MODEL_DIR``.

    Raises:
        RuntimeError: if any of the three windows contains no rows.
    """
    args = parse_args()
    train_end = pd.Timestamp(args.train_end)
    valid_end = pd.Timestamp(args.valid_end)

    frame = pd.read_parquet(OPENING_DATASET_PATH)
    # Unparseable dates become NaT; NaT compares False against both
    # boundaries, so such rows end up in none of the windows.
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    model_frame = frame.dropna(subset=["target"]).sort_values("date").reset_index(drop=True)
    features = feature_columns(model_frame)

    # Chronological split: train <= train_end < valid <= valid_end < test.
    train_df = model_frame[model_frame["date"] <= train_end]
    valid_df = model_frame[(model_frame["date"] > train_end) & (model_frame["date"] <= valid_end)]
    test_df = model_frame[model_frame["date"] > valid_end]
    if train_df.empty or valid_df.empty or test_df.empty:
        raise RuntimeError("Training, validation, and test windows must all contain rows.")

    x_train = train_df[features]
    y_train = train_df["target"].to_numpy(dtype="int64")
    x_valid = valid_df[features]
    y_valid = valid_df["target"].to_numpy(dtype="int64")
    x_test = test_df[features]
    y_test = test_df["target"].to_numpy(dtype="int64")

    extra_trees = make_pipeline(
        SimpleImputer(strategy="median"),
        ExtraTreesClassifier(
            n_estimators=800,
            max_depth=4,
            min_samples_leaf=28,
            max_features=0.60,
            class_weight="balanced_subsample",
            random_state=RANDOM_SEED + 13,
            n_jobs=-1,
        ),
    )
    logit = make_pipeline(
        SimpleImputer(strategy="median"),
        StandardScaler(),
        LogisticRegression(C=0.25, class_weight="balanced", max_iter=2000, random_state=RANDOM_SEED),
    )
    extra_trees.fit(x_train, y_train)
    logit.fit(x_train, y_train)
    model = ProbabilityBlend([extra_trees, logit], np.array([0.75, 0.25]))

    valid_prob = predict_proba_up(model, x_valid)
    test_prob = predict_proba_up(model, x_test)
    # The threshold is tuned on validation only; the test window stays untouched.
    threshold, _ = best_threshold(y_valid, valid_prob)
    valid_pred = apply_decision_overlays((valid_prob >= threshold).astype("int64"), valid_df, DECISION_OVERLAYS)
    test_pred = apply_decision_overlays((test_prob >= threshold).astype("int64"), test_df, DECISION_OVERLAYS)

    # Score the most recent row by date. The raw frame is not guaranteed to be
    # stored in date order, so select by date instead of physical position; the
    # stable sort keeps the file order among rows sharing a date.
    # The unfiltered frame is used here (not model_frame), presumably so the
    # newest session can be scored before its target is known - TODO confirm.
    latest = frame.dropna(subset=["date"]).sort_values("date", kind="stable").iloc[[-1]].copy()
    latest_prob = predict_proba_up(model, latest[features])
    latest_pred = apply_decision_overlays((latest_prob >= threshold).astype("int64"), latest, DECISION_OVERLAYS)
    latest_conf = directional_confidence(latest_prob, latest_pred, threshold)

    payload = {
        "model": model,
        "features": features,
        "threshold": threshold,
        "target": "same-day NIFTY 50 close > same-day NIFTY 50 open after first five 1-minute bars",
        "model_name": "compact_extra_trees_logit_overlay",
        "decision_overlays": DECISION_OVERLAYS,
    }
    # A fresh checkout may not have the output directories yet; create them so
    # the run does not fail only after the models have been fitted.
    Path(MODEL_PATH).parent.mkdir(parents=True, exist_ok=True)
    Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
    joblib.dump(payload, MODEL_PATH)

    summary = RetrainSummary(
        model_name=payload["model_name"],
        threshold=float(threshold),
        train_rows=int(len(train_df)),
        valid_rows=int(len(valid_df)),
        test_rows=int(len(test_df)),
        validation_accuracy=float(accuracy_score(y_valid, valid_pred)),
        test_accuracy=float(accuracy_score(y_test, test_pred)),
        validation_auc=score_auc(y_valid, valid_prob),
        test_auc=score_auc(y_test, test_prob),
        test_brier=float(brier_score_loss(y_test, np.clip(test_prob, 1e-6, 1 - 1e-6))),
        latest_prediction="UP" if int(latest_pred[0]) == 1 else "DOWN",
        latest_prob_up=float(latest_prob[0]),
        latest_confidence=float(latest_conf[0]),
        feature_count=int(len(features)),
    )
    summary_dict = asdict(summary)
    (MODEL_DIR / "summary.json").write_text(json.dumps(summary_dict, indent=2), encoding="utf-8")
    pd.DataFrame([summary_dict]).to_csv(MODEL_DIR / "retrain_summary.csv", index=False)
    print(summary_dict)
# Run the retraining only when this file is executed as a script, not when
# it is imported as a module.
if __name__ == "__main__":
    main()