# ML_and_main_data_pipeline1 / data_handler.py
# Jitendra12421's picture
# Upload 4 files
# 79ffe03 verified
from collections import OrderedDict
from dataclasses import dataclass
import json
import logging
import re
import tempfile
from pathlib import Path
from datetime import datetime
from threading import Lock, RLock, Thread
import pandas as pd
import polars as pl
from Forecaster import start_forecaster_refresh_scheduler
from market_refresh import refresh_index_if_due, start_background_refresh_scheduler
def _find_nearest_dir(start: Path, target_name: str) -> Path | None:
for candidate in [start, *start.parents]:
target = candidate / target_name
if target.exists():
return target
return None
# Directory containing this module; starting point for locating the data root.
MODULE_DIR = Path(__file__).resolve().parent
# Data root: the nearest "MAIN DATA SOURCE" directory at or above MODULE_DIR,
# falling back to MODULE_DIR / "MAIN DATA SOURCE" when no such directory exists.
BASE_DIR = _find_nearest_dir(MODULE_DIR, "MAIN DATA SOURCE") or (MODULE_DIR / "MAIN DATA SOURCE")
# Sub-trees holding the minute-level and daily source files respectively.
INTRADAY_DIR = BASE_DIR / "intraday"
DAILY_DIR = BASE_DIR / "daily"
LOGGER = logging.getLogger(__name__)
# Preferred on-disk format, and the older format still accepted as a fallback
# by _resolve_data_file.
DATA_FILE_EXTENSION = ".parquet"
LEGACY_DATA_FILE_EXTENSION = ".csv"
# (index_name, timeframe) pairs with a background refresh thread currently
# running; the set is only touched while holding _ASYNC_REFRESH_LOCK.
_ASYNC_REFRESH_LOCK = Lock()
_ASYNC_REFRESH_IN_FLIGHT: set[tuple[str, str]] = set()
# Timeframe names treated as intraday (keys map to themselves).
INTRADAY_RULES = {
    "minute": "minute",
    "5m": "5m",
    "15m": "15m",
    "30m": "30m",
    "1_hour": "1_hour",
    "4_hour": "4_hour",
}
# Timeframe names treated as daily-or-coarser (keys map to themselves).
DAILY_RULES = {
    "1_day": "1_day",
    "1_week": "1_week",
    "1_month": "1_month",
    "6_months": "6_months",
    "1_year": "1_year",
    "5_year": "5_year",
    "MAX": "MAX",
}
# Timeframe -> window size passed as `every=` to polars `group_by_dynamic`
# when resampling minute data.
INTRADAY_RESAMPLE_RULES = {
    "5m": "5m",
    "15m": "15m",
    "30m": "30m",
    "1_hour": "1h",
    "4_hour": "4h",
}
# Timeframe -> rule string passed to pandas `.resample()` when resampling
# daily data.
DAILY_RESAMPLE_RULES = {
    "1_week": "W-FRI",
    "1_month": "MS",
    "6_months": "6MS",
    "1_year": "YS",
    "5_year": "5YS",
}
# Sort rank used by _sorted_timeframes, finest timeframe first.
TIMEFRAME_ORDER = {
    "minute": 0,
    "5m": 1,
    "15m": 2,
    "30m": 3,
    "1_hour": 4,
    "4_hour": 5,
    "1_day": 6,
    "1_week": 7,
    "1_month": 8,
    "6_months": 9,
    "1_year": 10,
    "5_year": 11,
    "MAX": 12,
}
# Capacity of the two module-level caches (DATASET_STORE / CUSTOM_INTERVAL_STORE).
MAX_DATASET_CACHE_ENTRIES = 24
MAX_CUSTOM_CACHE_ENTRIES = 12
# Intraday exports stitch two sources: rows dated before this cutoff come from
# the daily dataset, rows at or after it from the intraday dataset.
HISTORICAL_INTRADAY_CUTOFF = pd.Timestamp("2015-01-09")
# Options advertised for historical exports via get_data_catalog.
HISTORICAL_EXPORT_FORMATS = ("csv", "parquet", "json")
HISTORICAL_EXPORT_TEMPLATES = ("full_history", "last_n_rows", "custom_range")
# Row count used by the "last_n_rows" template when no row_limit is given.
HISTORICAL_EXPORT_DEFAULT_LIMIT = 1000
EXPORT_DATE_FORMAT = "%Y-%m-%d"
EXPORT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Columns (and their order) kept in exported frames.
EXPORT_COLUMNS = ("date", "open", "high", "low", "close", "volume", "source_timeframe")
# (index_name, timeframe) pairs preloaded by warm_common_datasets.
WARM_DATASETS = (
    ("NIFTY 50", "5m"),
    ("NIFTY 50", "1_hour"),
    ("NIFTY 50", "1_day"),
)
@dataclass
class CachedDataset:
    """One cache entry held by DatasetStore for an (index, timeframe) pair."""

    # Frame after resampling to the requested timeframe.
    frame: pl.DataFrame
    # Parallel lists ("times", "prices", "opens", "highs", "lows", "closes")
    # produced by DatasetStore._build_chart_points.
    chart_points: dict
    # st_mtime_ns of the source file; compared on lookup to detect stale entries.
    file_mtime_ns: int
