# PREDICTIONSITE / app.py
# Jitendra12421's picture
# Upload app.py
# f500c11 verified
# Raw
# History Blame Contribute Delete
# 29.1 kB
from __future__ import annotations
import asyncio
import threading
from datetime import date, datetime, time, timedelta
import sys
from pathlib import Path
from fastapi import BackgroundTasks, HTTPException, Query, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
from pydantic import BaseModel
sys.path.insert(0, str(Path(__file__).resolve().parent))
from nifty_backend.runtime import (
CLOSE_REFRESH_READY,
IST,
STALE_CHECK_INTERVAL_SECONDS,
TPLUS1_READY,
close_refresh_due,
dashboard_payload,
is_trading_day,
latest_saved_prediction,
latest_tplus1_prediction,
next_trading_day,
refresh_daily_data,
refresh_first5_prediction,
refresh_market_close_data,
refresh_stale_data_once,
refresh_tplus1_prediction,
seconds_until_next_ist_run,
warm_dashboard_payload_cache,
)
from kotak_neo import (
KotakNeoConfigError,
KotakNeoError,
KotakNeoSessionRequired,
kotak_neo_manager,
)
from scraper import get_stock_info
# ASGI application object; the route decorators and the startup hook below attach to it.
app = FastAPI(title="NIFTY 50 Forecaster Backend")
# CORS is fully open (any origin, method and header) with credentialed requests disabled.
# NOTE(review): the POST refresh endpoints in this file are reachable from any origin -
# confirm that is intended.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Coarse status string written by the scheduler/startup coroutines and read by
# current_market_state().
market_status = "Waiting for next session"
# Acquired non-blockingly by the *_if_due helpers so that a close refresh / T+1 refresh
# is skipped while another one is already running in this process.
close_refresh_lock = threading.Lock()
tplus1_refresh_lock = threading.Lock()
# Session boundaries, compared against the time-of-day of datetime.now(IST).
MARKET_OPEN = time(9, 15)
FIRST5_READY = time(9, 20)  # the first five one-minute bars are complete
MARKET_CLOSE = time(15, 30)
# Request body for POST /kotak/auth/totp.
# (Documented with comments rather than a docstring so the generated schema is unchanged.)
class TotpRequest(BaseModel):
    # One-time password string; passed to kotak_neo_manager.authenticate_with_totp().
    totp: str
def refresh_market_close_data_if_due() -> dict:
    """Run the market-close refresh when it is due and not already running.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when ``close_refresh_due()``
        is false or ``close_refresh_lock`` is held by another caller;
        otherwise ``{"status": "refreshed", ...}`` merged with the mapping
        returned by ``refresh_market_close_data()``.
    """
    skip_reason = None
    if not close_refresh_due():
        skip_reason = "close refresh is not due"
    elif not close_refresh_lock.acquire(blocking=False):
        # Non-blocking: a concurrent caller already owns the refresh.
        skip_reason = "close refresh already running"

    if skip_reason is not None:
        return {"status": "skipped", "reason": skip_reason}

    # The lock is held from here on; always hand it back.
    try:
        return {"status": "refreshed", **refresh_market_close_data()}
    finally:
        close_refresh_lock.release()
def latest_tplus1_prediction_date(payload: dict | None = None) -> date | None:
    """Return the ``input_date`` of the latest T+1 prediction as a ``date``.

    Args:
        payload: Prediction mapping to inspect. When ``None`` the mapping is
            fetched with ``latest_tplus1_prediction()``.

    Returns:
        The date parsed from the first ten characters of ``input_date``, or
        ``None`` when the field is missing/empty or anything goes wrong while
        fetching or parsing (all exceptions are swallowed on purpose).
    """
    try:
        source = latest_tplus1_prediction() if payload is None else payload
        stamp = source.get("input_date")
        if not stamp:
            return None
        # Only the YYYY-MM-DD prefix is parsed, so timestamp-style values work too.
        return date.fromisoformat(str(stamp)[:10])
    except Exception:
        return None
def tplus1_refresh_due(now: datetime | None = None, latest_date: date | None = None) -> bool:
    """Tell whether a T+1 prediction refresh should run at ``now``.

    Args:
        now: Moment to evaluate; defaults to ``datetime.now(IST)``.
        latest_date: Date of the newest stored T+1 prediction. When ``None``
            it is looked up with ``latest_tplus1_prediction_date()``.

    Returns:
        ``True`` only on a trading day, inside ``[TPLUS1_READY, MARKET_CLOSE)``,
        and when the newest stored prediction is not dated ``now.date()``.
    """
    moment = now or datetime.now(IST)
    session_day = moment.date()
    if not is_trading_day(session_day):
        return False

    clock = moment.time()
    if clock < TPLUS1_READY or clock >= MARKET_CLOSE:
        return False

    if latest_date is None:
        latest_date = latest_tplus1_prediction_date()
    return latest_date != session_day
