|
|
""" |
|
|
Cache management for NeuroSAM 3 application. |
|
|
Provides LRU cache with size limits and TTL for processed results. |
|
|
""" |
|
|
|
|
|
import time |
|
|
from typing import Optional, Dict, Any, Tuple |
|
|
from collections import OrderedDict |
|
|
from logger_config import logger |
|
|
from config import MAX_CACHE_SIZE, CACHE_TTL_SECONDS |
|
|
|
|
|
|
|
|
class LRUCache:
    """
    Least Recently Used cache with TTL support.

    Entries are stored in an ``OrderedDict`` whose insertion order doubles as
    recency order (least recently used first). Expired entries are purged
    lazily on each public operation, so ``get``/``set``/``size``/``stats``
    each pay an O(n) scan. Not synchronized; guard with a lock externally if
    shared across threads.
    """

    def __init__(self, max_size: int = MAX_CACHE_SIZE, ttl_seconds: int = CACHE_TTL_SECONDS):
        """
        Initialize LRU cache.

        Args:
            max_size: Maximum number of items in cache
            ttl_seconds: Time-to-live for cache entries in seconds
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        # key -> (value, insertion timestamp); first entry is least recently used.
        self.cache: OrderedDict[str, Tuple[Any, float]] = OrderedDict()
        # Lazy %-style args so the message is only formatted if INFO is enabled.
        logger.info("Initialized LRU cache with max_size=%s, ttl=%ss", max_size, ttl_seconds)

    def _is_expired(self, timestamp: float) -> bool:
        """Check if an entry stored at *timestamp* has outlived the TTL."""
        return time.time() - timestamp > self.ttl_seconds

    def _cleanup_expired(self) -> None:
        """Remove expired entries from cache (full O(n) scan)."""
        current_time = time.time()
        # Collect first, then delete: can't mutate the dict while iterating it.
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if current_time - timestamp > self.ttl_seconds
        ]
        for key in expired_keys:
            del self.cache[key]
        if expired_keys:
            logger.debug("Cleaned up %d expired cache entries", len(expired_keys))

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache, promoting the entry to most recently used.

        Args:
            key: Cache key

        Returns:
            Cached value or None if not found/expired
        """
        self._cleanup_expired()

        entry = self.cache.get(key)
        if entry is None:
            return None

        value, timestamp = entry
        # Re-check expiry: the clock may have crossed the TTL boundary
        # since _cleanup_expired() sampled time.time().
        if self._is_expired(timestamp):
            del self.cache[key]
            logger.debug("Cache entry expired: %s", key)
            return None

        # Promote to the MRU end in place instead of pop + re-insert.
        self.cache.move_to_end(key)
        return value

    def set(self, key: str, value: Any) -> None:
        """
        Set value in cache, evicting the least recently used entry if full.

        Args:
            key: Cache key
            value: Value to cache
        """
        self._cleanup_expired()

        if key in self.cache:
            # Overwriting: remove first so the re-insert lands at the MRU end.
            del self.cache[key]
        elif len(self.cache) >= self.max_size:
            # Evict the least recently used (first) entry to make room.
            oldest_key, _ = self.cache.popitem(last=False)
            logger.debug("Cache full, removed oldest entry: %s", oldest_key)

        self.cache[key] = (value, time.time())
        logger.debug("Cached entry: %s", key)

    def clear(self) -> None:
        """Clear all cache entries."""
        count = len(self.cache)
        self.cache.clear()
        logger.info("Cleared %d cache entries", count)

    def size(self) -> int:
        """Get current cache size (after purging expired entries)."""
        self._cleanup_expired()
        return len(self.cache)

    def stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.

        Returns:
            Dictionary with keys ``size``, ``max_size``, ``ttl_seconds`` and
            ``usage_percent`` (0 when ``max_size`` is 0, avoiding a
            ZeroDivisionError).
        """
        self._cleanup_expired()
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "ttl_seconds": self.ttl_seconds,
            "usage_percent": (len(self.cache) / self.max_size * 100) if self.max_size > 0 else 0
        }
|
|
|
|
|
|
|
|
|
|
|
# Module-level singleton cache for processed results, sized/expired per the
# limits declared in config.py. Presumably imported and shared by the rest of
# the application rather than each module building its own — confirm at callers.
processed_results_cache = LRUCache(max_size=MAX_CACHE_SIZE, ttl_seconds=CACHE_TTL_SECONDS)
|
|
|
|
|
|