"""
Knowledge Universe - Cache Eviction Manager
Handles automatic cache cleanup and memory management
"""

import asyncio
import logging
import time
from typing import List, Tuple

from src.cache.redis_manager import RedisManager
from config.settings import get_settings

settings = get_settings()
logger = logging.getLogger(__name__)

# Suffix of the per-key metadata hash (e.g. "req:abc" -> "req:abc:meta").
_META_SUFFIX = ':meta'


class EvictionManager:
    """
    Multi-strategy cache eviction and cleanup

    Strategies:
    1. TTL-based: Automatic Redis expiry (nothing in this class sets TTLs;
       presumably handled where keys are written -- verify)
    2. Memory-based: Evict when memory > threshold
    3. LFU (Least Frequently Used): Remove low-access items
    4. Staleness: Remove outdated results
    """

    def __init__(self, redis_manager: RedisManager):
        self.redis = redis_manager
        # Memory budget in bytes; MAX_CACHE_MEMORY_GB is scaled by 1024**3.
        self.max_memory_bytes = settings.MAX_CACHE_MEMORY_GB * (1024 ** 3)
        # Both are compared against the ratio used_memory / max_memory_bytes,
        # so they are fractions of the budget (e.g. 0.8), not byte counts.
        self.eviction_threshold = settings.EVICTION_THRESHOLD
        self.eviction_target = settings.EVICTION_TARGET
        self._eviction_stats = {
            'total_evictions': 0,
            'memory_evictions': 0,
            'staleness_evictions': 0,
            'lfu_evictions': 0,
        }

    # ------------------------------------------------------------------ #
    # Small helpers
    # ------------------------------------------------------------------ #

    @staticmethod
    def _decode_key(key) -> str:
        """Return a Redis key as ``str``, decoding ``bytes`` as UTF-8."""
        return key.decode('utf-8') if isinstance(key, bytes) else key

    @staticmethod
    def _is_meta_key(key_str: str) -> bool:
        """True if *key_str* is a metadata hash rather than a cached value."""
        return key_str.endswith(_META_SUFFIX)

    async def _memory_ratio(self) -> float:
        """Return Redis ``used_memory`` as a fraction of ``max_memory_bytes``.

        Raises ZeroDivisionError if ``max_memory_bytes`` is 0; callers guard it.
        """
        info = await self.redis.client.info('memory')
        used_memory = info.get('used_memory', 0)
        return used_memory / self.max_memory_bytes

    async def _access_count(self, key_str: str) -> int:
        """Read ``access_count`` from ``<key>:meta``.

        Returns 0 when the hash or field is missing, or when the value cannot be
        parsed as an int.
        """
        metadata = await self.redis.hgetall(f"{key_str}{_META_SUFFIX}") or {}
        try:
            return int(metadata.get('access_count', 0))
        except (TypeError, ValueError):
            # A single malformed counter must not abort the whole LFU pass.
            return 0

    @staticmethod
    def _result_references(cached, source_key: str) -> bool:
        """True if a cached request result lists a source whose ``id`` is *source_key*.

        Non-dict results, missing/None ``sources`` and non-dict source entries
        are treated as "no reference" instead of raising.
        """
        if not isinstance(cached, dict):
            return False
        sources = cached.get('sources') or []
        return any(
            isinstance(s, dict) and s.get('id') == source_key
            for s in sources
        )

    # ------------------------------------------------------------------ #
    # Daemon
    # ------------------------------------------------------------------ #

    async def cleanup_daemon(self):
        """
        Main cleanup daemon - runs perpetually

        Executes multiple cleanup strategies on schedule. Each strategy runs in
        its own task; this coroutine only returns if all of them do, which the
        infinite loops in ``_run_strategy_loop`` never do on their own.
        """
        logger.info("Starting eviction daemon...")

        strategies = [
            (self.cleanup_by_memory, 300),      # Every 5 minutes
            (self.cleanup_by_staleness, 1800),  # Every 30 minutes
            (self.cleanup_orphaned, 3600),      # Every hour
        ]

        # Create task for each strategy
        tasks = [
            asyncio.create_task(self._run_strategy_loop(strategy_func, interval))
            for strategy_func, interval in strategies
        ]

        # Run forever
        await asyncio.gather(*tasks)

    async def _run_strategy_loop(self, strategy_func, interval: int):
        """Run *strategy_func* now, then again every *interval* seconds.

        Exceptions raised by the strategy are logged and the loop continues.
        """
        while True:
            try:
                evicted = await strategy_func()
                if evicted > 0:
                    logger.info(f"{strategy_func.__name__} evicted {evicted} items")
            except Exception as e:
                logger.error(f"Cleanup strategy failed: {e}")

            await asyncio.sleep(interval)

    # ------------------------------------------------------------------ #
    # Strategies
    # ------------------------------------------------------------------ #

    async def cleanup_by_memory(self) -> int:
        """
        Evict items when memory usage exceeds threshold

        Uses LFU (Least Frequently Used) policy.

        Returns:
            Number of keys evicted (0 if below threshold, misconfigured or on error).
        """
        try:
            if self.max_memory_bytes <= 0:
                # A non-positive budget would divide by zero below; treat as disabled.
                logger.debug("Memory budget is not positive; skipping memory-based eviction")
                return 0

            # Get current memory usage
            memory_percent = await self._memory_ratio()

            if memory_percent < self.eviction_threshold:
                return 0  # No eviction needed

            logger.warning(f"Memory usage at {memory_percent:.1%}, triggering eviction")

            # Evict to target percentage
            evicted = await self._evict_by_lfu(target_percent=self.eviction_target)

            self._eviction_stats['memory_evictions'] += evicted
            self._eviction_stats['total_evictions'] += evicted

            return evicted

        except Exception as e:
            logger.error(f"Memory-based cleanup failed: {e}")
            return 0

    async def cleanup_by_staleness(self, max_age_hours: float = 6.0) -> int:
        """
        Remove stale request results

        A ``req:*`` entry is stale when its ``<key>:meta`` hash carries a
        ``created_at`` (epoch seconds) older than *max_age_hours*. Entries
        without a metadata hash are kept. A metadata hash lacking
        ``created_at`` is treated as created at epoch 0 and is therefore
        evicted. A stale entry is deleted together with its metadata hash.

        NOTE(review): this method only inspects request entries. Source entries
        are not age-checked here; in this class they are removed only by
        memory-pressure LFU or explicit invalidation.

        Args:
            max_age_hours: Age (in hours) beyond which a request is stale.

        Returns:
            Number of request entries deleted, including those deleted before
            an unexpected scan failure.
        """
        evicted = 0
        now = time.time()
        meta_key = None

        try:
            # Check request cache
            async for key in self.redis.client.scan_iter(match="req:*"):
                key_str = self._decode_key(key)
                # "req:*" also matches the metadata hashes themselves.
                if self._is_meta_key(key_str):
                    continue
                try:
                    meta_key = f"{key_str}{_META_SUFFIX}"
                    metadata = await self.redis.hgetall(meta_key)
                    if not metadata:
                        continue

                    created_at = float(metadata.get('created_at', 0))
                    age_hours = (now - created_at) / 3600

                    if age_hours > max_age_hours:
                        await self.redis.delete(key_str)
                        # Keep metadata consistent with the LFU/invalidation paths.
                        await self.redis.delete(meta_key)
                        evicted += 1

                except Exception as e:
                    logger.debug(f"Error checking staleness for {key}: {e}")
                    continue

        except Exception as e:
            logger.error(f"Staleness cleanup failed: {e}")

        # Count partial progress too, so stats reflect what was actually deleted.
        self._eviction_stats['staleness_evictions'] += evicted
        self._eviction_stats['total_evictions'] += evicted
        return evicted

    async def cleanup_orphaned(self) -> int:
        """
        Remove orphaned metadata keys

        Orphaned = metadata key exists but parent key doesn't.

        Returns:
            Number of metadata keys deleted, including those deleted before an
            unexpected scan failure.
        """
        evicted = 0

        try:
            async for key in self.redis.client.scan_iter(match=f"*{_META_SUFFIX}"):
                key_str = self._decode_key(key)

                # Strip only the trailing suffix: str.replace would also drop
                # any ":meta" that appears inside the parent key's own name.
                parent_key = key_str[:-len(_META_SUFFIX)]

                if not await self.redis.exists(parent_key):
                    await self.redis.delete(key_str)
                    evicted += 1

        except Exception as e:
            logger.error(f"Orphan cleanup failed: {e}")

        return evicted

    async def _evict_by_lfu(self, target_percent: float) -> int:
        """
        Least Frequently Used eviction

        Evicts items with lowest access count until memory < target.
        Every non-metadata key in the database is a candidate; keys without an
        ``access_count`` count as 0 and go first. INFO memory is re-read after
        each deletion.

        Args:
            target_percent: Stop once used_memory / max_memory_bytes <= this.

        Returns:
            Number of keys evicted, including those evicted before an
            unexpected failure.
        """
        evicted = 0

        try:
            # Collect all keys with access counts
            items: List[Tuple[str, int]] = []
            async for key in self.redis.client.scan_iter():
                key_str = self._decode_key(key)
                if self._is_meta_key(key_str):
                    continue
                items.append((key_str, await self._access_count(key_str)))

            # Sort by ascending access count (least used first)
            items.sort(key=lambda item: item[1])

            current_percent = await self._memory_ratio()

            for key_str, _access in items:
                if current_percent <= target_percent:
                    break

                await self.redis.delete(key_str)
                await self.redis.delete(f"{key_str}{_META_SUFFIX}")
                evicted += 1

                # Recalculate memory
                current_percent = await self._memory_ratio()

        except Exception as e:
            logger.error(f"LFU eviction failed: {e}")

        return evicted

    # ------------------------------------------------------------------ #
    # Explicit invalidation
    # ------------------------------------------------------------------ #

    async def invalidate_source(self, source_id: str, cascade: bool = True):
        """
        Invalidate specific source and optionally cascade

        Args:
            source_id: Source identifier
            cascade: If True, also invalidate requests containing this source.
                A request matches when one of its ``sources`` entries has
                ``id == "source:<source_id>"``.
                NOTE(review): confirm cached results store the prefixed id;
                if they store the bare id, the cascade never matches.
        """
        try:
            source_key = f"source:{source_id}"

            # Delete source
            await self.redis.delete(source_key)
            await self.redis.delete(f"{source_key}{_META_SUFFIX}")

            if cascade:
                # Find and delete all requests mentioning this source
                evicted = 0
                async for key in self.redis.client.scan_iter(match="req:*"):
                    key_str = self._decode_key(key)
                    # Metadata hashes also match "req:*" and are not JSON results.
                    if self._is_meta_key(key_str):
                        continue

                    cached = await self.redis.get_json(key_str)
                    if self._result_references(cached, source_key):
                        await self.redis.delete(key_str)
                        await self.redis.delete(f"{key_str}{_META_SUFFIX}")
                        evicted += 1

                logger.info(f"Invalidated source {source_id} and {evicted} dependent requests")

        except Exception as e:
            logger.error(f"Source invalidation failed: {e}")

    async def invalidate_pool(self, pool_name: str):
        """
        Invalidate entire source pool

        Deletes every ``source:<pool_name>:*`` key, then every ``req:*`` key.
        The request cascade is deliberately coarse: it drops all cached
        requests, not only those referencing this pool.

        Args:
            pool_name: Pool identifier (e.g., 'github', 'arxiv')
        """
        try:
            # Delete all sources from pool
            evicted = await self.redis.delete_pattern(f"source:{pool_name}:*")

            # Cascade to requests
            req_evicted = await self.redis.delete_pattern("req:*")

            logger.info(f"Invalidated pool {pool_name}: {evicted} sources, {req_evicted} requests")

        except Exception as e:
            logger.error(f"Pool invalidation failed: {e}")

    async def invalidate_all(self):
        """
        DANGEROUS: Clear entire cache (FLUSHDB on the current Redis database)

        Should only be used for:
        - Emergency cache corruption
        - Major version upgrades
        - Manual admin intervention
        """
        logger.warning("⚠️ CLEARING ENTIRE CACHE ⚠️")

        try:
            await self.redis.client.flushdb()
            logger.warning("Cache cleared successfully")
        except Exception as e:
            logger.error(f"Cache clear failed: {e}")

    # Alias so routes.py calling .clear_all() doesn't break
    async def clear_all(self):
        """Alias for invalidate_all() — keeps routes.py compatible."""
        await self.invalidate_all()

    def get_stats(self) -> dict:
        """Get eviction statistics plus the configured thresholds."""
        return {
            **self._eviction_stats,
            'eviction_threshold': self.eviction_threshold,
            'eviction_target': self.eviction_target,
            'max_memory_gb': settings.MAX_CACHE_MEMORY_GB,
        }