| from collections import OrderedDict |
| from dataclasses import dataclass |
| import json |
| import logging |
| import re |
| import tempfile |
| from pathlib import Path |
| from datetime import datetime |
| from threading import Lock, RLock, Thread |
|
|
| import pandas as pd |
| import polars as pl |
|
|
| from Forecaster import start_forecaster_refresh_scheduler |
| from market_refresh import refresh_index_if_due, start_background_refresh_scheduler |
|
|
| def _find_nearest_dir(start: Path, target_name: str) -> Path | None: |
| for candidate in [start, *start.parents]: |
| target = candidate / target_name |
| if target.exists(): |
| return target |
| return None |
|
|
|
|
# Directory holding this module; the data root is searched for from here upwards.
MODULE_DIR = Path(__file__).resolve().parent
# Nearest "MAIN DATA SOURCE" directory at or above MODULE_DIR, falling back to
# MODULE_DIR/"MAIN DATA SOURCE" when none is found.
BASE_DIR = _find_nearest_dir(MODULE_DIR, "MAIN DATA SOURCE") or MODULE_DIR.joinpath("MAIN DATA SOURCE")
INTRADAY_DIR = BASE_DIR.joinpath("intraday")
DAILY_DIR = BASE_DIR.joinpath("daily")
LOGGER = logging.getLogger(__name__)
# Preferred on-disk format, and the older format still accepted when reading.
DATA_FILE_EXTENSION = ".parquet"
LEGACY_DATA_FILE_EXTENSION = ".csv"
# (index_name, timeframe) keys with a background refresh thread running;
# every access to the set happens while holding the lock.
_ASYNC_REFRESH_LOCK = Lock()
_ASYNC_REFRESH_IN_FLIGHT: set[tuple[str, str]] = set()
|
|
# Timeframes served from the intraday (minute) source; each key maps to itself.
INTRADAY_RULES = {name: name for name in ("minute", "5m", "15m", "30m", "1_hour", "4_hour")}

# Timeframes served from the daily source; each key maps to itself.
DAILY_RULES = {
    name: name
    for name in ("1_day", "1_week", "1_month", "6_months", "1_year", "5_year", "MAX")
}
|
|
# Timeframe key -> polars `group_by_dynamic` duration used to bucket minute bars.
INTRADAY_RESAMPLE_RULES = dict(
    [
        ("5m", "5m"),
        ("15m", "15m"),
        ("30m", "30m"),
        ("1_hour", "1h"),
        ("4_hour", "4h"),
    ]
)

# Timeframe key -> pandas offset alias used to resample daily bars.
DAILY_RESAMPLE_RULES = dict(
    [
        ("1_week", "W-FRI"),
        ("1_month", "MS"),
        ("6_months", "6MS"),
        ("1_year", "YS"),
        ("5_year", "5YS"),
    ]
)
|
|
# Display rank of each timeframe, finest first (used by _sorted_timeframes).
TIMEFRAME_ORDER = {
    name: rank
    for rank, name in enumerate(
        (
            "minute",
            "5m",
            "15m",
            "30m",
            "1_hour",
            "4_hour",
            "1_day",
            "1_week",
            "1_month",
            "6_months",
            "1_year",
            "5_year",
            "MAX",
        )
    )
}
|
|
# Upper bounds on the number of entries each LRU store keeps.
MAX_DATASET_CACHE_ENTRIES = 24
MAX_CUSTOM_CACHE_ENTRIES = 12
# Historical exports of intraday timeframes use daily bars before this date
# and intraday bars from it onwards.
HISTORICAL_INTRADAY_CUTOFF = pd.Timestamp(year=2015, month=1, day=9)
# Export options advertised by get_data_catalog().
HISTORICAL_EXPORT_FORMATS: tuple[str, ...] = ("csv", "parquet", "json")
HISTORICAL_EXPORT_TEMPLATES: tuple[str, ...] = ("full_history", "last_n_rows", "custom_range")
HISTORICAL_EXPORT_DEFAULT_LIMIT = 1_000
EXPORT_DATE_FORMAT = "%Y-%m-%d"
EXPORT_DATETIME_FORMAT = "%Y-%m-%d %H:%M:%S"
# Column order of exported files.
EXPORT_COLUMNS: tuple[str, ...] = ("date", "open", "high", "low", "close", "volume", "source_timeframe")
# (index_name, timeframe) pairs preloaded by warm_common_datasets().
WARM_DATASETS = tuple(("NIFTY 50", timeframe) for timeframe in ("5m", "1_hour", "1_day"))
|
|
|
|
@dataclass
class CachedDataset:
    """Cache entry stored by ``DatasetStore`` for one ``(index_name, timeframe)`` key."""

    # Frame produced by _resample_to_timeframe for the entry's timeframe.
    frame: pl.DataFrame
    # Per-column lists from DatasetStore._build_chart_points ("times", "prices", "opens", ...).
    chart_points: dict
    # Source-file mtime in ns; DatasetStore.get reuses the entry only while the file's mtime still matches.
    file_mtime_ns: int
|
|
|
|
@dataclass
class CachedCustomInterval:
    """Cache entry stored by ``CustomIntervalStore`` for one ``(index_name, interval)`` key."""

    # Minute data aggregated to the requested interval, with "date" rendered as a string.
    frame: pl.DataFrame
    # Minute source-file mtime in ns; CustomIntervalStore.get reuses the entry only while it matches.
    source_mtime_ns: int
