Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import threading | |
| import time | |
| from datetime import datetime, time as dt_time | |
| from pathlib import Path | |
| from typing import Any | |
| from zoneinfo import ZoneInfo | |
| import pandas as pd | |
| from fastapi import BackgroundTasks, FastAPI, Header, HTTPException, Query, Request | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse, PlainTextResponse | |
| from huggingface_hub import snapshot_download | |
# Directory that contains this module; default anchor for runtime folders.
BASE_DIR = Path(__file__).resolve().parent

# Root of the research project tree; overridable through the environment.
RESEARCH_ROOT = Path(
    os.environ.get("FORECASTING_PROJECT_ROOT", BASE_DIR / "research_runtime")
).resolve()

# Service state lives under /data when that directory exists, otherwise in a
# hidden folder next to this module (unless SPACE_STATE_DIR overrides both).
_default_state_dir = (
    "/data/forecasting-space-state" if Path("/data").exists() else BASE_DIR / ".space_state"
)
STATE_DIR = Path(os.environ.get("SPACE_STATE_DIR", _default_state_dir))
STATUS_PATH = STATE_DIR / "update_status.json"
DATASET_READY_MARKER = STATE_DIR / "dataset_ready.json"

API_TITLE = "Trading Forecasting Space Backend"
API_VERSION = "1.0.0"

# Timezone name and HH:MM wall-clock time used by the daily update check.
DEFAULT_TIMEZONE = os.environ.get("UPDATE_TIMEZONE", "Asia/Kolkata")
DEFAULT_UPDATE_TIME = os.environ.get("DAILY_UPDATE_TIME", "17:30")

app = FastAPI(title=API_TITLE, version=API_VERSION)
def cors_origins() -> list[str]:
    """Return the CORS origins configured through ``FRONTEND_ORIGINS``.

    The variable is a comma-separated list. A bare ``*`` (also the default
    when the variable is unset) allows every origin. Blank entries are
    dropped, so an empty value yields an empty list.
    """
    configured = os.environ.get("FRONTEND_ORIGINS", "*").strip()
    if configured == "*":
        return ["*"]

    origins: list[str] = []
    for piece in configured.split(","):
        cleaned = piece.strip()
        if cleaned:
            origins.append(cleaned)
    return origins
# CORS policy: origins come from cors_origins(); credentials are disabled and
# only the listed HTTP methods are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=cors_origins(),
    allow_credentials=False,
    allow_methods=["GET", "POST", "OPTIONS"],
    allow_headers=["*"],
)

# Held by run_update_job while it checks and sets the "running" state.
update_lock = threading.Lock()
# Most recently started update thread (assigned in start_update).
worker_thread: threading.Thread | None = None
# Held by ensure_dataset_available around the dataset download.
dataset_lock = threading.Lock()
def now_utc() -> str:
    """Return the current UTC time as ``YYYY-MM-DDTHH:MM:SSZ``.

    Microseconds are dropped. The value is built from a timezone-aware "now"
    because ``datetime.utcnow()`` is deprecated; the output format is the
    same as before.
    """
    # Function-scope import: the module header only imports ``datetime`` and
    # ``time`` from the datetime module.
    from datetime import timezone

    current = datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None)
    return current.isoformat() + "Z"
def safe_json(value: Any) -> Any:
    """Recursively convert *value* into something ``json.dumps`` accepts.

    Conversions applied:
      * dicts: keys become strings, values are converted recursively;
      * lists, tuples, sets and frozensets: become lists with converted items
        (previously tuples and sets were returned untouched, so their
        contents were never sanitised and sets could not be serialised);
      * missing values (``NaN``, ``None``, ``NaT``): become ``None``;
      * objects exposing ``.item()`` (e.g. numpy scalars): unwrapped;
      * ``Path``: ``str``; ``datetime``: ISO-8601 string.

    Anything else is returned unchanged.
    """
    if isinstance(value, dict):
        return {str(key): safe_json(item) for key, item in value.items()}
    if isinstance(value, (list, tuple, set, frozenset)):
        return [safe_json(item) for item in value]

    try:
        if pd.isna(value):
            return None
    except Exception:
        # Deliberate best effort: array-likes make the truth test ambiguous.
        pass

    if hasattr(value, "item"):
        try:
            return safe_json(value.item())
        except Exception:
            # ``item`` may not be a zero-argument scalar accessor; fall through.
            pass

    if isinstance(value, Path):
        return str(value)
    if isinstance(value, datetime):
        return value.isoformat()
    return value