def refresh_tplus1_if_due() -> dict:
    """Regenerate the T+1 prediction when due and not already running.

    Returns:
        ``{"status": "skipped", "reason": ...}`` when ``tplus1_refresh_due``
        says no or ``tplus1_refresh_lock`` is already held; otherwise
        ``{"status": "refreshed", "prediction": ...}`` carrying the value
        returned by ``refresh_tplus1_prediction`` for today's IST date.
    """
    started_at = datetime.now(IST)
    newest = latest_tplus1_prediction_date()

    if not tplus1_refresh_due(now=started_at, latest_date=newest):
        return {"status": "skipped", "reason": "tplus1 refresh is not due"}

    got_lock = tplus1_refresh_lock.acquire(blocking=False)
    if not got_lock:
        return {"status": "skipped", "reason": "tplus1 refresh already running"}

    try:
        return {
            "status": "refreshed",
            "prediction": refresh_tplus1_prediction(session_date=started_at.date()),
        }
    finally:
        tplus1_refresh_lock.release()
def latest_prediction_date(payload: dict | None = None) -> date | None:
    """Return the ``input_date`` of the latest first-five prediction as a ``date``.

    Args:
        payload: Prediction mapping to inspect. When ``None`` the mapping is
            fetched with ``latest_saved_prediction()``.

    Returns:
        The parsed date, or ``None`` when ``input_date`` is missing/empty or
        when fetching or parsing fails (deliberately best-effort: callers
        treat ``None`` as "no prediction for the current session").
    """
    try:
        latest = payload if payload is not None else latest_saved_prediction()
        raw = latest.get("input_date")
        # Parse only the YYYY-MM-DD prefix, exactly like
        # latest_tplus1_prediction_date(): a timestamp-style value such as
        # "2024-01-02 09:20:00" would otherwise fail to parse and make the
        # current session look like it has no prediction.
        return date.fromisoformat(str(raw)[:10]) if raw else None
    except Exception:
        return None