|
|
|
|
def _sorted_timeframes(values: set[str]) -> list[str]:
    """Return *values* ordered by ``TIMEFRAME_ORDER`` rank.

    Unknown timeframes rank after all known ones; ties are broken by name.
    """

    def rank(name: str) -> tuple[int, str]:
        return TIMEFRAME_ORDER.get(name, 999), name

    return sorted(values, key=rank)
|
|
|
|
def _timeframe_base_dir(timeframe: str) -> Path:
    """Return the data root that backs *timeframe*.

    ``minute`` and the ``INTRADAY_RESAMPLE_RULES`` keys live under
    ``INTRADAY_DIR``; ``1_day``, ``MAX`` and the ``DAILY_RESAMPLE_RULES`` keys
    live under ``DAILY_DIR``.

    Raises:
        ValueError: for any other timeframe.
    """
    is_intraday = timeframe == "minute" or timeframe in INTRADAY_RESAMPLE_RULES
    if is_intraday:
        return INTRADAY_DIR

    is_daily = timeframe in ("1_day", "MAX") or timeframe in DAILY_RESAMPLE_RULES
    if is_daily:
        return DAILY_DIR

    raise ValueError(f"Unknown timeframe: {timeframe}")
|
|
|
|
def _resolve_data_file(index_name: str, timeframe: str) -> Path:
    """Locate the on-disk source file backing *timeframe* for *index_name*.

    Intraday timeframes read ``<INTRADAY_DIR>/<index>/minute/<index>_minute.*``
    and daily ones ``<DAILY_DIR>/<index>/1_day/<index>_1_day.*``. The parquet
    file is preferred; a legacy CSV with the same stem is the fallback.

    Raises:
        ValueError: for an unknown timeframe.
        FileNotFoundError: when neither file exists (the message names the
            parquet path).
    """
    base_path = _timeframe_base_dir(timeframe)
    source = "minute" if base_path == INTRADAY_DIR else "1_day"
    folder = base_path / index_name / source
    stem = f"{index_name}_{source}"

    preferred = folder / f"{stem}{DATA_FILE_EXTENSION}"
    legacy = folder / f"{stem}{LEGACY_DATA_FILE_EXTENSION}"
    for candidate in (preferred, legacy):
        if candidate.exists():
            return candidate

    raise FileNotFoundError(f"Data file not found: {preferred}")
|
|
|
|
def _stat_mtime_ns(path: Path) -> int:
    """Return the last-modification time of *path* in integer nanoseconds."""
    stat_result = path.stat()
    return stat_result.st_mtime_ns
|
|
|
|
def _read_frame(file_path: Path) -> pl.DataFrame:
    """Load *file_path* into polars.

    Parquet is chosen by its (case-insensitive) extension; anything else is
    parsed as CSV.
    """
    is_parquet = file_path.suffix.lower() == DATA_FILE_EXTENSION
    reader = pl.read_parquet if is_parquet else pl.read_csv
    return reader(file_path)
|
|
|
|
def _source_timeframe(timeframe: str) -> str:
    """Return the raw timeframe (``"minute"`` or ``"1_day"``) *timeframe* is derived from.

    Raises:
        ValueError: for an unknown timeframe.
    """
    if timeframe in {"minute", *INTRADAY_RESAMPLE_RULES}:
        return "minute"
    if timeframe in {"1_day", "MAX", *DAILY_RESAMPLE_RULES}:
        return "1_day"
    raise ValueError(f"Unknown timeframe: {timeframe}")
|
|
|
|
def _ensure_datetime_column(frame: pl.DataFrame) -> pl.DataFrame:
    """Return *frame* with its timestamp column named ``date`` and typed as ``pl.Datetime``.

    If ``date`` is absent, the first column found among ``datetime``,
    ``trade_date``, ``eod_timestamp``, ``timestamp`` and ``date_time`` is
    renamed to ``date``; if none exists the frame is returned untouched.

    String dates are parsed with several ISO-like formats (tried in order, the
    first successful parse wins, non-strict so failures become null); any other
    dtype is cast non-strictly to ``pl.Datetime``.
    """
    if "date" not in frame.columns:
        aliases = ("datetime", "trade_date", "eod_timestamp", "timestamp", "date_time")
        alias = next((name for name in aliases if name in frame.columns), None)
        if alias is None:
            return frame
        frame = frame.rename({alias: "date"})

    date_col = pl.col("date")
    if frame["date"].dtype == pl.Utf8:
        formats = (
            "%Y-%m-%d %H:%M:%S",
            "%Y-%m-%d %H:%M",
            "%Y-%m-%d",
            "%Y-%m-%dT%H:%M:%S",
            "%Y-%m-%dT%H:%M:%S%.f",
        )
        attempts = [date_col.str.strptime(pl.Datetime, fmt, strict=False) for fmt in formats]
        return frame.with_columns(pl.coalesce(attempts).alias("date"))

    return frame.with_columns(date_col.cast(pl.Datetime, strict=False))
