from collections import OrderedDict
from dataclasses import dataclass
import json
import logging
import re
import tempfile
from pathlib import Path
from datetime import datetime
from threading import Lock, RLock, Thread

import pandas as pd
import polars as pl

from Forecaster import start_forecaster_refresh_scheduler
from market_refresh import refresh_index_if_due, start_background_refresh_scheduler


def _find_nearest_dir(start: Path, target_name: str) -> Path | None:
    for candidate in [start, *start.parents]:
        target = candidate / target_name
        if target.exists():
            return target
    return None


MODULE_DIR = Path(__file__).resolve().parent
BASE_DIR = _find_nearest_dir(MODULE_DIR, "MAIN DATA SOURCE") or (MODULE_DIR / "MAIN DATA SOURCE")
INTRADAY_DIR = BASE_DIR / "intraday"
DAILY_DIR = BASE_DIR / "daily"
LOGGER = logging.getLogger(__name__)
DATA_FILE_EXTENSION = ".parquet"
LEGACY_DATA_FILE_EXTENSION = ".csv"
_ASYNC_REFRESH_LOCK = Lock()
_ASYNC_REFRESH_IN_FLIGHT: set[tuple[str, str]] = set()

INTRADAY_RULES = {
    "minute": "minute",
    "5m": "5m",
    "15m": "15m",
    "30m": "30m",
    "1_hour": "1_hour",
    "4_hour": "4_hour",
}

DAILY_RULES = {
    "1_day": "1_day",
    "1_week": "1_week",
    "1_month": "1_month",
    "6_months": "6_months",
    "1_year": "1_year",
    "5_year": "5_year",
    "MAX": "MAX",
}

INTRADAY_RESAMPLE_RULES = {
    "5m": "5m",
    "15m": "15m",
    "30m": "30m",
    "1_hour": "1h",
    "4_hour": "4h",
}

DAILY_RESAMPLE_RULES = {
    "1_week": "W-FRI",
    "1_month": "MS",
    "6_months": "6MS",
    "1_year": "YS",
    "5_year": "5YS",
}

TIMEFRAME_ORDER = {
    "minute": 0,
    "5m": 1,
    "15m": 2,
    "30m": 3,
    "1_hour": 4,
    "4_hour": 5,
    "1_day": 6,
    "1_week": 7,
    "1_month": 8,
    "6_months": 9,
    "1_year": 10,
    "5_year": 11,
    "MAX": 12,
}

MAX_DATASET_CACHE_ENTRIES = 24
MAX_CUSTOM_CACHE_ENTRIES = 12
HISTORICAL_INTRADAY_CUTOFF = pd.Timestamp("2015-01-09")
HISTORICAL_EXPORT_FORMATS = ("csv", "parquet", "json")
HISTORICAL_EXPORT_TEMPLATES = ("full_history", "last_n_rows", "custom_range")
HISTORICAL_EXPORT_DEFAULT_LIMIT = 1000
EXPORT_DATE_FORMAT = "%Y-%m-%d"
EXPORT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
EXPORT_COLUMNS = ("date", "open", "high", "low", "close", "volume", "source_timeframe")
WARM_DATASETS = (
    ("NIFTY 50", "5m"),
    ("NIFTY 50", "1_hour"),
    ("NIFTY 50", "1_day"),
)


@dataclass
class CachedDataset:
    frame: pl.DataFrame
    chart_points: dict
    file_mtime_ns: int


@dataclass
class CachedCustomInterval:
    frame: pl.DataFrame
    source_mtime_ns: int


def _sorted_timeframes(values: set[str]) -> list[str]:
    return sorted(values, key=lambda value: (TIMEFRAME_ORDER.get(value, 999), value))


def _timeframe_base_dir(timeframe: str) -> Path:
    if timeframe == "minute" or timeframe in INTRADAY_RESAMPLE_RULES:
        return INTRADAY_DIR
    if timeframe == "1_day" or timeframe in DAILY_RESAMPLE_RULES or timeframe == "MAX":
        return DAILY_DIR
    raise ValueError(f"Unknown timeframe: {timeframe}")


