| from __future__ import annotations
|
|
|
| import json
|
| import copy
|
| import os
|
| import sys
|
| import threading
|
| from dataclasses import dataclass
|
| from datetime import date, datetime, time, timedelta
|
| from functools import lru_cache
|
| from pathlib import Path
|
| from typing import Any
|
| from zoneinfo import ZoneInfo
|
|
|
| import joblib
|
| import numpy as np
|
| import pandas as pd
|
| from nifty_backend.yahoo_history_client import YahooHistoryClient
|
|
|
| try:
|
| import pandas_market_calendars as mcal
|
| except ImportError:
|
| mcal = None
|
|
|
|
|
# Exchange timezone; "now" in this module is taken in IST (e.g. fetch windows,
# track-record cutoffs).
IST = ZoneInfo("Asia/Kolkata")
# Yahoo Finance ticker requested for NIFTY 50 bars.
YAHOO_NIFTY_SYMBOL = "^NSEI"
# Wall-clock milestones (IST).
# NOTE(review): 15:30 is presumably the NSE cash-session close — usage not visible here.
MARKET_CLOSE = time(15, 30)
# The opening model needs the 09:15-09:19 one-minute bars, which are complete by 09:20.
FIRST5_READY = time(9, 20)
# After this time on a trading day, that day's session is scored in the track record.
CLOSE_REFRESH_READY = time(15, 45)
# NOTE(review): presumably when the 14:20-based T+1 model becomes available — confirm.
TPLUS1_READY = time(14, 30)
# NOTE(review): polling interval for stale-data checks; usage not visible in this chunk.
STALE_CHECK_INTERVAL_SECONDS = 5
# Directory two levels above this module file (parent of its package directory).
BACKEND_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = BACKEND_ROOT / "data"
MODEL_DIR = BACKEND_ROOT / "models"
# SQLite cache used by YahooHistoryClient.
YAHOO_CACHE_PATH = MODEL_DIR / "yahoo_history_cache.sqlite3"
# --- Opening-direction (first five minutes) model artifacts ---
OPENING_DATASET_PATH = DATA_DIR / "opening_direction_training_dataset.parquet"
NIFTY_1M_PATH = DATA_DIR / "nifty50_1m.parquet"
NIFTY_1D_PATH = DATA_DIR / "nifty50_1d.parquet"
MODEL_PATH = MODEL_DIR / "nifty_opening_direction_model.joblib"
LATEST_PATH = MODEL_DIR / "latest_prediction.csv"
TEST_PREDICTIONS_PATH = DATA_DIR / "test_predictions.parquet"
# --- "Tomorrow" (next-session direction) artifacts mirrored from the daily forecaster ---
TOMORROW_MODEL_PATH = MODEL_DIR / "nifty_tomorrow_direction_model.joblib"
TOMORROW_LATEST_PATH = MODEL_DIR / "tomorrow_latest_prediction.csv"
TOMORROW_SUMMARY_PATH = MODEL_DIR / "tomorrow_summary.json"
TOMORROW_TEST_PREDICTIONS_PATH = DATA_DIR / "tomorrow_test_predictions.parquet"
TOMORROW_PREDICTION_HISTORY_PATH = MODEL_DIR / "tomorrow_prediction_history.parquet"
# Root of the sibling forecasting project; override with the FORECASTING_PROJECT_ROOT
# environment variable, otherwise "forecasting project" under BACKEND_ROOT's grandparent.
FORECASTING_PROJECT_ROOT = Path(
    os.environ.get(
        "FORECASTING_PROJECT_ROOT",
        str(BACKEND_ROOT.parent.parent / "forecasting project"),
    )
)
# Daily forecaster outputs read by sync_daily_forecaster_outputs.
DAILY_FORECASTER_OUTPUT_DIR = MODEL_DIR / "nifty_forecaster" / "outputs"
DAILY_FORECASTER_SUMMARY_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_summary.json"
DAILY_FORECASTER_LATEST_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_latest.csv"
DAILY_FORECASTER_PREDICTIONS_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_test_predictions.csv"
# --- Opening MFE regressor: source outputs in the forecasting project, local copies under MODEL_DIR ---
MFE_SOURCE_OUTPUT_DIR = FORECASTING_PROJECT_ROOT / "Code" / "models" / "nifty_opening_mfe_regressor" / "outputs"
MFE_OUTPUT_DIR = MODEL_DIR / "nifty_opening_mfe_regressor" / "outputs"
MFE_SUMMARY_PATH = MFE_OUTPUT_DIR / "summary.json"
MFE_LATEST_PATH = MFE_OUTPUT_DIR / "latest_prediction.csv"
MFE_TEST_PREDICTIONS_PATH = MFE_OUTPUT_DIR / "test_predictions.csv"
MFE_MODEL_PATH = MFE_OUTPUT_DIR / "nifty_opening_mfe_regressor.joblib"
MFE_LIVE_HISTORY_PATH = MFE_OUTPUT_DIR / "mfe_live_history.csv"
# --- T+1 logistic model artifacts ---
TPLUS1_MODEL_PATH = MODEL_DIR / "nifty_1420_tplus1_logistic_model.joblib"
TPLUS1_LATEST_PATH = MODEL_DIR / "tplus1_latest_prediction.csv"
TPLUS1_SUMMARY_PATH = MODEL_DIR / "tplus1_summary.json"
TPLUS1_TEST_PREDICTIONS_PATH = DATA_DIR / "tplus1_test_predictions.parquet"
TPLUS1_PREDICTION_HISTORY_PATH = MODEL_DIR / "tplus1_prediction_history.parquet"
# Live history of opening-model predictions, keyed by target_date (see predict_row).
T5_PREDICTION_HISTORY_PATH = MODEL_DIR / "t5_prediction_history.parquet"
# Refresh state machine persisted by save_refresh_state / load_refresh_state.
REFRESH_STATE_PATH = MODEL_DIR / "refresh_state.json"
# In-progress phases (save_refresh_state stamps started_at for these).
REFRESH_WAITING = "waiting_second_payload"
REFRESH_REFRESHING = "refreshing"
# Terminal phases (save_refresh_state stamps finished_at for these).
REFRESH_READY = "ready"
REFRESH_FAILED = "failed"
REFRESH_NORMAL = "normal"
LIVE_ACCURACY_PATH = MODEL_DIR / "live_accuracy.json"
|
|
|
# Default decision overlays for the opening model. Each overlay flips the
# thresholded UP/DOWN call (see apply_decision_overlays) for rows where
# ``frame[feature] <op> value`` holds (see overlay_mask). load_model uses this list
# only when the saved model payload does not carry its own "decision_overlays".
# NOTE(review): the cut-off values look fitted on training data — confirm provenance
# before changing them.
DECISION_OVERLAYS = [
    {
        "name": "fifth_minute_momentum_flip",
        "feature": "m5_ret_1m",
        "op": ">=",
        "value": 0.0005085411885759201,
    },
    {
        "name": "vix_stretch_flip",
        "feature": "india_vix_close_vs_sma_20",
        "op": ">=",
        "value": 0.24641908937959742,
    },
]
|
|
|
# Module-level locks. NOTE(review): their users are outside this chunk; presumably
# they serialize dashboard payload building and stale-data refreshes respectively.
_dashboard_payload_lock = threading.Lock()
_stale_refresh_lock = threading.Lock()
|
|
|
|
|
def utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a trailing ``Z``.

    Microseconds are dropped, e.g. ``"2024-05-06T04:15:00Z"``.
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated (3.12+); take an aware UTC "now" and strip the
    # offset so the string keeps the historical "...Z" format instead of "+00:00".
    now = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return now.isoformat() + "Z"
|
|
|
|
|
def clear_dashboard_payload_cache() -> None:
    """Discard every memoised result of ``_dashboard_payload_cached``.

    NOTE(review): ``_dashboard_payload_cached`` is defined outside this chunk;
    ``cache_clear`` implies it is an ``functools.lru_cache``-wrapped function.
    """
    cached_builder = _dashboard_payload_cached
    cached_builder.cache_clear()
|
|
|
|
|
def save_refresh_state(phase: str, *, session_date: date | None = None, error: str | None = None) -> dict[str, Any]:
    """Persist the refresh state machine to ``REFRESH_STATE_PATH`` and return it.

    Args:
        phase: One of the ``REFRESH_*`` phase constants.
        session_date: Session the refresh belongs to; when omitted, the previously
            stored session date is kept.
        error: Error message to store. It is not carried over, so passing None clears it.

    Returns:
        The state dict that was written.

    Timestamps:
        ``started_at`` is stamped when an in-progress phase (waiting/refreshing)
        begins a new cycle: the previous phase was terminal, or the session date
        changed. ``finished_at`` is stamped whenever a terminal phase
        (ready/failed/normal) is saved.
    """
    previous = load_refresh_state()
    if not isinstance(previous, dict):
        previous = {}
    in_progress = {REFRESH_WAITING, REFRESH_REFRESHING}
    new_session = session_date is not None and previous.get("session_date") != session_date.isoformat()
    # Previously started_at was always copied from the last (finished) cycle, so every
    # refresh after the first one reported the very first cycle's start time.
    starting_new_cycle = phase in in_progress and (previous.get("phase") not in in_progress or new_session)
    state = {
        "phase": phase,
        "started_at": None if starting_new_cycle else previous.get("started_at"),
        "finished_at": previous.get("finished_at"),
        "session_date": session_date.isoformat() if session_date is not None else previous.get("session_date"),
        "error": error,
    }
    if phase in in_progress and not state["started_at"]:
        state["started_at"] = utc_now_iso()
    if phase in {REFRESH_READY, REFRESH_FAILED, REFRESH_NORMAL}:
        state["finished_at"] = utc_now_iso()
    # Write to a sibling temp file and atomically swap it in, so a concurrent
    # load_refresh_state never observes a half-written JSON document.
    REFRESH_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = REFRESH_STATE_PATH.with_name(REFRESH_STATE_PATH.name + ".tmp")
    try:
        tmp_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
        os.replace(tmp_path, REFRESH_STATE_PATH)
    finally:
        if tmp_path.exists():
            tmp_path.unlink()
    return state
|
|
|
|
|
def load_refresh_state() -> dict[str, Any]:
    """Read the persisted refresh state, falling back to safe defaults.

    Returns:
        A dict that always contains ``phase``, ``started_at``, ``finished_at``,
        ``session_date`` and ``error``. Keys missing from the file are filled with
        the defaults.

        - A missing file yields a ``REFRESH_NORMAL`` state.
        - An unreadable or malformed file, or JSON that is not an object, yields a
          ``REFRESH_FAILED`` state with an explanatory ``error``.
    """
    defaults: dict[str, Any] = {
        "phase": REFRESH_NORMAL,
        "started_at": None,
        "finished_at": None,
        "session_date": None,
        "error": None,
    }
    try:
        # Read directly rather than exists()-then-read to avoid a check/use race.
        loaded = json.loads(REFRESH_STATE_PATH.read_text(encoding="utf-8"))
    except FileNotFoundError:
        return defaults
    except (OSError, ValueError):
        # ValueError covers JSONDecodeError and UnicodeDecodeError.
        loaded = None
    if not isinstance(loaded, dict):
        # Callers use .get on the result, so a list/scalar document counts as corrupt.
        return {**defaults, "phase": REFRESH_FAILED, "error": "refresh_state.json could not be read"}
    return {**defaults, **loaded}
|
|
|
|
|
@lru_cache(maxsize=1)
def _nse_calendar():
    """Return the first pandas_market_calendars calendar that loads, else None.

    Tries the names "XNSE", "NSE" and "BSE" in that order. Returns None when
    pandas_market_calendars is not installed or none of the names resolve. The
    result is cached for the life of the process.
    """
    if mcal is not None:
        for calendar_name in ("XNSE", "NSE", "BSE"):
            try:
                calendar = mcal.get_calendar(calendar_name)
            except Exception:
                # Unknown name in this library version: try the next candidate.
                continue
            return calendar
    return None
|
|
|
|
|
@lru_cache(maxsize=64)
def trading_schedule(start: date, end: date) -> pd.DataFrame:
    """Return the session schedule for ``start``..``end`` (inclusive), one row per session.

    Uses the pandas_market_calendars calendar from ``_nse_calendar`` when one is
    available. Otherwise it falls back to Monday-Friday business days; exchange
    holidays are NOT excluded in that case. The fallback frame carries
    ``market_open``/``market_close`` columns (09:15 / 15:30 IST, stored as UTC),
    presumably mirroring the columns of the calendar's ``schedule``.

    The fallback must have at least one column. ``DataFrame.empty`` is True when
    *any* axis has length 0, so the previous column-less ``pd.DataFrame(index=days)``
    was always "empty". That made ``is_trading_day`` always False and sent
    ``next_trading_day``/``previous_trading_day`` into infinite loops whenever the
    calendar library was missing.

    The result is cached; treat it as read-only.
    """
    calendar = _nse_calendar()
    if calendar is not None:
        return calendar.schedule(start_date=start, end_date=end)
    days = pd.date_range(start=start, end=end, freq="B")
    opens = (days + pd.Timedelta(hours=9, minutes=15)).tz_localize("Asia/Kolkata").tz_convert("UTC")
    closes = (days + pd.Timedelta(hours=15, minutes=30)).tz_localize("Asia/Kolkata").tz_convert("UTC")
    return pd.DataFrame({"market_open": opens, "market_close": closes}, index=days)
|
|
|
|
|
def is_trading_day(day: date) -> bool:
    """Return True when ``trading_schedule`` lists a session on ``day``.

    Counts index rows instead of using ``DataFrame.empty``. ``.empty`` is also True
    for a frame that has rows but no columns, such as a column-less fallback
    schedule, and would then report every day as a holiday.
    """
    return len(trading_schedule(day, day).index) > 0
|
|
|
|
|
def next_trading_day(start: date, *, max_lookahead_days: int = 366) -> date:
    """Return the first trading session on or after ``start``.

    Looks 14 days ahead first (the common case, and cache-friendly), then widens
    to ``max_lookahead_days``.

    Raises:
        RuntimeError: no session exists within ``max_lookahead_days``. The previous
            day-by-day loop spun forever in that case, e.g. when every schedule
            looked empty.
    """
    for horizon in (14, max_lookahead_days):
        schedule = trading_schedule(start, start + timedelta(days=horizon))
        # len(index) rather than .empty: a column-less schedule is "empty" even with rows.
        if len(schedule.index) > 0:
            return pd.Timestamp(schedule.index[0]).date()
    raise RuntimeError(f"No trading session found within {max_lookahead_days} days after {start}.")
|
|
|
|
|
def previous_trading_day(start: date, *, max_lookback_days: int = 366) -> date:
    """Return the latest trading session on or before ``start``.

    Looks back 14 days first, then widens to ``max_lookback_days``.

    Raises:
        RuntimeError: no session exists within ``max_lookback_days``. The previous
            day-by-day loop spun forever in that case.
    """
    for horizon in (14, max_lookback_days):
        schedule = trading_schedule(start - timedelta(days=horizon), start)
        # len(index) rather than .empty: a column-less schedule is "empty" even with rows.
        if len(schedule.index) > 0:
            return pd.Timestamp(schedule.index[-1]).date()
    raise RuntimeError(f"No trading session found within {max_lookback_days} days before {start}.")
|
|
|
|
|
def last_n_trading_sessions(end_day: date, count: int) -> list[date]:
    """Return the last ``count`` NSE sessions ending on (or before) ``end_day``.

    Sessions are returned oldest first. At most ``count * 12`` probes are made, so
    the list can come back shorter than ``count``; it is empty when ``count <= 0``.
    """
    picked: list[date] = []
    cursor = end_day
    for _probe in range(count * 12):
        if is_trading_day(cursor):
            picked.append(cursor)
            if len(picked) >= count:
                break
        # Step to the session strictly before the cursor.
        cursor = previous_trading_day(cursor - timedelta(days=1))
    return list(reversed(picked))
|
|
|
|
|
def _track_record_end_session(now: datetime | None = None) -> date:
    """Latest session the track record should score (today after the close refresh window).

    Returns today when today is a trading day and the IST wall clock has reached
    ``CLOSE_REFRESH_READY``. Otherwise returns ``expected_completed_daily_date(now)``,
    which is defined outside this chunk.
    """
    current = now if now is not None else datetime.now(IST)
    session_day = current.date()
    if not is_trading_day(session_day):
        return expected_completed_daily_date(current)
    if current.time() < CLOSE_REFRESH_READY:
        return expected_completed_daily_date(current)
    return session_day
