| from __future__ import annotations |
|
|
| import sys |
| import time |
| from datetime import date, datetime, time as dt_time |
| from pathlib import Path |
| from zoneinfo import ZoneInfo |
|
|
| sys.path.insert(0, str(Path(__file__).resolve().parents[1])) |
| from nifty_backend.runtime import ( |
| CLOSE_REFRESH_READY, |
| STALE_CHECK_INTERVAL_SECONDS, |
| is_trading_day, |
| latest_saved_prediction, |
| close_refresh_due, |
| refresh_market_close_data, |
| refresh_daily_data, |
| refresh_first5_prediction, |
| refresh_stale_data_once, |
| seconds_until_next_ist_run, |
| ) |
|
|
|
|
# Time zone used when reading the current date/time for session checks.
IST = ZoneInfo("Asia/Kolkata")
# Earliest time of day at which the catch-up prediction may run. The startup
# banner printed by main() describes this as 09:20 IST, chosen so that the
# 09:15-09:19 candles are complete.
FIRST5_READY = dt_time(9, 20)
|
|
|
|
def latest_prediction_date() -> date | None:
    """Return the ``input_date`` of the latest saved prediction as a ``date``.

    Returns:
        The parsed date, or ``None`` when the stored value is missing/falsy
        or when loading or parsing raises any ``Exception``.
    """
    try:
        stored = latest_saved_prediction().get("input_date")
        if not stored:
            return None
        # str() lets non-string stored values go through ISO parsing as well.
        return date.fromisoformat(str(stored))
    except Exception:
        # Best-effort lookup: any failure is reported as "no known date".
        return None
|
|
|
|
def refresh_if_current_session_is_ready() -> None:
    """Run the first-five prediction and daily refresh for today if still needed.

    Does nothing when today (in IST) is not a trading day, when the current
    IST time is earlier than ``FIRST5_READY``, or when the latest saved
    prediction is already dated today. Otherwise it refreshes the prediction
    and then the daily data, printing the result of each.
    """
    current = datetime.now(IST)
    today = current.date()

    if not is_trading_day(today):
        return
    if current.time() < FIRST5_READY:
        return
    if latest_prediction_date() == today:
        # A prediction for this session has already been saved.
        return

    first5 = refresh_first5_prediction()
    print(f"[scheduler] first5 prediction refreshed: {first5.to_dict()}")
    daily = refresh_daily_data()
    print(f"[scheduler] daily data refreshed: {daily}")
|
|
|
|
def refresh_close_data_if_due() -> None:
    """Refresh market-close data when ``close_refresh_due()`` says it is due.

    Prints the value returned by ``refresh_market_close_data()`` after a
    refresh; does nothing otherwise.
    """
    if close_refresh_due():
        summary = refresh_market_close_data()
        print(f"[scheduler] close data refreshed: {summary}")
|
|
|
|
def refresh_stale_data_if_due() -> None:
    """Run one stale-data check and report it only when data was refreshed.

    ``refresh_stale_data_once()`` is always called; its result is printed
    only if its ``"status"`` entry equals ``"refreshed"``.
    """
    outcome = refresh_stale_data_once()
    was_refreshed = outcome.get("status") == "refreshed"
    if not was_refreshed:
        return
    print(f"[scheduler] stale data refreshed: {outcome}")
|
|
|
|
def main() -> None:
    """Run the scheduler loop; this function never returns.

    Each cycle of the loop does the following:

    1. Catch-up: run the current-session refresh, the close-data refresh and
       the stale-data check. Each step has its own exception guard so that a
       failure in one does not skip the others and each failure is logged
       under its own label.
    2. Ask ``seconds_until_next_ist_run()`` how long until the next run
       (presumably the first-five slot when called without arguments --
       confirm against ``nifty_backend.runtime``). If that is more than
       ``STALE_CHECK_INTERVAL_SECONDS`` away, sleep one interval and start a
       new cycle so the catch-up checks keep running while waiting.
    3. Otherwise sleep until that run, then refresh the first-five prediction
       and the daily data, each guarded independently.
    4. Repeat the same wait-or-restart logic for ``CLOSE_REFRESH_READY`` and
       then run the close-data refresh.

    All exceptions derived from ``Exception`` are caught and printed so the
    process keeps running.
    """
    print("[scheduler] NIFTY first-five-minute scheduler started.")
    print("[scheduler] Runs the opening prediction after 09:20 IST so the 09:15-09:19 candles are complete.")
    while True:
        # Catch-up phase: guard every step separately, matching how the
        # scheduled steps further down are handled.
        try:
            refresh_if_current_session_is_ready()
        except Exception as exc:
            print(f"[scheduler] current-session refresh failed: {exc}")
        try:
            refresh_close_data_if_due()
        except Exception as exc:
            print(f"[scheduler] close refresh failed: {exc}")
        try:
            refresh_stale_data_if_due()
        except Exception as exc:
            print(f"[scheduler] stale refresh failed: {exc}")

        next_first5 = seconds_until_next_ist_run()
        if next_first5 > STALE_CHECK_INTERVAL_SECONDS:
            # Too far away: nap for one interval, then re-run the catch-up
            # checks instead of blocking until the scheduled time.
            time.sleep(STALE_CHECK_INTERVAL_SECONDS)
            continue
        time.sleep(next_first5)

        try:
            prediction = refresh_first5_prediction()
            print(f"[scheduler] first5 prediction refreshed: {prediction.to_dict()}")
        except Exception as exc:
            print(f"[scheduler] first5 refresh failed: {exc}")
        try:
            info = refresh_daily_data()
            print(f"[scheduler] daily data refreshed: {info}")
        except Exception as exc:
            print(f"[scheduler] daily refresh failed: {exc}")

        next_close = seconds_until_next_ist_run(CLOSE_REFRESH_READY)
        if next_close > STALE_CHECK_INTERVAL_SECONDS:
            # Same wait-or-restart rule as above, now for the close refresh.
            time.sleep(STALE_CHECK_INTERVAL_SECONDS)
            continue
        time.sleep(next_close)

        try:
            refresh_close_data_if_due()
        except Exception as exc:
            print(f"[scheduler] close refresh failed: {exc}")
|
|
|
|
# Start the scheduler loop only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()
|
|