def _resolve_data_file(index_name: str, timeframe: str) -> Path:
    base_path = _timeframe_base_dir(timeframe)
    source_timeframe = "minute" if base_path == INTRADAY_DIR else "1_day"
    parquet_path = base_path / index_name / source_timeframe / f"{index_name}_{source_timeframe}{DATA_FILE_EXTENSION}"
    if parquet_path.exists():
        return parquet_path
    legacy_path = parquet_path.with_suffix(LEGACY_DATA_FILE_EXTENSION)
    if legacy_path.exists():
        return legacy_path
    raise FileNotFoundError(f"Data file not found: {parquet_path}")


def _stat_mtime_ns(path: Path) -> int:
    return path.stat().st_mtime_ns


def _read_frame(file_path: Path) -> pl.DataFrame:
    if file_path.suffix.lower() == DATA_FILE_EXTENSION:
        return pl.read_parquet(file_path)
    return pl.read_csv(file_path)


def _source_timeframe(timeframe: str) -> str:
    if timeframe == "minute" or timeframe in INTRADAY_RESAMPLE_RULES:
        return "minute"
    if timeframe == "1_day" or timeframe in DAILY_RESAMPLE_RULES or timeframe == "MAX":
        return "1_day"
    raise ValueError(f"Unknown timeframe: {timeframe}")


def _ensure_datetime_column(frame: pl.DataFrame) -> pl.DataFrame:
    if "date" not in frame.columns:
        for alias in ("datetime", "trade_date", "eod_timestamp", "timestamp", "date_time"):
            if alias in frame.columns:
                frame = frame.rename({alias: "date"})
                break
    if "date" not in frame.columns:
        return frame
    if frame["date"].dtype == pl.Utf8:
        parsed = pl.coalesce(
            [
                pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M:%S", strict=False),
                pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%d %H:%M", strict=False),
                pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%d", strict=False),
                pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%dT%H:%M:%S", strict=False),
                pl.col("date").str.strptime(pl.Datetime, "%Y-%m-%dT%H:%M:%S%.f", strict=False),
            ]
        )
        return frame.with_columns(parsed.alias("date"))
    return frame.with_columns(pl.col("date").cast(pl.Datetime, strict=False))


def _normalize_ohlcv_frame(frame: pl.DataFrame) -> pl.DataFrame:
    rename_map = {
        column: str(column).strip().lower().replace(" ", "_")
        for column in frame.columns
    }
    if rename_map:
        frame = frame.rename(rename_map)
    frame = _ensure_datetime_column(frame).drop_nulls(subset=["date"])
    numeric_columns = [
        column for column in ("open", "high", "low", "close", "volume") if column in frame.columns
    ]
    if numeric_columns:
        frame = frame.with_columns(
            [pl.col(column).cast(pl.Float64, strict=False) for column in numeric_columns]
        )
    return frame.sort("date") if "date" in frame.columns else frame


def _resample_to_timeframe(frame: pl.DataFrame, timeframe: str) -> pl.DataFrame:
    if frame.is_empty():
        return frame
    if timeframe == "minute" or timeframe == "1_day":
        return _normalize_ohlcv_frame(frame)
    frame = _normalize_ohlcv_frame(frame)
    if timeframe == "MAX":
        return frame.select(
            pl.col("date").min().alias("date"),
            pl.col("open").first().alias("open"),
            pl.col("high").max().alias("high"),
            pl.col("low").min().alias("low"),
            pl.col("close").last().alias("close"),
            pl.col("volume").sum().alias("volume"),
        )
    intraday_rule = INTRADAY_RESAMPLE_RULES.get(timeframe)
    if intraday_rule is not None:
        return (
            frame.group_by_dynamic("date", every=intraday_rule, closed="left", label="left")
            .agg(
                [
                    pl.col("open").first().alias("open"),
                    pl.col("high").max().alias("high"),
                    pl.col("low").min().alias("low"),
                    pl.col("close").last().alias("close"),
                    pl.col("volume").sum().alias("volume"),
                ]
            )
            .drop_nulls(subset=["open", "high", "low", "close"])
            .sort("date")
        )
    # Intraday rules returned above, so only the daily-and-longer rules remain;
    # these are resampled with pandas offset aliases.
    rule = DAILY_RESAMPLE_RULES.get(timeframe)
    if rule is None:
        raise ValueError(f"Unknown timeframe: {timeframe}")
    pdf = pd.DataFrame(
        frame.select(["date", "open", "high", "low", "close", "volume"]).to_dict(as_series=False)
    )
    pdf["volume"] = pdf["volume"].fillna(0)
    aggregated = (
        pdf.set_index("date")
        .resample(rule)
        .agg(
            {
                "open": "first",
                "high": "max",
                "low": "min",
                "close": "last",
                "volume": "sum",
            }
        )
        .dropna(subset=["open", "high", "low", "close"])
        .reset_index()
    )
    return _normalize_ohlcv_frame(pl.DataFrame(aggregated.to_dict("list")))


