Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import json | |
| import copy | |
| import os | |
| import sys | |
| import threading | |
| from dataclasses import dataclass | |
| from datetime import date, datetime, time, timedelta | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any | |
| from zoneinfo import ZoneInfo | |
| import joblib | |
| import numpy as np | |
| import pandas as pd | |
| from nifty_backend.yahoo_history_client import YahooHistoryClient | |
| try: | |
| import pandas_market_calendars as mcal | |
| except ImportError: # pragma: no cover - production dependency, local fallback below. | |
| mcal = None | |
# --- Market/session timing (wall-clock times in Asia/Kolkata) ---
IST = ZoneInfo("Asia/Kolkata")
# Yahoo Finance ticker passed to YahooHistoryClient.fetch_history for the index.
YAHOO_NIFTY_SYMBOL = "^NSEI"
MARKET_CLOSE = time(15, 30)
# Presumably the time the 09:15-09:19 opening bars used by
# first5_features_from_minutes are complete; consumer not visible here.
FIRST5_READY = time(9, 20)
# After this time on a trading day, _track_record_end_session scores "today".
CLOSE_REFRESH_READY = time(15, 45)
# NOTE(review): consumer not visible in this section; presumably gates the
# 14:20-based T+1 model (see TPLUS1_MODEL_PATH) — confirm.
TPLUS1_READY = time(14, 30)
# NOTE(review): consumer not visible in this section.
STALE_CHECK_INTERVAL_SECONDS = 5
# --- Filesystem layout, relative to the backend package root ---
BACKEND_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = BACKEND_ROOT / "data"
MODEL_DIR = BACKEND_ROOT / "models"
YAHOO_CACHE_PATH = MODEL_DIR / "yahoo_history_cache.sqlite3"
# Opening-direction (first-five-minutes) model inputs/outputs.
OPENING_DATASET_PATH = DATA_DIR / "opening_direction_training_dataset.parquet"
NIFTY_1M_PATH = DATA_DIR / "nifty50_1m.parquet"
NIFTY_1D_PATH = DATA_DIR / "nifty50_1d.parquet"
MODEL_PATH = MODEL_DIR / "nifty_opening_direction_model.joblib"
LATEST_PATH = MODEL_DIR / "latest_prediction.csv"
TEST_PREDICTIONS_PATH = DATA_DIR / "test_predictions.parquet"
# "Tomorrow" (next-session direction) model artifacts, synced from the daily forecaster.
TOMORROW_MODEL_PATH = MODEL_DIR / "nifty_tomorrow_direction_model.joblib"
TOMORROW_LATEST_PATH = MODEL_DIR / "tomorrow_latest_prediction.csv"
TOMORROW_SUMMARY_PATH = MODEL_DIR / "tomorrow_summary.json"
TOMORROW_TEST_PREDICTIONS_PATH = DATA_DIR / "tomorrow_test_predictions.parquet"
TOMORROW_PREDICTION_HISTORY_PATH = MODEL_DIR / "tomorrow_prediction_history.parquet"
# Sibling research project; overridable through the environment variable of the same name.
FORECASTING_PROJECT_ROOT = Path(
    os.environ.get(
        "FORECASTING_PROJECT_ROOT",
        str(BACKEND_ROOT.parent.parent / "forecasting project"),
    )
)
DAILY_FORECASTER_OUTPUT_DIR = MODEL_DIR / "nifty_forecaster" / "outputs"
DAILY_FORECASTER_SUMMARY_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_summary.json"
DAILY_FORECASTER_LATEST_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_latest.csv"
DAILY_FORECASTER_PREDICTIONS_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_test_predictions.csv"
# Opening MFE regressor: source outputs in the research project, local copies under MODEL_DIR.
MFE_SOURCE_OUTPUT_DIR = FORECASTING_PROJECT_ROOT / "Code" / "models" / "nifty_opening_mfe_regressor" / "outputs"
MFE_OUTPUT_DIR = MODEL_DIR / "nifty_opening_mfe_regressor" / "outputs"
MFE_SUMMARY_PATH = MFE_OUTPUT_DIR / "summary.json"
MFE_LATEST_PATH = MFE_OUTPUT_DIR / "latest_prediction.csv"
MFE_TEST_PREDICTIONS_PATH = MFE_OUTPUT_DIR / "test_predictions.csv"
MFE_MODEL_PATH = MFE_OUTPUT_DIR / "nifty_opening_mfe_regressor.joblib"
MFE_LIVE_HISTORY_PATH = MFE_OUTPUT_DIR / "mfe_live_history.csv"
# T+1 model (filename suggests a 14:20 snapshot logistic model).
TPLUS1_MODEL_PATH = MODEL_DIR / "nifty_1420_tplus1_logistic_model.joblib"
TPLUS1_LATEST_PATH = MODEL_DIR / "tplus1_latest_prediction.csv"
TPLUS1_SUMMARY_PATH = MODEL_DIR / "tplus1_summary.json"
TPLUS1_TEST_PREDICTIONS_PATH = DATA_DIR / "tplus1_test_predictions.parquet"
TPLUS1_PREDICTION_HISTORY_PATH = MODEL_DIR / "tplus1_prediction_history.parquet"
# History of live opening-model predictions written by predict_row.
T5_PREDICTION_HISTORY_PATH = MODEL_DIR / "t5_prediction_history.parquet"
# --- Refresh state machine persisted by save_refresh_state/load_refresh_state ---
REFRESH_STATE_PATH = MODEL_DIR / "refresh_state.json"
REFRESH_WAITING = "waiting_second_payload"  # in-progress phase
REFRESH_REFRESHING = "refreshing"  # in-progress phase
REFRESH_READY = "ready"  # terminal phase
REFRESH_FAILED = "failed"  # terminal phase
REFRESH_NORMAL = "normal"  # terminal/default phase
LIVE_ACCURACY_PATH = MODEL_DIR / "live_accuracy.json"
# Default rules that flip the opening model's UP/DOWN call for rows where
# ``feature <op> value`` holds (see apply_decision_overlays); used when the
# model artifact does not carry its own "decision_overlays".
DECISION_OVERLAYS = [
    {
        "name": "fifth_minute_momentum_flip",
        "feature": "m5_ret_1m",
        "op": ">=",
        "value": 0.0005085411885759201,
    },
    {
        "name": "vix_stretch_flip",
        "feature": "india_vix_close_vs_sma_20",
        "op": ">=",
        "value": 0.24641908937959742,
    },
]
# NOTE(review): lock consumers are not visible in this section.
_dashboard_payload_lock = threading.Lock()
_stale_refresh_lock = threading.Lock()
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ`` (second precision)."""
    from datetime import timezone  # local import: not in the module's datetime import list

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC time,
    # then drop tzinfo so isoformat() emits no "+00:00" before the literal "Z".
    now_utc = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return now_utc.isoformat() + "Z"
def clear_dashboard_payload_cache() -> None:
    """Invalidate every memoized ``_dashboard_payload_cached`` result."""
    cached_builder = _dashboard_payload_cached
    cached_builder.cache_clear()
def save_refresh_state(phase: str, *, session_date: date | None = None, error: str | None = None) -> dict[str, Any]:
    """Persist the refresh state machine to ``REFRESH_STATE_PATH`` and return it.

    ``started_at`` is stamped when a new cycle enters an in-progress phase
    (waiting/refreshing). ``finished_at`` is stamped on every terminal phase
    (ready/failed/normal). ``session_date`` is carried over from the previous
    state unless a new one is given.

    Args:
        phase: one of the ``REFRESH_*`` phase strings.
        session_date: session the refresh concerns; keeps the previous value when None.
        error: error message to store (None clears it).

    Returns:
        The state dict that was written.
    """
    previous = load_refresh_state()
    if not isinstance(previous, dict):
        previous = {}
    in_progress = {REFRESH_WAITING, REFRESH_REFRESHING}
    terminal = {REFRESH_READY, REFRESH_FAILED, REFRESH_NORMAL}
    state = {
        "phase": phase,
        "started_at": previous.get("started_at"),
        "finished_at": previous.get("finished_at"),
        "session_date": session_date.isoformat() if session_date else previous.get("session_date"),
        "error": error,
    }
    # A new cycle starts when entering an in-progress phase from anything else.
    # Without the phase check, the very first cycle's start time was reused forever.
    if phase in in_progress and (not state["started_at"] or previous.get("phase") not in in_progress):
        state["started_at"] = utc_now_iso()
    if phase in terminal:
        state["finished_at"] = utc_now_iso()
    REFRESH_STATE_PATH.parent.mkdir(parents=True, exist_ok=True)
    # Write a sibling temp file and atomically swap it in, so readers never see
    # a half-written JSON document (which load_refresh_state reports as "failed").
    tmp_path = REFRESH_STATE_PATH.with_name(
        f"{REFRESH_STATE_PATH.name}.{os.getpid()}.{threading.get_ident()}.tmp"
    )
    try:
        tmp_path.write_text(json.dumps(state, indent=2), encoding="utf-8")
        os.replace(tmp_path, REFRESH_STATE_PATH)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
    return state
def load_refresh_state() -> dict[str, Any]:
    """Read the persisted refresh state.

    Returns:
        - The default ``normal`` state when the file does not exist.
        - A ``failed`` state when the file cannot be read, is not valid JSON,
          or does not hold a JSON object.
        - Otherwise the stored object, with any missing keys filled from the
          default state.
    """
    default_state = {
        "phase": REFRESH_NORMAL,
        "started_at": None,
        "finished_at": None,
        "session_date": None,
        "error": None,
    }
    unreadable_state = {
        "phase": REFRESH_FAILED,
        "started_at": None,
        "finished_at": None,
        "session_date": None,
        "error": "refresh_state.json could not be read",
    }
    try:
        text = REFRESH_STATE_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        # EAFP: a file that vanishes between an exists() check and the read is
        # "no state yet", not "unreadable".
        return default_state
    except (OSError, UnicodeDecodeError):
        return unreadable_state
    try:
        stored = json.loads(text)
    except ValueError:
        return unreadable_state
    # Callers use .get() on the result, so anything but an object counts as unreadable.
    if not isinstance(stored, dict):
        return unreadable_state
    return {**default_state, **stored}
@lru_cache(maxsize=1)
def _nse_calendar():
    """Return the NSE trading calendar, or None when it is unavailable.

    Tries the ``pandas_market_calendars`` names ``XNSE``, ``NSE`` and ``BSE`` in
    order and returns the first that can be built. Returns None if the library
    is not installed or no name resolves. The result is memoized: the
    trading-day helpers call this on every lookup, and rebuilding the calendar
    object each time is wasted work.
    """
    if mcal is None:
        return None
    for name in ("XNSE", "NSE", "BSE"):
        try:
            return mcal.get_calendar(name)
        except Exception:
            # Calendar names differ between library versions; try the next alias.
            continue
    return None
def trading_schedule(start: date, end: date) -> pd.DataFrame:
    """Return the trading sessions between ``start`` and ``end`` (inclusive).

    Uses the exchange calendar when available. Otherwise falls back to plain
    weekdays (exchange holidays are unknown in that case), with
    ``market_open``/``market_close`` columns in UTC built from the 09:15 IST
    open and ``MARKET_CLOSE``.
    """
    calendar = _nse_calendar()
    if calendar is not None:
        return calendar.schedule(start_date=start, end_date=end)
    days = pd.date_range(start=start, end=end, freq="B")
    # The fallback must have columns: a column-less frame reports
    # ``.empty == True`` even when it has rows, which made every weekday look
    # like a holiday and sent next/previous_trading_day into endless loops.
    opens = (days + pd.Timedelta(hours=9, minutes=15)).tz_localize(IST).tz_convert("UTC")
    closes = (
        (days + pd.Timedelta(hours=MARKET_CLOSE.hour, minutes=MARKET_CLOSE.minute))
        .tz_localize(IST)
        .tz_convert("UTC")
    )
    return pd.DataFrame({"market_open": opens, "market_close": closes}, index=days)
def is_trading_day(day: date) -> bool:
    """Return True when the trading schedule contains a session on ``day``."""
    schedule = trading_schedule(day, day)
    # Count rows instead of using ``DataFrame.empty``, which is also True for a
    # frame that has rows but no columns (e.g. a column-less fallback schedule).
    return len(schedule.index) > 0
def next_trading_day(start: date) -> date:
    """Return the first trading session on or after ``start``.

    Raises:
        RuntimeError: no session exists within 400 days of ``start``.
    """
    window_end = start + timedelta(days=14)
    schedule = trading_schedule(start, window_end)
    # Row count, not ``.empty``: a column-less schedule with rows reports empty.
    if len(schedule.index) > 0:
        return pd.Timestamp(schedule.index[0]).date()
    # The two-week window had no session; continue day by day after it, with a
    # bound so a calendar that never yields a session cannot hang the caller.
    day = window_end + timedelta(days=1)
    limit = start + timedelta(days=400)
    while day <= limit:
        if is_trading_day(day):
            return day
        day += timedelta(days=1)
    raise RuntimeError(f"No trading day found between {start} and {limit}.")
def previous_trading_day(start: date) -> date:
    """Return the latest trading session on or before ``start``.

    Raises:
        RuntimeError: no session exists within 400 days before ``start``.
    """
    window_begin = start - timedelta(days=14)
    schedule = trading_schedule(window_begin, start)
    # Row count, not ``.empty``: a column-less schedule with rows reports empty.
    if len(schedule.index) > 0:
        return pd.Timestamp(schedule.index[-1]).date()
    # The two-week window had no session; continue day by day before it, with
    # a bound so a calendar that never yields a session cannot hang the caller.
    day = window_begin - timedelta(days=1)
    limit = start - timedelta(days=400)
    while day >= limit:
        if is_trading_day(day):
            return day
        day -= timedelta(days=1)
    raise RuntimeError(f"No trading day found between {limit} and {start}.")
def last_n_trading_sessions(end_day: date, count: int) -> list[date]:
    """Return the last ``count`` NSE sessions ending on (or before) ``end_day``.

    Sessions are ordered oldest first. Fewer than ``count`` dates are returned
    only if the calendar yields too few sessions within the search horizon.
    Returns ``[]`` when ``count`` is not positive.
    """
    if count <= 0:
        return []
    # One schedule query per window replaces two calendar lookups per session.
    # About two calendar days per session plus two weeks of slack covers holiday
    # clusters; the window doubles (up to a bound) in the rare case it does not.
    span_days = count * 2 + 14
    max_span_days = max(span_days, count * 12 * 7)
    while True:
        schedule = trading_schedule(end_day - timedelta(days=span_days), end_day)
        sessions = sorted({pd.Timestamp(ts).date() for ts in schedule.index})
        if len(sessions) >= count or span_days >= max_span_days:
            return sessions[-count:]
        span_days = min(span_days * 2, max_span_days)
def _track_record_end_session(now: datetime | None = None) -> date:
    """Return the latest session the track record should score.

    On a trading day, once ``CLOSE_REFRESH_READY`` (IST) has passed, that is
    today. Otherwise it is whatever ``expected_completed_daily_date`` reports
    for ``now``.
    """
    moment = datetime.now(IST) if now is None else now
    session_day = moment.date()
    if not is_trading_day(session_day):
        return expected_completed_daily_date(moment)
    if moment.time() < CLOSE_REFRESH_READY:
        return expected_completed_daily_date(moment)
    return session_day
class ProbabilityBlend:
    """Weighted average of several binary classifiers' P(up).

    Exposes a scikit-learn style ``predict_proba``. Pickled blends are restored
    without calling ``__init__``, so the validation below only applies to
    freshly built instances.
    """

    def __init__(self, models: list[Any], weights: np.ndarray):
        """Store ``models`` and their weights, normalized to sum to 1.

        Raises:
            ValueError: the weight vector's shape does not match ``models``, or
                the weights do not sum to a positive finite number (which would
                otherwise produce NaN probabilities).
        """
        self.models = models
        raw = np.asarray(weights, dtype="float64")
        if raw.shape != (len(models),):
            raise ValueError(f"Expected {len(models)} weights, got shape {raw.shape}.")
        total = raw.sum()
        if not np.isfinite(total) or total <= 0:
            raise ValueError(f"Blend weights must sum to a positive finite value, got {total!r}.")
        self.weights = raw / total

    def predict_proba(self, x: pd.DataFrame) -> np.ndarray:
        """Return an ``(n, 2)`` array of ``[P(down), P(up)]`` for the rows of ``x``."""
        # One column of P(up) per model; the weighted sum gives the blended P(up).
        probs = np.column_stack([predict_proba_up(model, x) for model in self.models])
        prob_up = probs @ self.weights
        return np.column_stack([1.0 - prob_up, prob_up])
@dataclass
class Prediction:
    """Result of one opening-direction prediction, as produced by ``predict_row``.

    ``@dataclass`` is required: ``predict_row`` builds instances with keyword
    arguments, which a plain class with only annotations does not accept.
    """

    input_date: str  # ISO date (YYYY-MM-DD) of the scored session
    first5_start: str  # timestamp of the first opening bar, as a string
    first5_end: str  # timestamp of the last opening bar, as a string
    prediction: str  # "UP" or "DOWN"
    prob_up: float  # model probability of an up move
    confidence: float  # confidence in the final call (directional_confidence)
    threshold: float  # probability cutoff for an UP call
    model_name: str
    is_overridden: bool = False  # True when a decision overlay flipped the raw call

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict (the CSV/history row layout)."""
        return {
            "input_date": self.input_date,
            "first5_start": self.first5_start,
            "first5_end": self.first5_end,
            "prediction": self.prediction,
            "prob_up": self.prob_up,
            "confidence": self.confidence,
            "threshold": self.threshold,
            "model_name": self.model_name,
            # getattr tolerates instances restored without this attribute.
            "is_overridden": getattr(self, "is_overridden", False),
        }