|
|
|
|
|
class ProbabilityBlend:
    """Weighted average of several binary classifiers' P(up) estimates.

    ``load_model`` registers this class on ``__main__`` before ``joblib.load``, so
    instances are presumably restored from pickles. Unpickling bypasses
    ``__init__``, which means the ``models``/``weights`` attribute names must stay
    stable for existing artifacts.
    """

    def __init__(self, models: list[Any], weights: np.ndarray):
        """Store ``models`` and their weights normalized to sum to 1.

        Raises:
            ValueError: ``weights`` is not 1-D, does not have one entry per model,
                or sums to zero/non-finite. Previously these cases silently produced
                NaN/inf weights, and with them NaN probabilities.
        """
        weights_arr = np.asarray(weights, dtype="float64")
        if weights_arr.ndim != 1 or len(weights_arr) != len(models):
            raise ValueError(
                f"Expected one weight per model ({len(models)}), got shape {weights_arr.shape}."
            )
        total = weights_arr.sum()
        if not np.isfinite(total) or total == 0:
            raise ValueError(f"Blend weights must have a finite, non-zero sum; got {total!r}.")
        self.models = models
        self.weights = weights_arr / total

    def predict_proba(self, x: pd.DataFrame) -> np.ndarray:
        """Return an (n, 2) array of [P(down), P(up)] for the blended models."""
        probs = np.column_stack([predict_proba_up(model, x) for model in self.models])
        prob_up = probs @ self.weights
        return np.column_stack([1.0 - prob_up, prob_up])
|
|
|
|
|
@dataclass(frozen=True)
class Prediction:
    """Immutable result of one opening-direction prediction."""

    input_date: str  # session date, ISO format
    first5_start: str  # timestamp of the first opening bar used
    first5_end: str  # timestamp of the last opening bar used
    prediction: str  # "UP" or "DOWN"
    prob_up: float  # model probability of an up move
    confidence: float  # directional confidence (see directional_confidence)
    threshold: float  # decision threshold applied to prob_up
    model_name: str
    is_overridden: bool = False  # True when a decision overlay flipped the raw call

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict in declaration order (CSV/JSON friendly)."""
        ordered_fields = (
            "input_date",
            "first5_start",
            "first5_end",
            "prediction",
            "prob_up",
            "confidence",
            "threshold",
            "model_name",
        )
        record = {field_name: getattr(self, field_name) for field_name in ordered_fields}
        record["is_overridden"] = getattr(self, "is_overridden", False)
        return record
|
|
|
|
|
def predict_proba_up(model: Any, x: pd.DataFrame) -> np.ndarray:
    """Return column 1 of ``model.predict_proba(x)`` (P(up)) as a float64 array."""
    class_probabilities = model.predict_proba(x)
    up_column = class_probabilities[:, 1]
    return np.asarray(up_column, dtype="float64")
|
|
|
|
|
def safe_div(numer: pd.Series | np.ndarray, denom: pd.Series | np.ndarray) -> pd.Series:
    """Element-wise ``numer / denom`` with NaN wherever the denominator is 0, NaN or inf.

    Division is positional. The previous version masked by label, which raised
    (or misaligned) when ``numer`` and ``denom`` carried different indexes, e.g. a
    plain ndarray numerator with a non-RangeIndex Series denominator.

    Returns:
        A float64 Series on ``numer``'s index (a RangeIndex for array input).

    Raises:
        ValueError: the inputs have different lengths.
    """
    n = pd.Series(numer, copy=False)
    d = pd.Series(denom, copy=False)
    num = n.to_numpy(dtype="float64", na_value=np.nan)
    den = d.to_numpy(dtype="float64", na_value=np.nan)
    if num.shape != den.shape:
        raise ValueError(f"safe_div length mismatch: {num.shape} vs {den.shape}")
    out = np.full(num.shape, np.nan, dtype="float64")
    mask = np.isfinite(den) & (den != 0)  # NaN is not finite, so it is excluded too
    np.divide(num, den, out=out, where=mask)
    return pd.Series(out, index=n.index, dtype="float64")
|
|
|
|
|
def load_model() -> dict[str, Any]:
    """Load the opening-direction model payload from ``MODEL_PATH``.

    ``ProbabilityBlend`` and ``predict_proba_up`` are first registered on
    ``__main__``. Presumably the artifact was pickled from a script run as
    ``__main__``, so unpickling looks those names up there. Missing
    "decision_overlays" / "model_name" keys are filled with module defaults.

    Security: ``joblib.load`` unpickles; only load trusted artifacts.
    """
    main_module = sys.modules["__main__"]
    for attr_name, obj in (("ProbabilityBlend", ProbabilityBlend), ("predict_proba_up", predict_proba_up)):
        setattr(main_module, attr_name, obj)
    payload = joblib.load(MODEL_PATH)
    for key, fallback in (
        ("decision_overlays", DECISION_OVERLAYS),
        ("model_name", "nifty_opening_direction_model"),
    ):
        payload.setdefault(key, fallback)
    return payload
|
|
|
|
|
def overlay_mask(frame: pd.DataFrame, overlay: dict[str, object]) -> np.ndarray:
    """Return a boolean array marking rows where ``frame[feature] <op> value`` holds.

    Missing or non-numeric feature values never match. An absent feature column
    yields an all-False mask without validating ``op``.

    Raises:
        ValueError: ``op`` is neither ">=" nor "<=".
    """
    column = str(overlay["feature"])
    if column not in frame.columns:
        return np.zeros(len(frame), dtype=bool)
    values = pd.to_numeric(frame[column], errors="coerce")
    cutoff = float(overlay["value"])
    comparisons = {
        ">=": lambda: values >= cutoff,
        "<=": lambda: values <= cutoff,
    }
    compare = comparisons.get(overlay["op"])
    if compare is None:
        raise ValueError(f"Unsupported overlay op: {overlay['op']}")
    return compare().fillna(False).to_numpy(dtype=bool)
|
|
|
|
|
def apply_decision_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, object]]) -> np.ndarray:
    """Return a new int64 copy of ``pred`` with 0/1 calls flipped where overlays match.

    Overlays are applied in order, and each matching row is flipped (``1 - x``).
    A row matched by two overlays is therefore flipped back to its original value.
    """
    result = np.array(pred, dtype="int64")  # np.array copies, so the caller's array is untouched
    for rule in overlays:
        hit = overlay_mask(frame, rule)
        result = np.where(hit, 1 - result, result)
    return result
|
|
|
|
|
def directional_confidence(prob_up: np.ndarray, pred: np.ndarray, threshold: float) -> np.ndarray:
    """Confidence in each final call, capped at 0.99.

    The value is ``min(max(0.5 + |prob_up - threshold|, side_prob), 0.99)``, where
    ``side_prob`` is ``prob_up`` for UP (1) calls and ``1 - prob_up`` for DOWN calls.
    This equals the original ``np.clip(0.5 + distance, side_prob, 0.99)``.
    """
    p_up = np.asarray(prob_up, dtype="float64")
    calls = np.asarray(pred, dtype="int64")
    chosen_side = np.where(calls == 1, p_up, 1.0 - p_up)
    margin_score = 0.50 + np.abs(p_up - float(threshold))
    return np.minimum(np.maximum(margin_score, chosen_side), 0.99)
|
|
|
|
|
def read_training_dataset() -> pd.DataFrame:
    """Load the opening training dataset, parse its timestamp columns and sort by date.

    Any of "date", "first5_start" and "first5_end" that are present are converted
    to datetimes, with unparseable values becoming NaT. The index is reset after
    sorting.
    """
    frame = pd.read_parquet(OPENING_DATASET_PATH)
    timestamp_columns = [name for name in ("date", "first5_start", "first5_end") if name in frame.columns]
    for name in timestamp_columns:
        frame[name] = pd.to_datetime(frame[name], errors="coerce")
    ordered = frame.sort_values("date")
    return ordered.reset_index(drop=True)
|
|
|
|
|
def normalize_yahoo_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Convert a raw Yahoo/yfinance-style OHLCV frame into naive-IST ``date`` + OHLCV columns.

    Steps:
        - Column labels are lower-cased and spaces become underscores; for
          MultiIndex columns only the first level is kept.
        - The timestamp comes from a "datetime"/"date" column, or the first column
          after ``reset_index``.
        - Naive timestamps are assumed to be UTC and converted to IST; the tz is
          then dropped.
        - "adj_close" is used only when "close" is absent.

    The caller's frame is not modified; the previous version overwrote
    ``df.columns`` in place.

    Returns:
        A date-sorted frame. Rows missing date/open/high/low/close are dropped.
        "volume" is included only when the input has it, except in the empty-input
        case, which returns all six columns.
    """
    if df.empty:
        return pd.DataFrame(columns=["date", "open", "high", "low", "close", "volume"])
    df = df.copy()
    if isinstance(df.columns, pd.MultiIndex):
        # Normalize spaces here too so "Adj Close" maps to "adj_close" like the flat case.
        df.columns = [str(c[0]).lower().replace(" ", "_") for c in df.columns]
    else:
        df.columns = [str(c).lower().replace(" ", "_") for c in df.columns]
    df = df.reset_index()
    date_col = next((c for c in df.columns if str(c).lower() in {"datetime", "date"}), df.columns[0])
    df["date"] = pd.to_datetime(df[date_col], errors="coerce")
    if df["date"].dt.tz is None:
        df["date"] = df["date"].dt.tz_localize("UTC").dt.tz_convert(IST)
    else:
        df["date"] = df["date"].dt.tz_convert(IST)
    rename = {
        "open": "open",
        "high": "high",
        "low": "low",
        "close": "close",
        "adj_close": "close",
        "volume": "volume",
    }
    out = pd.DataFrame({"date": df["date"].dt.tz_localize(None)})
    for src, dst in rename.items():
        if src in df.columns and dst not in out.columns:
            out[dst] = pd.to_numeric(df[src], errors="coerce")
    return out.dropna(subset=["date", "open", "high", "low", "close"]).sort_values("date")
|
|
|
|
|
@lru_cache(maxsize=1)
def yahoo_history_client() -> YahooHistoryClient:
    """Return the process-wide YahooHistoryClient backed by ``YAHOO_CACHE_PATH`` (memoised)."""
    client = YahooHistoryClient(cache_path=YAHOO_CACHE_PATH)
    return client
|
|
|
|
|
def period_start(period: str, *, end: datetime) -> datetime:
    """Translate a Yahoo-style period string ("5d", "2wk", "1mo", "1y") into a start time.

    Months are approximated as 31 days and years as 366 days, so the window is
    never shorter than requested. Input is case- and whitespace-insensitive.

    Raises:
        ValueError: unsupported unit, or a non-numeric count.
    """
    spec = str(period).strip().lower()
    spans = (
        ("d", lambda n: timedelta(days=n)),
        ("wk", lambda n: timedelta(weeks=n)),
        ("mo", lambda n: timedelta(days=n * 31)),
        ("y", lambda n: timedelta(days=n * 366)),
    )
    for suffix, to_delta in spans:
        if not spec.endswith(suffix):
            continue
        count_text = spec[: -len(suffix)]
        if count_text.isdigit():
            return end - to_delta(int(count_text))
        # First suffix that matches decides; a bad count is not retried with later units.
        break
    raise ValueError(f"Unsupported Yahoo period: {period!r}")
|
|
|
|
|
def yahoo_history_to_ohlcv(frame: pd.DataFrame, *, daily: bool) -> pd.DataFrame:
    """Convert a YahooHistoryClient frame (``timestamp`` + OHLCV) into sorted OHLCV rows.

    With ``daily=True``, timestamps are normalized to midnight. Rows with a
    missing date or OHLC value are dropped. Duplicate dates keep the last
    occurrence in input order.
    """
    columns = ["date", "open", "high", "low", "close", "volume"]
    if frame.empty:
        return pd.DataFrame(columns=columns)
    bars = frame.rename(columns={"timestamp": "date"}).copy()
    stamps = pd.to_datetime(bars["date"], errors="coerce")
    bars["date"] = stamps.dt.normalize() if daily else stamps
    bars = bars.assign(**{name: pd.to_numeric(bars[name], errors="coerce") for name in columns[1:]})
    bars = bars[columns].dropna(subset=columns[:5])
    bars = bars.drop_duplicates("date", keep="last")
    return bars.sort_values("date").reset_index(drop=True)
|
|
|
|
|
def fetch_yahoo_minutes(period: str = "5d") -> pd.DataFrame:
    """Fetch NIFTY 50 one-minute bars covering ``period`` up to now (naive IST).

    The window end is padded by 5 minutes so the latest bar is not cut off.
    """
    window_end = datetime.now(IST).replace(tzinfo=None) + timedelta(minutes=5)
    request = {
        "interval": "1m",
        "start": period_start(period, end=window_end),
        "end": window_end,
        "include_prepost": False,
    }
    history = yahoo_history_client().fetch_history(YAHOO_NIFTY_SYMBOL, **request)
    return yahoo_history_to_ohlcv(history, daily=False)
|
|
|
|
|
def fetch_yahoo_daily(period: str = "1mo") -> pd.DataFrame:
    """Fetch NIFTY 50 daily bars covering ``period`` up to tomorrow (naive IST).

    The window end is padded by one day so today's bar is included.
    """
    window_end = datetime.now(IST).replace(tzinfo=None) + timedelta(days=1)
    request = {
        "interval": "1d",
        "start": period_start(period, end=window_end),
        "end": window_end,
        "include_prepost": False,
    }
    history = yahoo_history_client().fetch_history(YAHOO_NIFTY_SYMBOL, **request)
    return yahoo_history_to_ohlcv(history, daily=True)
|
|
|
|
|
def append_parquet_rows(path: Path, new_rows: pd.DataFrame, subset: list[str]) -> pd.DataFrame:
    """Upsert ``new_rows`` into the parquet file at ``path`` and return the combined frame.

    Rows are de-duplicated on ``subset``, with new rows winning, then sorted by
    ``subset``.

    The file is written to a sibling ``.tmp`` file and swapped in with
    ``os.replace``. A crash mid-write therefore can no longer leave a truncated
    history file behind.

    Raises:
        RuntimeError: ``new_rows`` is empty and no file exists yet. With an
            existing file, empty input just returns its current contents unchanged.
    """
    if new_rows.empty:
        if path.exists():
            return pd.read_parquet(path)
        raise RuntimeError(f"No rows returned for {path.name}; leaving parquet unchanged.")
    if path.exists():
        existing = pd.read_parquet(path)
        combined = pd.concat([existing, new_rows], ignore_index=True)
    else:
        combined = new_rows.copy()
    combined = combined.drop_duplicates(subset=subset, keep="last").sort_values(subset).reset_index(drop=True)
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_name(f"{path.name}.tmp")
    try:
        combined.to_parquet(tmp_path, index=False, compression="zstd")
        os.replace(tmp_path, path)
    finally:
        # Only present if writing or replacing failed.
        if tmp_path.exists():
            tmp_path.unlink()
    return combined
|
|
|
|
|
def append_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> pd.DataFrame:
    """Upsert a single prediction record into the parquet history at ``path``.

    Delegates to ``append_parquet_rows``, de-duplicating on ``subset``, and
    returns the full history.
    """
    return append_parquet_rows(path, pd.DataFrame([row]), subset)
|
|
|
|
|
def latest_parquet_date(path: Path) -> date | None:
    """Return the most recent calendar date in the parquet's "date" column.

    Returns None when the file is missing, has no rows, or holds no parseable dates.
    """
    if not path.exists():
        return None
    stamps = pd.to_datetime(pd.read_parquet(path, columns=["date"])["date"], errors="coerce")
    newest = stamps.max() if len(stamps) else pd.NaT
    return None if pd.isna(newest) else newest.date()
|
|
|
|
|
def latest_opening_outcome_date() -> date | None:
    """Return the latest date in the opening dataset that has a known ``target``.

    Only the "date" and "target" columns are read. The previous version loaded the
    entire training dataset just to check whether "target" existed.

    Returns None when the dataset is missing, lacks a "target" column, or has no
    labelled rows with a parseable date.
    """
    if not OPENING_DATASET_PATH.exists():
        return None
    try:
        df = pd.read_parquet(OPENING_DATASET_PATH, columns=["date", "target"])
    except (KeyError, ValueError):
        # Requesting an absent column raises (pyarrow's ArrowInvalid is a ValueError).
        # Re-read just "date" so that a genuinely corrupt file, or a missing "date",
        # still raises as before; otherwise "target" is simply absent.
        pd.read_parquet(OPENING_DATASET_PATH, columns=["date"])
        return None
    # Some parquet engines silently skip unknown columns instead of raising.
    if df.empty or "target" not in df.columns:
        return None
    df = df[df["target"].notna()]
    if df.empty:
        return None
    latest = pd.to_datetime(df["date"], errors="coerce").max()
    if pd.isna(latest):
        return None
    return latest.date()