|
|
|
|
def _normalize_ohlcv_frame(frame: pl.DataFrame) -> pl.DataFrame:
    """Return a canonical OHLCV view of *frame*.

    * Column names are stripped, lower-cased and have spaces replaced by ``_``.
    * The timestamp column is normalized via ``_ensure_datetime_column`` and
      rows whose ``date`` is null are dropped.
    * Any of ``open``/``high``/``low``/``close``/``volume`` that are present are
      cast (non-strictly) to ``Float64``.
    * Rows are sorted by ``date``.

    A frame without any recognizable date column gets only the renaming and
    numeric casts; it is returned unsorted instead of failing on the null
    filter.
    """
    rename_map = {
        column: str(column).strip().lower().replace(" ", "_")
        for column in frame.columns
    }
    if rename_map:
        frame = frame.rename(rename_map)

    frame = _ensure_datetime_column(frame)
    # The trailing sort was already guarded on "date"; guard the null filter too,
    # since drop_nulls(subset=["date"]) raises when the column is missing.
    has_date = "date" in frame.columns
    if has_date:
        frame = frame.drop_nulls(subset=["date"])

    numeric_columns = [
        column for column in ("open", "high", "low", "close", "volume") if column in frame.columns
    ]
    if numeric_columns:
        frame = frame.with_columns(
            [pl.col(column).cast(pl.Float64, strict=False) for column in numeric_columns]
        )
    return frame.sort("date") if has_date else frame
|
|
|
|
def _ohlcv_agg_exprs() -> list[pl.Expr]:
    """Return the open/high/low/close/volume aggregations shared by the polars resamplers."""
    return [
        pl.col("open").first().alias("open"),
        pl.col("high").max().alias("high"),
        pl.col("low").min().alias("low"),
        pl.col("close").last().alias("close"),
        pl.col("volume").sum().alias("volume"),
    ]


def _resample_intraday(frame: pl.DataFrame, every: str) -> pl.DataFrame:
    """Bucket a normalized minute frame into left-closed, left-labelled windows of *every*."""
    buckets = frame.group_by_dynamic("date", every=every, closed="left", label="left")
    return (
        buckets.agg(_ohlcv_agg_exprs())
        .drop_nulls(subset=["open", "high", "low", "close"])
        .sort("date")
    )


def _resample_calendar(frame: pl.DataFrame, rule: str) -> pl.DataFrame:
    """Resample a normalized daily frame with the pandas offset alias *rule*.

    Missing volume counts as 0 before summing, and periods without price data
    are dropped.
    """
    ohlcv_columns = ["date", "open", "high", "low", "close", "volume"]
    pdf = pd.DataFrame(frame.select(ohlcv_columns).to_dict(as_series=False))
    pdf["volume"] = pdf["volume"].fillna(0)
    resampled = pdf.set_index("date").resample(rule).agg(
        {
            "open": "first",
            "high": "max",
            "low": "min",
            "close": "last",
            "volume": "sum",
        }
    )
    resampled = resampled.dropna(subset=["open", "high", "low", "close"]).reset_index()
    return _normalize_ohlcv_frame(pl.DataFrame(resampled.to_dict("list")))


def _resample_to_timeframe(frame: pl.DataFrame, timeframe: str) -> pl.DataFrame:
    """Aggregate raw OHLCV rows in *frame* to *timeframe*.

    * Empty input is returned unchanged.
    * ``minute`` / ``1_day``: the normalized frame as-is.
    * ``MAX``: a single row spanning the whole history.
    * ``INTRADAY_RESAMPLE_RULES`` keys: polars ``group_by_dynamic`` buckets.
    * ``DAILY_RESAMPLE_RULES`` keys: pandas ``resample`` with the mapped alias.

    Raises:
        ValueError: for a timeframe not covered above.
    """
    if frame.is_empty():
        return frame

    normalized = _normalize_ohlcv_frame(frame)
    if timeframe in ("minute", "1_day"):
        return normalized

    if timeframe == "MAX":
        return normalized.select(pl.col("date").min().alias("date"), *_ohlcv_agg_exprs())

    if timeframe in INTRADAY_RESAMPLE_RULES:
        return _resample_intraday(normalized, INTRADAY_RESAMPLE_RULES[timeframe])

    calendar_rule = DAILY_RESAMPLE_RULES.get(timeframe)
    if calendar_rule is None:
        raise ValueError(f"Unknown timeframe: {timeframe}")
    return _resample_calendar(normalized, calendar_rule)
|
|
|
|
def _ensure_timeframe_ready(index_name: str, timeframe: str) -> None:
    """Make sure a source file for *index_name*/*timeframe* is available.

    If a file already exists, a background refresh is started and the call
    returns at once. Otherwise a synchronous, request-driven refresh runs. If
    that refresh fails, the failure is only logged when a file exists
    afterwards; when there is still no file, the ``FileNotFoundError`` from
    ``_resolve_data_file`` propagates (implicitly chained to the refresh error).

    Raises:
        ValueError: for an unknown timeframe.
        FileNotFoundError: when the refresh failed and no file exists.
    """
    try:
        already_present = _resolve_data_file(index_name, timeframe).exists()
    except FileNotFoundError:
        already_present = False

    if already_present:
        _kick_async_refresh(index_name, timeframe)
        return

    try:
        refresh_index_if_due(index_name, timeframe=timeframe, request_driven=True)
    except Exception as exc:
        # Raises FileNotFoundError when there is nothing on disk to fall back on.
        _resolve_data_file(index_name, timeframe)
        LOGGER.warning("Serving cached file for %s/%s after refresh failure: %s", index_name, timeframe, exc)
