"""
Cache management for NeuroSAM 3 application.
Provides LRU cache with size limits and TTL for processed results.
"""

import time
from typing import Optional, Dict, Any, Tuple
from collections import OrderedDict
from logger_config import logger
from config import MAX_CACHE_SIZE, CACHE_TTL_SECONDS


class LRUCache:
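    """
    Least Recently Used cache with TTL support.

    Entries are evicted in least-recently-used order once max_size is reached.
    The TTL is measured from the time an entry was set; reading an entry
    refreshes its LRU position but not its timestamp. No locking is performed,
    so concurrent use from multiple threads needs external synchronization.
    """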
    
    def __init__(self, max_size: int = MAX_CACHE_SIZE, ttl_seconds: int = CACHE_TTL_SECONDS):
        """
        Initialize LRU cache.
        
        Args:
            max_size: Maximum number of items in cache
            ttl_seconds: Time-to-live for cache entries in seconds
        """
        self.max_size = max_size
        self.ttl_seconds = ttl_seconds
        self.cache: OrderedDict[str, Tuple[Any, float]] = OrderedDict()
        logger.info(f"Initialized LRU cache with max_size={max_size}, ttl={ttl_seconds}s")
    
    def _is_expired(self, timestamp: float) -> bool:
        """Check if an entry has expired."""
        return time.time() - timestamp > self.ttl_seconds
    
    def _cleanup_expired(self) -> None:
        """Remove expired entries from cache."""
        current_time = time.time()
        expired_keys = [
            key for key, (_, timestamp) in self.cache.items()
            if current_time - timestamp > self.ttl_seconds
        ]
        for key in expired_keys:
            del self.cache[key]
        if expired_keys:
            logger.debug(f"Cleaned up {len(expired_keys)} expired cache entries")
    
    def get(self, key: str) -> Optional[Any]:
        """
        Get value from cache.
        
        Args:
            key: Cache key
        
        Returns:
            Cached value or None if not found/expired
        """
        self._cleanup_expired()
        
        if key not in self.cache:
            return None
        
        value, timestamp = self.cache[key]

        # Guard against an entry expiring between cleanup and lookup
        if self._is_expired(timestamp):
            del self.cache[key]
            logger.debug(f"Cache entry expired: {key}")
            return None

        # Mark as most recently used; the original timestamp is kept
        self.cache.move_to_end(key)
        return value
    
    def set(self, key: str, value: Any) -> None:
        """
        Set value in cache.
        
        Args:
            key: Cache key
            value: Value to cache
        """
        self._cleanup_expired()
        
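        # A non-positive max_size is treated as "caching disabled"; without
        # this guard the eviction below would call next() on an empty cache
        # and raise StopIteration
        if self.max_size <= 0:
            return

        # Remove the existing entry so the key is re-inserted at the end
        if key in self.cache:
            del self.cache[key]
        # Evict the least recently used entry if at capacity
        elif len(self.cache) >= self.max_size:
            oldest_key = next(iter(self.cache))
            del self.cache[oldest_key]
            logger.debug(f"Cache full, removed oldest entry: {oldest_key}")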
        
        # Add new entry
        self.cache[key] = (value, time.time())
        logger.debug(f"Cached entry: {key}")
    
    def clear(self) -> None:
        """Clear all cache entries."""
        count = len(self.cache)
        self.cache.clear()
        logger.info(f"Cleared {count} cache entries")
    
    def size(self) -> int:
        """Get current cache size."""
        self._cleanup_expired()
        return len(self.cache)
    
    def stats(self) -> Dict[str, Any]:
        """
        Get cache statistics.
        
        Returns:
            Dictionary with cache statistics
        """
        self._cleanup_expired()
        return {
            "size": len(self.cache),
            "max_size": self.max_size,
            "ttl_seconds": self.ttl_seconds,
            "usage_percent": (len(self.cache) / self.max_size * 100) if self.max_size > 0 else 0
        }


# Global cache instance
processed_results_cache = LRUCache(max_size=MAX_CACHE_SIZE, ttl_seconds=CACHE_TTL_SECONDS)
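

# Illustrative sketch, not part of the original module: LRUCache relies on
# OrderedDict insertion order for eviction. Moving a key to the end on access
# means the first key is always the least recently used one, which is the key
# that set() evicts at capacity. The helper name `_lru_order_demo` is
# hypothetical and exists only to show that ordering in isolation.
def _lru_order_demo(max_size: int = 2) -> list:
    # Imported locally so the sketch stays self-contained.
    from collections import OrderedDict

    entries: OrderedDict = OrderedDict()
    for key in ("a", "b"):
        entries[key] = key.upper()

    # Reading "a" makes it the most recently used entry.
    entries.move_to_end("a")

    # Inserting "c" at capacity evicts the first (least recently used) key.
    if len(entries) >= max_size:
        del entries[next(iter(entries))]
    entries["c"] = "C"

    # Remaining keys, ordered from least to most recently used.
    return list(entries)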