def current_market_state(now: datetime | None = None) -> dict:
    """Describe the market session and forecast availability at ``now``.

    Three independent status ladders are produced: the overall market status,
    the first-five-minute ("T+5") forecast status and the T+1 forecast status.
    All of them treat the session as ``[start, MARKET_CLOSE)`` - the instant
    of ``MARKET_CLOSE`` itself already counts as closed, matching
    ``next_session_date``, which rolls over to the next trading day then.

    Args:
        now: Moment to evaluate; defaults to ``datetime.now(IST)``.

    Returns:
        A dict with the keys ``market_status``, ``market_detail``,
        ``server_time_ist``, ``is_trading_day``, ``session_date``,
        ``next_session_date``, ``latest_prediction_date``, ``t5_available``,
        ``t5_status``, ``t5_detail``, ``market_is_open_for_t5``,
        ``tplus1_available``, ``tplus1_status``, ``tplus1_detail``,
        ``market_is_open_for_tplus1`` and ``latest_tplus1_prediction_date``.
    """
    now = now or datetime.now(IST)
    today = now.date()
    current_time = now.time()
    trading_day = is_trading_day(today)
    latest_date = latest_prediction_date()
    tplus1_latest_date = latest_tplus1_prediction_date()

    market_is_open_for_t5 = trading_day and FIRST5_READY <= current_time < MARKET_CLOSE
    market_is_open_for_tplus1 = trading_day and TPLUS1_READY <= current_time < MARKET_CLOSE
    has_current_first5 = market_is_open_for_t5 and latest_date == today
    has_current_tplus1 = market_is_open_for_tplus1 and tplus1_latest_date == today
    next_session = (
        today
        if trading_day and current_time < MARKET_CLOSE
        else next_trading_day(today + timedelta(days=1))
    )

    holiday_detail = f"Next trading session is {next_session.isoformat()}."
    ended_detail = "Trading session has ended."
    # The module-level status (only read here) wins while the prediction job
    # is running or has failed.
    job_unresolved = market_status in {"Fetching T+5 Prediction Data...", "Prediction Failed"}
    unresolved_detail = "The first-five-minute prediction job is still resolving."
    ready_detail = "Today's first-five-minute prediction is available."
    pending_detail = "No current-session prediction has been generated yet."

    # --- overall market status ---
    if not trading_day:
        status, detail = "Market Closed", holiday_detail
    elif current_time < time(9, 0):
        status, detail = "Waiting for 9:00 AM", "Market has not entered pre-open yet."
    elif current_time < MARKET_OPEN:
        status, detail = "Market Pre-Open", "Market opens at 9:15 AM IST."
    elif current_time < FIRST5_READY:
        status, detail = "Market Officially Opened", "Waiting for the first 5 one-minute bars."
    elif current_time < MARKET_CLOSE:
        # Strictly before the close, consistent with every other check here.
        if job_unresolved:
            status, detail = market_status, unresolved_detail
        elif has_current_first5:
            status, detail = "Prediction Ready", ready_detail
        else:
            status, detail = "Prediction Pending", pending_detail
    else:
        status, detail = "Market Closed", ended_detail

    # --- T+1 forecast status ---
    if not trading_day:
        tplus1_status, tplus1_detail = "Market Closed", holiday_detail
    elif current_time < TPLUS1_READY:
        tplus1_status = "Waiting for 2:30 PM"
        tplus1_detail = "The T+1 forecast becomes available at 2:30 PM IST."
    elif current_time < MARKET_CLOSE:
        if has_current_tplus1:
            tplus1_status = "Ready"
            tplus1_detail = "Today's T+1 prediction is available."
        else:
            tplus1_status = "Pending"
            tplus1_detail = "No current-session T+1 prediction has been generated yet."
    else:
        tplus1_status, tplus1_detail = "Market Closed", ended_detail

    # --- T+5 (first-five-minute) forecast status ---
    if not trading_day:
        t5_status, t5_detail = "Market Closed", holiday_detail
    elif current_time < FIRST5_READY:
        t5_status = "Waiting for 9:20 AM"
        t5_detail = "The T+5 forecast becomes available after the first five one-minute bars."
    elif current_time < MARKET_CLOSE:
        if job_unresolved:
            t5_status, t5_detail = market_status, unresolved_detail
        elif has_current_first5:
            t5_status, t5_detail = "Ready", ready_detail
        else:
            t5_status, t5_detail = "Pending", pending_detail
    else:
        t5_status, t5_detail = "Market Closed", ended_detail

    return {
        "market_status": status,
        "market_detail": detail,
        "server_time_ist": now.isoformat(),
        "is_trading_day": trading_day,
        "session_date": today.isoformat(),
        "next_session_date": next_session.isoformat(),
        "latest_prediction_date": latest_date.isoformat() if latest_date else None,
        "t5_available": has_current_first5,
        "t5_status": t5_status,
        "t5_detail": t5_detail,
        "market_is_open_for_t5": market_is_open_for_t5,
        "tplus1_available": has_current_tplus1,
        "tplus1_status": tplus1_status,
        "tplus1_detail": tplus1_detail,
        "market_is_open_for_tplus1": market_is_open_for_tplus1,
        "latest_tplus1_prediction_date": tplus1_latest_date.isoformat() if tplus1_latest_date else None,
    }
def _attach_nifty_quote(payload: dict) -> None:
    """Store the live NIFTY 50 quote, or a status/message error, on ``payload``."""
    try:
        payload["nifty_quote"] = kotak_neo_manager.fetch_nifty50_quote()
        payload["nifty_quote_error"] = None
    except KotakNeoSessionRequired as exc:
        payload["nifty_quote"] = None
        payload["nifty_quote_error"] = {"status": 401, "message": str(exc)}
    except KotakNeoConfigError as exc:
        payload["nifty_quote"] = None
        payload["nifty_quote_error"] = {"status": 503, "message": str(exc)}
    except KotakNeoError as exc:
        payload["nifty_quote"] = None
        payload["nifty_quote_error"] = {"status": 502, "message": str(exc)}


def _mfe_history_record(session_date, first5_close, up_points, down_points, actual_high, actual_low):
    """Build one MFE history row; return ``None`` if any number is NaN/inf.

    Raises ``TypeError``/``ValueError`` when a value cannot be converted to
    ``float``; callers decide how to report that.
    """
    import math

    f5c, pred_up, pred_dn, act_hi, act_lo = (
        float(v) for v in (first5_close, up_points, down_points, actual_high, actual_low)
    )
    # Non-finite values would poison the recomputed RMSE/MAE and are not
    # JSON-compliant floats, so such rows are dropped.
    if not all(math.isfinite(v) for v in (f5c, pred_up, pred_dn, act_hi, act_lo)):
        return None
    return {
        "date": session_date,
        "first5_close": f5c,
        "predicted_up_points": pred_up,
        "predicted_down_points": pred_dn,
        "actual_high": act_hi,
        "predicted_high": f5c + pred_up,
        "actual_low": act_lo,
        "predicted_low": f5c - pred_dn,
    }