|
|
|
|
|
def first5_features_from_minutes(minutes: pd.DataFrame, session_date: date | None = None) -> pd.DataFrame:
    """Build the one-row opening feature frame from one-minute bars.

    Args:
        minutes: Bars with "date", "open", "high", "low", "close" and optionally
            "volume" (as produced by yahoo_history_to_ohlcv). NOTE(review): timestamps
            are presumably naive exchange-local (IST) times — confirm against
            YahooHistoryClient.
        session_date: Session to featurize; defaults to the latest date present.

    Returns:
        A single-row DataFrame. Column order follows the insertion order of ``row``
        below: aggregates of the 09:15-09:19 window, per-minute ``m1_*``..``m5_*``
        fields, shape/momentum features, and calendar fields.

    Raises:
        RuntimeError: no bars at all, or fewer than five bars in the opening window.
    """
    if minutes.empty:
        raise RuntimeError("Yahoo returned no minute bars.")
    bars = minutes.copy()
    bars["dt"] = pd.to_datetime(bars["date"], errors="coerce")
    bars["session_date"] = bars["dt"].dt.normalize()
    if session_date is None:
        session_ts = bars["session_date"].max()
    else:
        session_ts = pd.Timestamp(session_date).normalize()
    day = bars[bars["session_date"] == session_ts].sort_values("dt").copy()
    # Opening window: bars stamped 09:15 through 09:19 inclusive (first five minutes).
    start_dt = pd.Timestamp.combine(session_ts.date(), time(9, 15))
    end_dt = pd.Timestamp.combine(session_ts.date(), time(9, 19))
    first5 = day[(day["dt"] >= start_dt) & (day["dt"] <= end_dt)].head(5).copy()
    if len(first5) < 5:
        raise RuntimeError(f"Need 5 opening bars for {session_ts.date()}, got {len(first5)}.")
    first5["minute_index"] = np.arange(len(first5))
    # ret_1m is NaN for the first bar (no prior close inside the window).
    first5["ret_1m"] = first5["close"].pct_change(fill_method=None)
    first5["range_pct_1m"] = safe_div(first5["high"] - first5["low"], first5["open"])
    first5["body_pct_1m"] = safe_div(first5["close"] - first5["open"], first5["open"])
    row = {
        "date": session_ts,
        "first5_start": first5["dt"].iloc[0],
        "first5_end": first5["dt"].iloc[-1],
        "first5_open": first5["open"].iloc[0],
        "first5_high": first5["high"].max(),
        "first5_low": first5["low"].min(),
        "first5_close": first5["close"].iloc[-1],
        "first5_volume": first5["volume"].sum() if "volume" in first5 else 0.0,
        "first5_bars": len(first5),
        "first5_last_1m_ret": first5["ret_1m"].iloc[-1],
        "first5_ret_std": first5["ret_1m"].std(),
    }
    row["first5_return"] = (row["first5_close"] - row["first5_open"]) / row["first5_open"]
    row["first5_range_pct"] = (row["first5_high"] - row["first5_low"]) / row["first5_open"]
    first5_range = row["first5_high"] - row["first5_low"]
    # Range-normalized features are NaN for a zero-range (flat) window.
    row["first5_body_to_range"] = (row["first5_close"] - row["first5_open"]) / first5_range if first5_range else np.nan
    row["first5_close_location"] = (row["first5_close"] - row["first5_low"]) / first5_range if first5_range else np.nan
    # Per-minute features m1_* .. m5_* in chronological order.
    for idx, (_, candle) in enumerate(first5.iterrows(), start=1):
        for field in ("open", "high", "low", "close", "ret_1m", "range_pct_1m", "body_pct_1m"):
            row[f"m{idx}_{field}"] = candle[field]
        row[f"m{idx}_close_vs_first5_open"] = (candle["close"] - row["first5_open"]) / row["first5_open"]
        row[f"m{idx}_range_share"] = (candle["high"] - candle["low"]) / first5_range if first5_range else np.nan
    row["first5_return_accel"] = row["m5_ret_1m"] - row["m2_ret_1m"]
    row["first5_last2_return"] = (row["m5_close"] - row["m4_open"]) / row["m4_open"]
    row["first5_first2_return"] = (row["m2_close"] - row["m1_open"]) / row["m1_open"]
    # +1 when the first-two and last-two moves have opposite signs, -1 when they agree,
    # 0 when either is flat.
    row["first5_reversal"] = np.sign(row["first5_first2_return"]) * -np.sign(row["first5_last2_return"])
    row["dow"] = session_ts.dayofweek
    row["dom"] = session_ts.day
    row["month"] = session_ts.month
    return pd.DataFrame([row])
|
|
|
|
|
def build_model_row(first5_row: pd.DataFrame) -> pd.DataFrame:
    """Merge today's opening features onto the latest training-dataset context row.

    The last row of the training dataset supplies the non-opening context
    features. NOTE(review): this is presumably the previous session's context —
    confirm the dataset is refreshed before each live prediction. The columns of
    ``first5_row`` then overwrite it, and the cross features are recomputed when
    their inputs are present. ``target`` and ``day_return`` are blanked because
    they are unknown at prediction time.

    Raises:
        RuntimeError: the training dataset is empty.
    """
    dataset = read_training_dataset()
    if dataset.empty:
        raise RuntimeError("Opening training dataset is empty; cannot build a model row.")
    output = dataset.iloc[[-1]].copy()
    for col in first5_row.columns:
        output[col] = first5_row[col].iloc[0]
    if {"first5_open", "nifty_close"}.issubset(output.columns):
        output["first5_gap_from_prev_close"] = (output["first5_open"] - output["nifty_close"]) / output["nifty_close"]
        output["first5_close_vs_prev_close"] = (output["first5_close"] - output["nifty_close"]) / output["nifty_close"]
    if {"first5_range_pct", "nifty_range_pct"}.issubset(output.columns):
        output["first5_range_vs_prev_range"] = output["first5_range_pct"] / output["nifty_range_pct"]
    if {"first5_return", "nifty_ret_1"}.issubset(output.columns):
        output["first5_return_x_prev_ret"] = output["first5_return"] * output["nifty_ret_1"]
        # The gap column only exists if it was computed above or carried by the
        # dataset; previously this line raised KeyError when neither was true.
        if "first5_gap_from_prev_close" in output.columns:
            output["gap_x_prev_ret"] = output["first5_gap_from_prev_close"] * output["nifty_ret_1"]
    if {"first5_return", "banknifty_ret_1"}.issubset(output.columns):
        output["first5_return_x_bank_ret_1"] = output["first5_return"] * output["banknifty_ret_1"]
    if {"first5_range_pct", "india_vix_ret_1"}.issubset(output.columns):
        output["first5_range_x_vix_ret_1"] = output["first5_range_pct"] * output["india_vix_ret_1"]
    output["target"] = np.nan
    output["day_return"] = np.nan
    return output
|
|
|
|
|
def predict_row(row: pd.DataFrame) -> Prediction:
    """Score the first row of ``row`` with the opening model and persist the result.

    Applies the probability threshold and then the decision overlays. The result
    is written to ``LATEST_PATH`` (CSV) and upserted into
    ``T5_PREDICTION_HISTORY_PATH`` keyed by target_date, with source "live".

    Raises:
        RuntimeError: ``row`` lacks any of the model's feature columns.
    """
    bundle = load_model()
    model = bundle["model"]
    feature_names = bundle["features"]
    cutoff = float(bundle["threshold"])
    absent = [name for name in feature_names if name not in row.columns]
    if absent:
        raise RuntimeError(f"Feature row is missing {len(absent)} features; first missing: {absent[:5]}")

    probabilities = predict_proba_up(model, row[feature_names])
    raw_calls = (probabilities >= cutoff).astype("int64")
    final_calls = apply_decision_overlays(raw_calls, row, bundle.get("decision_overlays", DECISION_OVERLAYS))
    confidence = directional_confidence(probabilities, final_calls, cutoff)

    def first_timestamp(column: str) -> pd.Timestamp:
        return pd.to_datetime(row[column].iloc[0])

    result = Prediction(
        input_date=first_timestamp("date").date().isoformat(),
        first5_start=str(first_timestamp("first5_start")),
        first5_end=str(first_timestamp("first5_end")),
        prediction="UP" if int(final_calls[0]) == 1 else "DOWN",
        prob_up=float(probabilities[0]),
        confidence=float(confidence[0]),
        threshold=cutoff,
        model_name=str(bundle.get("model_name", "nifty_opening_direction_model")),
        is_overridden=bool(raw_calls[0] != final_calls[0]),
    )
    pd.DataFrame([result.to_dict()]).to_csv(LATEST_PATH, index=False)
    history_record = {**result.to_dict(), "target_date": result.input_date, "source": "live"}
    _record_prediction_history(T5_PREDICTION_HISTORY_PATH, history_record, ["target_date"])
    return result
|
|
|
|
|
| def _file_cache_key(path: Path) -> tuple[str, int | None, int | None]:
|
| try:
|
| stat = path.stat()
|
| except FileNotFoundError:
|
| return (str(path), None, None)
|
| return (str(path), stat.st_mtime_ns, stat.st_size)
|
|
|
|
|
| @lru_cache(maxsize=16)
|
| def _latest_saved_prediction_cached(latest_key: tuple[str, int | None, int | None], summary_key: tuple[str, int | None, int | None]) -> dict[str, Any]:
|
| latest_path = Path(latest_key[0])
|
| if latest_path.exists():
|
| return pd.read_csv(latest_path).iloc[-1].to_dict()
|
| summary_path = Path(summary_key[0])
|
| if summary_path.exists():
|
| return json.loads(summary_path.read_text(encoding="utf-8"))
|
| raise FileNotFoundError("No latest prediction is available yet.")
|
|
|
|
|
def latest_saved_prediction() -> dict[str, Any]:
    """Return a private copy of the latest saved opening prediction.

    Caching is keyed on the files' mtime/size, so a rewritten file is re-read.
    """
    summary_json = MODEL_DIR / "summary.json"
    cached = _latest_saved_prediction_cached(_file_cache_key(LATEST_PATH), _file_cache_key(summary_json))
    # Copy so callers cannot mutate the dict held by the lru_cache.
    return dict(cached)
|
|
|
|
|
def _latest_saved_prediction_uncached() -> dict[str, Any]:
    """Uncached twin of ``latest_saved_prediction``: CSV last row, else summary JSON.

    Raises:
        FileNotFoundError: neither file exists.
    """
    fallback_summary = MODEL_DIR / "summary.json"
    if LATEST_PATH.exists():
        latest_frame = pd.read_csv(LATEST_PATH)
        return latest_frame.iloc[-1].to_dict()
    if not fallback_summary.exists():
        raise FileNotFoundError("No latest prediction is available yet.")
    return json.loads(fallback_summary.read_text(encoding="utf-8"))
|
|
|
|
|
def _read_daily_forecaster_summary() -> dict[str, Any] | None:
    """Read the daily forecaster summary and normalize it to the Tomorrow-model schema.

    The JSON may be an object or a list of per-symbol objects. For a list, the
    "NIFTY 50" entry is preferred, falling back to the first object entry.

    Returns:
        The normalized summary, or None when the file is missing, the JSON is not
        an object/list, or a list holds no object entries. Previously an empty list
        raised IndexError and non-dict entries raised AttributeError.
    """
    if not DAILY_FORECASTER_SUMMARY_PATH.exists():
        return None
    raw = json.loads(DAILY_FORECASTER_SUMMARY_PATH.read_text(encoding="utf-8"))
    if isinstance(raw, list):
        candidates = [row for row in raw if isinstance(row, dict)]
        if not candidates:
            return None
        matches = [row for row in candidates if row.get("symbol") == "NIFTY 50"]
        summary = dict(matches[0] if matches else candidates[0])
    elif isinstance(raw, dict):
        summary = dict(raw)
    else:
        return None
    config = summary.get("config") if isinstance(summary.get("config"), dict) else {}
    summary.setdefault("symbol", "NIFTY 50")
    summary.setdefault("horizon", "daily")
    summary.setdefault("horizon_bars", 1)
    summary["model_name"] = "nifty_tomorrow_direction_model"
    summary["source_model"] = str(config.get("name") or summary.get("source_model") or "locked_multiwindow_nifty50_ensemble")
    summary["target"] = "next trading session NIFTY 50 direction"
    summary["artifact_type"] = "daily_forecaster_outputs"
    summary["artifact_source"] = str(DAILY_FORECASTER_OUTPUT_DIR)
    return summary
|
|
|
|
|
def _read_daily_forecaster_latest(summary: dict[str, Any]) -> dict[str, Any] | None:
    """Normalize the forecaster's latest forecast row into the Tomorrow prediction schema.

    Reads ``DAILY_FORECASTER_LATEST_PATH``, preferring "NIFTY 50" rows when a
    "symbol" column exists, and maps forecaster columns (``latest_forecast_*``)
    onto ``input_date``/``prob_up``/``prediction``. A missing ``target_date`` is
    derived as the trading day after ``input_date``. A missing ``confidence`` is
    derived as ``max(p, 1 - p)``.

    Returns:
        The normalized dict, or None when the CSV is missing or empty.
    """
    if not DAILY_FORECASTER_LATEST_PATH.exists():
        return None
    latest = pd.read_csv(DAILY_FORECASTER_LATEST_PATH)
    if latest.empty:
        return None
    if "symbol" in latest.columns:
        filtered = latest[latest["symbol"].astype(str) == "NIFTY 50"]
        if not filtered.empty:
            latest = filtered
    row = {k: (None if pd.isna(v) else v) for k, v in latest.iloc[-1].to_dict().items()}

    def first_present(*keys: str, default: Any = None) -> Any:
        # NaN cells were mapped to None above, so a present-but-empty column must not
        # shadow the fallback. Plain dict.get(key, default) returned None in that case.
        for key in keys:
            value = row.get(key)
            if value is not None:
                return value
        return default

    input_date = row.get("latest_forecast_date") or row.get("input_date")
    target_date = row.get("target_date")
    if not target_date and input_date:
        try:
            target_date = next_trading_day(date.fromisoformat(str(input_date)[:10]) + timedelta(days=1)).isoformat()
        except Exception:
            target_date = None
    prob_up = first_present("latest_forecast_prob_up", "prob_up")
    prediction = first_present("latest_forecast_signal", "prediction")
    threshold = first_present("threshold", default=summary.get("threshold"))
    confidence = row.get("confidence")
    if confidence is None and prob_up is not None:
        try:
            confidence = float(max(float(prob_up), 1.0 - float(prob_up)))
        except Exception:
            confidence = None
    return {
        "input_date": input_date,
        "target_date": target_date,
        "prediction": prediction,
        "prob_up": prob_up,
        "confidence": confidence,
        "threshold": threshold,
        "model_name": "nifty_tomorrow_direction_model",
        "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR),
    }
|
|
|
|
|
| def _parse_iso_date(value: Any) -> date | None:
|
| try:
|
| return date.fromisoformat(str(value or "")[:10])
|
| except Exception:
|
| return None
|
|
|
|
|
def _archive_tomorrow_latest_to_history() -> None:
    """Best-effort copy of the current Tomorrow latest-prediction row into its history.

    Only rows with an UP/DOWN prediction and a target_date are archived. NaN cells
    are stored as None. Any failure is deliberately ignored so that archiving can
    never block a sync.
    """
    if TOMORROW_LATEST_PATH.exists():
        try:
            latest_record = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
            record = {key: None if pd.isna(val) else val for key, val in latest_record.items()}
            direction = str(record.get("prediction", "")).upper()
            if direction in ("UP", "DOWN") and record.get("target_date"):
                _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, record, ["target_date"])
        except Exception:
            pass
|
|
|
|
|
def _tomorrow_actual_outcome(
    target_day: date,
    day_close: float,
    closes_by_date: dict[date, float],
) -> tuple[float | None, str | None]:
    """Return (move, direction) for Tomorrow scoring: close vs previous session close.

    ``move`` is ``(day_close - prev_close) / prev_close``. ``direction`` is "UP"
    when the close is strictly above the previous close, otherwise "DOWN" (a flat
    close counts as DOWN).

    Returns ``(None, None)`` when the previous session's close is missing,
    non-finite or zero, or when ``day_close`` itself is missing or non-finite.
    Previously a NaN ``day_close`` was scored as "DOWN".
    """
    prev_day = previous_trading_day(target_day - timedelta(days=1))
    prev_close = closes_by_date.get(prev_day)
    if prev_close is None or not np.isfinite(prev_close) or prev_close == 0:
        return None, None
    if day_close is None or not np.isfinite(day_close):
        return None, None
    actual_move = (day_close - prev_close) / prev_close
    actual_direction = "UP" if day_close > prev_close else "DOWN"
    return actual_move, actual_direction