def predict_proba_up(model: Any, x: pd.DataFrame) -> np.ndarray:
    """Return the positive-class column of ``model.predict_proba(x)`` as float64."""
    class_probs = model.predict_proba(x)
    up_column = class_probs[:, 1]
    return np.asarray(up_column, dtype="float64")
def safe_div(numer: pd.Series | np.ndarray, denom: pd.Series | np.ndarray) -> pd.Series:
    """Divide element-wise, returning NaN wherever the denominator is 0, NaN or inf.

    Operands are matched by position (scalars and length-1 inputs broadcast), so a
    Series and an ndarray, or Series with different indexes, divide row by row.
    The previous label-based masking failed or misaligned in those cases.

    Returns:
        A float64 Series indexed like ``numer`` (or like ``denom`` when only the
        denominator has the full length).
    """
    n = pd.Series(numer, copy=False)
    d = pd.Series(denom, copy=False)
    num, den = np.broadcast_arrays(n.to_numpy(dtype="float64"), d.to_numpy(dtype="float64"))
    out = np.full(num.shape, np.nan, dtype="float64")
    mask = np.isfinite(den) & (den != 0)
    np.divide(num, den, out=out, where=mask)
    index = n.index if len(n) == len(out) else d.index
    return pd.Series(out, index=index, dtype="float64")
def load_model() -> dict[str, Any]:
    """Load the opening-direction model artifact from ``MODEL_PATH``.

    Fills in ``decision_overlays`` (``DECISION_OVERLAYS``) and ``model_name``
    when the artifact lacks them. NOTE: ``joblib.load`` unpickles, so
    ``MODEL_PATH`` must only ever hold trusted artifacts.
    """
    main_module = sys.modules["__main__"]
    # The artifact was pickled from a training script, so its custom blend class
    # (and the helper it calls) are looked up on ``__main__`` when unpickling.
    setattr(main_module, "ProbabilityBlend", ProbabilityBlend)
    setattr(main_module, "predict_proba_up", predict_proba_up)
    artifact = joblib.load(MODEL_PATH)
    fallbacks = (
        ("decision_overlays", DECISION_OVERLAYS),
        ("model_name", "nifty_opening_direction_model"),
    )
    for key, value in fallbacks:
        artifact.setdefault(key, value)
    return artifact
def overlay_mask(frame: pd.DataFrame, overlay: dict[str, object]) -> np.ndarray:
    """Return a boolean array marking the rows of ``frame`` that an overlay rule hits.

    The rule compares ``frame[overlay["feature"]]`` (coerced to numeric) against
    ``overlay["value"]`` using ``overlay["op"]`` (``">="`` or ``"<="``).
    Non-numeric or missing values never match. If the feature column is
    absent, no rows match.

    Raises:
        ValueError: ``op`` is not a supported comparison.
    """
    column = str(overlay["feature"])
    if column not in frame.columns:
        return np.zeros(len(frame), dtype=bool)
    values = pd.to_numeric(frame[column], errors="coerce")
    cutoff = float(overlay["value"])
    comparisons = (
        (">=", lambda s: s >= cutoff),
        ("<=", lambda s: s <= cutoff),
    )
    for symbol, compare in comparisons:
        if overlay["op"] == symbol:
            return compare(values).fillna(False).to_numpy(dtype=bool)
    raise ValueError(f"Unsupported overlay op: {overlay['op']}")
def apply_decision_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, object]]) -> np.ndarray:
    """Return a copy of the 0/1 predictions with every overlay's matching rows flipped.

    Overlays apply in order, so a row matched by two overlays ends up unflipped.
    """
    flipped = np.array(pred, dtype="int64", copy=True)
    for rule in overlays:
        hits = overlay_mask(frame, rule)
        flipped[hits] = 1 - flipped[hits]
    return flipped
def directional_confidence(prob_up: np.ndarray, pred: np.ndarray, threshold: float) -> np.ndarray:
    """Return the confidence of each final UP/DOWN call.

    Confidence is the larger of (a) the probability of the predicted side and
    (b) 0.5 plus the distance of ``prob_up`` from ``threshold``, capped at 0.99.
    """
    p_up = np.asarray(prob_up, dtype="float64")
    side = np.asarray(pred, dtype="int64")
    chosen_side_prob = np.where(side == 1, p_up, 1.0 - p_up)
    margin_score = 0.50 + np.abs(p_up - float(threshold))
    # Same as np.clip(margin_score, chosen_side_prob, 0.99): floor, then cap.
    return np.minimum(np.maximum(margin_score, chosen_side_prob), 0.99)
def read_training_dataset() -> pd.DataFrame:
    """Load the opening-direction training dataset, sorted by ``date``.

    ``date``, ``first5_start`` and ``first5_end`` are parsed to datetimes when
    present. Unparseable values become NaT.
    """
    frame = pd.read_parquet(OPENING_DATASET_PATH)
    timestamp_columns = [name for name in ("date", "first5_start", "first5_end") if name in frame.columns]
    for name in timestamp_columns:
        frame[name] = pd.to_datetime(frame[name], errors="coerce")
    ordered = frame.sort_values("date")
    return ordered.reset_index(drop=True)
