from __future__ import annotations import json import copy import os import sys import threading from dataclasses import dataclass from datetime import date, datetime, time, timedelta from functools import lru_cache from pathlib import Path from typing import Any from zoneinfo import ZoneInfo import joblib import numpy as np import pandas as pd from nifty_backend.yahoo_history_client import YahooHistoryClient try: import pandas_market_calendars as mcal except ImportError: # pragma: no cover - production dependency, local fallback below. mcal = None IST = ZoneInfo("Asia/Kolkata") YAHOO_NIFTY_SYMBOL = "^NSEI" MARKET_CLOSE = time(15, 30) FIRST5_READY = time(9, 20) CLOSE_REFRESH_READY = time(15, 45) TPLUS1_READY = time(14, 30) STALE_CHECK_INTERVAL_SECONDS = 5 BACKEND_ROOT = Path(__file__).resolve().parents[1] DATA_DIR = BACKEND_ROOT / "data" MODEL_DIR = BACKEND_ROOT / "models" YAHOO_CACHE_PATH = MODEL_DIR / "yahoo_history_cache.sqlite3" OPENING_DATASET_PATH = DATA_DIR / "opening_direction_training_dataset.parquet" NIFTY_1M_PATH = DATA_DIR / "nifty50_1m.parquet" NIFTY_1D_PATH = DATA_DIR / "nifty50_1d.parquet" MODEL_PATH = MODEL_DIR / "nifty_opening_direction_model.joblib" LATEST_PATH = MODEL_DIR / "latest_prediction.csv" TEST_PREDICTIONS_PATH = DATA_DIR / "test_predictions.parquet" TOMORROW_MODEL_PATH = MODEL_DIR / "nifty_tomorrow_direction_model.joblib" TOMORROW_LATEST_PATH = MODEL_DIR / "tomorrow_latest_prediction.csv" TOMORROW_SUMMARY_PATH = MODEL_DIR / "tomorrow_summary.json" TOMORROW_TEST_PREDICTIONS_PATH = DATA_DIR / "tomorrow_test_predictions.parquet" TOMORROW_PREDICTION_HISTORY_PATH = MODEL_DIR / "tomorrow_prediction_history.parquet" FORECASTING_PROJECT_ROOT = Path( os.environ.get( "FORECASTING_PROJECT_ROOT", str(BACKEND_ROOT.parent.parent / "forecasting project"), ) ) DAILY_FORECASTER_OUTPUT_DIR = MODEL_DIR / "nifty_forecaster" / "outputs" DAILY_FORECASTER_SUMMARY_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_summary.json" DAILY_FORECASTER_LATEST_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_latest.csv" DAILY_FORECASTER_PREDICTIONS_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_test_predictions.csv" MFE_SOURCE_OUTPUT_DIR = FORECASTING_PROJECT_ROOT / "Code" / "models" / "nifty_opening_mfe_regressor" / "outputs" MFE_OUTPUT_DIR = MODEL_DIR / "nifty_opening_mfe_regressor" / "outputs" MFE_SUMMARY_PATH = MFE_OUTPUT_DIR / "summary.json" MFE_LATEST_PATH = MFE_OUTPUT_DIR / "latest_prediction.csv" MFE_TEST_PREDICTIONS_PATH = MFE_OUTPUT_DIR / "test_predictions.csv" MFE_MODEL_PATH = MFE_OUTPUT_DIR / "nifty_opening_mfe_regressor.joblib" MFE_LIVE_HISTORY_PATH = MFE_OUTPUT_DIR / "mfe_live_history.csv" TPLUS1_MODEL_PATH = MODEL_DIR / "nifty_1420_tplus1_logistic_model.joblib" TPLUS1_LATEST_PATH = MODEL_DIR / "tplus1_latest_prediction.csv" TPLUS1_SUMMARY_PATH = MODEL_DIR / "tplus1_summary.json" TPLUS1_TEST_PREDICTIONS_PATH = DATA_DIR / "tplus1_test_predictions.parquet" TPLUS1_PREDICTION_HISTORY_PATH = MODEL_DIR / "tplus1_prediction_history.parquet" T5_PREDICTION_HISTORY_PATH = MODEL_DIR / "t5_prediction_history.parquet" REFRESH_STATE_PATH = MODEL_DIR / "refresh_state.json" REFRESH_WAITING = "waiting_second_payload" REFRESH_REFRESHING = "refreshing" REFRESH_READY = "ready" REFRESH_FAILED = "failed" REFRESH_NORMAL = "normal" LIVE_ACCURACY_PATH = MODEL_DIR / "live_accuracy.json" DECISION_OVERLAYS = [ { "name": "fifth_minute_momentum_flip", "feature": "m5_ret_1m", "op": ">=", "value": 0.0005085411885759201, }, { "name": "vix_stretch_flip", "feature": "india_vix_close_vs_sma_20", "op": ">=", "value": 0.24641908937959742, }, ] _dashboard_payload_lock = threading.Lock() _stale_refresh_lock = threading.Lock() def utc_now_iso() -> str: return datetime.utcnow().replace(microsecond=0).isoformat() + "Z" def clear_dashboard_payload_cache() -> None: _dashboard_payload_cached.cache_clear() def save_refresh_state(phase: str, *, session_date: date | None = None, error: str | None = None) -> dict[str, Any]: previous = load_refresh_state() state = { "phase": phase, "started_at": previous.get("started_at"), "finished_at": previous.get("finished_at"), "session_date": session_date.isoformat() if session_date else previous.get("session_date"), "error": error, } if phase in {REFRESH_WAITING, REFRESH_REFRESHING} and not state["started_at"]: state["started_at"] = utc_now_iso() if phase in {REFRESH_READY, REFRESH_FAILED, REFRESH_NORMAL}: state["finished_at"] = utc_now_iso() REFRESH_STATE_PATH.write_text(json.dumps(state, indent=2), encoding="utf-8") return state def load_refresh_state() -> dict[str, Any]: if not REFRESH_STATE_PATH.exists(): return { "phase": REFRESH_NORMAL, "started_at": None, "finished_at": None, "session_date": None, "error": None, } try: return json.loads(REFRESH_STATE_PATH.read_text(encoding="utf-8")) except Exception: return { "phase": REFRESH_FAILED, "started_at": None, "finished_at": None, "session_date": None, "error": "refresh_state.json could not be read", } @lru_cache(maxsize=1) def _nse_calendar(): if mcal is None: return None for name in ("XNSE", "NSE", "BSE"): try: return mcal.get_calendar(name) except Exception: continue return None @lru_cache(maxsize=64) def trading_schedule(start: date, end: date) -> pd.DataFrame: calendar = _nse_calendar() if calendar is None: days = pd.date_range(start=start, end=end, freq="B") return pd.DataFrame(index=days) return calendar.schedule(start_date=start, end_date=end) def is_trading_day(day: date) -> bool: schedule = trading_schedule(day, day) return not schedule.empty def next_trading_day(start: date) -> date: end = start + timedelta(days=14) schedule = trading_schedule(start, end) if schedule.empty: day = start while not is_trading_day(day): day += timedelta(days=1) return day return pd.Timestamp(schedule.index[0]).date() def previous_trading_day(start: date) -> date: begin = start - timedelta(days=14) schedule = trading_schedule(begin, start) if schedule.empty: day = start while not is_trading_day(day): day -= timedelta(days=1) return day return pd.Timestamp(schedule.index[-1]).date() def last_n_trading_sessions(end_day: date, count: int) -> list[date]: """Return the last ``count`` NSE sessions ending on (or before) ``end_day``.""" sessions: list[date] = [] cursor = end_day guard = 0 while len(sessions) < count and guard < count * 12: guard += 1 if is_trading_day(cursor): sessions.append(cursor) if len(sessions) >= count: break cursor = previous_trading_day(cursor - timedelta(days=1)) sessions.reverse() return sessions def _track_record_end_session(now: datetime | None = None) -> date: """Latest session the track record should score (today after the close refresh window).""" now = now or datetime.now(IST) today = now.date() if is_trading_day(today) and now.time() >= CLOSE_REFRESH_READY: return today return expected_completed_daily_date(now) class ProbabilityBlend: def __init__(self, models: list[Any], weights: np.ndarray): self.models = models self.weights = np.asarray(weights, dtype="float64") self.weights = self.weights / self.weights.sum() def predict_proba(self, x: pd.DataFrame) -> np.ndarray: probs = np.column_stack([predict_proba_up(model, x) for model in self.models]) prob_up = probs @ self.weights return np.column_stack([1.0 - prob_up, prob_up]) @dataclass(frozen=True) class Prediction: input_date: str first5_start: str first5_end: str prediction: str prob_up: float confidence: float threshold: float model_name: str is_overridden: bool = False def to_dict(self) -> dict[str, Any]: return { "input_date": self.input_date, "first5_start": self.first5_start, "first5_end": self.first5_end, "prediction": self.prediction, "prob_up": self.prob_up, "confidence": self.confidence, "threshold": self.threshold, "model_name": self.model_name, "is_overridden": getattr(self, "is_overridden", False), } def predict_proba_up(model: Any, x: pd.DataFrame) -> np.ndarray: return np.asarray(model.predict_proba(x)[:, 1], dtype="float64") def safe_div(numer: pd.Series | np.ndarray, denom: pd.Series | np.ndarray) -> pd.Series: n = pd.Series(numer, copy=False) d = pd.Series(denom, copy=False) out = pd.Series(np.nan, index=n.index, dtype="float64") mask = d.notna() & np.isfinite(d.to_numpy(dtype="float64")) & (d != 0) out.loc[mask] = n.loc[mask].to_numpy(dtype="float64") / d.loc[mask].to_numpy(dtype="float64") return out def load_model() -> dict[str, Any]: # Existing artifact was trained as a script, so its custom blend class # resolves through __main__ when unpickled. sys.modules["__main__"].ProbabilityBlend = ProbabilityBlend sys.modules["__main__"].predict_proba_up = predict_proba_up payload = joblib.load(MODEL_PATH) payload.setdefault("decision_overlays", DECISION_OVERLAYS) payload.setdefault("model_name", "nifty_opening_direction_model") return payload def overlay_mask(frame: pd.DataFrame, overlay: dict[str, object]) -> np.ndarray: feature = str(overlay["feature"]) if feature not in frame.columns: return np.zeros(len(frame), dtype=bool) series = pd.to_numeric(frame[feature], errors="coerce") value = float(overlay["value"]) if overlay["op"] == ">=": return (series >= value).fillna(False).to_numpy(dtype=bool) if overlay["op"] == "<=": return (series <= value).fillna(False).to_numpy(dtype=bool) raise ValueError(f"Unsupported overlay op: {overlay['op']}") def apply_decision_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, object]]) -> np.ndarray: adjusted = np.asarray(pred, dtype="int64").copy() for overlay in overlays: mask = overlay_mask(frame, overlay) adjusted[mask] = 1 - adjusted[mask] return adjusted def directional_confidence(prob_up: np.ndarray, pred: np.ndarray, threshold: float) -> np.ndarray: prob_up = np.asarray(prob_up, dtype="float64") pred = np.asarray(pred, dtype="int64") base_side_prob = np.where(pred == 1, prob_up, 1.0 - prob_up) threshold_distance = np.abs(prob_up - float(threshold)) return np.clip(0.50 + threshold_distance, base_side_prob, 0.99) def read_training_dataset() -> pd.DataFrame: df = pd.read_parquet(OPENING_DATASET_PATH) for col in ("date", "first5_start", "first5_end"): if col in df.columns: df[col] = pd.to_datetime(df[col], errors="coerce") return df.sort_values("date").reset_index(drop=True) def normalize_yahoo_frame(df: pd.DataFrame) -> pd.DataFrame: if df.empty: return pd.DataFrame(columns=["date", "open", "high", "low", "close", "volume"]) if isinstance(df.columns, pd.MultiIndex): df.columns = [str(c[0]).lower() for c in df.columns] else: df.columns = [str(c).lower().replace(" ", "_") for c in df.columns] df = df.reset_index() date_col = next((c for c in df.columns if c.lower() in {"datetime", "date"}), df.columns[0]) df["date"] = pd.to_datetime(df[date_col], errors="coerce") if df["date"].dt.tz is None: df["date"] = df["date"].dt.tz_localize("UTC").dt.tz_convert(IST) else: df["date"] = df["date"].dt.tz_convert(IST) rename = { "open": "open", "high": "high", "low": "low", "close": "close", "adj_close": "close", "volume": "volume", } out = pd.DataFrame({"date": df["date"].dt.tz_localize(None)}) for src, dst in rename.items(): if src in df.columns and dst not in out.columns: out[dst] = pd.to_numeric(df[src], errors="coerce") return out.dropna(subset=["date", "open", "high", "low", "close"]).sort_values("date") @lru_cache(maxsize=1) def yahoo_history_client() -> YahooHistoryClient: return YahooHistoryClient(cache_path=YAHOO_CACHE_PATH) def period_start(period: str, *, end: datetime) -> datetime: text = str(period).strip().lower() units = { "d": "days", "wk": "weeks", "mo": "months", "y": "years", } for suffix, unit in units.items(): if text.endswith(suffix): raw_value = text[: -len(suffix)] if not raw_value.isdigit(): break value = int(raw_value) if unit == "days": return end - timedelta(days=value) if unit == "weeks": return end - timedelta(weeks=value) if unit == "months": return end - timedelta(days=value * 31) if unit == "years": return end - timedelta(days=value * 366) raise ValueError(f"Unsupported Yahoo period: {period!r}") def yahoo_history_to_ohlcv(frame: pd.DataFrame, *, daily: bool) -> pd.DataFrame: if frame.empty: return pd.DataFrame(columns=["date", "open", "high", "low", "close", "volume"]) out = frame.rename(columns={"timestamp": "date"}).copy() out["date"] = pd.to_datetime(out["date"], errors="coerce") if daily: out["date"] = out["date"].dt.normalize() for column in ("open", "high", "low", "close", "volume"): out[column] = pd.to_numeric(out[column], errors="coerce") return ( out[["date", "open", "high", "low", "close", "volume"]] .dropna(subset=["date", "open", "high", "low", "close"]) .drop_duplicates("date", keep="last") .sort_values("date") .reset_index(drop=True) ) def fetch_yahoo_minutes(period: str = "5d") -> pd.DataFrame: end = datetime.now(IST).replace(tzinfo=None) + timedelta(minutes=5) start = period_start(period, end=end) raw = yahoo_history_client().fetch_history( YAHOO_NIFTY_SYMBOL, interval="1m", start=start, end=end, include_prepost=False, ) return yahoo_history_to_ohlcv(raw, daily=False) def fetch_yahoo_daily(period: str = "1mo") -> pd.DataFrame: end = datetime.now(IST).replace(tzinfo=None) + timedelta(days=1) start = period_start(period, end=end) raw = yahoo_history_client().fetch_history( YAHOO_NIFTY_SYMBOL, interval="1d", start=start, end=end, include_prepost=False, ) return yahoo_history_to_ohlcv(raw, daily=True) def append_parquet_rows(path: Path, new_rows: pd.DataFrame, subset: list[str]) -> pd.DataFrame: if new_rows.empty: if path.exists(): return pd.read_parquet(path) raise RuntimeError(f"No rows returned for {path.name}; leaving parquet unchanged.") if path.exists(): existing = pd.read_parquet(path) combined = pd.concat([existing, new_rows], ignore_index=True) else: combined = new_rows.copy() combined = combined.drop_duplicates(subset=subset, keep="last").sort_values(subset).reset_index(drop=True) combined.to_parquet(path, index=False, compression="zstd") return combined def append_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> pd.DataFrame: frame = pd.DataFrame([row]) return append_parquet_rows(path, frame, subset) def latest_parquet_date(path: Path) -> date | None: if not path.exists(): return None df = pd.read_parquet(path, columns=["date"]) if df.empty: return None latest = pd.to_datetime(df["date"], errors="coerce").max() if pd.isna(latest): return None return latest.date() def latest_opening_outcome_date() -> date | None: if not OPENING_DATASET_PATH.exists(): return None cols = ["date"] if "target" in pd.read_parquet(OPENING_DATASET_PATH).columns: cols.append("target") df = pd.read_parquet(OPENING_DATASET_PATH, columns=cols) if df.empty or "target" not in df.columns: return None df = df[df["target"].notna()] if df.empty: return None latest = pd.to_datetime(df["date"], errors="coerce").max() if pd.isna(latest): return None return latest.date() def first5_features_from_minutes(minutes: pd.DataFrame, session_date: date | None = None) -> pd.DataFrame: if minutes.empty: raise RuntimeError("Yahoo returned no minute bars.") bars = minutes.copy() bars["dt"] = pd.to_datetime(bars["date"], errors="coerce") bars["session_date"] = bars["dt"].dt.normalize() if session_date is None: session_ts = bars["session_date"].max() else: session_ts = pd.Timestamp(session_date).normalize() day = bars[bars["session_date"] == session_ts].sort_values("dt").copy() start_dt = pd.Timestamp.combine(session_ts.date(), time(9, 15)) end_dt = pd.Timestamp.combine(session_ts.date(), time(9, 19)) first5 = day[(day["dt"] >= start_dt) & (day["dt"] <= end_dt)].head(5).copy() if len(first5) < 5: raise RuntimeError(f"Need 5 opening bars for {session_ts.date()}, got {len(first5)}.") first5["minute_index"] = np.arange(len(first5)) first5["ret_1m"] = first5["close"].pct_change(fill_method=None) first5["range_pct_1m"] = safe_div(first5["high"] - first5["low"], first5["open"]) first5["body_pct_1m"] = safe_div(first5["close"] - first5["open"], first5["open"]) row = { "date": session_ts, "first5_start": first5["dt"].iloc[0], "first5_end": first5["dt"].iloc[-1], "first5_open": first5["open"].iloc[0], "first5_high": first5["high"].max(), "first5_low": first5["low"].min(), "first5_close": first5["close"].iloc[-1], "first5_volume": first5["volume"].sum() if "volume" in first5 else 0.0, "first5_bars": len(first5), "first5_last_1m_ret": first5["ret_1m"].iloc[-1], "first5_ret_std": first5["ret_1m"].std(), } row["first5_return"] = (row["first5_close"] - row["first5_open"]) / row["first5_open"] row["first5_range_pct"] = (row["first5_high"] - row["first5_low"]) / row["first5_open"] first5_range = row["first5_high"] - row["first5_low"] row["first5_body_to_range"] = (row["first5_close"] - row["first5_open"]) / first5_range if first5_range else np.nan row["first5_close_location"] = (row["first5_close"] - row["first5_low"]) / first5_range if first5_range else np.nan for idx, (_, candle) in enumerate(first5.iterrows(), start=1): for field in ("open", "high", "low", "close", "ret_1m", "range_pct_1m", "body_pct_1m"): row[f"m{idx}_{field}"] = candle[field] row[f"m{idx}_close_vs_first5_open"] = (candle["close"] - row["first5_open"]) / row["first5_open"] row[f"m{idx}_range_share"] = (candle["high"] - candle["low"]) / first5_range if first5_range else np.nan row["first5_return_accel"] = row["m5_ret_1m"] - row["m2_ret_1m"] row["first5_last2_return"] = (row["m5_close"] - row["m4_open"]) / row["m4_open"] row["first5_first2_return"] = (row["m2_close"] - row["m1_open"]) / row["m1_open"] row["first5_reversal"] = np.sign(row["first5_first2_return"]) * -np.sign(row["first5_last2_return"]) row["dow"] = session_ts.dayofweek row["dom"] = session_ts.day row["month"] = session_ts.month return pd.DataFrame([row]) def build_model_row(first5_row: pd.DataFrame) -> pd.DataFrame: dataset = read_training_dataset() latest_context = dataset.iloc[[-1]].copy() output = latest_context.copy() for col in first5_row.columns: output[col] = first5_row[col].iloc[0] if {"first5_open", "nifty_close"}.issubset(output.columns): output["first5_gap_from_prev_close"] = (output["first5_open"] - output["nifty_close"]) / output["nifty_close"] output["first5_close_vs_prev_close"] = (output["first5_close"] - output["nifty_close"]) / output["nifty_close"] if {"first5_range_pct", "nifty_range_pct"}.issubset(output.columns): output["first5_range_vs_prev_range"] = output["first5_range_pct"] / output["nifty_range_pct"] if {"first5_return", "nifty_ret_1"}.issubset(output.columns): output["first5_return_x_prev_ret"] = output["first5_return"] * output["nifty_ret_1"] output["gap_x_prev_ret"] = output["first5_gap_from_prev_close"] * output["nifty_ret_1"] if {"first5_return", "banknifty_ret_1"}.issubset(output.columns): output["first5_return_x_bank_ret_1"] = output["first5_return"] * output["banknifty_ret_1"] if {"first5_range_pct", "india_vix_ret_1"}.issubset(output.columns): output["first5_range_x_vix_ret_1"] = output["first5_range_pct"] * output["india_vix_ret_1"] output["target"] = np.nan output["day_return"] = np.nan return output def predict_row(row: pd.DataFrame) -> Prediction: payload = load_model() model = payload["model"] features = payload["features"] threshold = float(payload["threshold"]) missing = [c for c in features if c not in row.columns] if missing: raise RuntimeError(f"Feature row is missing {len(missing)} features; first missing: {missing[:5]}") prob_up = predict_proba_up(model, row[features]) raw_pred = (prob_up >= threshold).astype("int64") pred = apply_decision_overlays(raw_pred, row, payload.get("decision_overlays", DECISION_OVERLAYS)) is_overridden = bool(raw_pred[0] != pred[0]) confidence = directional_confidence(prob_up, pred, threshold) prediction = Prediction( input_date=pd.to_datetime(row["date"].iloc[0]).date().isoformat(), first5_start=str(pd.to_datetime(row["first5_start"].iloc[0])), first5_end=str(pd.to_datetime(row["first5_end"].iloc[0])), prediction="UP" if int(pred[0]) == 1 else "DOWN", prob_up=float(prob_up[0]), confidence=float(confidence[0]), threshold=threshold, model_name=str(payload.get("model_name", "nifty_opening_direction_model")), is_overridden=is_overridden, ) pd.DataFrame([prediction.to_dict()]).to_csv(LATEST_PATH, index=False) _record_prediction_history( T5_PREDICTION_HISTORY_PATH, { **prediction.to_dict(), "target_date": prediction.input_date, "source": "live", }, ["target_date"], ) return prediction def _file_cache_key(path: Path) -> tuple[str, int | None, int | None]: try: stat = path.stat() except FileNotFoundError: return (str(path), None, None) return (str(path), stat.st_mtime_ns, stat.st_size) @lru_cache(maxsize=16) def _latest_saved_prediction_cached(latest_key: tuple[str, int | None, int | None], summary_key: tuple[str, int | None, int | None]) -> dict[str, Any]: latest_path = Path(latest_key[0]) if latest_path.exists(): return pd.read_csv(latest_path).iloc[-1].to_dict() summary_path = Path(summary_key[0]) if summary_path.exists(): return json.loads(summary_path.read_text(encoding="utf-8")) raise FileNotFoundError("No latest prediction is available yet.") def latest_saved_prediction() -> dict[str, Any]: return dict(_latest_saved_prediction_cached(_file_cache_key(LATEST_PATH), _file_cache_key(MODEL_DIR / "summary.json"))) def _latest_saved_prediction_uncached() -> dict[str, Any]: if LATEST_PATH.exists(): return pd.read_csv(LATEST_PATH).iloc[-1].to_dict() summary_path = MODEL_DIR / "summary.json" if summary_path.exists(): return json.loads(summary_path.read_text(encoding="utf-8")) raise FileNotFoundError("No latest prediction is available yet.") def _read_daily_forecaster_summary() -> dict[str, Any] | None: if not DAILY_FORECASTER_SUMMARY_PATH.exists(): return None raw = json.loads(DAILY_FORECASTER_SUMMARY_PATH.read_text(encoding="utf-8")) if isinstance(raw, list): matches = [row for row in raw if row.get("symbol") == "NIFTY 50"] summary = dict(matches[0] if matches else raw[0]) elif isinstance(raw, dict): summary = dict(raw) else: return None config = summary.get("config") if isinstance(summary.get("config"), dict) else {} summary.setdefault("symbol", "NIFTY 50") summary.setdefault("horizon", "daily") summary.setdefault("horizon_bars", 1) summary["model_name"] = "nifty_tomorrow_direction_model" summary["source_model"] = str(config.get("name") or summary.get("source_model") or "locked_multiwindow_nifty50_ensemble") summary["target"] = "next trading session NIFTY 50 direction" summary["artifact_type"] = "daily_forecaster_outputs" summary["artifact_source"] = str(DAILY_FORECASTER_OUTPUT_DIR) return summary def _read_daily_forecaster_latest(summary: dict[str, Any]) -> dict[str, Any] | None: if not DAILY_FORECASTER_LATEST_PATH.exists(): return None latest = pd.read_csv(DAILY_FORECASTER_LATEST_PATH) if latest.empty: return None if "symbol" in latest.columns: filtered = latest[latest["symbol"].astype(str) == "NIFTY 50"] if not filtered.empty: latest = filtered row = {k: (None if pd.isna(v) else v) for k, v in latest.iloc[-1].to_dict().items()} input_date = row.get("latest_forecast_date") or row.get("input_date") target_date = row.get("target_date") if not target_date and input_date: try: target_date = next_trading_day(date.fromisoformat(str(input_date)[:10]) + timedelta(days=1)).isoformat() except Exception: target_date = None prob_up = row.get("latest_forecast_prob_up", row.get("prob_up")) prediction = row.get("latest_forecast_signal", row.get("prediction")) threshold = row.get("threshold", summary.get("threshold")) confidence = row.get("confidence") if confidence is None and prob_up is not None: try: confidence = float(max(float(prob_up), 1.0 - float(prob_up))) except Exception: confidence = None return { "input_date": input_date, "target_date": target_date, "prediction": prediction, "prob_up": prob_up, "confidence": confidence, "threshold": threshold, "model_name": "nifty_tomorrow_direction_model", "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"), "validation_accuracy": summary.get("validation_accuracy"), "test_accuracy": summary.get("test_accuracy"), "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR), } def _parse_iso_date(value: Any) -> date | None: try: return date.fromisoformat(str(value or "")[:10]) except Exception: return None def _archive_tomorrow_latest_to_history() -> None: if not TOMORROW_LATEST_PATH.exists(): return try: row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict() cleaned = {k: (None if pd.isna(v) else v) for k, v in row.items()} pred = str(cleaned.get("prediction", "")).upper() if pred in {"UP", "DOWN"} and cleaned.get("target_date"): _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, cleaned, ["target_date"]) except Exception: pass def _tomorrow_actual_outcome( target_day: date, day_close: float, closes_by_date: dict[date, float], ) -> tuple[float | None, str | None]: """Return (move, direction) for Tomorrow scoring: close vs previous session close.""" prev_day = previous_trading_day(target_day - timedelta(days=1)) prev_close = closes_by_date.get(prev_day) if prev_close is None or not np.isfinite(prev_close) or prev_close == 0: return None, None actual_move = (day_close - prev_close) / prev_close actual_direction = "UP" if day_close > prev_close else "DOWN" return actual_move, actual_direction def _find_tomorrow_prediction_for_target(target_day: date) -> dict[str, Any] | None: """Return the Tomorrow prediction that targets ``target_day``.""" target_iso = target_day.isoformat() tom_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH) if not tom_history.empty: for col in ("date", "input_date", "target_date", "forecast_date"): if col in tom_history.columns: tom_history[col] = pd.to_datetime(tom_history[col], errors="coerce") if not tom_history.empty and "target_date" in tom_history.columns: rows = tom_history[tom_history["target_date"].dt.date == target_day] if not rows.empty: return rows.iloc[-1].to_dict() if TOMORROW_LATEST_PATH.exists(): try: row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict() if _parse_iso_date(row.get("target_date")) == target_day: return row except Exception: pass input_day = previous_trading_day(target_day - timedelta(days=1)) if not tom_history.empty and "input_date" in tom_history.columns: rows = tom_history[tom_history["input_date"].dt.date == input_day] if not rows.empty: row = rows.iloc[-1].to_dict() if _parse_iso_date(row.get("target_date")) in {None, target_day}: return row if TOMORROW_LATEST_PATH.exists(): try: row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict() if _parse_iso_date(row.get("input_date")) == input_day: return row except Exception: pass tomorrow_test = load_tomorrow_test_predictions() tomorrow_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH) if not tomorrow_test.empty: for col in ("target_date", "date"): if col in tomorrow_test.columns: test_dates = pd.to_datetime(tomorrow_test[col], errors="coerce").dt.date rows = tomorrow_test[test_dates == target_day] if not rows.empty: return rows.iloc[-1].to_dict() ledger = load_live_accuracy() for entry in ledger.get("tomorrow", {}).get("entries", []): if str(entry.get("date", ""))[:10] == target_iso: pred = str(entry.get("prediction", "")).upper() if pred in {"UP", "DOWN"}: return { "target_date": target_iso, "prediction": pred, "source": entry.get("source", "live"), } return None def sync_daily_forecaster_outputs() -> dict[str, Any] | None: summary = _read_daily_forecaster_summary() if summary is None: return None latest = _read_daily_forecaster_latest(summary) TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8") if latest is not None: _archive_tomorrow_latest_to_history() keep_existing = False if TOMORROW_LATEST_PATH.exists(): try: existing = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict() existing_input = _parse_iso_date(existing.get("input_date")) forecaster_input = _parse_iso_date(latest.get("input_date")) existing_pred = str(existing.get("prediction", "")).upper() if ( existing_input is not None and forecaster_input is not None and existing_input > forecaster_input and existing_pred in {"UP", "DOWN"} ): keep_existing = True except Exception: keep_existing = False if not keep_existing: pd.DataFrame([latest]).to_csv(TOMORROW_LATEST_PATH, index=False) if DAILY_FORECASTER_PREDICTIONS_PATH.exists(): predictions = pd.read_csv(DAILY_FORECASTER_PREDICTIONS_PATH) if "symbol" in predictions.columns: predictions = predictions[predictions["symbol"].astype(str) == "NIFTY 50"].copy() if not predictions.empty: if "pred" in predictions.columns and "prediction" not in predictions.columns: predictions["prediction"] = np.where(pd.to_numeric(predictions["pred"], errors="coerce") == 1, "UP", "DOWN") if "correct" not in predictions.columns and {"target", "pred"}.issubset(predictions.columns): predictions["correct"] = ( pd.to_numeric(predictions["target"], errors="coerce") == pd.to_numeric(predictions["pred"], errors="coerce") ) predictions.to_parquet(TOMORROW_TEST_PREDICTIONS_PATH, index=False) artifact = { "artifact_type": "daily_forecaster_outputs", "model_name": "nifty_tomorrow_direction_model", "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"), "threshold": float(summary.get("threshold", 0.54)), "validation_accuracy": summary.get("validation_accuracy"), "test_accuracy": summary.get("test_accuracy"), "validation_prob_std": summary.get("validation_prob_std"), "test_prob_std": summary.get("test_prob_std"), "test_prob_min": summary.get("test_prob_min"), "test_prob_max": summary.get("test_prob_max"), "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR), } joblib.dump(artifact, TOMORROW_MODEL_PATH) return latest or summary def load_tomorrow_model_artifact() -> dict[str, Any]: synced = sync_daily_forecaster_outputs() if synced is not None and TOMORROW_MODEL_PATH.exists(): return joblib.load(TOMORROW_MODEL_PATH) if TOMORROW_MODEL_PATH.exists(): return joblib.load(TOMORROW_MODEL_PATH) summary = load_tomorrow_summary() return { "artifact_type": "daily_forecaster_snapshot", "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"), "source_model": summary.get("source_model", "tuned_daily_forest_single"), "threshold": float(summary.get("threshold", 0.543)), } def load_tomorrow_summary() -> dict[str, Any]: synced = sync_daily_forecaster_outputs() if synced is not None and TOMORROW_SUMMARY_PATH.exists(): return json.loads(TOMORROW_SUMMARY_PATH.read_text(encoding="utf-8")) if TOMORROW_SUMMARY_PATH.exists(): return json.loads(TOMORROW_SUMMARY_PATH.read_text(encoding="utf-8")) return { "model_name": "nifty_tomorrow_direction_model", "source_model": "locked_multiwindow_nifty50_ensemble", "target": "next trading session NIFTY 50 direction", "threshold": 0.54, "validation_accuracy": 0.5673758865248227, "test_accuracy": 0.6451612903225806, "baseline_accuracy": 0.5053763440860215, "n_test": 186, "feature_count": 204, } def latest_tomorrow_prediction() -> dict[str, Any]: sync_daily_forecaster_outputs() latest_daily = latest_parquet_date(NIFTY_1D_PATH) expected_daily = expected_completed_daily_date() valid_daily = min(latest_daily, expected_daily) if latest_daily and expected_daily else (expected_daily or latest_daily) if TOMORROW_LATEST_PATH.exists(): row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict() cleaned = {k: (None if pd.isna(v) else v) for k, v in row.items()} try: input_day = date.fromisoformat(str(cleaned.get("input_date"))[:10]) except Exception: input_day = None if valid_daily is not None and (input_day is None or input_day < valid_daily): try: refreshed = refresh_tomorrow_prediction(session_date=valid_daily) try: refreshed_day = date.fromisoformat(str(refreshed.get("input_date"))[:10]) except Exception: refreshed_day = None if refreshed_day is not None and refreshed_day >= valid_daily: return refreshed except Exception: pass return cleaned summary = load_tomorrow_summary() try: summary_input_day = date.fromisoformat(str(summary.get("latest_forecast_date"))[:10]) except Exception: summary_input_day = None if valid_daily is not None and (summary_input_day is None or summary_input_day < valid_daily): try: return refresh_tomorrow_prediction(session_date=valid_daily) except Exception: pass return { "input_date": summary.get("latest_forecast_date"), "target_date": None, "prediction": summary.get("latest_forecast_signal"), "prob_up": summary.get("latest_forecast_prob_up"), "confidence": None, "threshold": summary.get("threshold"), "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"), "source_model": summary.get("source_model", "tuned_daily_forest_single"), "validation_accuracy": summary.get("validation_accuracy"), "test_accuracy": summary.get("test_accuracy"), } def load_tplus1_summary() -> dict[str, Any]: if TPLUS1_SUMMARY_PATH.exists(): return json.loads(TPLUS1_SUMMARY_PATH.read_text(encoding="utf-8")) return { "model_name": "logistic_regression_l1_C0.35_balanced", "target": "T+1 NIFTY 50 close greater than T 14:20 close", "window_start": "14:00", "window_end": "14:20", "threshold": 0.578, "validation_accuracy": 0.66, "test_accuracy": 0.6368421052631579, "baseline_test_accuracy": 0.5052631578947369, "test_rows": 190, "feature_count": 40, } def latest_tplus1_prediction() -> dict[str, Any]: if TPLUS1_LATEST_PATH.exists(): row = pd.read_csv(TPLUS1_LATEST_PATH).iloc[-1].to_dict() return {k: (None if pd.isna(v) else v) for k, v in row.items()} summary = load_tplus1_summary() return { "input_date": summary.get("latest_input_date"), "target_date": None, "forecast_for": summary.get("latest_forecast_for"), "prediction": summary.get("latest_prediction"), "prob_up": summary.get("latest_prob_up"), "confidence": summary.get("latest_confidence"), "threshold": summary.get("threshold"), "model_name": summary.get("model_name", "logistic_regression_l1_C0.35_balanced"), "validation_accuracy": summary.get("validation_accuracy"), "test_accuracy": summary.get("test_accuracy"), } def load_mfe_summary() -> dict[str, Any]: if MFE_SUMMARY_PATH.exists(): return json.loads(MFE_SUMMARY_PATH.read_text(encoding="utf-8")) return {} def latest_mfe_prediction() -> dict[str, Any]: if MFE_LATEST_PATH.exists(): row = pd.read_csv(MFE_LATEST_PATH).iloc[-1].to_dict() return {k: (None if pd.isna(v) else v) for k, v in row.items()} summary = load_mfe_summary() return { "input_date": summary.get("latest_input_date"), "first5_start": summary.get("latest_first5_start"), "first5_end": summary.get("latest_first5_end"), "predicted_up_points": summary.get("latest_predicted_up_points"), "predicted_down_points": summary.get("latest_predicted_down_points"), } def refresh_mfe_prediction(session_date: date | None = None) -> dict[str, Any]: if not MFE_MODEL_PATH.exists(): return {} payload = joblib.load(MFE_MODEL_PATH) up_model = payload["up_model"] down_model = payload["down_model"] up_features = payload["up_features"] down_features = payload["down_features"] up_calib = payload["up_calibration"] down_calib = payload["down_calibration"] dataset = pd.read_parquet(OPENING_DATASET_PATH) dataset["_session_date"] = pd.to_datetime(dataset["date"], errors="coerce").dt.normalize() if session_date is not None: latest_df = dataset[dataset["_session_date"].dt.date == session_date].tail(1) else: latest_df = dataset.tail(1) if latest_df.empty: return {} raw_up = float(np.clip(up_model.predict(latest_df[up_features])[0], 0.0, None)) pred_up = float(np.clip((raw_up * up_calib["scale"]) + up_calib["offset"], 0.0, None)) raw_down = float(np.clip(down_model.predict(latest_df[down_features])[0], 0.0, None)) pred_down = float(np.clip((raw_down * down_calib["scale"]) + down_calib["offset"], 0.0, None)) out = { "input_date": latest_df["_session_date"].dt.date.iloc[0].isoformat(), "first5_start": str(latest_df["first5_start"].iloc[0]), "first5_end": str(latest_df["first5_end"].iloc[0]), "first5_close": float(latest_df["first5_close"].iloc[0]), "predicted_up_points": pred_up, "predicted_down_points": pred_down, } pd.DataFrame([out]).to_csv(MFE_LATEST_PATH, index=False) # Append to live history live_df = pd.DataFrame([out]) if MFE_LIVE_HISTORY_PATH.exists(): try: existing = pd.read_csv(MFE_LIVE_HISTORY_PATH) # Avoid duplicates if refreshed multiple times in the same session existing = existing[existing["input_date"] != out["input_date"]] pd.concat([existing, live_df], ignore_index=True).to_csv(MFE_LIVE_HISTORY_PATH, index=False) except Exception: live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False) else: live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False) clear_dashboard_payload_cache() return out def _minute_frame_for_tplus1() -> pd.DataFrame: minute = pd.read_parquet(NIFTY_1M_PATH) minute = minute.copy() minute["dt"] = pd.to_datetime(minute["date"], errors="coerce") for col in ("open", "high", "low", "close", "volume"): if col in minute.columns: minute[col] = pd.to_numeric(minute[col], errors="coerce") minute = minute.dropna(subset=["dt", "open", "high", "low", "close"]).sort_values("dt").reset_index(drop=True) minute["session_date"] = minute["dt"].dt.normalize() minute["time"] = minute["dt"].dt.strftime("%H:%M") return minute def _build_tplus1_session_features(minute: pd.DataFrame) -> pd.DataFrame: window = minute[(minute["time"] >= "14:00") & (minute["time"] <= "14:20")].copy() window["minute_offset"] = window.groupby("session_date", sort=True).cumcount() grouped = window.groupby("session_date", sort=True) base = grouped.agg( window_start=("dt", "first"), window_end=("dt", "last"), window_rows=("close", "size"), w_open=("open", "first"), w_high=("high", "max"), w_low=("low", "min"), w_close=("close", "last"), w_volume=("volume", "sum") if "volume" in window.columns else ("close", "size"), ).reset_index().rename(columns={"session_date": "date"}) base = base[base["window_rows"] == 21].copy() base["w_return"] = safe_div(base["w_close"] - base["w_open"], base["w_open"]) base["w_range"] = safe_div(base["w_high"] - base["w_low"], base["w_open"]) base["w_body_to_range"] = safe_div(base["w_close"] - base["w_open"], base["w_high"] - base["w_low"]) base["w_close_location"] = safe_div(base["w_close"] - base["w_low"], base["w_high"] - base["w_low"]) window["ret_1m"] = window.groupby("session_date")["close"].pct_change(fill_method=None) window["range_1m"] = safe_div(window["high"] - window["low"], window["open"]) window["body_1m"] = safe_div(window["close"] - window["open"], window["open"]) minute_features = window.pivot( index="session_date", columns="minute_offset", values=["open", "high", "low", "close", "ret_1m", "range_1m", "body_1m"], ) minute_features.columns = [f"m{int(offset):02d}_{field}" for field, offset in minute_features.columns] minute_features = minute_features.reset_index().rename(columns={"session_date": "date"}) session_close = ( minute.groupby("session_date", sort=True) .agg(day_close=("close", "last")) .reset_index() .rename(columns={"session_date": "date"}) ) frame = base.merge(minute_features, on="date", how="left").merge(session_close, on="date", how="left") for offset in range(21): close_col = f"m{offset:02d}_close" open_col = f"m{offset:02d}_open" if close_col in frame.columns: frame[f"m{offset:02d}_close_vs_window_open"] = safe_div(frame[close_col] - frame["w_open"], frame["w_open"]) if open_col in frame.columns and close_col in frame.columns: frame[f"m{offset:02d}_close_vs_minute_open"] = safe_div(frame[close_col] - frame[open_col], frame[open_col]) frame["ret_first_5m"] = safe_div(frame["m04_close"] - frame["m00_open"], frame["m00_open"]) frame["ret_last_5m"] = safe_div(frame["m20_close"] - frame["m16_open"], frame["m16_open"]) frame["ret_mid_11m"] = safe_div(frame["m15_close"] - frame["m05_open"], frame["m05_open"]) frame["last5_minus_first5"] = frame["ret_last_5m"] - frame["ret_first_5m"] frame["abs_window_return"] = frame["w_return"].abs() frame["dow"] = frame["date"].dt.dayofweek frame["dom"] = frame["date"].dt.day frame["month"] = frame["date"].dt.month return frame.sort_values("date").reset_index(drop=True) def _add_tplus1_target_features(features: pd.DataFrame) -> pd.DataFrame: frame = features.copy() frame["target_date"] = frame["date"].shift(-1) frame["target_close"] = frame["day_close"].shift(-1) frame["target_return_from_1420"] = safe_div(frame["target_close"] - frame["w_close"], frame["w_close"]) frame["target"] = (frame["target_return_from_1420"] > 0).astype("float64") frame.loc[frame["target_close"].isna(), "target"] = np.nan for lag in (1, 2, 3, 5, 10): frame[f"prev_target_lag{lag}"] = frame["target"].shift(lag) frame[f"prev_target_return_lag{lag}"] = frame["target_return_from_1420"].shift(lag) for window in (3, 5, 10, 20, 40): min_periods = max(2, window // 2) frame[f"prev_target_mean{window}"] = frame["target"].shift(1).rolling(window, min_periods=min_periods).mean() shifted_return = frame["target_return_from_1420"].shift(1) frame[f"prev_target_return_mean{window}"] = shifted_return.rolling(window, min_periods=min_periods).mean() frame[f"prev_target_return_std{window}"] = shifted_return.rolling(window, min_periods=min_periods).std() return frame def _apply_tplus1_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, Any]]) -> np.ndarray: adjusted = np.asarray(pred, dtype="int64").copy() for overlay in overlays: feature = str(overlay.get("feature", "")) if feature not in frame.columns: continue series = pd.to_numeric(frame[feature], errors="coerce") value = float(overlay.get("value", 0.0)) if overlay.get("op") == "<=": mask = (series <= value).fillna(False).to_numpy(dtype=bool) else: mask = (series >= value).fillna(False).to_numpy(dtype=bool) action = overlay.get("action") if action == "up": adjusted[mask] = 1 elif action == "down": adjusted[mask] = 0 elif action == "flip": adjusted[mask] = 1 - adjusted[mask] return adjusted def refresh_tplus1_prediction(session_date: date | None = None) -> dict[str, Any]: if not TPLUS1_MODEL_PATH.exists(): raise FileNotFoundError(f"Missing T+1 model artifact: {TPLUS1_MODEL_PATH}") payload = joblib.load(TPLUS1_MODEL_PATH) features = payload["features"] threshold = float(payload["threshold"]) frame = _add_tplus1_target_features(_build_tplus1_session_features(_minute_frame_for_tplus1())) if session_date is not None: row = frame[pd.to_datetime(frame["date"], errors="coerce").dt.date == session_date].tail(1) else: row = frame.tail(1) if row.empty: minutes = fetch_yahoo_minutes(period="7d") append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"]) frame = _add_tplus1_target_features(_build_tplus1_session_features(_minute_frame_for_tplus1())) if session_date is not None: row = frame[pd.to_datetime(frame["date"], errors="coerce").dt.date == session_date].tail(1) else: row = frame.tail(1) if row.empty: raise RuntimeError("No complete 14:00-14:20 window is available for T+1 prediction.") missing = [col for col in features if col not in row.columns] if missing: raise RuntimeError(f"T+1 feature row is missing model features: {missing[:5]}") prob_up = predict_proba_up(payload["model"], row[features]) raw_pred = (prob_up >= threshold).astype("int64") overlay_payload = payload.get("decision_overlay") overlays = overlay_payload.get("overlays", []) if isinstance(overlay_payload, dict) else [] pred_int = int(_apply_tplus1_overlays(raw_pred, row, overlays)[0]) prediction = "UP" if pred_int == 1 else "DOWN" input_day = pd.to_datetime(row["date"].iloc[0]).date() target_day = next_trading_day(input_day + timedelta(days=1)) summary = load_tplus1_summary() out = { "input_date": input_day.isoformat(), "target_date": target_day.isoformat(), "forecast_for": f"next trading session after {input_day.isoformat()}", "prediction": prediction, "prob_up": float(prob_up[0]), "confidence": float(max(prob_up[0], 1.0 - prob_up[0])), "threshold": threshold, "model_name": str(payload.get("model_name", summary.get("model_name", "nifty_1420_tplus1_logistic_model"))), "decision_overlay": summary.get("decision_overlay"), "validation_accuracy": summary.get("validation_accuracy"), "test_accuracy": summary.get("test_accuracy"), "accuracy_goal": summary.get("accuracy_goal"), "source": "live", } pd.DataFrame([out]).to_csv(TPLUS1_LATEST_PATH, index=False) _record_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH, out, ["target_date"]) clear_dashboard_payload_cache() return out def _tomorrow_probability_from_daily(daily: pd.DataFrame, fallback_prob: float) -> float: if daily.empty or len(daily) < 5: return float(fallback_prob) frame = daily.copy() frame["close"] = pd.to_numeric(frame["close"], errors="coerce") frame = frame.dropna(subset=["close"]).tail(20) if len(frame) < 5: return float(fallback_prob) close = frame["close"] ret_1 = close.pct_change(fill_method=None).iloc[-1] ret_5 = close.pct_change(5, fill_method=None).iloc[-1] vol = close.pct_change(fill_method=None).tail(10).std() score = 0.49900560447008563 if pd.notna(ret_1): score += float(np.clip(ret_1 * 4.5, -0.05, 0.05)) if pd.notna(ret_5): score += float(np.clip(ret_5 * 1.4, -0.05, 0.05)) if pd.notna(vol): score -= float(np.clip(vol * 0.9, 0.0, 0.035)) return float(np.clip(score, 0.35, 0.65)) def refresh_tomorrow_prediction(session_date: date | None = None) -> dict[str, Any]: _archive_tomorrow_latest_to_history() synced = sync_daily_forecaster_outputs() if synced is not None and TOMORROW_LATEST_PATH.exists(): latest = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict() cleaned = {k: (None if pd.isna(v) else v) for k, v in latest.items()} cleaned["source"] = "live" _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, cleaned, ["target_date"]) if session_date is None: clear_dashboard_payload_cache() return cleaned try: input_day = date.fromisoformat(str(cleaned.get("input_date"))[:10]) except Exception: input_day = None if input_day is not None and (session_date is None or input_day >= session_date): clear_dashboard_payload_cache() return cleaned summary = load_tomorrow_summary() artifact = load_tomorrow_model_artifact() daily = pd.read_parquet(NIFTY_1D_PATH) daily["date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize() daily = daily.dropna(subset=["date"]).sort_values("date") if daily.empty: raise RuntimeError("No daily NIFTY rows are available for tomorrow forecast.") input_day = session_date or daily["date"].max().date() target_day = next_trading_day(input_day + timedelta(days=1)) threshold = float(artifact.get("threshold", summary.get("threshold", 0.543))) fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563)) prob_up = _tomorrow_probability_from_daily(daily[daily["date"].dt.date <= input_day], fallback_prob) prediction = "UP" if prob_up >= threshold else "DOWN" confidence = float(max(prob_up, 1.0 - prob_up)) row = { "input_date": input_day.isoformat(), "target_date": target_day.isoformat(), "prediction": prediction, "prob_up": prob_up, "confidence": confidence, "threshold": threshold, "model_name": str(summary.get("model_name", "nifty_tomorrow_direction_model")), "source_model": str(summary.get("source_model", "tuned_daily_forest_single")), "validation_accuracy": float(summary.get("validation_accuracy", 0.5780141843971631)), "test_accuracy": float(summary.get("test_accuracy", 0.6182795698924731)), "source": "live", } pd.DataFrame([row]).to_csv(TOMORROW_LATEST_PATH, index=False) _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, row, ["target_date"]) summary = dict(summary) summary.update( { "latest_forecast_date": row["input_date"], "latest_forecast_for": f"next trading session {row['target_date']}", "latest_forecast_prob_up": row["prob_up"], "latest_forecast_signal": row["prediction"], "latest_target_date": row["target_date"], } ) TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8") clear_dashboard_payload_cache() return row def _json_ready_frame(df: pd.DataFrame, limit: int | None = None) -> list[dict[str, Any]]: out = df.copy() if limit is not None: out = out.tail(limit) for col in out.columns: if pd.api.types.is_datetime64_any_dtype(out[col]): out[col] = out[col].dt.strftime("%Y-%m-%d %H:%M:%S") out = out.replace({np.nan: None}) return out.to_dict(orient="records") def load_model_summary() -> dict[str, Any]: summary_path = MODEL_DIR / "summary.json" if not summary_path.exists(): return {} return json.loads(summary_path.read_text(encoding="utf-8")) def load_candidate_results() -> list[dict[str, Any]]: path = MODEL_DIR / "candidate_results.csv" if not path.exists(): return [] return _json_ready_frame(pd.read_csv(path).head(12)) def load_test_predictions() -> pd.DataFrame: if not TEST_PREDICTIONS_PATH.exists(): return pd.DataFrame() df = pd.read_parquet(TEST_PREDICTIONS_PATH) df["date"] = pd.to_datetime(df["date"], errors="coerce") return df.sort_values("date").reset_index(drop=True) def load_tomorrow_test_predictions() -> pd.DataFrame: if not TOMORROW_TEST_PREDICTIONS_PATH.exists(): return pd.DataFrame() df = pd.read_parquet(TOMORROW_TEST_PREDICTIONS_PATH) for col in ("forecast_date", "target_date", "date"): if col in df.columns: df[col] = pd.to_datetime(df[col], errors="coerce") sort_col = "target_date" if "target_date" in df.columns else "forecast_date" return df.sort_values(sort_col).reset_index(drop=True) def load_tplus1_test_predictions() -> pd.DataFrame: if not TPLUS1_TEST_PREDICTIONS_PATH.exists(): return pd.DataFrame() df = pd.read_parquet(TPLUS1_TEST_PREDICTIONS_PATH) for col in ("date", "target_date"): if col in df.columns: df[col] = pd.to_datetime(df[col], errors="coerce") return df.sort_values("date").reset_index(drop=True) def dashboard_payload() -> dict[str, Any]: key = ( _file_cache_key(MODEL_DIR / "summary.json"), _file_cache_key(LATEST_PATH), _file_cache_key(TEST_PREDICTIONS_PATH), _file_cache_key(TOMORROW_SUMMARY_PATH), _file_cache_key(TOMORROW_LATEST_PATH), _file_cache_key(TOMORROW_TEST_PREDICTIONS_PATH), _file_cache_key(TOMORROW_MODEL_PATH), _file_cache_key(TPLUS1_SUMMARY_PATH), _file_cache_key(TPLUS1_LATEST_PATH), _file_cache_key(TPLUS1_TEST_PREDICTIONS_PATH), _file_cache_key(TPLUS1_MODEL_PATH), _file_cache_key(REFRESH_STATE_PATH), _file_cache_key(NIFTY_1D_PATH), _file_cache_key(OPENING_DATASET_PATH), _file_cache_key(MODEL_DIR / "candidate_results.csv"), _file_cache_key(NIFTY_1M_PATH), _file_cache_key(LIVE_ACCURACY_PATH), _file_cache_key(TOMORROW_PREDICTION_HISTORY_PATH), _file_cache_key(MFE_SUMMARY_PATH), _file_cache_key(MFE_LATEST_PATH), _file_cache_key(MFE_MODEL_PATH), ) with _dashboard_payload_lock: return copy.deepcopy(_dashboard_payload_cached(key)) def warm_dashboard_payload_cache() -> None: dashboard_payload() def _load_forecaster_predictions_by_target() -> dict[date, dict[str, Any]]: path = DAILY_FORECASTER_PREDICTIONS_PATH if not path.exists(): return {} try: frame = pd.read_csv(path) except Exception: return {} if frame.empty: return {} if "symbol" in frame.columns: frame = frame[frame["symbol"].astype(str) == "NIFTY 50"].copy() indexed: dict[date, dict[str, Any]] = {} for _, row in frame.iterrows(): target_day = _parse_iso_date(row.get("target_date")) if target_day is None: continue pred_value = row.get("pred") if pd.isna(pred_value) and "raw_pred" in row: pred_value = row.get("raw_pred") try: pred_int = int(pred_value) except Exception: continue prob_up = row.get("prob_up") try: prob_up = float(prob_up) if pd.notna(prob_up) else None except Exception: prob_up = None indexed[target_day] = { "prediction": "UP" if pred_int == 1 else "DOWN", "prob_up": prob_up, "forecast_date": row.get("forecast_date"), "source": "Tomorrow (forecaster)", } return indexed def _load_track_record_daily_rows() -> pd.DataFrame: frames: list[pd.DataFrame] = [] if NIFTY_1D_PATH.exists(): try: frames.append(pd.read_parquet(NIFTY_1D_PATH)) except Exception: pass try: yahoo_daily = fetch_yahoo_daily(period="3mo") if not yahoo_daily.empty: frames.append(yahoo_daily) try: append_parquet_rows(NIFTY_1D_PATH, yahoo_daily, ["date"]) except Exception: pass except Exception: pass if not frames: return pd.DataFrame() combined = pd.concat(frames, ignore_index=True) combined["date"] = pd.to_datetime(combined["date"], errors="coerce").dt.normalize() combined = combined.dropna(subset=["date"]).sort_values("date") combined = combined.drop_duplicates(subset=["date"], keep="last") combined = combined[ combined["close"].map(lambda value: np.isfinite(float(value)) if pd.notna(value) else False) ].copy() return combined.reset_index(drop=True) def _rolling_tomorrow_prediction( input_day: date, daily_rows: pd.DataFrame, threshold: float, fallback_prob: float, ) -> tuple[str, float]: history = daily_rows[daily_rows["date"].dt.date <= input_day].copy() prob_up = _tomorrow_probability_from_daily(history, fallback_prob) prediction = "UP" if prob_up >= threshold else "DOWN" return prediction, float(prob_up) def build_prediction_track_record( sessions: int = 10, ) -> list[dict[str, Any]]: summary = load_tomorrow_summary() artifact = load_tomorrow_model_artifact() threshold = float(artifact.get("threshold", summary.get("threshold", 0.534))) fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563)) forecaster_by_target = _load_forecaster_predictions_by_target() daily_rows = _load_track_record_daily_rows() if daily_rows.empty: return [] closes_by_date = { row["date"].date(): float(row["close"]) for _, row in daily_rows.iterrows() if pd.notna(row["close"]) and np.isfinite(float(row["close"])) } end_session = _track_record_end_session() available_through = max( (day for day in closes_by_date if day <= end_session), default=None, ) if available_through is None: return [] if available_through < end_session: end_session = available_through session_dates = last_n_trading_sessions(end_session, sessions) records: list[dict[str, Any]] = [] for target_day in session_dates: day_close = closes_by_date.get(target_day) if day_close is None: continue actual_move, actual_direction = _tomorrow_actual_outcome(target_day, day_close, closes_by_date) if actual_direction is None: continue input_day = previous_trading_day(target_day - timedelta(days=1)) cached = forecaster_by_target.get(target_day) if cached and cached.get("prediction") in {"UP", "DOWN"}: prediction = cached["prediction"] prob_up = cached.get("prob_up") source = cached.get("source", "Tomorrow (forecaster)") else: prediction, prob_up = _rolling_tomorrow_prediction( input_day, daily_rows, threshold, fallback_prob, ) source = "Tomorrow (rolling)" if prediction not in {"UP", "DOWN"}: continue records.append( { "date": target_day.isoformat(), "input_date": input_day.isoformat(), "prediction": prediction, "prediction_source": source, "prob_up": prob_up, "actual_move": actual_move, "actual_direction": actual_direction, "correct": prediction == actual_direction, } ) return records[-sessions:] @lru_cache(maxsize=4) def _dashboard_payload_cached(key: tuple[tuple[str, int | None, int | None], ...]) -> dict[str, Any]: summary = load_model_summary() t5_latest = _latest_saved_prediction_uncached() tomorrow_summary = load_tomorrow_summary() tomorrow_latest = latest_tomorrow_prediction() tplus1_summary = load_tplus1_summary() tplus1_latest = latest_tplus1_prediction() t5_test = load_test_predictions() tomorrow_test = load_tomorrow_test_predictions() tomorrow_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH) tplus1_test = load_tplus1_test_predictions() mfe_summary = load_mfe_summary() mfe_latest = latest_mfe_prediction() daily = pd.read_parquet(NIFTY_1D_PATH) daily["date"] = pd.to_datetime(daily["date"], errors="coerce") daily = daily.sort_values("date").tail(180) if not t5_test.empty: recent_accuracy = float(t5_test.tail(40)["correct"].mean()) else: recent_accuracy = None if not tomorrow_test.empty: tomorrow_recent = tomorrow_test.tail(40).copy() if "pred" in tomorrow_recent.columns and "prediction" not in tomorrow_recent.columns: tomorrow_recent["prediction"] = np.where(pd.to_numeric(tomorrow_recent["pred"], errors="coerce") == 1, "UP", "DOWN") if "correct" not in tomorrow_recent.columns and {"target", "pred"}.issubset(tomorrow_recent.columns): tomorrow_recent["correct"] = pd.to_numeric(tomorrow_recent["target"], errors="coerce") == pd.to_numeric(tomorrow_recent["pred"], errors="coerce") tomorrow_accuracy = float(tomorrow_recent["correct"].mean()) if "correct" in tomorrow_recent.columns else tomorrow_summary.get("test_accuracy") else: tomorrow_accuracy = tomorrow_summary.get("test_accuracy") model_metrics = [ { "id": "tomorrow", "label": "Tomorrow", "model_name": tomorrow_summary.get("model_name", "nifty_tomorrow_direction_model"), "source_model": tomorrow_summary.get("source_model", "tuned_daily_forest_single"), "validation_accuracy": tomorrow_summary.get("validation_accuracy"), "test_accuracy": tomorrow_summary.get("test_accuracy"), "recent_accuracy": tomorrow_accuracy, "test_rows": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0), }, { "id": "tplus1", "label": "T+1", "model_name": tplus1_summary.get("model_name", "nifty_1420_tplus1_logistic_model"), "source_model": "14:00-14:20 logistic forecaster", "validation_accuracy": tplus1_summary.get("validation_accuracy"), "test_accuracy": tplus1_summary.get("test_accuracy"), "recent_accuracy": float(tplus1_test.tail(40)["correct"].mean()) if not tplus1_test.empty and "correct" in tplus1_test.columns else tplus1_summary.get("test_accuracy"), "test_rows": int(tplus1_summary.get("test_rows") or len(tplus1_test) or 0), }, { "id": "t5", "label": "T+5", "model_name": summary.get("model_name", "nifty_opening_direction_model"), "source_model": summary.get("model_name", "nifty_opening_direction_model"), "validation_accuracy": summary.get("validation_accuracy"), "test_accuracy": summary.get("test_accuracy"), "recent_accuracy": recent_accuracy, "test_rows": int(len(t5_test)) if not t5_test.empty else int(summary.get("test_rows") or 0), }, ] metrics = { "validation_accuracy": tomorrow_summary.get("validation_accuracy"), "test_accuracy": tomorrow_summary.get("test_accuracy"), "baseline_test_accuracy": tomorrow_summary.get("baseline_accuracy"), "validation_auc": summary.get("validation_auc"), "test_auc": summary.get("test_auc"), "test_brier": summary.get("test_brier"), "feature_count": tomorrow_summary.get("feature_count"), "recent_accuracy": tomorrow_accuracy, "recent_accuracy_days": int(len(tomorrow_test.tail(40))) if not tomorrow_test.empty else 0, "total_test_days": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0), "models": model_metrics, } return { "timestamp": datetime.now(ZoneInfo("Asia/Kolkata")).isoformat(), "predictions": { "t5": { "latest": t5_latest, "summary": summary, }, "tomorrow": { "latest": tomorrow_latest, "summary": tomorrow_summary, }, "tplus1": { "latest": tplus1_latest, "summary": tplus1_summary, }, "mfe": { "latest": mfe_latest, "summary": mfe_summary, }, }, "metrics": metrics, "models": model_metrics, "live_accuracy": load_live_accuracy(), "charts": { "daily_closes": _json_ready_frame(daily[["date", "close"]]), "t5_backtest": _json_ready_frame(t5_test, limit=400), "t5_recent_predictions": _json_ready_frame(t5_test.tail(40)), "tomorrow_backtest": _json_ready_frame(tomorrow_test, limit=400), "tomorrow_live_track_record": build_prediction_track_record(), "tomorrow_history_predictions": _json_ready_frame(tomorrow_history.tail(80)), "tplus1_backtest": _json_ready_frame(tplus1_test, limit=400), }, } def refresh_first5_prediction(session_date: date | None = None, minutes: pd.DataFrame | None = None) -> Prediction: if session_date is None: today = datetime.now(IST).date() if not is_trading_day(today): raise RuntimeError(f"{today.isoformat()} is not an NSE trading session.") minutes = fetch_yahoo_minutes(period="7d") if minutes is None else minutes append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"]) first5 = first5_features_from_minutes(minutes, session_date=session_date) row = build_model_row(first5) dataset = read_training_dataset() merged = pd.concat([dataset, row], ignore_index=True) merged = merged.drop_duplicates(subset=["date"], keep="last").sort_values("date").reset_index(drop=True) merged.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd") prediction = predict_row(row) try: refresh_mfe_prediction(session_date=session_date) except Exception as exc: print(f"MFE refresh failed: {exc}", flush=True) return prediction def refresh_daily_data() -> dict[str, Any]: daily = fetch_yahoo_daily(period="1mo") combined = append_parquet_rows(NIFTY_1D_PATH, daily, ["date"]) return { "rows": int(len(combined)), "latest_date": pd.to_datetime(combined["date"]).max().date().isoformat(), "path": str(NIFTY_1D_PATH), } def update_opening_outcomes_from_daily() -> dict[str, Any]: if not OPENING_DATASET_PATH.exists() or not NIFTY_1D_PATH.exists(): return {"updated_rows": 0, "latest_date": None} dataset = pd.read_parquet(OPENING_DATASET_PATH) daily = pd.read_parquet(NIFTY_1D_PATH) if dataset.empty or daily.empty: return {"updated_rows": 0, "latest_date": None} dataset = dataset.copy() dataset["_session_date"] = pd.to_datetime(dataset["date"], errors="coerce").dt.normalize() daily = daily.copy() daily["_session_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize() daily = daily.dropna(subset=["_session_date"]).drop_duplicates("_session_date", keep="last") daily = daily.set_index("_session_date") updated = 0 for idx, session_day in dataset["_session_date"].dropna().items(): if session_day not in daily.index: continue row = daily.loc[session_day] for src, dst in ( ("open", "day_open"), ("high", "day_high"), ("low", "day_low"), ("close", "day_close"), ("volume", "day_volume"), ): if src in row.index and dst in dataset.columns: dataset.at[idx, dst] = row[src] if {"day_open", "day_close", "target", "day_return"}.issubset(dataset.columns): day_open = dataset.at[idx, "day_open"] day_close = dataset.at[idx, "day_close"] if pd.notna(day_open) and pd.notna(day_close) and float(day_open) != 0.0: dataset.at[idx, "target"] = int(float(day_close) > float(day_open)) dataset.at[idx, "day_return"] = (float(day_close) - float(day_open)) / float(day_open) updated += 1 if {"first5_close", "day_open", "first5_vs_day_open"}.issubset(dataset.columns): first5_close = dataset.at[idx, "first5_close"] day_open = dataset.at[idx, "day_open"] if pd.notna(first5_close) and pd.notna(day_open) and float(day_open) != 0.0: dataset.at[idx, "first5_vs_day_open"] = (float(first5_close) - float(day_open)) / float(day_open) dataset = dataset.drop(columns=["_session_date"]) dataset = dataset.sort_values("date").reset_index(drop=True) dataset.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd") latest = pd.to_datetime(dataset["date"], errors="coerce").max() return { "updated_rows": int(updated), "latest_date": None if pd.isna(latest) else latest.date().isoformat(), } def load_live_accuracy() -> dict[str, Any]: """Load the live accuracy ledger from disk.""" default = { "tomorrow": {"entries": [], "accuracy": None, "total": 0, "correct_count": 0, "backtest_count": 0, "live_count": 0}, "t5": {"entries": [], "accuracy": None, "total": 0, "correct_count": 0, "backtest_count": 0, "live_count": 0}, "tplus1": {"entries": [], "accuracy": None, "total": 0, "correct_count": 0, "backtest_count": 0, "live_count": 0}, } if LIVE_ACCURACY_PATH.exists(): try: raw = json.loads(LIVE_ACCURACY_PATH.read_text(encoding="utf-8")) except Exception: raw = None if isinstance(raw, dict): try: for model_id in default: current = raw.get(model_id, {}) if not isinstance(current, dict): current = {} entries = current.get("entries", []) if not isinstance(entries, list): entries = [] backtest_entries = [entry for entry in entries if str(entry.get("source", "backtest")).lower() == "backtest"] live_entries = [entry for entry in entries if str(entry.get("source", "backtest")).lower() != "backtest"] total = len(entries) correct = sum(1 for e in entries if e.get("correct")) current["entries"] = entries current["backtest_count"] = int(len(backtest_entries)) current["live_count"] = int(len(live_entries)) current["total"] = int(total) current["correct_count"] = int(correct) current["accuracy"] = (current["correct_count"] / current["total"]) if current["total"] > 0 else None default[model_id].update(current) return default except Exception: pass return default def save_live_accuracy(data: dict[str, Any]) -> None: """Persist the live accuracy ledger to disk.""" LIVE_ACCURACY_PATH.write_text(json.dumps(data, indent=2), encoding="utf-8") def _load_prediction_history(path: Path) -> pd.DataFrame: if not path.exists(): return pd.DataFrame() frame = pd.read_parquet(path) for col in ("date", "input_date", "target_date", "forecast_date"): if col in frame.columns: frame[col] = pd.to_datetime(frame[col], errors="coerce") sort_cols = [col for col in ("target_date", "input_date", "date", "forecast_date") if col in frame.columns] if sort_cols: return frame.sort_values(sort_cols).reset_index(drop=True) return frame.reset_index(drop=True) def _record_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> None: append_prediction_history(path, row, subset) def _rescore_tomorrow_live_ledger(ledger: dict[str, Any]) -> bool: """Fix live Tomorrow ledger entries to use close vs previous close. Returns True if modified.""" daily = pd.read_parquet(NIFTY_1D_PATH) daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize() closes_by_date: dict[date, float] = {} for _, row in daily.iterrows(): if pd.isna(row["_date"]): continue close = row.get("close") if pd.notna(close) and np.isfinite(float(close)): closes_by_date[row["_date"].date()] = float(close) changed = False for entry in ledger.get("tomorrow", {}).get("entries", []): if str(entry.get("source", "backtest")).lower() == "backtest": continue try: day = date.fromisoformat(str(entry.get("date", ""))[:10]) except Exception: continue day_close = closes_by_date.get(day) if day_close is None: continue _, actual_direction = _tomorrow_actual_outcome(day, day_close, closes_by_date) if actual_direction is None: continue pred = str(entry.get("prediction", "")).upper() if pred not in {"UP", "DOWN"}: continue new_correct = pred == actual_direction if entry.get("actual") != actual_direction or entry.get("correct") != new_correct: entry["actual"] = actual_direction entry["correct"] = new_correct changed = True return changed def ensure_completed_sessions_scored( now: datetime | None = None, ledger: dict[str, Any] | None = None, ) -> dict[str, Any]: """Score any completed Tomorrow sessions that have daily close data but no ledger entry.""" now = now or datetime.now(IST) completed = expected_completed_daily_date(now) if not is_trading_day(completed): return ledger or load_live_accuracy() ledger = ledger or load_live_accuracy() if _rescore_tomorrow_live_ledger(ledger): for model_id in ("tomorrow",): entries = ledger[model_id]["entries"] total = len(entries) correct = sum(1 for e in entries if e.get("correct")) backtest_count = sum(1 for e in entries if str(e.get("source", "backtest")).lower() == "backtest") ledger[model_id]["total"] = total ledger[model_id]["correct_count"] = correct ledger[model_id]["backtest_count"] = backtest_count ledger[model_id]["live_count"] = total - backtest_count ledger[model_id]["accuracy"] = correct / total if total > 0 else None save_live_accuracy(ledger) logged_tom = {e["date"] for e in ledger.get("tomorrow", {}).get("entries", [])} if completed.isoformat() in logged_tom: return ledger daily = pd.read_parquet(NIFTY_1D_PATH) daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize() day_rows = daily[daily["_date"].dt.date == completed] if day_rows.empty: return ledger day_open = float(day_rows.iloc[-1]["open"]) day_close = float(day_rows.iloc[-1]["close"]) if not (np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0): return ledger return update_live_accuracy(completed) def update_live_accuracy(session_date: date) -> dict[str, Any]: """Score today's predictions against actual outcomes and update the ledger. Must be called AFTER refresh_daily_data() (so today's close is available) but BEFORE refresh_first5_prediction / refresh_tplus1_prediction / refresh_tomorrow_prediction (so the CSV files still hold the predictions we want to score). """ ledger = load_live_accuracy() daily = pd.read_parquet(NIFTY_1D_PATH) daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize() today_rows = daily[daily["_date"].dt.date == session_date] if today_rows.empty: return ledger day_open = float(today_rows.iloc[-1]["open"]) day_close = float(today_rows.iloc[-1]["close"]) if not (np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0): return ledger actual_close_gt_open = "UP" if day_close > day_open else "DOWN" session_iso = session_date.isoformat() # --- T+5: today's 9:20 AM prediction vs close > open --- logged_t5 = {e["date"] for e in ledger["t5"]["entries"]} if session_iso not in logged_t5: t5_history = _load_prediction_history(T5_PREDICTION_HISTORY_PATH) if not t5_history.empty and "target_date" in t5_history.columns: t5_rows = t5_history[t5_history["target_date"].dt.date == session_date] else: t5_rows = pd.DataFrame() if t5_rows.empty and LATEST_PATH.exists(): try: t5_row = pd.read_csv(LATEST_PATH).iloc[-1].to_dict() if str(t5_row.get("input_date", ""))[:10] == session_iso: t5_rows = pd.DataFrame([t5_row]) except Exception: t5_rows = pd.DataFrame() if not t5_rows.empty: try: t5_row = t5_rows.iloc[-1].to_dict() pred = str(t5_row.get("prediction", "")).upper() if pred in ("UP", "DOWN"): ledger["t5"]["entries"].append({ "date": session_iso, "prediction": pred, "actual": actual_close_gt_open, "correct": pred == actual_close_gt_open, "source": "live", }) except Exception: pass # --- Tomorrow: prior close vs today's close (matches forecaster target) --- logged_tom = {e["date"] for e in ledger["tomorrow"]["entries"]} if session_iso not in logged_tom: prev_day = previous_trading_day(session_date - timedelta(days=1)) prev_rows = daily[daily["_date"].dt.date == prev_day] actual_tomorrow = None if not prev_rows.empty: prev_close = float(prev_rows.iloc[-1]["close"]) if np.isfinite(prev_close) and prev_close != 0: actual_tomorrow = "UP" if day_close > prev_close else "DOWN" tom_row = _find_tomorrow_prediction_for_target(session_date) if tom_row and actual_tomorrow is not None: try: pred = str(tom_row.get("prediction", "")).upper() if pred in ("UP", "DOWN"): ledger["tomorrow"]["entries"].append({ "date": session_iso, "prediction": pred, "actual": actual_tomorrow, "correct": pred == actual_tomorrow, "source": "live", }) except Exception: pass # --- T+1: yesterday's 14:20 prediction targeting today --- # T+1 target: today's close > yesterday's 14:20 close logged_t1 = {e["date"] for e in ledger["tplus1"]["entries"]} if session_iso not in logged_t1: t1_history = _load_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH) if not t1_history.empty and "target_date" in t1_history.columns: t1_rows = t1_history[t1_history["target_date"].dt.date == session_date] else: t1_rows = pd.DataFrame() if not t1_rows.empty: try: t1_row = t1_rows.iloc[-1].to_dict() pred = str(t1_row.get("prediction", "")).upper() input_date_str = str(t1_row.get("input_date", ""))[:10] input_day = date.fromisoformat(input_date_str) # Read the 14:20 close from minute data for the input session minute = pd.read_parquet(NIFTY_1M_PATH, columns=["date", "close"]) minute["dt"] = pd.to_datetime(minute["date"], errors="coerce") minute = minute.dropna(subset=["dt"]) minute["session_date"] = minute["dt"].dt.normalize() minute["time_str"] = minute["dt"].dt.strftime("%H:%M") window = minute[ (minute["session_date"].dt.date == input_day) & (minute["time_str"] >= "14:00") & (minute["time_str"] <= "14:20") ].sort_values("dt") if not window.empty and pred in ("UP", "DOWN"): w_close = float(window.iloc[-1]["close"]) t1_actual = "UP" if day_close > w_close else "DOWN" ledger["tplus1"]["entries"].append({ "date": session_iso, "prediction": pred, "actual": t1_actual, "correct": pred == t1_actual, "source": "live", }) except Exception: pass _rescore_tomorrow_live_ledger(ledger) # Recompute summary stats for model_id in ("t5", "tomorrow", "tplus1"): entries = ledger[model_id]["entries"] total = len(entries) correct = sum(1 for e in entries if e.get("correct")) backtest_count = sum(1 for e in entries if str(e.get("source", "backtest")).lower() == "backtest") ledger[model_id]["total"] = total ledger[model_id]["correct_count"] = correct ledger[model_id]["backtest_count"] = backtest_count ledger[model_id]["live_count"] = total - backtest_count ledger[model_id]["accuracy"] = correct / total if total > 0 else None save_live_accuracy(ledger) return ledger def refresh_market_close_data(session_date: date | None = None) -> dict[str, Any]: now = datetime.now(IST) session_date = session_date or now.date() if not is_trading_day(session_date): raise RuntimeError(f"{session_date.isoformat()} is not an NSE trading session.") try: minutes = fetch_yahoo_minutes(period="7d") minute_frame = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"]) daily_info = refresh_daily_data() # Score live predictions BEFORE they get overwritten by fresh ones try: update_live_accuracy(session_date) except Exception as exc: print(f"[close-refresh] live accuracy update failed: {exc}", flush=True) t5_prediction = refresh_first5_prediction(session_date=session_date, minutes=minutes) tplus1_prediction = refresh_tplus1_prediction(session_date=session_date) outcomes = update_opening_outcomes_from_daily() tomorrow_prediction = refresh_tomorrow_prediction(session_date=session_date) return { "session_date": session_date.isoformat(), "nifty_1m_rows": int(len(minute_frame)), "latest_minute": pd.to_datetime(minute_frame["date"], errors="coerce").max().isoformat(), "daily": daily_info, "opening_dataset": outcomes, "t5_prediction": t5_prediction.to_dict(), "tplus1_prediction": tplus1_prediction, "tomorrow_prediction": tomorrow_prediction, } except Exception: raise def close_refresh_due(now: datetime | None = None) -> bool: now = now or datetime.now(IST) if not is_trading_day(now.date()) or now.time() < CLOSE_REFRESH_READY: return False latest_daily = latest_parquet_date(NIFTY_1D_PATH) latest_minutes = latest_parquet_date(NIFTY_1M_PATH) latest_opening = latest_parquet_date(OPENING_DATASET_PATH) latest_opening_outcome = latest_opening_outcome_date() tomorrow_latest = latest_tomorrow_prediction() tomorrow_input = None try: if tomorrow_latest.get("input_date"): tomorrow_input = date.fromisoformat(str(tomorrow_latest.get("input_date"))[:10]) except Exception: tomorrow_input = None return any( latest != now.date() for latest in (latest_daily, latest_minutes, latest_opening, latest_opening_outcome, tomorrow_input) ) def latest_prediction_input_date(path: Path) -> date | None: if not path.exists(): return None try: frame = pd.read_csv(path, usecols=["input_date"]) except Exception: return None if frame.empty: return None value = pd.to_datetime(frame["input_date"], errors="coerce").max() return None if pd.isna(value) else value.date() def latest_tomorrow_input_date() -> date | None: try: latest = latest_tomorrow_prediction() raw = latest.get("input_date") return date.fromisoformat(str(raw)[:10]) if raw else None except Exception: return None def expected_completed_daily_date(now: datetime | None = None) -> date: now = now or datetime.now(IST) if is_trading_day(now.date()) and now.time() < CLOSE_REFRESH_READY: return previous_trading_day(now.date() - timedelta(days=1)) return previous_trading_day(now.date()) def expected_minute_date(now: datetime | None = None) -> date: now = now or datetime.now(IST) if is_trading_day(now.date()) and now.time() >= FIRST5_READY: return now.date() return previous_trading_day(now.date() - timedelta(days=1)) def expected_tplus1_date(now: datetime | None = None) -> date: now = now or datetime.now(IST) if is_trading_day(now.date()) and now.time() >= TPLUS1_READY: return now.date() return previous_trading_day(now.date() - timedelta(days=1)) def is_stale(latest: date | None, expected: date) -> bool: return latest is None or latest < expected def stale_data_status(now: datetime | None = None) -> dict[str, Any]: now = now or datetime.now(IST) expected_daily = expected_completed_daily_date(now) expected_minutes = expected_minute_date(now) expected_tplus1 = expected_tplus1_date(now) latest_1d = latest_parquet_date(NIFTY_1D_PATH) latest_1m = latest_parquet_date(NIFTY_1M_PATH) latest_t5 = latest_prediction_input_date(LATEST_PATH) latest_tomorrow = latest_tomorrow_input_date() latest_tplus1 = latest_prediction_input_date(TPLUS1_LATEST_PATH) return { "daily_stale": expected_daily > (latest_1d or date.min), "minutes_stale": expected_minutes > (latest_1m or date.min), "t5_stale": is_stale(latest_t5, expected_minutes), "tomorrow_stale": is_stale(latest_tomorrow, expected_daily), "tplus1_stale": is_stale(latest_tplus1, expected_tplus1), } def refresh_stale_data_once(now: datetime | None = None) -> dict[str, Any]: now = now or datetime.now(IST) status = stale_data_status(now) if not any(status.values()): return {"status": "fresh", "actions": []} if not _stale_refresh_lock.acquire(blocking=False): return {"status": "skipped", "reason": "stale refresh already running", **status, "actions": []} actions: list[dict[str, Any]] = [] try: if status["minutes_stale"]: minutes = fetch_yahoo_minutes(period="7d") combined = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"]) actions.append( { "name": "minutes", "rows": int(len(combined)), "latest_date": pd.to_datetime(combined["date"], errors="coerce").max().date().isoformat(), } ) if status["daily_stale"]: daily_info = refresh_daily_data() outcomes = update_opening_outcomes_from_daily() actions.append({"name": "daily", **daily_info}) actions.append({"name": "opening_outcomes", **outcomes}) try: completed = date.fromisoformat(status["expected_daily_date"]) scored = update_live_accuracy(completed) actions.append( { "name": "tomorrow_live_accuracy", "session_date": completed.isoformat(), "live_count": scored.get("tomorrow", {}).get("live_count"), } ) except Exception as exc: actions.append({"name": "tomorrow_live_accuracy", "error": str(exc)}) if status["daily_stale"] or status["tomorrow_stale"]: try: tomorrow = refresh_tomorrow_prediction(session_date=date.fromisoformat(status["expected_daily_date"])) actions.append({"name": "tomorrow_prediction", "input_date": tomorrow.get("input_date")}) except Exception as exc: actions.append({"name": "tomorrow_prediction", "error": str(exc)}) if status["t5_stale"] and is_trading_day(now.date()) and now.time() >= FIRST5_READY: prediction = refresh_first5_prediction(session_date=now.date()) actions.append({"name": "t5_prediction", "input_date": prediction.input_date}) if status["tplus1_stale"] and is_trading_day(now.date()) and now.time() >= TPLUS1_READY: prediction = refresh_tplus1_prediction(session_date=now.date()) actions.append({"name": "tplus1_prediction", "input_date": prediction.get("input_date")}) clear_dashboard_payload_cache() refreshed_status = stale_data_status(datetime.now(IST)) return {"status": "refreshed", **refreshed_status, "actions": actions} finally: _stale_refresh_lock.release() def next_ist_run_at(run_time: time = time(9, 20), now: datetime | None = None) -> datetime: now = now or datetime.now(IST) target_day = now.date() if now >= datetime.combine(target_day, run_time, tzinfo=IST): target_day += timedelta(days=1) target_day = next_trading_day(target_day) return datetime.combine(target_day, run_time, tzinfo=IST) def seconds_until_next_ist_run(run_time: time = time(9, 20)) -> float: now = datetime.now(IST) target = next_ist_run_at(run_time, now=now) return max(1.0, (target - now).total_seconds())