|
|
|
|
|
def _find_tomorrow_prediction_for_target(target_day: date) -> dict[str, Any] | None:
    """Return the Tomorrow prediction that targets ``target_day``.

    Sources are tried in priority order:
      1. prediction-history rows whose ``target_date`` is ``target_day``;
      2. the latest-prediction CSV, if its ``target_date`` is ``target_day``;
      3. history rows whose ``input_date`` is the previous trading day and whose
         ``target_date`` is unset or equals ``target_day``;
      4. the latest-prediction CSV, if its ``input_date`` is that previous day;
      5. forecaster test predictions matched on ``target_date``, then ``date``;
      6. the live-accuracy ledger's "tomorrow" entries with an UP/DOWN call.

    Returns:
        The matching record, or None when no source matches.

    The history and the latest CSV are each read once. The previous version
    re-read the history into an unused variable and read the CSV twice.
    """
    target_iso = target_day.isoformat()
    tom_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH)
    if not tom_history.empty:
        for col in ("date", "input_date", "target_date", "forecast_date"):
            if col in tom_history.columns:
                tom_history[col] = pd.to_datetime(tom_history[col], errors="coerce")

    # 1. History keyed on target_date.
    if not tom_history.empty and "target_date" in tom_history.columns:
        rows = tom_history[tom_history["target_date"].dt.date == target_day]
        if not rows.empty:
            return rows.iloc[-1].to_dict()

    # Latest CSV, read lazily once; steps 2 and 4 both consult it.
    latest_row: dict[str, Any] | None = None
    if TOMORROW_LATEST_PATH.exists():
        try:
            latest_row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        except Exception:
            # Best effort: an empty or malformed CSV just skips steps 2 and 4.
            latest_row = None

    # 2. Latest CSV keyed on target_date.
    if latest_row is not None and _parse_iso_date(latest_row.get("target_date")) == target_day:
        return latest_row

    # 3. History keyed on the input (previous trading) day.
    input_day = previous_trading_day(target_day - timedelta(days=1))
    if not tom_history.empty and "input_date" in tom_history.columns:
        rows = tom_history[tom_history["input_date"].dt.date == input_day]
        if not rows.empty:
            row = rows.iloc[-1].to_dict()
            if _parse_iso_date(row.get("target_date")) in {None, target_day}:
                return row

    # 4. Latest CSV keyed on input_date.
    if latest_row is not None and _parse_iso_date(latest_row.get("input_date")) == input_day:
        return latest_row

    # 5. Forecaster back-test predictions.
    tomorrow_test = load_tomorrow_test_predictions()
    if not tomorrow_test.empty:
        for col in ("target_date", "date"):
            if col in tomorrow_test.columns:
                test_dates = pd.to_datetime(tomorrow_test[col], errors="coerce").dt.date
                rows = tomorrow_test[test_dates == target_day]
                if not rows.empty:
                    return rows.iloc[-1].to_dict()

    # 6. Live-accuracy ledger.
    ledger = load_live_accuracy()
    for entry in ledger.get("tomorrow", {}).get("entries", []):
        if str(entry.get("date", ""))[:10] == target_iso:
            pred = str(entry.get("prediction", "")).upper()
            if pred in {"UP", "DOWN"}:
                return {
                    "target_date": target_iso,
                    "prediction": pred,
                    "source": entry.get("source", "live"),
                }
    return None
|
|
|
|
|
def sync_daily_forecaster_outputs() -> dict[str, Any] | None:
    """Mirror the daily forecaster's outputs into this backend's Tomorrow-model files.

    Reads the summary/latest/test-prediction files under ``DAILY_FORECASTER_OUTPUT_DIR``
    and writes:
      - ``TOMORROW_SUMMARY_PATH``;
      - ``TOMORROW_LATEST_PATH``, unless it already holds an UP/DOWN call with a
        newer input_date. The current file is archived into the history first.
      - ``TOMORROW_TEST_PREDICTIONS_PATH``, NIFTY 50 rows only;
      - ``TOMORROW_MODEL_PATH``, a metadata artifact dict.

    Returns:
        The normalized latest prediction when available, else the summary; None
        when the forecaster summary is missing.
    """
    summary = _read_daily_forecaster_summary()
    if summary is None:
        return None
    latest = _read_daily_forecaster_latest(summary)
    TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    if latest is not None:
        _archive_tomorrow_latest_to_history()
        keep_existing = False
        if TOMORROW_LATEST_PATH.exists():
            try:
                existing = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
                existing_input = _parse_iso_date(existing.get("input_date"))
                forecaster_input = _parse_iso_date(latest.get("input_date"))
                existing_pred = str(existing.get("prediction", "")).upper()
                # Don't regress a newer live call back to an older forecaster output.
                if (
                    existing_input is not None
                    and forecaster_input is not None
                    and existing_input > forecaster_input
                    and existing_pred in {"UP", "DOWN"}
                ):
                    keep_existing = True
            except Exception:
                keep_existing = False
        if not keep_existing:
            pd.DataFrame([latest]).to_csv(TOMORROW_LATEST_PATH, index=False)
    if DAILY_FORECASTER_PREDICTIONS_PATH.exists():
        predictions = pd.read_csv(DAILY_FORECASTER_PREDICTIONS_PATH)
        if "symbol" in predictions.columns:
            predictions = predictions[predictions["symbol"].astype(str) == "NIFTY 50"].copy()
        if not predictions.empty:
            if "pred" in predictions.columns and "prediction" not in predictions.columns:
                pred_numeric = pd.to_numeric(predictions["pred"], errors="coerce")
                labels = pd.Series(np.where(pred_numeric == 1, "UP", "DOWN"), index=predictions.index)
                # Leave missing/unparseable preds empty instead of labelling them "DOWN".
                predictions["prediction"] = labels.where(pred_numeric.notna())
            if "correct" not in predictions.columns and {"target", "pred"}.issubset(predictions.columns):
                predictions["correct"] = (
                    pd.to_numeric(predictions["target"], errors="coerce")
                    == pd.to_numeric(predictions["pred"], errors="coerce")
                )
            predictions.to_parquet(TOMORROW_TEST_PREDICTIONS_PATH, index=False)
    # A JSON null threshold previously crashed float(); treat it like a missing key.
    raw_threshold = summary.get("threshold")
    artifact = {
        "artifact_type": "daily_forecaster_outputs",
        "model_name": "nifty_tomorrow_direction_model",
        "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"),
        "threshold": 0.54 if raw_threshold is None else float(raw_threshold),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "validation_prob_std": summary.get("validation_prob_std"),
        "test_prob_std": summary.get("test_prob_std"),
        "test_prob_min": summary.get("test_prob_min"),
        "test_prob_max": summary.get("test_prob_max"),
        "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR),
    }
    joblib.dump(artifact, TOMORROW_MODEL_PATH)
    return latest or summary
|
|
|
|
|
def load_tomorrow_model_artifact() -> dict[str, Any]:
    """Return the tomorrow-model artifact dict.

    Syncs the daily forecaster outputs first, then loads ``TOMORROW_MODEL_PATH``
    when it exists. Otherwise it builds a minimal snapshot from
    ``load_tomorrow_summary()``. In the snapshot, a missing or null ``threshold``
    falls back to 0.543.
    """
    # Called for its side effect of refreshing the mirrored files; the original
    # "synced and exists" / "exists" branches loaded the same file either way.
    sync_daily_forecaster_outputs()
    if TOMORROW_MODEL_PATH.exists():
        return joblib.load(TOMORROW_MODEL_PATH)

    summary = load_tomorrow_summary()
    threshold = summary.get("threshold")
    return {
        "artifact_type": "daily_forecaster_snapshot",
        "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"),
        "source_model": summary.get("source_model", "tuned_daily_forest_single"),
        "threshold": float(threshold) if threshold is not None else 0.543,
    }
|
|
|
|
|
def load_tomorrow_summary() -> dict[str, Any]:
    """Return the tomorrow-model summary, syncing the daily forecaster first.

    Reads ``TOMORROW_SUMMARY_PATH`` as JSON when present. Otherwise it returns a
    built-in snapshot of the locked multi-window ensemble's reported metrics.
    """
    sync_daily_forecaster_outputs()
    if not TOMORROW_SUMMARY_PATH.exists():
        return {
            "model_name": "nifty_tomorrow_direction_model",
            "source_model": "locked_multiwindow_nifty50_ensemble",
            "target": "next trading session NIFTY 50 direction",
            "threshold": 0.54,
            "validation_accuracy": 0.5673758865248227,
            "test_accuracy": 0.6451612903225806,
            "baseline_accuracy": 0.5053763440860215,
            "n_test": 186,
            "feature_count": 204,
        }
    return json.loads(TOMORROW_SUMMARY_PATH.read_text(encoding="utf-8"))
|
|
|
|
|
def latest_tomorrow_prediction() -> dict[str, Any]:
    """Return the most recent tomorrow forecast, refreshing it when it looks stale.

    The newest usable daily session is the earlier of the latest stored daily bar
    and the expected completed session (whichever exists if only one does).

    If a saved latest row exists whose ``input_date`` is missing or older than
    that session, a refresh is attempted. The refreshed row is returned only when
    it covers the session; otherwise the saved row is returned.

    Without a saved row, the same staleness check is applied to the summary's
    ``latest_forecast_date``, falling back to a dict built from the summary.
    Refresh failures are swallowed.
    """

    def _leading_iso_day(value: Any) -> date | None:
        # Parse the first 10 characters as YYYY-MM-DD; anything unparseable -> None.
        try:
            return date.fromisoformat(str(value)[:10])
        except Exception:
            return None

    sync_daily_forecaster_outputs()
    latest_daily = latest_parquet_date(NIFTY_1D_PATH)
    expected_daily = expected_completed_daily_date()
    if latest_daily and expected_daily:
        valid_daily = min(latest_daily, expected_daily)
    else:
        valid_daily = expected_daily or latest_daily

    if TOMORROW_LATEST_PATH.exists():
        last_row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        cleaned = {key: (None if pd.isna(value) else value) for key, value in last_row.items()}
        saved_day = _leading_iso_day(cleaned.get("input_date"))
        is_stale = valid_daily is not None and (saved_day is None or saved_day < valid_daily)
        if is_stale:
            try:
                refreshed = refresh_tomorrow_prediction(session_date=valid_daily)
                refreshed_day = _leading_iso_day(refreshed.get("input_date"))
                if refreshed_day is not None and refreshed_day >= valid_daily:
                    return refreshed
            except Exception:
                pass
        return cleaned

    summary = load_tomorrow_summary()
    summary_day = _leading_iso_day(summary.get("latest_forecast_date"))
    if valid_daily is not None and (summary_day is None or summary_day < valid_daily):
        try:
            return refresh_tomorrow_prediction(session_date=valid_daily)
        except Exception:
            pass

    return {
        "input_date": summary.get("latest_forecast_date"),
        "target_date": None,
        "prediction": summary.get("latest_forecast_signal"),
        "prob_up": summary.get("latest_forecast_prob_up"),
        "confidence": None,
        "threshold": summary.get("threshold"),
        "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"),
        "source_model": summary.get("source_model", "tuned_daily_forest_single"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
    }
|
|
|
|
|
def load_tplus1_summary() -> dict[str, Any]:
    """Return the T+1 model summary.

    The T+1 target is the next session's close versus today's 14:20 close.
    Reads ``TPLUS1_SUMMARY_PATH`` as JSON when it exists; otherwise it returns
    the built-in snapshot below.
    """
    summary_file = TPLUS1_SUMMARY_PATH
    if not summary_file.exists():
        return {
            "model_name": "logistic_regression_l1_C0.35_balanced",
            "target": "T+1 NIFTY 50 close greater than T 14:20 close",
            "window_start": "14:00",
            "window_end": "14:20",
            "threshold": 0.578,
            "validation_accuracy": 0.66,
            "test_accuracy": 0.6368421052631579,
            "baseline_test_accuracy": 0.5052631578947369,
            "test_rows": 190,
            "feature_count": 40,
        }
    return json.loads(summary_file.read_text(encoding="utf-8"))
|
|
|
|
|
def latest_tplus1_prediction() -> dict[str, Any]:
    """Return the last saved T+1 prediction row, with NaN values mapped to None.

    When ``TPLUS1_LATEST_PATH`` does not exist, it builds an equivalent dict from
    the ``latest_*`` fields of ``load_tplus1_summary()``.
    """
    if not TPLUS1_LATEST_PATH.exists():
        summary = load_tplus1_summary()
        return {
            "input_date": summary.get("latest_input_date"),
            "target_date": None,
            "forecast_for": summary.get("latest_forecast_for"),
            "prediction": summary.get("latest_prediction"),
            "prob_up": summary.get("latest_prob_up"),
            "confidence": summary.get("latest_confidence"),
            "threshold": summary.get("threshold"),
            "model_name": summary.get("model_name", "logistic_regression_l1_C0.35_balanced"),
            "validation_accuracy": summary.get("validation_accuracy"),
            "test_accuracy": summary.get("test_accuracy"),
        }
    last_record = pd.read_csv(TPLUS1_LATEST_PATH).iloc[-1].to_dict()
    return {field: (None if pd.isna(value) else value) for field, value in last_record.items()}
|
|
|
|
|
def load_mfe_summary() -> dict[str, Any]:
    """Return the MFE summary JSON, or an empty dict when ``MFE_SUMMARY_PATH`` is absent."""
    if not MFE_SUMMARY_PATH.exists():
        return {}
    raw_text = MFE_SUMMARY_PATH.read_text(encoding="utf-8")
    return json.loads(raw_text)
|
|
|
|
|
def latest_mfe_prediction() -> dict[str, Any]:
    """Return the last saved MFE prediction row, with NaN values mapped to None.

    When ``MFE_LATEST_PATH`` does not exist, the fields are taken from the
    corresponding ``latest_*`` keys of ``load_mfe_summary()``. Missing keys
    become None.
    """
    if MFE_LATEST_PATH.exists():
        record = pd.read_csv(MFE_LATEST_PATH).iloc[-1].to_dict()
        return {field: (None if pd.isna(value) else value) for field, value in record.items()}

    summary = load_mfe_summary()
    # Output field -> summary key it is read from (insertion order is the output order).
    summary_keys = {
        "input_date": "latest_input_date",
        "first5_start": "latest_first5_start",
        "first5_end": "latest_first5_end",
        "predicted_up_points": "latest_predicted_up_points",
        "predicted_down_points": "latest_predicted_down_points",
    }
    return {field: summary.get(source_key) for field, source_key in summary_keys.items()}
|
|
|
|
|
def _calibrated_mfe_points(model: Any, features: pd.DataFrame, calibration: dict[str, Any]) -> float:
    """Predict the first row, clip at 0, apply ``scale``/``offset`` calibration, then clip at 0 again."""
    raw = float(np.clip(model.predict(features)[0], 0.0, None))
    return float(np.clip((raw * calibration["scale"]) + calibration["offset"], 0.0, None))


def _upsert_mfe_live_history(out: dict[str, Any]) -> None:
    """Replace any row for ``out['input_date']`` in ``MFE_LIVE_HISTORY_PATH`` and append ``out``.

    A history file that cannot be parsed at all is replaced by ``out`` alone,
    matching the previous best-effort behaviour. A readable file without an
    ``input_date`` column keeps all its rows instead of being wiped.
    """
    live_df = pd.DataFrame([out])
    if not MFE_LIVE_HISTORY_PATH.exists():
        live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False)
        return
    try:
        existing = pd.read_csv(MFE_LIVE_HISTORY_PATH)
    except Exception:
        live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False)
        return
    if "input_date" in existing.columns:
        existing = existing[existing["input_date"].astype(str) != out["input_date"]]
    pd.concat([existing, live_df], ignore_index=True).to_csv(MFE_LIVE_HISTORY_PATH, index=False)


