# quant-research-env / server / data_loader.py
# (uploaded via huggingface_hub by yobro4619, revision e9c52c9, verified)
"""
Data loader for the Quant Research Environment.
Loads compressed NIFTY/BANKNIFTY CSV data, performs inner-join merge,
and caches results in memory.
"""
import gzip
import io
from pathlib import Path
from typing import Tuple
import pandas as pd
# Directory holding the bundled CSV data, resolved relative to this module file.
_DATA_DIR = Path(__file__).parent / "data"
# Module-level cache: maps split name ("train"/"test") to the
# (nifty_aligned, banknifty_aligned, merged) tuple returned by _load_and_merge.
_cache: dict = {}
def _load_and_merge(nifty_path: Path, banknifty_path: Path) -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
"""Load two CSVs and inner-join merge on Date."""
if str(nifty_path).endswith(".gz"):
with gzip.open(nifty_path, "rt") as f:
nifty = pd.read_csv(f, parse_dates=["Date"])
with gzip.open(banknifty_path, "rt") as f:
banknifty = pd.read_csv(f, parse_dates=["Date"])
else:
nifty = pd.read_csv(nifty_path, parse_dates=["Date"])
banknifty = pd.read_csv(banknifty_path, parse_dates=["Date"])
merged = pd.merge(
nifty, banknifty,
on="Date", how="inner", suffixes=("_nifty", "_banknifty"),
).sort_values("Date").reset_index(drop=True)
nifty_aligned = pd.DataFrame({
"Date": merged["Date"],
"Open": merged["Open_nifty"],
"High": merged["High_nifty"],
"Low": merged["Low_nifty"],
"Close": merged["Close_nifty"],
})
banknifty_aligned = pd.DataFrame({
"Date": merged["Date"],
"Open": merged["Open_banknifty"],
"High": merged["High_banknifty"],
"Low": merged["Low_banknifty"],
"Close": merged["Close_banknifty"],
})
return nifty_aligned, banknifty_aligned, merged
def get_train_data() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Load the training split (2015-2022), caching it after the first call.

    Returns:
        (nifty_aligned, banknifty_aligned, merged) as produced by
        _load_and_merge on the bundled train CSVs.
    """
    cached = _cache.get("train")
    if cached is None:
        cached = _load_and_merge(
            _DATA_DIR / "nifty_train.csv.gz",
            _DATA_DIR / "banknifty_train.csv.gz",
        )
        _cache["train"] = cached
    return cached
def get_test_data() -> Tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """
    Load and cache test data (2023-2026).

    Returns:
        (nifty_aligned, banknifty_aligned, merged) — cached after first load.

    Raises:
        FileNotFoundError: if either test CSV is missing from the data dir.
    """
    if "test" not in _cache:
        nifty_path = _DATA_DIR / "nifty_test.csv.gz"
        banknifty_path = _DATA_DIR / "banknifty_test.csv.gz"
        # Check BOTH files, not just NIFTY: the original only tested
        # nifty_path, so a missing BANKNIFTY file surfaced as an opaque
        # error inside pandas instead of this clear FileNotFoundError.
        if not (nifty_path.exists() and banknifty_path.exists()):
            raise FileNotFoundError("Test data not available")
        _cache["test"] = _load_and_merge(nifty_path, banknifty_path)
    return _cache["test"]
def get_data_schema(merged: pd.DataFrame) -> str:
    """Build a human-readable schema summary of the merged frame.

    Reports row count, distinct trading days, first/last date (taken from
    the first and last rows, which are in order for the sorted merged
    frame), and the column list.
    """
    dates = merged["Date"]
    total_rows = len(merged)
    trading_days = dates.dt.date.nunique()
    first_day = dates.iloc[0].date()
    last_day = dates.iloc[-1].date()
    lines = [
        "Pre-merged NIFTY 50 and BANKNIFTY minute-bar OHLC data.",
        f"Rows: {total_rows:,} | Trading days: {trading_days:,} | Period: {first_day} to {last_day}",
        f"Columns: {list(merged.columns)}",
        "Suffixes: _nifty for NIFTY 50, _banknifty for BANKNIFTY.",
        f"Index is numeric (0 to {total_rows - 1}). 375 bars per trading day.",
    ]
    return "\n".join(lines)
def get_data_preview(merged: pd.DataFrame, n_rows: int = 10) -> str:
    """Render the first ``n_rows`` rows of ``merged`` as a plain-text table.

    The numeric index is shown and no columns are elided, so every column
    of the merged frame appears in the preview.
    """
    top = merged.iloc[:n_rows]  # identical to head(n_rows)
    return top.to_string(index=True, max_cols=None)