Spaces:
No application file
No application file
| from datetime import datetime, timedelta | |
| from typing import Dict, List, Optional, Tuple, Union | |
| import pandas as pd | |
| from loguru import logger | |
| from pathlib import Path | |
| from ..util.misc import date_conversion, value_counts_data | |
class TickDataLoader:
    """
    Loader for tick-level bid/ask data with an in-memory cache.

    Features:
    1. Checks whether a requested date range lies within the cached range
    2. Handles partial overlaps by loading only the missing days before/after
       the cached range and merging them with the cached data
    3. Bounds memory with an entry-count limit (least-recently-accessed
       eviction) and a total-size limit (largest-first eviction)
    4. Tracks cache statistics (hits, misses, partial hits, loads)

    Notes
    -----
    - The cache is keyed by ``(symbol, account_name)`` and holds one contiguous
      date range per key; a request that does not overlap the cached range
      replaces it.
    - Typical performance: ~0.5s for cached retrieval (author's estimate)
    - Memory usage: ~100MB per 1M ticks (author's estimate)
    """

    def __init__(
        self,
        max_cache_size_mb: int = 5000,
        max_cached_symbols: int = 20,
        path: Optional[Union[str, Path]] = None,
    ):
        """
        Initialize the tick data loader.

        Parameters
        ----------
        max_cache_size_mb : int, optional
            Maximum total cache size in MB (default: 5000MB)
        max_cached_symbols : int, optional
            Maximum number of (symbol, account) entries to keep in cache (default: 20)
        path : Union[str, Path], optional
            Path to folder that contains tick data; forwarded to ``load_tick_data``
        """
        # (symbol, account_name) -> cached DataFrame
        self._cache: Dict[Tuple[str, str], pd.DataFrame] = {}
        # (symbol, account_name) -> {"start_date", "end_date", "last_accessed", "size_mb"}
        self._cache_metadata: Dict[Tuple[str, str], Dict] = {}
        self.max_cache_size_mb = max_cache_size_mb
        self.max_cached_symbols = max_cached_symbols
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "partial_hits": 0,
            "total_loaded": 0,
        }
        self.path = path

    def get_tick_data(
        self, symbol: str, start_date: str, end_date: str, account_name: str
    ) -> pd.DataFrame:
        """
        Retrieve tick-level bid/ask data, serving from the cache when possible.

        Parameters
        ----------
        symbol : str
            Trading instrument symbol (e.g., 'EURUSD')
        start_date : str
            Start date in 'YYYY-MM-DD' format
        end_date : str
            End date in 'YYYY-MM-DD' format
        account_name : str
            MT5 account identifier for data retrieval

        Returns
        -------
        pd.DataFrame
            Tick data with columns ['bid', 'ask'] indexed by timestamp. The
            result is a copy, so mutating it never alters the cache.

        Notes
        -----
        - Full coverage: returns the requested slice of the cached data.
        - Partial overlap: loads only the missing days and merges them in.
        - No overlap / no entry: loads the full range and replaces the entry.
        """
        cache_key = (symbol, account_name)
        start_dt, end_dt = date_conversion(start_date, end_date)

        if cache_key in self._cache:
            metadata = self._cache_metadata[cache_key]
            # Stored bounds are already date_conversion output (see _store), so
            # they are compared directly, exactly as _load_with_partial_cache does.
            cached_start = metadata["start_date"]
            cached_end = metadata["end_date"]

            if cached_start <= start_dt and cached_end >= end_dt:
                self.cache_stats["hits"] += 1
                logger.debug(f"Cache hit for {symbol} {start_date} to {end_date}")
                # Refresh the access time so LRU eviction keeps entries in use.
                self._touch(cache_key)
                return self._subset(self._cache[cache_key], start_dt, end_dt)

            if cached_end >= start_dt and cached_start <= end_dt:
                self.cache_stats["partial_hits"] += 1
                logger.debug(f"Partial cache hit for {symbol}")
                return self._load_with_partial_cache(
                    symbol, start_date, end_date, account_name, cache_key
                )

        self.cache_stats["misses"] += 1
        logger.debug(f"Cache miss for {symbol} {start_date} to {end_date}")
        return self._load_and_cache_data(symbol, start_date, end_date, account_name, cache_key)

    def _touch(self, cache_key: Tuple[str, str]) -> None:
        """Mark a cache entry as just used so `_clean_cache`'s LRU pass keeps it."""
        self._cache_metadata[cache_key]["last_accessed"] = datetime.now()

    @staticmethod
    def _subset(df: pd.DataFrame, start_dt, end_dt) -> pd.DataFrame:
        """Return a copy of the rows of ``df`` whose index lies in [start_dt, end_dt]."""
        mask = (df.index >= start_dt) & (df.index <= end_dt)
        return df[mask].copy()

    def _store(self, cache_key: Tuple[str, str], df: pd.DataFrame, start_dt, end_dt) -> None:
        """
        Put ``df`` into the cache together with its covered range.

        ``start_dt``/``end_dt`` are the converted bounds (as returned by
        ``date_conversion``) that the entry is considered to cover.
        """
        self._cache[cache_key] = df
        self._cache_metadata[cache_key] = {
            "start_date": start_dt,
            "end_date": end_dt,
            "last_accessed": datetime.now(),
            "size_mb": df.memory_usage(deep=True).sum() / (1024**2),
        }

    def _load_with_partial_cache(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        account_name: str,
        cache_key: Tuple[str, str],
    ) -> pd.DataFrame:
        """
        Load data when the cached range partially overlaps the request.

        Strategy:
        1. Compute the day ranges before and/or after the cached range
        2. Load only those missing ranges
        3. Merge cached and new data in timestamp order
        4. Update the cache entry with the extended range
        """
        cached_df = self._cache[cache_key]
        metadata = self._cache_metadata[cache_key]
        cached_start = metadata["start_date"]
        cached_end = metadata["end_date"]
        start_dt, end_dt = date_conversion(start_date, end_date)

        # Missing ranges are expressed as 'YYYY-MM-DD' strings for _load_data.
        load_ranges: List[Tuple[str, str]] = []
        if start_dt < cached_start:
            load_ranges.append(
                (start_date, (cached_start - timedelta(days=1)).strftime("%Y-%m-%d"))
            )
        if end_dt > cached_end:
            load_ranges.append(((cached_end + timedelta(days=1)).strftime("%Y-%m-%d"), end_date))

        new_data = []
        for load_start, load_end in load_ranges:
            logger.info(f"Loading additional data for {symbol}: {load_start} to {load_end}")
            df_part = self._load_data(symbol, load_start, load_end, account_name)
            if not df_part.empty:
                new_data.append(df_part)

        if not new_data:
            # Reached when every missing range loaded empty. The stored coverage
            # is left unchanged, so a later request will try those days again.
            self._touch(cache_key)
            return self._subset(cached_df, start_dt, end_dt)

        # Stable sort keeps ticks sharing a timestamp in their loaded order.
        combined_data = pd.concat([cached_df, *new_data]).sort_index(kind="mergesort")
        self._store(
            cache_key,
            combined_data,
            min(start_dt, cached_start),
            max(end_dt, cached_end),
        )
        self._clean_cache()
        return self._subset(combined_data, start_dt, end_dt)

    def _load_and_cache_data(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        account_name: str,
        cache_key: Tuple[str, str],
    ) -> pd.DataFrame:
        """
        Load the full requested range from source and cache it if non-empty.

        Returns a copy of the loaded data so callers cannot mutate the cached frame.
        """
        logger.info(f"Loading data for {symbol} from {start_date} to {end_date}")
        df = self._load_data(symbol, start_date, end_date, account_name)
        self.cache_stats["total_loaded"] += 1
        if df.empty:
            return df

        start_dt, end_dt = date_conversion(start_date, end_date)
        self._store(cache_key, df, start_dt, end_dt)
        self._clean_cache()
        return df.copy()

    def _load_data(
        self, symbol: str, start_date: str, end_date: str, account_name: str
    ) -> pd.DataFrame:
        """
        Load bid/ask ticks from the parquet store, fetching from MT5 when missing.

        First calls ``load_tick_data``; if it returns an empty frame, calls
        ``save_data_to_parquet`` and then ``load_tick_data`` once more.
        """
        from .load_data import load_tick_data, save_data_to_parquet

        tick_params = dict(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            account_name=account_name,
            path=self.path,
            columns=["bid", "ask"],
            verbose=False,
        )
        df = load_tick_data(**tick_params)
        if df.empty:
            logger.info("Data not found on drive, fetching from MT5...")
            # NOTE(review): self.path is not passed here, while load_tick_data
            # receives it - confirm save_data_to_parquet writes where it is read.
            save_data_to_parquet(symbol, start_date, end_date, account_name)
            df = load_tick_data(**tick_params)
        return df

    def _clean_cache(self):
        """
        Evict entries until both cache limits are respected.

        First drops the least recently accessed entries while more than
        ``max_cached_symbols`` are cached, then drops the largest entries while
        the summed ``size_mb`` exceeds ``max_cache_size_mb``.
        """
        excess = len(self._cache) - self.max_cached_symbols
        if excess > 0:
            lru_items = sorted(self._cache_metadata.items(), key=lambda item: item[1]["last_accessed"])
            for key, _ in lru_items[:excess]:
                del self._cache[key]
                del self._cache_metadata[key]
                logger.debug(f"Removed {key} from cache (LRU policy)")

        total_size = sum(meta["size_mb"] for meta in self._cache_metadata.values())
        if total_size > self.max_cache_size_mb:
            items_by_size = sorted(
                self._cache_metadata.items(),
                key=lambda item: item[1]["size_mb"],
                reverse=True,
            )
            removed_size = 0
            for key, meta in items_by_size:
                if total_size - removed_size <= self.max_cache_size_mb:
                    break
                removed_size += meta["size_mb"]
                del self._cache[key]
                del self._cache_metadata[key]
                logger.debug(f"Removed {key} from cache (size: {meta['size_mb']:.2f}MB)")

    def clear_cache(self, symbol: Optional[str] = None, account_name: Optional[str] = None):
        """
        Clear cache for a specific symbol/account, or the whole cache.

        Parameters
        ----------
        symbol : str, optional
            Symbol to clear cache for (None matches any symbol)
        account_name : str, optional
            Account name to clear cache for (None matches any account)
        """
        if symbol is None and account_name is None:
            self._cache.clear()
            self._cache_metadata.clear()
            logger.info("Cleared all cache")
            return

        keys_to_remove = [
            (sym, acc)
            for sym, acc in self._cache
            if (symbol is None or sym == symbol) and (account_name is None or acc == account_name)
        ]
        for key in keys_to_remove:
            del self._cache[key]
            del self._cache_metadata[key]
        logger.info(f"Cleared cache for {len(keys_to_remove)} items")

    def get_cache_info(self) -> Dict:
        """
        Get cache statistics and information.

        Returns
        -------
        Dict
            Cache information including:
            - total_cached_symbols: Number of (symbol, account) entries in cache
            - total_cache_size_mb: Total cache size in MB
            - cache_hits: Number of full cache hits
            - cache_misses: Number of cache misses
            - partial_hits: Number of partial cache hits
            - hit_rate: hits / (hits + misses) as a percentage; partial hits
              are not included in either term
            - cached_symbols: List of cached entries with date ranges
        """
        total_size = sum(meta["size_mb"] for meta in self._cache_metadata.values())
        total_requests = self.cache_stats["hits"] + self.cache_stats["misses"]
        hit_rate = (self.cache_stats["hits"] / total_requests * 100) if total_requests > 0 else 0

        cached_symbols_info = [
            {
                "symbol": symbol,
                "account": account,
                "date_range": f"{meta['start_date'].date()} to {meta['end_date'].date()}",
                "size_mb": meta["size_mb"],
                "last_accessed": meta["last_accessed"],
            }
            for (symbol, account), meta in self._cache_metadata.items()
        ]

        return {
            "total_cached_symbols": len(self._cache),
            "total_cache_size_mb": total_size,
            "cache_hits": self.cache_stats["hits"],
            "cache_misses": self.cache_stats["misses"],
            "partial_hits": self.cache_stats["partial_hits"],
            "hit_rate": hit_rate,
            "cached_symbols": cached_symbols_info,
        }

    def preload_data(self, symbols: List[str], start_date: str, end_date: str, account_name: str):
        """
        Preload data for multiple symbols into cache (best effort).

        Failures for individual symbols are logged as warnings and skipped.

        Parameters
        ----------
        symbols : List[str]
            List of symbols to preload
        start_date : str
            Start date in 'YYYY-MM-DD' format
        end_date : str
            End date in 'YYYY-MM-DD' format
        account_name : str
            MT5 account identifier
        """
        logger.info(f"Preloading data for {len(symbols)} symbols")
        for symbol in symbols:
            try:
                self.get_tick_data(symbol, start_date, end_date, account_name)
                logger.debug(f"Preloaded {symbol}")
            except Exception as e:
                logger.warning(f"Failed to preload {symbol}: {e}")
# Module-level instance: every importer of this module shares this one loader
# (and therefore one in-memory cache), built with the default limits.
tick_data_loader: TickDataLoader = TickDataLoader()