Spaces:
Running
Running
| import sys | |
| from pathlib import Path | |
| sys.path.append(str(Path("backend").resolve())) | |
| from nifty_backend.runtime import update_live_accuracy, TPLUS1_LATEST_PATH, NIFTY_1M_PATH, latest_parquet_date, NIFTY_1D_PATH | |
| import pandas as pd | |
| from datetime import date | |
| latest_date = latest_parquet_date(NIFTY_1D_PATH) | |
| session_iso = latest_date.isoformat() | |
| print(f"session_iso: {session_iso}") | |
| try: | |
| t1_row = pd.read_csv(TPLUS1_LATEST_PATH).iloc[-1].to_dict() | |
| print("t1_row target:", str(t1_row.get("target_date", ""))[:10]) | |
| if str(t1_row.get("target_date", ""))[:10] == session_iso: | |
| pred = str(t1_row.get("prediction", "")).upper() | |
| input_date_str = str(t1_row.get("input_date", ""))[:10] | |
| input_day = date.fromisoformat(input_date_str) | |
| print("input day:", input_day) | |
| minute = pd.read_parquet(NIFTY_1M_PATH, columns=["date", "close"]) | |
| minute["dt"] = pd.to_datetime(minute["date"], errors="coerce") | |
| minute = minute.dropna(subset=["dt"]) | |
| minute["session_date"] = minute["dt"].dt.normalize() | |
| minute["time_str"] = minute["dt"].dt.strftime("%H:%M") | |
| window = minute[ | |
| (minute["session_date"].dt.date == input_day) | |
| & (minute["time_str"] >= "14:00") | |
| & (minute["time_str"] <= "14:20") | |
| ].sort_values("dt") | |
| print("window len:", len(window)) | |
| if not window.empty: | |
| w_close = float(window.iloc[-1]["close"]) | |
| print("w_close:", w_close) | |
| else: | |
| print("window empty!") | |
| print("minute dates available:", minute["session_date"].dt.date.unique()) | |
| except Exception as e: | |
| import traceback | |
| traceback.print_exc() | |