|
|
|
|
def _ensure_minute_ready(index_name: str) -> Path:
    """Return the minute-level source file for *index_name*, fetching it if absent.

    The file is located through ``_resolve_data_file``, so a legacy CSV minute
    file is honoured exactly as ``DatasetStore`` honours it. When a file already
    exists it is returned immediately and a background refresh is kicked off.
    Otherwise a synchronous, request-driven refresh is attempted first.

    Raises:
        Exception: whatever the refresh raised, when it failed and no file exists.
        FileNotFoundError: when the refresh did not fail but still produced no file.
    """

    def _existing_minute_file() -> Path | None:
        # Parquet first, then legacy CSV; None when neither is on disk.
        try:
            return _resolve_data_file(index_name, "minute")
        except FileNotFoundError:
            return None

    minute_path = _existing_minute_file()
    if minute_path is not None:
        _kick_async_refresh(index_name, "minute")
        return minute_path

    try:
        refresh_index_if_due(index_name, timeframe="minute", request_driven=True)
    except Exception as exc:
        fallback = _existing_minute_file()
        if fallback is None:
            raise
        LOGGER.warning("Serving cached minute file for %s after refresh failure: %s", index_name, exc)

    minute_path = _existing_minute_file()
    if minute_path is None:
        expected = INTRADAY_DIR / index_name / "minute" / f"{index_name}_minute{DATA_FILE_EXTENSION}"
        raise FileNotFoundError(f"Minute data file not found: {expected}")
    return minute_path
|
|
|
|
def _kick_async_refresh(index_name: str, timeframe: str) -> None:
    """Start a background ``refresh_index_if_due`` for *index_name*/*timeframe*.

    At most one refresh per ``(index_name, timeframe)`` is in flight at a time.
    The key is registered in ``_ASYNC_REFRESH_IN_FLIGHT`` under
    ``_ASYNC_REFRESH_LOCK`` and removed when the worker finishes. Errors raised
    by the refresh are logged and swallowed inside the daemon worker thread.

    If the worker thread cannot be started, the key is released again before
    the error is re-raised, so later calls are not blocked forever.
    """
    key = (index_name, timeframe)
    with _ASYNC_REFRESH_LOCK:
        if key in _ASYNC_REFRESH_IN_FLIGHT:
            return
        _ASYNC_REFRESH_IN_FLIGHT.add(key)

    def _release() -> None:
        with _ASYNC_REFRESH_LOCK:
            _ASYNC_REFRESH_IN_FLIGHT.discard(key)

    def _run() -> None:
        try:
            refresh_index_if_due(index_name, timeframe=timeframe, request_driven=True)
        except Exception as exc:
            LOGGER.warning("Async refresh failed for %s/%s: %s", index_name, timeframe, exc)
        finally:
            _release()

    worker = Thread(target=_run, name=f"async-refresh-{index_name}-{timeframe}", daemon=True)
    try:
        worker.start()
    except Exception:
        # e.g. RuntimeError("can't start new thread"): the key would otherwise stay stuck.
        _release()
        raise
|
|
|
|
class DatasetStore:
    """LRU cache of prepared datasets keyed by ``(index_name, timeframe)``.

    An entry is reused only while the backing file's mtime is unchanged. Once
    more than ``max_entries`` entries are held, the least recently used one is
    evicted. ``_cache`` is only touched while holding ``_lock``; file I/O
    happens outside the lock.
    """

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._cache: OrderedDict[tuple[str, str], CachedDataset] = OrderedDict()
        self._lock = RLock()

    def _build_chart_points(self, frame: pl.DataFrame) -> dict:
        """Return per-column lists for the chart endpoint.

        ``times`` holds epoch seconds derived from ``date``. ``prices`` and
        ``closes`` both mirror ``close``. A frame missing any of
        date/open/high/low/close yields empty lists.
        """
        required = {"date", "open", "high", "low", "close"}
        if not required.issubset(set(frame.columns)):
            return {"times": [], "prices": [], "opens": [], "highs": [], "lows": [], "closes": []}

        frame = _ensure_datetime_column(frame)
        times = frame["date"].dt.timestamp("ms") // 1000
        return {
            "times": times.cast(pl.Int64).to_list(),
            "prices": frame["close"].to_list(),
            "opens": frame["open"].to_list(),
            "highs": frame["high"].to_list(),
            "lows": frame["low"].to_list(),
            "closes": frame["close"].to_list(),
        }

    def _load_dataset(self, file_path: Path, timeframe: str, file_mtime_ns: int | None = None) -> CachedDataset:
        """Read *file_path*, resample it to *timeframe* and build its chart points.

        *file_mtime_ns* should be the mtime observed *before* reading. If the
        file is replaced while it is being read, the recorded mtime is then
        older than the file's and the next ``get`` reloads it. When omitted,
        the file is stat'ed before it is read.
        """
        if file_mtime_ns is None:
            file_mtime_ns = _stat_mtime_ns(file_path)
        frame = _ensure_datetime_column(_read_frame(file_path))
        frame = _resample_to_timeframe(frame, timeframe)
        return CachedDataset(
            frame=frame,
            chart_points=self._build_chart_points(frame),
            file_mtime_ns=file_mtime_ns,
        )

    def get(self, index_name: str, timeframe: str) -> CachedDataset:
        """Return the (possibly cached) dataset for *index_name*/*timeframe*.

        Ensures a source file exists first (which may trigger a refresh). The
        data is reloaded if the file's mtime differs from the cached entry.
        """
        _ensure_timeframe_ready(index_name, timeframe)

        key = (index_name, timeframe)
        file_path = _resolve_data_file(index_name, timeframe)
        file_mtime_ns = _stat_mtime_ns(file_path)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None and cached.file_mtime_ns == file_mtime_ns:
                self._cache.move_to_end(key)
                return cached

        # Pass the pre-read mtime so a concurrent rewrite cannot be cached as fresh.
        loaded = self._load_dataset(file_path, timeframe, file_mtime_ns)

        with self._lock:
            self._cache[key] = loaded
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_entries:
                self._cache.popitem(last=False)
        return loaded

    def warm(self, pairs: tuple[tuple[str, str], ...]) -> None:
        """Best-effort preload of each ``(index_name, timeframe)`` in *pairs*.

        Failures are logged and skipped so one bad pair does not stop the rest.
        """
        for index_name, timeframe in pairs:
            try:
                self.get(index_name, timeframe)
            except Exception as exc:
                LOGGER.warning("Failed to warm dataset %s/%s: %s", index_name, timeframe, exc)