@dataclass
class CachedCustomInterval:
    """One cache entry held by CustomIntervalStore for an (index, interval) pair."""

    # Minute data resampled to the custom interval, with "date" rendered as text.
    frame: pl.DataFrame
    # st_mtime_ns of the minute source file; compared on lookup to detect stale entries.
    source_mtime_ns: int
def _sorted_timeframes(values: set[str]) -> list[str]:
    """Return ``values`` ordered by ``TIMEFRAME_ORDER`` rank.

    Names missing from ``TIMEFRAME_ORDER`` get rank 999 and therefore sort
    last; ties are broken alphabetically by the name itself.
    """

    def rank(name: str) -> tuple[int, str]:
        return TIMEFRAME_ORDER.get(name, 999), name

    return sorted(values, key=rank)
def _timeframe_base_dir(timeframe: str) -> Path:
    """Return the directory tree (intraday or daily) that backs ``timeframe``.

    Raises:
        ValueError: if ``timeframe`` is not a known intraday or daily name.
    """
    is_intraday = timeframe == "minute" or timeframe in INTRADAY_RESAMPLE_RULES
    if is_intraday:
        return INTRADAY_DIR

    is_daily = (
        timeframe == "1_day"
        or timeframe in DAILY_RESAMPLE_RULES
        or timeframe == "MAX"
    )
    if not is_daily:
        raise ValueError(f"Unknown timeframe: {timeframe}")
    return DAILY_DIR
def _resolve_data_file(index_name: str, timeframe: str) -> Path:
    """Locate the source file that ``timeframe`` is derived from.

    The path has the shape ``<base>/<index>/<source>/<index>_<source>.parquet``
    where ``<source>`` is ``minute`` for intraday timeframes and ``1_day``
    otherwise. The parquet file is preferred; a same-named legacy CSV is used
    when only that exists.

    Raises:
        FileNotFoundError: if neither file exists (the message names the
            parquet path).
        ValueError: if ``timeframe`` is unknown.
    """
    root = _timeframe_base_dir(timeframe)
    source = "minute" if root == INTRADAY_DIR else "1_day"
    file_name = f"{index_name}_{source}{DATA_FILE_EXTENSION}"
    preferred = root / index_name / source / file_name

    if preferred.exists():
        return preferred

    fallback = preferred.with_suffix(LEGACY_DATA_FILE_EXTENSION)
    if not fallback.exists():
        raise FileNotFoundError(f"Data file not found: {preferred}")
    return fallback
def _stat_mtime_ns(path: Path) -> int:
return path.stat().st_mtime_ns
def _read_frame(file_path: Path) -> pl.DataFrame:
    """Read ``file_path`` with polars: parquet by extension, CSV for anything else."""
    is_parquet = file_path.suffix.lower() == DATA_FILE_EXTENSION
    reader = pl.read_parquet if is_parquet else pl.read_csv
    return reader(file_path)
def _source_timeframe(timeframe: str) -> str:
    """Return the stored timeframe (``"minute"`` or ``"1_day"``) behind ``timeframe``.

    Raises:
        ValueError: if ``timeframe`` is not a known intraday or daily name.
    """
    derived_from_minute = timeframe == "minute" or timeframe in INTRADAY_RESAMPLE_RULES
    if derived_from_minute:
        return "minute"

    derived_from_daily = (
        timeframe == "1_day"
        or timeframe in DAILY_RESAMPLE_RULES
        or timeframe == "MAX"
    )
    if not derived_from_daily:
        raise ValueError(f"Unknown timeframe: {timeframe}")
    return "1_day"
def _ensure_datetime_column(frame: pl.DataFrame) -> pl.DataFrame:
    """Return ``frame`` with a ``date`` column of polars ``Datetime`` type.

    When ``date`` is absent, the first present alias among ``datetime``,
    ``trade_date``, ``eod_timestamp``, ``timestamp`` and ``date_time`` is
    renamed to ``date``; when no alias is present either, the frame is returned
    untouched. String dates are parsed by trying several formats in order
    (non-strict, so unparseable values become null); other dtypes are cast
    non-strictly.
    """
    if "date" not in frame.columns:
        known_aliases = ("datetime", "trade_date", "eod_timestamp", "timestamp", "date_time")
        alias = next((name for name in known_aliases if name in frame.columns), None)
        if alias is None:
            return frame
        frame = frame.rename({alias: "date"})

    if frame["date"].dtype != pl.Utf8:
        return frame.with_columns(pl.col("date").cast(pl.Datetime, strict=False))

    # First format that yields a non-null value wins for each row.
    formats = (
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%Y-%m-%d",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S%.f",
    )
    attempts = [
        pl.col("date").str.strptime(pl.Datetime, fmt, strict=False)
        for fmt in formats
    ]
    return frame.with_columns(pl.coalesce(attempts).alias("date"))
