PREDICTIONSITE_backup / runtime.py
Jitendra12421's picture
Upload 2 files
7a4b7f9 verified
Raw
History Blame Contribute Delete
95.5 kB
from __future__ import annotations
import json
import copy
import os
import sys
import threading
from dataclasses import dataclass
from datetime import date, datetime, time, timedelta
from functools import lru_cache
from pathlib import Path
from typing import Any
from zoneinfo import ZoneInfo
import joblib
import numpy as np
import pandas as pd
from nifty_backend.yahoo_history_client import YahooHistoryClient
try:
import pandas_market_calendars as mcal
except ImportError: # pragma: no cover - production dependency, local fallback below.
mcal = None
# Time zone used when this module asks for the current wall-clock time.
IST = ZoneInfo("Asia/Kolkata")
# Ticker handed to the Yahoo history client for every NIFTY fetch.
YAHOO_NIFTY_SYMBOL = "^NSEI"
# Naive time-of-day thresholds. CLOSE_REFRESH_READY is compared against the
# IST clock in _track_record_end_session; the others are presumably used the
# same way by code outside this section -- TODO confirm.
MARKET_CLOSE = time(15, 30)
FIRST5_READY = time(9, 20)
CLOSE_REFRESH_READY = time(15, 45)
TPLUS1_READY = time(14, 30)
# NOTE(review): looks like a polling interval for stale-data checks; the
# consumer is not visible in this section.
STALE_CHECK_INTERVAL_SECONDS = 5
# Parent of the directory that contains this file; all artifact paths hang off it.
BACKEND_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = BACKEND_ROOT / "data"
MODEL_DIR = BACKEND_ROOT / "models"
# SQLite file passed to YahooHistoryClient as its cache path.
YAHOO_CACHE_PATH = MODEL_DIR / "yahoo_history_cache.sqlite3"
# --- Opening-direction (first five minutes) model artifacts ---
OPENING_DATASET_PATH = DATA_DIR / "opening_direction_training_dataset.parquet"
NIFTY_1M_PATH = DATA_DIR / "nifty50_1m.parquet"
NIFTY_1D_PATH = DATA_DIR / "nifty50_1d.parquet"
MODEL_PATH = MODEL_DIR / "nifty_opening_direction_model.joblib"
LATEST_PATH = MODEL_DIR / "latest_prediction.csv"
TEST_PREDICTIONS_PATH = DATA_DIR / "test_predictions.parquet"
# --- "Tomorrow" (next-session direction) model artifacts ---
TOMORROW_MODEL_PATH = MODEL_DIR / "nifty_tomorrow_direction_model.joblib"
TOMORROW_LATEST_PATH = MODEL_DIR / "tomorrow_latest_prediction.csv"
TOMORROW_SUMMARY_PATH = MODEL_DIR / "tomorrow_summary.json"
TOMORROW_TEST_PREDICTIONS_PATH = DATA_DIR / "tomorrow_test_predictions.parquet"
TOMORROW_PREDICTION_HISTORY_PATH = MODEL_DIR / "tomorrow_prediction_history.parquet"
# Root of the separate forecasting project; overridable through the
# FORECASTING_PROJECT_ROOT environment variable.
FORECASTING_PROJECT_ROOT = Path(
    os.environ.get(
        "FORECASTING_PROJECT_ROOT",
        str(BACKEND_ROOT.parent.parent / "forecasting project"),
    )
)
# --- Daily forecaster outputs that sync_daily_forecaster_outputs reads ---
DAILY_FORECASTER_OUTPUT_DIR = MODEL_DIR / "nifty_forecaster" / "outputs"
DAILY_FORECASTER_SUMMARY_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_summary.json"
DAILY_FORECASTER_LATEST_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_latest.csv"
DAILY_FORECASTER_PREDICTIONS_PATH = DAILY_FORECASTER_OUTPUT_DIR / "forecaster_test_predictions.csv"
# --- Opening MFE regressor artifacts (source tree copy and local copy) ---
MFE_SOURCE_OUTPUT_DIR = FORECASTING_PROJECT_ROOT / "Code" / "models" / "nifty_opening_mfe_regressor" / "outputs"
MFE_OUTPUT_DIR = MODEL_DIR / "nifty_opening_mfe_regressor" / "outputs"
MFE_SUMMARY_PATH = MFE_OUTPUT_DIR / "summary.json"
MFE_LATEST_PATH = MFE_OUTPUT_DIR / "latest_prediction.csv"
MFE_TEST_PREDICTIONS_PATH = MFE_OUTPUT_DIR / "test_predictions.csv"
MFE_MODEL_PATH = MFE_OUTPUT_DIR / "nifty_opening_mfe_regressor.joblib"
MFE_LIVE_HISTORY_PATH = MFE_OUTPUT_DIR / "mfe_live_history.csv"
# --- T+1 model artifacts ---
TPLUS1_MODEL_PATH = MODEL_DIR / "nifty_1420_tplus1_logistic_model.joblib"
TPLUS1_LATEST_PATH = MODEL_DIR / "tplus1_latest_prediction.csv"
TPLUS1_SUMMARY_PATH = MODEL_DIR / "tplus1_summary.json"
TPLUS1_TEST_PREDICTIONS_PATH = DATA_DIR / "tplus1_test_predictions.parquet"
TPLUS1_PREDICTION_HISTORY_PATH = MODEL_DIR / "tplus1_prediction_history.parquet"
# History file that predict_row appends each opening-direction prediction to.
T5_PREDICTION_HISTORY_PATH = MODEL_DIR / "t5_prediction_history.parquet"
# JSON file written by save_refresh_state and read by load_refresh_state.
REFRESH_STATE_PATH = MODEL_DIR / "refresh_state.json"
# Phase labels stored in the refresh-state file. WAITING/REFRESHING stamp
# "started_at"; READY/FAILED/NORMAL stamp "finished_at" (see save_refresh_state).
REFRESH_WAITING = "waiting_second_payload"
REFRESH_REFRESHING = "refreshing"
REFRESH_READY = "ready"
REFRESH_FAILED = "failed"
REFRESH_NORMAL = "normal"
LIVE_ACCURACY_PATH = MODEL_DIR / "live_accuracy.json"
# Default overlay rules, used when a model payload carries none of its own.
# Each rule compares one feature column against "value" using "op"; rows that
# match have their UP/DOWN call flipped by apply_decision_overlays.
DECISION_OVERLAYS = [
    {
        "name": "fifth_minute_momentum_flip",
        "feature": "m5_ret_1m",
        "op": ">=",
        "value": 0.0005085411885759201,
    },
    {
        "name": "vix_stretch_flip",
        "feature": "india_vix_close_vs_sma_20",
        "op": ">=",
        "value": 0.24641908937959742,
    },
]
# Module-level locks. NOTE(review): their users are outside this section;
# judging by the names they guard dashboard payload builds and stale-data
# refreshes -- confirm against the callers.
_dashboard_payload_lock = threading.Lock()
_stale_refresh_lock = threading.Lock()
def utc_now_iso() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Uses an aware "now" rather than ``datetime.utcnow()``, which is deprecated
    since Python 3.12. The tzinfo is dropped before formatting so the text
    keeps the existing ``...Z`` shape instead of gaining a ``+00:00`` offset.
    """
    # Local import: the module header only pulls date/datetime/time/timedelta.
    from datetime import timezone

    moment = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return moment.isoformat() + "Z"
def clear_dashboard_payload_cache() -> None:
    """Discard whatever the cached dashboard payload builder has memoised."""
    cached_builder = _dashboard_payload_cached
    cached_builder.cache_clear()
def save_refresh_state(phase: str, *, session_date: date | None = None, error: str | None = None) -> dict[str, Any]:
    """Persist the refresh state for ``phase`` and return the stored mapping.

    Timestamps and the session date are carried over from the previously saved
    state. ``started_at`` is stamped only if it is still unset and the phase is
    waiting/refreshing; ``finished_at`` is stamped on ready/failed/normal.

    Args:
        phase: One of the ``REFRESH_*`` phase labels.
        session_date: Session to record; the previous value is kept when falsy.
        error: Error text to store (``None`` clears it).

    Returns:
        The state dictionary that was written to ``REFRESH_STATE_PATH``.
    """
    prior = load_refresh_state()
    started_at = prior.get("started_at")
    finished_at = prior.get("finished_at")
    if session_date:
        session_value = session_date.isoformat()
    else:
        session_value = prior.get("session_date")

    if phase in {REFRESH_WAITING, REFRESH_REFRESHING} and not started_at:
        started_at = utc_now_iso()
    if phase in {REFRESH_READY, REFRESH_FAILED, REFRESH_NORMAL}:
        finished_at = utc_now_iso()

    state = {
        "phase": phase,
        "started_at": started_at,
        "finished_at": finished_at,
        "session_date": session_value,
        "error": error,
    }
    serialized = json.dumps(state, indent=2)
    REFRESH_STATE_PATH.write_text(serialized, encoding="utf-8")
    return state
def load_refresh_state() -> dict[str, Any]:
    """Load the persisted refresh state, always returning a dictionary.

    Returns:
        * a "normal" blank state when the state file does not exist;
        * the stored mapping when the file holds a JSON object;
        * a "failed" state when the file cannot be read, is not valid JSON, or
          holds something other than a JSON object (callers use ``.get`` on the
          result, so a list or scalar must not leak through).
    """

    def _blank(phase: str, error: str | None) -> dict[str, Any]:
        return {
            "phase": phase,
            "started_at": None,
            "finished_at": None,
            "session_date": None,
            "error": error,
        }

    unreadable = "refresh_state.json could not be read"
    # Read directly instead of exists()+read so a file removed in between is
    # still reported as "no state yet" rather than as a failure.
    try:
        text = REFRESH_STATE_PATH.read_text(encoding="utf-8")
    except FileNotFoundError:
        return _blank(REFRESH_NORMAL, None)
    except (OSError, ValueError):
        # ValueError covers undecodable bytes (UnicodeDecodeError).
        return _blank(REFRESH_FAILED, unreadable)

    try:
        loaded = json.loads(text)
    except ValueError:
        return _blank(REFRESH_FAILED, unreadable)
    if not isinstance(loaded, dict):
        return _blank(REFRESH_FAILED, unreadable)
    return loaded
@lru_cache(maxsize=1)
def _nse_calendar():
    """Return the first exchange calendar that resolves, or ``None``.

    ``None`` is returned when ``pandas_market_calendars`` is not installed or
    when none of the candidate names ("XNSE", "NSE", "BSE") can be loaded.
    The result is memoised.
    """
    if mcal is not None:
        for exchange in ("XNSE", "NSE", "BSE"):
            try:
                calendar = mcal.get_calendar(exchange)
            except Exception:
                # Unknown name in this library version: try the next one.
                continue
            return calendar
    return None
@lru_cache(maxsize=64)
def trading_schedule(start: date, end: date) -> pd.DataFrame:
    """Return a frame indexed by trading sessions between ``start`` and ``end``.

    With an exchange calendar available its schedule is returned. Otherwise
    the fallback is an empty-column frame indexed by plain business days
    (``freq="B"``). Results are memoised per ``(start, end)`` pair.
    """
    nse = _nse_calendar()
    if nse is not None:
        return nse.schedule(start_date=start, end_date=end)
    weekdays = pd.date_range(start=start, end=end, freq="B")
    return pd.DataFrame(index=weekdays)
def is_trading_day(day: date) -> bool:
    """Return True when the one-day schedule for ``day`` has at least one row."""
    return not trading_schedule(day, day).empty
def next_trading_day(start: date) -> date:
    """Return the first trading session on or after ``start``.

    The schedule for the following 14 days is consulted first; if that window
    is empty the function walks forward one day at a time.
    """
    window = trading_schedule(start, start + timedelta(days=14))
    if not window.empty:
        return pd.Timestamp(window.index[0]).date()
    candidate = start
    while not is_trading_day(candidate):
        candidate += timedelta(days=1)
    return candidate
def previous_trading_day(start: date) -> date:
    """Return the last trading session on or before ``start``.

    The schedule for the preceding 14 days is consulted first; if that window
    is empty the function walks backward one day at a time.
    """
    window = trading_schedule(start - timedelta(days=14), start)
    if not window.empty:
        return pd.Timestamp(window.index[-1]).date()
    candidate = start
    while not is_trading_day(candidate):
        candidate -= timedelta(days=1)
    return candidate
def last_n_trading_sessions(end_day: date, count: int) -> list[date]:
    """Return up to ``count`` sessions ending on or before ``end_day``, oldest first.

    The walk is capped at ``count * 12`` steps, so fewer than ``count``
    sessions may come back if the cap is hit.
    """
    collected: list[date] = []
    probe = end_day
    for _ in range(count * 12):
        if is_trading_day(probe):
            collected.append(probe)
            if len(collected) >= count:
                break
        # Step to the most recent session strictly before the probe.
        probe = previous_trading_day(probe - timedelta(days=1))
    return list(reversed(collected))
def _track_record_end_session(now: datetime | None = None) -> date:
    """Pick the latest session the track record should score.

    Today is returned when it is a trading day and the clock has reached
    ``CLOSE_REFRESH_READY``; otherwise the choice is delegated to
    ``expected_completed_daily_date``. ``now`` defaults to the current IST time.
    """
    current = now or datetime.now(IST)
    past_close_refresh = is_trading_day(current.date()) and current.time() >= CLOSE_REFRESH_READY
    if past_close_refresh:
        return current.date()
    return expected_completed_daily_date(current)
class ProbabilityBlend:
    """Weighted average of several classifiers' "up" probabilities.

    Attribute names (``models``, ``weights``) are left unchanged because
    ``load_model`` registers this class on ``__main__`` for unpickling a saved
    artifact.
    """

    def __init__(self, models: list[Any], weights: np.ndarray):
        """Store the models and normalise ``weights`` so they sum to one."""
        self.models = models
        raw_weights = np.asarray(weights, dtype="float64")
        self.weights = raw_weights / raw_weights.sum()

    def predict_proba(self, x: pd.DataFrame) -> np.ndarray:
        """Return a two-column array ``[1 - p_up, p_up]`` of blended probabilities."""
        per_model = [predict_proba_up(member, x) for member in self.models]
        blended_up = np.column_stack(per_model) @ self.weights
        return np.column_stack([1.0 - blended_up, blended_up])
@dataclass(frozen=True)
class Prediction:
    """Immutable record of one opening-direction prediction."""

    input_date: str
    first5_start: str
    first5_end: str
    prediction: str
    prob_up: float
    confidence: float
    threshold: float
    model_name: str
    is_overridden: bool = False

    def to_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict in declaration order."""
        required = (
            "input_date",
            "first5_start",
            "first5_end",
            "prediction",
            "prob_up",
            "confidence",
            "threshold",
            "model_name",
        )
        record: dict[str, Any] = {name: getattr(self, name) for name in required}
        # Tolerate instances that lack the flag and report False for them.
        record["is_overridden"] = getattr(self, "is_overridden", False)
        return record
def predict_proba_up(model: Any, x: pd.DataFrame) -> np.ndarray:
    """Return column 1 of ``model.predict_proba(x)`` as a float64 array."""
    positive_column = model.predict_proba(x)[:, 1]
    return np.asarray(positive_column, dtype="float64")
def safe_div(numer: pd.Series | np.ndarray, denom: pd.Series | np.ndarray) -> pd.Series:
    """Divide element-wise, yielding NaN wherever the denominator is unusable.

    A denominator is unusable when it is missing, non-finite or zero. The
    result is a float64 Series carrying the numerator's index.
    """
    top = pd.Series(numer, copy=False)
    bottom = pd.Series(denom, copy=False)
    bottom_values = bottom.to_numpy(dtype="float64")
    usable = bottom.notna() & np.isfinite(bottom_values) & (bottom != 0)

    quotient = pd.Series(np.nan, index=top.index, dtype="float64")
    dividend = top.loc[usable].to_numpy(dtype="float64")
    divisor = bottom.loc[usable].to_numpy(dtype="float64")
    quotient.loc[usable] = dividend / divisor
    return quotient