def _ensure_timeframe_ready(index_name: str, timeframe: str) -> None:
    file_exists = False
    try:
        file_exists = _resolve_data_file(index_name, timeframe).exists()
    except FileNotFoundError:
        file_exists = False
    if file_exists:
        # A file is already on disk: serve it and refresh in the background.
        _kick_async_refresh(index_name, timeframe)
        return
    try:
        refresh_index_if_due(index_name, timeframe=timeframe, request_driven=True)
    except Exception as exc:
        try:
            _resolve_data_file(index_name, timeframe)
        except FileNotFoundError:
            raise
        LOGGER.warning("Serving cached file for %s/%s after refresh failure: %s", index_name, timeframe, exc)


def _ensure_minute_ready(index_name: str) -> Path:
    minute_path = INTRADAY_DIR / index_name / "minute" / f"{index_name}_minute{DATA_FILE_EXTENSION}"
    if minute_path.exists():
        _kick_async_refresh(index_name, "minute")
        return minute_path
    try:
        refresh_index_if_due(index_name, timeframe="minute", request_driven=True)
    except Exception as exc:
        if not minute_path.exists():
            raise
        LOGGER.warning("Serving cached minute file for %s after refresh failure: %s", index_name, exc)
    if not minute_path.exists():
        raise FileNotFoundError(f"Minute data file not found: {minute_path}")
    return minute_path


def _kick_async_refresh(index_name: str, timeframe: str) -> None:
    key = (index_name, timeframe)
    # Allow at most one in-flight refresh per (index, timeframe) pair.
    with _ASYNC_REFRESH_LOCK:
        if key in _ASYNC_REFRESH_IN_FLIGHT:
            return
        _ASYNC_REFRESH_IN_FLIGHT.add(key)

    def _run() -> None:
        try:
            refresh_index_if_due(index_name, timeframe=timeframe, request_driven=True)
        except Exception as exc:
            LOGGER.warning("Async refresh failed for %s/%s: %s", index_name, timeframe, exc)
        finally:
            with _ASYNC_REFRESH_LOCK:
                _ASYNC_REFRESH_IN_FLIGHT.discard(key)

    Thread(target=_run, name=f"async-refresh-{index_name}-{timeframe}", daemon=True).start()


class DatasetStore:
    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._cache: OrderedDict[tuple[str, str], CachedDataset] = OrderedDict()
        self._lock = RLock()

    def _build_chart_points(self, frame: pl.DataFrame) -> dict:
        required = {"date", "open", "high", "low", "close"}
        if not required.issubset(set(frame.columns)):
            return {"times": [], "prices": [], "opens": [], "highs": [], "lows": [], "closes": []}
        frame = _ensure_datetime_column(frame)
        # Epoch milliseconds floor-divided down to whole seconds.
        times = frame["date"].dt.timestamp("ms") // 1000
        return {
            "times": times.cast(pl.Int64).to_list(),
            "prices": frame["close"].to_list(),
            "opens": frame["open"].to_list(),
            "highs": frame["high"].to_list(),
            "lows": frame["low"].to_list(),
            "closes": frame["close"].to_list(),
        }

    def _load_dataset(self, file_path: Path, timeframe: str) -> CachedDataset:
        frame = _ensure_datetime_column(_read_frame(file_path))
        frame = _resample_to_timeframe(frame, timeframe)
        return CachedDataset(
            frame=frame,
            chart_points=self._build_chart_points(frame),
            file_mtime_ns=_stat_mtime_ns(file_path),
        )

    def get(self, index_name: str, timeframe: str) -> CachedDataset:
        _ensure_timeframe_ready(index_name, timeframe)
        key = (index_name, timeframe)
        file_path = _resolve_data_file(index_name, timeframe)
        file_mtime_ns = _stat_mtime_ns(file_path)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None and cached.file_mtime_ns == file_mtime_ns:
                self._cache.move_to_end(key)
                return cached
        loaded = self._load_dataset(file_path, timeframe)
        with self._lock:
            self._cache[key] = loaded
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_entries:
                self._cache.popitem(last=False)
        return loaded

    def warm(self, pairs: tuple[tuple[str, str], ...]) -> None:
        for index_name, timeframe in pairs:
            try:
                self.get(index_name, timeframe)
            except Exception:
                continue