def _normalize_ohlcv_frame(frame: pl.DataFrame) -> pl.DataFrame:
    """Return ``frame`` with canonical column names, dtypes and row order.

    Steps:
      * column names are stripped, lower-cased and spaces become underscores;
      * the timestamp column is coerced via ``_ensure_datetime_column``;
      * rows whose ``date`` is null are dropped;
      * any of open/high/low/close/volume present are cast to Float64
        (non-strict, so unparseable values become null);
      * rows are sorted by ``date``.

    A frame with no recognisable date column is returned with only the
    renaming and numeric casts applied.
    """
    rename_map = {
        column: str(column).strip().lower().replace(" ", "_")
        for column in frame.columns
    }
    if rename_map:
        frame = frame.rename(rename_map)

    frame = _ensure_datetime_column(frame)
    has_date = "date" in frame.columns
    # Only touch "date" when it exists: dropping nulls on a missing column
    # raises, which previously made the no-date fallback below unreachable.
    if has_date:
        frame = frame.drop_nulls(subset=["date"])

    numeric_columns = [
        column
        for column in ("open", "high", "low", "close", "volume")
        if column in frame.columns
    ]
    if numeric_columns:
        frame = frame.with_columns(
            [pl.col(column).cast(pl.Float64, strict=False) for column in numeric_columns]
        )
    return frame.sort("date") if has_date else frame
def _resample_to_timeframe(frame: pl.DataFrame, timeframe: str) -> pl.DataFrame:
    """Aggregate a source OHLCV frame into bars for ``timeframe``.

    * An empty frame is returned as-is.
    * ``minute`` / ``1_day`` only normalise the frame.
    * ``MAX`` collapses the whole history into a single bar.
    * Intraday timeframes are bucketed with polars ``group_by_dynamic``.
    * The remaining daily timeframes are resampled with pandas.

    Raises:
        ValueError: if ``timeframe`` matches none of the above.
    """
    if frame.is_empty():
        return frame

    normalized = _normalize_ohlcv_frame(frame)
    if timeframe in ("minute", "1_day"):
        return normalized

    price_columns = ["open", "high", "low", "close"]
    ohlcv_aggregations = [
        pl.col("open").first().alias("open"),
        pl.col("high").max().alias("high"),
        pl.col("low").min().alias("low"),
        pl.col("close").last().alias("close"),
        pl.col("volume").sum().alias("volume"),
    ]

    if timeframe == "MAX":
        # One bar for the whole history, stamped with the earliest date.
        return normalized.select(pl.col("date").min().alias("date"), *ohlcv_aggregations)

    polars_every = INTRADAY_RESAMPLE_RULES.get(timeframe)
    if polars_every is not None:
        bucketed = normalized.group_by_dynamic(
            "date", every=polars_every, closed="left", label="left"
        ).agg(ohlcv_aggregations)
        return bucketed.drop_nulls(subset=price_columns).sort("date")

    pandas_rule = DAILY_RESAMPLE_RULES.get(timeframe)
    if pandas_rule is None:
        raise ValueError(f"Unknown timeframe: {timeframe}")

    source_columns = ["date", *price_columns, "volume"]
    pdf = pd.DataFrame(normalized.select(source_columns).to_dict(as_series=False))
    pdf["volume"] = pdf["volume"].fillna(0)
    how = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "volume": "sum",
    }
    resampled = pdf.set_index("date").resample(pandas_rule).agg(how)
    # Periods with no source rows yield all-NaN prices; discard them.
    aggregated = resampled.dropna(subset=price_columns).reset_index()
    return _normalize_ohlcv_frame(pl.DataFrame(aggregated.to_dict("list")))
def _ensure_timeframe_ready(index_name: str, timeframe: str) -> None:
    """Make sure a source file for ``index_name``/``timeframe`` is available.

    When a file is already on disk, a background refresh is kicked off and the
    call returns immediately. Otherwise a refresh is run synchronously; if that
    refresh fails but a file can be resolved afterwards, the failure is logged
    and the existing file is served.

    Raises:
        FileNotFoundError: if the synchronous refresh fails and no file exists.
    """
    try:
        already_on_disk = _resolve_data_file(index_name, timeframe).exists()
    except FileNotFoundError:
        already_on_disk = False

    if already_on_disk:
        _kick_async_refresh(index_name, timeframe)
        return

    try:
        refresh_index_if_due(index_name, timeframe=timeframe, request_driven=True)
    except Exception as exc:
        # Propagates FileNotFoundError when there is still nothing to serve.
        _resolve_data_file(index_name, timeframe)
        LOGGER.warning("Serving cached file for %s/%s after refresh failure: %s", index_name, timeframe, exc)