def load_model() -> dict[str, Any]:
    """Load the opening-direction model payload from ``MODEL_PATH``.

    The artifact was trained as a script, so its pickled references resolve
    through ``__main__``; the blend class and probability helper are attached
    there before loading. Missing ``decision_overlays`` and ``model_name``
    entries are filled with defaults.
    """
    main_module = sys.modules["__main__"]
    main_module.ProbabilityBlend = ProbabilityBlend
    main_module.predict_proba_up = predict_proba_up

    bundle = joblib.load(MODEL_PATH)
    for key, fallback in (
        ("decision_overlays", DECISION_OVERLAYS),
        ("model_name", "nifty_opening_direction_model"),
    ):
        bundle.setdefault(key, fallback)
    return bundle
def overlay_mask(frame: pd.DataFrame, overlay: dict[str, object]) -> np.ndarray:
    """Return a boolean array marking rows of ``frame`` that satisfy ``overlay``.

    Rows whose feature value is missing or non-numeric never match. An
    all-False array is returned when the feature column is absent.

    Raises:
        ValueError: If ``overlay["op"]`` is neither ``">="`` nor ``"<="``.
    """
    column = str(overlay["feature"])
    if column not in frame.columns:
        return np.zeros(len(frame), dtype=bool)

    values = pd.to_numeric(frame[column], errors="coerce")
    cutoff = float(overlay["value"])
    operator_text = overlay["op"]
    if operator_text == ">=":
        hits = values >= cutoff
    elif operator_text == "<=":
        hits = values <= cutoff
    else:
        raise ValueError(f"Unsupported overlay op: {overlay['op']}")
    return hits.fillna(False).to_numpy(dtype=bool)
def apply_decision_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, object]]) -> np.ndarray:
    """Return a copy of ``pred`` with each overlay's matching rows flipped.

    Overlays are applied in order, so a row matched by two overlays is flipped
    twice. ``pred`` itself is not modified.
    """
    flipped = np.array(pred, dtype="int64")  # np.array copies by default
    for rule in overlays:
        hits = overlay_mask(frame, rule)
        flipped[hits] = 1 - flipped[hits]
    return flipped
def directional_confidence(prob_up: np.ndarray, pred: np.ndarray, threshold: float) -> np.ndarray:
    """Score confidence in the chosen direction.

    The score is ``0.50 + |prob_up - threshold|``, floored at the probability
    of the predicted side and capped at 0.99 (the cap wins if they conflict).
    """
    up = np.asarray(prob_up, dtype="float64")
    calls = np.asarray(pred, dtype="int64")
    chosen_side = np.where(calls == 1, up, 1.0 - up)
    from_threshold = 0.50 + np.abs(up - float(threshold))
    # Same as np.clip(from_threshold, chosen_side, 0.99): floor first, then cap.
    return np.minimum(np.maximum(from_threshold, chosen_side), 0.99)
def read_training_dataset() -> pd.DataFrame:
    """Read the opening-direction training parquet, sorted by ``date``.

    The ``date``, ``first5_start`` and ``first5_end`` columns are parsed to
    datetimes when present (unparseable values become NaT).
    """
    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    stamp_columns = [name for name in ("date", "first5_start", "first5_end") if name in dataset.columns]
    for name in stamp_columns:
        dataset[name] = pd.to_datetime(dataset[name], errors="coerce")
    ordered = dataset.sort_values("date")
    return ordered.reset_index(drop=True)
def normalize_yahoo_frame(df: pd.DataFrame) -> pd.DataFrame:
    """Convert a raw Yahoo price frame into naive-IST OHLCV rows.

    Column labels are lower-cased with spaces turned into underscores (for a
    MultiIndex only the first level is used). Timestamps come from a
    ``date``/``datetime`` column after the index is reset, falling back to the
    first column. Naive timestamps are treated as UTC. Everything is converted
    to IST and the zone is then dropped.

    The caller's frame is not modified: relabelling happens on a shallow copy.

    Args:
        df: Raw frame with open/high/low/close columns and a datetime index
            or column.

    Returns:
        Frame with ``date`` plus whichever of open/high/low/close/volume were
        found, numeric, without rows missing a date or price, sorted by date.
        An empty input yields an empty frame with the six standard columns.
    """
    if df.empty:
        return pd.DataFrame(columns=["date", "open", "high", "low", "close", "volume"])

    # Shallow copy: only the labels are rewritten, so the data need not be
    # duplicated, and the caller's column index stays untouched.
    frame = df.copy(deep=False)
    if isinstance(frame.columns, pd.MultiIndex):
        labels = [c[0] for c in frame.columns]
    else:
        labels = list(frame.columns)
    # Both layouts get the same normalisation, so "Adj Close" becomes
    # "adj_close" in either case.
    frame.columns = [str(label).lower().replace(" ", "_") for label in labels]
    frame = frame.reset_index()

    # str() guards against a non-string label produced by reset_index.
    date_col = next(
        (c for c in frame.columns if str(c).lower() in {"datetime", "date"}),
        frame.columns[0],
    )
    stamps = pd.to_datetime(frame[date_col], errors="coerce")
    if stamps.dt.tz is None:
        stamps = stamps.dt.tz_localize("UTC")
    stamps = stamps.dt.tz_convert(IST)

    # "close" is listed before "adj_close", so a real close wins when both exist.
    rename = {
        "open": "open",
        "high": "high",
        "low": "low",
        "close": "close",
        "adj_close": "close",
        "volume": "volume",
    }
    out = pd.DataFrame({"date": stamps.dt.tz_localize(None)})
    for src, dst in rename.items():
        if src in frame.columns and dst not in out.columns:
            out[dst] = pd.to_numeric(frame[src], errors="coerce")
    return out.dropna(subset=["date", "open", "high", "low", "close"]).sort_values("date")
@lru_cache(maxsize=1)
def yahoo_history_client() -> YahooHistoryClient:
    """Return the memoised history client backed by ``YAHOO_CACHE_PATH``."""
    client = YahooHistoryClient(cache_path=YAHOO_CACHE_PATH)
    return client
def period_start(period: str, *, end: datetime) -> datetime:
    """Translate a Yahoo-style period such as ``"5d"`` into a start datetime.

    Supported suffixes are ``d``, ``wk``, ``mo`` and ``y``. Months count as 31
    days and years as 366 days.

    Raises:
        ValueError: If the period has no supported suffix or no digit prefix.
    """
    text = str(period).strip().lower()
    days_per_unit = (("d", 1), ("wk", 7), ("mo", 31), ("y", 366))
    for suffix, span_days in days_per_unit:
        if not text.endswith(suffix):
            continue
        amount = text[: -len(suffix)]
        if amount.isdigit():
            return end - timedelta(days=int(amount) * span_days)
        # Suffix matched but the prefix is not a number: reject the period.
        break
    raise ValueError(f"Unsupported Yahoo period: {period!r}")
def yahoo_history_to_ohlcv(frame: pd.DataFrame, *, daily: bool) -> pd.DataFrame:
    """Shape history-client rows into a de-duplicated, date-sorted OHLCV frame.

    ``timestamp`` is renamed to ``date`` and parsed; with ``daily=True`` the
    time of day is dropped. Price and volume columns are coerced to numbers,
    rows without a date or price are removed, and the last row per date wins.
    """
    layout = ["date", "open", "high", "low", "close", "volume"]
    if frame.empty:
        return pd.DataFrame(columns=layout)

    bars = frame.rename(columns={"timestamp": "date"}).copy()
    stamps = pd.to_datetime(bars["date"], errors="coerce")
    bars["date"] = stamps.dt.normalize() if daily else stamps
    for name in layout[1:]:
        bars[name] = pd.to_numeric(bars[name], errors="coerce")

    bars = bars[layout]
    bars = bars.dropna(subset=layout[:-1])  # volume may be missing
    bars = bars.drop_duplicates("date", keep="last")
    bars = bars.sort_values("date")
    return bars.reset_index(drop=True)
def fetch_yahoo_minutes(period: str = "5d") -> pd.DataFrame:
    """Fetch 1-minute NIFTY bars covering ``period`` up to five minutes past now (IST)."""
    window_end = datetime.now(IST).replace(tzinfo=None) + timedelta(minutes=5)
    window_start = period_start(period, end=window_end)
    client = yahoo_history_client()
    history = client.fetch_history(
        YAHOO_NIFTY_SYMBOL,
        interval="1m",
        start=window_start,
        end=window_end,
        include_prepost=False,
    )
    return yahoo_history_to_ohlcv(history, daily=False)
def fetch_yahoo_daily(period: str = "1mo") -> pd.DataFrame:
    """Fetch daily NIFTY bars covering ``period`` up to one day past now (IST)."""
    window_end = datetime.now(IST).replace(tzinfo=None) + timedelta(days=1)
    window_start = period_start(period, end=window_end)
    client = yahoo_history_client()
    history = client.fetch_history(
        YAHOO_NIFTY_SYMBOL,
        interval="1d",
        start=window_start,
        end=window_end,
        include_prepost=False,
    )
    return yahoo_history_to_ohlcv(history, daily=True)
def append_parquet_rows(path: Path, new_rows: pd.DataFrame, subset: list[str]) -> pd.DataFrame:
    """Merge ``new_rows`` into the parquet at ``path`` and return the result.

    Rows are de-duplicated on ``subset`` (the newest wins), sorted by
    ``subset`` and written back with zstd compression. With no new rows the
    existing file is returned unchanged.

    Raises:
        RuntimeError: If ``new_rows`` is empty and ``path`` does not exist.
    """
    file_present = path.exists()
    if new_rows.empty:
        if not file_present:
            raise RuntimeError(f"No rows returned for {path.name}; leaving parquet unchanged.")
        return pd.read_parquet(path)

    if file_present:
        merged = pd.concat([pd.read_parquet(path), new_rows], ignore_index=True)
    else:
        merged = new_rows.copy()
    merged = merged.drop_duplicates(subset=subset, keep="last")
    merged = merged.sort_values(subset).reset_index(drop=True)
    merged.to_parquet(path, index=False, compression="zstd")
    return merged
def append_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> pd.DataFrame:
    """Append one prediction ``row`` to the parquet history at ``path``."""
    single_row = pd.DataFrame([row])
    return append_parquet_rows(path, single_row, subset)
def latest_parquet_date(path: Path) -> date | None:
    """Return the newest ``date`` in the parquet at ``path``.

    ``None`` is returned when the file is missing, has no rows, or has no
    parseable date.
    """
    if not path.exists():
        return None
    dates = pd.read_parquet(path, columns=["date"])
    if dates.empty:
        return None
    newest = pd.to_datetime(dates["date"], errors="coerce").max()
    return None if pd.isna(newest) else newest.date()
def latest_opening_outcome_date() -> date | None:
    """Return the newest training-dataset date that has a non-null ``target``.

    The parquet is read once. Previously the whole file was loaded just to
    look at its column names and then read a second time.

    Returns:
        The latest labelled date, or ``None`` when the dataset is missing, is
        empty, has no ``target`` column, has no labelled rows, or has no
        parseable date among them.
    """
    if not OPENING_DATASET_PATH.exists():
        return None
    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    if dataset.empty or "target" not in dataset.columns:
        return None
    labelled_dates = dataset.loc[dataset["target"].notna(), "date"]
    if labelled_dates.empty:
        return None
    latest = pd.to_datetime(labelled_dates, errors="coerce").max()
    if pd.isna(latest):
        return None
    return latest.date()
def first5_features_from_minutes(minutes: pd.DataFrame, session_date: date | None = None) -> pd.DataFrame:
    """Build a one-row feature frame from the first five 1-minute bars of a session.

    Args:
        minutes: Minute bars with ``date``, ``open``, ``high``, ``low`` and
            ``close`` columns; ``volume`` is optional.
        session_date: Session to use. When ``None`` the latest calendar date
            present in ``minutes`` is used.

    Returns:
        A single-row DataFrame with aggregate ``first5_*`` columns, per-candle
        ``m1_*`` .. ``m5_*`` columns and ``dow``/``dom``/``month`` fields.

    Raises:
        RuntimeError: If ``minutes`` is empty or fewer than five bars fall in
            the 09:15-09:19 window of the chosen session.
    """
    if minutes.empty:
        raise RuntimeError("Yahoo returned no minute bars.")
    bars = minutes.copy()
    bars["dt"] = pd.to_datetime(bars["date"], errors="coerce")
    bars["session_date"] = bars["dt"].dt.normalize()
    if session_date is None:
        session_ts = bars["session_date"].max()
    else:
        session_ts = pd.Timestamp(session_date).normalize()
    day = bars[bars["session_date"] == session_ts].sort_values("dt").copy()
    # Opening window: bars stamped 09:15 through 09:19 inclusive.
    start_dt = pd.Timestamp.combine(session_ts.date(), time(9, 15))
    end_dt = pd.Timestamp.combine(session_ts.date(), time(9, 19))
    first5 = day[(day["dt"] >= start_dt) & (day["dt"] <= end_dt)].head(5).copy()
    if len(first5) < 5:
        raise RuntimeError(f"Need 5 opening bars for {session_ts.date()}, got {len(first5)}.")
    first5["minute_index"] = np.arange(len(first5))
    # Close-to-close return; the first bar has no predecessor, so it is NaN.
    first5["ret_1m"] = first5["close"].pct_change(fill_method=None)
    first5["range_pct_1m"] = safe_div(first5["high"] - first5["low"], first5["open"])
    first5["body_pct_1m"] = safe_div(first5["close"] - first5["open"], first5["open"])
    # Aggregates over the whole five-bar window.
    row = {
        "date": session_ts,
        "first5_start": first5["dt"].iloc[0],
        "first5_end": first5["dt"].iloc[-1],
        "first5_open": first5["open"].iloc[0],
        "first5_high": first5["high"].max(),
        "first5_low": first5["low"].min(),
        "first5_close": first5["close"].iloc[-1],
        "first5_volume": first5["volume"].sum() if "volume" in first5 else 0.0,
        "first5_bars": len(first5),
        "first5_last_1m_ret": first5["ret_1m"].iloc[-1],
        "first5_ret_std": first5["ret_1m"].std(),
    }
    row["first5_return"] = (row["first5_close"] - row["first5_open"]) / row["first5_open"]
    row["first5_range_pct"] = (row["first5_high"] - row["first5_low"]) / row["first5_open"]
    # A zero range (flat window) makes the range-relative ratios undefined -> NaN.
    first5_range = row["first5_high"] - row["first5_low"]
    row["first5_body_to_range"] = (row["first5_close"] - row["first5_open"]) / first5_range if first5_range else np.nan
    row["first5_close_location"] = (row["first5_close"] - row["first5_low"]) / first5_range if first5_range else np.nan
    # Per-candle columns, numbered m1..m5 in time order.
    for idx, (_, candle) in enumerate(first5.iterrows(), start=1):
        for field in ("open", "high", "low", "close", "ret_1m", "range_pct_1m", "body_pct_1m"):
            row[f"m{idx}_{field}"] = candle[field]
        row[f"m{idx}_close_vs_first5_open"] = (candle["close"] - row["first5_open"]) / row["first5_open"]
        row[f"m{idx}_range_share"] = (candle["high"] - candle["low"]) / first5_range if first5_range else np.nan
    # Shape features comparing the start of the window with its end.
    row["first5_return_accel"] = row["m5_ret_1m"] - row["m2_ret_1m"]
    row["first5_last2_return"] = (row["m5_close"] - row["m4_open"]) / row["m4_open"]
    row["first5_first2_return"] = (row["m2_close"] - row["m1_open"]) / row["m1_open"]
    # +1 when the first two and last two bars moved in opposite directions,
    # -1 when they agree, 0 when either leg is flat.
    row["first5_reversal"] = np.sign(row["first5_first2_return"]) * -np.sign(row["first5_last2_return"])
    # Calendar fields of the session date.
    row["dow"] = session_ts.dayofweek
    row["dom"] = session_ts.day
    row["month"] = session_ts.month
    return pd.DataFrame([row])