|
|
|
|
class CustomIntervalStore:
    """LRU cache of minute data resampled to caller-supplied polars intervals.

    Entries are keyed by ``(index_name, interval)``. An entry is reused only
    while the minute source file's mtime is unchanged, and the least recently
    used entry is evicted beyond ``max_entries``. ``_cache`` is only touched
    while holding ``_lock``.
    """

    def __init__(self, max_entries: int):
        self.max_entries = max_entries
        self._cache: OrderedDict[tuple[str, str], CachedCustomInterval] = OrderedDict()
        self._lock = RLock()

    def _load(self, file_path: Path, interval: str, source_mtime_ns: int | None = None) -> CachedCustomInterval:
        """Aggregate the minute file at *file_path* into *interval* buckets.

        The resulting ``date`` column is rendered as ``"%Y-%m-%d %H:%M:%S"``
        strings. *source_mtime_ns* should be the mtime observed before reading;
        when omitted, the file is stat'ed before it is read.
        """
        if source_mtime_ns is None:
            source_mtime_ns = _stat_mtime_ns(file_path)
        # Normalize first, as DatasetStore does: group_by_dynamic needs a sorted,
        # null-free datetime index, and column names must be lower-case.
        frame = _normalize_ohlcv_frame(_read_frame(file_path))

        resampled = (
            frame.group_by_dynamic("date", every=interval)
            .agg(
                [
                    pl.col("open").first().alias("open"),
                    pl.col("high").max().alias("high"),
                    pl.col("low").min().alias("low"),
                    pl.col("close").last().alias("close"),
                    pl.col("volume").sum().alias("volume"),
                ]
            )
            .drop_nulls(subset=["open", "high", "low", "close"])
            .with_columns(pl.col("date").dt.to_string("%Y-%m-%d %H:%M:%S"))
        )
        return CachedCustomInterval(frame=resampled, source_mtime_ns=source_mtime_ns)

    def get(self, index_name: str, interval: str) -> pl.DataFrame:
        """Return minute data for *index_name* aggregated to *interval*, using the cache when fresh."""
        file_path = _ensure_minute_ready(index_name)
        file_mtime_ns = _stat_mtime_ns(file_path)
        key = (index_name, interval)
        with self._lock:
            cached = self._cache.get(key)
            if cached is not None and cached.source_mtime_ns == file_mtime_ns:
                self._cache.move_to_end(key)
                return cached.frame

        # Pass the pre-read mtime so a concurrent rewrite cannot be cached as fresh.
        loaded = self._load(file_path, interval, file_mtime_ns)

        with self._lock:
            self._cache[key] = loaded
            self._cache.move_to_end(key)
            while len(self._cache) > self.max_entries:
                self._cache.popitem(last=False)
        return loaded.frame
|
|
|
|
# Process-wide cache singletons used by the public accessors below.
DATASET_STORE = DatasetStore(max_entries=MAX_DATASET_CACHE_ENTRIES)
CUSTOM_INTERVAL_STORE = CustomIntervalStore(max_entries=MAX_CUSTOM_CACHE_ENTRIES)
|
|
|
|
def get_available_tickers() -> list[str]:
    """Return the sorted, de-duplicated ticker names found on disk.

    A ticker is any sub-directory of ``INTRADAY_DIR`` or ``DAILY_DIR``; a root
    that does not exist is skipped.
    """
    found = {
        entry.name
        for root in (INTRADAY_DIR, DAILY_DIR)
        if root.exists()
        for entry in root.iterdir()
        if entry.is_dir()
    }
    return sorted(found)
|
|
|
|
def get_available_granularities(index_name: str | None = None) -> list[str]:
    """Return every supported timeframe, ordered by ``TIMEFRAME_ORDER``.

    *index_name* is accepted for API compatibility but does not affect the
    result: all intraday and daily timeframes are listed for every ticker.
    """
    return _sorted_timeframes({*INTRADAY_RULES, *DAILY_RULES})