def _ensure_minute_ready(index_name: str) -> Path:
    """Return the minute parquet path for ``index_name``, refreshing if needed.

    An existing file triggers a background refresh and is returned right away.
    A missing file triggers a synchronous refresh; a refresh error is re-raised
    unless the file exists afterwards, in which case it is only logged.

    Raises:
        FileNotFoundError: if the refresh completed but produced no file.
    """
    file_name = f"{index_name}_minute{DATA_FILE_EXTENSION}"
    minute_path = INTRADAY_DIR.joinpath(index_name, "minute", file_name)

    if minute_path.exists():
        _kick_async_refresh(index_name, "minute")
        return minute_path

    try:
        refresh_index_if_due(index_name, timeframe="minute", request_driven=True)
    except Exception as exc:
        if minute_path.exists():
            LOGGER.warning("Serving cached minute file for %s after refresh failure: %s", index_name, exc)
        else:
            raise

    if minute_path.exists():
        return minute_path
    raise FileNotFoundError(f"Minute data file not found: {minute_path}")
def _kick_async_refresh(index_name: str, timeframe: str) -> None:
    """Start a daemon thread that refreshes ``index_name``/``timeframe``.

    At most one refresh per (index_name, timeframe) pair runs at a time: the
    pair is recorded in ``_ASYNC_REFRESH_IN_FLIGHT`` before the thread starts
    and removed when the refresh finishes, whether it succeeded or failed.
    Refresh errors are logged, never raised to the caller.

    Raises:
        Exception: whatever ``Thread.start`` raises if the thread cannot be
            started; the in-flight marker is released first.
    """
    key = (index_name, timeframe)
    with _ASYNC_REFRESH_LOCK:
        if key in _ASYNC_REFRESH_IN_FLIGHT:
            return
        _ASYNC_REFRESH_IN_FLIGHT.add(key)

    def _run() -> None:
        try:
            refresh_index_if_due(index_name, timeframe=timeframe, request_driven=True)
        except Exception as exc:
            LOGGER.warning("Async refresh failed for %s/%s: %s", index_name, timeframe, exc)
        finally:
            with _ASYNC_REFRESH_LOCK:
                _ASYNC_REFRESH_IN_FLIGHT.discard(key)

    try:
        Thread(target=_run, name=f"async-refresh-{index_name}-{timeframe}", daemon=True).start()
    except Exception:
        # _run never executes if the thread fails to start, so its `finally`
        # cannot release the marker; without this the pair would be stuck
        # "in flight" and never refreshed in the background again.
        with _ASYNC_REFRESH_LOCK:
            _ASYNC_REFRESH_IN_FLIGHT.discard(key)
        raise
class DatasetStore:
    """Bounded cache of resampled datasets keyed by ``(index_name, timeframe)``.

    Entries are kept in least-recently-used order (a hit moves the entry to the
    end, the oldest entry is evicted once ``max_entries`` is exceeded) and are
    considered stale as soon as the source file's mtime differs from the one
    recorded at load time.
    """

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._cache: OrderedDict[tuple[str, str], CachedDataset] = OrderedDict()
        self._lock = RLock()

    def _build_chart_points(self, frame: pl.DataFrame) -> dict:
        """Convert ``frame`` into parallel lists for charting.

        ``times`` holds whole epoch seconds; ``prices`` mirrors ``closes``.
        All lists are empty when any of date/open/high/low/close is missing.
        """
        required = {"date", "open", "high", "low", "close"}
        if not required.issubset(set(frame.columns)):
            return {"times": [], "prices": [], "opens": [], "highs": [], "lows": [], "closes": []}
        frame = _ensure_datetime_column(frame)
        # Millisecond timestamps floored to whole seconds.
        times = frame["date"].dt.timestamp("ms") // 1000
        return {
            "times": times.cast(pl.Int64).to_list(),
            "prices": frame["close"].to_list(),
            "opens": frame["open"].to_list(),
            "highs": frame["high"].to_list(),
            "lows": frame["low"].to_list(),
            "closes": frame["close"].to_list(),
        }

    def _load_dataset(self, file_path: Path, timeframe: str) -> CachedDataset:
        """Read ``file_path``, resample it to ``timeframe`` and wrap it for caching."""
        # Record the mtime BEFORE reading: if the file is rewritten while it is
        # being read, the stored mtime is the older one and the next lookup
        # reloads, instead of stale data being pinned under the new mtime.
        file_mtime_ns = _stat_mtime_ns(file_path)
        frame = _ensure_datetime_column(_read_frame(file_path))
        frame = _resample_to_timeframe(frame, timeframe)
        return CachedDataset(
            frame=frame,
            chart_points=self._build_chart_points(frame),
            file_mtime_ns=file_mtime_ns,
        )

    def get(self, index_name: str, timeframe: str) -> CachedDataset:
        """Return the dataset for ``index_name``/``timeframe``, loading it if needed.

        Raises:
            FileNotFoundError: if no source file can be found or produced.
            ValueError: if ``timeframe`` is unknown.
        """
        _ensure_timeframe_ready(index_name, timeframe)
        key = (index_name, timeframe)
        file_path = _resolve_data_file(index_name, timeframe)
        file_mtime_ns = _stat_mtime_ns(file_path)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None and cached.file_mtime_ns == file_mtime_ns:
                self._cache.move_to_end(key)
                return cached
        # The load runs outside the lock so it does not block other lookups.
        loaded = self._load_dataset(file_path, timeframe)
        with self._lock:
            self._cache[key] = loaded
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_entries:
                self._cache.popitem(last=False)
        return loaded

    def warm(self, pairs: tuple[tuple[str, str], ...]) -> None:
        """Best-effort preload of ``pairs``; a failing pair is logged and skipped."""
        for index_name, timeframe in pairs:
            try:
                self.get(index_name, timeframe)
            except Exception as exc:
                # Warm-up must never abort start-up, but a silent skip hides
                # missing or corrupt data files, so leave a trace in the log.
                LOGGER.warning("Skipping warm-up for %s/%s: %s", index_name, timeframe, exc)
