| import sys |
| from pathlib import Path |
| sys.path.append(str(Path("backend").resolve())) |
| from nifty_backend.runtime import update_live_accuracy, TPLUS1_LATEST_PATH, NIFTY_1M_PATH, latest_parquet_date, NIFTY_1D_PATH |
| import pandas as pd |
| from datetime import date |
|
|
| latest_date = latest_parquet_date(NIFTY_1D_PATH) |
| session_iso = latest_date.isoformat() |
| print(f"session_iso: {session_iso}") |
| try: |
| t1_row = pd.read_csv(TPLUS1_LATEST_PATH).iloc[-1].to_dict() |
| print("t1_row target:", str(t1_row.get("target_date", ""))[:10]) |
| if str(t1_row.get("target_date", ""))[:10] == session_iso: |
| pred = str(t1_row.get("prediction", "")).upper() |
| input_date_str = str(t1_row.get("input_date", ""))[:10] |
| input_day = date.fromisoformat(input_date_str) |
| print("input day:", input_day) |
| minute = pd.read_parquet(NIFTY_1M_PATH, columns=["date", "close"]) |
| minute["dt"] = pd.to_datetime(minute["date"], errors="coerce") |
| minute = minute.dropna(subset=["dt"]) |
| minute["session_date"] = minute["dt"].dt.normalize() |
| minute["time_str"] = minute["dt"].dt.strftime("%H:%M") |
| window = minute[ |
| (minute["session_date"].dt.date == input_day) |
| & (minute["time_str"] >= "14:00") |
| & (minute["time_str"] <= "14:20") |
| ].sort_values("dt") |
| print("window len:", len(window)) |
| if not window.empty: |
| w_close = float(window.iloc[-1]["close"]) |
| print("w_close:", w_close) |
| else: |
| print("window empty!") |
| print("minute dates available:", minute["session_date"].dt.date.unique()) |
| except Exception as e: |
| import traceback |
| traceback.print_exc() |
|
|