def normalize_yahoo_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Normalize a raw Yahoo/yfinance-style OHLCV frame to naive IST timestamps.

    Column labels are lower-cased with spaces turned into underscores. For
    MultiIndex columns, the first level (the price field) is used. The timestamp
    is taken from a ``date``/``datetime`` column (after ``reset_index``), or from
    the first column. Naive timestamps are treated as UTC before conversion
    to IST. ``adj_close`` is used as ``close`` only when no ``close`` column
    exists.

    The input frame is not modified.

    Returns:
        Columns ``date, open, high, low, close`` (plus ``volume`` when present),
        sorted by date. Rows with a missing date or price are dropped.
    """
    if df.empty:
        return pd.DataFrame(columns=["date", "open", "high", "low", "close", "volume"])
    # Work on a copy: assigning ``.columns`` would otherwise rename the caller's frame.
    df = df.copy()
    if isinstance(df.columns, pd.MultiIndex):
        labels = [c[0] for c in df.columns]
    else:
        labels = list(df.columns)
    # Same normalization for both layouts, so "Adj Close" always becomes "adj_close".
    df.columns = [str(c).lower().replace(" ", "_") for c in labels]
    df = df.reset_index()
    date_col = next((c for c in df.columns if str(c).lower() in {"datetime", "date"}), df.columns[0])
    stamps = pd.to_datetime(df[date_col], errors="coerce")
    if stamps.dt.tz is None:
        stamps = stamps.dt.tz_localize("UTC")
    df["date"] = stamps.dt.tz_convert(IST)
    rename = {
        "open": "open",
        "high": "high",
        "low": "low",
        "close": "close",
        "adj_close": "close",
        "volume": "volume",
    }
    out = pd.DataFrame({"date": df["date"].dt.tz_localize(None)})
    for src, dst in rename.items():
        # First source wins per destination, so "close" beats "adj_close".
        if src in df.columns and dst not in out.columns:
            out[dst] = pd.to_numeric(df[src], errors="coerce")
    return out.dropna(subset=["date", "open", "high", "low", "close"]).sort_values("date")
def yahoo_history_client() -> YahooHistoryClient:
    """Build a Yahoo history client backed by the SQLite cache at ``YAHOO_CACHE_PATH``."""
    client = YahooHistoryClient(cache_path=YAHOO_CACHE_PATH)
    return client
def period_start(period: str, *, end: datetime) -> datetime:
    """Translate a Yahoo-style period (``5d``, ``2wk``, ``1mo``, ``1y``) into a start time.

    Months count as 31 days and years as 366 days, so the window never comes
    out shorter than the calendar period.

    Raises:
        ValueError: the period has no supported suffix or a non-integer amount.
    """
    spec = str(period).strip().lower()
    suffix_deltas = (
        ("d", lambda amount: timedelta(days=amount)),
        ("wk", lambda amount: timedelta(weeks=amount)),
        ("mo", lambda amount: timedelta(days=amount * 31)),
        ("y", lambda amount: timedelta(days=amount * 366)),
    )
    for suffix, to_delta in suffix_deltas:
        if not spec.endswith(suffix):
            continue
        amount_text = spec[: -len(suffix)]
        if amount_text.isdigit():
            return end - to_delta(int(amount_text))
        # The first matching suffix decides; a bad amount is not retried.
        break
    raise ValueError(f"Unsupported Yahoo period: {period!r}")
def yahoo_history_to_ohlcv(frame: pd.DataFrame, *, daily: bool) -> pd.DataFrame:
    """Convert a YahooHistoryClient frame into a clean ``date, open, high, low, close, volume`` frame.

    ``timestamp`` becomes ``date``, truncated to midnight when ``daily`` is true.
    Prices and volume are coerced to numeric. Rows without a date or price are
    dropped, and for duplicate dates the last row is kept. The result is sorted
    by date with a fresh index.
    """
    ohlcv_columns = ["date", "open", "high", "low", "close", "volume"]
    if frame.empty:
        return pd.DataFrame(columns=["date", "open", "high", "low", "close", "volume"])
    renamed = frame.rename(columns={"timestamp": "date"}).copy()
    stamps = pd.to_datetime(renamed["date"], errors="coerce")
    renamed["date"] = stamps.dt.normalize() if daily else stamps
    for name in ohlcv_columns[1:]:
        renamed[name] = pd.to_numeric(renamed[name], errors="coerce")
    cleaned = renamed[ohlcv_columns].dropna(subset=ohlcv_columns[:5])
    deduped = cleaned.drop_duplicates("date", keep="last")
    return deduped.sort_values("date").reset_index(drop=True)
def fetch_yahoo_minutes(period: str = "5d") -> pd.DataFrame:
    """Fetch NIFTY 50 1-minute bars covering ``period`` (Yahoo syntax) up to now.

    The window ends five minutes past the current naive IST wall-clock time.
    Pre/post-market data is excluded.
    """
    window_end = datetime.now(IST).replace(tzinfo=None) + timedelta(minutes=5)
    history = yahoo_history_client().fetch_history(
        YAHOO_NIFTY_SYMBOL,
        interval="1m",
        start=period_start(period, end=window_end),
        end=window_end,
        include_prepost=False,
    )
    return yahoo_history_to_ohlcv(history, daily=False)
def fetch_yahoo_daily(period: str = "1mo") -> pd.DataFrame:
    """Fetch NIFTY 50 daily bars covering ``period`` (Yahoo syntax).

    The window ends one day past the current naive IST wall-clock time
    (presumably so today's bar falls inside it). Dates are truncated to midnight.
    """
    window_end = datetime.now(IST).replace(tzinfo=None) + timedelta(days=1)
    history = yahoo_history_client().fetch_history(
        YAHOO_NIFTY_SYMBOL,
        interval="1d",
        start=period_start(period, end=window_end),
        end=window_end,
        include_prepost=False,
    )
    return yahoo_history_to_ohlcv(history, daily=True)
def append_parquet_rows(path: Path, new_rows: pd.DataFrame, subset: list[str]) -> pd.DataFrame:
    """Upsert ``new_rows`` into the parquet file at ``path`` and return the merged frame.

    Rows are de-duplicated on ``subset`` (new rows win) and sorted by ``subset``.
    When ``new_rows`` is empty, the existing file is returned unchanged.

    Raises:
        RuntimeError: ``new_rows`` is empty and ``path`` does not exist yet.
    """
    if new_rows.empty:
        if path.exists():
            return pd.read_parquet(path)
        raise RuntimeError(f"No rows returned for {path.name}; leaving parquet unchanged.")
    if path.exists():
        combined = pd.concat([pd.read_parquet(path), new_rows], ignore_index=True)
    else:
        combined = new_rows.copy()
    combined = combined.drop_duplicates(subset=subset, keep="last").sort_values(subset).reset_index(drop=True)
    path.parent.mkdir(parents=True, exist_ok=True)
    # Write a sibling temp file, then atomically swap it in, so a crash
    # mid-write never leaves a truncated parquet in place of the history.
    tmp_path = path.with_name(f".{path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
    try:
        combined.to_parquet(tmp_path, index=False, compression="zstd")
        os.replace(tmp_path, path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise
    return combined
def append_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> pd.DataFrame:
    """Upsert a single prediction record into the parquet history at ``path``.

    Returns the full merged history, de-duplicated on ``subset``.
    """
    records = [row]
    return append_parquet_rows(path, pd.DataFrame(records), subset)
def latest_parquet_date(path: Path) -> date | None:
    """Return the newest ``date`` value stored in the parquet file at ``path``.

    Returns None when the file is missing, empty, or has no parseable dates.
    """
    if not path.exists():
        return None
    stored = pd.read_parquet(path, columns=["date"])
    if stored.empty:
        return None
    newest = pd.to_datetime(stored["date"], errors="coerce").max()
    return None if pd.isna(newest) else newest.date()
def latest_opening_outcome_date() -> date | None:
    """Return the newest dataset ``date`` whose ``target`` (outcome) is filled in.

    Returns None when the dataset file is missing, is empty, has no ``target``
    column, or has no labelled rows with a parseable date.
    """
    if not OPENING_DATASET_PATH.exists():
        return None
    # Read the file once and inspect the loaded columns. Reading the whole
    # dataset just to check its columns and then again for two columns doubled the I/O.
    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    if dataset.empty or "target" not in dataset.columns:
        return None
    labelled_dates = dataset.loc[dataset["target"].notna(), "date"]
    if labelled_dates.empty:
        return None
    latest = pd.to_datetime(labelled_dates, errors="coerce").max()
    if pd.isna(latest):
        return None
    return latest.date()
def first5_features_from_minutes(minutes: pd.DataFrame, session_date: date | None = None) -> pd.DataFrame:
    """Build the one-row opening-candle feature frame for a session.

    Uses the first five 1-minute bars stamped 09:15-09:19 on ``session_date``
    (or, when None, on the latest session present in ``minutes``). From them it
    derives aggregate ``first5_*`` features, per-bar ``m1_*`` ... ``m5_*``
    features and calendar features. Feature names presumably must match the
    training dataset columns the model was fit on — keep them in sync.

    Args:
        minutes: minute bars with ``date``, ``open``, ``high``, ``low``,
            ``close`` and optionally ``volume`` columns. NOTE(review): the
            09:15-09:19 window assumes naive IST wall-clock timestamps; confirm.
        session_date: session to featurize; defaults to the newest date in ``minutes``.

    Returns:
        A single-row DataFrame with one column per feature.

    Raises:
        RuntimeError: ``minutes`` is empty, or fewer than five opening bars exist.
    """
    if minutes.empty:
        raise RuntimeError("Yahoo returned no minute bars.")
    bars = minutes.copy()
    bars["dt"] = pd.to_datetime(bars["date"], errors="coerce")
    bars["session_date"] = bars["dt"].dt.normalize()
    if session_date is None:
        session_ts = bars["session_date"].max()
    else:
        session_ts = pd.Timestamp(session_date).normalize()
    day = bars[bars["session_date"] == session_ts].sort_values("dt").copy()
    # Opening window: the bars stamped 09:15 through 09:19 inclusive.
    start_dt = pd.Timestamp.combine(session_ts.date(), time(9, 15))
    end_dt = pd.Timestamp.combine(session_ts.date(), time(9, 19))
    first5 = day[(day["dt"] >= start_dt) & (day["dt"] <= end_dt)].head(5).copy()
    if len(first5) < 5:
        raise RuntimeError(f"Need 5 opening bars for {session_ts.date()}, got {len(first5)}.")
    first5["minute_index"] = np.arange(len(first5))
    # The first bar's ret_1m is NaN: it has no earlier bar inside the window.
    first5["ret_1m"] = first5["close"].pct_change(fill_method=None)
    first5["range_pct_1m"] = safe_div(first5["high"] - first5["low"], first5["open"])
    first5["body_pct_1m"] = safe_div(first5["close"] - first5["open"], first5["open"])
    row = {
        "date": session_ts,
        "first5_start": first5["dt"].iloc[0],
        "first5_end": first5["dt"].iloc[-1],
        "first5_open": first5["open"].iloc[0],
        "first5_high": first5["high"].max(),
        "first5_low": first5["low"].min(),
        "first5_close": first5["close"].iloc[-1],
        "first5_volume": first5["volume"].sum() if "volume" in first5 else 0.0,
        "first5_bars": len(first5),
        "first5_last_1m_ret": first5["ret_1m"].iloc[-1],
        "first5_ret_std": first5["ret_1m"].std(),
    }
    row["first5_return"] = (row["first5_close"] - row["first5_open"]) / row["first5_open"]
    row["first5_range_pct"] = (row["first5_high"] - row["first5_low"]) / row["first5_open"]
    first5_range = row["first5_high"] - row["first5_low"]
    # Range-normalized features are NaN when the five bars have zero range.
    row["first5_body_to_range"] = (row["first5_close"] - row["first5_open"]) / first5_range if first5_range else np.nan
    row["first5_close_location"] = (row["first5_close"] - row["first5_low"]) / first5_range if first5_range else np.nan
    # Per-bar features m1_* .. m5_*, in chronological order.
    for idx, (_, candle) in enumerate(first5.iterrows(), start=1):
        for field in ("open", "high", "low", "close", "ret_1m", "range_pct_1m", "body_pct_1m"):
            row[f"m{idx}_{field}"] = candle[field]
        row[f"m{idx}_close_vs_first5_open"] = (candle["close"] - row["first5_open"]) / row["first5_open"]
        row[f"m{idx}_range_share"] = (candle["high"] - candle["low"]) / first5_range if first5_range else np.nan
    row["first5_return_accel"] = row["m5_ret_1m"] - row["m2_ret_1m"]
    row["first5_last2_return"] = (row["m5_close"] - row["m4_open"]) / row["m4_open"]
    row["first5_first2_return"] = (row["m2_close"] - row["m1_open"]) / row["m1_open"]
    # +1 when the first-two-bar and last-two-bar returns have opposite signs,
    # -1 when they agree, 0 when either is flat.
    row["first5_reversal"] = np.sign(row["first5_first2_return"]) * -np.sign(row["first5_last2_return"])
    row["dow"] = session_ts.dayofweek
    row["dom"] = session_ts.day
    row["month"] = session_ts.month
    return pd.DataFrame([row])
def build_model_row(first5_row: pd.DataFrame) -> pd.DataFrame:
    """Assemble the full model input row for a live session.

    Starts from a copy of the last row of the date-sorted training dataset
    (presumably the previous session's context features — confirm). Every
    column of ``first5_row`` is written into it, the opening-vs-previous-session
    interaction features whose inputs are present are recomputed, and the
    outcome columns are blanked.

    Args:
        first5_row: single-row output of ``first5_features_from_minutes``.

    Returns:
        A one-row DataFrame with ``target`` and ``day_return`` set to NaN.
    """
    dataset = read_training_dataset()
    latest_context = dataset.iloc[[-1]].copy()
    output = latest_context.copy()
    # Live opening features replace whatever the context row carried for them.
    for col in first5_row.columns:
        output[col] = first5_row[col].iloc[0]
    if {"first5_open", "nifty_close"}.issubset(output.columns):
        output["first5_gap_from_prev_close"] = (output["first5_open"] - output["nifty_close"]) / output["nifty_close"]
        output["first5_close_vs_prev_close"] = (output["first5_close"] - output["nifty_close"]) / output["nifty_close"]
    if {"first5_range_pct", "nifty_range_pct"}.issubset(output.columns):
        output["first5_range_vs_prev_range"] = output["first5_range_pct"] / output["nifty_range_pct"]
    if {"first5_return", "nifty_ret_1"}.issubset(output.columns):
        output["first5_return_x_prev_ret"] = output["first5_return"] * output["nifty_ret_1"]
        # NOTE(review): first5_gap_from_prev_close is only recomputed above when
        # nifty_close exists. Otherwise this reads the value copied from the
        # training row (or raises KeyError if that column is absent) — verify.
        output["gap_x_prev_ret"] = output["first5_gap_from_prev_close"] * output["nifty_ret_1"]
    if {"first5_return", "banknifty_ret_1"}.issubset(output.columns):
        output["first5_return_x_bank_ret_1"] = output["first5_return"] * output["banknifty_ret_1"]
    if {"first5_range_pct", "india_vix_ret_1"}.issubset(output.columns):
        output["first5_range_x_vix_ret_1"] = output["first5_range_pct"] * output["india_vix_ret_1"]
    # Outcome columns are unknown for a live row.
    output["target"] = np.nan
    output["day_return"] = np.nan
    return output
def predict_row(row: pd.DataFrame) -> Prediction:
    """Score a one-row feature frame with the opening-direction model and persist it.

    Steps:
        1. Threshold P(up).
        2. Apply the artifact's decision overlays (default ``DECISION_OVERLAYS``).
        3. Write the result to ``LATEST_PATH``.
        4. Upsert it into ``T5_PREDICTION_HISTORY_PATH`` keyed by ``target_date``.

    Raises:
        RuntimeError: ``row`` lacks any of the model's feature columns.
    """
    artifact = load_model()
    estimator = artifact["model"]
    feature_names = artifact["features"]
    cutoff = float(artifact["threshold"])
    absent = [name for name in feature_names if name not in row.columns]
    if absent:
        raise RuntimeError(f"Feature row is missing {len(absent)} features; first missing: {absent[:5]}")
    p_up = predict_proba_up(estimator, row[feature_names])
    model_call = (p_up >= cutoff).astype("int64")
    overlays = artifact.get("decision_overlays", DECISION_OVERLAYS)
    final_call = apply_decision_overlays(model_call, row, overlays)
    call_confidence = directional_confidence(p_up, final_call, cutoff)
    result = Prediction(
        input_date=pd.to_datetime(row["date"].iloc[0]).date().isoformat(),
        first5_start=str(pd.to_datetime(row["first5_start"].iloc[0])),
        first5_end=str(pd.to_datetime(row["first5_end"].iloc[0])),
        prediction="UP" if int(final_call[0]) == 1 else "DOWN",
        prob_up=float(p_up[0]),
        confidence=float(call_confidence[0]),
        threshold=cutoff,
        model_name=str(artifact.get("model_name", "nifty_opening_direction_model")),
        # An overlay flipped the model's own call.
        is_overridden=bool(model_call[0] != final_call[0]),
    )
    record = result.to_dict()
    pd.DataFrame([record]).to_csv(LATEST_PATH, index=False)
    history_entry = {**record, "target_date": result.input_date, "source": "live"}
    _record_prediction_history(T5_PREDICTION_HISTORY_PATH, history_entry, ["target_date"])
    return result
def _file_cache_key(path: Path) -> tuple[str, int | None, int | None]:
    """Return ``(path, mtime_ns, size)`` identifying the current version of a file.

    Both stat fields are None when the file does not exist. The tuple is used as
    a memoization key, so any rewrite of the file changes it.
    """
    try:
        info = path.stat()
    except FileNotFoundError:
        info = None
    if info is None:
        return (str(path), None, None)
    return (str(path), info.st_mtime_ns, info.st_size)
@lru_cache(maxsize=8)
def _latest_saved_prediction_cached(latest_key: tuple[str, int | None, int | None], summary_key: tuple[str, int | None, int | None]) -> dict[str, Any]:
    """Load the newest saved opening prediction, memoized on file identity.

    Both keys come from ``_file_cache_key`` as ``(path, mtime_ns, size)``, so a
    memoized value is reused only while neither file has changed. Without
    ``lru_cache`` that key design did nothing and every call re-read the files.

    Prefers the last row of the latest-prediction CSV and falls back to the
    summary JSON. The returned dict is shared between calls, so callers must
    copy it before mutating (``latest_saved_prediction`` does).

    Raises:
        FileNotFoundError: neither file exists (errors are not cached).
    """
    latest_path = Path(latest_key[0])
    if latest_path.exists():
        return pd.read_csv(latest_path).iloc[-1].to_dict()
    summary_path = Path(summary_key[0])
    if summary_path.exists():
        return json.loads(summary_path.read_text(encoding="utf-8"))
    raise FileNotFoundError("No latest prediction is available yet.")
def latest_saved_prediction() -> dict[str, Any]:
    """Return the newest saved opening prediction (CSV row, else summary JSON).

    Raises:
        FileNotFoundError: no saved prediction exists yet.
    """
    latest_key = _file_cache_key(LATEST_PATH)
    summary_key = _file_cache_key(MODEL_DIR / "summary.json")
    loaded = _latest_saved_prediction_cached(latest_key, summary_key)
    # Shallow copy so callers cannot mutate the object the helper may reuse.
    return dict(loaded)
def _latest_saved_prediction_uncached() -> dict[str, Any]:
    """Read the newest saved opening prediction directly from disk (no memoization).

    Prefers the last row of ``LATEST_PATH`` and falls back to ``MODEL_DIR/summary.json``.

    Raises:
        FileNotFoundError: neither file exists.
    """
    if LATEST_PATH.exists():
        saved = pd.read_csv(LATEST_PATH)
        return saved.iloc[-1].to_dict()
    fallback = MODEL_DIR / "summary.json"
    if not fallback.exists():
        raise FileNotFoundError("No latest prediction is available yet.")
    return json.loads(fallback.read_text(encoding="utf-8"))
def _read_daily_forecaster_summary() -> dict[str, Any] | None:
    """Load the daily forecaster summary and relabel it as the Tomorrow model summary.

    The JSON may be a single object or a list of per-symbol objects. For a list,
    the ``NIFTY 50`` entry is preferred, otherwise the first object. Returns
    None when the file is missing or holds no usable object. The previous code
    raised IndexError/AttributeError on an empty list or non-object entries.
    """
    if not DAILY_FORECASTER_SUMMARY_PATH.exists():
        return None
    raw = json.loads(DAILY_FORECASTER_SUMMARY_PATH.read_text(encoding="utf-8"))
    if isinstance(raw, list):
        candidates = [entry for entry in raw if isinstance(entry, dict)]
        if not candidates:
            return None
        matches = [entry for entry in candidates if entry.get("symbol") == "NIFTY 50"]
        summary = dict(matches[0] if matches else candidates[0])
    elif isinstance(raw, dict):
        summary = dict(raw)
    else:
        return None
    config = summary.get("config") if isinstance(summary.get("config"), dict) else {}
    summary.setdefault("symbol", "NIFTY 50")
    summary.setdefault("horizon", "daily")
    summary.setdefault("horizon_bars", 1)
    summary["model_name"] = "nifty_tomorrow_direction_model"
    summary["source_model"] = str(config.get("name") or summary.get("source_model") or "locked_multiwindow_nifty50_ensemble")
    summary["target"] = "next trading session NIFTY 50 direction"
    summary["artifact_type"] = "daily_forecaster_outputs"
    summary["artifact_source"] = str(DAILY_FORECASTER_OUTPUT_DIR)
    return summary
def _read_daily_forecaster_latest(summary: dict[str, Any]) -> dict[str, Any] | None:
    """Map the forecaster's latest-forecast CSV row to the Tomorrow prediction layout.

    The ``NIFTY 50`` row is preferred when a ``symbol`` column exists. NaN cells
    become None. A missing ``target_date`` is derived as the next trading day
    after the input date. A missing ``confidence`` becomes
    ``max(prob_up, 1 - prob_up)``. Returns None when the CSV is missing or empty.
    """
    if not DAILY_FORECASTER_LATEST_PATH.exists():
        return None
    frame = pd.read_csv(DAILY_FORECASTER_LATEST_PATH)
    if frame.empty:
        return None
    if "symbol" in frame.columns:
        nifty_rows = frame[frame["symbol"].astype(str) == "NIFTY 50"]
        if not nifty_rows.empty:
            frame = nifty_rows
    record: dict[str, Any] = {}
    for key, value in frame.iloc[-1].to_dict().items():
        record[key] = None if pd.isna(value) else value
    input_date = record.get("latest_forecast_date") or record.get("input_date")
    target_date = record.get("target_date")
    if input_date and not target_date:
        try:
            day_after_input = date.fromisoformat(str(input_date)[:10]) + timedelta(days=1)
            target_date = next_trading_day(day_after_input).isoformat()
        except Exception:
            target_date = None
    prob_up = record.get("latest_forecast_prob_up", record.get("prob_up"))
    signal = record.get("latest_forecast_signal", record.get("prediction"))
    threshold = record.get("threshold", summary.get("threshold"))
    confidence = record.get("confidence")
    if confidence is None and prob_up is not None:
        try:
            p = float(prob_up)
            confidence = float(max(p, 1.0 - p))
        except Exception:
            confidence = None
    return {
        "input_date": input_date,
        "target_date": target_date,
        "prediction": signal,
        "prob_up": prob_up,
        "confidence": confidence,
        "threshold": threshold,
        "model_name": "nifty_tomorrow_direction_model",
        "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR),
    }
def _parse_iso_date(value: Any) -> date | None:
    """Parse the leading ``YYYY-MM-DD`` of ``value`` into a date.

    Returns None for falsy, malformed or otherwise unparseable input (including
    NaN, and values whose truthiness is ambiguous).
    """
    try:
        text = str(value or "")[:10]
        parsed = date.fromisoformat(text)
    except Exception:
        parsed = None
    return parsed
def _archive_tomorrow_latest_to_history() -> None:
    """Copy the current Tomorrow latest-prediction row into the prediction history.

    Only rows with an ``UP``/``DOWN`` call and a ``target_date`` are archived.
    """
    if not TOMORROW_LATEST_PATH.exists():
        return
    try:
        last_row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        record = {key: None if pd.isna(val) else val for key, val in last_row.items()}
        direction = str(record.get("prediction", "")).upper()
        if direction in ("UP", "DOWN") and record.get("target_date"):
            _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, record, ["target_date"])
    except Exception:
        # Best-effort: a broken CSV or history write must not block the sync
        # that calls this.
        pass
def _tomorrow_actual_outcome(
    target_day: date,
    day_close: float,
    closes_by_date: dict[date, float],
) -> tuple[float | None, str | None]:
    """Return (move, direction) for Tomorrow scoring: close vs previous session close.

    ``move`` is the fractional change from the previous session's close.
    ``direction`` is "UP" when the close rose and "DOWN" otherwise (a flat
    close counts as DOWN). Returns ``(None, None)`` when either close is
    missing or non-finite, or the previous close is zero.
    """
    # Without this guard a NaN close was scored "DOWN", because NaN > x is False.
    if day_close is None or not np.isfinite(day_close):
        return None, None
    prev_day = previous_trading_day(target_day - timedelta(days=1))
    prev_close = closes_by_date.get(prev_day)
    if prev_close is None or not np.isfinite(prev_close) or prev_close == 0:
        return None, None
    actual_move = (day_close - prev_close) / prev_close
    actual_direction = "UP" if day_close > prev_close else "DOWN"
    return actual_move, actual_direction
def _find_tomorrow_prediction_for_target(target_day: date) -> dict[str, Any] | None:
    """Return the Tomorrow prediction that targets ``target_day``.

    Sources are checked in order; the first match wins:
        1. prediction-history rows whose ``target_date`` is ``target_day``;
        2. the latest-prediction CSV when its ``target_date`` is ``target_day``;
        3. history rows whose ``input_date`` is the session before
           ``target_day`` (with an empty or matching ``target_date``);
        4. the latest-prediction CSV when its ``input_date`` is that session;
        5. back-test predictions dated ``target_day`` (``target_date``, then ``date``);
        6. an UP/DOWN entry for ``target_day`` in the live-accuracy ledger.
    Returns None when nothing matches.
    """
    target_iso = target_day.isoformat()
    history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH)
    if not history.empty:
        for col in ("date", "input_date", "target_date", "forecast_date"):
            if col in history.columns:
                history[col] = pd.to_datetime(history[col], errors="coerce")
    if not history.empty and "target_date" in history.columns:
        rows = history[history["target_date"].dt.date == target_day]
        if not rows.empty:
            return rows.iloc[-1].to_dict()
    # Read the latest CSV once; it serves both steps 2 and 4.
    latest: dict[str, Any] | None = None
    if TOMORROW_LATEST_PATH.exists():
        try:
            latest = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        except Exception:
            latest = None
    if latest is not None and _parse_iso_date(latest.get("target_date")) == target_day:
        return latest
    input_day = previous_trading_day(target_day - timedelta(days=1))
    if not history.empty and "input_date" in history.columns:
        rows = history[history["input_date"].dt.date == input_day]
        if not rows.empty:
            row = rows.iloc[-1].to_dict()
            if _parse_iso_date(row.get("target_date")) in {None, target_day}:
                return row
    if latest is not None and _parse_iso_date(latest.get("input_date")) == input_day:
        return latest
    tomorrow_test = load_tomorrow_test_predictions()
    if not tomorrow_test.empty:
        for col in ("target_date", "date"):
            if col in tomorrow_test.columns:
                test_dates = pd.to_datetime(tomorrow_test[col], errors="coerce").dt.date
                rows = tomorrow_test[test_dates == target_day]
                if not rows.empty:
                    return rows.iloc[-1].to_dict()
    ledger = load_live_accuracy()
    for entry in ledger.get("tomorrow", {}).get("entries", []):
        if str(entry.get("date", ""))[:10] == target_iso:
            pred = str(entry.get("prediction", "")).upper()
            if pred in {"UP", "DOWN"}:
                return {
                    "target_date": target_iso,
                    "prediction": pred,
                    "source": entry.get("source", "live"),
                }
    return None
def _tomorrow_latest_supersedes(latest: dict[str, Any]) -> bool:
    """Return True when the saved Tomorrow CSV holds an UP/DOWN call for a later input date than ``latest``."""
    if not TOMORROW_LATEST_PATH.exists():
        return False
    try:
        existing = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
    except Exception:
        # An unreadable or empty CSV never blocks the forecaster's row.
        return False
    existing_input = _parse_iso_date(existing.get("input_date"))
    forecaster_input = _parse_iso_date(latest.get("input_date"))
    existing_pred = str(existing.get("prediction", "")).upper()
    return (
        existing_input is not None
        and forecaster_input is not None
        and existing_input > forecaster_input
        and existing_pred in {"UP", "DOWN"}
    )


def _mirror_forecaster_test_predictions() -> None:
    """Copy the forecaster's NIFTY 50 back-test predictions to ``TOMORROW_TEST_PREDICTIONS_PATH``.

    Adds ``prediction`` ("UP" when ``pred == 1``, else "DOWN") and ``correct``
    (``target == pred``) when the source lacks them. Does nothing when the
    source CSV is missing or has no NIFTY 50 rows.
    """
    if not DAILY_FORECASTER_PREDICTIONS_PATH.exists():
        return
    predictions = pd.read_csv(DAILY_FORECASTER_PREDICTIONS_PATH)
    if "symbol" in predictions.columns:
        predictions = predictions[predictions["symbol"].astype(str) == "NIFTY 50"].copy()
    if predictions.empty:
        return
    if "pred" in predictions.columns and "prediction" not in predictions.columns:
        predictions["prediction"] = np.where(pd.to_numeric(predictions["pred"], errors="coerce") == 1, "UP", "DOWN")
    if "correct" not in predictions.columns and {"target", "pred"}.issubset(predictions.columns):
        predictions["correct"] = (
            pd.to_numeric(predictions["target"], errors="coerce")
            == pd.to_numeric(predictions["pred"], errors="coerce")
        )
    predictions.to_parquet(TOMORROW_TEST_PREDICTIONS_PATH, index=False)


def sync_daily_forecaster_outputs() -> dict[str, Any] | None:
    """Mirror the daily forecaster's outputs into the Tomorrow-model artifacts.

    Writes the Tomorrow summary JSON. Archives the current Tomorrow latest row,
    then replaces it with the forecaster's latest row unless the saved row is a
    directional call for a later input date. Mirrors the back-test predictions
    and dumps a lightweight artifact dict to ``TOMORROW_MODEL_PATH``.

    Returns:
        The forecaster's latest row if there is one, else the summary. None when
        no forecaster summary exists.
    """
    summary = _read_daily_forecaster_summary()
    if summary is None:
        return None
    latest = _read_daily_forecaster_latest(summary)
    TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    if latest is not None:
        _archive_tomorrow_latest_to_history()
        if not _tomorrow_latest_supersedes(latest):
            pd.DataFrame([latest]).to_csv(TOMORROW_LATEST_PATH, index=False)
    _mirror_forecaster_test_predictions()
    # A present-but-null threshold used to crash float(); treat it as missing.
    raw_threshold = summary.get("threshold")
    threshold = 0.54 if raw_threshold is None else float(raw_threshold)
    artifact = {
        "artifact_type": "daily_forecaster_outputs",
        "model_name": "nifty_tomorrow_direction_model",
        "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"),
        "threshold": threshold,
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "validation_prob_std": summary.get("validation_prob_std"),
        "test_prob_std": summary.get("test_prob_std"),
        "test_prob_min": summary.get("test_prob_min"),
        "test_prob_max": summary.get("test_prob_max"),
        "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR),
    }
    joblib.dump(artifact, TOMORROW_MODEL_PATH)
    return latest or summary
def load_tomorrow_model_artifact() -> dict[str, Any]:
    """Return the next-session model artifact metadata.

    Forecaster outputs are synced first, which may rewrite TOMORROW_MODEL_PATH.
    If that file exists it is loaded with joblib; otherwise a snapshot dict is
    assembled from ``load_tomorrow_summary()`` with fallback defaults.
    """
    # Run for its side effects only: the file checked below is the single source
    # of truth whether or not the sync returned anything.
    sync_daily_forecaster_outputs()
    if not TOMORROW_MODEL_PATH.exists():
        snapshot_source = load_tomorrow_summary()
        return {
            "artifact_type": "daily_forecaster_snapshot",
            "model_name": snapshot_source.get("model_name", "nifty_tomorrow_direction_model"),
            "source_model": snapshot_source.get("source_model", "tuned_daily_forest_single"),
            "threshold": float(snapshot_source.get("threshold", 0.543)),
        }
    return joblib.load(TOMORROW_MODEL_PATH)
def load_tomorrow_summary() -> dict[str, Any]:
    """Return the next-session model summary.

    Forecaster outputs are synced first, which may rewrite TOMORROW_SUMMARY_PATH.
    The JSON file is returned when present; otherwise built-in default metrics are.
    """
    # Called for its side effects; the file check below decides what is returned.
    sync_daily_forecaster_outputs()
    if not TOMORROW_SUMMARY_PATH.exists():
        return {
            "model_name": "nifty_tomorrow_direction_model",
            "source_model": "locked_multiwindow_nifty50_ensemble",
            "target": "next trading session NIFTY 50 direction",
            "threshold": 0.54,
            "validation_accuracy": 0.5673758865248227,
            "test_accuracy": 0.6451612903225806,
            "baseline_accuracy": 0.5053763440860215,
            "n_test": 186,
            "feature_count": 204,
        }
    raw_json = TOMORROW_SUMMARY_PATH.read_text(encoding="utf-8")
    return json.loads(raw_json)
def latest_tomorrow_prediction() -> dict[str, Any]:
    """Return the most recent next-session forecast, refreshing it when stale.

    The reference session is the earlier of the latest date in NIFTY_1D_PATH and
    ``expected_completed_daily_date()`` (or whichever of the two is available).

    * If TOMORROW_LATEST_PATH holds a readable row, it is returned, NaN cells
      mapped to None. When its ``input_date`` is missing or older than the
      reference session, ``refresh_tomorrow_prediction`` is tried first. The
      refreshed row is returned only if its ``input_date`` has caught up.
    * An absent, empty or unreadable file falls back to the summary-based record.
      Again, a refresh is attempted first when the summary is stale.

    Refresh failures are swallowed on purpose (best effort); the stored data is
    returned instead.
    """

    def _day_prefix(value):
        # Parse the leading YYYY-MM-DD of a value; None or garbage yields None.
        try:
            return date.fromisoformat(str(value)[:10])
        except Exception:
            return None

    sync_daily_forecaster_outputs()
    latest_daily = latest_parquet_date(NIFTY_1D_PATH)
    expected_daily = expected_completed_daily_date()
    valid_daily = min(latest_daily, expected_daily) if latest_daily and expected_daily else (expected_daily or latest_daily)

    cleaned = None
    if TOMORROW_LATEST_PATH.exists():
        try:
            row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        except (pd.errors.EmptyDataError, pd.errors.ParserError, IndexError, OSError, UnicodeDecodeError):
            # An empty/corrupt latest file must not break the dashboard; fall
            # through to the summary-based record below.
            row = None
        if row is not None:
            cleaned = {k: (None if pd.isna(v) else v) for k, v in row.items()}

    if cleaned is not None:
        input_day = _day_prefix(cleaned.get("input_date"))
        if valid_daily is not None and (input_day is None or input_day < valid_daily):
            try:
                refreshed = refresh_tomorrow_prediction(session_date=valid_daily)
                refreshed_day = _day_prefix(refreshed.get("input_date"))
                if refreshed_day is not None and refreshed_day >= valid_daily:
                    return refreshed
            except Exception:
                # Best effort: keep serving the stored row if the refresh fails.
                pass
        return cleaned

    summary = load_tomorrow_summary()
    summary_input_day = _day_prefix(summary.get("latest_forecast_date"))
    if valid_daily is not None and (summary_input_day is None or summary_input_day < valid_daily):
        try:
            return refresh_tomorrow_prediction(session_date=valid_daily)
        except Exception:
            pass
    return {
        "input_date": summary.get("latest_forecast_date"),
        "target_date": None,
        "prediction": summary.get("latest_forecast_signal"),
        "prob_up": summary.get("latest_forecast_prob_up"),
        "confidence": None,
        "threshold": summary.get("threshold"),
        "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"),
        "source_model": summary.get("source_model", "tuned_daily_forest_single"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
    }
def load_tplus1_summary() -> dict[str, Any]:
    """Return the T+1 model summary from TPLUS1_SUMMARY_PATH, or built-in defaults if absent."""
    if not TPLUS1_SUMMARY_PATH.exists():
        return {
            "model_name": "logistic_regression_l1_C0.35_balanced",
            "target": "T+1 NIFTY 50 close greater than T 14:20 close",
            "window_start": "14:00",
            "window_end": "14:20",
            "threshold": 0.578,
            "validation_accuracy": 0.66,
            "test_accuracy": 0.6368421052631579,
            "baseline_test_accuracy": 0.5052631578947369,
            "test_rows": 190,
            "feature_count": 40,
        }
    summary_text = TPLUS1_SUMMARY_PATH.read_text(encoding="utf-8")
    return json.loads(summary_text)
def latest_tplus1_prediction() -> dict[str, Any]:
    """Return the latest T+1 prediction record.

    Uses the last row of TPLUS1_LATEST_PATH with NaN cells mapped to None. When
    that file is absent, a record is assembled from ``load_tplus1_summary()``.
    """
    if not TPLUS1_LATEST_PATH.exists():
        fallback = load_tplus1_summary()
        return {
            "input_date": fallback.get("latest_input_date"),
            "target_date": None,
            "forecast_for": fallback.get("latest_forecast_for"),
            "prediction": fallback.get("latest_prediction"),
            "prob_up": fallback.get("latest_prob_up"),
            "confidence": fallback.get("latest_confidence"),
            "threshold": fallback.get("threshold"),
            "model_name": fallback.get("model_name", "logistic_regression_l1_C0.35_balanced"),
            "validation_accuracy": fallback.get("validation_accuracy"),
            "test_accuracy": fallback.get("test_accuracy"),
        }
    last_row = pd.read_csv(TPLUS1_LATEST_PATH).iloc[-1]
    return {key: None if pd.isna(value) else value for key, value in last_row.to_dict().items()}
def load_mfe_summary() -> dict[str, Any]:
    """Return the parsed MFE_SUMMARY_PATH JSON, or an empty dict when the file is absent."""
    return json.loads(MFE_SUMMARY_PATH.read_text(encoding="utf-8")) if MFE_SUMMARY_PATH.exists() else {}
def latest_mfe_prediction() -> dict[str, Any]:
    """Return the latest MFE up/down point prediction.

    Reads the last row of MFE_LATEST_PATH (NaN cells mapped to None). Without
    that file, the ``latest_*`` fields of ``load_mfe_summary()`` are used.
    """
    if not MFE_LATEST_PATH.exists():
        fallback = load_mfe_summary()
        return {
            "input_date": fallback.get("latest_input_date"),
            "first5_start": fallback.get("latest_first5_start"),
            "first5_end": fallback.get("latest_first5_end"),
            "predicted_up_points": fallback.get("latest_predicted_up_points"),
            "predicted_down_points": fallback.get("latest_predicted_down_points"),
        }
    last_row = pd.read_csv(MFE_LATEST_PATH).iloc[-1]
    return {key: None if pd.isna(value) else value for key, value in last_row.to_dict().items()}
def refresh_mfe_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Predict the up/down point moves for one session from the opening dataset.

    Loads the MFE payload (two regressors, their feature lists and linear
    calibrations) and picks the row of OPENING_DATASET_PATH for ``session_date``,
    or the last row when ``session_date`` is None. Each raw prediction is clipped
    at 0, linearly calibrated (``raw * scale + offset``) and clipped at 0 again.

    Side effects: overwrites MFE_LATEST_PATH, upserts the row into
    MFE_LIVE_HISTORY_PATH (keyed on ``input_date``), then calls
    ``clear_dashboard_payload_cache()``.

    Returns:
        The prediction dict, or ``{}`` when the model file is missing or no
        dataset row matches.
    """
    if not MFE_MODEL_PATH.exists():
        return {}
    payload = joblib.load(MFE_MODEL_PATH)
    up_model = payload["up_model"]
    down_model = payload["down_model"]
    up_features = payload["up_features"]
    down_features = payload["down_features"]
    up_calib = payload["up_calibration"]
    down_calib = payload["down_calibration"]
    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    dataset["_session_date"] = pd.to_datetime(dataset["date"], errors="coerce").dt.normalize()
    if session_date is not None:
        latest_df = dataset[dataset["_session_date"].dt.date == session_date].tail(1)
    else:
        latest_df = dataset.tail(1)
    if latest_df.empty:
        return {}
    raw_up = float(np.clip(up_model.predict(latest_df[up_features])[0], 0.0, None))
    pred_up = float(np.clip((raw_up * up_calib["scale"]) + up_calib["offset"], 0.0, None))
    raw_down = float(np.clip(down_model.predict(latest_df[down_features])[0], 0.0, None))
    pred_down = float(np.clip((raw_down * down_calib["scale"]) + down_calib["offset"], 0.0, None))
    out = {
        "input_date": latest_df["_session_date"].dt.date.iloc[0].isoformat(),
        "first5_start": str(latest_df["first5_start"].iloc[0]),
        "first5_end": str(latest_df["first5_end"].iloc[0]),
        "first5_close": float(latest_df["first5_close"].iloc[0]),
        "predicted_up_points": pred_up,
        "predicted_down_points": pred_down,
    }
    pd.DataFrame([out]).to_csv(MFE_LATEST_PATH, index=False)
    # Upsert into the live history CSV (one row per input_date).
    live_df = pd.DataFrame([out])
    if MFE_LIVE_HISTORY_PATH.exists():
        try:
            existing = pd.read_csv(MFE_LIVE_HISTORY_PATH)
            # Drop any earlier row for the same session so repeated refreshes
            # replace rather than duplicate. input_date is read back as the ISO
            # string written above, so a string comparison is sufficient.
            existing = existing[existing["input_date"] != out["input_date"]]
            pd.concat([existing, live_df], ignore_index=True).to_csv(MFE_LIVE_HISTORY_PATH, index=False)
        except Exception:
            # NOTE(review): any failure here (unparseable file, missing
            # "input_date" column, write error) rewrites the history with only
            # the new row, discarding previous entries — confirm that is acceptable.
            live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False)
    else:
        live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False)
    clear_dashboard_payload_cache()
    return out