|
|
|
|
def get_data_catalog() -> dict:
    """Describe the available tickers, granularities and export options.

    The result contains:

    * ``tickers``.
    * ``granularities``: the ordered union of per-ticker granularities, or the
      global list when there are no tickers.
    * ``granularities_by_ticker``.
    * ``historical_export`` settings.
    * ``defaults``: ``"NIFTY 50"`` when present, else the first ticker, else
      ``""``; granularity ``"1_hour"``.
    """
    tickers = get_available_tickers()
    by_ticker = {name: get_available_granularities(name) for name in tickers}

    union: set[str] = set()
    for values in by_ticker.values():
        union.update(values)
    granularities = _sorted_timeframes(union) or get_available_granularities()

    if "NIFTY 50" in tickers:
        default_ticker = "NIFTY 50"
    elif tickers:
        default_ticker = tickers[0]
    else:
        default_ticker = ""

    export_options = {
        "formats": list(HISTORICAL_EXPORT_FORMATS),
        "templates": list(HISTORICAL_EXPORT_TEMPLATES),
        "default_limit": HISTORICAL_EXPORT_DEFAULT_LIMIT,
        "intraday_cutoff": HISTORICAL_INTRADAY_CUTOFF.strftime(EXPORT_DATE_FORMAT),
    }
    return {
        "tickers": tickers,
        "granularities": granularities,
        "granularities_by_ticker": by_ticker,
        "historical_export": export_options,
        "defaults": {"ticker": default_ticker, "granularity": "1_hour"},
    }
|
|
|
|
def _safe_filename_part(value: str) -> str:
    """Collapse *value* into a filesystem-safe token.

    Each run of characters outside ``[A-Za-z0-9._-]`` becomes one underscore.
    Leading and trailing dots, underscores and hyphens are then trimmed, so the
    token can never be a relative path component such as ``..``. Returns
    ``"export"`` when nothing usable remains.
    """
    substituted = re.sub(r"[^A-Za-z0-9._-]+", "_", value.strip())
    trimmed = substituted.strip("._-")
    return trimmed if trimmed else "export"
|
|
|
|
def _ensure_export_columns(frame: pl.DataFrame, *, blank_volume: bool = False, source_label: str | None = None) -> pl.DataFrame:
    """Normalize *frame* and project it onto ``EXPORT_COLUMNS`` (in that order).

    Args:
        frame: Any OHLCV frame; it is passed through ``_normalize_ohlcv_frame``.
        blank_volume: Replace ``volume`` with all-null Float64 values. A null
            volume column is also added when ``volume`` is missing.
        source_label: Value written to ``source_timeframe``. When ``None``, an
            existing ``source_timeframe`` column is kept; otherwise the column
            is filled with ``"unknown"``.

    Returns:
        The frame restricted to whichever export columns it has.
    """
    normalized = _normalize_ohlcv_frame(frame)

    needs_null_volume = blank_volume or "volume" not in normalized.columns
    if needs_null_volume:
        normalized = normalized.with_columns(pl.lit(None, dtype=pl.Float64).alias("volume"))

    needs_label = source_label is not None or "source_timeframe" not in normalized.columns
    if needs_label:
        label = source_label if source_label else "unknown"
        normalized = normalized.with_columns(pl.lit(label).alias("source_timeframe"))

    return normalized.select([name for name in EXPORT_COLUMNS if name in normalized.columns])
|
|
|
|
def _blank_intraday_volume(frame: pl.DataFrame) -> pl.DataFrame:
    """Return *frame* with ``volume`` set (or added) as an all-null Float64 column.

    The original had two identical branches keyed on whether ``volume``
    existed. ``with_columns`` both adds and replaces columns, so one
    expression covers both cases.
    """
    return frame.with_columns(pl.lit(None, dtype=pl.Float64).alias("volume"))
|
|
|
|
def _apply_export_template(
    frame: pl.DataFrame,
    *,
    template: str,
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
) -> pl.DataFrame:
    """Filter *frame* according to an export template.

    The frame is first projected with ``_ensure_export_columns``. Templates:

    * ``full_history``: every row (also used when *template* is falsy).
    * ``last_n_rows``: the final ``row_limit`` rows (default
      ``HISTORICAL_EXPORT_DEFAULT_LIMIT``, minimum 1).
    * ``custom_range``: rows with ``start_date <= date < end_date + 1 day``.
      Either bound may be omitted, and *end_date* is treated as an inclusive
      calendar day.

    Raises:
        ValueError: if *template* (after trimming/lower-casing) is not one of
            ``HISTORICAL_EXPORT_TEMPLATES``. This is checked before any data
            handling, so it is reported even when *frame* is empty.
    """
    template = (template or "full_history").strip().lower()
    if template not in HISTORICAL_EXPORT_TEMPLATES:
        raise ValueError(f"Unknown export template: {template}")

    frame = _ensure_export_columns(frame)
    if frame.is_empty() or template == "full_history":
        return frame

    if template == "last_n_rows":
        limit = max(int(row_limit or HISTORICAL_EXPORT_DEFAULT_LIMIT), 1)
        return frame.tail(limit)

    if template == "custom_range":
        if start_date:
            start_dt = pd.Timestamp(start_date)
            frame = frame.filter(pl.col("date") >= start_dt.to_pydatetime())
        if end_date:
            # Exclusive upper bound one day later keeps the whole end day.
            end_dt = pd.Timestamp(end_date) + pd.Timedelta(days=1)
            frame = frame.filter(pl.col("date") < end_dt.to_pydatetime())
        return frame

    # Reached only if HISTORICAL_EXPORT_TEMPLATES lists a template with no handler here.
    raise ValueError(f"Unknown export template: {template}")
