# AFML/afml/mt5/tick_data_loader.py
# Uploaded by akshayboora ("Upload 940 files", commit 669d6a1, verified)
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple, Union
import pandas as pd
from loguru import logger
from pathlib import Path
from ..util.misc import date_conversion, value_counts_data
class TickDataLoader:
    """
    Loader for tick-level bid/ask data with an in-memory cache.

    One DataFrame is cached per ``(symbol, account_name)`` pair together with
    the date range it was loaded for. Requests are served as follows:

    1. Full hit    - the cached range covers the request; a copy of the
       matching rows is returned.
    2. Partial hit - the cached range overlaps the request; only the missing
       leading/trailing ranges are loaded and merged into the cache.
    3. Miss        - the whole range is loaded and replaces any cached entry
       for that key.

    The cache is bounded by a maximum number of entries (least recently used
    entries are evicted first) and by a maximum total size in MB (largest
    entries are evicted first).

    Notes
    -----
    - Typical performance: ~0.5s for cached retrieval
    - Memory usage: ~100MB per 1M ticks
    """

    def __init__(
        self,
        max_cache_size_mb: int = 5000,
        max_cached_symbols: int = 20,
        path: Optional[Union[str, Path]] = None,
    ):
        """
        Initialize the tick data loader.

        Parameters
        ----------
        max_cache_size_mb : int, optional
            Maximum cache size in MB (default: 5000MB)
        max_cached_symbols : int, optional
            Maximum number of symbols to keep in cache (default: 20)
        path : Union[str, Path], optional
            Path to folder that contains tick data. Passed through unchanged
            to ``load_tick_data``.
        """
        # (symbol, account_name) -> cached DataFrame
        self._cache: Dict[Tuple[str, str], pd.DataFrame] = {}
        # (symbol, account_name) -> {"start_date", "end_date", "last_accessed", "size_mb"}
        self._cache_metadata: Dict[Tuple[str, str], Dict] = {}
        self.max_cache_size_mb = max_cache_size_mb
        self.max_cached_symbols = max_cached_symbols
        self.cache_stats = {
            "hits": 0,
            "misses": 0,
            "partial_hits": 0,
            "total_loaded": 0,
        }
        self.path = path

    @staticmethod
    def _slice_range(df: pd.DataFrame, start_dt, end_dt) -> pd.DataFrame:
        """Return a copy of the rows of ``df`` whose index lies in [start_dt, end_dt]."""
        in_range = (df.index >= start_dt) & (df.index <= end_dt)
        # Copy so that callers can never mutate the cached frame in place.
        return df[in_range].copy()

    def _touch(self, cache_key: Tuple[str, str]) -> None:
        """Mark ``cache_key`` as just used so LRU eviction sees real access order."""
        meta = self._cache_metadata.get(cache_key)
        if meta is not None:
            meta["last_accessed"] = datetime.now()

    def get_tick_data(
        self, symbol: str, start_date: str, end_date: str, account_name: str
    ) -> pd.DataFrame:
        """
        Retrieve tick-level bid/ask data, using the cache where possible.

        Parameters
        ----------
        symbol : str
            Trading instrument symbol (e.g., 'EURUSD')
        start_date : str
            Start date in 'YYYY-MM-DD' format
        end_date : str
            End date in 'YYYY-MM-DD' format
        account_name : str
            MT5 account identifier for data retrieval

        Returns
        -------
        pd.DataFrame
            Tick data with columns ['bid', 'ask'] indexed by timestamp

        Notes
        -----
        - Checks if cached data fully covers requested date range
        - If partial coverage exists, loads only missing data
        - Exactly one of the ``hits`` / ``partial_hits`` / ``misses`` counters
          is incremented per call
        """
        cache_key = (symbol, account_name)
        # NOTE(review): date_conversion is a project helper; it is assumed to
        # return values comparable with the DataFrame index - confirm.
        start_dt, end_dt = date_conversion(start_date, end_date)

        if cache_key in self._cache:
            cached_df = self._cache[cache_key]
            metadata = self._cache_metadata[cache_key]
            cached_start, cached_end = date_conversion(
                metadata["start_date"], metadata["end_date"]
            )

            # Full coverage: serve straight from the cache.
            if cached_start <= start_dt and cached_end >= end_dt:
                self.cache_stats["hits"] += 1
                # Without this refresh the "LRU" eviction would really be
                # "least recently loaded".
                self._touch(cache_key)
                logger.debug(f"Cache hit for {symbol} {start_date} to {end_date}")
                return self._slice_range(cached_df, start_dt, end_dt)

            # Overlap without full coverage: extend the cached range.
            if cached_end >= start_dt and cached_start <= end_dt:
                self.cache_stats["partial_hits"] += 1
                logger.debug(f"Partial cache hit for {symbol}")
                return self._load_with_partial_cache(
                    symbol, start_date, end_date, account_name, cache_key
                )

        # Nothing cached for this key, or the cached range is disjoint.
        self.cache_stats["misses"] += 1
        logger.debug(f"Cache miss for {symbol} {start_date} to {end_date}")
        return self._load_and_cache_data(symbol, start_date, end_date, account_name, cache_key)

    def _load_with_partial_cache(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        account_name: str,
        cache_key: Tuple[str, str],
    ) -> pd.DataFrame:
        """
        Load data when the cache only partially covers the requested range.

        Strategy:
        1. Work out which leading/trailing ranges are not cached
        2. Load only those ranges
        3. Merge cached and new data and sort by index
        4. Store the merged frame with the widened date range

        Returns
        -------
        pd.DataFrame
            Copy of the rows within the requested range.
        """
        cached_df = self._cache[cache_key]
        cached_start = self._cache_metadata[cache_key]["start_date"]
        cached_end = self._cache_metadata[cache_key]["end_date"]
        start_dt, end_dt = date_conversion(start_date, end_date)

        # Missing ranges are expressed in whole days on either side of the
        # cached range, formatted back to 'YYYY-MM-DD' for the loader.
        load_ranges = []
        if start_dt < cached_start:
            load_ranges.append(
                (start_date, (cached_start - timedelta(days=1)).strftime("%Y-%m-%d"))
            )
        if end_dt > cached_end:
            load_ranges.append(((cached_end + timedelta(days=1)).strftime("%Y-%m-%d"), end_date))

        new_data = []
        for load_start, load_end in load_ranges:
            logger.info(f"Loading additional data for {symbol}: {load_start} to {load_end}")
            df_part = self._load_data(symbol, load_start, load_end, account_name)
            if not df_part.empty:
                new_data.append(df_part)

        if not new_data:
            # The missing ranges produced no rows, so the cached range is left
            # as it was and the request is answered from what is cached.
            self._touch(cache_key)
            return self._slice_range(cached_df, start_dt, end_dt)

        all_new_data = pd.concat(new_data) if len(new_data) > 1 else new_data[0]
        combined_data = pd.concat([cached_df, all_new_data]).sort_index()

        # Update cache with the extended range.
        self._cache[cache_key] = combined_data
        self._cache_metadata[cache_key] = {
            "start_date": min(start_dt, cached_start),
            "end_date": max(end_dt, cached_end),
            "last_accessed": datetime.now(),
            "size_mb": combined_data.memory_usage(deep=True).sum() / (1024**2),
        }
        # May evict this very entry; the local reference keeps the data alive
        # for the return below.
        self._clean_cache()

        return self._slice_range(combined_data, start_dt, end_dt)

    def _load_and_cache_data(
        self,
        symbol: str,
        start_date: str,
        end_date: str,
        account_name: str,
        cache_key: Tuple[str, str],
    ) -> pd.DataFrame:
        """
        Load data from source and cache it.

        Empty results are returned but not cached. ``total_loaded`` counts only
        non-empty loads.

        Returns
        -------
        pd.DataFrame
            The loaded data. When the frame is retained by the cache, a copy
            is returned so callers cannot mutate the cached object.
        """
        logger.info(f"Loading data for {symbol} from {start_date} to {end_date}")
        df = self._load_data(symbol, start_date, end_date, account_name)
        start_dt, end_dt = date_conversion(start_date, end_date)

        if df.empty:
            return df

        self._cache[cache_key] = df
        self._cache_metadata[cache_key] = {
            "start_date": start_dt,
            "end_date": end_dt,
            "last_accessed": datetime.now(),
            "size_mb": df.memory_usage(deep=True).sum() / (1024**2),
        }
        self._clean_cache()
        self.cache_stats["total_loaded"] += 1

        # Only pay for a copy if eviction left the frame in the cache;
        # otherwise the caller is the sole owner already.
        if self._cache.get(cache_key) is df:
            return df.copy()
        return df

    def _load_data(
        self, symbol: str, start_date: str, end_date: str, account_name: str
    ) -> pd.DataFrame:
        """
        Load ['bid', 'ask'] tick data via ``load_tick_data``.

        If the first load returns an empty frame, ``save_data_to_parquet`` is
        called and the load is retried once.
        """
        # Imported lazily, as in the original module.
        from .load_data import load_tick_data, save_data_to_parquet

        tick_params = dict(
            symbol=symbol,
            start_date=start_date,
            end_date=end_date,
            account_name=account_name,
            path=self.path,
            columns=["bid", "ask"],
            verbose=False,
        )
        df = load_tick_data(**tick_params)
        if df.empty:
            logger.info("Data not found on drive, fetching from MT5...")
            # NOTE(review): self.path is not forwarded here; presumably
            # save_data_to_parquet writes to a default location - confirm it
            # matches the location load_tick_data reads when path is set.
            save_data_to_parquet(symbol, start_date, end_date, account_name)
            df = load_tick_data(**tick_params)
        return df

    def _clean_cache(self):
        """
        Enforce the cache limits.

        Two independent passes:
        1. Entry count - while more than ``max_cached_symbols`` entries exist,
           the least recently accessed ones are removed.
        2. Total size  - while the summed ``size_mb`` exceeds
           ``max_cache_size_mb``, the largest entries are removed first.
        """
        excess = len(self._cache) - self.max_cached_symbols
        if excess > 0:
            oldest_first = sorted(
                self._cache_metadata.items(), key=lambda item: item[1]["last_accessed"]
            )
            for key, _ in oldest_first[:excess]:
                del self._cache[key]
                del self._cache_metadata[key]
                logger.debug(f"Removed {key} from cache (LRU policy)")

        total_size = sum(meta["size_mb"] for meta in self._cache_metadata.values())
        if total_size <= self.max_cache_size_mb:
            return

        largest_first = sorted(
            self._cache_metadata.items(),
            key=lambda item: item[1]["size_mb"],
            reverse=True,
        )
        remaining = total_size
        for key, meta in largest_first:
            if remaining <= self.max_cache_size_mb:
                break
            remaining -= meta["size_mb"]
            del self._cache[key]
            del self._cache_metadata[key]
            logger.debug(f"Removed {key} from cache (size: {meta['size_mb']:.2f}MB)")

    def clear_cache(self, symbol: Optional[str] = None, account_name: Optional[str] = None):
        """
        Clear cache for specific symbol/account or all cache.

        Parameters
        ----------
        symbol : str, optional
            Symbol to clear cache for; ``None`` matches every symbol
        account_name : str, optional
            Account name to clear cache for; ``None`` matches every account
        """
        if symbol is None and account_name is None:
            self._cache.clear()
            self._cache_metadata.clear()
            logger.info("Cleared all cache")
            return

        # Collect first: the dict must not change size while it is iterated.
        keys_to_remove = [
            (sym, acc)
            for sym, acc in self._cache
            if (symbol is None or sym == symbol)
            and (account_name is None or acc == account_name)
        ]
        for key in keys_to_remove:
            del self._cache[key]
            del self._cache_metadata[key]
        logger.info(f"Cleared cache for {len(keys_to_remove)} items")

    def get_cache_info(self) -> Dict:
        """
        Get cache statistics and information.

        Returns
        -------
        Dict
            Cache information including:
            - total_cached_symbols: Number of entries in cache
            - total_cache_size_mb: Total cache size in MB
            - cache_hits: Number of full cache hits
            - cache_misses: Number of cache misses
            - partial_hits: Number of partial cache hits
            - hit_rate: Full hits as a percentage of all requests
              (hits + partial hits + misses); 0 when nothing was requested
            - cached_symbols: List of cached symbols with date ranges
        """
        stats = self.cache_stats
        total_size = sum(meta["size_mb"] for meta in self._cache_metadata.values())
        # Every get_tick_data call bumps exactly one of these three counters,
        # so their sum is the real request count.
        total_requests = stats["hits"] + stats["partial_hits"] + stats["misses"]
        hit_rate = (stats["hits"] / total_requests * 100) if total_requests > 0 else 0

        cached_symbols_info = [
            {
                "symbol": symbol,
                "account": account,
                "date_range": f"{meta['start_date'].date()} to {meta['end_date'].date()}",
                "size_mb": meta["size_mb"],
                "last_accessed": meta["last_accessed"],
            }
            for (symbol, account), meta in self._cache_metadata.items()
        ]

        return {
            "total_cached_symbols": len(self._cache),
            "total_cache_size_mb": total_size,
            "cache_hits": stats["hits"],
            "cache_misses": stats["misses"],
            "partial_hits": stats["partial_hits"],
            "hit_rate": hit_rate,
            "cached_symbols": cached_symbols_info,
        }

    def preload_data(self, symbols: List[str], start_date: str, end_date: str, account_name: str):
        """
        Preload data for multiple symbols into cache.

        Best effort: a failure for one symbol is logged as a warning and the
        remaining symbols are still processed.

        Parameters
        ----------
        symbols : List[str]
            List of symbols to preload
        start_date : str
            Start date in 'YYYY-MM-DD' format
        end_date : str
            End date in 'YYYY-MM-DD' format
        account_name : str
            MT5 account identifier
        """
        logger.info(f"Preloading data for {len(symbols)} symbols")
        for symbol in symbols:
            try:
                self.get_tick_data(symbol, start_date, end_date, account_name)
            except Exception as e:
                # Deliberately broad: one bad symbol must not abort the batch.
                logger.warning(f"Failed to preload {symbol}: {e}")
            else:
                logger.debug(f"Preloaded {symbol}")
# Shared module-level loader instance, created at import time with the
# default cache limits and no data path.
tick_data_loader = TickDataLoader()