class CustomIntervalStore:
    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._cache: OrderedDict[tuple[str, str], CachedCustomInterval] = OrderedDict()
        self._lock = RLock()

    def _load(self, file_path: Path, interval: str) -> CachedCustomInterval:
        frame = _ensure_datetime_column(_read_frame(file_path))
        resampled = (
            frame.group_by_dynamic("date", every=interval)
            .agg(
                [
                    pl.col("open").first().alias("open"),
                    pl.col("high").max().alias("high"),
                    pl.col("low").min().alias("low"),
                    pl.col("close").last().alias("close"),
                    pl.col("volume").sum().alias("volume"),
                ]
            )
            .drop_nulls(subset=["open", "high", "low", "close"])
            .with_columns(pl.col("date").dt.to_string("%Y-%m-%d %H:%M:%S"))
        )
        return CachedCustomInterval(frame=resampled, source_mtime_ns=_stat_mtime_ns(file_path))

    def get(self, index_name: str, interval: str) -> pl.DataFrame:
        file_path = _ensure_minute_ready(index_name)
        file_mtime_ns = _stat_mtime_ns(file_path)
        key = (index_name, interval)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None and cached.source_mtime_ns == file_mtime_ns:
                self._cache.move_to_end(key)
                return cached.frame
        loaded = self._load(file_path, interval)
        with self._lock:
            self._cache[key] = loaded
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_entries:
                self._cache.popitem(last=False)
        return loaded.frame


DATASET_STORE = DatasetStore(MAX_DATASET_CACHE_ENTRIES)
CUSTOM_INTERVAL_STORE = CustomIntervalStore(MAX_CUSTOM_CACHE_ENTRIES)


def get_available_tickers() -> list[str]:
    tickers = set()
    for base_dir in (INTRADAY_DIR, DAILY_DIR):
        if not base_dir.exists():
            continue
        tickers.update(path.name for path in base_dir.iterdir() if path.is_dir())
    return sorted(tickers)


def get_available_granularities(index_name: str | None = None) -> list[str]:
    granularities = set(INTRADAY_RULES.keys()) | set(DAILY_RULES.keys())
    return _sorted_timeframes(granularities)


def get_data_catalog() -> dict:
    tickers = get_available_tickers()
    granularities_by_ticker = {
        ticker: get_available_granularities(ticker)
        for ticker in tickers
    }
    all_granularities = _sorted_timeframes(
        {
            granularity
            for values in granularities_by_ticker.values()
            for granularity in values
        }
    ) or get_available_granularities()
    return {
        "tickers": tickers,
        "granularities": all_granularities,
        "granularities_by_ticker": granularities_by_ticker,
        "historical_export": {
            "formats": list(HISTORICAL_EXPORT_FORMATS),
            "templates": list(HISTORICAL_EXPORT_TEMPLATES),
            "default_limit": HISTORICAL_EXPORT_DEFAULT_LIMIT,
            "intraday_cutoff": HISTORICAL_INTRADAY_CUTOFF.strftime(EXPORT_DATE_FORMAT),
        },
        "defaults": {
            "ticker": "NIFTY 50" if "NIFTY 50" in tickers else (tickers[0] if tickers else ""),
            "granularity": "1_hour",
        },
    }