def _minute_frame_for_tplus1() -> pd.DataFrame:
    """Load NIFTY_1M_PATH as a clean, time-sorted minute-bar frame.

    Adds ``dt`` (parsed timestamp), ``session_date`` (``dt`` normalized to
    midnight) and ``time`` ("HH:MM"). OHLC(V) columns are coerced to numeric,
    and rows lacking a timestamp or any OHLC value are dropped.
    """
    bars = pd.read_parquet(NIFTY_1M_PATH).copy()
    bars["dt"] = pd.to_datetime(bars["date"], errors="coerce")
    numeric_cols = [name for name in ("open", "high", "low", "close", "volume") if name in bars.columns]
    for name in numeric_cols:
        bars[name] = pd.to_numeric(bars[name], errors="coerce")
    bars = (
        bars.dropna(subset=["dt", "open", "high", "low", "close"])
        .sort_values("dt")
        .reset_index(drop=True)
    )
    bars["session_date"] = bars["dt"].dt.normalize()
    bars["time"] = bars["dt"].dt.strftime("%H:%M")
    return bars
def _build_tplus1_session_features(minute: pd.DataFrame) -> pd.DataFrame:
    """Build one feature row per session from its 14:00-14:20 minute bars.

    Args:
        minute: Frame as produced by ``_minute_frame_for_tplus1`` (needs ``dt``,
            ``session_date``, ``time`` and OHLC columns; ``volume`` optional).

    Returns:
        Rows sorted by ``date`` (the session date), one per session whose window
        has exactly 21 bars. Columns:
          * window aggregates (``w_*``);
          * per-minute pivots ``mNN_<field>`` for offsets 00-20;
          * derived returns;
          * ``day_close``: the last close of the session in ``minute``;
          * calendar fields.

    NOTE(review): if no session in ``minute`` reaches minute offset 20 inside the
    window, the ``m04``/``m16``/``m20``/... columns referenced near the end will
    not exist and this raises KeyError rather than returning an empty frame.
    """
    # "HH:MM" strings compare lexicographically in time order; inclusive bounds
    # give the 21 one-minute bars 14:00..14:20.
    window = minute[(minute["time"] >= "14:00") & (minute["time"] <= "14:20")].copy()
    # Position of each bar within its session's window. This assumes one bar per
    # minute: a duplicate plus a gap would still pass the 21-row filter below,
    # but with misaligned offsets.
    window["minute_offset"] = window.groupby("session_date", sort=True).cumcount()
    grouped = window.groupby("session_date", sort=True)
    base = grouped.agg(
        window_start=("dt", "first"),
        window_end=("dt", "last"),
        window_rows=("close", "size"),
        w_open=("open", "first"),
        w_high=("high", "max"),
        w_low=("low", "min"),
        w_close=("close", "last"),
        # Without a volume column this falls back to the bar count.
        w_volume=("volume", "sum") if "volume" in window.columns else ("close", "size"),
    ).reset_index().rename(columns={"session_date": "date"})
    # Keep only sessions with a complete 21-bar window.
    base = base[base["window_rows"] == 21].copy()
    base["w_return"] = safe_div(base["w_close"] - base["w_open"], base["w_open"])
    base["w_range"] = safe_div(base["w_high"] - base["w_low"], base["w_open"])
    base["w_body_to_range"] = safe_div(base["w_close"] - base["w_open"], base["w_high"] - base["w_low"])
    base["w_close_location"] = safe_div(base["w_close"] - base["w_low"], base["w_high"] - base["w_low"])
    # Per-bar features; ret_1m is NaN for the first bar of each session's window.
    window["ret_1m"] = window.groupby("session_date")["close"].pct_change(fill_method=None)
    window["range_1m"] = safe_div(window["high"] - window["low"], window["open"])
    window["body_1m"] = safe_div(window["close"] - window["open"], window["open"])
    # Wide layout: one column per (field, minute offset), named e.g. "m04_close".
    minute_features = window.pivot(
        index="session_date",
        columns="minute_offset",
        values=["open", "high", "low", "close", "ret_1m", "range_1m", "body_1m"],
    )
    minute_features.columns = [f"m{int(offset):02d}_{field}" for field, offset in minute_features.columns]
    minute_features = minute_features.reset_index().rename(columns={"session_date": "date"})
    # Last available close of each session. For the current day this is the
    # latest bar present in `minute`, not necessarily the official close.
    session_close = (
        minute.groupby("session_date", sort=True)
        .agg(day_close=("close", "last"))
        .reset_index()
        .rename(columns={"session_date": "date"})
    )
    # Left joins on `base` restrict everything to complete-window sessions.
    frame = base.merge(minute_features, on="date", how="left").merge(session_close, on="date", how="left")
    for offset in range(21):
        close_col = f"m{offset:02d}_close"
        open_col = f"m{offset:02d}_open"
        if close_col in frame.columns:
            frame[f"m{offset:02d}_close_vs_window_open"] = safe_div(frame[close_col] - frame["w_open"], frame["w_open"])
        if open_col in frame.columns and close_col in frame.columns:
            frame[f"m{offset:02d}_close_vs_minute_open"] = safe_div(frame[close_col] - frame[open_col], frame[open_col])
    # Sub-window returns: minutes 0-4, 16-20 and 5-15 of the window.
    frame["ret_first_5m"] = safe_div(frame["m04_close"] - frame["m00_open"], frame["m00_open"])
    frame["ret_last_5m"] = safe_div(frame["m20_close"] - frame["m16_open"], frame["m16_open"])
    frame["ret_mid_11m"] = safe_div(frame["m15_close"] - frame["m05_open"], frame["m05_open"])
    frame["last5_minus_first5"] = frame["ret_last_5m"] - frame["ret_first_5m"]
    frame["abs_window_return"] = frame["w_return"].abs()
    frame["dow"] = frame["date"].dt.dayofweek
    frame["dom"] = frame["date"].dt.day
    frame["month"] = frame["date"].dt.month
    return frame.sort_values("date").reset_index(drop=True)