class CustomIntervalStore:
    """Bounded cache of minute data resampled to caller-chosen intervals.

    Keys are ``(index_name, interval)``. Entries are kept in
    least-recently-used order and are considered stale once the minute source
    file's mtime differs from the one recorded at load time.
    """

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._cache: OrderedDict[tuple[str, str], CachedCustomInterval] = OrderedDict()
        self._lock = RLock()

    def _load(self, file_path: Path, interval: str) -> CachedCustomInterval:
        """Read the minute file and aggregate it into ``interval``-sized bars.

        ``interval`` is passed straight to polars as the ``every`` window size.
        The resulting ``date`` column is formatted as ``%Y-%m-%d %H:%M:%S`` text.
        """
        # Record the mtime BEFORE reading so a rewrite that lands mid-read
        # leads to a reload on the next lookup rather than a stale entry.
        source_mtime_ns = _stat_mtime_ns(file_path)
        frame = _ensure_datetime_column(_read_frame(file_path))
        # group_by_dynamic needs its index column sorted ascending, and rows
        # whose timestamp failed to parse (null) cannot be bucketed.
        frame = frame.drop_nulls(subset=["date"]).sort("date")
        resampled = frame.group_by_dynamic("date", every=interval).agg(
            [
                pl.col("open").first().alias("open"),
                pl.col("high").max().alias("high"),
                pl.col("low").min().alias("low"),
                pl.col("close").last().alias("close"),
                pl.col("volume").sum().alias("volume"),
            ]
        ).drop_nulls(subset=["open", "high", "low", "close"]).with_columns(
            pl.col("date").dt.to_string("%Y-%m-%d %H:%M:%S")
        )
        return CachedCustomInterval(frame=resampled, source_mtime_ns=source_mtime_ns)

    def get(self, index_name: str, interval: str) -> pl.DataFrame:
        """Return the resampled frame for ``index_name``/``interval``, loading if needed.

        Raises:
            FileNotFoundError: if the minute file is missing and cannot be produced.
        """
        file_path = _ensure_minute_ready(index_name)
        file_mtime_ns = _stat_mtime_ns(file_path)
        key = (index_name, interval)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None and cached.source_mtime_ns == file_mtime_ns:
                self._cache.move_to_end(key)
                return cached.frame
        # The load runs outside the lock so it does not block other lookups.
        loaded = self._load(file_path, interval)
        with self._lock:
            self._cache[key] = loaded
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_entries:
                self._cache.popitem(last=False)
        return loaded.frame
# Module-level cache instances shared by the public accessors defined below.
DATASET_STORE = DatasetStore(MAX_DATASET_CACHE_ENTRIES)
CUSTOM_INTERVAL_STORE = CustomIntervalStore(MAX_CUSTOM_CACHE_ENTRIES)
def get_available_tickers() -> list[str]:
    """Return the sorted names of all sub-directories of the intraday and daily trees.

    A tree that does not exist is skipped; a name present in both trees is
    listed once.
    """
    names = {
        entry.name
        for root in (INTRADAY_DIR, DAILY_DIR)
        if root.exists()
        for entry in root.iterdir()
        if entry.is_dir()
    }
    return sorted(names)
