Insurance-RAG / utils /cache.py
DeltaVenom's picture
Update app code and initialize runtime databases
72bff80
"""
LRU cache with TTL support for application data.
"""
import time
import threading
from typing import Any, Optional, Dict
from collections import OrderedDict
from config import config
class LRUCacheWithTTL:
"""Thread-safe LRU cache with TTL (Time To Live) support."""
def __init__(self, max_size: int = None, ttl_seconds: int = None):
"""
Initialize cache.
Args:
max_size: Maximum number of items (default from config)
ttl_seconds: Time to live in seconds (default from config)
"""
self.max_size = max_size or config.CACHE_MAX_SIZE
self.ttl = ttl_seconds or config.CACHE_TTL
self._cache = OrderedDict()
self._timestamps = {}
self._lock = threading.Lock()
self._hits = 0
self._misses = 0
def get(self, key: str) -> Optional[Any]:
"""Get value from cache."""
with self._lock:
if key not in self._cache:
self._misses += 1
return None
# Check if expired
if time.time() - self._timestamps[key] > self.ttl:
self._evict(key)
self._misses += 1
return None
# Move to end (most recently used)
self._cache.move_to_end(key)
self._hits += 1
return self._cache[key]
def set(self, key: str, value: Any):
"""Set value in cache."""
with self._lock:
# Update if exists
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = value
self._timestamps[key] = time.time()
return
# Add new item
self._cache[key] = value
self._timestamps[key] = time.time()
# Evict oldest if necessary
if len(self._cache) > self.max_size:
oldest_key = next(iter(self._cache))
self._evict(oldest_key)
def _evict(self, key: str):
"""Evict item from cache."""
if key in self._cache:
del self._cache[key]
del self._timestamps[key]
def clear(self):
"""Clear all cached items."""
with self._lock:
self._cache.clear()
self._timestamps.clear()
def invalidate(self, key: str):
"""Invalidate a specific cache entry."""
with self._lock:
self._evict(key)
def get_stats(self) -> Dict:
"""Get cache statistics."""
with self._lock:
total_requests = self._hits + self._misses
hit_rate = self._hits / max(1, total_requests)
return {
"size": len(self._cache),
"max_size": self.max_size,
"hits": self._hits,
"misses": self._misses,
"hit_rate": hit_rate,
"ttl_seconds": self.ttl
}
class CacheManager:
"""Manages multiple caches for different data types."""
def __init__(self):
# Cache for plan listings
self.plan_cache = LRUCacheWithTTL(max_size=100, ttl_seconds=300) # 5 minutes
# Cache for plan metadata
self.metadata_cache = LRUCacheWithTTL(max_size=500, ttl_seconds=600) # 10 minutes
# Cache for query rewrites
self.query_cache = LRUCacheWithTTL(max_size=1000, ttl_seconds=300) # 5 minutes
def invalidate_all(self):
"""Invalidate all caches (e.g., after ingestion)."""
self.plan_cache.clear()
self.metadata_cache.clear()
# Don't clear query cache as it's query-dependent
def get_all_stats(self) -> Dict:
"""Get statistics for all caches."""
return {
"plan_cache": self.plan_cache.get_stats(),
"metadata_cache": self.metadata_cache.get_stats(),
"query_cache": self.query_cache.get_stats(),
}
# Global cache manager
cache_manager = CacheManager()