def _add_tplus1_target_features(features: pd.DataFrame) -> pd.DataFrame:
    """Attach the T+1 target and lagged/rolling target-history features.

    For each row, the target compares the NEXT row's ``day_close`` with this row's
    14:20 window close (``w_close``): ``target`` is 1.0 for a positive return,
    0.0 otherwise, and NaN when there is no next row. ``prev_target_*`` columns
    are lags / rolling stats of earlier rows' target and target return.

    The input frame is copied, not modified.

    NOTE(review): "next row" is the next session present in ``features``. That
    is not necessarily the next trading day if sessions were dropped upstream
    (e.g. incomplete windows).

    NOTE(review): ``prev_target_*_lag1`` for row t, and every rolling feature via
    ``shift(1)``, depend on row t-1's target, which uses row t's ``day_close``.
    That close comes after the 14:20 decision time on day t. In training it is
    the real close; in a live 14:20 refresh it is whatever bar is latest. If the
    model's feature list includes these columns, confirm this is intended (a
    possible lookahead / train-serve skew).
    """
    frame = features.copy()
    frame["target_date"] = frame["date"].shift(-1)
    frame["target_close"] = frame["day_close"].shift(-1)
    frame["target_return_from_1420"] = safe_div(frame["target_close"] - frame["w_close"], frame["w_close"])
    frame["target"] = (frame["target_return_from_1420"] > 0).astype("float64")
    # The comparison above yields 0.0 for NaN returns; mark unknown targets explicitly.
    frame.loc[frame["target_close"].isna(), "target"] = np.nan
    for lag in (1, 2, 3, 5, 10):
        frame[f"prev_target_lag{lag}"] = frame["target"].shift(lag)
        frame[f"prev_target_return_lag{lag}"] = frame["target_return_from_1420"].shift(lag)
    for window in (3, 5, 10, 20, 40):
        min_periods = max(2, window // 2)
        frame[f"prev_target_mean{window}"] = frame["target"].shift(1).rolling(window, min_periods=min_periods).mean()
        shifted_return = frame["target_return_from_1420"].shift(1)
        frame[f"prev_target_return_mean{window}"] = shifted_return.rolling(window, min_periods=min_periods).mean()
        frame[f"prev_target_return_std{window}"] = shifted_return.rolling(window, min_periods=min_periods).std()
    return frame
| def _apply_tplus1_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, Any]]) -> np.ndarray: | |
| adjusted = np.asarray(pred, dtype="int64").copy() | |
| for overlay in overlays: | |
| feature = str(overlay.get("feature", "")) | |
| if feature not in frame.columns: | |
| continue | |
| series = pd.to_numeric(frame[feature], errors="coerce") | |
| value = float(overlay.get("value", 0.0)) | |
| if overlay.get("op") == "<=": | |
| mask = (series <= value).fillna(False).to_numpy(dtype=bool) | |
| else: | |
| mask = (series >= value).fillna(False).to_numpy(dtype=bool) | |
| action = overlay.get("action") | |
| if action == "up": | |
| adjusted[mask] = 1 | |
| elif action == "down": | |
| adjusted[mask] = 0 | |
| elif action == "flip": | |
| adjusted[mask] = 1 - adjusted[mask] | |
| return adjusted | |
def refresh_tplus1_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Score the 14:00-14:20 window of a session with the T+1 model and persist it.

    Args:
        session_date: Session to score; the last available session when None.

    The feature row is built from NIFTY_1M_PATH. If no row is found, recent
    minute bars are fetched from Yahoo, appended to the parquet, and the lookup
    is retried once. The thresholded model output is then passed through any
    ``decision_overlay`` rules stored in the payload.

    Side effects: writes TPLUS1_LATEST_PATH, records the row in the T+1
    prediction history (keyed on target_date) and clears the dashboard cache.

    Raises:
        FileNotFoundError: the model artifact is missing.
        RuntimeError: no complete window exists, or model features are absent.
    """
    if not TPLUS1_MODEL_PATH.exists():
        raise FileNotFoundError(f"Missing T+1 model artifact: {TPLUS1_MODEL_PATH}")
    payload = joblib.load(TPLUS1_MODEL_PATH)
    features = payload["features"]
    threshold = float(payload["threshold"])

    def _candidate_row() -> pd.DataFrame:
        # Rebuild features from the current minute file and pick the target session.
        built = _add_tplus1_target_features(_build_tplus1_session_features(_minute_frame_for_tplus1()))
        if session_date is None:
            return built.tail(1)
        session_days = pd.to_datetime(built["date"], errors="coerce").dt.date
        return built[session_days == session_date].tail(1)

    row = _candidate_row()
    if row.empty:
        # Local minute data lacks the window: pull recent bars and retry once.
        append_parquet_rows(NIFTY_1M_PATH, fetch_yahoo_minutes(period="7d"), ["date"])
        row = _candidate_row()
        if row.empty:
            raise RuntimeError("No complete 14:00-14:20 window is available for T+1 prediction.")

    absent = [name for name in features if name not in row.columns]
    if absent:
        raise RuntimeError(f"T+1 feature row is missing model features: {absent[:5]}")

    prob_up = predict_proba_up(payload["model"], row[features])
    overlay_payload = payload.get("decision_overlay")
    overlays = overlay_payload.get("overlays", []) if isinstance(overlay_payload, dict) else []
    decided = _apply_tplus1_overlays((prob_up >= threshold).astype("int64"), row, overlays)
    prediction = "UP" if int(decided[0]) == 1 else "DOWN"

    input_day = pd.to_datetime(row["date"].iloc[0]).date()
    target_day = next_trading_day(input_day + timedelta(days=1))
    summary = load_tplus1_summary()
    record = {
        "input_date": input_day.isoformat(),
        "target_date": target_day.isoformat(),
        "forecast_for": f"next trading session after {input_day.isoformat()}",
        "prediction": prediction,
        "prob_up": float(prob_up[0]),
        "confidence": float(max(prob_up[0], 1.0 - prob_up[0])),
        "threshold": threshold,
        "model_name": str(payload.get("model_name", summary.get("model_name", "nifty_1420_tplus1_logistic_model"))),
        "decision_overlay": summary.get("decision_overlay"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "accuracy_goal": summary.get("accuracy_goal"),
        "source": "live",
    }
    pd.DataFrame([record]).to_csv(TPLUS1_LATEST_PATH, index=False)
    _record_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH, record, ["target_date"])
    clear_dashboard_payload_cache()
    return record
| def _tomorrow_probability_from_daily(daily: pd.DataFrame, fallback_prob: float) -> float: | |
| if daily.empty or len(daily) < 5: | |
| return float(fallback_prob) | |
| frame = daily.copy() | |
| frame["close"] = pd.to_numeric(frame["close"], errors="coerce") | |
| frame = frame.dropna(subset=["close"]).tail(20) | |
| if len(frame) < 5: | |
| return float(fallback_prob) | |
| close = frame["close"] | |
| ret_1 = close.pct_change(fill_method=None).iloc[-1] | |
| ret_5 = close.pct_change(5, fill_method=None).iloc[-1] | |
| vol = close.pct_change(fill_method=None).tail(10).std() | |
| score = 0.49900560447008563 | |
| if pd.notna(ret_1): | |
| score += float(np.clip(ret_1 * 4.5, -0.05, 0.05)) | |
| if pd.notna(ret_5): | |
| score += float(np.clip(ret_5 * 1.4, -0.05, 0.05)) | |
| if pd.notna(vol): | |
| score -= float(np.clip(vol * 0.9, 0.0, 0.035)) | |
| return float(np.clip(score, 0.35, 0.65)) | |
def refresh_tomorrow_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Produce (or adopt) the next-session direction forecast and persist it.

    1. Archive the current latest row, then sync the external forecaster outputs.
    2. If a forecaster row exists in TOMORROW_LATEST_PATH, record it in history
       (``source="live"``). Return it when ``session_date`` is None or when its
       ``input_date`` is on/after ``session_date``.
    3. Otherwise compute a fallback with the ``_tomorrow_probability_from_daily``
       heuristic on NIFTY_1D_PATH closes up to ``input_day``. Write it to
       TOMORROW_LATEST_PATH, record it in history, and fold the ``latest_*``
       fields into TOMORROW_SUMMARY_PATH.

    The dashboard payload cache is cleared on every successful return path.

    Args:
        session_date: Session whose close the forecast is based on; the latest
            daily row's date when None (fallback path only).

    Raises:
        RuntimeError: no usable daily rows exist for the fallback path.

    NOTE(review): the fallback row copies ``model_name``/``source_model`` and
    validation/test accuracies from the forecaster summary, although the
    prediction comes from the simple heuristic — confirm this labelling.
    NOTE(review): ``float(summary.get(key, default))`` raises TypeError if the
    summary contains the key with a null value.
    """
    # Also called inside sync_daily_forecaster_outputs() when a forecaster row exists.
    _archive_tomorrow_latest_to_history()
    synced = sync_daily_forecaster_outputs()
    if synced is not None and TOMORROW_LATEST_PATH.exists():
        latest = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        cleaned = {k: (None if pd.isna(v) else v) for k, v in latest.items()}
        cleaned["source"] = "live"
        _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, cleaned, ["target_date"])
        if session_date is None:
            clear_dashboard_payload_cache()
            return cleaned
        try:
            input_day = date.fromisoformat(str(cleaned.get("input_date"))[:10])
        except Exception:
            input_day = None
        # session_date is never None here (that case returned above), so only the
        # input_day >= session_date comparison is effective.
        if input_day is not None and (session_date is None or input_day >= session_date):
            clear_dashboard_payload_cache()
            return cleaned
    # Fallback: the forecaster row is missing or older than session_date.
    summary = load_tomorrow_summary()
    artifact = load_tomorrow_model_artifact()
    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    daily = daily.dropna(subset=["date"]).sort_values("date")
    if daily.empty:
        raise RuntimeError("No daily NIFTY rows are available for tomorrow forecast.")
    input_day = session_date or daily["date"].max().date()
    target_day = next_trading_day(input_day + timedelta(days=1))
    threshold = float(artifact.get("threshold", summary.get("threshold", 0.543)))
    fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563))
    # Only closes up to input_day feed the heuristic (no later data).
    prob_up = _tomorrow_probability_from_daily(daily[daily["date"].dt.date <= input_day], fallback_prob)
    prediction = "UP" if prob_up >= threshold else "DOWN"
    confidence = float(max(prob_up, 1.0 - prob_up))
    row = {
        "input_date": input_day.isoformat(),
        "target_date": target_day.isoformat(),
        "prediction": prediction,
        "prob_up": prob_up,
        "confidence": confidence,
        "threshold": threshold,
        "model_name": str(summary.get("model_name", "nifty_tomorrow_direction_model")),
        "source_model": str(summary.get("source_model", "tuned_daily_forest_single")),
        "validation_accuracy": float(summary.get("validation_accuracy", 0.5780141843971631)),
        "test_accuracy": float(summary.get("test_accuracy", 0.6182795698924731)),
        "source": "live",
    }
    pd.DataFrame([row]).to_csv(TOMORROW_LATEST_PATH, index=False)
    _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, row, ["target_date"])
    # Copy before updating so the loaded summary object is not mutated in place.
    summary = dict(summary)
    summary.update(
        {
            "latest_forecast_date": row["input_date"],
            "latest_forecast_for": f"next trading session {row['target_date']}",
            "latest_forecast_prob_up": row["prob_up"],
            "latest_forecast_signal": row["prediction"],
            "latest_target_date": row["target_date"],
        }
    )
    TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    clear_dashboard_payload_cache()
    return row
