from __future__ import annotations import asyncio import threading from datetime import date, datetime, time, timedelta import sys from pathlib import Path from fastapi import BackgroundTasks, HTTPException, Query, Response from fastapi.middleware.cors import CORSMiddleware from fastapi import FastAPI from pydantic import BaseModel sys.path.insert(0, str(Path(__file__).resolve().parent)) from nifty_backend.runtime import ( CLOSE_REFRESH_READY, IST, STALE_CHECK_INTERVAL_SECONDS, TPLUS1_READY, close_refresh_due, dashboard_payload, is_trading_day, latest_saved_prediction, latest_tplus1_prediction, next_trading_day, refresh_daily_data, refresh_first5_prediction, refresh_market_close_data, refresh_stale_data_once, refresh_tplus1_prediction, seconds_until_next_ist_run, warm_dashboard_payload_cache, ) from kotak_neo import ( KotakNeoConfigError, KotakNeoError, KotakNeoSessionRequired, kotak_neo_manager, ) from scraper import get_stock_info app = FastAPI(title="NIFTY 50 Forecaster Backend") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) market_status = "Waiting for next session" close_refresh_lock = threading.Lock() tplus1_refresh_lock = threading.Lock() MARKET_OPEN = time(9, 15) FIRST5_READY = time(9, 20) MARKET_CLOSE = time(15, 30) class TotpRequest(BaseModel): totp: str def refresh_market_close_data_if_due() -> dict: if not close_refresh_due(): return {"status": "skipped", "reason": "close refresh is not due"} if not close_refresh_lock.acquire(blocking=False): return {"status": "skipped", "reason": "close refresh already running"} try: info = refresh_market_close_data() return {"status": "refreshed", **info} finally: close_refresh_lock.release() def latest_tplus1_prediction_date(payload: dict | None = None) -> date | None: try: latest = payload if payload is not None else latest_tplus1_prediction() raw = latest.get("input_date") return date.fromisoformat(str(raw)[:10]) if raw else None except Exception: return None def tplus1_refresh_due(now: datetime | None = None, latest_date: date | None = None) -> bool: now = now or datetime.now(IST) if not is_trading_day(now.date()) or not (TPLUS1_READY <= now.time() < MARKET_CLOSE): return False latest_date = latest_date if latest_date is not None else latest_tplus1_prediction_date() return latest_date != now.date() def refresh_tplus1_if_due() -> dict: now = datetime.now(IST) latest_date = latest_tplus1_prediction_date() if not tplus1_refresh_due(now=now, latest_date=latest_date): return {"status": "skipped", "reason": "tplus1 refresh is not due"} if not tplus1_refresh_lock.acquire(blocking=False): return {"status": "skipped", "reason": "tplus1 refresh already running"} try: prediction = refresh_tplus1_prediction(session_date=now.date()) return {"status": "refreshed", "prediction": prediction} finally: tplus1_refresh_lock.release() def latest_prediction_date(payload: dict | None = None) -> date | None: try: latest = payload if payload is not None else latest_saved_prediction() raw = latest.get("input_date") return date.fromisoformat(str(raw)) if raw else None except Exception: return None def current_market_state(now: datetime | None = None) -> dict: global market_status now = now or datetime.now(IST) today = now.date() current_time = now.time() trading_day = is_trading_day(today) latest_date = latest_prediction_date() market_is_open_for_t5 = trading_day and FIRST5_READY <= current_time < MARKET_CLOSE market_is_open_for_tplus1 = trading_day and TPLUS1_READY <= current_time < MARKET_CLOSE has_current_first5 = market_is_open_for_t5 and latest_date == today tplus1_latest_date = latest_tplus1_prediction_date() has_current_tplus1 = market_is_open_for_tplus1 and tplus1_latest_date == today next_session = today if trading_day and current_time < MARKET_CLOSE else next_trading_day(today + timedelta(days=1)) if not trading_day: status = "Market Closed" detail = f"Next trading session is {next_session.isoformat()}." elif current_time < time(9, 0): status = "Waiting for 9:00 AM" detail = "Market has not entered pre-open yet." elif current_time < MARKET_OPEN: status = "Market Pre-Open" detail = "Market opens at 9:15 AM IST." elif current_time < FIRST5_READY: status = "Market Officially Opened" detail = "Waiting for the first 5 one-minute bars." elif current_time <= MARKET_CLOSE: if market_status in {"Fetching T+5 Prediction Data...", "Prediction Failed"}: status = market_status detail = "The first-five-minute prediction job is still resolving." elif has_current_first5: status = "Prediction Ready" detail = "Today's first-five-minute prediction is available." else: status = "Prediction Pending" detail = "No current-session prediction has been generated yet." else: status = "Market Closed" detail = "Trading session has ended." if not trading_day: tplus1_status = "Market Closed" tplus1_detail = f"Next trading session is {next_session.isoformat()}." elif current_time < TPLUS1_READY: tplus1_status = "Waiting for 2:30 PM" tplus1_detail = "The T+1 forecast becomes available at 2:30 PM IST." elif current_time < MARKET_CLOSE: if has_current_tplus1: tplus1_status = "Ready" tplus1_detail = "Today's T+1 prediction is available." else: tplus1_status = "Pending" tplus1_detail = "No current-session T+1 prediction has been generated yet." else: tplus1_status = "Market Closed" tplus1_detail = "Trading session has ended." if not trading_day: t5_status = "Market Closed" t5_detail = f"Next trading session is {next_session.isoformat()}." elif current_time < FIRST5_READY: t5_status = "Waiting for 9:20 AM" t5_detail = "The T+5 forecast becomes available after the first five one-minute bars." elif current_time < MARKET_CLOSE: if market_status in {"Fetching T+5 Prediction Data...", "Prediction Failed"}: t5_status = market_status t5_detail = "The first-five-minute prediction job is still resolving." elif has_current_first5: t5_status = "Ready" t5_detail = "Today's first-five-minute prediction is available." else: t5_status = "Pending" t5_detail = "No current-session prediction has been generated yet." else: t5_status = "Market Closed" t5_detail = "Trading session has ended." return { "market_status": status, "market_detail": detail, "server_time_ist": now.isoformat(), "is_trading_day": trading_day, "session_date": today.isoformat(), "next_session_date": next_session.isoformat(), "latest_prediction_date": latest_date.isoformat() if latest_date else None, "t5_available": has_current_first5, "t5_status": t5_status, "t5_detail": t5_detail, "market_is_open_for_t5": market_is_open_for_t5, "tplus1_available": has_current_tplus1, "tplus1_status": tplus1_status, "tplus1_detail": tplus1_detail, "market_is_open_for_tplus1": market_is_open_for_tplus1, "latest_tplus1_prediction_date": tplus1_latest_date.isoformat() if tplus1_latest_date else None, } def attach_market_state(payload: dict) -> dict: state = current_market_state() payload.setdefault("data_status", {}) payload["data_status"].update(state) try: payload["nifty_quote"] = kotak_neo_manager.fetch_nifty50_quote() payload["nifty_quote_error"] = None except KotakNeoSessionRequired as exc: payload["nifty_quote"] = None payload["nifty_quote_error"] = {"status": 401, "message": str(exc)} except KotakNeoConfigError as exc: payload["nifty_quote"] = None payload["nifty_quote_error"] = {"status": 503, "message": str(exc)} except KotakNeoError as exc: payload["nifty_quote"] = None payload["nifty_quote_error"] = {"status": 502, "message": str(exc)} import json import pandas as pd from pathlib import Path # AGGRESSIVE FALLBACK: If runtime.py fails to load it (due to path resolution issues on Hugging Face), # we forcefully load it directly from app.py's relative path. mfe_summary_fallback = {} mfe_latest_fallback = {} mfe_history_fallback = [] try: models_dir = Path(__file__).resolve().parent / "models" mfe_out = models_dir / "nifty_opening_mfe_regressor" / "outputs" data_dir = Path(__file__).resolve().parent / "data" if (mfe_out / "summary.json").exists(): mfe_summary_fallback = json.loads((mfe_out / "summary.json").read_text(encoding="utf-8")) if (mfe_out / "latest_prediction.csv").exists(): row = pd.read_csv(mfe_out / "latest_prediction.csv").iloc[-1].to_dict() mfe_latest_fallback = {k: (None if pd.isna(v) else v) for k, v in row.items()} hist_records = [] import numpy as np if (mfe_out / "test_predictions.csv").exists(): hist_df = pd.read_csv(mfe_out / "test_predictions.csv") for _, r in hist_df.iterrows(): try: dt = str(r["date"]) f5c = float(r["first5_close"]) pred_up = float(r["predicted_up_points"]) pred_dn = float(r["predicted_down_points"]) act_hi = float(r["day_high"]) act_lo = float(r["day_low"]) hist_records.append({ "date": dt, "first5_close": f5c, "predicted_up_points": pred_up, "predicted_down_points": pred_dn, "actual_high": act_hi, "predicted_high": f5c + pred_up, "actual_low": act_lo, "predicted_low": f5c - pred_dn }) except Exception: continue # Load live history if it exists if (mfe_out / "mfe_live_history.csv").exists(): live_df = pd.read_csv(mfe_out / "mfe_live_history.csv") daily_df = None if (data_dir / "nifty50_1d.parquet").exists(): daily_df = pd.read_parquet(data_dir / "nifty50_1d.parquet") daily_df["date"] = pd.to_datetime(daily_df["date"]).dt.strftime("%Y-%m-%d") daily_df = daily_df.set_index("date") for _, r in live_df.iterrows(): try: dt = str(r["input_date"]) if daily_df is not None and dt in daily_df.index: # Extract as scalar float using .iloc[0] or .item() in case of duplicates act_hi_raw = daily_df.loc[dt, "high"] act_lo_raw = daily_df.loc[dt, "low"] act_hi = float(act_hi_raw.iloc[0] if isinstance(act_hi_raw, pd.Series) else act_hi_raw) act_lo = float(act_lo_raw.iloc[0] if isinstance(act_lo_raw, pd.Series) else act_lo_raw) f5c = float(r["first5_close"]) pred_up = float(r["predicted_up_points"]) pred_dn = float(r["predicted_down_points"]) hist_records.append({ "date": dt, "first5_close": f5c, "predicted_up_points": pred_up, "predicted_down_points": pred_dn, "actual_high": act_hi, "predicted_high": f5c + pred_up, "actual_low": act_lo, "predicted_low": f5c - pred_dn }) except Exception as ex: print(f"Error appending live row: {ex}") continue mfe_history_fallback = hist_records # Recalculate RMSE and MAE over the combined history if hist_records: up_errors = [] down_errors = [] for r in hist_records: pred_up_pts = r["predicted_up_points"] pred_dn_pts = r["predicted_down_points"] act_up_pts = r["actual_high"] - r["first5_close"] act_dn_pts = r["first5_close"] - r["actual_low"] up_errors.append(act_up_pts - pred_up_pts) down_errors.append(act_dn_pts - pred_dn_pts) up_errors = np.array(up_errors) down_errors = np.array(down_errors) up_rmse = float(np.sqrt(np.mean(up_errors**2))) up_mae = float(np.mean(np.abs(up_errors))) down_rmse = float(np.sqrt(np.mean(down_errors**2))) down_mae = float(np.mean(np.abs(down_errors))) if "up" not in mfe_summary_fallback: mfe_summary_fallback["up"] = {} if "down" not in mfe_summary_fallback: mfe_summary_fallback["down"] = {} mfe_summary_fallback["up"]["test_rmse_points"] = up_rmse mfe_summary_fallback["up"]["test_mae_points"] = up_mae mfe_summary_fallback["down"]["test_rmse_points"] = down_rmse mfe_summary_fallback["down"]["test_mae_points"] = down_mae except Exception as exc: print(f"Fallback MFE load failed: {exc}", flush=True) t5_latest = payload.get("predictions", {}).get("t5", {}).get("latest") or payload.get("latest") or {} tomorrow_latest = payload.get("predictions", {}).get("tomorrow", {}).get("latest") or payload.get("tomorrow_latest") or {} tplus1_latest = payload.get("predictions", {}).get("tplus1", {}).get("latest") or payload.get("tplus1_latest") or {} mfe_latest = payload.get("predictions", {}).get("mfe", {}).get("latest") or mfe_latest_fallback mfe_summary = payload.get("predictions", {}).get("mfe", {}).get("summary") or mfe_summary_fallback mfe_history = payload.get("predictions", {}).get("mfe", {}).get("history") or mfe_history_fallback t5_available = bool(state["t5_available"] and t5_latest.get("prediction")) tplus1_available = bool(state["tplus1_available"] and tplus1_latest.get("prediction")) tomorrow_available = bool(tomorrow_latest.get("prediction")) refresh_phase = payload.get("data_status", {}).get("refresh_phase") if refresh_phase in {"waiting_second_payload", "refreshing"}: tomorrow_status = "WAITING FOR SECOND PAYLOAD" tomorrow_reason = "Market close refresh is generating the next-session payload." else: tomorrow_status = "Ready" if tomorrow_available else "Pending" tomorrow_reason = None if tomorrow_available else "No saved next-session signal is available." payload["predictions"] = { "tomorrow": { "available": tomorrow_available, "status": tomorrow_status, "reason": tomorrow_reason, "target_date": tomorrow_latest.get("target_date") or state["next_session_date"], "input_date": tomorrow_latest.get("input_date"), "prediction": tomorrow_latest.get("prediction") if tomorrow_available else None, "prob_up": tomorrow_latest.get("prob_up") if tomorrow_available else None, "confidence": tomorrow_latest.get("confidence") if tomorrow_available else None, "threshold": tomorrow_latest.get("threshold") if tomorrow_available else None, "model_name": tomorrow_latest.get("model_name"), "source_model": tomorrow_latest.get("source_model"), "validation_accuracy": tomorrow_latest.get("validation_accuracy"), "test_accuracy": tomorrow_latest.get("test_accuracy"), }, "t5": { "available": t5_available, "status": "Ready" if t5_available else state["t5_status"], "reason": None if t5_available else state["t5_detail"], "input_date": t5_latest.get("input_date"), "prediction": t5_latest.get("prediction") if t5_available else None, "prob_up": t5_latest.get("prob_up") if t5_available else None, "confidence": t5_latest.get("confidence") if t5_available else None, "threshold": t5_latest.get("threshold") if t5_available else None, "is_overridden": bool(t5_latest.get("is_overridden")) if t5_available else False, "model_name": t5_latest.get("model_name"), "validation_accuracy": (payload.get("summary") or {}).get("validation_accuracy"), "test_accuracy": (payload.get("summary") or {}).get("test_accuracy"), }, "tplus1": { "available": tplus1_available, "status": "Ready" if tplus1_available else state["tplus1_status"], "reason": None if tplus1_available else state["tplus1_detail"], "target_date": tplus1_latest.get("target_date") or state["next_session_date"], "input_date": tplus1_latest.get("input_date"), "prediction": tplus1_latest.get("prediction") if tplus1_available else None, "prob_up": tplus1_latest.get("prob_up") if tplus1_available else None, "confidence": tplus1_latest.get("confidence") if tplus1_available else None, "threshold": tplus1_latest.get("threshold") if tplus1_available else None, "is_overridden": bool(tplus1_latest.get("overlay_changed")) if tplus1_available else False, "model_name": tplus1_latest.get("model_name"), "validation_accuracy": (payload.get("tplus1_summary") or {}).get("validation_accuracy"), "test_accuracy": (payload.get("tplus1_summary") or {}).get("test_accuracy"), }, "mfe": { "available": t5_available, "status": "Ready" if t5_available else state["t5_status"], "reason": None if t5_available else state["t5_detail"], "latest": mfe_latest, "summary": mfe_summary, "history": mfe_history, }, } return payload async def daily_ist_refresh_loop() -> None: global market_status while True: # Wait until 9:00 AM IST await asyncio.sleep(seconds_until_next_ist_run(time(9, 0))) if not is_trading_day(datetime.now(IST).date()): market_status = "Market Closed" continue market_status = "Market Pre-Open" print("[scheduler] 9:00 AM IST - Market Pre-Open", flush=True) # Wait until 9:15 AM IST await asyncio.sleep(seconds_until_next_ist_run(time(9, 15))) market_status = "Market Officially Opened" print("[scheduler] 9:15 AM IST - Market Officially Opened", flush=True) # Wait until 9:20 AM IST await asyncio.sleep(seconds_until_next_ist_run(time(9, 20))) market_status = "Fetching T+5 Prediction Data..." print("[scheduler] 9:20 AM IST - Fetching Data", flush=True) try: await asyncio.to_thread(refresh_first5_prediction) market_status = "Prediction Ready" except Exception as exc: print(f"[scheduler] first5 refresh failed: {exc}", flush=True) market_status = "Prediction Failed" try: await asyncio.to_thread(refresh_daily_data) except Exception as exc: print(f"[scheduler] daily refresh failed: {exc}", flush=True) await asyncio.sleep(seconds_until_next_ist_run(TPLUS1_READY)) print("[scheduler] 2:30 PM IST - Refreshing T+1 prediction", flush=True) try: info = await asyncio.to_thread(refresh_tplus1_if_due) print(f"[scheduler] tplus1 refresh result: {info}", flush=True) except Exception as exc: print(f"[scheduler] tplus1 refresh failed: {exc}", flush=True) await asyncio.sleep(seconds_until_next_ist_run(CLOSE_REFRESH_READY)) print("[scheduler] 3:45 PM IST - Refreshing close data", flush=True) try: info = await asyncio.to_thread(refresh_market_close_data_if_due) print(f"[scheduler] close refresh result: {info}", flush=True) except Exception as exc: print(f"[scheduler] close refresh failed: {exc}", flush=True) async def refresh_current_session_once() -> None: global market_status now = datetime.now(IST) if not is_trading_day(now.date()) or now.time() < FIRST5_READY: return if latest_prediction_date() == now.date(): return market_status = "Fetching T+5 Prediction Data..." print("[startup] Current session needs first-five refresh; fetching now.", flush=True) try: await asyncio.to_thread(refresh_first5_prediction) market_status = "Prediction Ready" except Exception as exc: print(f"[startup] first5 refresh failed: {exc}", flush=True) market_status = "Prediction Failed" try: await asyncio.to_thread(refresh_daily_data) except Exception as exc: print(f"[startup] daily refresh failed: {exc}", flush=True) async def refresh_market_close_once_if_due() -> None: try: info = await asyncio.to_thread(refresh_market_close_data_if_due) if info.get("status") == "refreshed": print(f"[startup] close refresh result: {info}", flush=True) except Exception as exc: print(f"[startup] close refresh failed: {exc}", flush=True) async def refresh_tplus1_once_if_due() -> None: try: info = await asyncio.to_thread(refresh_tplus1_if_due) if info.get("status") == "refreshed": print(f"[startup] tplus1 refresh result: {info}", flush=True) except Exception as exc: print(f"[startup] tplus1 refresh failed: {exc}", flush=True) async def warm_dashboard_payload_cache_once() -> None: try: await asyncio.to_thread(warm_dashboard_payload_cache) except Exception as exc: print(f"[startup] dashboard payload warmup failed: {exc}", flush=True) async def stale_data_watch_loop() -> None: while True: try: info = await asyncio.to_thread(refresh_stale_data_once) if info.get("status") == "refreshed": print(f"[stale-watch] refreshed stale data: {info}", flush=True) except Exception as exc: print(f"[stale-watch] stale refresh failed: {exc}", flush=True) await asyncio.sleep(STALE_CHECK_INTERVAL_SECONDS) @app.on_event("startup") async def start_scheduler() -> None: global market_status # Initialize correct status on startup based on current time now = datetime.now(IST).time() today = datetime.now(IST).date() if not is_trading_day(today): market_status = "Market Closed" elif now < time(9, 0): market_status = "Waiting for 9:00 AM" elif now < time(9, 15): market_status = "Market Pre-Open" elif now < time(9, 20): market_status = "Market Officially Opened" elif latest_prediction_date() == today: market_status = "Prediction Ready" else: market_status = "Prediction Pending" asyncio.create_task(refresh_current_session_once()) asyncio.create_task(refresh_tplus1_once_if_due()) asyncio.create_task(refresh_market_close_once_if_due()) asyncio.create_task(warm_dashboard_payload_cache_once()) asyncio.create_task(stale_data_watch_loop()) asyncio.create_task(daily_ist_refresh_loop()) @app.get("/health") def health() -> dict[str, str]: return {"status": "ok"} @app.get("/") def root() -> dict[str, str]: return {"service": "NIFTY 50 Forecaster Backend", "status": "ok"} @app.get("/dashboard") def dashboard(response: Response) -> dict: response.headers["Cache-Control"] = "no-store, no-cache, must-revalidate" response.headers["Pragma"] = "no-cache" try: refresh_stale_data_once() except Exception as exc: print(f"[dashboard] stale refresh failed: {exc}", flush=True) return attach_market_state(dashboard_payload()) @app.get("/kotak/status") def kotak_status() -> dict: return kotak_neo_manager.status() @app.post("/kotak/auth/totp") def kotak_auth_totp(payload: TotpRequest) -> dict: try: return kotak_neo_manager.authenticate_with_totp(payload.totp) except KotakNeoConfigError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc except KotakNeoError as exc: raise HTTPException(status_code=400, detail=str(exc)) from exc @app.get("/kotak/account") def kotak_account() -> dict: try: return kotak_neo_manager.fetch_account_snapshot() except KotakNeoConfigError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc except KotakNeoSessionRequired as exc: raise HTTPException(status_code=401, detail=str(exc)) from exc except KotakNeoError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc @app.get("/kotak/quote/nifty50") def kotak_nifty50_quote() -> dict: try: return kotak_neo_manager.fetch_nifty50_quote() except KotakNeoConfigError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc except KotakNeoSessionRequired as exc: raise HTTPException(status_code=401, detail=str(exc)) from exc except KotakNeoError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc @app.get("/kotak/activity-log") def kotak_activity_log() -> dict: try: snapshot = kotak_neo_manager.fetch_account_snapshot() return { "activity_log": snapshot.get("activity_log", {}), "trade_history": snapshot.get("trade_history", []), "order_book": snapshot.get("order_book", []), } except KotakNeoConfigError as exc: raise HTTPException(status_code=503, detail=str(exc)) from exc except KotakNeoSessionRequired as exc: raise HTTPException(status_code=401, detail=str(exc)) from exc except KotakNeoError as exc: raise HTTPException(status_code=502, detail=str(exc)) from exc @app.get("/cron/keepalive") def cron_keepalive(background_tasks: BackgroundTasks) -> dict: close_refresh = {"status": "not_checked"} tplus1_refresh = {"status": "not_checked"} if tplus1_refresh_due(): background_tasks.add_task(refresh_tplus1_if_due) tplus1_refresh = {"status": "scheduled"} if close_refresh_due(): background_tasks.add_task(refresh_market_close_data_if_due) close_refresh = {"status": "scheduled"} return { "status": "awake", "market": current_market_state(), "tplus1_refresh": tplus1_refresh, "close_refresh": close_refresh, } @app.get("/prediction/latest") def prediction_latest() -> dict: return latest_saved_prediction() @app.post("/prediction/refresh-first5") def prediction_refresh_first5( session_date: date | None = Query(default=None, description="Optional YYYY-MM-DD session date in IST."), ) -> dict: prediction = refresh_first5_prediction(session_date=session_date) return prediction.to_dict() @app.post("/data/refresh-daily") def data_refresh_daily() -> dict: return refresh_daily_data() @app.post("/data/refresh-market-close") def data_refresh_market_close( session_date: date | None = Query(default=None, description="Optional YYYY-MM-DD session date in IST."), ) -> dict: return refresh_market_close_data(session_date=session_date) @app.get("/info/{ticker}") def stock_info(ticker: str) -> dict: data = get_stock_info(ticker) if "error" in data: raise HTTPException(status_code=404, detail=data["error"]) return data