def _load_mfe_fallback() -> tuple[dict, dict, list]:
    """Load MFE latest/summary/history straight from files beside this module.

    Reads ``models/nifty_opening_mfe_regressor/outputs/{summary.json,
    latest_prediction.csv, test_predictions.csv, mfe_live_history.csv}`` and
    ``data/nifty50_1d.parquet`` when they exist. Best effort: any failure is
    printed and whatever was loaded up to that point is returned.

    Returns:
        ``(latest, summary, history)``. When history rows exist, the summary's
        ``up``/``down`` ``test_rmse_points`` and ``test_mae_points`` are
        recomputed over the combined test + live history.
    """
    latest: dict = {}
    summary: dict = {}
    history: list = []
    try:
        # Imported lazily: only needed when the payload lacks MFE data.
        import json

        import numpy as np
        import pandas as pd

        base_dir = Path(__file__).resolve().parent
        mfe_out = base_dir / "models" / "nifty_opening_mfe_regressor" / "outputs"
        data_dir = base_dir / "data"

        summary_path = mfe_out / "summary.json"
        if summary_path.exists():
            summary = json.loads(summary_path.read_text(encoding="utf-8"))

        latest_path = mfe_out / "latest_prediction.csv"
        if latest_path.exists():
            row = pd.read_csv(latest_path).iloc[-1].to_dict()
            latest = {k: (None if pd.isna(v) else v) for k, v in row.items()}

        records = []

        # Back-test rows: the actual high/low come from the CSV itself.
        test_path = mfe_out / "test_predictions.csv"
        if test_path.exists():
            for _, r in pd.read_csv(test_path).iterrows():
                try:
                    record = _mfe_history_record(
                        str(r["date"]),
                        r["first5_close"],
                        r["predicted_up_points"],
                        r["predicted_down_points"],
                        r["day_high"],
                        r["day_low"],
                    )
                except Exception as ex:
                    print(f"Skipping malformed MFE test row: {ex}", flush=True)
                    continue
                if record is not None:
                    records.append(record)

        # Live rows: the actual high/low are looked up in the daily bars.
        live_path = mfe_out / "mfe_live_history.csv"
        if live_path.exists():
            live_df = pd.read_csv(live_path)
            daily_df = None
            daily_path = data_dir / "nifty50_1d.parquet"
            if daily_path.exists():
                daily_df = pd.read_parquet(daily_path)
                daily_df["date"] = pd.to_datetime(daily_df["date"]).dt.strftime("%Y-%m-%d")
                daily_df = daily_df.set_index("date")
            for _, r in live_df.iterrows():
                try:
                    dt = str(r["input_date"])
                    # A live row without a matching daily bar has no actuals
                    # yet; skip it instead of reusing a previous row's values.
                    if daily_df is None or dt not in daily_df.index:
                        continue
                    act_hi = daily_df.loc[dt, "high"]
                    act_lo = daily_df.loc[dt, "low"]
                    # Duplicate index entries yield a Series; take the first.
                    if isinstance(act_hi, pd.Series):
                        act_hi = act_hi.iloc[0]
                    if isinstance(act_lo, pd.Series):
                        act_lo = act_lo.iloc[0]
                    record = _mfe_history_record(
                        dt,
                        r["first5_close"],
                        r["predicted_up_points"],
                        r["predicted_down_points"],
                        act_hi,
                        act_lo,
                    )
                except Exception as ex:
                    print(f"Error appending live row: {ex}")
                    continue
                if record is not None:
                    records.append(record)

        history = records

        # Recalculate RMSE and MAE over the combined history.
        if records:
            up_errors = np.array(
                [(r["actual_high"] - r["first5_close"]) - r["predicted_up_points"] for r in records]
            )
            down_errors = np.array(
                [(r["first5_close"] - r["actual_low"]) - r["predicted_down_points"] for r in records]
            )
            for side, errors in (("up", up_errors), ("down", down_errors)):
                metrics = summary.setdefault(side, {})
                metrics["test_rmse_points"] = float(np.sqrt(np.mean(errors**2)))
                metrics["test_mae_points"] = float(np.mean(np.abs(errors)))
    except Exception as exc:
        print(f"Fallback MFE load failed: {exc}", flush=True)
    return latest, summary, history