def _safe_filename_part(value: str) -> str:
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", value.strip())
    return cleaned.strip("._-") or "export"


def _ensure_export_columns(
    frame: pl.DataFrame,
    *,
    blank_volume: bool = False,
    source_label: str | None = None,
) -> pl.DataFrame:
    frame = _normalize_ohlcv_frame(frame)
    if blank_volume or "volume" not in frame.columns:
        frame = frame.with_columns(pl.lit(None, dtype=pl.Float64).alias("volume"))
    if source_label is not None or "source_timeframe" not in frame.columns:
        frame = frame.with_columns(pl.lit(source_label or "unknown").alias("source_timeframe"))
    existing = [column for column in EXPORT_COLUMNS if column in frame.columns]
    return frame.select(existing)


def _blank_intraday_volume(frame: pl.DataFrame) -> pl.DataFrame:
    if "volume" not in frame.columns:
        return frame.with_columns(pl.lit(None, dtype=pl.Float64).alias("volume"))
    return frame.with_columns(
        pl.lit(
            None,
            dtype=pl.Float64,
        ).alias("volume")
    )


def _apply_export_template(
    frame: pl.DataFrame,
    *,
    template: str,
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
) -> pl.DataFrame:
    template = (template or "full_history").strip().lower()
    frame = _ensure_export_columns(frame)
    if frame.is_empty():
        return frame
    if template == "full_history":
        return frame
    if template == "last_n_rows":
        limit = max(int(row_limit or HISTORICAL_EXPORT_DEFAULT_LIMIT), 1)
        return frame.tail(limit)
    if template == "custom_range":
        if start_date:
            start_dt = pd.Timestamp(start_date)
            frame = frame.filter(pl.col("date") >= start_dt.to_pydatetime())
        if end_date:
            end_dt = pd.Timestamp(end_date) + pd.Timedelta(days=1)
            frame = frame.filter(pl.col("date") < end_dt.to_pydatetime())
        return frame
    raise ValueError(f"Unknown export template: {template}")


def _load_historical_source(index_name: str, timeframe: str) -> pl.DataFrame:
    return DATASET_STORE.get(index_name, timeframe).frame


def _tag_export_source(frame: pl.DataFrame, source_label: str) -> pl.DataFrame:
    return frame.with_columns(pl.lit(source_label).alias("source_timeframe"))


def _prepare_historical_export_frame(
    index_name: str,
    timeframe: str,
    *,
    template: str = "full_history",
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
) -> pl.DataFrame:
    if timeframe in INTRADAY_RULES:
        daily_frame = _ensure_export_columns(
            _tag_export_source(DATASET_STORE.get(index_name, "1_day").frame, "daily"),
            source_label="daily",
        )
        intraday_frame = _ensure_export_columns(
            _tag_export_source(DATASET_STORE.get(index_name, timeframe).frame, "intraday"),
            blank_volume=True,
            source_label="intraday",
        )
        cutoff_dt = HISTORICAL_INTRADAY_CUTOFF.to_pydatetime()
        frame = pl.concat(
            [
                daily_frame.filter(pl.col("date") < cutoff_dt),
                intraday_frame.filter(pl.col("date") >= cutoff_dt),
            ],
            how="diagonal_relaxed",
        ).sort("date")
    else:
        frame = _ensure_export_columns(
            _tag_export_source(
                _load_historical_source(
                    index_name,
                    timeframe,
                ),
                "daily",
            ),
            source_label="daily",
        )
    frame = _apply_export_template(
        frame,
        template=template,
        row_limit=row_limit,
        start_date=start_date,
        end_date=end_date,
    )
    if frame.is_empty():
        raise ValueError("No rows available for the selected export filters")
    return _ensure_export_columns(frame, blank_volume=False).sort("date")