def read_json(path: Path, default: Any) -> Any:
    """Load JSON from *path*, returning *default* on any failure.

    A missing file, unreadable file and invalid JSON are all treated alike.
    """
    try:
        text = path.read_text(encoding="utf-8")
        parsed = json.loads(text)
    except Exception:
        return default
    return parsed
def write_json(path: Path, payload: Any) -> None:
    """Serialise *payload* as indented JSON and write it to *path* atomically.

    The parent directory is created when missing. The text is written to a
    sibling scratch file and then moved over *path* with ``os.replace`` so a
    concurrent reader never sees a half-written file. ``read_json`` turns an
    unparsable file into its default, which would make the next status
    update discard everything previously stored.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    text = json.dumps(safe_json(payload), indent=2)

    # Process and thread ids keep concurrent writers from sharing a scratch file.
    scratch = path.with_name(f".{path.name}.{os.getpid()}.{threading.get_ident()}.tmp")
    try:
        scratch.write_text(text, encoding="utf-8")
        os.replace(scratch, path)
    except OSError:
        scratch.unlink(missing_ok=True)
        raise
def read_status() -> dict[str, Any]:
    """Return the persisted update status.

    When the status file is missing or unreadable, a fresh idle status with
    every field cleared is returned instead.
    """
    fallback: dict[str, Any] = {
        "state": "idle",
        "last_started_at": None,
        "last_finished_at": None,
        "last_success_at": None,
        "last_error": None,
        "last_exit_code": None,
        "last_log_tail": [],
    }
    return read_json(STATUS_PATH, fallback)
def write_status(**updates: Any) -> None:
    """Merge *updates* into the persisted status and write it back.

    Keys not mentioned in *updates* keep their stored values.
    """
    merged = {**read_status(), **updates}
    write_json(STATUS_PATH, merged)
def require_secret(x_cron_secret: str | None = Header(default=None), x_admin_secret: str | None = Header(default=None)) -> None:
    """Reject the request unless it carries a configured shared secret.

    The secrets come from the ``CRON_SECRET`` and ``ADMIN_SECRET`` environment
    variables. When neither is set the check is disabled. Otherwise the first
    non-empty header value must equal one of the configured secrets.
    Previously ``ADMIN_SECRET`` was ignored whenever ``CRON_SECRET`` was set;
    accepting either keeps every previously valid credential working.

    Raises:
        HTTPException: 401 when the secret is missing or wrong.
    """
    # Function-scope import keeps this block self-contained.
    import hmac

    configured = [
        secret
        for secret in (os.environ.get("CRON_SECRET"), os.environ.get("ADMIN_SECRET"))
        if secret
    ]
    if not configured:
        return

    # When this function is called directly with only one keyword, the other
    # parameter holds its ``Header(...)`` default object rather than a
    # string, so only real non-empty strings are considered.
    supplied = next(
        (value for value in (x_cron_secret, x_admin_secret) if isinstance(value, str) and value),
        None,
    )

    authorised = False
    if supplied is not None:
        candidate = supplied.encode("utf-8")
        # Constant-time comparison; every secret is checked so timing does
        # not reveal which one matched.
        outcomes = [hmac.compare_digest(candidate, secret.encode("utf-8")) for secret in configured]
        authorised = any(outcomes)

    if not authorised:
        raise HTTPException(status_code=401, detail="Missing or invalid cron/admin secret.")
def csv_rows(path: Path, *, limit: int | None = None, columns: list[str] | None = None) -> list[dict[str, Any]]:
    """Read a CSV file and return its rows as JSON-safe dictionaries.

    Args:
        path: CSV file to read. A missing or completely empty file yields ``[]``.
        limit: Maximum number of rows to return; ``None`` returns all rows.
        columns: Columns to keep. Requested columns that the file lacks are
            dropped instead of causing an error.

    Returns:
        One dictionary per row, with missing values converted to ``None``.
    """
    if not path.exists():
        return []

    # Let the parser stop early instead of loading the whole file just to
    # keep the first rows. Negative limits keep their ``head`` semantics below.
    # Note: dtypes are inferred from the rows actually read.
    nrows = limit if limit is not None and limit >= 0 else None

    try:
        frame = pd.read_csv(path, usecols=columns, nrows=nrows)
    except pd.errors.EmptyDataError:
        # Subclass of ValueError: must be handled before the generic fallback,
        # which would re-read the empty file and raise again.
        return []
    except ValueError:
        # ``usecols`` named a column the file does not have: read everything
        # and keep whichever requested columns exist.
        frame = pd.read_csv(path, nrows=nrows)
        if columns:
            frame = frame[[col for col in columns if col in frame.columns]]

    if limit is not None:
        frame = frame.head(limit)
    return safe_json(frame.where(pd.notna(frame), None).to_dict(orient="records"))
def model_output_path(*parts: str) -> Path:
    """Build a path below ``<RESEARCH_ROOT>/Code/models`` from *parts*."""
    models_root = RESEARCH_ROOT / "Code" / "models"
    return models_root.joinpath(*parts)
def manifest_path() -> Path:
    """Return the location of the data manifest CSV under the research root."""
    return RESEARCH_ROOT.joinpath("Data", "metadata", "manifest.csv")
def dataset_dirs_present() -> bool:
    """Report whether both dataset folders exist under the research root."""
    return all((RESEARCH_ROOT / folder).is_dir() for folder in ("Data", "Alt Data"))
def dataset_status() -> dict[str, Any]:
    """Describe the dataset: readiness, configured repo, folders, last sync.

    ``last_sync`` is the content of the dataset-ready marker file, or an
    empty dict when that file cannot be read.
    """
    last_sync = read_json(DATASET_READY_MARKER, {})
    summary: dict[str, Any] = {}
    summary["ready"] = dataset_dirs_present()
    summary["repo_id"] = os.environ.get("HF_DATASET_REPO_ID")
    summary["revision"] = os.environ.get("HF_DATASET_REVISION", "main")
    summary["data_dir"] = file_meta(RESEARCH_ROOT / "Data")
    summary["alt_data_dir"] = file_meta(RESEARCH_ROOT / "Alt Data")
    summary["last_sync"] = last_sync
    return summary
def ensure_dataset_available(force: bool = False) -> bool:
    """Make sure the dataset folders exist, downloading them when needed.

    Args:
        force: Download even when the folders are already present.

    Returns:
        Whether both dataset folders exist once the function finishes.
        Without ``HF_DATASET_REPO_ID`` nothing can be downloaded, so the
        current on-disk state is reported.
    """
    if not force and dataset_dirs_present():
        return True

    repo_id = os.environ.get("HF_DATASET_REPO_ID", "").strip()
    if repo_id == "":
        return dataset_dirs_present()

    with dataset_lock:
        # Another caller may have finished the download while this one waited.
        if not force and dataset_dirs_present():
            return True

        STATE_DIR.mkdir(parents=True, exist_ok=True)
        revision = os.environ.get("HF_DATASET_REVISION", "main")
        target_dir = Path(os.environ.get("HF_DATASET_LOCAL_DIR", str(RESEARCH_ROOT))).resolve()
        target_dir.mkdir(parents=True, exist_ok=True)

        snapshot_download(
            repo_id=repo_id,
            repo_type="dataset",
            revision=revision,
            local_dir=str(target_dir),
            local_dir_use_symlinks=False,
            allow_patterns=["Data/**", "Alt Data/**", "README.md"],
        )

        # Record what was synced and when; surfaced through dataset_status().
        sync_record = {
            "repo_id": repo_id,
            "revision": revision,
            "synced_at": now_utc(),
            "local_dir": str(target_dir),
        }
        write_json(DATASET_READY_MARKER, sync_record)

    return dataset_dirs_present()
def resolve_dataset_path(value: str) -> Path:
    """Map a manifest path entry to a file on this machine.

    Resolution order:
      1. the value as given, when it exists;
      2. for values containing ``research_runtime/`` (either slash style),
         the part after that marker, looked up under ``RESEARCH_ROOT`` and
         then under ``BASE_DIR / "research_runtime"``;
      3. other relative values, looked up under ``RESEARCH_ROOT`` and then
         under ``BASE_DIR``.

    ``RESEARCH_ROOT`` is tried first because that is where the other path
    helpers in this module look for data; previously only ``BASE_DIR`` was
    considered. When no candidate exists, the ``BASE_DIR``-based path is
    returned, exactly as before, so callers report the same location.
    """
    raw = str(value)
    candidate = Path(raw)
    if candidate.exists():
        return candidate

    normalized = raw.replace("\\", "/")
    marker = "research_runtime/"
    if marker in normalized:
        suffix = Path(*normalized.split(marker, 1)[1].split("/"))
        options = [RESEARCH_ROOT / suffix, BASE_DIR / "research_runtime" / suffix]
    else:
        relative = Path(*normalized.split("/"))
        if relative.is_absolute():
            return candidate
        options = [RESEARCH_ROOT / relative, BASE_DIR / relative]

    for option in options:
        if option.exists():
            return option
    # Nothing exists: keep the historical BASE_DIR-based answer.
    return options[-1]
def file_meta(path: Path) -> dict[str, Any]:
    """Return existence, size and modification time for *path*.

    Returns:
        ``{"exists": False, "path": ...}`` when the path is missing, otherwise
        a dict with ``exists``, ``path``, ``bytes`` and ``modified_at`` (UTC,
        second precision, ``Z`` suffix).
    """
    # Function-scope import: the module header does not import ``timezone``.
    from datetime import timezone

    # A single stat() call instead of exists()+stat(): other threads delete
    # files, so the path can vanish between two separate calls.
    try:
        stat = path.stat()
    except (FileNotFoundError, NotADirectoryError):
        return {"exists": False, "path": str(path)}

    # Aware conversion replaces the deprecated ``datetime.utcfromtimestamp``.
    modified = datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)
    return {
        "exists": True,
        "path": str(path),
        "bytes": stat.st_size,
        "modified_at": modified.replace(microsecond=0, tzinfo=None).isoformat() + "Z",
    }
def latest_manifest_end() -> str | None:
    """Return the latest parsable ``end`` value in the manifest as a string.

    ``None`` is returned when the manifest is missing, cannot be read, or
    holds no parsable dates.
    """
    manifest = manifest_path()
    if not manifest.exists():
        return None
    try:
        end_column = pd.read_csv(manifest, usecols=["end"])["end"]
        parsed = pd.to_datetime(end_column, errors="coerce").dropna()
        if parsed.empty:
            return None
        return str(parsed.max())
    except Exception:
        return None
def parse_daily_update_time() -> dt_time:
    """Parse ``DEFAULT_UPDATE_TIME`` (``HH:MM``) into a ``datetime.time``.

    Raises:
        ValueError: when the configured value is not two integers separated
            by a colon, or is out of range.
    """
    hour_text, minute_text = DEFAULT_UPDATE_TIME.split(":", 1)
    hour_value = int(hour_text)
    minute_value = int(minute_text)
    return dt_time(hour_value, minute_value)
def update_due() -> bool:
    """Decide whether the daily update should start now.

    An update is due when automatic updates are enabled, no job is marked as
    running, the local time has reached the configured update time, and the
    last successful run (if any) happened on an earlier local date.
    """
    auto_flag = os.environ.get("AUTO_UPDATE_ENABLED", "true").lower()
    if auto_flag not in {"1", "true", "yes", "on"}:
        return False

    status = read_status()
    if status.get("state") == "running":
        return False

    zone = ZoneInfo(DEFAULT_TIMEZONE)
    now_local = datetime.now(zone)
    if now_local.time() < parse_daily_update_time():
        return False

    previous_success = status.get("last_success_at")
    if not previous_success:
        return True

    try:
        # Stored stamps end in "Z"; rewrite it as an explicit UTC offset.
        success_moment = datetime.fromisoformat(previous_success.replace("Z", "+00:00"))
        success_day = success_moment.astimezone(zone).date()
    except ValueError:
        # An unreadable timestamp is treated as "never succeeded".
        return True
    return success_day < now_local.date()
def build_update_commands(retrain: bool) -> list[list[str]]:
    """Return the command lines for an update run, in execution order.

    The market-data refresh always comes first, with today's date in the
    configured timezone as ``--end-date``. The three training scripts follow
    only when *retrain* is true.
    """
    today = datetime.now(ZoneInfo(DEFAULT_TIMEZONE)).date().isoformat()
    refresh = [
        sys.executable,
        "Code/scripts/data_ingestion/refresh_market_data.py",
        "--end-date",
        today,
    ]
    if not retrain:
        return [refresh]

    training = [
        [sys.executable, "Code/models/stock_high_low_forecaster/train.py"],
        [sys.executable, "Code/models/first_extrema_forecaster/train.py", "--rebuild-cache"],
        [sys.executable, "Code/models/nifty_forecaster/train.py", "--no-progress"],
    ]
    return [refresh, *training]
def prune_generated_junk() -> None:
    """Delete generated artifacts and ``__pycache__`` folders under the research root.

    Removal is best effort: paths that cannot be deleted are skipped.
    """
    disposable_globs = (
        "Code/artifacts",
        "Code/models/*/outputs/*dataset*.csv",
        "Code/models/*/outputs/test_predictions.csv",
        "Code/models/*/outputs/*_test_predictions.csv",
        "Code/models/*/outputs/*predictions.csv",
        "Code/models/*/outputs/*.joblib",
        "Code/models/*/outputs/report.md",
        "Code/models/*/outputs/*report.md",
        "Code/models/*/outputs/candidate*.csv",
        "Code/models/*/outputs/*candidate*.csv",
        "Code/models/first_extrema_forecaster/outputs/may7_forecasts.csv",
        "Code/models/nifty_forecaster/outputs/forecaster_latest.csv",
        "Code/models/nifty_forecaster/outputs/forecaster_blend_details.json",
    )
    for glob_pattern in disposable_globs:
        for target in RESEARCH_ROOT.glob(glob_pattern):
            try:
                if target.is_dir():
                    shutil.rmtree(target)
                elif target.exists():
                    target.unlink()
            except OSError:
                continue

    for pycache_dir in RESEARCH_ROOT.rglob("__pycache__"):
        try:
            shutil.rmtree(pycache_dir)
        except OSError:
            continue
def run_update_job(trigger: str = "manual", retrain: bool | None = None) -> None:
    """Run the update commands one after another and record the outcome.

    Args:
        trigger: Label stored in the status file to say what started the run.
        retrain: Whether to run the training scripts as well. ``None`` defers
            to ``AUTO_RETRAIN_ENABLED`` (enabled by default).

    The function returns immediately when a job is already marked as
    running. Failures are not raised; they are written to the status file
    together with the last 80 log lines.
    """
    # Function-scope import: the module header does not import ``collections``.
    from collections import deque

    # Check-and-set under the lock so two callers cannot both start a run.
    with update_lock:
        if read_status().get("state") == "running":
            return
        write_status(
            state="running",
            trigger=trigger,
            last_started_at=now_utc(),
            last_finished_at=None,
            last_error=None,
            last_exit_code=None,
            last_log_tail=[],
        )

    if retrain is None:
        retrain = os.environ.get("AUTO_RETRAIN_ENABLED", "true").lower() in {"1", "true", "yes", "on"}

    env = os.environ.copy()
    env["FORECASTING_PROJECT_ROOT"] = str(RESEARCH_ROOT)
    env.setdefault("PYTHONUNBUFFERED", "1")
    env.setdefault("MARKET_BUILD_WORKERS", "2")

    # Bounded buffer: keeps only the newest 80 lines without re-slicing a
    # list for every line of output.
    log_tail: deque[str] = deque(maxlen=80)
    exit_code = 0
    try:
        if not ensure_dataset_available():
            raise RuntimeError("Dataset folders are missing. Set HF_DATASET_REPO_ID to the Hugging Face Dataset repo.")

        for command in build_update_commands(retrain):
            log_tail.append("$ " + " ".join(command))
            # The context manager closes the stdout pipe and reaps the child
            # even when reading fails part-way through.
            with subprocess.Popen(
                command,
                cwd=RESEARCH_ROOT,
                env=env,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                text=True,
                bufsize=1,
            ) as process:
                assert process.stdout is not None
                for raw_line in process.stdout:
                    line = raw_line.rstrip()
                    if line:
                        log_tail.append(line)
                exit_code = process.wait()
            if exit_code != 0:
                raise RuntimeError(f"Command failed with exit code {exit_code}: {' '.join(command)}")

        prune_generated_junk()
        write_status(
            state="idle",
            last_finished_at=now_utc(),
            last_success_at=now_utc(),
            last_error=None,
            last_exit_code=exit_code,
            last_log_tail=list(log_tail),
        )
    except Exception as exc:
        # Top-level boundary of the worker thread: record the failure rather
        # than letting the thread die without a trace.
        write_status(
            state="failed",
            last_finished_at=now_utc(),
            last_error=str(exc),
            last_exit_code=exit_code,
            last_log_tail=list(log_tail),
        )
def start_update(trigger: str, retrain: bool | None = None) -> bool:
    """Launch an update job on a daemon thread unless one is marked running.

    Returns:
        ``True`` when a thread was started, ``False`` when the status file
        already reports a running job.
    """
    global worker_thread
    if read_status().get("state") == "running":
        return False

    job = threading.Thread(
        target=run_update_job,
        kwargs=dict(trigger=trigger, retrain=retrain),
        daemon=True,
    )
    worker_thread = job
    job.start()
    return True
def scheduler_loop() -> None:
    """Check every five minutes whether an update is due and start it.

    Never returns. An exception during one check (for example an invalid
    timezone or update-time setting) is logged and the loop carries on;
    previously it ended the thread, and scheduling stopped until restart.
    """
    # Function-scope import: the module header does not import ``logging``.
    import logging

    logger = logging.getLogger(__name__)
    while True:
        try:
            if update_due():
                start_update("internal_scheduler")
        except Exception:
            logger.exception("Scheduler check failed; retrying after the next sleep.")
        time.sleep(300)
def startup() -> None:
    """Prepare state, optionally sync the dataset, and start the scheduler.

    Steps, in order: create the state directory, prune generated files,
    initialise or repair the status file, sync the dataset when
    ``DATASET_SYNC_ON_START`` is enabled (default on; failures are recorded,
    not raised), start the scheduler thread, and start an update right away
    when ``AUTO_UPDATE_ON_START`` is enabled (default off).
    """
    STATE_DIR.mkdir(parents=True, exist_ok=True)
    prune_generated_junk()

    if not STATUS_PATH.exists():
        write_status(state="idle", app_started_at=now_utc())
    elif read_status().get("state") == "running":
        # Update jobs run on daemon threads of this process, so a "running"
        # state found at start-up was left by a process that died mid-job.
        # Left alone it would block update_due() and start_update() forever.
        # NOTE(review): assumes a single server process shares this status
        # file - confirm before running several workers.
        write_status(
            state="failed",
            last_finished_at=now_utc(),
            last_error="Update interrupted: the service restarted while a job was marked as running.",
        )

    if os.environ.get("DATASET_SYNC_ON_START", "true").lower() in {"1", "true", "yes", "on"}:
        try:
            ensure_dataset_available()
        except Exception as exc:
            write_status(dataset_sync_error=str(exc), dataset_sync_failed_at=now_utc())

    threading.Thread(target=scheduler_loop, daemon=True).start()

    if os.environ.get("AUTO_UPDATE_ON_START", "false").lower() in {"1", "true", "yes", "on"}:
        start_update("startup")
def root() -> str:
    """Return a short plain-text banner pointing at the API docs."""
    banner = "Trading Forecasting Hugging Face Space backend is running. See /docs for API routes."
    return banner
def health() -> dict[str, Any]:
    """Build the health report for the service.

    ``ok`` is true only when the research root, the manifest and all three
    latest-forecast files exist. The report also carries the dataset status,
    the update status and per-file metadata.
    """
    forecast_outputs = {
        "stock_latest": ("stock_high_low_forecaster", "latest_forecasts.csv"),
        "extrema_latest": ("first_extrema_forecaster", "latest_forecasts.csv"),
        "nifty_latest": ("nifty_forecaster", "forecaster_latest_forecasts.csv"),
    }

    required: dict[str, Any] = {
        "research_root": file_meta(RESEARCH_ROOT),
        "manifest": file_meta(manifest_path()),
    }
    for label, (model_name, filename) in forecast_outputs.items():
        required[label] = file_meta(model_output_path(model_name, "outputs", filename))

    everything_present = all(entry["exists"] for entry in required.values())
    report: dict[str, Any] = {
        "ok": everything_present,
        "service": API_TITLE,
        "version": API_VERSION,
        "checked_at": now_utc(),
        "latest_manifest_end": latest_manifest_end(),
        "dataset": dataset_status(),
        "update_status": read_status(),
        "files": required,
    }
    return report
def api_status() -> dict[str, Any]:
    """Return the same report as ``health()``."""
    report = health()
    return report
def latest_forecasts() -> dict[str, Any]:
    """Return the latest forecast rows of the three models.

    Each entry is an empty list when the corresponding CSV is missing. The
    first-extrema rows are restricted to a fixed set of columns.
    """
    # The timestamp is taken before the files are read, as in the response order.
    payload: dict[str, Any] = {"generated_at": now_utc()}

    payload["stock_high_low"] = csv_rows(
        model_output_path("stock_high_low_forecaster", "outputs", "latest_forecasts.csv")
    )

    extrema_columns = ["date", "symbol", "target", "prob_high_first", "prediction"]
    payload["first_extrema"] = csv_rows(
        model_output_path("first_extrema_forecaster", "outputs", "latest_forecasts.csv"),
        columns=extrema_columns,
    )

    payload["nifty_direction"] = csv_rows(
        model_output_path("nifty_forecaster", "outputs", "forecaster_latest_forecasts.csv")
    )
    return payload
def model_summaries() -> dict[str, Any]:
    """Return the stored summary JSON of each model.

    Unreadable summaries fall back to ``{}`` (stock and first-extrema models)
    or ``[]`` (nifty model).
    """
    stock_summary = read_json(
        model_output_path("stock_high_low_forecaster", "outputs", "summary.json"), {}
    )
    extrema_summary = read_json(
        model_output_path("first_extrema_forecaster", "outputs", "summary.json"), {}
    )
    nifty_summary = read_json(
        model_output_path("nifty_forecaster", "outputs", "forecaster_summary.json"), []
    )
    combined = {
        "stock_high_low": stock_summary,
        "first_extrema": extrema_summary,
        "nifty_direction": nifty_summary,
    }
    return safe_json(combined)
def data_catalog(
    category: str | None = None,
    asset: str | None = None,
    timeframe: str | None = None,
    limit: int = Query(default=500, ge=1, le=5000),
) -> dict[str, Any]:
    """List manifest entries, optionally filtered case-insensitively.

    Returns:
        ``count`` is the number of matching entries; ``items`` holds at most
        *limit* of them. A missing manifest yields an empty catalogue.
    """
    manifest_file = manifest_path()
    if not manifest_file.exists():
        # Try to fetch the dataset once before giving up.
        ensure_dataset_available()
    if not manifest_file.exists():
        return {"count": 0, "items": []}

    frame = pd.read_csv(manifest_file)
    requested = {"category": category, "asset": asset, "timeframe": timeframe}
    for column, wanted in requested.items():
        if wanted:
            frame = frame[frame[column].astype(str).str.lower() == wanted.lower()]

    total = int(len(frame))
    page = frame.head(limit).where(pd.notna(frame), None)
    return {"count": total, "items": safe_json(page.to_dict(orient="records"))}
def data_sample(
    category: str,
    asset: str,
    timeframe: str,
    limit: int = Query(default=50, ge=1, le=1000),
) -> dict[str, Any]:
    """Return the manifest entry and first rows of one dataset.

    The first manifest row matching category, asset and timeframe
    (case-insensitive) is used.

    Raises:
        HTTPException: 404 when the manifest, a matching entry, or the
            dataset file itself cannot be found.
    """
    manifest_file = manifest_path()
    if not manifest_file.exists():
        # Try to fetch the dataset once before giving up.
        ensure_dataset_available()
    if not manifest_file.exists():
        raise HTTPException(status_code=404, detail="Data manifest not found.")

    manifest = pd.read_csv(manifest_file)

    def _equals(column: str, wanted: str) -> Any:
        return manifest[column].astype(str).str.lower() == wanted.lower()

    selector = _equals("category", category) & _equals("asset", asset) & _equals("timeframe", timeframe)
    matches = manifest[selector]
    if matches.empty:
        raise HTTPException(status_code=404, detail="No matching dataset in manifest.")

    entry = matches.iloc[0]
    dataset_path = resolve_dataset_path(str(entry["path"]))
    if not dataset_path.exists():
        raise HTTPException(status_code=404, detail=f"Dataset file not found: {dataset_path}")

    return {
        "dataset": safe_json(entry.to_dict()),
        "rows": csv_rows(dataset_path, limit=limit),
    }
async def cron_tick(
    request: Request,
    background_tasks: BackgroundTasks,
    x_cron_secret: str | None = Header(default=None),
) -> JSONResponse:
    """Handle an external cron ping: queue an update when one is due.

    The caller must pass the shared-secret check. The response reports
    whether an update was due, whether a start was queued, and the status
    as read when the response is built.
    """
    require_secret(x_cron_secret=x_cron_secret)

    due = update_due()
    if due:
        # Queued as a background task rather than started inline.
        background_tasks.add_task(start_update, "netlify_cron")
    queued = bool(due)

    body = {
        "ok": True,
        "checked_at": now_utc(),
        "update_due": due,
        "update_start_queued": queued,
        "status": read_status(),
    }
    return JSONResponse(body)
def manual_update(
    retrain: bool | None = None,
    x_admin_secret: str | None = Header(default=None),
) -> dict[str, Any]:
    """Start an update on request, after the shared-secret check.

    ``started`` is false when a job was already marked as running.
    """
    require_secret(x_admin_secret=x_admin_secret)
    was_started = start_update("manual_api", retrain=retrain)
    response: dict[str, Any] = {"ok": True, "started": was_started}
    response["status"] = read_status()
    return response
def sync_dataset(
    force: bool = False,
    x_admin_secret: str | None = Header(default=None),
) -> dict[str, Any]:
    """Sync the dataset on request, after the shared-secret check.

    Args:
        force: Download even when the dataset folders already exist.

    Returns:
        ``ok`` tells whether the dataset folders are present afterwards;
        ``dataset`` is the current dataset status.
    """
    require_secret(x_admin_secret=x_admin_secret)
    available = ensure_dataset_available(force=force)
    response: dict[str, Any] = {"ok": available}
    response["dataset"] = dataset_status()
    return response