def attach_market_state(payload: dict) -> dict:
    """Decorate a dashboard payload with market state, quote and predictions.

    Mutates and returns ``payload``:

    * merges ``current_market_state()`` into ``payload["data_status"]``;
    * sets ``payload["nifty_quote"]`` / ``payload["nifty_quote_error"]``;
    * replaces ``payload["predictions"]`` with normalized ``tomorrow``,
      ``t5``, ``tplus1`` and ``mfe`` sections. Prediction values are exposed
      only while the section counts as available; otherwise the status and
      reason from the market state are reported instead.

    MFE latest/summary/history are taken from the incoming payload; the
    on-disk fallback is loaded only when at least one of them is missing or
    empty, so a complete payload costs no file I/O.

    Args:
        payload: Dashboard payload mapping (as produced by ``dashboard_payload()``).

    Returns:
        The same ``payload`` object, updated in place.
    """
    state = current_market_state()
    payload.setdefault("data_status", {})
    payload["data_status"].update(state)
    _attach_nifty_quote(payload)

    saved = payload.get("predictions", {})
    t5_latest = saved.get("t5", {}).get("latest") or payload.get("latest") or {}
    tomorrow_latest = saved.get("tomorrow", {}).get("latest") or payload.get("tomorrow_latest") or {}
    tplus1_latest = saved.get("tplus1", {}).get("latest") or payload.get("tplus1_latest") or {}

    mfe_saved = saved.get("mfe", {})
    mfe_latest = mfe_saved.get("latest")
    mfe_summary = mfe_saved.get("summary")
    mfe_history = mfe_saved.get("history")
    if not (mfe_latest and mfe_summary and mfe_history):
        fallback_latest, fallback_summary, fallback_history = _load_mfe_fallback()
        mfe_latest = mfe_latest or fallback_latest
        mfe_summary = mfe_summary or fallback_summary
        mfe_history = mfe_history or fallback_history

    t5_available = bool(state["t5_available"] and t5_latest.get("prediction"))
    tplus1_available = bool(state["tplus1_available"] and tplus1_latest.get("prediction"))
    tomorrow_available = bool(tomorrow_latest.get("prediction"))

    refresh_phase = payload.get("data_status", {}).get("refresh_phase")
    if refresh_phase in {"waiting_second_payload", "refreshing"}:
        tomorrow_status = "WAITING FOR SECOND PAYLOAD"
        tomorrow_reason = "Market close refresh is generating the next-session payload."
    else:
        tomorrow_status = "Ready" if tomorrow_available else "Pending"
        tomorrow_reason = None if tomorrow_available else "No saved next-session signal is available."

    t5_summary = payload.get("summary") or {}
    tplus1_summary = payload.get("tplus1_summary") or {}

    payload["predictions"] = {
        "tomorrow": {
            "available": tomorrow_available,
            "status": tomorrow_status,
            "reason": tomorrow_reason,
            "target_date": tomorrow_latest.get("target_date") or state["next_session_date"],
            "input_date": tomorrow_latest.get("input_date"),
            "prediction": tomorrow_latest.get("prediction") if tomorrow_available else None,
            "prob_up": tomorrow_latest.get("prob_up") if tomorrow_available else None,
            "confidence": tomorrow_latest.get("confidence") if tomorrow_available else None,
            "threshold": tomorrow_latest.get("threshold") if tomorrow_available else None,
            "model_name": tomorrow_latest.get("model_name"),
            "source_model": tomorrow_latest.get("source_model"),
            "validation_accuracy": tomorrow_latest.get("validation_accuracy"),
            "test_accuracy": tomorrow_latest.get("test_accuracy"),
        },
        "t5": {
            "available": t5_available,
            "status": "Ready" if t5_available else state["t5_status"],
            "reason": None if t5_available else state["t5_detail"],
            "input_date": t5_latest.get("input_date"),
            "prediction": t5_latest.get("prediction") if t5_available else None,
            "prob_up": t5_latest.get("prob_up") if t5_available else None,
            "confidence": t5_latest.get("confidence") if t5_available else None,
            "threshold": t5_latest.get("threshold") if t5_available else None,
            "is_overridden": bool(t5_latest.get("is_overridden")) if t5_available else False,
            "model_name": t5_latest.get("model_name"),
            "validation_accuracy": t5_summary.get("validation_accuracy"),
            "test_accuracy": t5_summary.get("test_accuracy"),
        },
        "tplus1": {
            "available": tplus1_available,
            "status": "Ready" if tplus1_available else state["tplus1_status"],
            "reason": None if tplus1_available else state["tplus1_detail"],
            "target_date": tplus1_latest.get("target_date") or state["next_session_date"],
            "input_date": tplus1_latest.get("input_date"),
            "prediction": tplus1_latest.get("prediction") if tplus1_available else None,
            "prob_up": tplus1_latest.get("prob_up") if tplus1_available else None,
            "confidence": tplus1_latest.get("confidence") if tplus1_available else None,
            "threshold": tplus1_latest.get("threshold") if tplus1_available else None,
            "is_overridden": bool(tplus1_latest.get("overlay_changed")) if tplus1_available else False,
            "model_name": tplus1_latest.get("model_name"),
            "validation_accuracy": tplus1_summary.get("validation_accuracy"),
            "test_accuracy": tplus1_summary.get("test_accuracy"),
        },
        # The MFE section shares the T+5 availability window.
        "mfe": {
            "available": t5_available,
            "status": "Ready" if t5_available else state["t5_status"],
            "reason": None if t5_available else state["t5_detail"],
            "latest": mfe_latest,
            "summary": mfe_summary,
            "history": mfe_history,
        },
    }
    return payload