def build_model_row(first5_row: pd.DataFrame) -> pd.DataFrame:
    """Combine the newest training-dataset row with fresh first-five-minute features.

    The last dataset row supplies the context columns. Every column of
    ``first5_row`` overwrites or extends it. Interaction features are
    recomputed where their inputs exist, and ``target``/``day_return`` are
    blanked.
    """
    history = read_training_dataset()
    merged = history.iloc[[-1]].copy()
    for name in first5_row.columns:
        merged[name] = first5_row[name].iloc[0]

    def has(*names: str) -> bool:
        # Evaluated against the live column set, like the original checks.
        return set(names).issubset(merged.columns)

    if has("first5_open", "nifty_close"):
        prev_close = merged["nifty_close"]
        merged["first5_gap_from_prev_close"] = (merged["first5_open"] - prev_close) / prev_close
        merged["first5_close_vs_prev_close"] = (merged["first5_close"] - prev_close) / prev_close
    if has("first5_range_pct", "nifty_range_pct"):
        merged["first5_range_vs_prev_range"] = merged["first5_range_pct"] / merged["nifty_range_pct"]
    if has("first5_return", "nifty_ret_1"):
        merged["first5_return_x_prev_ret"] = merged["first5_return"] * merged["nifty_ret_1"]
        merged["gap_x_prev_ret"] = merged["first5_gap_from_prev_close"] * merged["nifty_ret_1"]
    if has("first5_return", "banknifty_ret_1"):
        merged["first5_return_x_bank_ret_1"] = merged["first5_return"] * merged["banknifty_ret_1"]
    if has("first5_range_pct", "india_vix_ret_1"):
        merged["first5_range_x_vix_ret_1"] = merged["first5_range_pct"] * merged["india_vix_ret_1"]

    # The outcome is unknown for a live row.
    merged["target"] = np.nan
    merged["day_return"] = np.nan
    return merged
def predict_row(row: pd.DataFrame) -> Prediction:
    """Score one feature row, persist the result and return it.

    The model's thresholded call is passed through the decision overlays. The
    prediction is written to ``LATEST_PATH`` and recorded in the T5 prediction
    history with ``source="live"``.

    Raises:
        RuntimeError: If ``row`` lacks any feature the model payload lists.
    """
    bundle = load_model()
    estimator = bundle["model"]
    feature_names = bundle["features"]
    cutoff = float(bundle["threshold"])

    absent = [name for name in feature_names if name not in row.columns]
    if absent:
        raise RuntimeError(f"Feature row is missing {len(absent)} features; first missing: {absent[:5]}")

    prob_up = predict_proba_up(estimator, row[feature_names])
    base_call = (prob_up >= cutoff).astype("int64")
    overlays = bundle.get("decision_overlays", DECISION_OVERLAYS)
    final_call = apply_decision_overlays(base_call, row, overlays)
    flipped = bool(base_call[0] != final_call[0])
    confidence = directional_confidence(prob_up, final_call, cutoff)

    direction = "UP" if int(final_call[0]) == 1 else "DOWN"
    prediction = Prediction(
        input_date=pd.to_datetime(row["date"].iloc[0]).date().isoformat(),
        first5_start=str(pd.to_datetime(row["first5_start"].iloc[0])),
        first5_end=str(pd.to_datetime(row["first5_end"].iloc[0])),
        prediction=direction,
        prob_up=float(prob_up[0]),
        confidence=float(confidence[0]),
        threshold=cutoff,
        model_name=str(bundle.get("model_name", "nifty_opening_direction_model")),
        is_overridden=flipped,
    )

    record = prediction.to_dict()
    pd.DataFrame([record]).to_csv(LATEST_PATH, index=False)
    history_row = {**record, "target_date": prediction.input_date, "source": "live"}
    _record_prediction_history(T5_PREDICTION_HISTORY_PATH, history_row, ["target_date"])
    return prediction
def _file_cache_key(path: Path) -> tuple[str, int | None, int | None]:
try:
stat = path.stat()
except FileNotFoundError:
return (str(path), None, None)
return (str(path), stat.st_mtime_ns, stat.st_size)
@lru_cache(maxsize=16)
def _latest_saved_prediction_cached(latest_key: tuple[str, int | None, int | None], summary_key: tuple[str, int | None, int | None]) -> dict[str, Any]:
latest_path = Path(latest_key[0])
if latest_path.exists():
return pd.read_csv(latest_path).iloc[-1].to_dict()
summary_path = Path(summary_key[0])
if summary_path.exists():
return json.loads(summary_path.read_text(encoding="utf-8"))
raise FileNotFoundError("No latest prediction is available yet.")
def latest_saved_prediction() -> dict[str, Any]:
    """Return a fresh copy of the newest saved prediction.

    The cached loader is keyed on file metadata, so it reloads whenever either
    backing file changes. Copying keeps callers from mutating the cached dict.
    """
    latest_key = _file_cache_key(LATEST_PATH)
    summary_key = _file_cache_key(MODEL_DIR / "summary.json")
    cached = _latest_saved_prediction_cached(latest_key, summary_key)
    return dict(cached)
def _latest_saved_prediction_uncached() -> dict[str, Any]:
    """Load the newest saved prediction straight from disk, without caching.

    Raises:
        FileNotFoundError: If neither the latest CSV nor the summary JSON exists.
    """
    if LATEST_PATH.exists():
        last_row = pd.read_csv(LATEST_PATH).iloc[-1]
        return last_row.to_dict()
    fallback = MODEL_DIR / "summary.json"
    if not fallback.exists():
        raise FileNotFoundError("No latest prediction is available yet.")
    return json.loads(fallback.read_text(encoding="utf-8"))
def _read_daily_forecaster_summary() -> dict[str, Any] | None:
    """Read the daily forecaster summary and stamp it with "tomorrow" metadata.

    The file may hold one JSON object or a list of them. For a list, the
    first entry whose ``symbol`` is ``"NIFTY 50"`` is used, falling back to
    the first entry.

    Returns:
        The enriched summary, or ``None`` when the file is missing, the JSON
        is neither an object nor a list, or the list has no object entries.
        An empty list used to raise ``IndexError`` and non-object entries used
        to raise ``AttributeError``.
    """
    if not DAILY_FORECASTER_SUMMARY_PATH.exists():
        return None
    raw = json.loads(DAILY_FORECASTER_SUMMARY_PATH.read_text(encoding="utf-8"))
    if isinstance(raw, list):
        rows = [row for row in raw if isinstance(row, dict)]
        if not rows:
            return None
        matches = [row for row in rows if row.get("symbol") == "NIFTY 50"]
        summary = dict(matches[0] if matches else rows[0])
    elif isinstance(raw, dict):
        summary = dict(raw)
    else:
        return None

    config = summary.get("config") if isinstance(summary.get("config"), dict) else {}
    # Fill identification fields only when the forecaster did not provide them.
    summary.setdefault("symbol", "NIFTY 50")
    summary.setdefault("horizon", "daily")
    summary.setdefault("horizon_bars", 1)
    # These fields are always overwritten with this backend's own labels.
    summary["model_name"] = "nifty_tomorrow_direction_model"
    summary["source_model"] = str(config.get("name") or summary.get("source_model") or "locked_multiwindow_nifty50_ensemble")
    summary["target"] = "next trading session NIFTY 50 direction"
    summary["artifact_type"] = "daily_forecaster_outputs"
    summary["artifact_source"] = str(DAILY_FORECASTER_OUTPUT_DIR)
    return summary
def _read_daily_forecaster_latest(summary: dict[str, Any]) -> dict[str, Any] | None:
    """Turn the forecaster's latest CSV row into a "tomorrow" prediction record.

    ``"NIFTY 50"`` rows are preferred when a ``symbol`` column exists. A
    missing target date is derived as the next trading day after the input
    date, and a missing confidence as ``max(p_up, 1 - p_up)``.

    Returns:
        The prediction record, or ``None`` if the CSV is missing or empty.
    """
    if not DAILY_FORECASTER_LATEST_PATH.exists():
        return None
    table = pd.read_csv(DAILY_FORECASTER_LATEST_PATH)
    if table.empty:
        return None
    if "symbol" in table.columns:
        nifty_rows = table[table["symbol"].astype(str) == "NIFTY 50"]
        if not nifty_rows.empty:
            table = nifty_rows

    record = {key: (None if pd.isna(value) else value) for key, value in table.iloc[-1].to_dict().items()}

    input_date = record.get("latest_forecast_date") or record.get("input_date")
    target_date = record.get("target_date")
    if input_date and not target_date:
        try:
            day_after = date.fromisoformat(str(input_date)[:10]) + timedelta(days=1)
            target_date = next_trading_day(day_after).isoformat()
        except Exception:
            target_date = None

    prob_up = record.get("latest_forecast_prob_up", record.get("prob_up"))
    signal = record.get("latest_forecast_signal", record.get("prediction"))
    cutoff = record.get("threshold", summary.get("threshold"))

    confidence = record.get("confidence")
    if confidence is None and prob_up is not None:
        try:
            confidence = float(max(float(prob_up), 1.0 - float(prob_up)))
        except Exception:
            confidence = None

    return {
        "input_date": input_date,
        "target_date": target_date,
        "prediction": signal,
        "prob_up": prob_up,
        "confidence": confidence,
        "threshold": cutoff,
        "model_name": "nifty_tomorrow_direction_model",
        "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR),
    }
def _parse_iso_date(value: Any) -> date | None:
try:
return date.fromisoformat(str(value or "")[:10])
except Exception:
return None
def _archive_tomorrow_latest_to_history() -> None:
    """Copy the current "tomorrow" latest row into the prediction history.

    Only rows with an UP/DOWN prediction and a target date are archived. This
    is best-effort: any failure is swallowed so the caller's sync continues.
    """
    if not TOMORROW_LATEST_PATH.exists():
        return
    try:
        last_row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        cleaned = {key: (None if pd.isna(value) else value) for key, value in last_row.items()}
        direction = str(cleaned.get("prediction", "")).upper()
        if direction not in {"UP", "DOWN"} or not cleaned.get("target_date"):
            return
        _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, cleaned, ["target_date"])
    except Exception:
        pass
def _tomorrow_actual_outcome(
    target_day: date,
    day_close: float,
    closes_by_date: dict[date, float],
) -> tuple[float | None, str | None]:
    """Return ``(move, direction)`` of ``day_close`` against the prior session close.

    ``(None, None)`` is returned when the prior close is missing, non-finite
    or zero. A close equal to the prior close counts as ``"DOWN"``.
    """
    prior_session = previous_trading_day(target_day - timedelta(days=1))
    prior_close = closes_by_date.get(prior_session)
    if prior_close is None or not np.isfinite(prior_close) or prior_close == 0:
        return None, None
    move = (day_close - prior_close) / prior_close
    if day_close > prior_close:
        direction = "UP"
    else:
        direction = "DOWN"
    return move, direction
def _find_tomorrow_prediction_for_target(target_day: date) -> dict[str, Any] | None:
    """Return the Tomorrow prediction that targets ``target_day``.

    Sources are tried in this order and the first hit wins:

    1. prediction history, matched on ``target_date``;
    2. the latest-prediction CSV, matched on ``target_date``;
    3. prediction history, matched on ``input_date`` (the previous session),
       provided its target date is absent or equals ``target_day``;
    4. the latest-prediction CSV, matched on ``input_date``;
    5. the stored test predictions, matched on ``target_date`` then ``date``;
    6. the live-accuracy ledger's ``tomorrow`` entries.

    The latest CSV is read at most once, and the history file is no longer
    reloaded into an unused variable.

    Returns:
        The matching row as a dict, or ``None`` when nothing matches.
    """
    target_iso = target_day.isoformat()
    history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH)
    if not history.empty:
        for col in ("date", "input_date", "target_date", "forecast_date"):
            if col in history.columns:
                history[col] = pd.to_datetime(history[col], errors="coerce")

    # 1. History keyed by target date.
    if not history.empty and "target_date" in history.columns:
        rows = history[history["target_date"].dt.date == target_day]
        if not rows.empty:
            return rows.iloc[-1].to_dict()

    # Read the latest CSV once; steps 2 and 4 both consult it.
    latest_row: dict[str, Any] | None = None
    if TOMORROW_LATEST_PATH.exists():
        try:
            latest_row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        except Exception:
            # Best-effort source: an unreadable or empty CSV is simply skipped.
            latest_row = None

    # 2. Latest CSV keyed by target date.
    if latest_row is not None and _parse_iso_date(latest_row.get("target_date")) == target_day:
        return latest_row

    input_day = previous_trading_day(target_day - timedelta(days=1))

    # 3. History keyed by the session the forecast was made from.
    if not history.empty and "input_date" in history.columns:
        rows = history[history["input_date"].dt.date == input_day]
        if not rows.empty:
            row = rows.iloc[-1].to_dict()
            if _parse_iso_date(row.get("target_date")) in {None, target_day}:
                return row

    # 4. Latest CSV keyed by input date.
    if latest_row is not None and _parse_iso_date(latest_row.get("input_date")) == input_day:
        return latest_row

    # 5. Stored test predictions.
    tomorrow_test = load_tomorrow_test_predictions()
    if not tomorrow_test.empty:
        for col in ("target_date", "date"):
            if col in tomorrow_test.columns:
                test_dates = pd.to_datetime(tomorrow_test[col], errors="coerce").dt.date
                rows = tomorrow_test[test_dates == target_day]
                if not rows.empty:
                    return rows.iloc[-1].to_dict()

    # 6. Live-accuracy ledger.
    ledger = load_live_accuracy()
    for entry in ledger.get("tomorrow", {}).get("entries", []):
        if str(entry.get("date", ""))[:10] == target_iso:
            pred = str(entry.get("prediction", "")).upper()
            if pred in {"UP", "DOWN"}:
                return {
                    "target_date": target_iso,
                    "prediction": pred,
                    "source": entry.get("source", "live"),
                }
    return None
def sync_daily_forecaster_outputs() -> dict[str, Any] | None:
    """Mirror the daily forecaster's outputs into the "tomorrow" artifact files.

    Writes the summary JSON, the latest-prediction CSV (unless the existing
    CSV is newer, see below), the test-prediction parquet and a small joblib
    metadata artifact.

    Returns:
        The latest prediction record when one is available (and truthy),
        otherwise the summary; ``None`` when there is no forecaster summary.
    """
    summary = _read_daily_forecaster_summary()
    if summary is None:
        return None
    latest = _read_daily_forecaster_latest(summary)
    TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    if latest is not None:
        # Preserve the current latest row in history before it may be replaced.
        _archive_tomorrow_latest_to_history()
        keep_existing = False
        if TOMORROW_LATEST_PATH.exists():
            try:
                existing = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
                existing_input = _parse_iso_date(existing.get("input_date"))
                forecaster_input = _parse_iso_date(latest.get("input_date"))
                existing_pred = str(existing.get("prediction", "")).upper()
                # Keep the file when it holds a valid call made from a later
                # session than the forecaster's row.
                if (
                    existing_input is not None
                    and forecaster_input is not None
                    and existing_input > forecaster_input
                    and existing_pred in {"UP", "DOWN"}
                ):
                    keep_existing = True
            except Exception:
                keep_existing = False
        if not keep_existing:
            pd.DataFrame([latest]).to_csv(TOMORROW_LATEST_PATH, index=False)
    if DAILY_FORECASTER_PREDICTIONS_PATH.exists():
        predictions = pd.read_csv(DAILY_FORECASTER_PREDICTIONS_PATH)
        if "symbol" in predictions.columns:
            predictions = predictions[predictions["symbol"].astype(str) == "NIFTY 50"].copy()
        if not predictions.empty:
            # Derive the UP/DOWN label and correctness flag when absent.
            if "pred" in predictions.columns and "prediction" not in predictions.columns:
                predictions["prediction"] = np.where(pd.to_numeric(predictions["pred"], errors="coerce") == 1, "UP", "DOWN")
            if "correct" not in predictions.columns and {"target", "pred"}.issubset(predictions.columns):
                predictions["correct"] = (
                    pd.to_numeric(predictions["target"], errors="coerce")
                    == pd.to_numeric(predictions["pred"], errors="coerce")
                )
            predictions.to_parquet(TOMORROW_TEST_PREDICTIONS_PATH, index=False)
    # Metadata-only artifact: it stores summary statistics, not a fitted model.
    artifact = {
        "artifact_type": "daily_forecaster_outputs",
        "model_name": "nifty_tomorrow_direction_model",
        "source_model": summary.get("source_model", "locked_multiwindow_nifty50_ensemble"),
        "threshold": float(summary.get("threshold", 0.54)),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "validation_prob_std": summary.get("validation_prob_std"),
        "test_prob_std": summary.get("test_prob_std"),
        "test_prob_min": summary.get("test_prob_min"),
        "test_prob_max": summary.get("test_prob_max"),
        "artifact_source": str(DAILY_FORECASTER_OUTPUT_DIR),
    }
    joblib.dump(artifact, TOMORROW_MODEL_PATH)
    return latest or summary