def refresh_mfe_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Score one opening-dataset row with the MFE up/down models and persist the result.

    Args:
        session_date: Session to score. The last row of ``OPENING_DATASET_PATH``
            with that date is used; ``None`` scores the dataset's last row.

    Returns:
        A dict with ``input_date``, the ``first5_*`` fields and the calibrated,
        non-negative ``predicted_up_points`` / ``predicted_down_points``. Returns
        an empty dict when the model artifact or the requested row is missing.

    Side effects:
        Overwrites ``MFE_LATEST_PATH``, upserts the row into
        ``MFE_LIVE_HISTORY_PATH`` keyed by ``input_date``, and clears the
        dashboard payload cache.
    """
    if not MFE_MODEL_PATH.exists():
        return {}
    payload = joblib.load(MFE_MODEL_PATH)
    up_model = payload["up_model"]
    down_model = payload["down_model"]
    up_features = payload["up_features"]
    down_features = payload["down_features"]
    up_calib = payload["up_calibration"]
    down_calib = payload["down_calibration"]

    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    dataset["_session_date"] = pd.to_datetime(dataset["date"], errors="coerce").dt.normalize()
    if session_date is not None:
        latest_df = dataset[dataset["_session_date"].dt.date == session_date].tail(1)
    else:
        latest_df = dataset.tail(1)
    if latest_df.empty:
        return {}

    out = {
        "input_date": latest_df["_session_date"].dt.date.iloc[0].isoformat(),
        "first5_start": str(latest_df["first5_start"].iloc[0]),
        "first5_end": str(latest_df["first5_end"].iloc[0]),
        "first5_close": float(latest_df["first5_close"].iloc[0]),
        "predicted_up_points": _calibrated_mfe_points(up_model, latest_df[up_features], up_calib),
        "predicted_down_points": _calibrated_mfe_points(down_model, latest_df[down_features], down_calib),
    }
    pd.DataFrame([out]).to_csv(MFE_LATEST_PATH, index=False)
    _upsert_mfe_live_history(out)
    clear_dashboard_payload_cache()
    return out
|
|
|
|
|
def _minute_frame_for_tplus1() -> pd.DataFrame:
    """Load NIFTY 1-minute bars from ``NIFTY_1M_PATH`` and normalise them for T+1 features.

    Adds the following columns:
    - ``dt``: parsed ``date``.
    - ``session_date``: ``dt`` at midnight.
    - ``time``: ``HH:MM`` string.

    Also coerces OHLC/volume to numeric and drops rows with an unparseable
    timestamp or missing OHLC. The result is sorted by ``dt`` with a fresh index.
    """
    bars = pd.read_parquet(NIFTY_1M_PATH).copy()
    bars["dt"] = pd.to_datetime(bars["date"], errors="coerce")
    numeric_cols = [name for name in ("open", "high", "low", "close", "volume") if name in bars.columns]
    for name in numeric_cols:
        bars[name] = pd.to_numeric(bars[name], errors="coerce")
    bars = (
        bars.dropna(subset=["dt", "open", "high", "low", "close"])
        .sort_values("dt")
        .reset_index(drop=True)
    )
    bars["session_date"] = bars["dt"].dt.normalize()
    bars["time"] = bars["dt"].dt.strftime("%H:%M")
    return bars
|
|
|
|
|
def _build_tplus1_session_features(minute: pd.DataFrame) -> pd.DataFrame:
    """Build one feature row per session from the 14:00-14:20 minute window.

    Args:
        minute: Minute bars with ``dt``, ``session_date``, ``time`` (``HH:MM``
            string) and OHLC columns, plus optionally ``volume``. This is the
            shape produced by ``_minute_frame_for_tplus1``. NOTE(review): assumes
            rows are sorted by ``dt`` so "first"/"last" aggregations are
            chronological.

    Returns:
        One row per session whose window has exactly 21 bars, sorted by ``date``
        (the session date). Columns:
        - Window aggregates (``w_*``).
        - Per-minute pivot features ``mNN_<field>`` plus
          ``mNN_close_vs_window_open`` / ``mNN_close_vs_minute_open``.
        - The session's last close (``day_close``).
        - Sub-window returns.
        - Calendar fields.
    """
    window = minute[(minute["time"] >= "14:00") & (minute["time"] <= "14:20")].copy()
    # Position of each bar within its session's window: 0 for the first bar (m00) onward.
    window["minute_offset"] = window.groupby("session_date", sort=True).cumcount()
    grouped = window.groupby("session_date", sort=True)
    base = grouped.agg(
        window_start=("dt", "first"),
        window_end=("dt", "last"),
        window_rows=("close", "size"),
        w_open=("open", "first"),
        w_high=("high", "max"),
        w_low=("low", "min"),
        w_close=("close", "last"),
        # Without a volume column this falls back to the bar count (same as window_rows).
        w_volume=("volume", "sum") if "volume" in window.columns else ("close", "size"),
    ).reset_index().rename(columns={"session_date": "date"})
    # Keep only complete windows: 14:00 through 14:20 inclusive is 21 one-minute bars.
    base = base[base["window_rows"] == 21].copy()
    base["w_return"] = safe_div(base["w_close"] - base["w_open"], base["w_open"])
    base["w_range"] = safe_div(base["w_high"] - base["w_low"], base["w_open"])
    base["w_body_to_range"] = safe_div(base["w_close"] - base["w_open"], base["w_high"] - base["w_low"])
    base["w_close_location"] = safe_div(base["w_close"] - base["w_low"], base["w_high"] - base["w_low"])
    # pct_change restarts per session inside the window, so m00_ret_1m is always NaN.
    window["ret_1m"] = window.groupby("session_date")["close"].pct_change(fill_method=None)
    window["range_1m"] = safe_div(window["high"] - window["low"], window["open"])
    window["body_1m"] = safe_div(window["close"] - window["open"], window["open"])
    # The pivot covers every session, including incomplete ones. The left merge onto
    # ``base`` below keeps only the complete windows.
    minute_features = window.pivot(
        index="session_date",
        columns="minute_offset",
        values=["open", "high", "low", "close", "ret_1m", "range_1m", "body_1m"],
    )
    # Flatten the (field, offset) MultiIndex into names like "m07_close".
    minute_features.columns = [f"m{int(offset):02d}_{field}" for field, offset in minute_features.columns]
    minute_features = minute_features.reset_index().rename(columns={"session_date": "date"})
    # Last close over the whole session, not just the window.
    session_close = (
        minute.groupby("session_date", sort=True)
        .agg(day_close=("close", "last"))
        .reset_index()
        .rename(columns={"session_date": "date"})
    )
    frame = base.merge(minute_features, on="date", how="left").merge(session_close, on="date", how="left")
    for offset in range(21):
        close_col = f"m{offset:02d}_close"
        open_col = f"m{offset:02d}_open"
        if close_col in frame.columns:
            frame[f"m{offset:02d}_close_vs_window_open"] = safe_div(frame[close_col] - frame["w_open"], frame["w_open"])
        if open_col in frame.columns and close_col in frame.columns:
            frame[f"m{offset:02d}_close_vs_minute_open"] = safe_div(frame[close_col] - frame[open_col], frame[open_col])
    # Sub-window returns: minutes 0-4, 16-20 and 5-15.
    # NOTE(review): these raise KeyError when no window bars exist at all (the mNN
    # columns are then absent).
    frame["ret_first_5m"] = safe_div(frame["m04_close"] - frame["m00_open"], frame["m00_open"])
    frame["ret_last_5m"] = safe_div(frame["m20_close"] - frame["m16_open"], frame["m16_open"])
    frame["ret_mid_11m"] = safe_div(frame["m15_close"] - frame["m05_open"], frame["m05_open"])
    frame["last5_minus_first5"] = frame["ret_last_5m"] - frame["ret_first_5m"]
    frame["abs_window_return"] = frame["w_return"].abs()
    # Calendar features (pandas dayofweek: Monday=0).
    frame["dow"] = frame["date"].dt.dayofweek
    frame["dom"] = frame["date"].dt.day
    frame["month"] = frame["date"].dt.month
    return frame.sort_values("date").reset_index(drop=True)
|
|
|
|
|
def _add_tplus1_target_features(features: pd.DataFrame) -> pd.DataFrame:
    """Attach next-row targets and lagged/rolling target history to session features.

    The target for each row is 1.0 when the next row's ``day_close`` is above
    this row's 14:20 close (``w_close``), else 0.0. Rows without a next row get
    NaN. Lag and rolling columns are shifted by at least one row, so a row only
    sees targets of earlier rows.

    NOTE(review): "next row" is the next session present in ``features``. If a
    session is missing (e.g. an incomplete window was dropped upstream), the
    target spans that gap.
    """
    out = features.copy()
    next_close = out["day_close"].shift(-1)
    out["target_date"] = out["date"].shift(-1)
    out["target_close"] = next_close
    out["target_return_from_1420"] = safe_div(out["target_close"] - out["w_close"], out["w_close"])
    up_flag = (out["target_return_from_1420"] > 0).astype("float64")
    out["target"] = up_flag.where(out["target_close"].notna())

    target = out["target"]
    target_return = out["target_return_from_1420"]
    for lag in (1, 2, 3, 5, 10):
        out[f"prev_target_lag{lag}"] = target.shift(lag)
        out[f"prev_target_return_lag{lag}"] = target_return.shift(lag)

    prior_target = target.shift(1)
    prior_return = target_return.shift(1)
    for span in (3, 5, 10, 20, 40):
        needed = max(2, span // 2)
        out[f"prev_target_mean{span}"] = prior_target.rolling(span, min_periods=needed).mean()
        rolled_return = prior_return.rolling(span, min_periods=needed)
        out[f"prev_target_return_mean{span}"] = rolled_return.mean()
        out[f"prev_target_return_std{span}"] = rolled_return.std()
    return out
|
|
|
|
|
def _apply_tplus1_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, Any]]) -> np.ndarray:
    """Apply rule-based overrides to binary T+1 predictions.

    Each overlay is a dict with these keys:
    - ``feature``: a column of ``frame``.
    - ``op``: one of ``"<="``, ``"<"``, ``">"`` or ``">="``; anything else is
      treated as ``">="``, as before.
    - ``value``: the comparison threshold (default 0.0).
    - ``action``: ``"up"`` forces 1, ``"down"`` forces 0 and ``"flip"``
      inverts the prediction on matching rows.

    Overlays that reference a missing column, or have an unknown action, are
    ignored. NaN feature values never match. Overlays apply in order, so later
    rules see the effect of earlier ones.

    Args:
        pred: Binary predictions aligned with the rows of ``frame``.
        frame: Feature rows the overlays are evaluated on.
        overlays: Overlay rules, e.g. from the artifact's ``decision_overlay``.

    Returns:
        A new int64 array; ``pred`` itself is not modified.
    """
    import operator

    # Previously only "<=" was recognised, so strict operators such as "<" were
    # silently evaluated as ">=" (the opposite comparison).
    comparators = {"<=": operator.le, "<": operator.lt, ">": operator.gt, ">=": operator.ge}
    adjusted = np.asarray(pred, dtype="int64").copy()
    for overlay in overlays:
        feature = str(overlay.get("feature", ""))
        if feature not in frame.columns:
            continue
        series = pd.to_numeric(frame[feature], errors="coerce")
        value = float(overlay.get("value", 0.0))
        compare = comparators.get(str(overlay.get("op", ">=")).strip(), operator.ge)
        mask = compare(series, value).fillna(False).to_numpy(dtype=bool)
        action = overlay.get("action")
        if action == "up":
            adjusted[mask] = 1
        elif action == "down":
            adjusted[mask] = 0
        elif action == "flip":
            adjusted[mask] = 1 - adjusted[mask]
    return adjusted
|
|
|
|
|
def refresh_tplus1_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Score the T+1 model on a session's 14:00-14:20 window and persist the result.

    Args:
        session_date: Session whose window is scored; ``None`` uses the latest
            complete window.

    Returns:
        The prediction dict, which is also written to ``TPLUS1_LATEST_PATH`` and
        recorded in ``TPLUS1_PREDICTION_HISTORY_PATH`` keyed by ``target_date``.

    Raises:
        FileNotFoundError: The T+1 model artifact is missing.
        RuntimeError: No complete window is available even after fetching
            Yahoo minutes, or the feature row lacks model features.
    """

    def _build_frame() -> pd.DataFrame:
        return _add_tplus1_target_features(_build_tplus1_session_features(_minute_frame_for_tplus1()))

    def _pick_row(frame: pd.DataFrame) -> pd.DataFrame:
        if session_date is None:
            return frame.tail(1)
        frame_days = pd.to_datetime(frame["date"], errors="coerce").dt.date
        return frame[frame_days == session_date].tail(1)

    if not TPLUS1_MODEL_PATH.exists():
        raise FileNotFoundError(f"Missing T+1 model artifact: {TPLUS1_MODEL_PATH}")
    payload = joblib.load(TPLUS1_MODEL_PATH)
    features = payload["features"]
    threshold = float(payload["threshold"])

    row = _pick_row(_build_frame())
    if row.empty:
        # The local minute store lacks the window: top it up from Yahoo and retry once.
        fetched = fetch_yahoo_minutes(period="7d")
        append_parquet_rows(NIFTY_1M_PATH, fetched, ["date"])
        row = _pick_row(_build_frame())
    if row.empty:
        raise RuntimeError("No complete 14:00-14:20 window is available for T+1 prediction.")

    absent = [name for name in features if name not in row.columns]
    if absent:
        raise RuntimeError(f"T+1 feature row is missing model features: {absent[:5]}")

    prob_up = predict_proba_up(payload["model"], row[features])
    raw_pred = (prob_up >= threshold).astype("int64")
    overlay_payload = payload.get("decision_overlay")
    overlays = overlay_payload.get("overlays", []) if isinstance(overlay_payload, dict) else []
    final_flag = int(_apply_tplus1_overlays(raw_pred, row, overlays)[0])

    input_day = pd.to_datetime(row["date"].iloc[0]).date()
    target_day = next_trading_day(input_day + timedelta(days=1))
    summary = load_tplus1_summary()
    p_up = prob_up[0]
    out = {
        "input_date": input_day.isoformat(),
        "target_date": target_day.isoformat(),
        "forecast_for": f"next trading session after {input_day.isoformat()}",
        "prediction": "UP" if final_flag == 1 else "DOWN",
        "prob_up": float(p_up),
        "confidence": float(max(p_up, 1.0 - p_up)),
        "threshold": threshold,
        "model_name": str(payload.get("model_name", summary.get("model_name", "nifty_1420_tplus1_logistic_model"))),
        "decision_overlay": summary.get("decision_overlay"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "accuracy_goal": summary.get("accuracy_goal"),
        "source": "live",
    }
    pd.DataFrame([out]).to_csv(TPLUS1_LATEST_PATH, index=False)
    _record_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH, out, ["target_date"])
    clear_dashboard_payload_cache()
    return out
|
|
|
|
|
def _tomorrow_probability_from_daily(daily: pd.DataFrame, fallback_prob: float) -> float:
    """Heuristic probability that the next session closes up, from recent daily closes.

    Only the last 20 valid closes are used. The score starts at a base of about
    0.499 and is adjusted as follows:
    - adds the last 1-day return x 4.5, clipped to +/-0.05;
    - adds the 5-day return x 1.4, clipped to +/-0.05;
    - subtracts the 10-return std x 0.9, clipped to [0, 0.035].

    The result is clipped to [0.35, 0.65]. Returns ``float(fallback_prob)`` when
    ``daily`` is empty or has fewer than five valid closes.
    """
    base_score = 0.49900560447008563
    if daily.empty or len(daily) < 5:
        return float(fallback_prob)
    closes = pd.to_numeric(daily["close"], errors="coerce").dropna().tail(20)
    if len(closes) < 5:
        return float(fallback_prob)

    one_day_returns = closes.pct_change(fill_method=None)
    last_return = one_day_returns.iloc[-1]
    five_day_return = closes.pct_change(5, fill_method=None).iloc[-1]
    volatility = one_day_returns.tail(10).std()

    score = base_score
    if pd.notna(last_return):
        score += float(np.clip(last_return * 4.5, -0.05, 0.05))
    if pd.notna(five_day_return):
        score += float(np.clip(five_day_return * 1.4, -0.05, 0.05))
    if pd.notna(volatility):
        score -= float(np.clip(volatility * 0.9, 0.0, 0.035))
    return float(np.clip(score, 0.35, 0.65))
|
|
|
|
|
| def refresh_tomorrow_prediction(session_date: date | None = None) -> dict[str, Any]:
|
| _archive_tomorrow_latest_to_history()
|
| synced = sync_daily_forecaster_outputs()
|
| if synced is not None and TOMORROW_LATEST_PATH.exists():
|
| latest = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
|
| cleaned = {k: (None if pd.isna(v) else v) for k, v in latest.items()}
|
| cleaned["source"] = "live"
|
| _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, cleaned, ["target_date"])
|
| if session_date is None:
|
| clear_dashboard_payload_cache()
|
| return cleaned
|
| try:
|
| input_day = date.fromisoformat(str(cleaned.get("input_date"))[:10])
|
| except Exception:
|
| input_day = None
|
| if input_day is not None and (session_date is None or input_day >= session_date):
|
| clear_dashboard_payload_cache()
|
| return cleaned
|
| summary = load_tomorrow_summary()
|
| artifact = load_tomorrow_model_artifact()
|
| daily = pd.read_parquet(NIFTY_1D_PATH)
|
| daily["date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
|
| daily = daily.dropna(subset=["date"]).sort_values("date")
|
| if daily.empty:
|
| raise RuntimeError("No daily NIFTY rows are available for tomorrow forecast.")
|
| input_day = session_date or daily["date"].max().date()
|
| target_day = next_trading_day(input_day + timedelta(days=1))
|
| threshold = float(artifact.get("threshold", summary.get("threshold", 0.543)))
|
| fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563))
|
| prob_up = _tomorrow_probability_from_daily(daily[daily["date"].dt.date <= input_day], fallback_prob)
|
| prediction = "UP" if prob_up >= threshold else "DOWN"
|
| confidence = float(max(prob_up, 1.0 - prob_up))
|
| row = {
|
| "input_date": input_day.isoformat(),
|
| "target_date": target_day.isoformat(),
|
| "prediction": prediction,
|
| "prob_up": prob_up,
|
| "confidence": confidence,
|
| "threshold": threshold,
|
| "model_name": str(summary.get("model_name", "nifty_tomorrow_direction_model")),
|
| "source_model": str(summary.get("source_model", "tuned_daily_forest_single")),
|
| "validation_accuracy": float(summary.get("validation_accuracy", 0.5780141843971631)),
|
| "test_accuracy": float(summary.get("test_accuracy", 0.6182795698924731)),
|
| "source": "live",
|
| }
|
| pd.DataFrame([row]).to_csv(TOMORROW_LATEST_PATH, index=False)
|
| _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, row, ["target_date"])
|
| summary = dict(summary)
|
| summary.update(
|
| {
|
| "latest_forecast_date": row["input_date"],
|
| "latest_forecast_for": f"next trading session {row['target_date']}",
|
| "latest_forecast_prob_up": row["prob_up"],
|
| "latest_forecast_signal": row["prediction"],
|
| "latest_target_date": row["target_date"],
|
| }
|
| )
|
| TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8")
|
| clear_dashboard_payload_cache()
|
| return row
|
|
|
|
|
| def _json_ready_frame(df: pd.DataFrame, limit: int | None = None) -> list[dict[str, Any]]:
|
| out = df.copy()
|
| if limit is not None:
|
| out = out.tail(limit)
|
| for col in out.columns:
|
| if pd.api.types.is_datetime64_any_dtype(out[col]):
|
| out[col] = out[col].dt.strftime("%Y-%m-%d %H:%M:%S")
|
| out = out.replace({np.nan: None})
|
| return out.to_dict(orient="records")
|
|
|
|
|
|
|
|
|
def load_model_summary() -> dict[str, Any]:
    """Return ``MODEL_DIR/summary.json`` parsed as JSON, or an empty dict when it is absent."""
    summary_file = MODEL_DIR / "summary.json"
    if summary_file.exists():
        return json.loads(summary_file.read_text(encoding="utf-8"))
    return {}