def get_available_granularities(index_name: str | None = None) -> list[str]:
    """Return every known timeframe name, finest first.

    ``index_name`` is accepted but not consulted: the same list is returned
    for every ticker.
    """
    known = {*INTRADAY_RULES, *DAILY_RULES}
    return _sorted_timeframes(known)
def get_data_catalog() -> dict:
    """Describe the available tickers, timeframes, export options and defaults.

    The default ticker is ``"NIFTY 50"`` when present, otherwise the first
    ticker alphabetically, otherwise an empty string.
    """
    tickers = get_available_tickers()

    granularities_by_ticker: dict = {}
    union: set = set()
    for ticker in tickers:
        supported = get_available_granularities(ticker)
        granularities_by_ticker[ticker] = supported
        union.update(supported)

    all_granularities = _sorted_timeframes(union)
    if not all_granularities:
        # No tickers on disk yet: still advertise the known timeframes.
        all_granularities = get_available_granularities()

    if "NIFTY 50" in tickers:
        default_ticker = "NIFTY 50"
    elif tickers:
        default_ticker = tickers[0]
    else:
        default_ticker = ""

    historical_export = {
        "formats": list(HISTORICAL_EXPORT_FORMATS),
        "templates": list(HISTORICAL_EXPORT_TEMPLATES),
        "default_limit": HISTORICAL_EXPORT_DEFAULT_LIMIT,
        "intraday_cutoff": HISTORICAL_INTRADAY_CUTOFF.strftime(EXPORT_DATE_FORMAT),
    }
    return {
        "tickers": tickers,
        "granularities": all_granularities,
        "granularities_by_ticker": granularities_by_ticker,
        "historical_export": historical_export,
        "defaults": {
            "ticker": default_ticker,
            "granularity": "1_hour",
        },
    }
def _safe_filename_part(value: str) -> str:
cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value.strip())
return cleaned.strip("._-") or "export"
def _ensure_export_columns(frame: pl.DataFrame, *, blank_volume: bool = False, source_label: str | None = None) -> pl.DataFrame:
    """Normalise ``frame`` and project it onto the export column layout.

    Args:
        frame: OHLCV data to prepare.
        blank_volume: when true, ``volume`` is replaced with nulls; it is also
            added as nulls when the column is missing.
        source_label: value written to ``source_timeframe``. When ``None``, an
            existing column is kept and a missing one is filled with
            ``"unknown"``.

    Returns:
        The frame restricted to the ``EXPORT_COLUMNS`` that are present, in
        that order.
    """
    frame = _normalize_ohlcv_frame(frame)

    additions = []
    if blank_volume or "volume" not in frame.columns:
        additions.append(pl.lit(None, dtype=pl.Float64).alias("volume"))
    if source_label is not None or "source_timeframe" not in frame.columns:
        additions.append(pl.lit(source_label or "unknown").alias("source_timeframe"))
    if additions:
        frame = frame.with_columns(additions)

    return frame.select([column for column in EXPORT_COLUMNS if column in frame.columns])
def _blank_intraday_volume(frame: pl.DataFrame) -> pl.DataFrame:
    """Return ``frame`` with ``volume`` set to null Float64, adding the column if absent."""
    # `with_columns` both replaces an existing column and appends a missing
    # one, so no presence check is needed.
    empty_volume = pl.lit(None, dtype=pl.Float64).alias("volume")
    return frame.with_columns(empty_volume)
def _apply_export_template(
    frame: pl.DataFrame,
    *,
    template: str,
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
) -> pl.DataFrame:
    """Restrict an export frame according to ``template``.

    Args:
        frame: data to filter; it is normalised to the export layout first.
        template: ``full_history``, ``last_n_rows`` or ``custom_range``
            (case-insensitive; empty means ``full_history``).
        row_limit: row count for ``last_n_rows``; a missing or zero value falls
            back to ``HISTORICAL_EXPORT_DEFAULT_LIMIT`` and the minimum is 1.
        start_date: inclusive lower bound for ``custom_range``.
        end_date: upper bound for ``custom_range``; rows earlier than this
            value plus one day are kept.

    Raises:
        ValueError: for an unknown template (only when the frame is non-empty).
    """
    template = (template or "full_history").strip().lower()
    frame = _ensure_export_columns(frame)

    if frame.is_empty() or template == "full_history":
        return frame

    if template == "last_n_rows":
        requested = int(row_limit or HISTORICAL_EXPORT_DEFAULT_LIMIT)
        return frame.tail(max(requested, 1))

    if template != "custom_range":
        raise ValueError(f"Unknown export template: {template}")

    if start_date:
        lower_bound = pd.Timestamp(start_date).to_pydatetime()
        frame = frame.filter(pl.col("date") >= lower_bound)
    if end_date:
        # Exclusive bound one day after end_date.
        upper_bound = (pd.Timestamp(end_date) + pd.Timedelta(days=1)).to_pydatetime()
        frame = frame.filter(pl.col("date") < upper_bound)
    return frame
