# Knowledge-Universe / src/cache/eviction_manager.py
# NOTE(review): the following were repository-listing artifacts accidentally
# pasted into the file (author "vlsiddarth", commit b32fbe0 "Block 3: auth,
# decay engine, signup, usage, landing page, all empty files filled");
# commented out so the module parses.
"""
Knowledge Universe - Cache Eviction Manager
Handles automatic cache cleanup and memory management
"""
import asyncio
import logging
import time
from typing import List, Tuple
from src.cache.redis_manager import RedisManager
from config.settings import get_settings
settings = get_settings()
logger = logging.getLogger(__name__)
class EvictionManager:
    """
    Multi-strategy cache eviction and cleanup.

    Strategies:
        1. TTL-based: automatic Redis expiry (configured where keys are written)
        2. Memory-based: evict when memory usage exceeds a threshold
        3. LFU (Least Frequently Used): remove low-access items
        4. Staleness: remove outdated request results
    """

    def __init__(self, redis_manager: RedisManager):
        """
        Args:
            redis_manager: Async Redis wrapper exposing ``client`` plus the
                helpers used below (hgetall, delete, exists, get_json,
                delete_pattern).
        """
        self.redis = redis_manager
        self.max_memory_bytes = settings.MAX_CACHE_MEMORY_GB * (1024 ** 3)
        # Fractions of the memory cap: eviction starts above `threshold`
        # and continues until usage drops below `target`.
        self.eviction_threshold = settings.EVICTION_THRESHOLD
        self.eviction_target = settings.EVICTION_TARGET
        self._eviction_stats = {
            'total_evictions': 0,
            'memory_evictions': 0,
            'staleness_evictions': 0,
            'lfu_evictions': 0
        }

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _as_str(value):
        """Return *value* as str, decoding UTF-8 if Redis returned bytes."""
        return value.decode('utf-8') if isinstance(value, bytes) else value

    @classmethod
    def _normalize_meta(cls, metadata) -> dict:
        """
        Decode a Redis hash into a plain ``{str: str}`` dict.

        Bug fix: when the Redis client runs without ``decode_responses``
        (this file already anticipates bytes scan keys), HGETALL returns
        *bytes* keys, so lookups like ``metadata.get('created_at')``
        silently fell back to their defaults — every entry looked ancient
        to the staleness sweep and cold to the LFU sort.
        """
        return {
            cls._as_str(k): cls._as_str(v)
            for k, v in (metadata or {}).items()
        }

    async def _memory_usage_fraction(self) -> float:
        """Current Redis memory usage as a fraction of the configured cap."""
        # Guard: a zero/unset cap would otherwise raise ZeroDivisionError
        # on every daemon tick.
        if self.max_memory_bytes <= 0:
            return 0.0
        info = await self.redis.client.info('memory')
        used_memory = info.get('used_memory', 0)
        return used_memory / self.max_memory_bytes

    # ------------------------------------------------------------------
    # Daemon
    # ------------------------------------------------------------------
    async def cleanup_daemon(self):
        """
        Main cleanup daemon — runs perpetually.

        Spawns one independent loop per strategy so a slow strategy cannot
        delay the others.
        """
        logger.info("Starting eviction daemon...")
        strategies = [
            (self.cleanup_by_memory, 300),      # every 5 minutes
            (self.cleanup_by_staleness, 1800),  # every 30 minutes
            (self.cleanup_orphaned, 3600),      # every hour
        ]
        tasks = [
            asyncio.create_task(self._run_strategy_loop(func, interval))
            for func, interval in strategies
        ]
        # Runs forever; _run_strategy_loop swallows per-iteration errors.
        await asyncio.gather(*tasks)

    async def _run_strategy_loop(self, strategy_func, interval: int):
        """Run one cleanup strategy every *interval* seconds, forever."""
        while True:
            try:
                evicted = await strategy_func()
                if evicted > 0:
                    logger.info(f"{strategy_func.__name__} evicted {evicted} items")
            except Exception as e:
                # Never let a single failure kill the daemon loop.
                logger.error(f"Cleanup strategy failed: {e}")
            await asyncio.sleep(interval)

    # ------------------------------------------------------------------
    # Cleanup strategies
    # ------------------------------------------------------------------
    async def cleanup_by_memory(self) -> int:
        """
        Evict items when memory usage exceeds the configured threshold.

        Uses an LFU policy to bring usage back down to ``eviction_target``.

        Returns:
            Number of keys evicted (0 when below threshold or on error).
        """
        try:
            memory_percent = await self._memory_usage_fraction()
            if memory_percent < self.eviction_threshold:
                return 0  # No eviction needed
            logger.warning(f"Memory usage at {memory_percent:.1%}, triggering eviction")
            # Evict to target percentage
            evicted = await self._evict_by_lfu(
                target_percent=self.eviction_target
            )
            self._eviction_stats['memory_evictions'] += evicted
            self._eviction_stats['total_evictions'] += evicted
            return evicted
        except Exception as e:
            logger.error(f"Memory-based cleanup failed: {e}")
            return 0

    async def cleanup_by_staleness(self) -> int:
        """
        Remove request-cache entries older than 6 hours.

        NOTE(review): an earlier docstring also promised a 24 h rule for
        the source cache, but only ``req:*`` keys are scanned here —
        sources are presumably expired by their own TTLs; confirm.

        Returns:
            Number of stale request keys removed (0 on error).
        """
        evicted = 0
        try:
            now = time.time()
            async for key in self.redis.client.scan_iter(match="req:*"):
                try:
                    key_str = self._as_str(key)
                    metadata = self._normalize_meta(
                        await self.redis.hgetall(f"{key_str}:meta")
                    )
                    if not metadata:
                        # No metadata: cannot judge age — leave it to TTL.
                        continue
                    created_at = float(metadata.get('created_at', 0))
                    age_hours = (now - created_at) / 3600
                    # Stale if older than 6 hours
                    if age_hours > 6:
                        # Delete the entry AND its metadata so we don't
                        # leave an orphaned :meta hash behind.
                        await self.redis.delete(key_str)
                        await self.redis.delete(f"{key_str}:meta")
                        evicted += 1
                except Exception as e:
                    logger.debug(f"Error checking staleness for {key}: {e}")
                    continue
            self._eviction_stats['staleness_evictions'] += evicted
            self._eviction_stats['total_evictions'] += evicted
            return evicted
        except Exception as e:
            logger.error(f"Staleness cleanup failed: {e}")
            return 0

    async def cleanup_orphaned(self) -> int:
        """
        Remove orphaned metadata keys.

        Orphaned = a ``*:meta`` hash exists but its parent key does not
        (e.g. the parent expired via TTL).

        Returns:
            Number of orphaned metadata keys removed (0 on error).
        """
        evicted = 0
        try:
            async for key in self.redis.client.scan_iter(match="*:meta"):
                key_str = self._as_str(key)
                # Strip only the trailing ':meta' — str.replace() would
                # also mangle any ':meta' appearing mid-key.
                parent_key = key_str[:-len(':meta')]
                if not await self.redis.exists(parent_key):
                    await self.redis.delete(key_str)
                    evicted += 1
            return evicted
        except Exception as e:
            logger.error(f"Orphan cleanup failed: {e}")
            return 0

    async def _evict_by_lfu(self, target_percent: float) -> int:
        """
        Least-Frequently-Used eviction.

        Deletes the coldest keys (lowest ``access_count`` in their
        ``:meta`` hash) until memory usage drops below *target_percent*.

        Returns:
            Number of cache keys deleted (metadata keys not counted).
        """
        evicted = 0
        try:
            # Collect (key, access_count) for every non-metadata key.
            items: List[Tuple[str, int]] = []
            async for key in self.redis.client.scan_iter():
                key_str = self._as_str(key)
                if key_str.endswith(':meta'):
                    continue  # metadata is removed alongside its parent
                metadata = self._normalize_meta(
                    await self.redis.hgetall(f"{key_str}:meta")
                )
                access_count = int(metadata.get('access_count', 0))
                items.append((key_str, access_count))
            # Sort ascending by access count: coldest first.
            items.sort(key=lambda item: item[1])
            current_percent = await self._memory_usage_fraction()
            for key_str, _count in items:
                if current_percent <= target_percent:
                    break
                await self.redis.delete(key_str)
                await self.redis.delete(f"{key_str}:meta")
                evicted += 1
                # Re-read memory after each delete so we stop as soon as
                # the target is reached (one INFO round-trip per eviction).
                current_percent = await self._memory_usage_fraction()
            return evicted
        except Exception as e:
            logger.error(f"LFU eviction failed: {e}")
            return 0

    # ------------------------------------------------------------------
    # Explicit invalidation
    # ------------------------------------------------------------------
    async def invalidate_source(self, source_id: str, cascade: bool = True):
        """
        Invalidate a specific source and optionally cascade.

        Args:
            source_id: Source identifier (without the ``source:`` prefix).
            cascade: If True, also invalidate cached requests whose stored
                ``sources`` list references this source.
        """
        try:
            # Delete source and its metadata
            await self.redis.delete(f"source:{source_id}")
            await self.redis.delete(f"source:{source_id}:meta")
            if cascade:
                # Find and delete all requests mentioning this source
                evicted = 0
                full_id = f"source:{source_id}"
                async for key in self.redis.client.scan_iter(match="req:*"):
                    key_str = self._as_str(key)
                    cached = await self.redis.get_json(key_str)
                    # NOTE(review): assumes each cached source dict stores
                    # the prefixed key under 'id' — confirm against the
                    # code that writes these entries.
                    if cached and any(
                        s.get('id') == full_id
                        for s in cached.get('sources', [])
                    ):
                        # Delete the request and its metadata together.
                        await self.redis.delete(key_str)
                        await self.redis.delete(f"{key_str}:meta")
                        evicted += 1
                logger.info(f"Invalidated source {source_id} and {evicted} dependent requests")
        except Exception as e:
            logger.error(f"Source invalidation failed: {e}")

    async def invalidate_pool(self, pool_name: str):
        """
        Invalidate an entire source pool.

        Args:
            pool_name: Pool identifier (e.g., 'github', 'arxiv')
        """
        try:
            # Delete all sources from pool
            evicted = await self.redis.delete_pattern(f"source:{pool_name}:*")
            # Conservative cascade: there is no per-pool index of requests,
            # so ALL request-cache entries are dropped, not only those that
            # referenced this pool.
            req_evicted = await self.redis.delete_pattern("req:*")
            logger.info(f"Invalidated pool {pool_name}: {evicted} sources, {req_evicted} requests")
        except Exception as e:
            logger.error(f"Pool invalidation failed: {e}")

    async def invalidate_all(self):
        """
        DANGEROUS: Clear entire cache.

        Should only be used for:
        - Emergency cache corruption
        - Major version upgrades
        - Manual admin intervention
        """
        logger.warning("⚠️ CLEARING ENTIRE CACHE ⚠️")
        try:
            await self.redis.client.flushdb()
            logger.warning("Cache cleared successfully")
        except Exception as e:
            logger.error(f"Cache clear failed: {e}")

    # Alias so routes.py calling .clear_all() doesn't break
    async def clear_all(self):
        """Alias for invalidate_all() — keeps routes.py compatible."""
        await self.invalidate_all()

    def get_stats(self) -> dict:
        """Get eviction statistics plus the active eviction configuration."""
        return {
            **self._eviction_stats,
            'eviction_threshold': self.eviction_threshold,
            'eviction_target': self.eviction_target,
            'max_memory_gb': settings.MAX_CACHE_MEMORY_GB
        }