Production_Rag / cache.py
TharaKavin's picture
Upload 17 files
6f94597 verified
Raw
History Blame Contribute Delete
2.03 kB
import time
import threading
from typing import Any, Optional, Dict
from collections import OrderedDict
class LRUCache:
def __init__(self, max_size: int = 200, ttl_seconds: int = 3600):
self.max_size = max_size
self.ttl = ttl_seconds
self._store: OrderedDict = OrderedDict()
self._expiry: Dict[str, float] = {}
self._lock = threading.Lock()
def _is_expired(self, key: str) -> bool:
exp = self._expiry.get(key)
return exp is not None and time.monotonic() > exp
def get(self, key: Any) -> Optional[Any]:
serialized = self._serialize_key(key)
with self._lock:
if serialized not in self._store:
return None
if self._is_expired(serialized):
self._store.pop(serialized)
self._expiry.pop(serialized, None)
return None
self._store.move_to_end(serialized)
return self._store[serialized]
def put(self, key: Any, value: Any):
serialized = self._serialize_key(key)
with self._lock:
if serialized in self._store:
self._store.move_to_end(serialized)
self._store[serialized] = value
self._expiry[serialized] = time.monotonic() + self.ttl
while len(self._store) > self.max_size:
oldest, _ = self._store.popitem(last=False)
self._expiry.pop(oldest, None)
def clear(self):
with self._lock:
self._store.clear()
self._expiry.clear()
def invalidate_prefix(self, prefix: Any):
serialized = str(prefix)
with self._lock:
keys = [k for k in self._store if str(k).startswith(serialized)]
for k in keys:
self._store.pop(k, None)
self._expiry.pop(k, None)
@staticmethod
def _serialize_key(key: Any) -> str:
if isinstance(key, tuple):
return "::".join(str(k) for k in key)
return str(key)