async def daily_ist_refresh_loop() -> None:
    """Endless scheduler that walks through one trading session per iteration.

    Each pass sleeps until 9:00, 9:15 and 9:20 (updating ``market_status`` at
    every step), runs the first-five prediction and the daily data refresh in
    worker threads, then sleeps until ``TPLUS1_READY`` and
    ``CLOSE_REFRESH_READY`` for the T+1 and close refreshes. On a non-trading
    day only the 9:00 wake-up happens. Job failures are printed, never raised.
    """
    global market_status

    async def sleep_until(clock: time) -> None:
        # Sleep for whatever seconds_until_next_ist_run() reports for this clock time.
        await asyncio.sleep(seconds_until_next_ist_run(clock))

    while True:
        # Wait until 9:00 AM IST
        await sleep_until(time(9, 0))
        session_day = datetime.now(IST).date()
        if not is_trading_day(session_day):
            market_status = "Market Closed"
            continue

        market_status = "Market Pre-Open"
        print("[scheduler] 9:00 AM IST - Market Pre-Open", flush=True)

        # Wait until 9:15 AM IST
        await sleep_until(time(9, 15))
        market_status = "Market Officially Opened"
        print("[scheduler] 9:15 AM IST - Market Officially Opened", flush=True)

        # Wait until 9:20 AM IST
        await sleep_until(time(9, 20))
        market_status = "Fetching T+5 Prediction Data..."
        print("[scheduler] 9:20 AM IST - Fetching Data", flush=True)

        try:
            await asyncio.to_thread(refresh_first5_prediction)
        except Exception as err:
            print(f"[scheduler] first5 refresh failed: {err}", flush=True)
            market_status = "Prediction Failed"
        else:
            market_status = "Prediction Ready"

        # The daily refresh runs regardless of the prediction outcome.
        try:
            await asyncio.to_thread(refresh_daily_data)
        except Exception as err:
            print(f"[scheduler] daily refresh failed: {err}", flush=True)

        await sleep_until(TPLUS1_READY)
        print("[scheduler] 2:30 PM IST - Refreshing T+1 prediction", flush=True)
        try:
            outcome = await asyncio.to_thread(refresh_tplus1_if_due)
            print(f"[scheduler] tplus1 refresh result: {outcome}", flush=True)
        except Exception as err:
            print(f"[scheduler] tplus1 refresh failed: {err}", flush=True)

        await sleep_until(CLOSE_REFRESH_READY)
        print("[scheduler] 3:45 PM IST - Refreshing close data", flush=True)
        try:
            outcome = await asyncio.to_thread(refresh_market_close_data_if_due)
            print(f"[scheduler] close refresh result: {outcome}", flush=True)
        except Exception as err:
            print(f"[scheduler] close refresh failed: {err}", flush=True)
async def refresh_current_session_once() -> None:
    """Catch up on today's first-five prediction once (run at startup).

    Does nothing on a non-trading day, before ``FIRST5_READY``, or when the
    latest saved prediction is already dated today. Otherwise it runs the
    first-five prediction and then the daily data refresh in worker threads,
    recording the outcome in ``market_status`` and printing any failure.
    """
    global market_status

    started = datetime.now(IST)
    session_day = started.date()
    if not is_trading_day(session_day):
        return
    if started.time() < FIRST5_READY:
        return
    if latest_prediction_date() == session_day:
        return

    market_status = "Fetching T+5 Prediction Data..."
    print("[startup] Current session needs first-five refresh; fetching now.", flush=True)

    try:
        await asyncio.to_thread(refresh_first5_prediction)
    except Exception as err:
        print(f"[startup] first5 refresh failed: {err}", flush=True)
        market_status = "Prediction Failed"
    else:
        market_status = "Prediction Ready"

    try:
        await asyncio.to_thread(refresh_daily_data)
    except Exception as err:
        print(f"[startup] daily refresh failed: {err}", flush=True)
async def refresh_market_close_once_if_due() -> None:
    """Run the guarded market-close refresh once in a worker thread.

    Prints the result only when a refresh actually happened; any exception is
    printed and swallowed.
    """
    try:
        outcome = await asyncio.to_thread(refresh_market_close_data_if_due)
        refreshed = outcome.get("status") == "refreshed"
        if refreshed:
            print(f"[startup] close refresh result: {outcome}", flush=True)
    except Exception as err:
        print(f"[startup] close refresh failed: {err}", flush=True)