def _write_historical_export_file(frame: pl.DataFrame, *, file_format: str, filename_stem: str) -> Path:
    export_dir = Path(tempfile.mkdtemp(prefix="historical_export_"))
    file_format = file_format.lower().strip()
    file_path = export_dir / f"{filename_stem}.{file_format}"
    if file_format == "csv":
        frame.write_csv(file_path)
    elif file_format == "parquet":
        frame.write_parquet(file_path)
    elif file_format == "json":
        try:
            frame.write_json(file_path, row_oriented=True)
        except TypeError:
            with file_path.open("w", encoding="utf-8") as handle:
                json.dump(frame.to_dicts(), handle, ensure_ascii=False, default=str, indent=2)
    else:
        raise ValueError(f"Unsupported export format: {file_format}")
    return file_path


def build_historical_export(
    index_name: str,
    timeframe: str,
    *,
    template: str = "full_history",
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    file_format: str = "csv",
) -> tuple[str, dict]:
    frame = _prepare_historical_export_frame(
        index_name,
        timeframe,
        template=template,
        row_limit=row_limit,
        start_date=start_date,
        end_date=end_date,
    )
    export_label = "historical"
    filename_stem = "_".join(
        part
        for part in (
            _safe_filename_part(index_name),
            _safe_filename_part(timeframe),
            _safe_filename_part(template),
            export_label,
            datetime.utcnow().strftime("%Y%m%d_%H%M%S"),
        )
        if part
    )
    file_path = _write_historical_export_file(frame, file_format=file_format, filename_stem=filename_stem)
    metadata = {
        "ticker": index_name,
        "timeframe": timeframe,
        "template": template,
        "format": file_format.lower().strip(),
        "rows": frame.height,
        "columns": frame.columns,
        "filename": file_path.name,
        "file_path": str(file_path),
        "date_min": frame["date"].min().strftime(EXPORT_DATETIME_FORMAT) if frame.height else None,
        "date_max": frame["date"].max().strftime(EXPORT_DATETIME_FORMAT) if frame.height else None,
    }
    return str(file_path), metadata


def _slice_bounds(total_rows: int, limit: int | None = None, offset: int = 0) -> tuple[int, int]:
    if limit is None:
        return 0, total_rows
    offset = max(int(offset or 0), 0)
    limit = max(int(limit), 1)
    end = max(total_rows - offset, 0)
    start = max(end - limit, 0)
    return start, max(end - start, 0)


def get_precalculated_data(
    index_name: str,
    timeframe: str,
    limit: int | None = None,
    offset: int = 0,
) -> list[dict]:
    cached = DATASET_STORE.get(index_name, timeframe)
    start, length = _slice_bounds(cached.frame.height, limit=limit, offset=offset)
    if length <= 0:
        return []
    return cached.frame.slice(start, length).to_dicts()


def get_precalculated_chart_slice(index_name: str, timeframe: str, limit: int = 500, offset: int = 0) -> dict:
    cached = DATASET_STORE.get(index_name, timeframe)
    total_rows = len(cached.chart_points["times"])
    start, length = _slice_bounds(total_rows, limit=limit, offset=offset)
    if length > 0:
        times = cached.chart_points["times"][start:start + length]
        prices = cached.chart_points["prices"][start:start + length]
        opens = cached.chart_points["opens"][start:start + length]
        highs = cached.chart_points["highs"][start:start + length]
        lows = cached.chart_points["lows"][start:start + length]
        closes = cached.chart_points["closes"][start:start + length]
    else:
        times = []
        prices = []
        opens = []
        highs = []
        lows = []
        closes = []
    return {
        "points": {
            "times": times,
            "prices": prices,
            "opens": opens,
            "highs": highs,
            "lows": lows,
            "closes": closes,
        },
        "next_offset": offset + length,
        "has_more": (offset + length) < total_rows,
        "total_rows": total_rows,
        "granularity": timeframe,
        "ticker": index_name,
    }


def get_custom_intraday_data(index_name: str, interval: str, limit: int | None = None) -> list[dict]:
    frame = CUSTOM_INTERVAL_STORE.get(index_name, interval)
    if limit is not None and frame.height > limit:
        return frame.tail(int(limit)).to_dicts()
    return frame.to_dicts()


def warm_common_datasets() -> None:
    DATASET_STORE.warm(WARM_DATASETS)


def start_runtime_refresh() -> None:
    start_background_refresh_scheduler()
    start_forecaster_refresh_scheduler()