| def _json_ready_frame(df: pd.DataFrame, limit: int | None = None) -> list[dict[str, Any]]: | |
| out = df.copy() | |
| if limit is not None: | |
| out = out.tail(limit) | |
| for col in out.columns: | |
| if pd.api.types.is_datetime64_any_dtype(out[col]): | |
| out[col] = out[col].dt.strftime("%Y-%m-%d %H:%M:%S") | |
| out = out.replace({np.nan: None}) | |
| return out.to_dict(orient="records") | |
def load_model_summary() -> dict[str, Any]:
    """Return the parsed ``MODEL_DIR/summary.json`` (opening-direction model), or ``{}`` if absent."""
    summary_file = MODEL_DIR / "summary.json"
    if summary_file.exists():
        return json.loads(summary_file.read_text(encoding="utf-8"))
    return {}
def load_candidate_results() -> list[dict[str, Any]]:
    """Return the first 12 rows of ``MODEL_DIR/candidate_results.csv`` as JSON-ready records.

    Returns an empty list when the file does not exist.
    """
    csv_file = MODEL_DIR / "candidate_results.csv"
    if not csv_file.exists():
        return []
    leading_rows = pd.read_csv(csv_file).head(12)
    return _json_ready_frame(leading_rows)
def load_test_predictions() -> pd.DataFrame:
    """Load the opening-model test predictions, with ``date`` parsed and rows sorted by it.

    Returns an empty frame when TEST_PREDICTIONS_PATH does not exist.
    """
    if not TEST_PREDICTIONS_PATH.exists():
        return pd.DataFrame()
    raw = pd.read_parquet(TEST_PREDICTIONS_PATH)
    return (
        raw.assign(date=pd.to_datetime(raw["date"], errors="coerce"))
        .sort_values("date")
        .reset_index(drop=True)
    )
def load_tomorrow_test_predictions() -> pd.DataFrame:
    """Load the next-session model's test predictions with date columns parsed.

    Any of ``forecast_date``, ``target_date`` and ``date`` that are present are
    converted to datetimes (invalid values become NaT). Rows are sorted by the
    first of ``target_date``, ``forecast_date``, ``date`` that exists. A frame
    with none of those columns is returned unsorted instead of raising KeyError.

    Returns:
        The frame with a fresh RangeIndex, or an empty frame when
        TOMORROW_TEST_PREDICTIONS_PATH does not exist.
    """
    if not TOMORROW_TEST_PREDICTIONS_PATH.exists():
        return pd.DataFrame()
    df = pd.read_parquet(TOMORROW_TEST_PREDICTIONS_PATH)
    for col in ("forecast_date", "target_date", "date"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    # Older/alternative outputs may carry only a "date" column; fall back to it
    # rather than failing on a missing "forecast_date".
    sort_col = next((col for col in ("target_date", "forecast_date", "date") if col in df.columns), None)
    if sort_col is None:
        return df.reset_index(drop=True)
    return df.sort_values(sort_col).reset_index(drop=True)
def load_tplus1_test_predictions() -> pd.DataFrame:
    """Load the T+1 model test predictions, parse date columns, and sort by ``date``.

    Returns an empty frame when TPLUS1_TEST_PREDICTIONS_PATH does not exist.
    """
    if not TPLUS1_TEST_PREDICTIONS_PATH.exists():
        return pd.DataFrame()
    table = pd.read_parquet(TPLUS1_TEST_PREDICTIONS_PATH)
    present_date_cols = [name for name in ("date", "target_date") if name in table.columns]
    for name in present_date_cols:
        table[name] = pd.to_datetime(table[name], errors="coerce")
    ordered = table.sort_values("date")
    return ordered.reset_index(drop=True)
def dashboard_payload() -> dict[str, Any]:
    """Return a deep copy of the dashboard payload for the current input files.

    The payload comes from ``_dashboard_payload_cached``. Its key is the tuple of
    ``_file_cache_key(path)`` for each input artifact below, in this order. The
    call and the deep copy both happen while ``_dashboard_payload_lock`` is held.
    """
    tracked_files = (
        MODEL_DIR / "summary.json",
        LATEST_PATH,
        TEST_PREDICTIONS_PATH,
        TOMORROW_SUMMARY_PATH,
        TOMORROW_LATEST_PATH,
        TOMORROW_TEST_PREDICTIONS_PATH,
        TOMORROW_MODEL_PATH,
        TPLUS1_SUMMARY_PATH,
        TPLUS1_LATEST_PATH,
        TPLUS1_TEST_PREDICTIONS_PATH,
        TPLUS1_MODEL_PATH,
        REFRESH_STATE_PATH,
        NIFTY_1D_PATH,
        OPENING_DATASET_PATH,
        MODEL_DIR / "candidate_results.csv",
        NIFTY_1M_PATH,
        LIVE_ACCURACY_PATH,
        TOMORROW_PREDICTION_HISTORY_PATH,
        MFE_SUMMARY_PATH,
        MFE_LATEST_PATH,
        MFE_MODEL_PATH,
    )
    cache_key = tuple(_file_cache_key(path) for path in tracked_files)
    with _dashboard_payload_lock:
        return copy.deepcopy(_dashboard_payload_cached(cache_key))
def warm_dashboard_payload_cache() -> None:
    """Build the dashboard payload once for its caching side effect; the result is discarded."""
    _ = dashboard_payload()
def _load_forecaster_predictions_by_target() -> dict[date, dict[str, Any]]:
    """Index the forecaster's NIFTY 50 predictions by target date.

    Reads DAILY_FORECASTER_PREDICTIONS_PATH and returns an empty dict when it is
    missing, unreadable or empty. ``pred`` (or ``raw_pred`` when ``pred`` is
    missing/NaN) is read as an int: 1 maps to "UP", any other int to "DOWN".
    Rows without a parseable target date or integer prediction are skipped. For
    repeated target dates the last row wins.
    """
    csv_path = DAILY_FORECASTER_PREDICTIONS_PATH
    if not csv_path.exists():
        return {}
    try:
        table = pd.read_csv(csv_path)
    except Exception:
        return {}
    if table.empty:
        return {}
    if "symbol" in table.columns:
        table = table[table["symbol"].astype(str) == "NIFTY 50"].copy()

    def _as_probability(value):
        # NaN/None/unparseable probabilities are reported as None.
        try:
            return float(value) if pd.notna(value) else None
        except Exception:
            return None

    by_target: dict[date, dict[str, Any]] = {}
    for _, record in table.iterrows():
        day = _parse_iso_date(record.get("target_date"))
        if day is None:
            continue
        raw_signal = record.get("pred")
        if pd.isna(raw_signal) and "raw_pred" in record:
            raw_signal = record.get("raw_pred")
        try:
            direction = int(raw_signal)
        except Exception:
            continue
        by_target[day] = {
            "prediction": "UP" if direction == 1 else "DOWN",
            "prob_up": _as_probability(record.get("prob_up")),
            "forecast_date": record.get("forecast_date"),
            "source": "Tomorrow (forecaster)",
        }
    return by_target
def _load_track_record_daily_rows() -> pd.DataFrame:
    """Combine cached daily NIFTY rows with a fresh 3-month Yahoo fetch.

    Sources:
      * NIFTY_1D_PATH, when it exists and can be read;
      * ``fetch_yahoo_daily(period="3mo")``. A non-empty result is also appended
        to NIFTY_1D_PATH on a best-effort basis.

    Read, fetch and append failures are ignored deliberately, so whatever data is
    available is still returned.

    Returns:
        One row per normalized date, ascending. For duplicate dates the fetched
        Yahoo row wins over the cached one. Only rows with a finite numeric
        ``close`` are kept. An empty frame is returned when neither source
        produced data.
    """
    frames: list[pd.DataFrame] = []
    if NIFTY_1D_PATH.exists():
        try:
            frames.append(pd.read_parquet(NIFTY_1D_PATH))
        except Exception:
            pass
    try:
        yahoo_daily = fetch_yahoo_daily(period="3mo")
        if not yahoo_daily.empty:
            frames.append(yahoo_daily)
            try:
                append_parquet_rows(NIFTY_1D_PATH, yahoo_daily, ["date"])
            except Exception:
                pass
    except Exception:
        pass
    if not frames:
        return pd.DataFrame()
    combined = pd.concat(frames, ignore_index=True)
    combined["date"] = pd.to_datetime(combined["date"], errors="coerce").dt.normalize()
    # A stable sort keeps concat order (cached rows first, Yahoo rows last) among
    # equal dates. keep="last" then reliably prefers the fresh Yahoo row; the
    # default quicksort gives no such guarantee.
    combined = combined.dropna(subset=["date"]).sort_values("date", kind="stable")
    combined = combined.drop_duplicates(subset=["date"], keep="last")
    # Coerce instead of float(): a non-numeric close drops the row rather than
    # raising ValueError. NaN and +/-inf are excluded as well.
    numeric_close = pd.to_numeric(combined["close"], errors="coerce")
    combined = combined[np.isfinite(numeric_close.to_numpy(dtype="float64"))].copy()
    return combined.reset_index(drop=True)
def _rolling_tomorrow_prediction(
    input_day: date,
    daily_rows: pd.DataFrame,
    threshold: float,
    fallback_prob: float,
) -> tuple[str, float]:
    """Re-run the daily heuristic as of ``input_day``, using only rows on or before it.

    Returns:
        ``("UP" | "DOWN", prob_up)``. "UP" means ``prob_up >= threshold``.
    """
    upto_input = daily_rows.loc[daily_rows["date"].dt.date <= input_day].copy()
    probability = float(_tomorrow_probability_from_daily(upto_input, fallback_prob))
    direction = "UP" if probability >= threshold else "DOWN"
    return direction, probability
def build_prediction_track_record(
    sessions: int = 10,
) -> list[dict[str, Any]]:
    """Score next-session direction calls against realized closes for recent sessions.

    The sessions scored are those returned by ``last_n_trading_sessions``. They
    end at ``_track_record_end_session()``, clamped to the latest date with a
    known close. For each target session:
      * the call comes from the forecaster's prediction CSV when one exists for
        that target date;
      * otherwise it is recomputed with the rolling daily heuristic, using closes
        up to the previous trading session.

    Sessions without a close, or with no resolvable actual direction, are skipped.

    Args:
        sessions: Maximum number of records to return; ``<= 0`` yields ``[]``.

    Returns:
        Up to ``sessions`` records. Each has keys date, input_date, prediction,
        prediction_source, prob_up, actual_move, actual_direction and correct.
    """
    summary = load_tomorrow_summary()
    artifact = load_tomorrow_model_artifact()
    # Same fallback threshold as load_tomorrow_model_artifact and
    # refresh_tomorrow_prediction use for this model.
    threshold = float(artifact.get("threshold", summary.get("threshold", 0.543)))
    fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563))
    forecaster_by_target = _load_forecaster_predictions_by_target()
    daily_rows = _load_track_record_daily_rows()
    if daily_rows.empty:
        return []
    closes_by_date = {
        day.date(): float(close)
        for day, close in zip(daily_rows["date"], daily_rows["close"])
        if pd.notna(close) and np.isfinite(float(close))
    }
    end_session = _track_record_end_session()
    available_through = max(
        (day for day in closes_by_date if day <= end_session),
        default=None,
    )
    if available_through is None:
        return []
    if available_through < end_session:
        end_session = available_through
    session_dates = last_n_trading_sessions(end_session, sessions)
    records: list[dict[str, Any]] = []
    for target_day in session_dates:
        day_close = closes_by_date.get(target_day)
        if day_close is None:
            continue
        actual_move, actual_direction = _tomorrow_actual_outcome(target_day, day_close, closes_by_date)
        if actual_direction is None:
            continue
        input_day = previous_trading_day(target_day - timedelta(days=1))
        cached = forecaster_by_target.get(target_day)
        if cached and cached.get("prediction") in {"UP", "DOWN"}:
            prediction = cached["prediction"]
            prob_up = cached.get("prob_up")
            source = cached.get("source", "Tomorrow (forecaster)")
        else:
            # Only data up to input_day feeds the heuristic, so no lookahead.
            prediction, prob_up = _rolling_tomorrow_prediction(
                input_day,
                daily_rows,
                threshold,
                fallback_prob,
            )
            source = "Tomorrow (rolling)"
        if prediction not in {"UP", "DOWN"}:
            continue
        records.append(
            {
                "date": target_day.isoformat(),
                "input_date": input_day.isoformat(),
                "prediction": prediction,
                "prediction_source": source,
                "prob_up": prob_up,
                "actual_move": actual_move,
                "actual_direction": actual_direction,
                "correct": prediction == actual_direction,
            }
        )
    # records[-0:] would return every record, so non-positive sizes are explicit.
    if sessions <= 0:
        return []
    return records[-sessions:]
def _dashboard_recent_hit_rate(frame: pd.DataFrame, fallback: Any, window: int = 40) -> Any:
    """Return the mean of ``frame["correct"]`` over the last ``window`` rows.

    Returns ``fallback`` when ``frame`` is empty or has no ``correct`` column, so a
    backtest file without scored rows degrades gracefully instead of raising.
    """
    if frame.empty or "correct" not in frame.columns:
        return fallback
    return float(frame.tail(window)["correct"].mean())