|
|
|
|
def _load_historical_source(index_name: str, timeframe: str) -> pl.DataFrame:
    """Return the resampled frame for *index_name*/*timeframe* from ``DATASET_STORE``."""
    dataset = DATASET_STORE.get(index_name, timeframe)
    return dataset.frame
|
|
|
|
def _tag_export_source(frame: pl.DataFrame, source_label: str) -> pl.DataFrame:
    """Return *frame* with a constant ``source_timeframe`` column set to *source_label*."""
    label_column = pl.lit(source_label).alias("source_timeframe")
    return frame.with_columns(label_column)
|
|
|
|
def _prepare_historical_export_frame(
    index_name: str,
    timeframe: str,
    *,
    template: str = "full_history",
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
) -> pl.DataFrame:
    """Build the frame to export for *index_name*/*timeframe*.

    For timeframes in ``INTRADAY_RULES``, the history is stitched together
    from two sources:

    * daily bars dated before ``HISTORICAL_INTRADAY_CUTOFF``, labelled
      ``source_timeframe="daily"``;
    * intraday bars from the cutoff onwards, with volume blanked, labelled
      ``"intraday"``.

    Any other timeframe is taken directly from ``DATASET_STORE`` and labelled
    ``"daily"``. The export template is then applied and the result sorted by
    ``date``.

    Raises:
        ValueError: when no rows survive the template filters (or the template
            helper rejects its arguments).
    """
    if timeframe not in INTRADAY_RULES:
        combined = _ensure_export_columns(
            _tag_export_source(_load_historical_source(index_name, timeframe), "daily"),
            source_label="daily",
        )
    else:
        cutoff = HISTORICAL_INTRADAY_CUTOFF.to_pydatetime()
        daily_part = _ensure_export_columns(
            _tag_export_source(DATASET_STORE.get(index_name, "1_day").frame, "daily"),
            source_label="daily",
        ).filter(pl.col("date") < cutoff)
        intraday_part = _ensure_export_columns(
            _tag_export_source(DATASET_STORE.get(index_name, timeframe).frame, "intraday"),
            blank_volume=True,
            source_label="intraday",
        ).filter(pl.col("date") >= cutoff)
        combined = pl.concat([daily_part, intraday_part], how="diagonal_relaxed").sort("date")

    filtered = _apply_export_template(
        combined,
        template=template,
        row_limit=row_limit,
        start_date=start_date,
        end_date=end_date,
    )
    if filtered.is_empty():
        raise ValueError("No rows available for the selected export filters")
    return _ensure_export_columns(filtered, blank_volume=False).sort("date")
|
|
|
|
def _write_historical_export_file(frame: pl.DataFrame, *, file_format: str, filename_stem: str) -> Path:
    """Write *frame* into a fresh temporary directory and return the file path.

    The file is named ``{filename_stem}.{file_format}`` inside a directory
    created by ``tempfile.mkdtemp(prefix="historical_export_")``. Nothing here
    deletes that directory after a successful write.

    JSON is written row-oriented via polars. If polars rejects the
    ``row_oriented`` keyword, the rows are dumped with :mod:`json` instead,
    with non-JSON values such as datetimes stringified.

    Raises:
        ValueError: for an unsupported *file_format*. The format is validated
            before any directory is created, and the directory is removed again
            if writing fails, so errors do not leak temp directories.
    """
    import shutil

    file_format = file_format.lower().strip()
    if file_format not in HISTORICAL_EXPORT_FORMATS:
        raise ValueError(f"Unsupported export format: {file_format}")

    export_dir = Path(tempfile.mkdtemp(prefix="historical_export_"))
    file_path = export_dir / f"{filename_stem}.{file_format}"
    try:
        if file_format == "csv":
            frame.write_csv(file_path)
        elif file_format == "parquet":
            frame.write_parquet(file_path)
        elif file_format == "json":
            try:
                frame.write_json(file_path, row_oriented=True)
            except TypeError:
                # This polars version does not accept ``row_oriented``.
                with file_path.open("w", encoding="utf-8") as handle:
                    json.dump(frame.to_dicts(), handle, ensure_ascii=False, default=str, indent=2)
        else:
            raise ValueError(f"Unsupported export format: {file_format}")
    except Exception:
        shutil.rmtree(export_dir, ignore_errors=True)
        raise
    return file_path
