# trading-tools/data/cache/cache_manager.py
# Deploy Bot
# Deploy Trading Analysis Platform to HuggingFace Spaces
# a1bf219
"""Cache manager with TTL-based caching."""
import hashlib
import logging
from datetime import datetime, timedelta
from typing import Any, Optional
from .cache_store import CacheStore
logger = logging.getLogger(__name__)


class CacheManager:
    """Manages caching with TTL (time-to-live) per data type.

    Each entry is written to the backing ``CacheStore`` as a dict holding the
    payload under ``"data"`` and the insertion time under ``"cached_at"``.
    Expiry is evaluated lazily when an entry is read.
    """

    def __init__(self, config: dict):
        """
        Initialize cache manager.

        Args:
            config: Configuration dictionary with cache TTL settings under
                the ``"cache_ttl"`` key (mapping TTL name -> seconds).
        """
        self.config = config
        self.store = CacheStore()
        # ``or {}`` also covers an explicit ``cache_ttl: null`` in the config,
        # which would otherwise make every ``get_ttl`` call fail.
        self.cache_ttl = config.get("cache_ttl") or {}

    def generate_cache_key(
        self,
        data_type: str,
        ticker: str,
        timeframe: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> str:
        """
        Generate cache key for data request.

        Format: {data_type}:{ticker}:{timeframe}:{start_date}:{end_date}

        Optional parts keep their position: a missing part that is followed
        by a present one becomes an empty slot (``ohlc:AAPL::2024-01-01``),
        so two different requests can never share a key. Trailing missing
        parts are dropped, so ``ohlc:AAPL:1h`` and ``news:AAPL`` keep their
        short form.

        Args:
            data_type: Type of data ("ohlc", "fundamentals", "news")
            ticker: Asset ticker symbol
            timeframe: Candlestick timeframe (for OHLC data)
            start_date: Start date (for OHLC data)
            end_date: End date (for OHLC data)

        Returns:
            Cache key string (an MD5 hex digest when the plain key would
            exceed 200 characters)
        """
        optional_parts = [timeframe or "", start_date or "", end_date or ""]
        # Trim only trailing empty slots; interior ones must stay so that
        # e.g. "start only" and "end only" requests do not collide.
        while optional_parts and not optional_parts[-1]:
            optional_parts.pop()

        key = ":".join([data_type, ticker, *optional_parts])

        # Hash long keys to keep them manageable (not a security use of MD5)
        if len(key) > 200:
            key = hashlib.md5(key.encode()).hexdigest()
        return key

    def get_ttl(self, data_type: str, timeframe: Optional[str] = None) -> int:
        """
        Get TTL (seconds) for data type.

        OHLC requests with a timeframe look up ``ohlc_{timeframe}``; every
        other request looks up ``data_type`` directly.

        Args:
            data_type: Type of data
            timeframe: Timeframe (for OHLC data)

        Returns:
            TTL in seconds
        """
        if data_type == "ohlc" and timeframe:
            return self.cache_ttl.get(f"ohlc_{timeframe}", 300)  # Default 5 minutes
        return self.cache_ttl.get(data_type, 3600)  # Default 1 hour

    def get(
        self,
        data_type: str,
        ticker: str,
        timeframe: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> Optional[Any]:
        """
        Get cached data if available and not expired.

        An expired entry is deleted from the store before returning.

        Args:
            data_type: Type of data
            ticker: Asset ticker
            timeframe: Timeframe (optional)
            start_date: Start date (optional)
            end_date: End date (optional)

        Returns:
            Cached data or None if not found/expired
        """
        cache_key = self.generate_cache_key(
            data_type, ticker, timeframe, start_date, end_date
        )
        cached_entry = self.store.get(cache_key)
        if cached_entry is None:
            logger.debug("Cache miss: %s", cache_key)
            return None

        # Entries without a timestamp are treated as never expiring.
        # NOTE(review): timestamps are naive local time (see ``set``), so a
        # DST/clock change can shift expiry — confirm before switching to UTC,
        # since existing stored entries would then be incomparable.
        cached_at = cached_entry.get("cached_at")
        ttl = self.get_ttl(data_type, timeframe)
        if cached_at and datetime.now() - cached_at > timedelta(seconds=ttl):
            logger.debug("Cache expired: %s", cache_key)
            self.store.delete(cache_key)
            return None

        logger.debug("Cache hit: %s", cache_key)
        return cached_entry.get("data")

    def set(
        self,
        data: Any,
        data_type: str,
        ticker: str,
        timeframe: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ):
        """
        Cache data with TTL.

        The TTL itself is not stored; it is resolved from the configuration
        when the entry is read back.

        Args:
            data: Data to cache
            data_type: Type of data
            ticker: Asset ticker
            timeframe: Timeframe (optional)
            start_date: Start date (optional)
            end_date: End date (optional)
        """
        cache_key = self.generate_cache_key(
            data_type, ticker, timeframe, start_date, end_date
        )
        cache_entry = {
            "data": data,
            "cached_at": datetime.now(),
            "data_type": data_type,
            "ticker": ticker,
        }
        self.store.set(cache_key, cache_entry)
        logger.debug("Cached: %s", cache_key)

    def invalidate(
        self,
        data_type: str,
        ticker: str,
        timeframe: Optional[str] = None,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ):
        """
        Invalidate cached data.

        Pass the same ``start_date``/``end_date`` that were used when the
        entry was cached; without them only the entry stored without a date
        range is removed.

        Args:
            data_type: Type of data
            ticker: Asset ticker
            timeframe: Timeframe (optional)
            start_date: Start date (optional)
            end_date: End date (optional)
        """
        cache_key = self.generate_cache_key(
            data_type, ticker, timeframe, start_date, end_date
        )
        self.store.delete(cache_key)
        logger.info("Invalidated cache: %s", cache_key)

    def clear_all(self):
        """Clear all cached data."""
        self.store.clear()
        logger.info("Cleared all cache")

    def get_multi_timeframe(
        self,
        data_type: str,
        ticker: str,
        timeframes: list,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ) -> dict:
        """
        Get cached data for multiple timeframes at once.

        Args:
            data_type: Type of data
            ticker: Asset ticker
            timeframes: List of timeframes to retrieve
            start_date: Start date (optional)
            end_date: End date (optional)

        Returns:
            Dict mapping timeframe -> cached data (None if not cached/expired)
        """
        results = {
            timeframe: self.get(
                data_type=data_type,
                ticker=ticker,
                timeframe=timeframe,
                start_date=start_date,
                end_date=end_date,
            )
            for timeframe in timeframes
        }

        # Log summary
        cached_count = sum(1 for value in results.values() if value is not None)
        logger.debug(
            "Multi-timeframe cache: %d/%d hits for %s",
            cached_count,
            len(timeframes),
            ticker,
        )
        return results

    def set_multi_timeframe(
        self,
        data_by_timeframe: dict,
        data_type: str,
        ticker: str,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ):
        """
        Cache data for multiple timeframes at once.

        Timeframes whose data is None are skipped.

        Args:
            data_by_timeframe: Dict mapping timeframe -> data
            data_type: Type of data
            ticker: Asset ticker
            start_date: Start date (optional)
            end_date: End date (optional)
        """
        stored_count = 0
        for timeframe, data in data_by_timeframe.items():
            if data is None:
                continue
            self.set(
                data=data,
                data_type=data_type,
                ticker=ticker,
                timeframe=timeframe,
                start_date=start_date,
                end_date=end_date,
            )
            stored_count += 1

        # Report what was actually written, not the size of the input dict
        logger.debug(
            "Cached multi-timeframe data: %d timeframes for %s",
            stored_count,
            ticker,
        )

    def invalidate_multi_timeframe(
        self,
        data_type: str,
        ticker: str,
        timeframes: list,
        start_date: Optional[str] = None,
        end_date: Optional[str] = None,
    ):
        """
        Invalidate cached data for multiple timeframes.

        Args:
            data_type: Type of data
            ticker: Asset ticker
            timeframes: List of timeframes to invalidate
            start_date: Start date the entries were cached with (optional)
            end_date: End date the entries were cached with (optional)
        """
        for timeframe in timeframes:
            self.invalidate(data_type, ticker, timeframe, start_date, end_date)
        logger.info(
            "Invalidated multi-timeframe cache: %d timeframes for %s",
            len(timeframes),
            ticker,
        )