def load_tomorrow_model_artifact() -> dict[str, Any]:
    """Return the "tomorrow" model artifact, syncing forecaster outputs first.

    The joblib artifact is loaded when present (whether or not the sync
    produced anything); otherwise a snapshot is assembled from the summary.
    """
    sync_daily_forecaster_outputs()
    if TOMORROW_MODEL_PATH.exists():
        return joblib.load(TOMORROW_MODEL_PATH)

    summary = load_tomorrow_summary()
    snapshot = {
        "artifact_type": "daily_forecaster_snapshot",
        "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"),
        "source_model": summary.get("source_model", "tuned_daily_forest_single"),
        "threshold": float(summary.get("threshold", 0.543)),
    }
    return snapshot
def load_tomorrow_summary() -> dict[str, Any]:
    """Return the "tomorrow" summary, syncing forecaster outputs first.

    The summary JSON is loaded when present (whether or not the sync produced
    anything); otherwise built-in default metrics are returned.
    """
    sync_daily_forecaster_outputs()
    if not TOMORROW_SUMMARY_PATH.exists():
        return {
            "model_name": "nifty_tomorrow_direction_model",
            "source_model": "locked_multiwindow_nifty50_ensemble",
            "target": "next trading session NIFTY 50 direction",
            "threshold": 0.54,
            "validation_accuracy": 0.5673758865248227,
            "test_accuracy": 0.6451612903225806,
            "baseline_accuracy": 0.5053763440860215,
            "n_test": 186,
            "feature_count": 204,
        }
    stored = TOMORROW_SUMMARY_PATH.read_text(encoding="utf-8")
    return json.loads(stored)
