Spaces:
Running
Running
| """ | |
| Knowledge Universe - Cache Eviction Manager | |
| Handles automatic cache cleanup and memory management | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| from typing import List, Tuple | |
| from src.cache.redis_manager import RedisManager | |
| from config.settings import get_settings | |
| settings = get_settings() | |
| logger = logging.getLogger(__name__) | |
class EvictionManager:
    """
    Multi-strategy cache eviction and cleanup.

    Strategies:
        1. TTL-based: automatic Redis expiry
        2. Memory-based: evict when memory > threshold
        3. LFU (Least Frequently Used): remove low-access items
        4. Staleness: remove outdated results
    """

    # Number of LFU deletions between `INFO memory` re-checks.  Re-checking
    # after every single delete costs one Redis round-trip per evicted key;
    # batching may overshoot the target by a few keys, which is harmless.
    _LFU_CHECK_INTERVAL = 10

    def __init__(self, redis_manager: RedisManager):
        """
        Args:
            redis_manager: Async Redis wrapper used for all cache operations.
        """
        self.redis = redis_manager
        # Configured ceiling in bytes (the setting is expressed in GiB).
        self.max_memory_bytes = settings.MAX_CACHE_MEMORY_GB * (1024 ** 3)
        # Fraction of max memory that triggers eviction, and the fraction
        # eviction drives usage back down to.
        self.eviction_threshold = settings.EVICTION_THRESHOLD
        self.eviction_target = settings.EVICTION_TARGET
        self._eviction_stats = {
            'total_evictions': 0,
            'memory_evictions': 0,
            'staleness_evictions': 0,
            'lfu_evictions': 0
        }

    async def cleanup_daemon(self):
        """
        Main cleanup daemon - runs perpetually.

        Spawns one task per cleanup strategy, each executing on its own
        schedule.  Does not return under normal operation.
        """
        logger.info("Starting eviction daemon...")
        strategies = [
            (self.cleanup_by_memory, 300),      # Every 5 minutes
            (self.cleanup_by_staleness, 1800),  # Every 30 minutes
            (self.cleanup_orphaned, 3600),      # Every hour
        ]
        tasks = [
            asyncio.create_task(self._run_strategy_loop(func, interval))
            for func, interval in strategies
        ]
        # Run forever (gather only completes if a loop task dies/cancels).
        await asyncio.gather(*tasks)

    async def _run_strategy_loop(self, strategy_func, interval: int):
        """Run one cleanup strategy on a fixed interval, forever.

        Exceptions are logged and swallowed so a transient failure does not
        kill the strategy loop.
        """
        while True:
            try:
                evicted = await strategy_func()
                if evicted > 0:
                    logger.info(f"{strategy_func.__name__} evicted {evicted} items")
            except Exception as e:
                logger.error(f"Cleanup strategy failed: {e}")
            await asyncio.sleep(interval)

    async def cleanup_by_memory(self) -> int:
        """
        Evict items when memory usage exceeds the configured threshold.

        Uses an LFU (Least Frequently Used) policy to bring usage back down
        to ``eviction_target``.

        Returns:
            Number of items evicted (0 when below threshold or on error).
        """
        try:
            if self.max_memory_bytes <= 0:
                return 0  # Misconfigured ceiling; avoid ZeroDivisionError.
            info = await self.redis.client.info('memory')
            used_memory = info.get('used_memory', 0)
            memory_percent = used_memory / self.max_memory_bytes
            if memory_percent < self.eviction_threshold:
                return 0  # No eviction needed
            logger.warning(f"Memory usage at {memory_percent:.1%}, triggering eviction")
            # Evict down to the target percentage.
            evicted = await self._evict_by_lfu(
                target_percent=self.eviction_target
            )
            self._eviction_stats['memory_evictions'] += evicted
            self._eviction_stats['total_evictions'] += evicted
            return evicted
        except Exception as e:
            logger.error(f"Memory-based cleanup failed: {e}")
            return 0

    async def cleanup_by_staleness(self) -> int:
        """
        Remove stale request results: ``req:*`` entries older than 6 hours.

        NOTE(review): an earlier docstring also claimed 24-hour source-cache
        cleanup, but only ``req:*`` keys are scanned here — confirm whether
        source staleness is handled elsewhere (e.g. via TTL).

        Returns:
            Number of request entries evicted.
        """
        evicted = 0
        try:
            now = time.time()
            async for key in self.redis.client.scan_iter(match="req:*"):
                # Per-key isolation: one malformed entry must not abort
                # the whole scan.
                try:
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    metadata = await self.redis.hgetall(f"{key_str}:meta")
                    if not metadata:
                        continue
                    # A missing timestamp defaults to epoch 0, i.e. the
                    # entry is treated as maximally stale and evicted.
                    created_at = float(metadata.get('created_at', 0))
                    age_hours = (now - created_at) / 3600
                    # Stale if older than 6 hours
                    if age_hours > 6:
                        await self.redis.delete(key_str)
                        # Drop the metadata too so no orphan is left for
                        # cleanup_orphaned to find later.
                        await self.redis.delete(f"{key_str}:meta")
                        evicted += 1
                except Exception as e:
                    logger.debug(f"Error checking staleness for {key}: {e}")
                    continue
            self._eviction_stats['staleness_evictions'] += evicted
            self._eviction_stats['total_evictions'] += evicted
            return evicted
        except Exception as e:
            logger.error(f"Staleness cleanup failed: {e}")
            return 0

    async def cleanup_orphaned(self) -> int:
        """
        Remove orphaned metadata keys.

        Orphaned = a ``*:meta`` key whose parent key no longer exists.

        Returns:
            Number of orphaned metadata keys removed.
        """
        evicted = 0
        try:
            async for key in self.redis.client.scan_iter(match="*:meta"):
                # Per-key isolation (previously a single failure aborted the
                # entire scan and the running count was discarded).
                try:
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    # Strip only the trailing ':meta' suffix; str.replace
                    # would also mangle keys containing ':meta' mid-string.
                    parent_key = key_str[:-len(':meta')]
                    if not await self.redis.exists(parent_key):
                        await self.redis.delete(key_str)
                        evicted += 1
                except Exception as e:
                    logger.debug(f"Error checking orphan {key}: {e}")
                    continue
            # Keep stats consistent with the other cleanup strategies.
            self._eviction_stats['total_evictions'] += evicted
            return evicted
        except Exception as e:
            logger.error(f"Orphan cleanup failed: {e}")
            return 0

    async def _evict_by_lfu(self, target_percent: float) -> int:
        """
        Least-Frequently-Used eviction.

        Deletes keys in ascending access-count order until memory usage
        falls to ``target_percent`` of the configured maximum.

        Args:
            target_percent: Fraction of max memory to evict down to.

        Returns:
            Number of cache keys deleted (their metadata keys, deleted
            alongside, are not counted).
        """
        evicted = 0
        try:
            # Collect every cache key with its recorded access count.
            items: List[Tuple[str, int]] = []
            async for key in self.redis.client.scan_iter():
                key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                if key_str.endswith(':meta'):
                    continue  # metadata is handled with its parent key
                metadata = await self.redis.hgetall(f"{key_str}:meta")
                try:
                    access_count = int(metadata.get('access_count', 0))
                except (TypeError, ValueError):
                    access_count = 0  # malformed count -> treat as never used
                items.append((key_str, access_count))

            # Sort ascending so the least-used keys are evicted first.
            items.sort(key=lambda item: item[1])

            async def _memory_percent() -> float:
                info = await self.redis.client.info('memory')
                return info.get('used_memory', 0) / self.max_memory_bytes

            current_percent = await _memory_percent()
            for key_str, _count in items:
                if current_percent <= target_percent:
                    break
                await self.redis.delete(key_str)
                await self.redis.delete(f"{key_str}:meta")
                evicted += 1
                # Re-check memory every few deletions instead of after every
                # one, saving an INFO round-trip per evicted key.
                if evicted % self._LFU_CHECK_INTERVAL == 0:
                    current_percent = await _memory_percent()

            # Fix: 'lfu_evictions' was declared in __init__ but never
            # incremented anywhere.
            self._eviction_stats['lfu_evictions'] += evicted
            return evicted
        except Exception as e:
            logger.error(f"LFU eviction failed: {e}")
            return 0

    async def invalidate_source(self, source_id: str, cascade: bool = True):
        """
        Invalidate a specific source and optionally cascade to requests.

        Args:
            source_id: Source identifier
            cascade: If True, also invalidate requests containing this source
        """
        try:
            # Delete the source entry and its metadata.
            await self.redis.delete(f"source:{source_id}")
            await self.redis.delete(f"source:{source_id}:meta")
            # Initialized up-front so the final log line is safe when
            # cascade=False (previously 'evicted' was only bound inside the
            # cascade branch, raising a silent NameError).
            evicted = 0
            if cascade:
                # Find and delete all requests whose cached result
                # references this source.
                async for key in self.redis.client.scan_iter(match="req:*"):
                    key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                    cached = await self.redis.get_json(key_str)
                    if cached and any(
                        s.get('id') == f"source:{source_id}"
                        for s in cached.get('sources', [])
                    ):
                        await self.redis.delete(key_str)
                        evicted += 1
            logger.info(f"Invalidated source {source_id} and {evicted} dependent requests")
        except Exception as e:
            logger.error(f"Source invalidation failed: {e}")

    async def invalidate_pool(self, pool_name: str):
        """
        Invalidate an entire source pool and its dependent requests.

        Args:
            pool_name: Pool identifier (e.g., 'github', 'arxiv')
        """
        try:
            # Delete all sources belonging to the pool.
            evicted = await self.redis.delete_pattern(f"source:{pool_name}:*")
            # Cascade only to requests that actually reference this pool
            # (previously every req:* entry was dropped regardless of pool),
            # mirroring the targeted scan in invalidate_source.
            prefix = f"source:{pool_name}:"
            req_evicted = 0
            async for key in self.redis.client.scan_iter(match="req:*"):
                key_str = key.decode('utf-8') if isinstance(key, bytes) else key
                cached = await self.redis.get_json(key_str)
                if cached and any(
                    str(s.get('id', '')).startswith(prefix)
                    for s in cached.get('sources', [])
                ):
                    await self.redis.delete(key_str)
                    req_evicted += 1
            logger.info(f"Invalidated pool {pool_name}: {evicted} sources, {req_evicted} requests")
        except Exception as e:
            logger.error(f"Pool invalidation failed: {e}")

    async def invalidate_all(self):
        """
        DANGEROUS: Clear entire cache.

        Should only be used for:
        - Emergency cache corruption
        - Major version upgrades
        - Manual admin intervention
        """
        logger.warning("⚠️ CLEARING ENTIRE CACHE ⚠️")
        try:
            await self.redis.client.flushdb()
            logger.warning("Cache cleared successfully")
        except Exception as e:
            logger.error(f"Cache clear failed: {e}")

    # Alias so routes.py calling .clear_all() doesn't break
    async def clear_all(self):
        """Alias for invalidate_all() — keeps routes.py compatible."""
        await self.invalidate_all()

    def get_stats(self) -> dict:
        """Return eviction counters plus the active eviction configuration."""
        return {
            **self._eviction_stats,
            'eviction_threshold': self.eviction_threshold,
            'eviction_target': self.eviction_target,
            'max_memory_gb': settings.MAX_CACHE_MEMORY_GB
        }