from __future__ import annotations import sys import time from datetime import date, datetime, time as dt_time from pathlib import Path from zoneinfo import ZoneInfo sys.path.insert(0, str(Path(__file__).resolve().parents[1])) from nifty_backend.runtime import ( CLOSE_REFRESH_READY, STALE_CHECK_INTERVAL_SECONDS, is_trading_day, latest_saved_prediction, close_refresh_due, refresh_market_close_data, refresh_daily_data, refresh_first5_prediction, refresh_stale_data_once, seconds_until_next_ist_run, ) IST = ZoneInfo("Asia/Kolkata") FIRST5_READY = dt_time(9, 20) def latest_prediction_date() -> date | None: try: raw = latest_saved_prediction().get("input_date") return date.fromisoformat(str(raw)) if raw else None except Exception: return None def refresh_if_current_session_is_ready() -> None: now = datetime.now(IST) if not is_trading_day(now.date()) or now.time() < FIRST5_READY: return if latest_prediction_date() == now.date(): return prediction = refresh_first5_prediction() print(f"[scheduler] first5 prediction refreshed: {prediction.to_dict()}") info = refresh_daily_data() print(f"[scheduler] daily data refreshed: {info}") def refresh_close_data_if_due() -> None: if not close_refresh_due(): return info = refresh_market_close_data() print(f"[scheduler] close data refreshed: {info}") def refresh_stale_data_if_due() -> None: info = refresh_stale_data_once() if info.get("status") == "refreshed": print(f"[scheduler] stale data refreshed: {info}") def main() -> None: print("[scheduler] NIFTY first-five-minute scheduler started.") print("[scheduler] Runs the opening prediction after 09:20 IST so the 09:15-09:19 candles are complete.") while True: try: refresh_if_current_session_is_ready() refresh_close_data_if_due() refresh_stale_data_if_due() except Exception as exc: print(f"[scheduler] current-session refresh failed: {exc}") next_first5 = seconds_until_next_ist_run() if next_first5 > STALE_CHECK_INTERVAL_SECONDS: time.sleep(STALE_CHECK_INTERVAL_SECONDS) continue time.sleep(next_first5) try: prediction = refresh_first5_prediction() print(f"[scheduler] first5 prediction refreshed: {prediction.to_dict()}") except Exception as exc: print(f"[scheduler] first5 refresh failed: {exc}") try: info = refresh_daily_data() print(f"[scheduler] daily data refreshed: {info}") except Exception as exc: print(f"[scheduler] daily refresh failed: {exc}") next_close = seconds_until_next_ist_run(CLOSE_REFRESH_READY) if next_close > STALE_CHECK_INTERVAL_SECONDS: time.sleep(STALE_CHECK_INTERVAL_SECONDS) continue time.sleep(next_close) try: refresh_close_data_if_due() except Exception as exc: print(f"[scheduler] close refresh failed: {exc}") if __name__ == "__main__": main()