def _dashboard_payload_cached(key: tuple[tuple[str, int | None, int | None], ...]) -> dict[str, Any]:
    """Build the JSON-ready payload served to the dashboard.

    Combines the saved summaries and latest predictions of the T+5, Tomorrow,
    T+1 and MFE models, their backtest frames, the live-accuracy ledger, the
    Tomorrow live track record and the last 180 daily closes.

    ``key`` is not read in this body; presumably it only serves as a cache key
    for a memoizing caller — TODO confirm.
    """
    recent_window = 40
    summary = load_model_summary()
    t5_latest = _latest_saved_prediction_uncached()
    tomorrow_summary = load_tomorrow_summary()
    tomorrow_latest = latest_tomorrow_prediction()
    tplus1_summary = load_tplus1_summary()
    tplus1_latest = latest_tplus1_prediction()
    t5_test = load_test_predictions()
    tomorrow_test = load_tomorrow_test_predictions()
    tomorrow_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH)
    tplus1_test = load_tplus1_test_predictions()
    mfe_summary = load_mfe_summary()
    mfe_latest = latest_mfe_prediction()

    # Daily closes for the price chart; tolerate a missing file (e.g. fresh deploy).
    if NIFTY_1D_PATH.exists():
        daily = pd.read_parquet(NIFTY_1D_PATH)
    else:
        daily = pd.DataFrame(columns=["date", "close"])
    daily["date"] = pd.to_datetime(daily["date"], errors="coerce")
    daily = daily.sort_values("date").tail(180)

    # T+5: None when there are no scored backtest rows.
    recent_accuracy = _dashboard_recent_hit_rate(t5_test, None, recent_window)

    # Tomorrow: derive "correct" from target/pred when the backtest lacks it,
    # otherwise fall back to the summary's test accuracy.
    tomorrow_accuracy = tomorrow_summary.get("test_accuracy")
    if not tomorrow_test.empty:
        tomorrow_recent = tomorrow_test.tail(recent_window).copy()
        if "correct" not in tomorrow_recent.columns and {"target", "pred"}.issubset(tomorrow_recent.columns):
            tomorrow_recent["correct"] = pd.to_numeric(tomorrow_recent["target"], errors="coerce") == pd.to_numeric(
                tomorrow_recent["pred"], errors="coerce"
            )
        tomorrow_accuracy = _dashboard_recent_hit_rate(tomorrow_recent, tomorrow_accuracy, recent_window)

    model_metrics = [
        {
            "id": "tomorrow",
            "label": "Tomorrow",
            "model_name": tomorrow_summary.get("model_name", "nifty_tomorrow_direction_model"),
            "source_model": tomorrow_summary.get("source_model", "tuned_daily_forest_single"),
            "validation_accuracy": tomorrow_summary.get("validation_accuracy"),
            "test_accuracy": tomorrow_summary.get("test_accuracy"),
            "recent_accuracy": tomorrow_accuracy,
            "test_rows": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0),
        },
        {
            "id": "tplus1",
            "label": "T+1",
            "model_name": tplus1_summary.get("model_name", "nifty_1420_tplus1_logistic_model"),
            "source_model": "14:00-14:20 logistic forecaster",
            "validation_accuracy": tplus1_summary.get("validation_accuracy"),
            "test_accuracy": tplus1_summary.get("test_accuracy"),
            "recent_accuracy": _dashboard_recent_hit_rate(
                tplus1_test, tplus1_summary.get("test_accuracy"), recent_window
            ),
            "test_rows": int(tplus1_summary.get("test_rows") or len(tplus1_test) or 0),
        },
        {
            "id": "t5",
            "label": "T+5",
            "model_name": summary.get("model_name", "nifty_opening_direction_model"),
            "source_model": summary.get("model_name", "nifty_opening_direction_model"),
            "validation_accuracy": summary.get("validation_accuracy"),
            "test_accuracy": summary.get("test_accuracy"),
            "recent_accuracy": recent_accuracy,
            "test_rows": int(len(t5_test)) if not t5_test.empty else int(summary.get("test_rows") or 0),
        },
    ]
    metrics = {
        "validation_accuracy": tomorrow_summary.get("validation_accuracy"),
        "test_accuracy": tomorrow_summary.get("test_accuracy"),
        "baseline_test_accuracy": tomorrow_summary.get("baseline_accuracy"),
        # NOTE(review): AUC/Brier come from the T+5 summary while the other
        # headline metrics are Tomorrow's — confirm this mix is intended.
        "validation_auc": summary.get("validation_auc"),
        "test_auc": summary.get("test_auc"),
        "test_brier": summary.get("test_brier"),
        "feature_count": tomorrow_summary.get("feature_count"),
        "recent_accuracy": tomorrow_accuracy,
        "recent_accuracy_days": min(len(tomorrow_test), recent_window),
        "total_test_days": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0),
        "models": model_metrics,
    }
    return {
        "timestamp": datetime.now(IST).isoformat(),
        "predictions": {
            "t5": {
                "latest": t5_latest,
                "summary": summary,
            },
            "tomorrow": {
                "latest": tomorrow_latest,
                "summary": tomorrow_summary,
            },
            "tplus1": {
                "latest": tplus1_latest,
                "summary": tplus1_summary,
            },
            "mfe": {
                "latest": mfe_latest,
                "summary": mfe_summary,
            },
        },
        "metrics": metrics,
        "models": model_metrics,
        "live_accuracy": load_live_accuracy(),
        "charts": {
            "daily_closes": _json_ready_frame(daily[["date", "close"]]),
            "t5_backtest": _json_ready_frame(t5_test, limit=400),
            "t5_recent_predictions": _json_ready_frame(t5_test.tail(recent_window)),
            "tomorrow_backtest": _json_ready_frame(tomorrow_test, limit=400),
            "tomorrow_live_track_record": build_prediction_track_record(),
            "tomorrow_history_predictions": _json_ready_frame(tomorrow_history.tail(80)),
            "tplus1_backtest": _json_ready_frame(tplus1_test, limit=400),
        },
    }
def refresh_first5_prediction(session_date: date | None = None, minutes: pd.DataFrame | None = None) -> Prediction:
    """Recompute the opening-direction prediction from first-5-minute features.

    Steps: persist ``minutes`` (fetched as 7 days of Yahoo bars when not given)
    to the 1m parquet, build the model row for ``session_date``, upsert it
    into the opening training dataset by ``date``, predict, then make a
    best-effort MFE refresh whose failure is only printed.

    Raises RuntimeError when ``session_date`` is omitted and today (IST) is not
    an NSE trading session.
    """
    if session_date is None:
        now_day = datetime.now(IST).date()
        if not is_trading_day(now_day):
            raise RuntimeError(f"{now_day.isoformat()} is not an NSE trading session.")
    if minutes is None:
        minutes = fetch_yahoo_minutes(period="7d")
    append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
    model_row = build_model_row(first5_features_from_minutes(minutes, session_date=session_date))
    history = read_training_dataset()
    # Upsert: the freshly built row wins over any existing row for the same date.
    (
        pd.concat([history, model_row], ignore_index=True)
        .drop_duplicates(subset=["date"], keep="last")
        .sort_values("date")
        .reset_index(drop=True)
        .to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd")
    )
    result = predict_row(model_row)
    try:
        refresh_mfe_prediction(session_date=session_date)
    except Exception as exc:
        print(f"MFE refresh failed: {exc}", flush=True)
    return result
def refresh_daily_data() -> dict[str, Any]:
    """Merge the last month of Yahoo daily bars into the daily parquet.

    Returns the merged row count, the newest stored date (ISO string) and the
    parquet path.
    """
    merged = append_parquet_rows(NIFTY_1D_PATH, fetch_yahoo_daily(period="1mo"), ["date"])
    newest = pd.to_datetime(merged["date"]).max()
    return {"rows": int(len(merged)), "latest_date": newest.date().isoformat(), "path": str(NIFTY_1D_PATH)}
def update_opening_outcomes_from_daily() -> dict[str, Any]:
    """Back-fill realised daily OHLCV outcomes into the opening-direction dataset.

    For every dataset row whose session has a daily bar, copy open/high/low/
    close/volume into the ``day_*`` columns that exist. Where the needed columns
    exist, also recompute ``target`` (close > open), ``day_return`` and
    ``first5_vs_day_open``. The dataset is rewritten sorted by ``date``.

    Returns the number of rows whose target was recomputed and the newest
    dataset date (ISO string, or None).
    """
    nothing_done = {"updated_rows": 0, "latest_date": None}
    if not (OPENING_DATASET_PATH.exists() and NIFTY_1D_PATH.exists()):
        return nothing_done
    opening = pd.read_parquet(OPENING_DATASET_PATH)
    daily_bars = pd.read_parquet(NIFTY_1D_PATH)
    if opening.empty or daily_bars.empty:
        return nothing_done

    opening = opening.copy()
    opening["_session_date"] = pd.to_datetime(opening["date"], errors="coerce").dt.normalize()
    daily_bars = daily_bars.copy()
    daily_bars["_session_date"] = pd.to_datetime(daily_bars["date"], errors="coerce").dt.normalize()
    # One bar per session (last occurrence wins), indexed for direct lookup.
    bars_by_session = (
        daily_bars.dropna(subset=["_session_date"])
        .drop_duplicates("_session_date", keep="last")
        .set_index("_session_date")
    )

    column_map = (
        ("open", "day_open"),
        ("high", "day_high"),
        ("low", "day_low"),
        ("close", "day_close"),
        ("volume", "day_volume"),
    )
    # .at only writes to existing columns, so these checks are loop-invariant.
    can_score = {"day_open", "day_close", "target", "day_return"}.issubset(opening.columns)
    can_gap = {"first5_close", "day_open", "first5_vs_day_open"}.issubset(opening.columns)

    rescored = 0
    for position, session_day in opening["_session_date"].dropna().items():
        if session_day not in bars_by_session.index:
            continue
        bar = bars_by_session.loc[session_day]
        for source_col, target_col in column_map:
            if source_col in bar.index and target_col in opening.columns:
                opening.at[position, target_col] = bar[source_col]
        if can_score:
            open_px = opening.at[position, "day_open"]
            close_px = opening.at[position, "day_close"]
            if pd.notna(open_px) and pd.notna(close_px) and float(open_px) != 0.0:
                opening.at[position, "target"] = int(float(close_px) > float(open_px))
                opening.at[position, "day_return"] = (float(close_px) - float(open_px)) / float(open_px)
                rescored += 1
        if can_gap:
            first5_px = opening.at[position, "first5_close"]
            open_px = opening.at[position, "day_open"]
            if pd.notna(first5_px) and pd.notna(open_px) and float(open_px) != 0.0:
                opening.at[position, "first5_vs_day_open"] = (float(first5_px) - float(open_px)) / float(open_px)

    opening = opening.drop(columns=["_session_date"]).sort_values("date").reset_index(drop=True)
    opening.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd")
    newest = pd.to_datetime(opening["date"], errors="coerce").max()
    return {
        "updated_rows": int(rescored),
        "latest_date": None if pd.isna(newest) else newest.date().isoformat(),
    }
def load_live_accuracy() -> dict[str, Any]:
    """Load the live accuracy ledger from disk and recompute its counters.

    Returns a dict keyed by model id ("tomorrow", "t5", "tplus1"). Each value
    holds ``entries`` plus derived ``accuracy``, ``total``, ``correct_count``,
    ``backtest_count`` and ``live_count``; any extra stored keys are kept.
    Entries without a ``source`` count as backtest.

    A missing or unreadable file yields an empty ledger. Malformed entries
    (non-dicts) are dropped individually instead of discarding the whole ledger.
    """
    ledger: dict[str, Any] = {
        model_id: {"entries": [], "accuracy": None, "total": 0, "correct_count": 0, "backtest_count": 0, "live_count": 0}
        for model_id in ("tomorrow", "t5", "tplus1")
    }
    if not LIVE_ACCURACY_PATH.exists():
        return ledger
    try:
        raw = json.loads(LIVE_ACCURACY_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):  # unreadable file / invalid JSON / bad encoding
        return ledger
    if not isinstance(raw, dict):
        return ledger

    for model_id, slot in ledger.items():
        stored = raw.get(model_id)
        stored = dict(stored) if isinstance(stored, dict) else {}
        entries = stored.get("entries")
        if not isinstance(entries, list):
            entries = []
        entries = [entry for entry in entries if isinstance(entry, dict)]
        total = len(entries)
        correct_count = sum(1 for entry in entries if entry.get("correct"))
        backtest_count = sum(
            1 for entry in entries if str(entry.get("source", "backtest")).lower() == "backtest"
        )
        # Derived counters always override whatever was stored on disk.
        stored["entries"] = entries
        stored["backtest_count"] = int(backtest_count)
        stored["live_count"] = int(total - backtest_count)
        stored["total"] = int(total)
        stored["correct_count"] = int(correct_count)
        stored["accuracy"] = (correct_count / total) if total > 0 else None
        slot.update(stored)
    return ledger
def save_live_accuracy(data: dict[str, Any]) -> None:
    """Persist the live accuracy ledger to disk as indented JSON.

    The JSON is written to a sibling temp file that then replaces the target, so
    a crash mid-write cannot leave a truncated ledger. A truncated ledger would
    otherwise load as empty and be overwritten on the next save.
    Serialization errors propagate before anything on disk is touched.
    """
    target = LIVE_ACCURACY_PATH
    target.parent.mkdir(parents=True, exist_ok=True)
    # Per-process temp name so concurrent writers do not clobber each other's temp file.
    tmp = target.with_name(f"{target.name}.{os.getpid()}.tmp")
    try:
        tmp.write_text(json.dumps(data, indent=2), encoding="utf-8")
        tmp.replace(target)  # atomic rename on POSIX and Windows (os.replace semantics)
    finally:
        tmp.unlink(missing_ok=True)
| def _load_prediction_history(path: Path) -> pd.DataFrame: | |
| if not path.exists(): | |
| return pd.DataFrame() | |
| frame = pd.read_parquet(path) | |
| for col in ("date", "input_date", "target_date", "forecast_date"): | |
| if col in frame.columns: | |
| frame[col] = pd.to_datetime(frame[col], errors="coerce") | |
| sort_cols = [col for col in ("target_date", "input_date", "date", "forecast_date") if col in frame.columns] | |
| if sort_cols: | |
| return frame.sort_values(sort_cols).reset_index(drop=True) | |
| return frame.reset_index(drop=True) | |
def _record_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> None:
    """Append ``row`` to the prediction-history file at ``path``.

    Private alias that forwards all three arguments unchanged to
    ``append_prediction_history``. ``subset`` is presumably the de-duplication key
    column list; verify against that helper.
    """
    append_prediction_history(path, row, subset)
    return None
def _rescore_tomorrow_live_ledger(ledger: dict[str, Any]) -> bool:
    """Fix live Tomorrow ledger entries to use close vs previous close. Returns True if modified.

    Builds a session-date -> close map from the daily parquet (non-numeric or
    non-finite closes are ignored; the last row wins for duplicate dates). Each
    live (non-backtest) Tomorrow entry is then re-graded via
    ``_tomorrow_actual_outcome``. ``actual``/``correct`` are mutated in place on
    ``ledger``. Entries that are not dicts, lack a parseable date, lack a close,
    or carry a non UP/DOWN prediction are left untouched.
    """
    daily = pd.read_parquet(NIFTY_1D_PATH)
    sessions = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    if "close" in daily.columns:
        closes = pd.to_numeric(daily["close"], errors="coerce")
    else:
        closes = pd.Series(np.nan, index=daily.index, dtype="float64")
    usable = sessions.notna() & np.isfinite(closes.astype("float64"))
    # Vectorized replacement for a per-row iterrows scan; dict order keeps "last wins".
    closes_by_date: dict[date, float] = {
        session.date(): float(close) for session, close in zip(sessions[usable], closes[usable])
    }

    changed = False
    for entry in ledger.get("tomorrow", {}).get("entries", []):
        if not isinstance(entry, dict):
            continue
        if str(entry.get("source", "backtest")).lower() == "backtest":
            continue
        try:
            day = date.fromisoformat(str(entry.get("date", ""))[:10])
        except ValueError:
            continue
        day_close = closes_by_date.get(day)
        if day_close is None:
            continue
        _, actual_direction = _tomorrow_actual_outcome(day, day_close, closes_by_date)
        if actual_direction is None:
            continue
        pred = str(entry.get("prediction", "")).upper()
        if pred not in {"UP", "DOWN"}:
            continue
        new_correct = pred == actual_direction
        if entry.get("actual") != actual_direction or entry.get("correct") != new_correct:
            entry["actual"] = actual_direction
            entry["correct"] = new_correct
            changed = True
    return changed