def _load_historical_source(index_name: str, timeframe: str) -> pl.DataFrame:
    """Return the cached frame for ``index_name`` at ``timeframe`` from ``DATASET_STORE``."""
    dataset = DATASET_STORE.get(index_name, timeframe)
    return dataset.frame
def _tag_export_source(frame: pl.DataFrame, source_label: str) -> pl.DataFrame:
    """Return ``frame`` with ``source_timeframe`` set to ``source_label`` on every row."""
    label_column = pl.lit(source_label).alias("source_timeframe")
    return frame.with_columns(label_column)
def _prepare_historical_export_frame(
    index_name: str,
    timeframe: str,
    *,
    template: str = "full_history",
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
) -> pl.DataFrame:
    """Assemble the frame to export for ``index_name`` at ``timeframe``.

    For intraday timeframes the result stitches two sources together: daily
    bars dated before ``HISTORICAL_INTRADAY_CUTOFF`` (labelled ``daily``) and
    intraday bars from the cutoff onwards (labelled ``intraday``, volume
    blanked). Other timeframes use the requested dataset directly, labelled
    ``daily``. The export template is then applied.

    Raises:
        ValueError: if no rows remain after filtering, or the template is unknown.
    """

    def _labelled(source: pl.DataFrame, label: str, *, blank: bool = False) -> pl.DataFrame:
        # Tag the rows with their origin and project onto the export layout.
        return _ensure_export_columns(
            _tag_export_source(source, label),
            blank_volume=blank,
            source_label=label,
        )

    if timeframe in INTRADAY_RULES:
        daily_part = _labelled(DATASET_STORE.get(index_name, "1_day").frame, "daily")
        intraday_part = _labelled(
            DATASET_STORE.get(index_name, timeframe).frame, "intraday", blank=True
        )
        cutoff = HISTORICAL_INTRADAY_CUTOFF.to_pydatetime()
        pieces = [
            daily_part.filter(pl.col("date") < cutoff),
            intraday_part.filter(pl.col("date") >= cutoff),
        ]
        combined = pl.concat(pieces, how="diagonal_relaxed").sort("date")
    else:
        combined = _labelled(_load_historical_source(index_name, timeframe), "daily")

    selected = _apply_export_template(
        combined,
        template=template,
        row_limit=row_limit,
        start_date=start_date,
        end_date=end_date,
    )
    if selected.is_empty():
        raise ValueError("No rows available for the selected export filters")
    return _ensure_export_columns(selected, blank_volume=False).sort("date")
def _write_historical_export_file(frame: pl.DataFrame, *, file_format: str, filename_stem: str) -> Path:
    """Write ``frame`` to a fresh temporary directory and return the file path.

    Args:
        frame: data to serialise.
        file_format: ``csv``, ``parquet`` or ``json`` (case-insensitive,
            surrounding whitespace ignored).
        filename_stem: file name without extension.

    Raises:
        ValueError: if ``file_format`` is not supported. Nothing is created on
            disk in that case.
    """
    file_format = file_format.lower().strip()
    # Validate first: the format becomes part of the path, and rejecting it
    # after mkdtemp() would leave an empty orphan directory behind.
    if file_format not in HISTORICAL_EXPORT_FORMATS:
        raise ValueError(f"Unsupported export format: {file_format}")

    export_dir = Path(tempfile.mkdtemp(prefix="historical_export_"))
    file_path = export_dir / f"{filename_stem}.{file_format}"
    if file_format == "csv":
        frame.write_csv(file_path)
    elif file_format == "parquet":
        frame.write_parquet(file_path)
    elif file_format == "json":
        try:
            frame.write_json(file_path, row_oriented=True)
        except TypeError:
            # Fallback for polars versions whose write_json rejects
            # `row_oriented`; non-JSON values are stringified.
            with file_path.open("w", encoding="utf-8") as handle:
                json.dump(frame.to_dicts(), handle, ensure_ascii=False, default=str, indent=2)
    else:
        # Reached only if a format is advertised without a writer above.
        raise ValueError(f"Unsupported export format: {file_format}")
    return file_path