|
|
|
|
|
def load_candidate_results() -> list[dict[str, Any]]:
    """Return the first 12 rows of ``MODEL_DIR/candidate_results.csv`` as JSON-ready records.

    Returns an empty list when the CSV does not exist.
    """
    csv_file = MODEL_DIR / "candidate_results.csv"
    if csv_file.exists():
        top_rows = pd.read_csv(csv_file).head(12)
        return _json_ready_frame(top_rows)
    return []
|
|
|
|
|
def load_test_predictions() -> pd.DataFrame:
    """Load the opening-model test predictions sorted by ``date`` (parsed to datetime).

    Returns an empty frame when ``TEST_PREDICTIONS_PATH`` does not exist.
    """
    if TEST_PREDICTIONS_PATH.exists():
        frame = pd.read_parquet(TEST_PREDICTIONS_PATH)
        frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
        return frame.sort_values("date").reset_index(drop=True)
    return pd.DataFrame()
|
|
|
|
|
def load_tomorrow_test_predictions() -> pd.DataFrame:
    """Load the tomorrow model's backtest predictions in chronological order.

    Parses ``forecast_date``, ``target_date`` and ``date`` to datetimes where
    present. Sorts by the first of ``target_date``, ``forecast_date``, ``date``
    that exists; a frame with none of them is returned in file order instead of
    raising ``KeyError``. Returns an empty frame when the parquet file is missing.
    """
    if not TOMORROW_TEST_PREDICTIONS_PATH.exists():
        return pd.DataFrame()
    df = pd.read_parquet(TOMORROW_TEST_PREDICTIONS_PATH)
    for col in ("forecast_date", "target_date", "date"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    sort_col = next((col for col in ("target_date", "forecast_date", "date") if col in df.columns), None)
    if sort_col is None:
        return df.reset_index(drop=True)
    return df.sort_values(sort_col).reset_index(drop=True)
|
|
|
|
|
def load_tplus1_test_predictions() -> pd.DataFrame:
    """Load the T+1 model's backtest predictions sorted by ``date``.

    ``date`` and ``target_date`` are parsed to datetimes when present. Returns an
    empty frame when ``TPLUS1_TEST_PREDICTIONS_PATH`` does not exist.
    """
    source = TPLUS1_TEST_PREDICTIONS_PATH
    if not source.exists():
        return pd.DataFrame()
    frame = pd.read_parquet(source)
    present = [name for name in ("date", "target_date") if name in frame.columns]
    for name in present:
        frame[name] = pd.to_datetime(frame[name], errors="coerce")
    return frame.sort_values("date").reset_index(drop=True)
|
|
|
|
|
def dashboard_payload() -> dict[str, Any]:
    """Return a deep copy of the memoised dashboard payload.

    The memo key is the ``_file_cache_key`` of every file the payload is derived
    from, so the payload is rebuilt whenever ``_file_cache_key`` reports a change.
    The daily forecaster's predictions CSV is tracked too, because the track
    record reads it directly; without it, forecaster updates could leave the
    cached payload stale.

    Cache lookup/build happens under ``_dashboard_payload_lock``. A deep copy is
    returned so callers cannot mutate the shared cached object.
    """
    tracked_paths = (
        MODEL_DIR / "summary.json",
        LATEST_PATH,
        TEST_PREDICTIONS_PATH,
        TOMORROW_SUMMARY_PATH,
        TOMORROW_LATEST_PATH,
        TOMORROW_TEST_PREDICTIONS_PATH,
        TOMORROW_MODEL_PATH,
        TPLUS1_SUMMARY_PATH,
        TPLUS1_LATEST_PATH,
        TPLUS1_TEST_PREDICTIONS_PATH,
        TPLUS1_MODEL_PATH,
        REFRESH_STATE_PATH,
        NIFTY_1D_PATH,
        OPENING_DATASET_PATH,
        MODEL_DIR / "candidate_results.csv",
        NIFTY_1M_PATH,
        LIVE_ACCURACY_PATH,
        TOMORROW_PREDICTION_HISTORY_PATH,
        MFE_SUMMARY_PATH,
        MFE_LATEST_PATH,
        MFE_MODEL_PATH,
        DAILY_FORECASTER_PREDICTIONS_PATH,
    )
    key = tuple(_file_cache_key(path) for path in tracked_paths)
    with _dashboard_payload_lock:
        return copy.deepcopy(_dashboard_payload_cached(key))
|
|
|
|
|
def warm_dashboard_payload_cache() -> None:
    """Build the dashboard payload once, discarding the result, so its memo cache is populated."""
    _ = dashboard_payload()
|
|
|
|
|
def _load_forecaster_predictions_by_target() -> dict[date, dict[str, Any]]:
    """Index the daily forecaster's NIFTY 50 predictions by target session date.

    Reads ``DAILY_FORECASTER_PREDICTIONS_PATH`` and skips two kinds of row:
    - rows whose ``target_date`` cannot be parsed;
    - rows whose ``pred`` cannot be converted to int. ``raw_pred`` is used when
      ``pred`` is missing or NaN.

    Later rows for the same target date overwrite earlier ones. Returns an empty
    dict when the file is missing, unreadable or empty.
    """

    def _direction(record: pd.Series) -> int | None:
        value = record.get("pred")
        if pd.isna(value) and "raw_pred" in record:
            value = record.get("raw_pred")
        try:
            return int(value)
        except Exception:
            return None

    def _probability(record: pd.Series) -> float | None:
        value = record.get("prob_up")
        try:
            return float(value) if pd.notna(value) else None
        except Exception:
            return None

    source_path = DAILY_FORECASTER_PREDICTIONS_PATH
    if not source_path.exists():
        return {}
    try:
        rows = pd.read_csv(source_path)
    except Exception:
        return {}
    if rows.empty:
        return {}
    if "symbol" in rows.columns:
        rows = rows[rows["symbol"].astype(str) == "NIFTY 50"].copy()

    by_target: dict[date, dict[str, Any]] = {}
    for _, record in rows.iterrows():
        target_day = _parse_iso_date(record.get("target_date"))
        if target_day is None:
            continue
        flag = _direction(record)
        if flag is None:
            continue
        by_target[target_day] = {
            "prediction": "UP" if flag == 1 else "DOWN",
            "prob_up": _probability(record),
            "forecast_date": record.get("forecast_date"),
            "source": "Tomorrow (forecaster)",
        }
    return by_target
|
|
|
|
|
def _load_track_record_daily_rows() -> pd.DataFrame:
    """Combine stored and freshly fetched NIFTY daily bars for the track record.

    Two sources are read, each best-effort:
    - ``NIFTY_1D_PATH``, if readable;
    - a 3-month Yahoo daily fetch, if it succeeds. Fetched rows are also
      appended to ``NIFTY_1D_PATH``.

    Dates are normalised to midnight. For duplicate dates the last row wins, and
    the stable sort guarantees that is the fetched row. Rows whose ``close`` is
    not a finite number are dropped.

    Returns:
        Rows sorted by ``date`` with a fresh index. The frame is empty when no
        source is available or the data lacks ``date``/``close`` columns.
    """
    frames: list[pd.DataFrame] = []
    if NIFTY_1D_PATH.exists():
        try:
            frames.append(pd.read_parquet(NIFTY_1D_PATH))
        except Exception:
            # A corrupt local file should not block the Yahoo path.
            pass
    try:
        yahoo_daily = fetch_yahoo_daily(period="3mo")
    except Exception:
        yahoo_daily = None
    if yahoo_daily is not None and not yahoo_daily.empty:
        frames.append(yahoo_daily)
        try:
            append_parquet_rows(NIFTY_1D_PATH, yahoo_daily, ["date"])
        except Exception:
            pass

    if not frames:
        return pd.DataFrame()

    combined = pd.concat(frames, ignore_index=True)
    if "date" not in combined.columns or "close" not in combined.columns:
        return pd.DataFrame()
    combined["date"] = pd.to_datetime(combined["date"], errors="coerce").dt.normalize()
    # Stable sort keeps concat order (local rows, then fetched rows) among equal
    # dates, so keep="last" deterministically prefers the fetched row.
    combined = combined.dropna(subset=["date"]).sort_values("date", kind="stable")
    combined = combined.drop_duplicates(subset=["date"], keep="last")
    # Coerce before testing finiteness: float() on a non-numeric string would raise.
    numeric_close = pd.to_numeric(combined["close"], errors="coerce")
    finite_mask = np.isfinite(numeric_close.to_numpy(dtype="float64", na_value=np.nan))
    combined = combined[finite_mask].copy()
    return combined.reset_index(drop=True)
|
|
|
|
|
def _rolling_tomorrow_prediction(
    input_day: date,
    daily_rows: pd.DataFrame,
    threshold: float,
    fallback_prob: float,
) -> tuple[str, float]:
    """Replay the heuristic tomorrow forecast as of ``input_day``.

    Only ``daily_rows`` dated on or before ``input_day`` are used, so the
    ``date`` column must be datetime-like. The result is ``"UP"`` when the
    probability is at least ``threshold``, else ``"DOWN"``.

    Returns:
        ``(prediction, prob_up)``.
    """
    visible = daily_rows.loc[daily_rows["date"].dt.date <= input_day].copy()
    prob_up = float(_tomorrow_probability_from_daily(visible, fallback_prob))
    direction = "UP" if prob_up >= threshold else "DOWN"
    return direction, prob_up
|
|
|
|
|
def build_prediction_track_record(
    sessions: int = 10,
) -> list[dict[str, Any]]:
    """Score tomorrow-direction calls against realised moves for recent sessions.

    The last ``sessions`` trading sessions are considered, ending at the earlier
    of ``_track_record_end_session()`` and the latest available close. For each
    one, the daily forecaster's stored call for that target date is used when it
    is a definite UP/DOWN. Otherwise the heuristic forecast is replayed from
    closes up to the previous trading day. Sessions without a close, or without
    a determinable realised direction, are skipped.

    Args:
        sessions: Number of most recent sessions to evaluate; ``<= 0`` yields ``[]``.

    Returns:
        Records with ``date``, ``input_date``, ``prediction``,
        ``prediction_source``, ``prob_up``, ``actual_move``, ``actual_direction``
        and ``correct``. Order follows ``last_n_trading_sessions``, capped to the
        last ``sessions`` records.
    """
    # Guard explicitly: records[-0:] would return every record instead of none.
    if sessions <= 0:
        return []

    summary = load_tomorrow_summary()
    artifact = load_tomorrow_model_artifact()
    # Same precedence and default as refresh_tomorrow_prediction (was a mistyped
    # 0.534); null values fall back instead of raising TypeError.
    raw_threshold = artifact.get("threshold", summary.get("threshold"))
    threshold = float(raw_threshold) if raw_threshold is not None else 0.543
    raw_fallback = summary.get("latest_forecast_prob_up")
    fallback_prob = float(raw_fallback) if raw_fallback is not None else 0.49900560447008563
    forecaster_by_target = _load_forecaster_predictions_by_target()

    daily_rows = _load_track_record_daily_rows()
    if daily_rows.empty:
        return []

    closes_by_date = {
        row["date"].date(): float(row["close"])
        for _, row in daily_rows.iterrows()
        if pd.notna(row["close"]) and np.isfinite(float(row["close"]))
    }

    end_session = _track_record_end_session()
    available_through = max(
        (day for day in closes_by_date if day <= end_session),
        default=None,
    )
    if available_through is None:
        return []
    if available_through < end_session:
        end_session = available_through

    session_dates = last_n_trading_sessions(end_session, sessions)

    records: list[dict[str, Any]] = []
    for target_day in session_dates:
        day_close = closes_by_date.get(target_day)
        if day_close is None:
            continue
        actual_move, actual_direction = _tomorrow_actual_outcome(target_day, day_close, closes_by_date)
        if actual_direction is None:
            continue

        input_day = previous_trading_day(target_day - timedelta(days=1))
        cached = forecaster_by_target.get(target_day)
        if cached and cached.get("prediction") in {"UP", "DOWN"}:
            prediction = cached["prediction"]
            prob_up = cached.get("prob_up")
            source = cached.get("source", "Tomorrow (forecaster)")
        else:
            prediction, prob_up = _rolling_tomorrow_prediction(
                input_day,
                daily_rows,
                threshold,
                fallback_prob,
            )
            source = "Tomorrow (rolling)"

        if prediction not in {"UP", "DOWN"}:
            continue

        records.append(
            {
                "date": target_day.isoformat(),
                "input_date": input_day.isoformat(),
                "prediction": prediction,
                "prediction_source": source,
                "prob_up": prob_up,
                "actual_move": actual_move,
                "actual_direction": actual_direction,
                "correct": prediction == actual_direction,
            }
        )
    return records[-sessions:]
|
|
|
|
|
@lru_cache(maxsize=4)
def _dashboard_payload_cached(key: tuple[tuple[str, int | None, int | None], ...]) -> dict[str, Any]:
    """Assemble the full dashboard payload, memoised on ``key``.

    ``key`` is never read here; it exists only so ``lru_cache`` rebuilds the
    payload when the tracked files change (``dashboard_payload`` builds it). The
    returned dict is shared between cache hits, so callers must copy it before
    mutating.

    The payload contains:
    - latest predictions and summaries for the T+5, tomorrow, T+1 and MFE models;
    - per-model metrics, with "recent" accuracy computed over the last 40
      backtest rows;
    - live accuracy;
    - chart series (daily closes, backtests, track record, prediction history).

    Note that ``timestamp`` records when the payload was built, not when it was
    served.
    """
    summary = load_model_summary()
    t5_latest = _latest_saved_prediction_uncached()
    tomorrow_summary = load_tomorrow_summary()
    tomorrow_latest = latest_tomorrow_prediction()
    tplus1_summary = load_tplus1_summary()
    tplus1_latest = latest_tplus1_prediction()
    t5_test = load_test_predictions()
    tomorrow_test = load_tomorrow_test_predictions()
    tomorrow_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH)
    tplus1_test = load_tplus1_test_predictions()
    mfe_summary = load_mfe_summary()
    mfe_latest = latest_mfe_prediction()
    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["date"] = pd.to_datetime(daily["date"], errors="coerce")
    daily = daily.sort_values("date").tail(180)

    # Guard on the column like the T+1 path does; previously a backtest file
    # without "correct" raised KeyError and broke the whole dashboard.
    if not t5_test.empty and "correct" in t5_test.columns:
        recent_accuracy = float(t5_test.tail(40)["correct"].mean())
    else:
        recent_accuracy = None

    if not tomorrow_test.empty:
        tomorrow_recent = tomorrow_test.tail(40).copy()
        # Derive "correct" from target/pred when the backtest file lacks it.
        if "correct" not in tomorrow_recent.columns and {"target", "pred"}.issubset(tomorrow_recent.columns):
            tomorrow_recent["correct"] = pd.to_numeric(tomorrow_recent["target"], errors="coerce") == pd.to_numeric(tomorrow_recent["pred"], errors="coerce")
        if "correct" in tomorrow_recent.columns:
            tomorrow_accuracy = float(tomorrow_recent["correct"].mean())
        else:
            tomorrow_accuracy = tomorrow_summary.get("test_accuracy")
    else:
        tomorrow_accuracy = tomorrow_summary.get("test_accuracy")

    if not tplus1_test.empty and "correct" in tplus1_test.columns:
        tplus1_recent_accuracy = float(tplus1_test.tail(40)["correct"].mean())
    else:
        tplus1_recent_accuracy = tplus1_summary.get("test_accuracy")

    model_metrics = [
        {
            "id": "tomorrow",
            "label": "Tomorrow",
            "model_name": tomorrow_summary.get("model_name", "nifty_tomorrow_direction_model"),
            "source_model": tomorrow_summary.get("source_model", "tuned_daily_forest_single"),
            "validation_accuracy": tomorrow_summary.get("validation_accuracy"),
            "test_accuracy": tomorrow_summary.get("test_accuracy"),
            "recent_accuracy": tomorrow_accuracy,
            "test_rows": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0),
        },
        {
            "id": "tplus1",
            "label": "T+1",
            "model_name": tplus1_summary.get("model_name", "nifty_1420_tplus1_logistic_model"),
            "source_model": "14:00-14:20 logistic forecaster",
            "validation_accuracy": tplus1_summary.get("validation_accuracy"),
            "test_accuracy": tplus1_summary.get("test_accuracy"),
            "recent_accuracy": tplus1_recent_accuracy,
            "test_rows": int(tplus1_summary.get("test_rows") or len(tplus1_test) or 0),
        },
        {
            "id": "t5",
            "label": "T+5",
            "model_name": summary.get("model_name", "nifty_opening_direction_model"),
            "source_model": summary.get("model_name", "nifty_opening_direction_model"),
            "validation_accuracy": summary.get("validation_accuracy"),
            "test_accuracy": summary.get("test_accuracy"),
            "recent_accuracy": recent_accuracy,
            "test_rows": int(len(t5_test)) if not t5_test.empty else int(summary.get("test_rows") or 0),
        },
    ]
    metrics = {
        "validation_accuracy": tomorrow_summary.get("validation_accuracy"),
        "test_accuracy": tomorrow_summary.get("test_accuracy"),
        "baseline_test_accuracy": tomorrow_summary.get("baseline_accuracy"),
        "validation_auc": summary.get("validation_auc"),
        "test_auc": summary.get("test_auc"),
        "test_brier": summary.get("test_brier"),
        "feature_count": tomorrow_summary.get("feature_count"),
        "recent_accuracy": tomorrow_accuracy,
        "recent_accuracy_days": int(len(tomorrow_test.tail(40))) if not tomorrow_test.empty else 0,
        "total_test_days": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0),
        "models": model_metrics,
    }
    return {
        "timestamp": datetime.now(IST).isoformat(),
        "predictions": {
            "t5": {
                "latest": t5_latest,
                "summary": summary,
            },
            "tomorrow": {
                "latest": tomorrow_latest,
                "summary": tomorrow_summary,
            },
            "tplus1": {
                "latest": tplus1_latest,
                "summary": tplus1_summary,
            },
            "mfe": {
                "latest": mfe_latest,
                "summary": mfe_summary,
            },
        },
        "metrics": metrics,
        "models": model_metrics,
        "live_accuracy": load_live_accuracy(),
        "charts": {
            "daily_closes": _json_ready_frame(daily[["date", "close"]]),
            "t5_backtest": _json_ready_frame(t5_test, limit=400),
            "t5_recent_predictions": _json_ready_frame(t5_test.tail(40)),
            "tomorrow_backtest": _json_ready_frame(tomorrow_test, limit=400),
            "tomorrow_live_track_record": build_prediction_track_record(),
            "tomorrow_history_predictions": _json_ready_frame(tomorrow_history.tail(80)),
            "tplus1_backtest": _json_ready_frame(tplus1_test, limit=400),
        },
    }