def ensure_completed_sessions_scored(
    now: datetime | None = None,
    ledger: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Score any completed Tomorrow sessions that have daily close data but no ledger entry.

    Steps:
    1. Re-grade existing live Tomorrow entries and persist the ledger if anything changed.
    2. If the most recent completed session (per ``expected_completed_daily_date``)
       has no Tomorrow entry yet and a valid daily bar exists, delegate to
       ``update_live_accuracy``. That function reloads the ledger from disk.

    Returns the ledger; ``ledger`` is loaded from disk when not supplied.
    """
    now = now or datetime.now(IST)
    completed = expected_completed_daily_date(now)
    if not is_trading_day(completed):
        return ledger or load_live_accuracy()
    ledger = ledger or load_live_accuracy()

    if _rescore_tomorrow_live_ledger(ledger):
        for model_id in ("tomorrow",):
            entries = ledger[model_id]["entries"]
            total = len(entries)
            correct = sum(1 for e in entries if e.get("correct"))
            backtest_count = sum(1 for e in entries if str(e.get("source", "backtest")).lower() == "backtest")
            ledger[model_id]["total"] = total
            ledger[model_id]["correct_count"] = correct
            ledger[model_id]["backtest_count"] = backtest_count
            ledger[model_id]["live_count"] = total - backtest_count
            ledger[model_id]["accuracy"] = correct / total if total > 0 else None
        save_live_accuracy(ledger)

    # .get: an entry without a "date" key must not abort scoring with KeyError.
    logged_tom = {e.get("date") for e in ledger.get("tomorrow", {}).get("entries", []) if isinstance(e, dict)}
    if completed.isoformat() in logged_tom:
        return ledger

    daily = pd.read_parquet(NIFTY_1D_PATH)
    sessions = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    day_rows = daily[sessions.dt.date == completed]
    if day_rows.empty:
        return ledger
    last_bar = day_rows.iloc[-1]
    day_open = float(last_bar["open"])
    day_close = float(last_bar["close"])
    if not (np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0):
        return ledger
    return update_live_accuracy(completed)
def update_live_accuracy(session_date: date) -> dict[str, Any]:
    """Score today's predictions against actual outcomes and update the ledger.

    Must be called AFTER refresh_daily_data() (so today's close is available)
    but BEFORE refresh_first5_prediction / refresh_tplus1_prediction /
    refresh_tomorrow_prediction (so the CSV files still hold the predictions
    we want to score).

    Each model gets at most one entry per session (keyed by ISO date):
    - T+5: today's prediction vs today's close > open.
    - Tomorrow: the prediction targeting today vs today's close > previous close.
    - T+1: the prediction targeting today vs today's close > the input session's
      last 14:00-14:20 minute close.
    Scoring is best-effort per model: failures are logged and that model is
    skipped. The ledger is saved only when today's daily bar is present and valid.
    Returns the ledger.
    """
    ledger = load_live_accuracy()
    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    today_rows = daily[daily["_date"].dt.date == session_date]
    if today_rows.empty:
        return ledger
    day_open = float(today_rows.iloc[-1]["open"])
    day_close = float(today_rows.iloc[-1]["close"])
    if not (np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0):
        return ledger
    actual_close_gt_open = "UP" if day_close > day_open else "DOWN"
    session_iso = session_date.isoformat()

    # --- T+5: today's 9:20 AM prediction vs close > open ---
    logged_t5 = {e.get("date") for e in ledger["t5"]["entries"]}
    if session_iso not in logged_t5:
        t5_history = _load_prediction_history(T5_PREDICTION_HISTORY_PATH)
        if not t5_history.empty and "target_date" in t5_history.columns:
            t5_rows = t5_history[t5_history["target_date"].dt.date == session_date]
        else:
            t5_rows = pd.DataFrame()
        # Fall back to the latest-prediction CSV when history has no row for today.
        if t5_rows.empty and LATEST_PATH.exists():
            try:
                t5_row = pd.read_csv(LATEST_PATH).iloc[-1].to_dict()
                if str(t5_row.get("input_date", ""))[:10] == session_iso:
                    t5_rows = pd.DataFrame([t5_row])
            except Exception as exc:
                print(f"[live-accuracy] T+5 latest CSV unreadable for {session_iso}: {exc}", flush=True)
                t5_rows = pd.DataFrame()
        if not t5_rows.empty:
            try:
                t5_row = t5_rows.iloc[-1].to_dict()
                pred = str(t5_row.get("prediction", "")).upper()
                if pred in ("UP", "DOWN"):
                    ledger["t5"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": actual_close_gt_open,
                        "correct": pred == actual_close_gt_open,
                        "source": "live",
                    })
            except Exception as exc:
                print(f"[live-accuracy] T+5 scoring failed for {session_iso}: {exc}", flush=True)

    # --- Tomorrow: prior close vs today's close (matches forecaster target) ---
    logged_tom = {e.get("date") for e in ledger["tomorrow"]["entries"]}
    if session_iso not in logged_tom:
        prev_day = previous_trading_day(session_date - timedelta(days=1))
        prev_rows = daily[daily["_date"].dt.date == prev_day]
        actual_tomorrow = None
        if not prev_rows.empty:
            prev_close = float(prev_rows.iloc[-1]["close"])
            if np.isfinite(prev_close) and prev_close != 0:
                actual_tomorrow = "UP" if day_close > prev_close else "DOWN"
        tom_row = _find_tomorrow_prediction_for_target(session_date)
        if tom_row and actual_tomorrow is not None:
            try:
                pred = str(tom_row.get("prediction", "")).upper()
                if pred in ("UP", "DOWN"):
                    ledger["tomorrow"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": actual_tomorrow,
                        "correct": pred == actual_tomorrow,
                        "source": "live",
                    })
            except Exception as exc:
                print(f"[live-accuracy] Tomorrow scoring failed for {session_iso}: {exc}", flush=True)

    # --- T+1: yesterday's 14:20 prediction targeting today ---
    # T+1 target: today's close > yesterday's 14:20 close
    logged_t1 = {e.get("date") for e in ledger["tplus1"]["entries"]}
    if session_iso not in logged_t1:
        t1_history = _load_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH)
        if not t1_history.empty and "target_date" in t1_history.columns:
            t1_rows = t1_history[t1_history["target_date"].dt.date == session_date]
        else:
            t1_rows = pd.DataFrame()
        if not t1_rows.empty:
            try:
                t1_row = t1_rows.iloc[-1].to_dict()
                pred = str(t1_row.get("prediction", "")).upper()
                input_date_str = str(t1_row.get("input_date", ""))[:10]
                input_day = date.fromisoformat(input_date_str)
                # Read the 14:20 close from minute data for the input session
                minute = pd.read_parquet(NIFTY_1M_PATH, columns=["date", "close"])
                minute["dt"] = pd.to_datetime(minute["date"], errors="coerce")
                minute = minute.dropna(subset=["dt"])
                minute["session_date"] = minute["dt"].dt.normalize()
                minute["time_str"] = minute["dt"].dt.strftime("%H:%M")
                window = minute[
                    (minute["session_date"].dt.date == input_day)
                    & (minute["time_str"] >= "14:00")
                    & (minute["time_str"] <= "14:20")
                ].sort_values("dt")
                if not window.empty and pred in ("UP", "DOWN"):
                    w_close = float(window.iloc[-1]["close"])
                    t1_actual = "UP" if day_close > w_close else "DOWN"
                    ledger["tplus1"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": t1_actual,
                        "correct": pred == t1_actual,
                        "source": "live",
                    })
            except Exception as exc:
                print(f"[live-accuracy] T+1 scoring failed for {session_iso}: {exc}", flush=True)

    _rescore_tomorrow_live_ledger(ledger)
    # Recompute summary stats
    for model_id in ("t5", "tomorrow", "tplus1"):
        entries = ledger[model_id]["entries"]
        total = len(entries)
        correct = sum(1 for e in entries if e.get("correct"))
        backtest_count = sum(1 for e in entries if str(e.get("source", "backtest")).lower() == "backtest")
        ledger[model_id]["total"] = total
        ledger[model_id]["correct_count"] = correct
        ledger[model_id]["backtest_count"] = backtest_count
        ledger[model_id]["live_count"] = total - backtest_count
        ledger[model_id]["accuracy"] = correct / total if total > 0 else None
    save_live_accuracy(ledger)
    return ledger
def refresh_market_close_data(session_date: date | None = None) -> dict[str, Any]:
    """Run the post-close refresh pipeline for one NSE session (default: today, IST).

    Order matters. Minute and daily data are refreshed first. Live predictions
    are then scored against the new close before the T+5, T+1 and Tomorrow
    predictions are regenerated and overwrite them. Opening-dataset outcomes are
    back-filled before the Tomorrow refresh.

    Live-accuracy scoring failures are printed and ignored; every other failure
    propagates. Raises RuntimeError if ``session_date`` is not a trading session.
    Returns a summary of row counts, dates and the new predictions.
    """
    session_date = session_date or datetime.now(IST).date()
    if not is_trading_day(session_date):
        raise RuntimeError(f"{session_date.isoformat()} is not an NSE trading session.")
    minutes = fetch_yahoo_minutes(period="7d")
    minute_frame = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
    daily_info = refresh_daily_data()
    # Score live predictions BEFORE they get overwritten by fresh ones
    try:
        update_live_accuracy(session_date)
    except Exception as exc:
        print(f"[close-refresh] live accuracy update failed: {exc}", flush=True)
    t5_prediction = refresh_first5_prediction(session_date=session_date, minutes=minutes)
    tplus1_prediction = refresh_tplus1_prediction(session_date=session_date)
    outcomes = update_opening_outcomes_from_daily()
    tomorrow_prediction = refresh_tomorrow_prediction(session_date=session_date)
    return {
        "session_date": session_date.isoformat(),
        "nifty_1m_rows": int(len(minute_frame)),
        "latest_minute": pd.to_datetime(minute_frame["date"], errors="coerce").max().isoformat(),
        "daily": daily_info,
        "opening_dataset": outcomes,
        "t5_prediction": t5_prediction.to_dict(),
        "tplus1_prediction": tplus1_prediction,
        "tomorrow_prediction": tomorrow_prediction,
    }
def close_refresh_due(now: datetime | None = None) -> bool:
    """Return True when today's post-close refresh still needs to run.

    Only on a trading day at or after ``CLOSE_REFRESH_READY``. It is due when the
    daily, minute or opening datasets, the latest opening outcome, or the
    Tomorrow prediction input date is missing or not today's date.
    """
    now = now or datetime.now(IST)
    today = now.date()
    if not is_trading_day(today) or now.time() < CLOSE_REFRESH_READY:
        return False
    latest_dates = (
        latest_parquet_date(NIFTY_1D_PATH),
        latest_parquet_date(NIFTY_1M_PATH),
        latest_parquet_date(OPENING_DATASET_PATH),
        latest_opening_outcome_date(),
        # Shared helper: an unreadable Tomorrow prediction counts as "missing"
        # (refresh due) rather than raising out of the scheduler check.
        latest_tomorrow_input_date(),
    )
    return any(latest != today for latest in latest_dates)
def latest_prediction_input_date(path: Path) -> date | None:
    """Return the newest ``input_date`` recorded in a prediction CSV.

    Returns None when the file is missing, cannot be read, has no ``input_date``
    column, has no rows, or holds no parseable dates.
    """
    if path.exists():
        try:
            column = pd.read_csv(path, usecols=["input_date"])["input_date"]
        except Exception:
            return None
        if len(column) > 0:
            newest = pd.to_datetime(column, errors="coerce").max()
            if not pd.isna(newest):
                return newest.date()
    return None
def latest_tomorrow_input_date() -> date | None:
    """Return the input date of the saved Tomorrow prediction, or None.

    Any failure while loading or parsing the prediction is treated as "no date".
    """
    try:
        raw_value = latest_tomorrow_prediction().get("input_date")
        if not raw_value:
            return None
        return date.fromisoformat(str(raw_value)[:10])
    except Exception:
        return None
def expected_completed_daily_date(now: datetime | None = None) -> date:
    """Return the most recent session whose daily bar should already be complete.

    Before ``CLOSE_REFRESH_READY`` on a trading day, today's bar is not final yet,
    so the search starts from yesterday; otherwise it starts from today.
    """
    moment = now or datetime.now(IST)
    today = moment.date()
    session_still_open = is_trading_day(today) and moment.time() < CLOSE_REFRESH_READY
    anchor = today - timedelta(days=1) if session_still_open else today
    return previous_trading_day(anchor)
def expected_minute_date(now: datetime | None = None) -> date:
    """Return the session date minute data should cover by ``now``.

    Today once ``FIRST5_READY`` has passed on a trading day; otherwise the
    previous trading session before today.
    """
    moment = now or datetime.now(IST)
    today = moment.date()
    session_ready = is_trading_day(today) and moment.time() >= FIRST5_READY
    return today if session_ready else previous_trading_day(today - timedelta(days=1))
def expected_tplus1_date(now: datetime | None = None) -> date:
    """Return the session date the T+1 prediction input should have by ``now``.

    Today once ``TPLUS1_READY`` has passed on a trading day; otherwise the
    previous trading session before today.
    """
    moment = now or datetime.now(IST)
    today = moment.date()
    session_ready = is_trading_day(today) and moment.time() >= TPLUS1_READY
    return today if session_ready else previous_trading_day(today - timedelta(days=1))
def is_stale(latest: date | None, expected: date) -> bool:
    """Return True when ``latest`` is missing or older than ``expected``."""
    return True if latest is None else latest < expected
def stale_data_status(now: datetime | None = None) -> dict[str, Any]:
    """Report which cached datasets and saved predictions lag behind the schedule.

    Each ``*_stale`` flag is True when the newest stored date is missing or older
    than the session the schedule says should already be available at ``now``.
    """
    now = now or datetime.now(IST)
    want_daily = expected_completed_daily_date(now)
    want_minutes = expected_minute_date(now)
    want_tplus1 = expected_tplus1_date(now)
    have_daily = latest_parquet_date(NIFTY_1D_PATH)
    have_minutes = latest_parquet_date(NIFTY_1M_PATH)
    have_t5 = latest_prediction_input_date(LATEST_PATH)
    have_tomorrow = latest_tomorrow_input_date()
    have_tplus1 = latest_prediction_input_date(TPLUS1_LATEST_PATH)
    return {
        "daily_stale": is_stale(have_daily, want_daily),
        "minutes_stale": is_stale(have_minutes, want_minutes),
        "t5_stale": is_stale(have_t5, want_minutes),
        "tomorrow_stale": is_stale(have_tomorrow, want_daily),
        "tplus1_stale": is_stale(have_tplus1, want_tplus1),
    }
def refresh_stale_data_once(now: datetime | None = None) -> dict[str, Any]:
    """Refresh whatever ``stale_data_status`` reports as stale, at most one run at a time.

    Non-blocking: if ``_stale_refresh_lock`` is held, returns a "skipped" status
    immediately. Live-accuracy scoring and the Tomorrow refresh are best-effort;
    their errors are recorded in ``actions``. Failures in the minute, T+5 or T+1
    refreshes propagate to the caller after the lock is released.

    Returns a dict with ``status`` ("fresh" / "skipped" / "refreshed"), the
    stale flags and the list of ``actions`` performed.
    """
    now = now or datetime.now(IST)
    status = stale_data_status(now)
    # Only the boolean "*_stale" flags decide freshness, so any informational
    # keys in the status can never make fresh data look stale.
    if not any(value for name, value in status.items() if name.endswith("_stale")):
        return {"status": "fresh", "actions": []}
    if not _stale_refresh_lock.acquire(blocking=False):
        return {"status": "skipped", "reason": "stale refresh already running", **status, "actions": []}
    actions: list[dict[str, Any]] = []
    try:
        # stale_data_status() exposes only the *_stale flags, so derive the
        # expected daily session here from the same clock.
        expected_daily = expected_completed_daily_date(now)
        if status["minutes_stale"]:
            minutes = fetch_yahoo_minutes(period="7d")
            combined = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
            actions.append(
                {
                    "name": "minutes",
                    "rows": int(len(combined)),
                    "latest_date": pd.to_datetime(combined["date"], errors="coerce").max().date().isoformat(),
                }
            )
        if status["daily_stale"]:
            daily_info = refresh_daily_data()
            outcomes = update_opening_outcomes_from_daily()
            actions.append({"name": "daily", **daily_info})
            actions.append({"name": "opening_outcomes", **outcomes})
            # Score before the Tomorrow prediction below overwrites the saved one.
            try:
                scored = update_live_accuracy(expected_daily)
                actions.append(
                    {
                        "name": "tomorrow_live_accuracy",
                        "session_date": expected_daily.isoformat(),
                        "live_count": scored.get("tomorrow", {}).get("live_count"),
                    }
                )
            except Exception as exc:
                actions.append({"name": "tomorrow_live_accuracy", "error": str(exc)})
        if status["daily_stale"] or status["tomorrow_stale"]:
            try:
                tomorrow = refresh_tomorrow_prediction(session_date=expected_daily)
                actions.append({"name": "tomorrow_prediction", "input_date": tomorrow.get("input_date")})
            except Exception as exc:
                actions.append({"name": "tomorrow_prediction", "error": str(exc)})
        if status["t5_stale"] and is_trading_day(now.date()) and now.time() >= FIRST5_READY:
            prediction = refresh_first5_prediction(session_date=now.date())
            actions.append({"name": "t5_prediction", "input_date": prediction.input_date})
        if status["tplus1_stale"] and is_trading_day(now.date()) and now.time() >= TPLUS1_READY:
            prediction = refresh_tplus1_prediction(session_date=now.date())
            actions.append({"name": "tplus1_prediction", "input_date": prediction.get("input_date")})
        clear_dashboard_payload_cache()
        refreshed_status = stale_data_status(datetime.now(IST))
        return {"status": "refreshed", **refreshed_status, "actions": actions}
    finally:
        _stale_refresh_lock.release()
def next_ist_run_at(run_time: time = time(9, 20), now: datetime | None = None) -> datetime:
    """Return the next IST datetime at ``run_time`` that falls on an NSE trading day.

    If ``now`` is at or past today's ``run_time``, the search starts from the
    next calendar day. ``now`` is normalised to IST first, so the calendar date
    is always the IST date. An aware ``now`` in another zone is converted, and a
    naive ``now`` is interpreted as IST (previously it raised TypeError when
    compared with the aware run time).
    """
    if now is None:
        now = datetime.now(IST)
    elif now.tzinfo is None:
        now = now.replace(tzinfo=IST)
    else:
        now = now.astimezone(IST)
    target_day = now.date()
    if now >= datetime.combine(target_day, run_time, tzinfo=IST):
        target_day += timedelta(days=1)
    target_day = next_trading_day(target_day)
    return datetime.combine(target_day, run_time, tzinfo=IST)
def seconds_until_next_ist_run(run_time: time = time(9, 20)) -> float:
    """Return seconds until the next trading-day run at ``run_time`` IST (at least 1.0)."""
    current = datetime.now(IST)
    remaining = (next_ist_run_at(run_time, now=current) - current).total_seconds()
    return max(1.0, remaining)