""" Cache management for NeuroSAM 3 application. Provides LRU cache with size limits and TTL for processed results. """ import time from typing import Optional, Dict, Any, Tuple from collections import OrderedDict from logger_config import logger from config import MAX_CACHE_SIZE, CACHE_TTL_SECONDS class LRUCache: """ Least Recently Used cache with TTL support. """ def __init__(self, max_size: int = MAX_CACHE_SIZE, ttl_seconds: int = CACHE_TTL_SECONDS): """ Initialize LRU cache. Args: max_size: Maximum number of items in cache ttl_seconds: Time-to-live for cache entries in seconds """ self.max_size = max_size self.ttl_seconds = ttl_seconds self.cache: OrderedDict[str, Tuple[Any, float]] = OrderedDict() logger.info(f"Initialized LRU cache with max_size={max_size}, ttl={ttl_seconds}s") def _is_expired(self, timestamp: float) -> bool: """Check if an entry has expired.""" return time.time() - timestamp > self.ttl_seconds def _cleanup_expired(self) -> None: """Remove expired entries from cache.""" current_time = time.time() expired_keys = [ key for key, (_, timestamp) in self.cache.items() if current_time - timestamp > self.ttl_seconds ] for key in expired_keys: del self.cache[key] if expired_keys: logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries") def get(self, key: str) -> Optional[Any]: """ Get value from cache. Args: key: Cache key Returns: Cached value or None if not found/expired """ self._cleanup_expired() if key not in self.cache: return None # Move to end (most recently used) value, timestamp = self.cache.pop(key) # Check if expired if self._is_expired(timestamp): logger.debug(f"Cache entry expired: {key}") return None # Re-insert at end self.cache[key] = (value, timestamp) return value def set(self, key: str, value: Any) -> None: """ Set value in cache. Args: key: Cache key value: Value to cache """ self._cleanup_expired() # Remove if exists if key in self.cache: del self.cache[key] # Remove oldest if at capacity elif len(self.cache) >= self.max_size: oldest_key = next(iter(self.cache)) del self.cache[oldest_key] logger.debug(f"Cache full, removed oldest entry: {oldest_key}") # Add new entry self.cache[key] = (value, time.time()) logger.debug(f"Cached entry: {key}") def clear(self) -> None: """Clear all cache entries.""" count = len(self.cache) self.cache.clear() logger.info(f"Cleared {count} cache entries") def size(self) -> int: """Get current cache size.""" self._cleanup_expired() return len(self.cache) def stats(self) -> Dict[str, Any]: """ Get cache statistics. Returns: Dictionary with cache statistics """ self._cleanup_expired() return { "size": len(self.cache), "max_size": self.max_size, "ttl_seconds": self.ttl_seconds, "usage_percent": (len(self.cache) / self.max_size * 100) if self.max_size > 0 else 0 } # Global cache instance processed_results_cache = LRUCache(max_size=MAX_CACHE_SIZE, ttl_seconds=CACHE_TTL_SECONDS)