|
|
|
|
|
def refresh_first5_prediction(session_date: date | None = None, minutes: pd.DataFrame | None = None) -> Prediction:
    """Rebuild the first-5-minute (T+5) feature row, persist it and predict.

    Steps: merge the 1-minute bars into NIFTY_1M_PATH, build the model row for
    the session, upsert it into the opening training dataset (last row per
    ``date`` wins, dataset re-sorted by ``date``), predict on that row and then
    attempt an MFE refresh for the same session.

    Args:
        session_date: Session to build features for.  When omitted, today's IST
            date must be an NSE trading day; ``session_date`` itself stays None
            and is passed through to the feature/MFE helpers as-is.
        minutes: Pre-fetched 1-minute bars; fetched from Yahoo (7 days) if None.

    Returns:
        The value returned by ``predict_row`` for the new feature row.

    Raises:
        RuntimeError: ``session_date`` is None and today is not a trading day.
    """
    if session_date is None:
        current_day = datetime.now(IST).date()
        if not is_trading_day(current_day):
            raise RuntimeError(f"{current_day.isoformat()} is not an NSE trading session.")

    if minutes is None:
        minutes = fetch_yahoo_minutes(period="7d")
    append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])

    feature_row = build_model_row(first5_features_from_minutes(minutes, session_date=session_date))
    refreshed_dataset = (
        pd.concat([read_training_dataset(), feature_row], ignore_index=True)
        .drop_duplicates(subset=["date"], keep="last")
        .sort_values("date")
        .reset_index(drop=True)
    )
    refreshed_dataset.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd")

    result = predict_row(feature_row)
    # The MFE refresh is best-effort: a failure is reported but never blocks the T+5 result.
    try:
        refresh_mfe_prediction(session_date=session_date)
    except Exception as exc:
        print(f"MFE refresh failed: {exc}", flush=True)
    return result
|
|
|
|
|
def refresh_daily_data() -> dict[str, Any]:
    """Fetch the last month of daily NIFTY bars and merge them into NIFTY_1D_PATH.

    Returns:
        ``{"rows": total rows after the merge, "latest_date": ISO date of the
        newest bar, "path": str(NIFTY_1D_PATH)}``.
    """
    merged = append_parquet_rows(NIFTY_1D_PATH, fetch_yahoo_daily(period="1mo"), ["date"])
    row_count = int(len(merged))
    newest_bar = pd.to_datetime(merged["date"]).max()
    return {
        "rows": row_count,
        "latest_date": newest_bar.date().isoformat(),
        "path": str(NIFTY_1D_PATH),
    }
