""" LRU cache with TTL support for application data. """ import time import threading from typing import Any, Optional, Dict from collections import OrderedDict from config import config class LRUCacheWithTTL: """Thread-safe LRU cache with TTL (Time To Live) support.""" def __init__(self, max_size: int = None, ttl_seconds: int = None): """ Initialize cache. Args: max_size: Maximum number of items (default from config) ttl_seconds: Time to live in seconds (default from config) """ self.max_size = max_size or config.CACHE_MAX_SIZE self.ttl = ttl_seconds or config.CACHE_TTL self._cache = OrderedDict() self._timestamps = {} self._lock = threading.Lock() self._hits = 0 self._misses = 0 def get(self, key: str) -> Optional[Any]: """Get value from cache.""" with self._lock: if key not in self._cache: self._misses += 1 return None # Check if expired if time.time() - self._timestamps[key] > self.ttl: self._evict(key) self._misses += 1 return None # Move to end (most recently used) self._cache.move_to_end(key) self._hits += 1 return self._cache[key] def set(self, key: str, value: Any): """Set value in cache.""" with self._lock: # Update if exists if key in self._cache: self._cache.move_to_end(key) self._cache[key] = value self._timestamps[key] = time.time() return # Add new item self._cache[key] = value self._timestamps[key] = time.time() # Evict oldest if necessary if len(self._cache) > self.max_size: oldest_key = next(iter(self._cache)) self._evict(oldest_key) def _evict(self, key: str): """Evict item from cache.""" if key in self._cache: del self._cache[key] del self._timestamps[key] def clear(self): """Clear all cached items.""" with self._lock: self._cache.clear() self._timestamps.clear() def invalidate(self, key: str): """Invalidate a specific cache entry.""" with self._lock: self._evict(key) def get_stats(self) -> Dict: """Get cache statistics.""" with self._lock: total_requests = self._hits + self._misses hit_rate = self._hits / max(1, total_requests) return { "size": len(self._cache), "max_size": self.max_size, "hits": self._hits, "misses": self._misses, "hit_rate": hit_rate, "ttl_seconds": self.ttl } class CacheManager: """Manages multiple caches for different data types.""" def __init__(self): # Cache for plan listings self.plan_cache = LRUCacheWithTTL(max_size=100, ttl_seconds=300) # 5 minutes # Cache for plan metadata self.metadata_cache = LRUCacheWithTTL(max_size=500, ttl_seconds=600) # 10 minutes # Cache for query rewrites self.query_cache = LRUCacheWithTTL(max_size=1000, ttl_seconds=300) # 5 minutes def invalidate_all(self): """Invalidate all caches (e.g., after ingestion).""" self.plan_cache.clear() self.metadata_cache.clear() # Don't clear query cache as it's query-dependent def get_all_stats(self) -> Dict: """Get statistics for all caches.""" return { "plan_cache": self.plan_cache.get_stats(), "metadata_cache": self.metadata_cache.get_stats(), "query_cache": self.query_cache.get_stats(), } # Global cache manager cache_manager = CacheManager()