async def refresh_tplus1_once_if_due() -> None:
    """Run the guarded T+1 refresh once in a worker thread.

    Prints the result only when a refresh actually happened; any exception is
    printed and swallowed.
    """
    try:
        outcome = await asyncio.to_thread(refresh_tplus1_if_due)
        refreshed = outcome.get("status") == "refreshed"
        if refreshed:
            print(f"[startup] tplus1 refresh result: {outcome}", flush=True)
    except Exception as err:
        print(f"[startup] tplus1 refresh failed: {err}", flush=True)
async def warm_dashboard_payload_cache_once() -> None:
    """Call ``warm_dashboard_payload_cache`` once in a worker thread.

    A failure is printed and otherwise ignored.
    """
    warmup = asyncio.to_thread(warm_dashboard_payload_cache)
    try:
        await warmup
    except Exception as err:
        print(f"[startup] dashboard payload warmup failed: {err}", flush=True)
async def stale_data_watch_loop() -> None:
    """Run ``refresh_stale_data_once`` forever, pausing between rounds.

    Each round runs in a worker thread; a result is printed only when data was
    refreshed, failures are printed and swallowed, and the loop then sleeps
    ``STALE_CHECK_INTERVAL_SECONDS`` before the next round.
    """
    while True:
        try:
            outcome = await asyncio.to_thread(refresh_stale_data_once)
            refreshed = outcome.get("status") == "refreshed"
            if refreshed:
                print(f"[stale-watch] refreshed stale data: {outcome}", flush=True)
        except Exception as err:
            print(f"[stale-watch] stale refresh failed: {err}", flush=True)
        await asyncio.sleep(STALE_CHECK_INTERVAL_SECONDS)
# Strong references to the fire-and-forget tasks started at startup. The event
# loop itself only keeps weak references to tasks, so an unreferenced task can
# be garbage-collected before it finishes.
_background_tasks: set[asyncio.Task] = set()


def _spawn_background(coro) -> asyncio.Task:
    """Schedule ``coro`` as a task and keep it referenced until it is done."""
    task = asyncio.create_task(coro)
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return task


@app.on_event("startup")
async def start_scheduler() -> None:
    """Seed ``market_status`` for the current IST time and start background jobs.

    Launches the one-shot catch-up jobs (first-five, T+1, market close, cache
    warm-up) plus the two endless loops (stale-data watcher and daily
    scheduler).
    """
    global market_status
    # Initialize correct status on startup based on current time. Time and
    # date come from one clock reading so they cannot straddle midnight.
    started = datetime.now(IST)
    now = started.time()
    today = started.date()
    if not is_trading_day(today):
        market_status = "Market Closed"
    elif now < time(9, 0):
        market_status = "Waiting for 9:00 AM"
    elif now < time(9, 15):
        market_status = "Market Pre-Open"
    elif now < time(9, 20):
        market_status = "Market Officially Opened"
    elif latest_prediction_date() == today:
        market_status = "Prediction Ready"
    else:
        market_status = "Prediction Pending"

    _spawn_background(refresh_current_session_once())
    _spawn_background(refresh_tplus1_once_if_due())
    _spawn_background(refresh_market_close_once_if_due())
    _spawn_background(warm_dashboard_payload_cache_once())
    _spawn_background(stale_data_watch_loop())
    _spawn_background(daily_ist_refresh_loop())
# Liveness probe: always answers {"status": "ok"}.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/health")
def health() -> dict[str, str]:
    liveness = {"status": "ok"}
    return liveness
# Service banner for the root URL: service name plus an "ok" status.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/")
def root() -> dict[str, str]:
    banner = {
        "service": "NIFTY 50 Forecaster Backend",
        "status": "ok",
    }
    return banner
# Dashboard endpoint: disables caching, attempts a stale-data refresh (a failure is
# printed, not raised) and returns dashboard_payload() after attach_market_state().
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/dashboard")
def dashboard(response: Response) -> dict:
    no_cache_headers = (
        ("Cache-Control", "no-store, no-cache, must-revalidate"),
        ("Pragma", "no-cache"),
    )
    for header_name, header_value in no_cache_headers:
        response.headers[header_name] = header_value

    try:
        refresh_stale_data_once()
    except Exception as err:
        print(f"[dashboard] stale refresh failed: {err}", flush=True)

    base_payload = dashboard_payload()
    return attach_market_state(base_payload)
# Returns whatever kotak_neo_manager.status() reports.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/kotak/status")
def kotak_status() -> dict:
    session_status = kotak_neo_manager.status()
    return session_status