def build_historical_export(
    index_name: str,
    timeframe: str,
    *,
    template: str = "full_history",
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    file_format: str = "csv",
) -> tuple[str, dict]:
    """Build a historical export file and describe it.

    Args:
        index_name: ticker to export.
        timeframe: timeframe name to export.
        template: ``full_history``, ``last_n_rows`` or ``custom_range``.
        row_limit: row count for ``last_n_rows``.
        start_date: lower bound for ``custom_range``.
        end_date: upper bound for ``custom_range``.
        file_format: ``csv``, ``parquet`` or ``json``.

    Returns:
        ``(file_path, metadata)`` where ``metadata`` holds the ticker,
        timeframe, template, format, row count, column names, file name/path
        and the first and last timestamps of the exported rows.

    Raises:
        ValueError: if no rows match, or the template or format is unsupported.
        FileNotFoundError: if the source data cannot be found.
    """
    # Local import: only the timezone-aware clock below needs it.
    from datetime import timezone

    frame = _prepare_historical_export_frame(
        index_name,
        timeframe,
        template=template,
        row_limit=row_limit,
        start_date=start_date,
        end_date=end_date,
    )
    export_label = "historical"
    # datetime.utcnow() is deprecated; an aware UTC "now" formats identically.
    created_at = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename_stem = "_".join(
        part for part in (
            _safe_filename_part(index_name),
            _safe_filename_part(timeframe),
            _safe_filename_part(template),
            export_label,
            created_at,
        )
        if part
    )
    file_path = _write_historical_export_file(frame, file_format=file_format, filename_stem=filename_stem)
    has_rows = bool(frame.height)
    metadata = {
        "ticker": index_name,
        "timeframe": timeframe,
        "template": template,
        "format": file_format.lower().strip(),
        "rows": frame.height,
        "columns": frame.columns,
        "filename": file_path.name,
        "file_path": str(file_path),
        "date_min": frame["date"].min().strftime(EXPORT_DATETIME_FORMAT) if has_rows else None,
        "date_max": frame["date"].max().strftime(EXPORT_DATETIME_FORMAT) if has_rows else None,
    }
    return str(file_path), metadata
def _slice_bounds(total_rows: int, limit: int | None = None, offset: int = 0) -> tuple[int, int]:
if limit is None:
return 0, total_rows
offset = max(int(offset or 0), 0)
limit = max(int(limit), 1)
end = max(total_rows - offset, 0)
start = max(end - limit, 0)
return start, max(end - start, 0)
def get_precalculated_data(index_name: str, timeframe: str, limit: int | None = None, offset: int = 0) -> list[dict]:
    """Return rows of the cached dataset as dictionaries.

    ``limit``/``offset`` select a window counted back from the newest row (see
    ``_slice_bounds``); with ``limit=None`` every row is returned.
    """
    dataset = DATASET_STORE.get(index_name, timeframe)
    first_row, row_count = _slice_bounds(dataset.frame.height, limit=limit, offset=offset)
    if row_count > 0:
        return dataset.frame.slice(first_row, row_count).to_dicts()
    return []
def get_precalculated_chart_slice(index_name: str, timeframe: str, limit: int = 500, offset: int = 0) -> dict:
    """Return one page of chart points, paging backwards from the newest bar.

    Args:
        index_name: ticker to read.
        timeframe: timeframe name to read.
        limit: maximum number of points in the page.
        offset: number of newest points to skip; negative or missing values
            count as 0.

    Returns:
        A dict with the ``points`` lists, ``next_offset`` (the offset to
        request for the next older page), ``has_more``, ``total_rows``,
        ``granularity`` and ``ticker``.
    """
    cached = DATASET_STORE.get(index_name, timeframe)
    # Clamp the same way _slice_bounds does, so next_offset/has_more describe
    # the window that was actually returned (a negative offset previously
    # yielded overlapping pages, and None raised TypeError).
    offset = max(int(offset or 0), 0)
    chart_points = cached.chart_points
    total_rows = len(chart_points["times"])
    start, length = _slice_bounds(total_rows, limit=limit, offset=offset)
    stop = start + length  # equals start when the window is empty
    series_names = ("times", "prices", "opens", "highs", "lows", "closes")
    points = {name: chart_points[name][start:stop] for name in series_names}
    next_offset = offset + length
    return {
        "points": points,
        "next_offset": next_offset,
        "has_more": next_offset < total_rows,
        "total_rows": total_rows,
        "granularity": timeframe,
        "ticker": index_name,
    }
def get_custom_intraday_data(index_name: str, interval: str, limit: int | None = None) -> list[dict]:
    """Return minute data resampled to ``interval`` as a list of row dicts.

    When ``limit`` is given and the frame has more rows than that, only the
    last ``limit`` rows are returned.
    """
    frame = CUSTOM_INTERVAL_STORE.get(index_name, interval)
    needs_trim = limit is not None and frame.height > limit
    selected = frame.tail(int(limit)) if needs_trim else frame
    return selected.to_dicts()
def warm_common_datasets() -> None:
    """Preload every ``WARM_DATASETS`` pair into ``DATASET_STORE``."""
    DATASET_STORE.warm(WARM_DATASETS)
def start_runtime_refresh() -> None:
    """Start the market-data refresh scheduler, then the forecaster refresh scheduler."""
    start_background_refresh_scheduler()
    start_forecaster_refresh_scheduler()