NeuroSAM3 / cache_manager.py
mmrech's picture
Refactor codebase: Add modular structure, logging, validation, and comprehensive improvements
69066c5
"""
Cache management for NeuroSAM 3 application.
Provides LRU cache with size limits and TTL for processed results.
"""
import time
from typing import Optional, Dict, Any, Tuple
from collections import OrderedDict
from logger_config import logger
from config import MAX_CACHE_SIZE, CACHE_TTL_SECONDS
class LRUCache:
"""
Least Recently Used cache with TTL support.
"""
def __init__(self, max_size: int = MAX_CACHE_SIZE, ttl_seconds: int = CACHE_TTL_SECONDS):
"""
Initialize LRU cache.
Args:
max_size: Maximum number of items in cache
ttl_seconds: Time-to-live for cache entries in seconds
"""
self.max_size = max_size
self.ttl_seconds = ttl_seconds
self.cache: OrderedDict[str, Tuple[Any, float]] = OrderedDict()
logger.info(f"Initialized LRU cache with max_size={max_size}, ttl={ttl_seconds}s")
def _is_expired(self, timestamp: float) -> bool:
"""Check if an entry has expired."""
return time.time() - timestamp > self.ttl_seconds
def _cleanup_expired(self) -> None:
"""Remove expired entries from cache."""
current_time = time.time()
expired_keys = [
key for key, (_, timestamp) in self.cache.items()
if current_time - timestamp > self.ttl_seconds
]
for key in expired_keys:
del self.cache[key]
if expired_keys:
logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries")
def get(self, key: str) -> Optional[Any]:
"""
Get value from cache.
Args:
key: Cache key
Returns:
Cached value or None if not found/expired
"""
self._cleanup_expired()
if key not in self.cache:
return None
# Move to end (most recently used)
value, timestamp = self.cache.pop(key)
# Check if expired
if self._is_expired(timestamp):
logger.debug(f"Cache entry expired: {key}")
return None
# Re-insert at end
self.cache[key] = (value, timestamp)
return value
def set(self, key: str, value: Any) -> None:
"""
Set value in cache.
Args:
key: Cache key
value: Value to cache
"""
self._cleanup_expired()
# Remove if exists
if key in self.cache:
del self.cache[key]
# Remove oldest if at capacity
elif len(self.cache) >= self.max_size:
oldest_key = next(iter(self.cache))
del self.cache[oldest_key]
logger.debug(f"Cache full, removed oldest entry: {oldest_key}")
# Add new entry
self.cache[key] = (value, time.time())
logger.debug(f"Cached entry: {key}")
def clear(self) -> None:
"""Clear all cache entries."""
count = len(self.cache)
self.cache.clear()
logger.info(f"Cleared {count} cache entries")
def size(self) -> int:
"""Get current cache size."""
self._cleanup_expired()
return len(self.cache)
def stats(self) -> Dict[str, Any]:
"""
Get cache statistics.
Returns:
Dictionary with cache statistics
"""
self._cleanup_expired()
return {
"size": len(self.cache),
"max_size": self.max_size,
"ttl_seconds": self.ttl_seconds,
"usage_percent": (len(self.cache) / self.max_size * 100) if self.max_size > 0 else 0
}
# Global cache instance
processed_results_cache = LRUCache(max_size=MAX_CACHE_SIZE, ttl_seconds=CACHE_TTL_SECONDS)