# Authenticates with Kotak Neo using the submitted TOTP.
# Error mapping: KotakNeoConfigError -> 503, any other KotakNeoError -> 400.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.post("/kotak/auth/totp")
def kotak_auth_totp(payload: TotpRequest) -> dict:
    try:
        auth_result = kotak_neo_manager.authenticate_with_totp(payload.totp)
    except KotakNeoConfigError as err:
        raise HTTPException(status_code=503, detail=str(err)) from err
    except KotakNeoError as err:
        raise HTTPException(status_code=400, detail=str(err)) from err
    return auth_result
# Returns the Kotak Neo account snapshot.
# Error mapping: config error -> 503, session required -> 401, other Kotak error -> 502.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/kotak/account")
def kotak_account() -> dict:
    try:
        snapshot = kotak_neo_manager.fetch_account_snapshot()
    except KotakNeoConfigError as err:
        raise HTTPException(status_code=503, detail=str(err)) from err
    except KotakNeoSessionRequired as err:
        raise HTTPException(status_code=401, detail=str(err)) from err
    except KotakNeoError as err:
        raise HTTPException(status_code=502, detail=str(err)) from err
    return snapshot
# Returns the live NIFTY 50 quote from Kotak Neo.
# Error mapping: config error -> 503, session required -> 401, other Kotak error -> 502.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/kotak/quote/nifty50")
def kotak_nifty50_quote() -> dict:
    try:
        quote = kotak_neo_manager.fetch_nifty50_quote()
    except KotakNeoConfigError as err:
        raise HTTPException(status_code=503, detail=str(err)) from err
    except KotakNeoSessionRequired as err:
        raise HTTPException(status_code=401, detail=str(err)) from err
    except KotakNeoError as err:
        raise HTTPException(status_code=502, detail=str(err)) from err
    return quote
# Returns the activity log, trade history and order book taken from the account
# snapshot, with empty defaults for any missing section.
# Error mapping: config error -> 503, session required -> 401, other Kotak error -> 502.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/kotak/activity-log")
def kotak_activity_log() -> dict:
    try:
        snapshot = kotak_neo_manager.fetch_account_snapshot()
        activity = snapshot.get("activity_log", {})
        trades = snapshot.get("trade_history", [])
        orders = snapshot.get("order_book", [])
    except KotakNeoConfigError as err:
        raise HTTPException(status_code=503, detail=str(err)) from err
    except KotakNeoSessionRequired as err:
        raise HTTPException(status_code=401, detail=str(err)) from err
    except KotakNeoError as err:
        raise HTTPException(status_code=502, detail=str(err)) from err
    return {
        "activity_log": activity,
        "trade_history": trades,
        "order_book": orders,
    }
# Keep-alive ping that also queues the T+1 and/or market-close refresh as background
# tasks when they are due, and reports the current market state.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/cron/keepalive")
def cron_keepalive(background_tasks: BackgroundTasks) -> dict:
    tplus1_state = "not_checked"
    close_state = "not_checked"

    if tplus1_refresh_due():
        background_tasks.add_task(refresh_tplus1_if_due)
        tplus1_state = "scheduled"

    if close_refresh_due():
        background_tasks.add_task(refresh_market_close_data_if_due)
        close_state = "scheduled"

    return {
        "status": "awake",
        "market": current_market_state(),
        "tplus1_refresh": {"status": tplus1_state},
        "close_refresh": {"status": close_state},
    }
# Returns the value of latest_saved_prediction() as-is.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/prediction/latest")
def prediction_latest() -> dict:
    saved = latest_saved_prediction()
    return saved
# Runs the first-five prediction synchronously (optionally for a given session date)
# and returns the result's to_dict().
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.post("/prediction/refresh-first5")
def prediction_refresh_first5(
    session_date: date | None = Query(default=None, description="Optional YYYY-MM-DD session date in IST."),
) -> dict:
    return refresh_first5_prediction(session_date=session_date).to_dict()
# Runs refresh_daily_data() synchronously and returns its result.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.post("/data/refresh-daily")
def data_refresh_daily() -> dict:
    refresh_result = refresh_daily_data()
    return refresh_result
# Runs refresh_market_close_data() synchronously (optionally for a given session date).
# Unlike refresh_market_close_data_if_due(), this neither checks whether the refresh is
# due nor takes close_refresh_lock.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.post("/data/refresh-market-close")
def data_refresh_market_close(
    session_date: date | None = Query(default=None, description="Optional YYYY-MM-DD session date in IST."),
) -> dict:
    refresh_result = refresh_market_close_data(session_date=session_date)
    return refresh_result
# Looks up a ticker with get_stock_info(); a result containing an "error" key becomes
# a 404 carrying that error as the detail.
# (Comment instead of docstring so the OpenAPI description stays unchanged.)
@app.get("/info/{ticker}")
def stock_info(ticker: str) -> dict:
    info = get_stock_info(ticker)
    if "error" not in info:
        return info
    raise HTTPException(status_code=404, detail=info["error"])