def latest_tomorrow_prediction() -> dict[str, Any]:
    """Return the current "tomorrow" prediction, refreshing it when stale.

    A stored prediction is stale when its input date is missing or older than
    the latest valid daily session (the earlier of the newest daily bar and
    the expected completed session). A refresh is attempted on a best-effort
    basis; if it fails or is still stale, the stored value is returned.
    """
    sync_daily_forecaster_outputs()
    latest_daily = latest_parquet_date(NIFTY_1D_PATH)
    expected_daily = expected_completed_daily_date()
    if latest_daily and expected_daily:
        valid_daily = min(latest_daily, expected_daily)
    else:
        valid_daily = expected_daily or latest_daily

    if TOMORROW_LATEST_PATH.exists():
        stored = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        cleaned = {key: (None if pd.isna(value) else value) for key, value in stored.items()}
        input_day = _parse_iso_date(cleaned.get("input_date"))
        stale = valid_daily is not None and (input_day is None or input_day < valid_daily)
        if stale:
            try:
                refreshed = refresh_tomorrow_prediction(session_date=valid_daily)
                refreshed_day = _parse_iso_date(refreshed.get("input_date"))
                if refreshed_day is not None and refreshed_day >= valid_daily:
                    return refreshed
            except Exception:
                # Best-effort refresh: fall back to the stored row.
                pass
        return cleaned

    summary = load_tomorrow_summary()
    try:
        summary_input_day = _parse_iso_date(summary.get("latest_forecast_date"))
    except Exception:
        summary_input_day = None
    summary_stale = valid_daily is not None and (summary_input_day is None or summary_input_day < valid_daily)
    if summary_stale:
        try:
            return refresh_tomorrow_prediction(session_date=valid_daily)
        except Exception:
            # Best-effort refresh: fall back to the summary fields.
            pass
    return {
        "input_date": summary.get("latest_forecast_date"),
        "target_date": None,
        "prediction": summary.get("latest_forecast_signal"),
        "prob_up": summary.get("latest_forecast_prob_up"),
        "confidence": None,
        "threshold": summary.get("threshold"),
        "model_name": summary.get("model_name", "nifty_tomorrow_direction_model"),
        "source_model": summary.get("source_model", "tuned_daily_forest_single"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
    }
def load_tplus1_summary() -> dict[str, Any]:
    """Return the T+1 model summary.

    Parses ``TPLUS1_SUMMARY_PATH`` as UTF-8 JSON when the file exists;
    otherwise returns the hard-coded default metrics below.
    """
    if not TPLUS1_SUMMARY_PATH.exists():
        return {
            "model_name": "logistic_regression_l1_C0.35_balanced",
            "target": "T+1 NIFTY 50 close greater than T 14:20 close",
            "window_start": "14:00",
            "window_end": "14:20",
            "threshold": 0.578,
            "validation_accuracy": 0.66,
            "test_accuracy": 0.6368421052631579,
            "baseline_test_accuracy": 0.5052631578947369,
            "test_rows": 190,
            "feature_count": 40,
        }
    summary_text = TPLUS1_SUMMARY_PATH.read_text(encoding="utf-8")
    return json.loads(summary_text)
def latest_tplus1_prediction() -> dict[str, Any]:
    """Return the latest saved T+1 prediction.

    The last row of the CSV at ``TPLUS1_LATEST_PATH`` is returned with NaN
    values mapped to ``None``.  When the file is missing, unreadable, or
    contains no data rows, a record assembled from ``load_tplus1_summary()``
    is returned instead.
    """
    if TPLUS1_LATEST_PATH.exists():
        try:
            saved = pd.read_csv(TPLUS1_LATEST_PATH)
        except (OSError, ValueError):
            # pandas' EmptyDataError and ParserError are ValueError subclasses;
            # a zero-byte or corrupt file should use the summary fallback
            # rather than crash the caller.
            saved = pd.DataFrame()
        # A header-only CSV parses to an empty frame; .iloc[-1] would raise.
        if not saved.empty:
            row = saved.iloc[-1].to_dict()
            return {k: (None if pd.isna(v) else v) for k, v in row.items()}
    summary = load_tplus1_summary()
    return {
        "input_date": summary.get("latest_input_date"),
        "target_date": None,
        "forecast_for": summary.get("latest_forecast_for"),
        "prediction": summary.get("latest_prediction"),
        "prob_up": summary.get("latest_prob_up"),
        "confidence": summary.get("latest_confidence"),
        "threshold": summary.get("threshold"),
        "model_name": summary.get("model_name", "logistic_regression_l1_C0.35_balanced"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
    }
def load_mfe_summary() -> dict[str, Any]:
    """Return the MFE summary parsed from ``MFE_SUMMARY_PATH`` (UTF-8 JSON).

    Returns an empty dict when the file does not exist.
    """
    if not MFE_SUMMARY_PATH.exists():
        return {}
    summary_text = MFE_SUMMARY_PATH.read_text(encoding="utf-8")
    return json.loads(summary_text)
def latest_mfe_prediction() -> dict[str, Any]:
    """Return the latest saved MFE prediction.

    The last row of the CSV at ``MFE_LATEST_PATH`` is returned with NaN
    values mapped to ``None``.  When the file is missing, unreadable, or
    contains no data rows, a record assembled from ``load_mfe_summary()``
    is returned instead.
    """
    if MFE_LATEST_PATH.exists():
        try:
            saved = pd.read_csv(MFE_LATEST_PATH)
        except (OSError, ValueError):
            # pandas' EmptyDataError and ParserError are ValueError subclasses;
            # a zero-byte or corrupt file should use the summary fallback
            # rather than crash the caller.
            saved = pd.DataFrame()
        # A header-only CSV parses to an empty frame; .iloc[-1] would raise.
        if not saved.empty:
            row = saved.iloc[-1].to_dict()
            return {k: (None if pd.isna(v) else v) for k, v in row.items()}
    summary = load_mfe_summary()
    return {
        "input_date": summary.get("latest_input_date"),
        "first5_start": summary.get("latest_first5_start"),
        "first5_end": summary.get("latest_first5_end"),
        "predicted_up_points": summary.get("latest_predicted_up_points"),
        "predicted_down_points": summary.get("latest_predicted_down_points"),
    }
def refresh_mfe_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Recompute the MFE up/down point predictions and persist them.

    Args:
        session_date: Session to predict for.  When given, the last dataset
            row whose ``date`` falls on that day is used; otherwise the last
            row of the dataset is used.

    Returns:
        The prediction record that was written, or ``{}`` when the model
        artifact at ``MFE_MODEL_PATH`` is missing or no dataset row matches.

    Side effects:
        Overwrites ``MFE_LATEST_PATH``, upserts the record into
        ``MFE_LIVE_HISTORY_PATH`` (keyed on ``input_date``) and calls
        ``clear_dashboard_payload_cache()``.
    """
    if not MFE_MODEL_PATH.exists():
        return {}

    artifact = joblib.load(MFE_MODEL_PATH)
    up_model = artifact["up_model"]
    down_model = artifact["down_model"]
    up_features = artifact["up_features"]
    down_features = artifact["down_features"]
    up_calib = artifact["up_calibration"]
    down_calib = artifact["down_calibration"]

    def _calibrated_points(model: Any, feature_names: Any, calibration: Any, row: pd.DataFrame) -> float:
        # Raw model output and the linearly calibrated value are both floored at 0.
        raw = float(np.clip(model.predict(row[feature_names])[0], 0.0, None))
        return float(np.clip((raw * calibration["scale"]) + calibration["offset"], 0.0, None))

    sessions = pd.read_parquet(OPENING_DATASET_PATH)
    sessions["_session_date"] = pd.to_datetime(sessions["date"], errors="coerce").dt.normalize()
    if session_date is None:
        latest_df = sessions.tail(1)
    else:
        on_session = sessions["_session_date"].dt.date == session_date
        latest_df = sessions[on_session].tail(1)
    if latest_df.empty:
        return {}

    pred_up = _calibrated_points(up_model, up_features, up_calib, latest_df)
    pred_down = _calibrated_points(down_model, down_features, down_calib, latest_df)
    out = {
        "input_date": latest_df["_session_date"].dt.date.iloc[0].isoformat(),
        "first5_start": str(latest_df["first5_start"].iloc[0]),
        "first5_end": str(latest_df["first5_end"].iloc[0]),
        "first5_close": float(latest_df["first5_close"].iloc[0]),
        "predicted_up_points": pred_up,
        "predicted_down_points": pred_down,
    }

    live_df = pd.DataFrame([out])
    live_df.to_csv(MFE_LATEST_PATH, index=False)

    # Append to live history, replacing any earlier row for the same session
    # so repeated refreshes do not create duplicates.
    if not MFE_LIVE_HISTORY_PATH.exists():
        live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False)
    else:
        try:
            history = pd.read_csv(MFE_LIVE_HISTORY_PATH)
            history = history[history["input_date"] != out["input_date"]]
            pd.concat([history, live_df], ignore_index=True).to_csv(MFE_LIVE_HISTORY_PATH, index=False)
        except Exception:
            # NOTE(review): any failure here replaces the whole history file
            # with just the current record.
            live_df.to_csv(MFE_LIVE_HISTORY_PATH, index=False)

    clear_dashboard_payload_cache()
    return out
def _minute_frame_for_tplus1() -> pd.DataFrame:
    """Load minute bars from ``NIFTY_1M_PATH`` prepared for T+1 feature building.

    Adds a parsed ``dt`` column, coerces the price/volume columns that are
    present to numeric, drops rows lacking ``dt`` or any OHLC value, sorts by
    ``dt`` and adds ``session_date`` (``dt`` normalized to midnight) and
    ``time`` (``"HH:MM"`` string) columns.
    """
    bars = pd.read_parquet(NIFTY_1M_PATH).copy()
    bars["dt"] = pd.to_datetime(bars["date"], errors="coerce")

    numeric_cols = [name for name in ("open", "high", "low", "close", "volume") if name in bars.columns]
    for name in numeric_cols:
        bars[name] = pd.to_numeric(bars[name], errors="coerce")

    bars = bars.dropna(subset=["dt", "open", "high", "low", "close"])
    bars = bars.sort_values("dt").reset_index(drop=True)
    bars["session_date"] = bars["dt"].dt.normalize()
    bars["time"] = bars["dt"].dt.strftime("%H:%M")
    return bars
def _build_tplus1_session_features(minute: pd.DataFrame) -> pd.DataFrame:
    """Build one feature row per session from the 14:00-14:20 minute window.

    Args:
        minute: Minute bars with at least ``dt``, ``session_date``, ``time``
            (``"HH:MM"`` strings) and OHLC columns; ``volume`` is optional.

    Returns:
        A frame sorted by ``date`` holding window aggregates (``w_*``),
        per-minute pivoted values (``mNN_*``), derived return features,
        calendar fields and the session's last close (``day_close``).  Only
        sessions whose window has exactly 21 rows are kept.
    """
    clock = minute["time"]
    window = minute[(clock >= "14:00") & (clock <= "14:20")].copy()
    # Position of each bar inside its session's window (0 for the first bar).
    window["minute_offset"] = window.groupby("session_date", sort=True).cumcount()

    # Without a volume column, fall back to the row count for w_volume.
    volume_spec = ("volume", "sum") if "volume" in window.columns else ("close", "size")
    base = (
        window.groupby("session_date", sort=True)
        .agg(
            window_start=("dt", "first"),
            window_end=("dt", "last"),
            window_rows=("close", "size"),
            w_open=("open", "first"),
            w_high=("high", "max"),
            w_low=("low", "min"),
            w_close=("close", "last"),
            w_volume=volume_spec,
        )
        .reset_index()
        .rename(columns={"session_date": "date"})
    )
    # Keep only sessions with a complete window (21 one-minute bars).
    base = base[base["window_rows"] == 21].copy()
    base["w_return"] = safe_div(base["w_close"] - base["w_open"], base["w_open"])
    base["w_range"] = safe_div(base["w_high"] - base["w_low"], base["w_open"])
    base["w_body_to_range"] = safe_div(base["w_close"] - base["w_open"], base["w_high"] - base["w_low"])
    base["w_close_location"] = safe_div(base["w_close"] - base["w_low"], base["w_high"] - base["w_low"])

    window["ret_1m"] = window.groupby("session_date")["close"].pct_change(fill_method=None)
    window["range_1m"] = safe_div(window["high"] - window["low"], window["open"])
    window["body_1m"] = safe_div(window["close"] - window["open"], window["open"])

    per_minute = window.pivot(
        index="session_date",
        columns="minute_offset",
        values=["open", "high", "low", "close", "ret_1m", "range_1m", "body_1m"],
    )
    # Flatten the (field, offset) column pairs into names such as "m04_close".
    per_minute.columns = [f"m{int(offset):02d}_{field}" for field, offset in per_minute.columns]
    per_minute = per_minute.reset_index().rename(columns={"session_date": "date"})

    session_close = (
        minute.groupby("session_date", sort=True)
        .agg(day_close=("close", "last"))
        .reset_index()
        .rename(columns={"session_date": "date"})
    )

    frame = base.merge(per_minute, on="date", how="left")
    frame = frame.merge(session_close, on="date", how="left")

    for offset in range(21):
        tag = f"m{offset:02d}"
        close_col = f"{tag}_close"
        open_col = f"{tag}_open"
        if close_col not in frame.columns:
            continue
        frame[f"{tag}_close_vs_window_open"] = safe_div(frame[close_col] - frame["w_open"], frame["w_open"])
        if open_col in frame.columns:
            frame[f"{tag}_close_vs_minute_open"] = safe_div(frame[close_col] - frame[open_col], frame[open_col])

    frame["ret_first_5m"] = safe_div(frame["m04_close"] - frame["m00_open"], frame["m00_open"])
    frame["ret_last_5m"] = safe_div(frame["m20_close"] - frame["m16_open"], frame["m16_open"])
    frame["ret_mid_11m"] = safe_div(frame["m15_close"] - frame["m05_open"], frame["m05_open"])
    frame["last5_minus_first5"] = frame["ret_last_5m"] - frame["ret_first_5m"]
    frame["abs_window_return"] = frame["w_return"].abs()

    session_days = frame["date"].dt
    frame["dow"] = session_days.dayofweek
    frame["dom"] = session_days.day
    frame["month"] = session_days.month
    return frame.sort_values("date").reset_index(drop=True)
def _add_tplus1_target_features(features: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``features`` with T+1 target and lagged-target columns.

    The target for a row is 1.0 when the following row's ``day_close`` is
    above this row's ``w_close`` (0.0 otherwise, NaN when there is no
    following close).  Lagged copies of the target/return and rolling
    mean/std statistics of the one-row-shifted series are then added.
    """
    frame = features.copy()
    frame["target_date"] = frame["date"].shift(-1)
    frame["target_close"] = frame["day_close"].shift(-1)
    frame["target_return_from_1420"] = safe_div(frame["target_close"] - frame["w_close"], frame["w_close"])
    frame["target"] = (frame["target_return_from_1420"] > 0).astype("float64")
    # Rows without a next-session close have no defined outcome.
    frame.loc[frame["target_close"].isna(), "target"] = np.nan

    for lag in (1, 2, 3, 5, 10):
        frame[f"prev_target_lag{lag}"] = frame["target"].shift(lag)
        frame[f"prev_target_return_lag{lag}"] = frame["target_return_from_1420"].shift(lag)

    # The rolling statistics all operate on the series shifted by one row.
    prior_target = frame["target"].shift(1)
    prior_return = frame["target_return_from_1420"].shift(1)
    for span in (3, 5, 10, 20, 40):
        needed = max(2, span // 2)
        frame[f"prev_target_mean{span}"] = prior_target.rolling(span, min_periods=needed).mean()
        frame[f"prev_target_return_mean{span}"] = prior_return.rolling(span, min_periods=needed).mean()
        frame[f"prev_target_return_std{span}"] = prior_return.rolling(span, min_periods=needed).std()
    return frame
def _apply_tplus1_overlays(pred: np.ndarray, frame: pd.DataFrame, overlays: list[dict[str, Any]]) -> np.ndarray:
    """Apply rule-based overrides to binary predictions.

    Args:
        pred: Predictions (1 = up, 0 = down); the input is not modified.
        frame: Feature rows aligned positionally with ``pred``.
        overlays: Rules with keys ``feature``, ``op``, ``value`` and
            ``action``.  ``op == "<="`` selects rows where the feature is at
            most ``value``; any other ``op`` selects rows where it is at
            least ``value``.  ``action`` is ``"up"`` (force 1), ``"down"``
            (force 0) or ``"flip"``; other actions leave predictions as-is.
            Rules naming a column absent from ``frame`` are skipped.

    Returns:
        A new int64 array with the rules applied in order.
    """
    result = np.asarray(pred, dtype="int64").copy()
    for rule in overlays:
        column = str(rule.get("feature", ""))
        if column not in frame.columns:
            continue

        values = pd.to_numeric(frame[column], errors="coerce")
        cutoff = float(rule.get("value", 0.0))
        hits = (values <= cutoff) if rule.get("op") == "<=" else (values >= cutoff)
        selected = hits.fillna(False).to_numpy(dtype=bool)

        action = rule.get("action")
        if action == "flip":
            result[selected] = 1 - result[selected]
        elif action == "up":
            result[selected] = 1
        elif action == "down":
            result[selected] = 0
    return result
def refresh_tplus1_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Recompute and persist the T+1 prediction from the 14:00-14:20 window.

    Args:
        session_date: Session whose window should be scored; the latest
            available session is used when omitted.

    Returns:
        The prediction record written to ``TPLUS1_LATEST_PATH``.

    Raises:
        FileNotFoundError: The model artifact at ``TPLUS1_MODEL_PATH`` is missing.
        RuntimeError: No complete window is available even after fetching
            fresh minute data, or the feature row lacks model features.

    Side effects:
        May append freshly fetched minutes to ``NIFTY_1M_PATH``; writes the
        latest-prediction CSV, records the prediction history and clears the
        dashboard payload cache.
    """
    if not TPLUS1_MODEL_PATH.exists():
        raise FileNotFoundError(f"Missing T+1 model artifact: {TPLUS1_MODEL_PATH}")
    payload = joblib.load(TPLUS1_MODEL_PATH)
    features = payload["features"]
    threshold = float(payload["threshold"])

    def _candidate_row() -> pd.DataFrame:
        # Rebuild the feature frame from the stored minutes and pick the
        # requested session (or the most recent one).
        built = _add_tplus1_target_features(_build_tplus1_session_features(_minute_frame_for_tplus1()))
        if session_date is None:
            return built.tail(1)
        session_days = pd.to_datetime(built["date"], errors="coerce").dt.date
        return built[session_days == session_date].tail(1)

    row = _candidate_row()
    if row.empty:
        # Stored minutes do not cover the session yet: pull recent data and retry once.
        minutes = fetch_yahoo_minutes(period="7d")
        append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
        row = _candidate_row()
    if row.empty:
        raise RuntimeError("No complete 14:00-14:20 window is available for T+1 prediction.")

    missing = [col for col in features if col not in row.columns]
    if missing:
        raise RuntimeError(f"T+1 feature row is missing model features: {missing[:5]}")

    prob_up = predict_proba_up(payload["model"], row[features])
    raw_pred = (prob_up >= threshold).astype("int64")

    overlay_payload = payload.get("decision_overlay")
    if isinstance(overlay_payload, dict):
        overlays = overlay_payload.get("overlays", [])
    else:
        overlays = []
    pred_int = int(_apply_tplus1_overlays(raw_pred, row, overlays)[0])
    prediction = "UP" if pred_int == 1 else "DOWN"

    input_day = pd.to_datetime(row["date"].iloc[0]).date()
    target_day = next_trading_day(input_day + timedelta(days=1))
    summary = load_tplus1_summary()
    out = {
        "input_date": input_day.isoformat(),
        "target_date": target_day.isoformat(),
        "forecast_for": f"next trading session after {input_day.isoformat()}",
        "prediction": prediction,
        "prob_up": float(prob_up[0]),
        "confidence": float(max(prob_up[0], 1.0 - prob_up[0])),
        "threshold": threshold,
        "model_name": str(payload.get("model_name", summary.get("model_name", "nifty_1420_tplus1_logistic_model"))),
        "decision_overlay": summary.get("decision_overlay"),
        "validation_accuracy": summary.get("validation_accuracy"),
        "test_accuracy": summary.get("test_accuracy"),
        "accuracy_goal": summary.get("accuracy_goal"),
        "source": "live",
    }
    pd.DataFrame([out]).to_csv(TPLUS1_LATEST_PATH, index=False)
    _record_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH, out, ["target_date"])
    clear_dashboard_payload_cache()
    return out
def _tomorrow_probability_from_daily(daily: pd.DataFrame, fallback_prob: float) -> float:
    """Heuristic up-probability derived from recent daily closes.

    Args:
        daily: Daily rows with a ``close`` column, oldest first.
        fallback_prob: Returned (as ``float``) when fewer than 5 usable
            closes are available.

    Returns:
        A score starting at a fixed base value, nudged by the clipped 1-day
        and 5-day returns, reduced by the clipped 10-return volatility, and
        finally clipped to ``[0.35, 0.65]``.  Only the last 20 valid closes
        are considered.
    """
    if daily.empty or len(daily) < 5:
        return float(fallback_prob)

    recent = daily.copy()
    recent["close"] = pd.to_numeric(recent["close"], errors="coerce")
    recent = recent.dropna(subset=["close"]).tail(20)
    if len(recent) < 5:
        return float(fallback_prob)

    closes = recent["close"]
    one_day_returns = closes.pct_change(fill_method=None)
    ret_1 = one_day_returns.iloc[-1]
    ret_5 = closes.pct_change(5, fill_method=None).iloc[-1]
    vol = one_day_returns.tail(10).std()

    score = 0.49900560447008563
    if not pd.isna(ret_1):
        score += float(np.clip(ret_1 * 4.5, -0.05, 0.05))
    if not pd.isna(ret_5):
        score += float(np.clip(ret_5 * 1.4, -0.05, 0.05))
    if not pd.isna(vol):
        score -= float(np.clip(vol * 0.9, 0.0, 0.035))
    return float(np.clip(score, 0.35, 0.65))
def refresh_tomorrow_prediction(session_date: date | None = None) -> dict[str, Any]:
    """Refresh the "tomorrow" direction prediction and persist it.

    The current latest prediction is archived first.  If
    ``sync_daily_forecaster_outputs()`` returns a non-``None`` value and the
    latest CSV exists, its last row is recorded to history with source
    ``"live"`` and returned when it is recent enough (no ``session_date``
    requested, or its ``input_date`` is on/after ``session_date``).

    Otherwise a prediction is computed from daily closes via
    ``_tomorrow_probability_from_daily``, written to ``TOMORROW_LATEST_PATH``,
    recorded to history and mirrored into ``TOMORROW_SUMMARY_PATH``.

    Args:
        session_date: Input session for the forecast; defaults to the latest
            date in ``NIFTY_1D_PATH`` when the fallback path is taken.

    Returns:
        The prediction record.

    Raises:
        RuntimeError: The daily parquet has no rows with a parsable date.
    """
    _archive_tomorrow_latest_to_history()
    synced = sync_daily_forecaster_outputs()
    if synced is not None and TOMORROW_LATEST_PATH.exists():
        last_row = pd.read_csv(TOMORROW_LATEST_PATH).iloc[-1].to_dict()
        cleaned = {name: (None if pd.isna(value) else value) for name, value in last_row.items()}
        cleaned["source"] = "live"
        _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, cleaned, ["target_date"])
        if session_date is None:
            clear_dashboard_payload_cache()
            return cleaned
        try:
            synced_day = date.fromisoformat(str(cleaned.get("input_date"))[:10])
        except Exception:
            synced_day = None
        # session_date is known to be set here, so only freshness is checked.
        if synced_day is not None and synced_day >= session_date:
            clear_dashboard_payload_cache()
            return cleaned

    summary = load_tomorrow_summary()
    artifact = load_tomorrow_model_artifact()

    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    daily = daily.dropna(subset=["date"]).sort_values("date")
    if daily.empty:
        raise RuntimeError("No daily NIFTY rows are available for tomorrow forecast.")

    input_day = session_date or daily["date"].max().date()
    target_day = next_trading_day(input_day + timedelta(days=1))
    threshold = float(artifact.get("threshold", summary.get("threshold", 0.543)))
    fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563))

    # Only history up to and including the input session feeds the heuristic.
    history = daily[daily["date"].dt.date <= input_day]
    prob_up = _tomorrow_probability_from_daily(history, fallback_prob)
    prediction = "UP" if prob_up >= threshold else "DOWN"
    confidence = float(max(prob_up, 1.0 - prob_up))

    row = {
        "input_date": input_day.isoformat(),
        "target_date": target_day.isoformat(),
        "prediction": prediction,
        "prob_up": prob_up,
        "confidence": confidence,
        "threshold": threshold,
        "model_name": str(summary.get("model_name", "nifty_tomorrow_direction_model")),
        "source_model": str(summary.get("source_model", "tuned_daily_forest_single")),
        "validation_accuracy": float(summary.get("validation_accuracy", 0.5780141843971631)),
        "test_accuracy": float(summary.get("test_accuracy", 0.6182795698924731)),
        "source": "live",
    }
    pd.DataFrame([row]).to_csv(TOMORROW_LATEST_PATH, index=False)
    _record_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH, row, ["target_date"])

    # Mirror the new forecast into the summary file (on a copy of the dict).
    summary = dict(summary)
    summary.update(
        {
            "latest_forecast_date": row["input_date"],
            "latest_forecast_for": f"next trading session {row['target_date']}",
            "latest_forecast_prob_up": row["prob_up"],
            "latest_forecast_signal": row["prediction"],
            "latest_target_date": row["target_date"],
        }
    )
    TOMORROW_SUMMARY_PATH.write_text(json.dumps(summary, indent=2), encoding="utf-8")
    clear_dashboard_payload_cache()
    return row
def _json_ready_frame(df: pd.DataFrame, limit: int | None = None) -> list[dict[str, Any]]:
out = df.copy()
if limit is not None:
out = out.tail(limit)
for col in out.columns:
if pd.api.types.is_datetime64_any_dtype(out[col]):
out[col] = out[col].dt.strftime("%Y-%m-%d %H:%M:%S")
out = out.replace({np.nan: None})
return out.to_dict(orient="records")
def load_model_summary() -> dict[str, Any]:
    """Return ``MODEL_DIR / "summary.json"`` parsed as UTF-8 JSON, or ``{}`` if absent."""
    summary_file = MODEL_DIR / "summary.json"
    if summary_file.exists():
        return json.loads(summary_file.read_text(encoding="utf-8"))
    return {}
def load_candidate_results() -> list[dict[str, Any]]:
    """Return the first 12 rows of ``candidate_results.csv`` as JSON-ready dicts.

    Returns an empty list when the file does not exist in ``MODEL_DIR``.
    """
    results_file = MODEL_DIR / "candidate_results.csv"
    if results_file.exists():
        top_rows = pd.read_csv(results_file).head(12)
        return _json_ready_frame(top_rows)
    return []
def load_test_predictions() -> pd.DataFrame:
    """Load the opening-model test predictions sorted by ``date``.

    Returns an empty frame when ``TEST_PREDICTIONS_PATH`` does not exist.
    Unparsable dates are coerced to NaT.
    """
    if TEST_PREDICTIONS_PATH.exists():
        predictions = pd.read_parquet(TEST_PREDICTIONS_PATH)
        predictions["date"] = pd.to_datetime(predictions["date"], errors="coerce")
        ordered = predictions.sort_values("date")
        return ordered.reset_index(drop=True)
    return pd.DataFrame()
def load_tomorrow_test_predictions() -> pd.DataFrame:
    """Load the "tomorrow" model test predictions in chronological order.

    Returns an empty frame when ``TOMORROW_TEST_PREDICTIONS_PATH`` does not
    exist.  Any of the ``forecast_date`` / ``target_date`` / ``date`` columns
    that are present are parsed to datetimes (unparsable values become NaT).

    The frame is sorted by the first available column among ``target_date``,
    ``forecast_date`` and ``date``; if none is present the rows are returned
    in stored order instead of raising ``KeyError``.
    """
    if not TOMORROW_TEST_PREDICTIONS_PATH.exists():
        return pd.DataFrame()
    df = pd.read_parquet(TOMORROW_TEST_PREDICTIONS_PATH)
    for col in ("forecast_date", "target_date", "date"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    # Same precedence as before, extended with "date" so a frame carrying
    # only that column no longer fails on a missing "forecast_date".
    sort_col = next(
        (col for col in ("target_date", "forecast_date", "date") if col in df.columns),
        None,
    )
    if sort_col is None:
        return df.reset_index(drop=True)
    return df.sort_values(sort_col).reset_index(drop=True)
def load_tplus1_test_predictions() -> pd.DataFrame:
    """Load the T+1 model test predictions in chronological order.

    Returns an empty frame when ``TPLUS1_TEST_PREDICTIONS_PATH`` does not
    exist.  The ``date`` and ``target_date`` columns, when present, are
    parsed to datetimes (unparsable values become NaT).

    Rows are sorted by ``date``.  Because the parsing loop treats ``date`` as
    optional, a frame without it is sorted by ``target_date`` if available,
    or returned in stored order, instead of raising ``KeyError``.
    """
    if not TPLUS1_TEST_PREDICTIONS_PATH.exists():
        return pd.DataFrame()
    df = pd.read_parquet(TPLUS1_TEST_PREDICTIONS_PATH)
    for col in ("date", "target_date"):
        if col in df.columns:
            df[col] = pd.to_datetime(df[col], errors="coerce")
    sort_col = next((col for col in ("date", "target_date") if col in df.columns), None)
    if sort_col is None:
        return df.reset_index(drop=True)
    return df.sort_values(sort_col).reset_index(drop=True)
def dashboard_payload() -> dict[str, Any]:
    """Return a deep copy of the dashboard payload, cached per file state.

    The cache key is the tuple of ``_file_cache_key`` values for every file
    listed below, so a change in any of those keys leads to a fresh build by
    ``_dashboard_payload_cached``.  The lookup and the copy both happen while
    holding ``_dashboard_payload_lock``; callers receive their own copy and
    may mutate it freely.
    """
    watched_files = (
        MODEL_DIR / "summary.json",
        LATEST_PATH,
        TEST_PREDICTIONS_PATH,
        TOMORROW_SUMMARY_PATH,
        TOMORROW_LATEST_PATH,
        TOMORROW_TEST_PREDICTIONS_PATH,
        TOMORROW_MODEL_PATH,
        TPLUS1_SUMMARY_PATH,
        TPLUS1_LATEST_PATH,
        TPLUS1_TEST_PREDICTIONS_PATH,
        TPLUS1_MODEL_PATH,
        REFRESH_STATE_PATH,
        NIFTY_1D_PATH,
        OPENING_DATASET_PATH,
        MODEL_DIR / "candidate_results.csv",
        NIFTY_1M_PATH,
        LIVE_ACCURACY_PATH,
        TOMORROW_PREDICTION_HISTORY_PATH,
        MFE_SUMMARY_PATH,
        MFE_LATEST_PATH,
        MFE_MODEL_PATH,
    )
    key = tuple(_file_cache_key(path) for path in watched_files)
    with _dashboard_payload_lock:
        cached_payload = _dashboard_payload_cached(key)
        return copy.deepcopy(cached_payload)
def warm_dashboard_payload_cache() -> None:
    """Build the dashboard payload once and discard it, populating the cache."""
    _ = dashboard_payload()
def _load_forecaster_predictions_by_target() -> dict[date, dict[str, Any]]:
    """Index forecaster predictions from the predictions CSV by target date.

    Reads ``DAILY_FORECASTER_PREDICTIONS_PATH``.  When a ``symbol`` column is
    present only ``"NIFTY 50"`` rows are used.  Rows without a parsable
    ``target_date`` or without an integer-convertible ``pred`` (falling back
    to ``raw_pred`` when ``pred`` is missing/NaN) are skipped.  Later rows
    for the same target date overwrite earlier ones.

    Returns:
        Mapping of target date to a dict with ``prediction``, ``prob_up``,
        ``forecast_date`` and ``source``.  Empty when the file is missing,
        unreadable or has no rows.
    """
    if not DAILY_FORECASTER_PREDICTIONS_PATH.exists():
        return {}
    try:
        frame = pd.read_csv(DAILY_FORECASTER_PREDICTIONS_PATH)
    except Exception:
        return {}
    if frame.empty:
        return {}
    if "symbol" in frame.columns:
        frame = frame[frame["symbol"].astype(str) == "NIFTY 50"].copy()

    def _optional_float(value: Any) -> float | None:
        # NaN/None and unconvertible values all map to None.
        try:
            return float(value) if pd.notna(value) else None
        except Exception:
            return None

    by_target: dict[date, dict[str, Any]] = {}
    for _, record in frame.iterrows():
        target_day = _parse_iso_date(record.get("target_date"))
        if target_day is None:
            continue

        pred_value = record.get("pred")
        if pd.isna(pred_value) and "raw_pred" in record:
            pred_value = record.get("raw_pred")
        try:
            pred_int = int(pred_value)
        except Exception:
            continue

        by_target[target_day] = {
            "prediction": "UP" if pred_int == 1 else "DOWN",
            "prob_up": _optional_float(record.get("prob_up")),
            "forecast_date": record.get("forecast_date"),
            "source": "Tomorrow (forecaster)",
        }
    return by_target
def _load_track_record_daily_rows() -> pd.DataFrame:
    """Combine stored and freshly fetched daily rows for the track record.

    Rows from ``NIFTY_1D_PATH`` (when readable) are concatenated with the
    result of ``fetch_yahoo_daily(period="3mo")`` (when it succeeds and is
    non-empty); the fetched rows are also appended to the parquet on a
    best-effort basis.  The combined frame has its ``date`` normalized, is
    de-duplicated per date keeping the last occurrence, and keeps only rows
    with a finite ``close``.

    Returns:
        The cleaned frame, or an empty frame when neither source yields rows.
    """
    sources: list[pd.DataFrame] = []

    if NIFTY_1D_PATH.exists():
        try:
            sources.append(pd.read_parquet(NIFTY_1D_PATH))
        except Exception:
            # An unreadable local file is tolerated; the fetch below may still succeed.
            pass

    try:
        fetched = fetch_yahoo_daily(period="3mo")
        if not fetched.empty:
            sources.append(fetched)
            try:
                append_parquet_rows(NIFTY_1D_PATH, fetched, ["date"])
            except Exception:
                # Persisting the fetched rows is optional for this read path.
                pass
    except Exception:
        # Fetch failures leave whatever was loaded from disk.
        pass

    if not sources:
        return pd.DataFrame()

    def _has_finite_close(value: Any) -> bool:
        return np.isfinite(float(value)) if pd.notna(value) else False

    merged = pd.concat(sources, ignore_index=True)
    merged["date"] = pd.to_datetime(merged["date"], errors="coerce").dt.normalize()
    merged = merged.dropna(subset=["date"]).sort_values("date")
    merged = merged.drop_duplicates(subset=["date"], keep="last")
    merged = merged[merged["close"].map(_has_finite_close)].copy()
    return merged.reset_index(drop=True)
def _rolling_tomorrow_prediction(
    input_day: date,
    daily_rows: pd.DataFrame,
    threshold: float,
    fallback_prob: float,
) -> tuple[str, float]:
    """Predict direction using only daily rows dated on or before ``input_day``.

    Returns:
        ``(prediction, prob_up)`` where ``prediction`` is ``"UP"`` when the
        probability from ``_tomorrow_probability_from_daily`` reaches
        ``threshold`` and ``"DOWN"`` otherwise.
    """
    known_rows = daily_rows[daily_rows["date"].dt.date <= input_day].copy()
    prob_up = _tomorrow_probability_from_daily(known_rows, fallback_prob)
    if prob_up >= threshold:
        return "UP", float(prob_up)
    return "DOWN", float(prob_up)
def build_prediction_track_record(
    sessions: int = 10,
) -> list[dict[str, Any]]:
    """Build the recent "tomorrow" prediction track record.

    For each of the last ``sessions`` trading sessions ending at the most
    recent date that has a close (no later than
    ``_track_record_end_session()``), the realised outcome is paired with a
    prediction: the forecaster's stored prediction for that target date when
    one exists, otherwise a rolling prediction computed from daily rows up to
    the previous trading day.

    Args:
        sessions: Number of trading sessions to cover.

    Returns:
        Up to ``sessions`` records (oldest first as produced by
        ``last_n_trading_sessions``), each with the target date, input date,
        prediction and its source, ``prob_up``, the actual move/direction and
        whether the prediction was correct.  Empty when no daily closes are
        available.
    """
    summary = load_tomorrow_summary()
    artifact = load_tomorrow_model_artifact()
    threshold = float(artifact.get("threshold", summary.get("threshold", 0.534)))
    fallback_prob = float(summary.get("latest_forecast_prob_up", 0.49900560447008563))
    forecaster_by_target = _load_forecaster_predictions_by_target()
    daily_rows = _load_track_record_daily_rows()
    if daily_rows.empty:
        return []

    closes_by_date: dict[date, float] = {}
    for _, daily_row in daily_rows.iterrows():
        close_value = daily_row["close"]
        if pd.notna(close_value) and np.isfinite(float(close_value)):
            closes_by_date[daily_row["date"].date()] = float(close_value)

    end_session = _track_record_end_session()
    eligible_days = [day for day in closes_by_date if day <= end_session]
    if not eligible_days:
        return []
    # Never look past the last session for which a close is actually known.
    end_session = max(eligible_days)

    valid_signals = {"UP", "DOWN"}
    records: list[dict[str, Any]] = []
    for target_day in last_n_trading_sessions(end_session, sessions):
        day_close = closes_by_date.get(target_day)
        if day_close is None:
            continue
        actual_move, actual_direction = _tomorrow_actual_outcome(target_day, day_close, closes_by_date)
        if actual_direction is None:
            continue

        input_day = previous_trading_day(target_day - timedelta(days=1))
        cached = forecaster_by_target.get(target_day)
        if cached and cached.get("prediction") in valid_signals:
            prediction = cached["prediction"]
            prob_up = cached.get("prob_up")
            source = cached.get("source", "Tomorrow (forecaster)")
        else:
            prediction, prob_up = _rolling_tomorrow_prediction(
                input_day,
                daily_rows,
                threshold,
                fallback_prob,
            )
            source = "Tomorrow (rolling)"
        if prediction not in valid_signals:
            continue

        records.append(
            {
                "date": target_day.isoformat(),
                "input_date": input_day.isoformat(),
                "prediction": prediction,
                "prediction_source": source,
                "prob_up": prob_up,
                "actual_move": actual_move,
                "actual_direction": actual_direction,
                "correct": prediction == actual_direction,
            }
        )
    return records[-sessions:]
@lru_cache(maxsize=4)
def _dashboard_payload_cached(key: tuple[tuple[str, int | None, int | None], ...]) -> dict[str, Any]:
    """Assemble the full dashboard payload (memoised on ``key``).

    Args:
        key: Cache key only; it is not read inside the function.  Callers
            derive it from the state of the underlying files so that a new
            key forces a rebuild.

    Returns:
        A dict with the build ``timestamp`` (Asia/Kolkata), the latest
        prediction and summary per model, aggregate ``metrics``, per-model
        ``models`` metrics, the live accuracy ledger and chart series.
    """
    summary = load_model_summary()
    t5_latest = _latest_saved_prediction_uncached()
    tomorrow_summary = load_tomorrow_summary()
    tomorrow_latest = latest_tomorrow_prediction()
    tplus1_summary = load_tplus1_summary()
    tplus1_latest = latest_tplus1_prediction()
    t5_test = load_test_predictions()
    tomorrow_test = load_tomorrow_test_predictions()
    tomorrow_history = _load_prediction_history(TOMORROW_PREDICTION_HISTORY_PATH)
    tplus1_test = load_tplus1_test_predictions()
    mfe_summary = load_mfe_summary()
    mfe_latest = latest_mfe_prediction()

    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["date"] = pd.to_datetime(daily["date"], errors="coerce")
    daily = daily.sort_values("date").tail(180)

    # Accuracy over the most recent 40 opening-model test rows.
    recent_accuracy = None if t5_test.empty else float(t5_test.tail(40)["correct"].mean())

    # "Tomorrow" recent accuracy: derived from the last 40 test rows when a
    # correctness column exists or can be built; otherwise the summary value.
    tomorrow_accuracy = tomorrow_summary.get("test_accuracy")
    if not tomorrow_test.empty:
        tomorrow_recent = tomorrow_test.tail(40).copy()
        if "pred" in tomorrow_recent.columns and "prediction" not in tomorrow_recent.columns:
            tomorrow_recent["prediction"] = np.where(pd.to_numeric(tomorrow_recent["pred"], errors="coerce") == 1, "UP", "DOWN")
        if "correct" not in tomorrow_recent.columns and {"target", "pred"}.issubset(tomorrow_recent.columns):
            tomorrow_recent["correct"] = pd.to_numeric(tomorrow_recent["target"], errors="coerce") == pd.to_numeric(tomorrow_recent["pred"], errors="coerce")
        if "correct" in tomorrow_recent.columns:
            tomorrow_accuracy = float(tomorrow_recent["correct"].mean())

    if not tplus1_test.empty and "correct" in tplus1_test.columns:
        tplus1_accuracy = float(tplus1_test.tail(40)["correct"].mean())
    else:
        tplus1_accuracy = tplus1_summary.get("test_accuracy")

    tomorrow_rows = int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0)
    tplus1_rows = int(tplus1_summary.get("test_rows") or len(tplus1_test) or 0)
    if t5_test.empty:
        t5_rows = int(summary.get("test_rows") or 0)
    else:
        t5_rows = int(len(t5_test))

    model_metrics = [
        {
            "id": "tomorrow",
            "label": "Tomorrow",
            "model_name": tomorrow_summary.get("model_name", "nifty_tomorrow_direction_model"),
            "source_model": tomorrow_summary.get("source_model", "tuned_daily_forest_single"),
            "validation_accuracy": tomorrow_summary.get("validation_accuracy"),
            "test_accuracy": tomorrow_summary.get("test_accuracy"),
            "recent_accuracy": tomorrow_accuracy,
            "test_rows": tomorrow_rows,
        },
        {
            "id": "tplus1",
            "label": "T+1",
            "model_name": tplus1_summary.get("model_name", "nifty_1420_tplus1_logistic_model"),
            "source_model": "14:00-14:20 logistic forecaster",
            "validation_accuracy": tplus1_summary.get("validation_accuracy"),
            "test_accuracy": tplus1_summary.get("test_accuracy"),
            "recent_accuracy": tplus1_accuracy,
            "test_rows": tplus1_rows,
        },
        {
            "id": "t5",
            "label": "T+5",
            "model_name": summary.get("model_name", "nifty_opening_direction_model"),
            "source_model": summary.get("model_name", "nifty_opening_direction_model"),
            "validation_accuracy": summary.get("validation_accuracy"),
            "test_accuracy": summary.get("test_accuracy"),
            "recent_accuracy": recent_accuracy,
            "test_rows": t5_rows,
        },
    ]

    # Headline metrics mix "tomorrow" accuracy figures with AUC/Brier values
    # taken from the opening-model summary.
    metrics = {
        "validation_accuracy": tomorrow_summary.get("validation_accuracy"),
        "test_accuracy": tomorrow_summary.get("test_accuracy"),
        "baseline_test_accuracy": tomorrow_summary.get("baseline_accuracy"),
        "validation_auc": summary.get("validation_auc"),
        "test_auc": summary.get("test_auc"),
        "test_brier": summary.get("test_brier"),
        "feature_count": tomorrow_summary.get("feature_count"),
        "recent_accuracy": tomorrow_accuracy,
        "recent_accuracy_days": 0 if tomorrow_test.empty else int(len(tomorrow_test.tail(40))),
        "total_test_days": int(tomorrow_summary.get("n_test") or len(tomorrow_test) or 0),
        "models": model_metrics,
    }

    predictions = {
        "t5": {
            "latest": t5_latest,
            "summary": summary,
        },
        "tomorrow": {
            "latest": tomorrow_latest,
            "summary": tomorrow_summary,
        },
        "tplus1": {
            "latest": tplus1_latest,
            "summary": tplus1_summary,
        },
        "mfe": {
            "latest": mfe_latest,
            "summary": mfe_summary,
        },
    }

    # The timestamp is taken before the ledger and track record are built.
    built_at = datetime.now(ZoneInfo("Asia/Kolkata")).isoformat()
    live_accuracy = load_live_accuracy()
    charts = {
        "daily_closes": _json_ready_frame(daily[["date", "close"]]),
        "t5_backtest": _json_ready_frame(t5_test, limit=400),
        "t5_recent_predictions": _json_ready_frame(t5_test.tail(40)),
        "tomorrow_backtest": _json_ready_frame(tomorrow_test, limit=400),
        "tomorrow_live_track_record": build_prediction_track_record(),
        "tomorrow_history_predictions": _json_ready_frame(tomorrow_history.tail(80)),
        "tplus1_backtest": _json_ready_frame(tplus1_test, limit=400),
    }
    return {
        "timestamp": built_at,
        "predictions": predictions,
        "metrics": metrics,
        "models": model_metrics,
        "live_accuracy": live_accuracy,
        "charts": charts,
    }
def refresh_first5_prediction(session_date: date | None = None, minutes: pd.DataFrame | None = None) -> Prediction:
    """Refresh the first-five-minutes opening prediction for a session.

    Args:
        session_date: Session to predict.  When omitted, today's date in IST
            must be a trading day.
        minutes: Minute bars to use; fetched with
            ``fetch_yahoo_minutes(period="7d")`` when not supplied.

    Returns:
        The prediction produced by ``predict_row`` for the new feature row.

    Raises:
        RuntimeError: ``session_date`` is omitted and today is not a trading day.

    Side effects:
        Appends the minutes to ``NIFTY_1M_PATH``, upserts the feature row into
        ``OPENING_DATASET_PATH`` and attempts an MFE refresh (failures are
        printed, not raised).
    """
    if session_date is None:
        today = datetime.now(IST).date()
        if not is_trading_day(today):
            raise RuntimeError(f"{today.isoformat()} is not an NSE trading session.")

    if minutes is None:
        minutes = fetch_yahoo_minutes(period="7d")
    append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])

    first5 = first5_features_from_minutes(minutes, session_date=session_date)
    row = build_model_row(first5)

    # Upsert the new row into the training dataset (latest row wins per date).
    combined = pd.concat([read_training_dataset(), row], ignore_index=True)
    combined = combined.drop_duplicates(subset=["date"], keep="last")
    combined = combined.sort_values("date").reset_index(drop=True)
    combined.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd")

    prediction = predict_row(row)
    try:
        refresh_mfe_prediction(session_date=session_date)
    except Exception as exc:
        # The MFE refresh is auxiliary; report and carry on.
        print(f"MFE refresh failed: {exc}", flush=True)
    return prediction
def refresh_daily_data() -> dict[str, Any]:
    """Fetch the last month of daily bars and append them to ``NIFTY_1D_PATH``.

    Returns:
        A dict with the row count of the frame returned by
        ``append_parquet_rows``, its latest date (ISO string) and the parquet
        path as a string.
    """
    fetched = fetch_yahoo_daily(period="1mo")
    combined = append_parquet_rows(NIFTY_1D_PATH, fetched, ["date"])
    newest_day = pd.to_datetime(combined["date"]).max().date()
    return {
        "rows": int(len(combined)),
        "latest_date": newest_day.isoformat(),
        "path": str(NIFTY_1D_PATH),
    }
def update_opening_outcomes_from_daily() -> dict[str, Any]:
    """Back-fill realised daily outcomes into the opening training dataset.

    For every dataset row whose session date has a matching daily row, the
    daily OHLCV values are copied into the ``day_*`` columns that exist in
    the dataset, and ``target`` / ``day_return`` / ``first5_vs_day_open`` are
    recomputed when their input columns are present and the day open is
    non-zero.  The dataset is then rewritten to ``OPENING_DATASET_PATH``.

    Returns:
        ``{"updated_rows": n, "latest_date": iso_or_None}`` where ``n``
        counts rows whose ``target`` was recomputed.  Both files missing or
        either frame empty yields zero updates and a ``None`` date.
    """
    nothing_updated = {"updated_rows": 0, "latest_date": None}
    if not OPENING_DATASET_PATH.exists() or not NIFTY_1D_PATH.exists():
        return nothing_updated
    dataset = pd.read_parquet(OPENING_DATASET_PATH)
    daily = pd.read_parquet(NIFTY_1D_PATH)
    if dataset.empty or daily.empty:
        return nothing_updated

    dataset = dataset.copy()
    dataset["_session_date"] = pd.to_datetime(dataset["date"], errors="coerce").dt.normalize()
    daily = daily.copy()
    daily["_session_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    daily = daily.dropna(subset=["_session_date"]).drop_duplicates("_session_date", keep="last")
    daily = daily.set_index("_session_date")

    # Column availability does not change while rows are updated, so the
    # membership checks are resolved once up front.
    copy_pairs = [
        (src, dst)
        for src, dst in (
            ("open", "day_open"),
            ("high", "day_high"),
            ("low", "day_low"),
            ("close", "day_close"),
            ("volume", "day_volume"),
        )
        if src in daily.columns and dst in dataset.columns
    ]
    can_score_target = {"day_open", "day_close", "target", "day_return"}.issubset(dataset.columns)
    can_score_first5 = {"first5_close", "day_open", "first5_vs_day_open"}.issubset(dataset.columns)

    updated = 0
    for idx, session_day in dataset["_session_date"].dropna().items():
        if session_day not in daily.index:
            continue
        daily_row = daily.loc[session_day]
        for src, dst in copy_pairs:
            dataset.at[idx, dst] = daily_row[src]

        if can_score_target:
            day_open = dataset.at[idx, "day_open"]
            day_close = dataset.at[idx, "day_close"]
            if pd.notna(day_open) and pd.notna(day_close) and float(day_open) != 0.0:
                dataset.at[idx, "target"] = int(float(day_close) > float(day_open))
                dataset.at[idx, "day_return"] = (float(day_close) - float(day_open)) / float(day_open)
                updated += 1

        if can_score_first5:
            first5_close = dataset.at[idx, "first5_close"]
            day_open = dataset.at[idx, "day_open"]
            if pd.notna(first5_close) and pd.notna(day_open) and float(day_open) != 0.0:
                dataset.at[idx, "first5_vs_day_open"] = (float(first5_close) - float(day_open)) / float(day_open)

    dataset = dataset.drop(columns=["_session_date"])
    dataset = dataset.sort_values("date").reset_index(drop=True)
    dataset.to_parquet(OPENING_DATASET_PATH, index=False, compression="zstd")

    latest = pd.to_datetime(dataset["date"], errors="coerce").max()
    return {
        "updated_rows": int(updated),
        "latest_date": None if pd.isna(latest) else latest.date().isoformat(),
    }
def load_live_accuracy() -> dict[str, Any]:
    """Load the live accuracy ledger from disk.

    Returns a dict keyed by model id (``tomorrow``, ``t5``, ``tplus1``).
    Each value holds the ``entries`` list plus counts recomputed from it:
    ``total``, ``correct_count``, ``backtest_count`` (entries whose
    ``source`` is missing or ``"backtest"``), ``live_count`` and
    ``accuracy`` (``None`` when there are no entries).

    A missing file, unparsable JSON or a non-dict document yields the empty
    defaults.  If processing fails part-way, the models merged so far are
    kept and the remaining ones stay at their defaults.
    """
    default: dict[str, Any] = {
        model_id: {"entries": [], "accuracy": None, "total": 0, "correct_count": 0, "backtest_count": 0, "live_count": 0}
        for model_id in ("tomorrow", "t5", "tplus1")
    }
    if not LIVE_ACCURACY_PATH.exists():
        return default
    try:
        raw = json.loads(LIVE_ACCURACY_PATH.read_text(encoding="utf-8"))
    except Exception:
        return default
    if not isinstance(raw, dict):
        return default

    def _is_backtest(entry: Any) -> bool:
        return str(entry.get("source", "backtest")).lower() == "backtest"

    try:
        for model_id, merged in default.items():
            current = raw.get(model_id, {})
            if not isinstance(current, dict):
                current = {}
            entries = current.get("entries", [])
            if not isinstance(entries, list):
                entries = []

            backtest_count = sum(1 for entry in entries if _is_backtest(entry))
            live_count = sum(1 for entry in entries if not _is_backtest(entry))
            total = len(entries)
            correct = sum(1 for entry in entries if entry.get("correct"))

            # Stored counters are overwritten with values derived from entries.
            current["entries"] = entries
            current["backtest_count"] = int(backtest_count)
            current["live_count"] = int(live_count)
            current["total"] = int(total)
            current["correct_count"] = int(correct)
            current["accuracy"] = (current["correct_count"] / current["total"]) if current["total"] > 0 else None
            merged.update(current)
    except Exception:
        pass
    return default
def save_live_accuracy(data: dict[str, Any]) -> None:
    """Persist the live accuracy ledger to ``LIVE_ACCURACY_PATH`` as indented UTF-8 JSON."""
    serialized = json.dumps(data, indent=2)
    LIVE_ACCURACY_PATH.write_text(serialized, encoding="utf-8")
def _load_prediction_history(path: Path) -> pd.DataFrame:
    """Load a prediction-history parquet file in chronological order.

    Args:
        path: Location of the history file.

    Returns:
        An empty frame when ``path`` does not exist.  Otherwise the stored
        rows with any of ``date`` / ``input_date`` / ``target_date`` /
        ``forecast_date`` parsed to datetimes, sorted by whichever of
        ``target_date``, ``input_date``, ``date``, ``forecast_date`` exist
        (in that priority), with a fresh index.
    """
    if not path.exists():
        return pd.DataFrame()

    history = pd.read_parquet(path)
    date_like = ("date", "input_date", "target_date", "forecast_date")
    for name in date_like:
        if name in history.columns:
            history[name] = pd.to_datetime(history[name], errors="coerce")

    sort_priority = ("target_date", "input_date", "date", "forecast_date")
    sort_cols = [name for name in sort_priority if name in history.columns]
    if not sort_cols:
        return history.reset_index(drop=True)
    return history.sort_values(sort_cols).reset_index(drop=True)
def _record_prediction_history(path: Path, row: dict[str, Any], subset: list[str]) -> None:
    """Forward one prediction row to ``append_prediction_history``.

    The three arguments are passed through unchanged and in the same order;
    nothing is returned.
    """
    append_prediction_history(path, row, subset)
def _rescore_tomorrow_live_ledger(ledger: dict[str, Any]) -> bool:
    """Re-score the live Tomorrow entries against the daily closes on disk.

    Entries whose ``source`` is ``backtest`` (also the default when the key is
    absent) are left untouched.  For every other entry the actual direction is
    taken from ``_tomorrow_actual_outcome`` and the entry's ``actual`` and
    ``correct`` fields are rewritten when they disagree with it.  Entries with
    an unparseable date, no daily close, no resolvable outcome, or a
    prediction other than UP/DOWN are skipped.

    The ledger is mutated in place; summary counters are NOT recomputed here.

    Returns:
        True if at least one entry was modified.
    """
    daily_frame = pd.read_parquet(NIFTY_1D_PATH)
    daily_frame["_date"] = pd.to_datetime(daily_frame["date"], errors="coerce").dt.normalize()

    # Session date -> finite close.  A later row overwrites an earlier one
    # for the same date.
    closes_by_date: dict[date, float] = {}
    for _, daily_row in daily_frame.iterrows():
        stamp = daily_row["_date"]
        if pd.isna(stamp):
            continue
        close_value = daily_row.get("close")
        if not pd.notna(close_value):
            continue
        if np.isfinite(float(close_value)):
            closes_by_date[stamp.date()] = float(close_value)

    modified = False
    tomorrow_entries = ledger.get("tomorrow", {}).get("entries", [])
    for entry in tomorrow_entries:
        source = str(entry.get("source", "backtest")).lower()
        if source == "backtest":
            continue
        try:
            session_day = date.fromisoformat(str(entry.get("date", ""))[:10])
        except Exception:
            continue

        session_close = closes_by_date.get(session_day)
        if session_close is None:
            continue

        _, actual_direction = _tomorrow_actual_outcome(session_day, session_close, closes_by_date)
        if actual_direction is None:
            continue

        predicted = str(entry.get("prediction", "")).upper()
        if predicted not in {"UP", "DOWN"}:
            continue

        is_correct = predicted == actual_direction
        already_right = entry.get("actual") == actual_direction and entry.get("correct") == is_correct
        if already_right:
            continue
        entry["actual"] = actual_direction
        entry["correct"] = is_correct
        modified = True
    return modified
def ensure_completed_sessions_scored(
    now: datetime | None = None,
    ledger: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Make sure the most recent completed session has a Tomorrow ledger entry.

    The session checked is ``expected_completed_daily_date(now)``.  Existing
    live Tomorrow entries are re-scored first; if that changes anything the
    Tomorrow summary counters are recomputed and the ledger is saved.  When
    the completed session is not yet in the Tomorrow entries and the daily
    file holds a finite, non-zero open and a finite close for it, scoring is
    delegated to ``update_live_accuracy``.

    Args:
        now: Reference time; defaults to the current time in IST.
        ledger: Ledger to work on; loaded from disk when missing or empty.

    Returns:
        The ledger passed in / loaded, or the one returned by
        ``update_live_accuracy`` when the session was scored.
    """
    current_time = now or datetime.now(IST)
    completed = expected_completed_daily_date(current_time)
    if not is_trading_day(completed):
        return ledger or load_live_accuracy()
    ledger = ledger or load_live_accuracy()

    if _rescore_tomorrow_live_ledger(ledger):
        bucket = ledger["tomorrow"]
        scored = bucket["entries"]
        total = len(scored)
        correct = sum(1 for e in scored if e.get("correct"))
        backtest_count = sum(1 for e in scored if str(e.get("source", "backtest")).lower() == "backtest")
        bucket["total"] = total
        bucket["correct_count"] = correct
        bucket["backtest_count"] = backtest_count
        bucket["live_count"] = total - backtest_count
        bucket["accuracy"] = correct / total if total > 0 else None
        save_live_accuracy(ledger)

    already_logged = {e["date"] for e in ledger.get("tomorrow", {}).get("entries", [])}
    if completed.isoformat() in already_logged:
        return ledger

    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    day_rows = daily[daily["_date"].dt.date == completed]
    if day_rows.empty:
        return ledger

    # The last row for the session is the one whose prices are checked.
    last_row = day_rows.iloc[-1]
    day_open = float(last_row["open"])
    day_close = float(last_row["close"])
    prices_usable = np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0
    if not prices_usable:
        return ledger
    return update_live_accuracy(completed)
def update_live_accuracy(session_date: date) -> dict[str, Any]:
    """Score the predictions that target ``session_date`` and update the ledger.

    Must be called AFTER refresh_daily_data() (so the session's close is
    available) but BEFORE refresh_first5_prediction / refresh_tplus1_prediction /
    refresh_tomorrow_prediction (so the CSV files still hold the predictions
    we want to score).

    Each model gets at most one entry per session date; a date already present
    in a model's entries is not scored again:

    * ``t5``: prediction vs. whether the session closed above its open.
    * ``tomorrow``: prediction vs. whether the session closed above the close
      of ``previous_trading_day(session_date - 1 day)``.
    * ``tplus1``: prediction vs. whether the session closed above the last
      available close between 14:00 and 14:20 of the prediction's input day.

    Args:
        session_date: The completed session to score.

    Returns:
        The ledger after scoring and saving.  When the daily file has no row
        for ``session_date``, or its open/close are not finite (or the open is
        zero), the ledger is returned as loaded and nothing is written.
    """
    ledger = load_live_accuracy()
    daily = pd.read_parquet(NIFTY_1D_PATH)
    daily["_date"] = pd.to_datetime(daily["date"], errors="coerce").dt.normalize()
    today_rows = daily[daily["_date"].dt.date == session_date]
    if today_rows.empty:
        return ledger
    day_open = float(today_rows.iloc[-1]["open"])
    day_close = float(today_rows.iloc[-1]["close"])
    if not (np.isfinite(day_open) and np.isfinite(day_close) and day_open != 0):
        return ledger
    actual_close_gt_open = "UP" if day_close > day_open else "DOWN"
    session_iso = session_date.isoformat()

    # --- T+5: today's 9:20 AM prediction vs close > open ---
    logged_t5 = {e["date"] for e in ledger["t5"]["entries"]}
    if session_iso not in logged_t5:
        t5_history = _load_prediction_history(T5_PREDICTION_HISTORY_PATH)
        if not t5_history.empty and "target_date" in t5_history.columns:
            t5_rows = t5_history[t5_history["target_date"].dt.date == session_date]
        else:
            t5_rows = pd.DataFrame()
        if t5_rows.empty and LATEST_PATH.exists():
            # Best-effort fallback: use the latest-prediction CSV when the
            # history has no row for this session.
            try:
                t5_row = pd.read_csv(LATEST_PATH).iloc[-1].to_dict()
                if str(t5_row.get("input_date", ""))[:10] == session_iso:
                    t5_rows = pd.DataFrame([t5_row])
            except Exception:
                t5_rows = pd.DataFrame()
        if not t5_rows.empty:
            try:
                t5_row = t5_rows.iloc[-1].to_dict()
                pred = str(t5_row.get("prediction", "")).upper()
                if pred in ("UP", "DOWN"):
                    ledger["t5"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": actual_close_gt_open,
                        "correct": pred == actual_close_gt_open,
                        "source": "live",
                    })
            except Exception as exc:
                # Scoring stays best-effort, but the failure is no longer invisible.
                print(f"[live-accuracy] t5 scoring failed: {exc}", flush=True)

    # --- Tomorrow: prior close vs today's close (matches forecaster target) ---
    logged_tom = {e["date"] for e in ledger["tomorrow"]["entries"]}
    if session_iso not in logged_tom:
        prev_day = previous_trading_day(session_date - timedelta(days=1))
        prev_rows = daily[daily["_date"].dt.date == prev_day]
        actual_tomorrow = None
        if not prev_rows.empty:
            prev_close = float(prev_rows.iloc[-1]["close"])
            if np.isfinite(prev_close) and prev_close != 0:
                actual_tomorrow = "UP" if day_close > prev_close else "DOWN"
        tom_row = _find_tomorrow_prediction_for_target(session_date)
        if tom_row and actual_tomorrow is not None:
            try:
                pred = str(tom_row.get("prediction", "")).upper()
                if pred in ("UP", "DOWN"):
                    ledger["tomorrow"]["entries"].append({
                        "date": session_iso,
                        "prediction": pred,
                        "actual": actual_tomorrow,
                        "correct": pred == actual_tomorrow,
                        "source": "live",
                    })
            except Exception as exc:
                print(f"[live-accuracy] tomorrow scoring failed: {exc}", flush=True)

    # --- T+1: yesterday's 14:20 prediction targeting today ---
    # T+1 target: today's close > yesterday's 14:20 close
    logged_t1 = {e["date"] for e in ledger["tplus1"]["entries"]}
    if session_iso not in logged_t1:
        t1_history = _load_prediction_history(TPLUS1_PREDICTION_HISTORY_PATH)
        if not t1_history.empty and "target_date" in t1_history.columns:
            t1_rows = t1_history[t1_history["target_date"].dt.date == session_date]
        else:
            t1_rows = pd.DataFrame()
        if not t1_rows.empty:
            try:
                t1_row = t1_rows.iloc[-1].to_dict()
                pred = str(t1_row.get("prediction", "")).upper()
                input_date_str = str(t1_row.get("input_date", ""))[:10]
                input_day = date.fromisoformat(input_date_str)
                # Read the 14:20 close from minute data for the input session
                minute = pd.read_parquet(NIFTY_1M_PATH, columns=["date", "close"])
                minute["dt"] = pd.to_datetime(minute["date"], errors="coerce")
                # A bar without a close cannot serve as the reference price:
                # comparing against NaN is always False and would record a
                # spurious "DOWN".  Dropping such bars lets the last bar that
                # does have a close inside the window be used instead.
                minute = minute.dropna(subset=["dt", "close"])
                minute["session_date"] = minute["dt"].dt.normalize()
                minute["time_str"] = minute["dt"].dt.strftime("%H:%M")
                window = minute[
                    (minute["session_date"].dt.date == input_day)
                    & (minute["time_str"] >= "14:00")
                    & (minute["time_str"] <= "14:20")
                ].sort_values("dt")
                if not window.empty and pred in ("UP", "DOWN"):
                    w_close = float(window.iloc[-1]["close"])
                    # Infinite values survive dropna; skip rather than mis-score.
                    if np.isfinite(w_close):
                        t1_actual = "UP" if day_close > w_close else "DOWN"
                        ledger["tplus1"]["entries"].append({
                            "date": session_iso,
                            "prediction": pred,
                            "actual": t1_actual,
                            "correct": pred == t1_actual,
                            "source": "live",
                        })
            except Exception as exc:
                print(f"[live-accuracy] tplus1 scoring failed: {exc}", flush=True)

    _rescore_tomorrow_live_ledger(ledger)

    # Recompute summary stats
    for model_id in ("t5", "tomorrow", "tplus1"):
        entries = ledger[model_id]["entries"]
        total = len(entries)
        correct = sum(1 for e in entries if e.get("correct"))
        backtest_count = sum(1 for e in entries if str(e.get("source", "backtest")).lower() == "backtest")
        ledger[model_id]["total"] = total
        ledger[model_id]["correct_count"] = correct
        ledger[model_id]["backtest_count"] = backtest_count
        ledger[model_id]["live_count"] = total - backtest_count
        ledger[model_id]["accuracy"] = correct / total if total > 0 else None
    save_live_accuracy(ledger)
    return ledger
def refresh_market_close_data(session_date: date | None = None) -> dict[str, Any]:
    """Run the end-of-session refresh and return a summary of what was updated.

    In order: fetch the last 7 days of Yahoo minute bars and append them to
    the minute parquet, refresh the daily data, score the live predictions
    (best-effort; a failure is printed and ignored), refresh the T+5 and T+1
    predictions, update the opening outcomes, and refresh the Tomorrow
    prediction.

    Args:
        session_date: Session to refresh; defaults to today's date in IST.

    Raises:
        RuntimeError: If the session date is not a trading day.  Any error
            from the refresh steps other than live-accuracy scoring also
            propagates to the caller.
    """
    today = datetime.now(IST).date()
    session_date = session_date or today
    if not is_trading_day(session_date):
        raise RuntimeError(f"{session_date.isoformat()} is not an NSE trading session.")

    minutes = fetch_yahoo_minutes(period="7d")
    minute_frame = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
    daily_info = refresh_daily_data()

    # Score live predictions BEFORE they get overwritten by fresh ones
    try:
        update_live_accuracy(session_date)
    except Exception as exc:
        print(f"[close-refresh] live accuracy update failed: {exc}", flush=True)

    t5_prediction = refresh_first5_prediction(session_date=session_date, minutes=minutes)
    tplus1_prediction = refresh_tplus1_prediction(session_date=session_date)
    outcomes = update_opening_outcomes_from_daily()
    tomorrow_prediction = refresh_tomorrow_prediction(session_date=session_date)

    summary: dict[str, Any] = {}
    summary["session_date"] = session_date.isoformat()
    summary["nifty_1m_rows"] = int(len(minute_frame))
    summary["latest_minute"] = pd.to_datetime(minute_frame["date"], errors="coerce").max().isoformat()
    summary["daily"] = daily_info
    summary["opening_dataset"] = outcomes
    summary["t5_prediction"] = t5_prediction.to_dict()
    summary["tplus1_prediction"] = tplus1_prediction
    summary["tomorrow_prediction"] = tomorrow_prediction
    return summary
def close_refresh_due(now: datetime | None = None) -> bool:
    """Report whether the end-of-session refresh still has to run today.

    Always False on a non-trading day or before ``CLOSE_REFRESH_READY``.
    Otherwise True as soon as any of the tracked dates (daily parquet, minute
    parquet, opening dataset, latest opening outcome, Tomorrow prediction
    input date) is missing or differs from today's date.

    Args:
        now: Reference time; defaults to the current time in IST.
    """
    now = now or datetime.now(IST)
    today = now.date()
    if not is_trading_day(today):
        return False
    if now.time() < CLOSE_REFRESH_READY:
        return False

    latest_daily = latest_parquet_date(NIFTY_1D_PATH)
    latest_minutes = latest_parquet_date(NIFTY_1M_PATH)
    latest_opening = latest_parquet_date(OPENING_DATASET_PATH)
    latest_opening_outcome = latest_opening_outcome_date()

    tomorrow_latest = latest_tomorrow_prediction()
    tomorrow_input = None
    try:
        raw_input = tomorrow_latest.get("input_date")
        if raw_input:
            tomorrow_input = date.fromisoformat(str(raw_input)[:10])
    except Exception:
        # An unparseable input date counts as "not refreshed".
        tomorrow_input = None

    tracked = (latest_daily, latest_minutes, latest_opening, latest_opening_outcome, tomorrow_input)
    for latest in tracked:
        if latest != today:
            return True
    return False
def latest_prediction_input_date(path: Path) -> date | None:
    """Return the newest ``input_date`` found in a prediction CSV.

    Returns None when the file does not exist, cannot be read (including a
    missing ``input_date`` column), has no rows, or contains no parseable
    date.
    """
    if not path.exists():
        return None
    try:
        input_dates = pd.read_csv(path, usecols=["input_date"])["input_date"]
    except Exception:
        return None
    if input_dates.empty:
        return None

    newest = pd.to_datetime(input_dates, errors="coerce").max()
    if pd.isna(newest):
        return None
    return newest.date()
def latest_tomorrow_input_date() -> date | None:
    """Return the ``input_date`` of the latest Tomorrow prediction as a date.

    Returns None when the value is missing or empty, or when loading or
    parsing it raises.
    """
    try:
        raw = latest_tomorrow_prediction().get("input_date")
        if not raw:
            return None
        # Only the leading YYYY-MM-DD part is parsed.
        return date.fromisoformat(str(raw)[:10])
    except Exception:
        return None
def expected_completed_daily_date(now: datetime | None = None) -> date:
    """Return the session whose daily bar should already be available.

    On a trading day before ``CLOSE_REFRESH_READY`` the lookup starts from
    yesterday; otherwise it starts from today.  The start date is resolved
    through ``previous_trading_day``.

    Args:
        now: Reference time; defaults to the current time in IST.
    """
    current = now or datetime.now(IST)
    today = current.date()
    close_still_pending = is_trading_day(today) and current.time() < CLOSE_REFRESH_READY
    anchor = today - timedelta(days=1) if close_still_pending else today
    return previous_trading_day(anchor)
def expected_minute_date(now: datetime | None = None) -> date:
    """Return the session for which minute data should already exist.

    That is today once ``FIRST5_READY`` has been reached on a trading day;
    otherwise ``previous_trading_day`` resolved from yesterday.

    Args:
        now: Reference time; defaults to the current time in IST.
    """
    current = now or datetime.now(IST)
    today = current.date()
    if not is_trading_day(today) or current.time() < FIRST5_READY:
        return previous_trading_day(today - timedelta(days=1))
    return today
def expected_tplus1_date(now: datetime | None = None) -> date:
    """Return the session for which a T+1 prediction should already exist.

    That is today once ``TPLUS1_READY`` has been reached on a trading day;
    otherwise ``previous_trading_day`` resolved from yesterday.

    Args:
        now: Reference time; defaults to the current time in IST.
    """
    current = now or datetime.now(IST)
    today = current.date()
    if not is_trading_day(today) or current.time() < TPLUS1_READY:
        return previous_trading_day(today - timedelta(days=1))
    return today
def is_stale(latest: date | None, expected: date) -> bool:
    """Return True when ``latest`` is missing or earlier than ``expected``."""
    if latest is None:
        return True
    return latest < expected
def stale_data_status(now: datetime | None = None) -> dict[str, Any]:
    """Report which datasets and predictions are behind their expected date.

    The result holds exactly five boolean flags: ``daily_stale``,
    ``minutes_stale``, ``t5_stale``, ``tomorrow_stale`` and ``tplus1_stale``.
    A source with no date at all counts as stale.

    Args:
        now: Reference time; defaults to the current time in IST.
    """
    current = now or datetime.now(IST)

    # Dates each source is expected to have reached by ``current``.
    expected_daily = expected_completed_daily_date(current)
    expected_minutes = expected_minute_date(current)
    expected_tplus1 = expected_tplus1_date(current)

    # Dates each source actually holds.
    latest_1d = latest_parquet_date(NIFTY_1D_PATH)
    latest_1m = latest_parquet_date(NIFTY_1M_PATH)
    latest_t5 = latest_prediction_input_date(LATEST_PATH)
    latest_tomorrow = latest_tomorrow_input_date()
    latest_tplus1 = latest_prediction_input_date(TPLUS1_LATEST_PATH)

    flags: dict[str, Any] = {}
    flags["daily_stale"] = (latest_1d or date.min) < expected_daily
    flags["minutes_stale"] = (latest_1m or date.min) < expected_minutes
    flags["t5_stale"] = is_stale(latest_t5, expected_minutes)
    flags["tomorrow_stale"] = is_stale(latest_tomorrow, expected_daily)
    flags["tplus1_stale"] = is_stale(latest_tplus1, expected_tplus1)
    return flags
def refresh_stale_data_once(now: datetime | None = None) -> dict[str, Any]:
    """Refresh whichever datasets and predictions are currently stale.

    Only one refresh runs at a time: if ``_stale_refresh_lock`` is already
    held the call returns immediately with status ``"skipped"``.

    Args:
        now: Reference time; defaults to the current time in IST.

    Returns:
        ``{"status": "fresh", "actions": []}`` when nothing is stale;
        a ``"skipped"`` payload (with the stale flags) when another refresh
        is running; otherwise a ``"refreshed"`` payload with the stale flags
        re-evaluated after the refresh and the list of actions performed.
        Failures while scoring live accuracy or refreshing the Tomorrow
        prediction are recorded as ``error`` actions instead of raising.
    """
    now = now or datetime.now(IST)
    status = stale_data_status(now)
    if not any(status.values()):
        return {"status": "fresh", "actions": []}
    if not _stale_refresh_lock.acquire(blocking=False):
        return {"status": "skipped", "reason": "stale refresh already running", **status, "actions": []}
    actions: list[dict[str, Any]] = []
    try:
        # stale_data_status() returns only the boolean flags, so the session
        # to score and forecast from is derived here.  Looking it up in
        # ``status`` raised KeyError, which was swallowed below and meant the
        # scoring and the Tomorrow refresh never ran on this path.
        completed = expected_completed_daily_date(now)
        if status["minutes_stale"]:
            minutes = fetch_yahoo_minutes(period="7d")
            combined = append_parquet_rows(NIFTY_1M_PATH, minutes, ["date"])
            actions.append(
                {
                    "name": "minutes",
                    "rows": int(len(combined)),
                    "latest_date": pd.to_datetime(combined["date"], errors="coerce").max().date().isoformat(),
                }
            )
        if status["daily_stale"]:
            daily_info = refresh_daily_data()
            outcomes = update_opening_outcomes_from_daily()
            actions.append({"name": "daily", **daily_info})
            actions.append({"name": "opening_outcomes", **outcomes})
            try:
                scored = update_live_accuracy(completed)
                actions.append(
                    {
                        "name": "tomorrow_live_accuracy",
                        "session_date": completed.isoformat(),
                        "live_count": scored.get("tomorrow", {}).get("live_count"),
                    }
                )
            except Exception as exc:
                actions.append({"name": "tomorrow_live_accuracy", "error": str(exc)})
        if status["daily_stale"] or status["tomorrow_stale"]:
            try:
                tomorrow = refresh_tomorrow_prediction(session_date=completed)
                actions.append({"name": "tomorrow_prediction", "input_date": tomorrow.get("input_date")})
            except Exception as exc:
                actions.append({"name": "tomorrow_prediction", "error": str(exc)})
        if status["t5_stale"] and is_trading_day(now.date()) and now.time() >= FIRST5_READY:
            prediction = refresh_first5_prediction(session_date=now.date())
            actions.append({"name": "t5_prediction", "input_date": prediction.input_date})
        if status["tplus1_stale"] and is_trading_day(now.date()) and now.time() >= TPLUS1_READY:
            prediction = refresh_tplus1_prediction(session_date=now.date())
            actions.append({"name": "tplus1_prediction", "input_date": prediction.get("input_date")})
        clear_dashboard_payload_cache()
        # Re-evaluate against the clock after the refresh, which may have taken a while.
        refreshed_status = stale_data_status(datetime.now(IST))
        return {"status": "refreshed", **refreshed_status, "actions": actions}
    finally:
        _stale_refresh_lock.release()
def next_ist_run_at(run_time: time = time(9, 20), now: datetime | None = None) -> datetime:
    """Return the next IST datetime at which a job scheduled for ``run_time`` runs.

    If today's ``run_time`` has already been reached the search starts from
    tomorrow; the start day is then resolved through ``next_trading_day``.

    Args:
        run_time: Wall-clock time of the run, interpreted in IST.
        now: Reference time; defaults to the current time in IST.
    """
    current = now or datetime.now(IST)
    candidate = current.date()
    todays_slot = datetime.combine(candidate, run_time, tzinfo=IST)
    if current >= todays_slot:
        candidate = candidate + timedelta(days=1)
    candidate = next_trading_day(candidate)
    return datetime.combine(candidate, run_time, tzinfo=IST)
def seconds_until_next_ist_run(run_time: time = time(9, 20)) -> float:
    """Return the seconds from now until the next scheduled run, never below 1.0."""
    reference = datetime.now(IST)
    upcoming = next_ist_run_at(run_time, now=reference)
    remaining = (upcoming - reference).total_seconds()
    return max(1.0, remaining)