"""Cache manager with TTL-based caching.""" import hashlib import logging from datetime import datetime, timedelta from typing import Any, Optional from .cache_store import CacheStore logger = logging.getLogger(__name__) class CacheManager: """Manages caching with TTL (time-to-live) per data type.""" def __init__(self, config: dict): """ Initialize cache manager. Args: config: Configuration dictionary with cache TTL settings """ self.config = config self.store = CacheStore() self.cache_ttl = config.get("cache_ttl", {}) def generate_cache_key( self, data_type: str, ticker: str, timeframe: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> str: """ Generate cache key for data request. Format: {data_type}:{ticker}:{timeframe}:{start_date}:{end_date} Args: data_type: Type of data ("ohlc", "fundamentals", "news") ticker: Asset ticker symbol timeframe: Candlestick timeframe (for OHLC data) start_date: Start date (for OHLC data) end_date: End date (for OHLC data) Returns: Cache key string """ key_parts = [data_type, ticker] if timeframe: key_parts.append(timeframe) if start_date: key_parts.append(start_date) if end_date: key_parts.append(end_date) key = ":".join(key_parts) # Hash long keys to keep them manageable if len(key) > 200: key = hashlib.md5(key.encode()).hexdigest() return key def get_ttl(self, data_type: str, timeframe: Optional[str] = None) -> int: """ Get TTL (seconds) for data type. Args: data_type: Type of data timeframe: Timeframe (for OHLC data) Returns: TTL in seconds """ if data_type == "ohlc" and timeframe: ttl_key = f"ohlc_{timeframe}" return self.cache_ttl.get(ttl_key, 300) # Default 5 minutes return self.cache_ttl.get(data_type, 3600) # Default 1 hour def get( self, data_type: str, ticker: str, timeframe: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> Optional[Any]: """ Get cached data if available and not expired. Args: data_type: Type of data ticker: Asset ticker timeframe: Timeframe (optional) start_date: Start date (optional) end_date: End date (optional) Returns: Cached data or None if not found/expired """ cache_key = self.generate_cache_key( data_type, ticker, timeframe, start_date, end_date ) cached_entry = self.store.get(cache_key) if cached_entry is None: logger.debug(f"Cache miss: {cache_key}") return None # Check if expired cached_at = cached_entry.get("cached_at") ttl = self.get_ttl(data_type, timeframe) if cached_at and datetime.now() - cached_at > timedelta(seconds=ttl): logger.debug(f"Cache expired: {cache_key}") self.store.delete(cache_key) return None logger.debug(f"Cache hit: {cache_key}") return cached_entry.get("data") def set( self, data: Any, data_type: str, ticker: str, timeframe: Optional[str] = None, start_date: Optional[str] = None, end_date: Optional[str] = None, ): """ Cache data with TTL. Args: data: Data to cache data_type: Type of data ticker: Asset ticker timeframe: Timeframe (optional) start_date: Start date (optional) end_date: End date (optional) """ cache_key = self.generate_cache_key( data_type, ticker, timeframe, start_date, end_date ) cache_entry = { "data": data, "cached_at": datetime.now(), "data_type": data_type, "ticker": ticker, } self.store.set(cache_key, cache_entry) logger.debug(f"Cached: {cache_key}") def invalidate( self, data_type: str, ticker: str, timeframe: Optional[str] = None, ): """ Invalidate cached data. Args: data_type: Type of data ticker: Asset ticker timeframe: Timeframe (optional) """ cache_key = self.generate_cache_key(data_type, ticker, timeframe) self.store.delete(cache_key) logger.info(f"Invalidated cache: {cache_key}") def clear_all(self): """Clear all cached data.""" self.store.clear() logger.info("Cleared all cache") def get_multi_timeframe( self, data_type: str, ticker: str, timeframes: list, start_date: Optional[str] = None, end_date: Optional[str] = None, ) -> dict: """ Get cached data for multiple timeframes at once. Args: data_type: Type of data ticker: Asset ticker timeframes: List of timeframes to retrieve start_date: Start date (optional) end_date: End date (optional) Returns: Dict mapping timeframe -> cached data (None if not cached/expired) """ results = {} for timeframe in timeframes: cached_data = self.get( data_type=data_type, ticker=ticker, timeframe=timeframe, start_date=start_date, end_date=end_date, ) results[timeframe] = cached_data # Log summary cached_count = sum(1 for v in results.values() if v is not None) logger.debug( f"Multi-timeframe cache: {cached_count}/{len(timeframes)} hits for {ticker}" ) return results def set_multi_timeframe( self, data_by_timeframe: dict, data_type: str, ticker: str, start_date: Optional[str] = None, end_date: Optional[str] = None, ): """ Cache data for multiple timeframes at once. Args: data_by_timeframe: Dict mapping timeframe -> data data_type: Type of data ticker: Asset ticker start_date: Start date (optional) end_date: End date (optional) """ for timeframe, data in data_by_timeframe.items(): if data is not None: self.set( data=data, data_type=data_type, ticker=ticker, timeframe=timeframe, start_date=start_date, end_date=end_date, ) logger.debug( f"Cached multi-timeframe data: {len(data_by_timeframe)} timeframes for {ticker}" ) def invalidate_multi_timeframe( self, data_type: str, ticker: str, timeframes: list, ): """ Invalidate cached data for multiple timeframes. Args: data_type: Type of data ticker: Asset ticker timeframes: List of timeframes to invalidate """ for timeframe in timeframes: self.invalidate(data_type, ticker, timeframe) logger.info( f"Invalidated multi-timeframe cache: {len(timeframes)} timeframes for {ticker}" )