|
|
|
|
def build_historical_export(
    index_name: str,
    timeframe: str,
    *,
    template: str = "full_history",
    row_limit: int | None = None,
    start_date: str | None = None,
    end_date: str | None = None,
    file_format: str = "csv",
) -> tuple[str, dict]:
    """Prepare, write and describe a historical OHLCV export.

    The file name combines sanitized ticker, timeframe and template, the word
    ``historical`` and a UTC timestamp (``%Y%m%d_%H%M%S``).

    Returns:
        ``(file_path, metadata)``. The metadata records the request parameters,
        row count, column names, output filename/path and the first/last
        ``date`` formatted with ``EXPORT_DATETIME_FORMAT``.

    Raises:
        ValueError: propagated from frame preparation (e.g. no rows match the
            filters) or from writing (unsupported format).
    """
    from datetime import timezone

    frame = _prepare_historical_export_frame(
        index_name,
        timeframe,
        template=template,
        row_limit=row_limit,
        start_date=start_date,
        end_date=end_date,
    )

    # Aware "now" replaces the deprecated datetime.utcnow(); the rendered
    # UTC timestamp is unchanged.
    generated_at = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
    filename_stem = "_".join(
        (
            _safe_filename_part(index_name),
            _safe_filename_part(timeframe),
            # template may be None; the preparation step treats that as full_history.
            _safe_filename_part(template or "full_history"),
            "historical",
            generated_at,
        )
    )
    file_path = _write_historical_export_file(frame, file_format=file_format, filename_stem=filename_stem)

    has_rows = frame.height > 0
    metadata = {
        "ticker": index_name,
        "timeframe": timeframe,
        "template": template,
        "format": file_format.lower().strip(),
        "rows": frame.height,
        "columns": frame.columns,
        "filename": file_path.name,
        "file_path": str(file_path),
        "date_min": frame["date"].min().strftime(EXPORT_DATETIME_FORMAT) if has_rows else None,
        "date_max": frame["date"].max().strftime(EXPORT_DATETIME_FORMAT) if has_rows else None,
    }
    return str(file_path), metadata
|
|
|
|
| def _slice_bounds(total_rows: int, limit: int | None = None, offset: int = 0) -> tuple[int, int]: |
| if limit is None: |
| return 0, total_rows |
|
|
| offset = max(int(offset or 0), 0) |
| limit = max(int(limit), 1) |
| end = max(total_rows - offset, 0) |
| start = max(end - limit, 0) |
| return start, max(end - start, 0) |
|
|
|
|
def get_precalculated_data(index_name: str, timeframe: str, limit: int | None = None, offset: int = 0) -> list[dict]:
    """Return a window of *index_name*/*timeframe* rows as dicts.

    The window is computed by ``_slice_bounds``, paging backwards from the
    newest row; an empty window yields ``[]``.
    """
    frame = DATASET_STORE.get(index_name, timeframe).frame
    start, length = _slice_bounds(frame.height, limit=limit, offset=offset)
    return frame.slice(start, length).to_dicts() if length > 0 else []
|
|
|
|
def get_precalculated_chart_slice(index_name: str, timeframe: str, limit: int = 500, offset: int = 0) -> dict:
    """Return one page of chart series for *index_name*/*timeframe*.

    Pages run backwards from the last point (see ``_slice_bounds``): *offset*
    counts points already delivered from the end, and *limit* is the page size.
    The response contains the sliced series under ``points``, plus
    ``next_offset`` (the offset for the following page), ``has_more``,
    ``total_rows``, ``granularity`` and ``ticker``.

    *offset* is normalized (``None``/negative becomes 0) before use, so
    ``next_offset`` always agrees with the window that was actually returned.
    """
    chart_points = DATASET_STORE.get(index_name, timeframe).chart_points
    total_rows = len(chart_points["times"])
    offset = max(int(offset or 0), 0)
    start, length = _slice_bounds(total_rows, limit=limit, offset=offset)

    series_names = ("times", "prices", "opens", "highs", "lows", "closes")
    if length > 0:
        window = slice(start, start + length)
        points = {name: chart_points[name][window] for name in series_names}
    else:
        points = {name: [] for name in series_names}

    next_offset = offset + length
    return {
        "points": points,
        "next_offset": next_offset,
        "has_more": next_offset < total_rows,
        "total_rows": total_rows,
        "granularity": timeframe,
        "ticker": index_name,
    }
|
|
|
|
def get_custom_intraday_data(index_name: str, interval: str, limit: int | None = None) -> list[dict]:
    """Return minute data for *index_name* aggregated to *interval* as row dicts.

    When *limit* is given, only the last *limit* rows are returned. The limit
    is converted to ``int`` once (so numeric strings work in the comparison
    too) and clamped at 0, so a negative value yields no rows rather than
    being handed to ``DataFrame.tail``.
    """
    frame = CUSTOM_INTERVAL_STORE.get(index_name, interval)
    if limit is None:
        return frame.to_dicts()

    row_cap = max(int(limit), 0)
    if frame.height > row_cap:
        frame = frame.tail(row_cap)
    return frame.to_dicts()
|
|
|
|
def warm_common_datasets() -> None:
    """Preload the ``WARM_DATASETS`` pairs into ``DATASET_STORE``.

    Per-pair failures do not propagate; ``DatasetStore.warm`` catches them.
    """
    store = DATASET_STORE
    store.warm(WARM_DATASETS)
|
|
|
|
def start_runtime_refresh() -> None:
    """Start the market-data and forecaster background refresh schedulers, in that order."""
    schedulers = (start_background_refresh_scheduler, start_forecaster_refresh_scheduler)
    for start_scheduler in schedulers:
        start_scheduler()
|
|