|
|
|
|
|
def update_opening_outcomes_from_daily() -> dict[str, Any]:
    """Back-fill daily OHLCV outcomes into the opening-direction training dataset.

    For every row of OPENING_DATASET_PATH whose session date has a bar in
    NIFTY_1D_PATH, copies open/high/low/close/volume into the matching
    ``day_*`` columns (only for ``day_*`` columns that already exist). Then,
    where the needed columns exist and ``day_open`` is non-zero, it recomputes
    ``target`` (1 if day_close > day_open else 0), ``day_return`` and
    ``first5_vs_day_open``. The dataset is rewritten sorted by ``date``.

    Returns:
        ``{"updated_rows": rows whose target/day_return were recomputed,
        "latest_date": ISO date of the newest dataset row or None}``.
        If either file is missing or empty, returns
        ``{"updated_rows": 0, "latest_date": None}`` without writing anything.
    """
    if not OPENING_DATASET_PATH.exists() or not NIFTY_1D_PATH.exists():
        return {"updated_rows": 0, "latest_date": None}
    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    daily = pd.read_parquet(NIFTY_1D_PATH)
    if dataset.empty or daily.empty:
        return {"updated_rows": 0, "latest_date": None}

    # Match rows by calendar day: normalize() drops any time-of-day component.
    # NOTE(review): if one file stores tz-aware dates and the other naive ones,
    # no session will match — confirm both are stored the same way.
    dataset = dataset.copy()
    dataset["_session_date"] = pd.to_datetime(dataset["date"], errors="coerce").dt.normalize()
    daily = daily.copy()
    daily["_session_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    # One daily bar per session (the last one wins) so daily.loc[...] returns a single row.
    daily = daily.dropna(subset=["_session_date"]).drop_duplicates("_session_date", keep="last")
    daily = daily.set_index("_session_date")

    updated = 0
    for idx, session_day in dataset["_session_date"].dropna().items():
        if session_day not in daily.index:
            continue
        row = daily.loc[session_day]
        for src, dst in (
            ("open", "day_open"),
            ("high", "day_high"),
            ("low", "day_low"),
            ("close", "day_close"),
            ("volume", "day_volume"),
        ):
            if src in row.index and dst in dataset.columns:
                dataset.at[idx, dst] = row[src]
        # Re-read the just-written cells so the labels reflect the refreshed bar.
        if {"day_open", "day_close", "target", "day_return"}.issubset(dataset.columns):
            day_open = dataset.at[idx, "day_open"]
            day_close = dataset.at[idx, "day_close"]
            if pd.notna(day_open) and pd.notna(day_close) and float(day_open) != 0.0:
                dataset.at[idx, "target"] = int(float(day_close) > float(day_open))
                dataset.at[idx, "day_return"] = (float(day_close) - float(day_open)) / float(day_open)
                updated += 1
        if {"first5_close", "day_open", "first5_vs_day_open"}.issubset(dataset.columns):
            first5_close = dataset.at[idx, "first5_close"]
            day_open = dataset.at[idx, "day_open"]
            if pd.notna(first5_close) and pd.notna(day_open) and float(day_open) != 0.0:
                dataset.at[idx, "first5_vs_day_open"] = (float(first5_close) - float(day_open)) / float(day_open)

    # Drop the helper key before persisting so the on-disk schema is unchanged.
    dataset = dataset.drop(columns=["_session_date"])
    dataset = dataset.sort_values("date").reset_index(drop=True)
    dataset.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd")
    latest = pd.to_datetime(dataset["date"], errors="coerce").max()
    return {
        "updated_rows": int(updated),
        "latest_date": None if pd.isna(latest) else latest.date().isoformat(),
    }
|
|
|
|
|
def load_live_accuracy(path: Path | None = None) -> dict[str, Any]:
    """Load the live accuracy ledger from disk.

    The ledger is a JSON object keyed by model id ("tomorrow", "t5",
    "tplus1"). For each model the "entries" list is taken from the file. The
    summary counters ("total", "correct_count", "backtest_count",
    "live_count", "accuracy") are always recomputed from those entries, so
    stale counters stored on disk are never trusted. Any other keys stored
    under a model id are kept. Entries without a "source" count as backtest.

    Entries that are not JSON objects are dropped one at a time. Previously a
    single malformed entry made the whole load fall back to an empty (or
    partially filled) ledger, which the next ``save_live_accuracy`` call would
    then write over the real history.

    Args:
        path: Ledger file to read. Defaults to ``LIVE_ACCURACY_PATH``.

    Returns:
        A dict with one sub-dict per model id. A missing, unreadable or
        non-object file yields an empty ledger (no entries, accuracy None).
    """
    ledger_path = LIVE_ACCURACY_PATH if path is None else Path(path)
    ledger: dict[str, Any] = {
        model_id: {"entries": [], "accuracy": None, "total": 0, "correct_count": 0, "backtest_count": 0, "live_count": 0}
        for model_id in ("tomorrow", "t5", "tplus1")
    }
    if not ledger_path.exists():
        return ledger
    try:
        raw = json.loads(ledger_path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # Unreadable file or invalid JSON/encoding: fall back to an empty ledger.
        return ledger
    if not isinstance(raw, dict):
        return ledger

    for model_id, model_ledger in ledger.items():
        current = raw.get(model_id, {})
        if not isinstance(current, dict):
            current = {}
        entries = current.get("entries", [])
        if not isinstance(entries, list):
            entries = []
        # Keep only well-formed entries; anything else would break the .get() calls below.
        entries = [entry for entry in entries if isinstance(entry, dict)]
        total = len(entries)
        backtest_count = sum(1 for entry in entries if str(entry.get("source", "backtest")).lower() == "backtest")
        correct_count = sum(1 for entry in entries if entry.get("correct"))
        current["entries"] = entries
        current["backtest_count"] = int(backtest_count)
        current["live_count"] = int(total - backtest_count)
        current["total"] = int(total)
        current["correct_count"] = int(correct_count)
        current["accuracy"] = (correct_count / total) if total > 0 else None
        model_ledger.update(current)
    return ledger
|
|
|
|
|
def save_live_accuracy(data: dict[str, Any], path: Path | None = None) -> None:
    """Persist the live accuracy ledger to disk atomically.

    The JSON is written to a sibling temporary file and then moved over the
    target with ``os.replace``. A crash mid-write therefore never leaves a
    truncated ledger behind. That matters because ``load_live_accuracy``
    treats an unparsable file as an empty ledger, and the next save would
    erase the history.

    Args:
        data: Ledger to serialize (must be JSON-serializable).
        path: Destination file. Defaults to ``LIVE_ACCURACY_PATH``; missing
            parent directories are created.
    """
    target = LIVE_ACCURACY_PATH if path is None else Path(path)
    # Serialize first so a non-serializable ledger fails before touching the disk.
    payload = json.dumps(data, indent=2)
    target.parent.mkdir(parents=True, exist_ok=True)
    # pid + thread id keep concurrent writers from sharing a temp file.
    tmp_path = target.with_name(f".{target.name}.{os.getpid()}.{threading.get_ident()}.tmp")
    try:
        tmp_path.write_text(payload, encoding="utf-8")
        os.replace(tmp_path, target)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
|
|
|
|
|
| def _load_prediction_history(path: Path) -> pd.DataFrame:
|
| if not path.exists():
|
| return pd.DataFrame()
|
| frame = pd.read_parquet(path)
|
| for col in ("date", "input_date", "target_date", "forecast_date"):
|
| if col in frame.columns:
|
| frame[col] = pd.to_datetime(frame[col], errors="coerce")
|
| sort_cols = [col for col in ("target_date", "input_date", "date", "forecast_date") if col in frame.columns]
|
| if sort_cols:
|
| return frame.sort_values(sort_cols).reset_index(drop=True)
|
| return frame.reset_index(drop=True)
|
|
|
|
|
def _record_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> None:
    """Append one prediction row to the history file at *path*.

    Thin wrapper that forwards its arguments unchanged to
    ``append_prediction_history`` and discards that call's return value.
    NOTE(review): *subset* presumably names the de-duplication key columns —
    confirm against append_prediction_history.
    """
    append_prediction_history(path, row, subset)
|
|
|
|
|
def _rescore_tomorrow_live_ledger(ledger: dict[str, Any]) -> bool:
    """Fix live Tomorrow ledger entries to use close vs previous close. Returns True if modified.

    Builds a date -> close map from NIFTY_1D_PATH (finite closes only; a later
    row for the same day wins). Then, for every non-backtest Tomorrow entry
    with a parseable date, an UP/DOWN prediction and a known outcome from
    ``_tomorrow_actual_outcome``, it rewrites ``actual``/``correct`` in place
    whenever they differ from the recomputed values.
    """
    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    raw_closes = daily["close"] if "close" in daily.columns else [None] * len(daily)

    closes_by_date: dict[date, float] = {}
    for session_ts, raw_close in zip(daily["_date"], raw_closes):
        if pd.isna(session_ts) or pd.isna(raw_close):
            continue
        close_value = float(raw_close)
        if np.isfinite(close_value):
            closes_by_date[session_ts.date()] = close_value

    modified = False
    for entry in ledger.get("tomorrow", {}).get("entries", []):
        if str(entry.get("source", "backtest")).lower() == "backtest":
            continue
        try:
            session_day = date.fromisoformat(str(entry.get("date", ""))[:10])
        except Exception:
            continue
        session_close = closes_by_date.get(session_day)
        if session_close is None:
            continue
        _, actual = _tomorrow_actual_outcome(session_day, session_close, closes_by_date)
        predicted = str(entry.get("prediction", "")).upper()
        if actual is None or predicted not in {"UP", "DOWN"}:
            continue
        is_correct = predicted == actual
        if entry.get("actual") == actual and entry.get("correct") == is_correct:
            continue
        entry["actual"] = actual
        entry["correct"] = is_correct
        modified = True
    return modified
|
|
|
|
|
def ensure_completed_sessions_scored(
    now: datetime | None = None,
    ledger: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Score any completed Tomorrow sessions that have daily close data but no ledger entry.

    First re-scores live Tomorrow entries. If that changed anything, the
    Tomorrow counters are recomputed and the ledger is saved. If the most
    recently completed session still has no Tomorrow entry and the daily file
    holds a usable bar for it (finite prices, non-zero open), scoring is
    delegated to ``update_live_accuracy``. That function reloads the ledger
    from disk, and its result is returned. Otherwise the in-memory ledger is
    returned.

    Args:
        now: Reference time; defaults to the current IST time.
        ledger: Ledger to work on; loaded from disk when falsy.
    """
    now = now or datetime.now(IST)
    completed = expected_completed_daily_date(now)
    if not is_trading_day(completed):
        return ledger or load_live_accuracy()

    ledger = ledger or load_live_accuracy()
    if _rescore_tomorrow_live_ledger(ledger):
        book = ledger["tomorrow"]
        items = book["entries"]
        hits = sum(1 for item in items if item.get("correct"))
        backtests = sum(1 for item in items if str(item.get("source", "backtest")).lower() == "backtest")
        book["total"] = len(items)
        book["correct_count"] = hits
        book["backtest_count"] = backtests
        book["live_count"] = len(items) - backtests
        book["accuracy"] = hits / len(items) if len(items) > 0 else None
        save_live_accuracy(ledger)

    already_scored = {item["date"] for item in ledger.get("tomorrow", {}).get("entries", [])}
    if completed.isoformat() in already_scored:
        return ledger

    daily = pd.read_parquet(NIFTY_1D_PATH)
    session_days = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    session_bars = daily[session_days.dt.date == completed]
    if session_bars.empty:
        return ledger

    last_bar = session_bars.iloc[-1]
    open_px = float(last_bar["open"])
    close_px = float(last_bar["close"])
    usable = np.isfinite(open_px) and np.isfinite(close_px) and open_px != 0
    return update_live_accuracy(completed) if usable else ledger
|
|
|
|
|
def update_live_accuracy(session_date: date) -> dict[str, Any]:
    """Score today's predictions against actual outcomes and update the ledger.

    Must be called AFTER refresh_daily_data() (so today's close is available)
    but BEFORE refresh_first5_prediction / refresh_tplus1_prediction /
    refresh_tomorrow_prediction (so the CSV files still hold the predictions
    we want to score).

    For *session_date*, at most one "live" entry is appended per model, and
    only if that model has no entry for the date yet:

    * t5: prediction vs. "session close > session open". The prediction comes
      from T5_PREDICTION_HISTORY_PATH (target_date == session_date), falling
      back to the last row of LATEST_PATH when its input_date matches.
    * tomorrow: prediction from ``_find_tomorrow_prediction_for_target`` vs.
      "session close > close of the previous trading day".
    * tplus1: prediction from TPLUS1_PREDICTION_HISTORY_PATH vs. "session
      close > last 1-minute close between 14:00 and 14:20 on the prediction's
      input_date".

    Afterwards live Tomorrow entries are re-scored, every model's counters
    are recomputed from its entries, and the ledger is saved.

    Returns:
        The updated ledger. If the daily file has no usable bar for
        *session_date* (no row, non-finite prices or a zero open), the freshly
        loaded ledger is returned and nothing is saved.
    """
    ledger = load_live_accuracy()
    daily = pd.read_parquet(NIFTY_1D_PATH)
    # Normalize to midnight so sessions are matched by calendar day only.
    daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    today_rows = daily[daily["_date"].dt.date == session_date]
    if today_rows.empty:
        return ledger

    day_open = float(today_rows.iloc[-1]["open"])
    day_close = float(today_rows.iloc[-1]["close"])
    if not (np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0):
        return ledger
    # Outcome used to score T+5: did the session close above its open?
    actual_close_gt_open = "UP" if day_close > day_open else "DOWN"
    session_iso = session_date.isoformat()

    # --- T+5: prefer the history file, fall back to the latest-prediction CSV ---
    logged_t5 = {e["date"] for e in ledger["t5"]["entries"]}
    if session_iso not in logged_t5:
        t5_history = _load_prediction_history(T5_PREDICTION_HISTORY_PATH)
        if not t5_history.empty and "target_date" in t5_history.columns:
            t5_rows = t5_history[t5_history["target_date"].dt.date == session_date]
        else:
            t5_rows = pd.DataFrame()
        if t5_rows.empty and LATEST_PATH.exists():
            try:
                t5_row = pd.read_csv(LATEST_PATH).iloc[-1].to_dict()
                # Only use the CSV if it still holds this session's prediction.
                if str(t5_row.get("input_date", ""))[:10] == session_iso:
                    t5_rows = pd.DataFrame([t5_row])
            except Exception:
                t5_rows = pd.DataFrame()
        if not t5_rows.empty:
            try:
                t5_row = t5_rows.iloc[-1].to_dict()
                pred = str(t5_row.get("prediction", "")).upper()
                if pred in ("UP", "DOWN"):
                    ledger["t5"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": actual_close_gt_open,
                        "correct": pred == actual_close_gt_open,
                        "source": "live",
                    })
            except Exception:
                # NOTE(review): failures are swallowed; the session simply stays unscored.
                pass

    # --- Tomorrow: scored as session close vs. previous trading day's close ---
    logged_tom = {e["date"] for e in ledger["tomorrow"]["entries"]}
    if session_iso not in logged_tom:
        prev_day = previous_trading_day(session_date - timedelta(days=1))
        prev_rows = daily[daily["_date"].dt.date == prev_day]
        actual_tomorrow = None
        if not prev_rows.empty:
            prev_close = float(prev_rows.iloc[-1]["close"])
            if np.isfinite(prev_close) and prev_close != 0:
                actual_tomorrow = "UP" if day_close > prev_close else "DOWN"
        tom_row = _find_tomorrow_prediction_for_target(session_date)
        if tom_row and actual_tomorrow is not None:
            try:
                pred = str(tom_row.get("prediction", "")).upper()
                if pred in ("UP", "DOWN"):
                    ledger["tomorrow"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": actual_tomorrow,
                        "correct": pred == actual_tomorrow,
                        "source": "live",
                    })
            except Exception:
                pass

    # --- T+1: scored as session close vs. the 14:00-14:20 window close of the input day ---
    logged_t1 = {e["date"] for e in ledger["tplus1"]["entries"]}
    if session_iso not in logged_t1:
        t1_history = _load_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH)
        if not t1_history.empty and "target_date" in t1_history.columns:
            t1_rows = t1_history[t1_history["target_date"].dt.date == session_date]
        else:
            t1_rows = pd.DataFrame()
        if not t1_rows.empty:
            try:
                t1_row = t1_rows.iloc[-1].to_dict()
                pred = str(t1_row.get("prediction", "")).upper()
                input_date_str = str(t1_row.get("input_date", ""))[:10]
                input_day = date.fromisoformat(input_date_str)

                minute = pd.read_parquet(NIFTY_1M_PATH, columns=["date", "close"])
                minute["dt"] = pd.to_datetime(minute["date"], errors="coerce")
                minute = minute.dropna(subset=["dt"])
                minute["session_date"] = minute["dt"].dt.normalize()
                # Wall-clock HH:MM strings compare lexicographically, so this selects 14:00..14:20.
                # NOTE(review): assumes minute timestamps are stored in IST wall-clock time — confirm.
                minute["time_str"] = minute["dt"].dt.strftime("%H:%M")
                window = minute[
                    (minute["session_date"].dt.date == input_day)
                    & (minute["time_str"] >= "14:00")
                    & (minute["time_str"] <= "14:20")
                ].sort_values("dt")
                if not window.empty and pred in ("UP", "DOWN"):
                    w_close = float(window.iloc[-1]["close"])
                    t1_actual = "UP" if day_close > w_close else "DOWN"
                    ledger["tplus1"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": t1_actual,
                        "correct": pred == t1_actual,
                        "source": "live",
                    })
            except Exception:
                pass

    # Re-score all live Tomorrow entries; the bool result is ignored because
    # the ledger is saved unconditionally below.
    _rescore_tomorrow_live_ledger(ledger)

    # Recompute the derived counters from the (possibly extended) entry lists.
    for model_id in ("t5", "tomorrow", "tplus1"):
        entries = ledger[model_id]["entries"]
        total = len(entries)
        correct = sum(1 for e in entries if e.get("correct"))
        backtest_count = sum(1 for e in entries if str(e.get("source", "backtest")).lower() == "backtest")
        ledger[model_id]["total"] = total
        ledger[model_id]["correct_count"] = correct
        ledger[model_id]["backtest_count"] = backtest_count
        ledger[model_id]["live_count"] = total - backtest_count
        ledger[model_id]["accuracy"] = correct / total if total > 0 else None

    save_live_accuracy(ledger)
    return ledger
|
|
|
|
|
def refresh_market_close_data(session_date: date | None = None) -> dict[str, Any]:
    """Run the end-of-day refresh for one NSE session.

    Steps, in order:
      1. fetch 7 days of 1-minute bars and merge them into NIFTY_1M_PATH;
      2. refresh the daily parquet (``refresh_daily_data``);
      3. score the session's stored predictions (``update_live_accuracy``).
         This must run before the predictions below are regenerated; a
         failure here is printed and does not stop the refresh;
      4. regenerate the T+5 and T+1 predictions, back-fill opening-dataset
         outcomes from the daily bars, then regenerate the Tomorrow prediction.

    Args:
        session_date: Session to refresh; defaults to today's IST date.

    Returns:
        Summary of minute row count/latest timestamp, daily info, opening
        dataset update and the three regenerated predictions.

    Raises:
        RuntimeError: ``session_date`` is not an NSE trading day.
        Any exception from steps 1, 2 or 4 propagates to the caller.
    """
    session_date = session_date or datetime.now(IST).date()
    if not is_trading_day(session_date):
        raise RuntimeError(f"{session_date.isoformat()} is not an NSE trading session.")

    minutes = fetch_yahoo_minutes(period="7d")
    minute_frame = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
    daily_info = refresh_daily_data()

    try:
        update_live_accuracy(session_date)
    except Exception as exc:
        print(f"[close-refresh] live accuracy update failed: {exc}", flush=True)

    t5_prediction = refresh_first5_prediction(session_date=session_date, minutes=minutes)
    tplus1_prediction = refresh_tplus1_prediction(session_date=session_date)
    outcomes = update_opening_outcomes_from_daily()
    tomorrow_prediction = refresh_tomorrow_prediction(session_date=session_date)
    return {
        "session_date": session_date.isoformat(),
        "nifty_1m_rows": int(len(minute_frame)),
        "latest_minute": pd.to_datetime(minute_frame["date"], errors="coerce").max().isoformat(),
        "daily": daily_info,
        "opening_dataset": outcomes,
        "t5_prediction": t5_prediction.to_dict(),
        "tplus1_prediction": tplus1_prediction,
        "tomorrow_prediction": tomorrow_prediction,
    }
|
|
|
|
|
def close_refresh_due(now: datetime | None = None) -> bool:
    """Return True when today's post-close refresh still has work to do.

    Always False on non-trading days and before CLOSE_REFRESH_READY. After
    that, True if any of these dates is not today (a missing date counts as
    "not today"): the latest daily bar, the latest minute bar, the latest
    opening-dataset row, the latest opening outcome, or the input date of
    the stored Tomorrow prediction.

    Args:
        now: Reference time; defaults to the current IST time.
    """
    now = now or datetime.now(IST)
    today = now.date()
    if not is_trading_day(today) or now.time() < CLOSE_REFRESH_READY:
        return False
    latest_dates = (
        latest_parquet_date(NIFTY_1D_PATH),
        latest_parquet_date(NIFTY_1M_PATH),
        latest_parquet_date(OPENING_DATASET_PATH),
        latest_opening_outcome_date(),
        # Shared helper: a missing/unparseable prediction, or an error while
        # loading it, reads as None (i.e. "refresh due") instead of escaping.
        latest_tomorrow_input_date(),
    )
    return any(latest != today for latest in latest_dates)
|
|
|
|
|
def latest_prediction_input_date(path: Path) -> date | None:
    """Return the newest ``input_date`` found in a prediction CSV.

    Returns None when the file is missing, cannot be read, has no
    ``input_date`` column, has no rows, or contains no parseable dates.
    """
    if not path.exists():
        return None
    try:
        input_dates = pd.read_csv(path, usecols=["input_date"])["input_date"]
    except Exception:
        return None
    if input_dates.empty:
        return None
    newest = pd.to_datetime(input_dates, errors="coerce").max()
    if pd.isna(newest):
        return None
    return newest.date()
|
|
|
|
|
def latest_tomorrow_input_date() -> date | None:
    """Return the input date of the stored Tomorrow prediction, if any.

    Falls back to None when no ``input_date`` is set or when loading or
    parsing the prediction raises for any reason.
    """
    try:
        raw_value = latest_tomorrow_prediction().get("input_date")
        if not raw_value:
            return None
        return date.fromisoformat(str(raw_value)[:10])
    except Exception:
        return None
|
|
|
|
|
def expected_completed_daily_date(now: datetime | None = None) -> date:
    """Return the most recent session whose daily bar should already be complete.

    On a trading day before CLOSE_REFRESH_READY, today's bar is not final
    yet, so the search starts from yesterday. Otherwise it starts from today.
    """
    now = now or datetime.now(IST)
    today = now.date()
    close_pending = is_trading_day(today) and now.time() < CLOSE_REFRESH_READY
    anchor = today - timedelta(days=1) if close_pending else today
    return previous_trading_day(anchor)
|
|
|
|
|
def expected_minute_date(now: datetime | None = None) -> date:
    """Return the session whose 1-minute bars (and T+5 input) should be present.

    That is today once FIRST5_READY has passed on a trading day; otherwise
    the last trading day before today.
    """
    now = now or datetime.now(IST)
    today = now.date()
    if not (is_trading_day(today) and now.time() >= FIRST5_READY):
        return previous_trading_day(today - timedelta(days=1))
    return today
|
|
|
|
|
def expected_tplus1_date(now: datetime | None = None) -> date:
    """Return the session whose T+1 prediction should already exist.

    That is today once TPLUS1_READY has passed on a trading day; otherwise
    the last trading day before today.
    """
    now = now or datetime.now(IST)
    today = now.date()
    ready_today = is_trading_day(today) and now.time() >= TPLUS1_READY
    return today if ready_today else previous_trading_day(today - timedelta(days=1))
|
|
|
|
|
def is_stale(latest: date | None, expected: date) -> bool:
    """Return True when *latest* is missing or strictly older than *expected*."""
    if latest is None:
        return True
    return latest < expected
|
|
|
|
|
def stale_data_status(now: datetime | None = None) -> dict[str, Any]:
    """Report which data sets and predictions lag behind what *now* implies.

    Every value is a bool. ``daily_stale`` / ``minutes_stale`` compare the
    newest date in the daily / minute parquet with the expected session.
    ``t5_stale``, ``tomorrow_stale`` and ``tplus1_stale`` compare the
    input_date of the latest stored predictions. A missing date is stale.
    """
    now = now or datetime.now(IST)
    want_daily = expected_completed_daily_date(now)
    want_minutes = expected_minute_date(now)
    want_tplus1 = expected_tplus1_date(now)
    have_daily = latest_parquet_date(NIFTY_1D_PATH)
    have_minutes = latest_parquet_date(NIFTY_1M_PATH)
    have_t5 = latest_prediction_input_date(LATEST_PATH)
    have_tomorrow = latest_tomorrow_input_date()
    have_tplus1 = latest_prediction_input_date(TPLUS1_LATEST_PATH)
    return {
        "daily_stale": is_stale(have_daily, want_daily),
        "minutes_stale": is_stale(have_minutes, want_minutes),
        "t5_stale": is_stale(have_t5, want_minutes),
        "tomorrow_stale": is_stale(have_tomorrow, want_daily),
        "tplus1_stale": is_stale(have_tplus1, want_tplus1),
    }
|
|
|
|
|
def refresh_stale_data_once(now: datetime | None = None) -> dict[str, Any]:
    """Run one pass of the stale-data refresher.

    Uses ``stale_data_status`` to decide what to refresh: minute bars, daily
    bars plus opening outcomes and live-accuracy scoring, the Tomorrow
    prediction, and (once their ready times have passed on a trading day) the
    T+5 / T+1 predictions. Live-accuracy and Tomorrow failures are recorded
    as ``{"name": ..., "error": ...}`` actions; other failures propagate. The
    dashboard payload cache is cleared after a successful pass.

    Only one pass runs at a time: if ``_stale_refresh_lock`` is held, the
    call returns ``status == "skipped"`` immediately.

    Args:
        now: Reference time; defaults to the current IST time.

    Returns:
        ``{"status": "fresh", "actions": []}`` when nothing is stale,
        otherwise a dict with ``status`` ("skipped" or "refreshed"), the stale
        flags and the list of performed ``actions``.
    """
    now = now or datetime.now(IST)
    status = stale_data_status(now)
    if not any(status.values()):
        return {"status": "fresh", "actions": []}
    if not _stale_refresh_lock.acquire(blocking=False):
        return {"status": "skipped", "reason": "stale refresh already running", **status, "actions": []}

    actions: list[dict[str, Any]] = []
    try:
        # stale_data_status() only returns the *_stale flags, so the completed
        # session must be derived here. Reading status["expected_daily_date"]
        # raised KeyError, which silently turned both the live-accuracy scoring
        # and the Tomorrow refresh below into error actions.
        completed = expected_completed_daily_date(now)

        if status["minutes_stale"]:
            minutes = fetch_yahoo_minutes(period="7d")
            combined = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
            actions.append(
                {
                    "name": "minutes",
                    "rows": int(len(combined)),
                    "latest_date": pd.to_datetime(combined["date"], errors="coerce").max().date().isoformat(),
                }
            )

        if status["daily_stale"]:
            daily_info = refresh_daily_data()
            outcomes = update_opening_outcomes_from_daily()
            actions.append({"name": "daily", **daily_info})
            actions.append({"name": "opening_outcomes", **outcomes})
            # Score before the Tomorrow prediction below is regenerated.
            try:
                scored = update_live_accuracy(completed)
                actions.append(
                    {
                        "name": "tomorrow_live_accuracy",
                        "session_date": completed.isoformat(),
                        "live_count": scored.get("tomorrow", {}).get("live_count"),
                    }
                )
            except Exception as exc:
                actions.append({"name": "tomorrow_live_accuracy", "error": str(exc)})

        if status["daily_stale"] or status["tomorrow_stale"]:
            try:
                tomorrow = refresh_tomorrow_prediction(session_date=completed)
                actions.append({"name": "tomorrow_prediction", "input_date": tomorrow.get("input_date")})
            except Exception as exc:
                actions.append({"name": "tomorrow_prediction", "error": str(exc)})

        if status["t5_stale"] and is_trading_day(now.date()) and now.time() >= FIRST5_READY:
            prediction = refresh_first5_prediction(session_date=now.date())
            actions.append({"name": "t5_prediction", "input_date": prediction.input_date})

        if status["tplus1_stale"] and is_trading_day(now.date()) and now.time() >= TPLUS1_READY:
            prediction = refresh_tplus1_prediction(session_date=now.date())
            actions.append({"name": "tplus1_prediction", "input_date": prediction.get("input_date")})

        clear_dashboard_payload_cache()
        refreshed_status = stale_data_status(datetime.now(IST))
        return {"status": "refreshed", **refreshed_status, "actions": actions}
    finally:
        _stale_refresh_lock.release()
|
|
|
|
|
def next_ist_run_at(run_time: time = time(9, 20), now: datetime | None = None) -> datetime:
    """Return the next IST datetime at *run_time* that falls on a trading day.

    If today's *run_time* has already been reached, the search starts from
    tomorrow; ``next_trading_day`` then picks the first eligible session.
    """
    now = now or datetime.now(IST)
    candidate = now.date()
    todays_slot = datetime.combine(candidate, run_time, tzinfo=IST)
    if now >= todays_slot:
        candidate = candidate + timedelta(days=1)
    run_day = next_trading_day(candidate)
    return datetime.combine(run_day, run_time, tzinfo=IST)
|
|
|
|
|
def seconds_until_next_ist_run(run_time: time = time(9, 20)) -> float:
    """Return seconds until the next trading-day *run_time* in IST (at least 1.0)."""
    current = datetime.now(IST)
    wait = next_ist_run_at(run_time, now=current) - current
    return max(1.0, wait.total_seconds())
|
|
|