#!/usr/bin/env python3
"""
Context Memory Cache Manager

A sophisticated caching system for NZ Legislation Loophole Analysis that provides:
- Hash-based chunk identification for unique content tracking
- Multi-level caching (memory + optional disk persistence)
- Intelligent cache invalidation based on memory limits
- Performance metrics and cache statistics
- Thread-safe operations for concurrent processing
"""

import hashlib
import json
import os
import time
import threading
from typing import Dict, Any, Optional, Tuple
from functools import lru_cache
import sqlite3
from pathlib import Path

# NOTE(review): psutil and streamlit are imported but never referenced in this
# module.  The imports are kept for backward compatibility but guarded so the
# cache module remains importable in environments where they are not installed.
try:
    import psutil  # noqa: F401
except ImportError:
    psutil = None
try:
    import streamlit as st  # noqa: F401
except ImportError:
    st = None


class CacheEntry:
    """Represents a single cache entry with metadata.

    Holds the analyzed content, its analysis result, the configurations that
    produced it, and bookkeeping fields (timestamps, access count, and an
    approximate byte footprint used for memory-budget accounting).
    """

    def __init__(self, key: str, content: str, analysis_result: Dict[str, Any],
                 model_config: Dict[str, Any], processing_config: Dict[str, Any]):
        self.key = key
        self.content = content
        self.analysis_result = analysis_result
        self.model_config = model_config
        self.processing_config = processing_config
        self.created_at = time.time()
        self.last_accessed = time.time()
        self.access_count = 0
        # Approximate footprint: UTF-8 content bytes plus the stringified result.
        self.size_bytes = (len(content.encode('utf-8'))
                           + len(str(analysis_result).encode('utf-8')))

    def to_dict(self) -> Dict[str, Any]:
        """Convert cache entry to dictionary for serialization."""
        return {
            'key': self.key,
            'content': self.content,
            'analysis_result': self.analysis_result,
            'model_config': self.model_config,
            'processing_config': self.processing_config,
            'created_at': self.created_at,
            'last_accessed': self.last_accessed,
            'access_count': self.access_count,
            'size_bytes': self.size_bytes,
        }

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'CacheEntry':
        """Create cache entry from dictionary (inverse of :meth:`to_dict`).

        Missing metadata fields fall back to "now" / recomputed values so
        entries serialized by older versions still load.
        """
        entry = cls(
            key=data['key'],
            content=data['content'],
            analysis_result=data['analysis_result'],
            model_config=data['model_config'],
            processing_config=data['processing_config'],
        )
        entry.created_at = data.get('created_at', time.time())
        entry.last_accessed = data.get('last_accessed', time.time())
        entry.access_count = data.get('access_count', 0)
        entry.size_bytes = data.get('size_bytes', entry.size_bytes)
        return entry

    def update_access(self):
        """Update access statistics (recency + frequency)."""
        self.last_accessed = time.time()
        self.access_count += 1


class CacheManager:
    """Advanced cache manager for legislation analysis.

    Two-tier cache: an in-memory dict with LRU eviction bounded by
    ``max_memory_mb``, optionally backed by a SQLite database for
    persistence across runs.  All mutating operations are guarded by a
    reentrant lock for thread safety.
    """

    def __init__(self, max_memory_mb: int = 1024, persistent: bool = True,
                 cache_dir: str = None, ttl_hours: int = 24):
        """
        Initialize the cache manager.

        Args:
            max_memory_mb: Maximum memory to use for caching (MB)
            persistent: Whether to use persistent disk cache
            cache_dir: Directory for persistent cache storage
            ttl_hours: Time-to-live for cache entries (hours)
        """
        self.max_memory_mb = max_memory_mb
        self.persistent = persistent
        self.ttl_hours = ttl_hours
        self.ttl_seconds = ttl_hours * 3600

        # Set up cache directory (default: sibling "cache" dir of this package).
        if cache_dir is None:
            cache_dir = os.path.join(os.path.dirname(__file__), '..', 'cache')
        self.cache_dir = Path(cache_dir)
        # FIX: parents=True — the default path goes through '..', so the
        # parent may not exist yet and mkdir would raise FileNotFoundError.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.db_path = self.cache_dir / 'cache.db'

        # Thread synchronization (RLock: helpers re-acquire under public API).
        self.lock = threading.RLock()

        # In-memory cache with LRU eviction.
        self.memory_cache: Dict[str, CacheEntry] = {}
        self.memory_size = 0  # Current memory usage in bytes

        # Statistics (also carries the 'enabled' switch used by get/put).
        self.stats = {
            'hits': 0,
            'misses': 0,
            'entries': 0,
            'memory_usage_mb': 0,
            'evictions': 0,
            'enabled': True,
        }

        # Initialize and warm from the persistent tier when enabled.
        if self.persistent:
            self._init_database()
        if self.persistent:
            self._load_persistent_cache()

    def _init_database(self):
        """Initialize SQLite database for persistent cache.

        On any failure, persistence is disabled and the manager degrades
        to memory-only operation (best effort, no crash).
        """
        try:
            with sqlite3.connect(str(self.db_path)) as conn:
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS cache_entries (
                        key TEXT PRIMARY KEY,
                        data TEXT NOT NULL,
                        created_at REAL NOT NULL,
                        last_accessed REAL NOT NULL,
                        access_count INTEGER DEFAULT 0,
                        size_bytes INTEGER DEFAULT 0
                    )
                ''')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_created_at ON cache_entries(created_at)')
                conn.execute('CREATE INDEX IF NOT EXISTS idx_last_accessed ON cache_entries(last_accessed)')
        except Exception as e:
            print(f"Warning: Could not initialize persistent cache: {e}")
            self.persistent = False

    def _load_persistent_cache(self):
        """Load existing cache entries from database into memory.

        Expired or unreadable rows are pruned.  FIX: expired keys are
        collected first and deleted after the SELECT cursor is exhausted,
        instead of executing DELETE on the same connection while iterating
        the open cursor.
        """
        if not self.persistent:
            return
        try:
            expired_keys = []
            with sqlite3.connect(str(self.db_path)) as conn:
                cursor = conn.execute('SELECT data FROM cache_entries')
                for row in cursor:
                    try:
                        entry_data = json.loads(row[0])
                        entry = CacheEntry.from_dict(entry_data)
                        if self._is_entry_valid(entry):
                            self._add_to_memory_cache(entry)
                        else:
                            expired_keys.append(entry.key)
                    except (json.JSONDecodeError, KeyError):
                        continue
                for key in expired_keys:
                    conn.execute('DELETE FROM cache_entries WHERE key = ?', (key,))
        except Exception as e:
            print(f"Warning: Could not load persistent cache: {e}")

    def _generate_cache_key(self, content: str, model_config: Dict[str, Any],
                            processing_config: Dict[str, Any]) -> str:
        """
        Generate a unique cache key based on content and configuration.

        Args:
            content: The text content to be analyzed
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used

        Returns:
            SHA-256 hash string as cache key
        """
        # Deterministic serialization: sorted keys make the JSON stable for
        # dicts with identical items in different insertion order.
        key_data = {
            'content': content,
            'model_config': model_config,
            'processing_config': processing_config,
        }
        key_string = json.dumps(key_data, sort_keys=True)
        return hashlib.sha256(key_string.encode('utf-8')).hexdigest()

    def _is_entry_valid(self, entry: CacheEntry) -> bool:
        """Check if a cache entry is still valid (TTL not exceeded)."""
        if time.time() - entry.created_at > self.ttl_seconds:
            return False
        # Configuration compatibility is already encoded in the cache key;
        # this hook exists for future invalidation rules.
        return True

    def _add_to_memory_cache(self, entry: CacheEntry):
        """Add entry to memory cache, evicting LRU entries to stay in budget."""
        with self.lock:
            # Evict until the new entry fits (or the cache is empty — an
            # oversized single entry is still admitted rather than rejected).
            while self.memory_size + entry.size_bytes > self.max_memory_mb * 1024 * 1024:
                if not self.memory_cache:
                    break
                self._evict_lru_entry()

            self.memory_cache[entry.key] = entry
            self.memory_size += entry.size_bytes
            self.stats['entries'] = len(self.memory_cache)
            self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

    def _evict_lru_entry(self):
        """Evict the least recently used entry from memory cache."""
        if not self.memory_cache:
            return
        # O(n) scan for the oldest last_accessed; acceptable for the
        # expected cache sizes here.
        lru_key = min(self.memory_cache.keys(),
                      key=lambda k: self.memory_cache[k].last_accessed)
        evicted_entry = self.memory_cache.pop(lru_key)
        self.memory_size -= evicted_entry.size_bytes
        self.stats['evictions'] += 1
        # If persistent, we could keep it in database but remove from memory.
        # For now, we'll just remove it completely from the memory tier.

    def _save_to_persistent_cache(self, entry: CacheEntry):
        """Save entry to persistent cache (upsert; best effort)."""
        if not self.persistent:
            return
        try:
            with sqlite3.connect(str(self.db_path)) as conn:
                conn.execute('''
                    INSERT OR REPLACE INTO cache_entries
                    (key, data, created_at, last_accessed, access_count, size_bytes)
                    VALUES (?, ?, ?, ?, ?, ?)
                ''', (
                    entry.key,
                    json.dumps(entry.to_dict()),
                    entry.created_at,
                    entry.last_accessed,
                    entry.access_count,
                    entry.size_bytes,
                ))
        except Exception as e:
            print(f"Warning: Could not save to persistent cache: {e}")

    def get(self, content: str, model_config: Dict[str, Any],
            processing_config: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """
        Get cached analysis result for given content and configuration.

        Checks the memory tier first, then (on miss) the persistent tier,
        promoting persistent hits back into memory.

        Args:
            content: Text content to look up
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used

        Returns:
            Cached analysis result or None if not found
        """
        if not self.stats['enabled']:
            self.stats['misses'] += 1
            return None

        cache_key = self._generate_cache_key(content, model_config, processing_config)

        with self.lock:
            # Check memory cache first.
            if cache_key in self.memory_cache:
                entry = self.memory_cache[cache_key]
                if self._is_entry_valid(entry):
                    entry.update_access()
                    self.stats['hits'] += 1
                    return entry.analysis_result
                # Remove invalid (expired) entry and keep stats consistent.
                self.memory_cache.pop(cache_key)
                self.memory_size -= entry.size_bytes
                self.stats['entries'] = len(self.memory_cache)
                # FIX: also refresh the usage stat, which previously went stale.
                self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

            # Check persistent cache if not in memory.
            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        cursor = conn.execute(
                            'SELECT data FROM cache_entries WHERE key = ?', (cache_key,))
                        row = cursor.fetchone()
                        if row:
                            entry_data = json.loads(row[0])
                            entry = CacheEntry.from_dict(entry_data)
                            if self._is_entry_valid(entry):
                                entry.update_access()
                                self.stats['hits'] += 1
                                # Promote to memory for faster future access and
                                # persist the refreshed access statistics.
                                self._add_to_memory_cache(entry)
                                self._save_to_persistent_cache(entry)
                                return entry.analysis_result
                except Exception as e:
                    print(f"Warning: Error accessing persistent cache: {e}")

            self.stats['misses'] += 1
            return None

    def put(self, content: str, analysis_result: Dict[str, Any],
            model_config: Dict[str, Any], processing_config: Dict[str, Any]):
        """
        Store analysis result in cache.

        Args:
            content: Text content that was analyzed
            analysis_result: Analysis result to cache
            model_config: Model configuration used for analysis
            processing_config: Processing configuration used
        """
        if not self.stats['enabled']:
            return

        cache_key = self._generate_cache_key(content, model_config, processing_config)

        with self.lock:
            entry = CacheEntry(cache_key, content, analysis_result,
                               model_config, processing_config)
            # Write both tiers: memory for speed, disk for persistence.
            self._add_to_memory_cache(entry)
            self._save_to_persistent_cache(entry)

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics, including derived hit-rate and config info."""
        with self.lock:
            total_requests = self.stats['hits'] + self.stats['misses']
            hit_rate = (self.stats['hits'] / total_requests * 100) if total_requests > 0 else 0
            return {
                **self.stats,
                'hit_rate': hit_rate,
                'total_requests': total_requests,
                'persistent_enabled': self.persistent,
                'memory_limit_mb': self.max_memory_mb,
                'ttl_hours': self.ttl_hours,
            }

    def clear_cache(self):
        """Clear all cache entries (memory and persistent) and reset counters."""
        with self.lock:
            self.memory_cache.clear()
            self.memory_size = 0
            self.stats['entries'] = 0
            self.stats['hits'] = 0
            self.stats['misses'] = 0
            self.stats['evictions'] = 0
            self.stats['memory_usage_mb'] = 0

            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        conn.execute('DELETE FROM cache_entries')
                except Exception as e:
                    print(f"Warning: Could not clear persistent cache: {e}")

    def cleanup_expired_entries(self):
        """Remove expired entries from both cache tiers."""
        current_time = time.time()
        expired_keys = []

        with self.lock:
            # Find expired entries in memory (adjust size while scanning,
            # remove after to avoid mutating the dict during iteration).
            for key, entry in self.memory_cache.items():
                if current_time - entry.created_at > self.ttl_seconds:
                    expired_keys.append(key)
                    self.memory_size -= entry.size_bytes

            for key in expired_keys:
                del self.memory_cache[key]

            self.stats['entries'] = len(self.memory_cache)
            self.stats['memory_usage_mb'] = self.memory_size / (1024 * 1024)

            # Clean up persistent cache with the same cutoff.
            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        conn.execute('DELETE FROM cache_entries WHERE ? - created_at > ?',
                                     (current_time, self.ttl_seconds))
                except Exception as e:
                    print(f"Warning: Could not cleanup persistent cache: {e}")

    def enable(self):
        """Enable caching."""
        self.stats['enabled'] = True

    def disable(self):
        """Disable caching (get() always misses, put() is a no-op)."""
        self.stats['enabled'] = False

    def export_cache(self, filepath: str):
        """Export cache contents to JSON file.

        Returns True on success, False on write failure.  FIX: entries
        present in both the memory and persistent tiers are exported once
        (deduplicated by key) instead of twice.
        """
        cache_data = {
            'metadata': {
                'exported_at': time.time(),
                'version': '1.0',
                'total_entries': len(self.memory_cache),
            },
            'entries': [],
        }

        with self.lock:
            seen_keys = set()
            for entry in self.memory_cache.values():
                cache_data['entries'].append(entry.to_dict())
                seen_keys.add(entry.key)

            # Also export persistent cache entries not already in memory.
            if self.persistent:
                try:
                    with sqlite3.connect(str(self.db_path)) as conn:
                        cursor = conn.execute('SELECT data FROM cache_entries')
                        for row in cursor:
                            try:
                                entry_data = json.loads(row[0])
                                if entry_data.get('key') not in seen_keys:
                                    cache_data['entries'].append(entry_data)
                                    seen_keys.add(entry_data.get('key'))
                            except json.JSONDecodeError:
                                continue
                except Exception as e:
                    print(f"Warning: Could not export persistent cache: {e}")

        try:
            with open(filepath, 'w', encoding='utf-8') as f:
                json.dump(cache_data, f, indent=2, ensure_ascii=False)
            return True
        except Exception as e:
            print(f"Error exporting cache: {e}")
            return False

    def import_cache(self, filepath: str):
        """Import cache contents from JSON file.

        Returns the number of successfully imported (still-valid) entries,
        or 0 if the file could not be read.
        """
        try:
            with open(filepath, 'r', encoding='utf-8') as f:
                cache_data = json.load(f)

            imported_count = 0
            for entry_data in cache_data.get('entries', []):
                try:
                    entry = CacheEntry.from_dict(entry_data)
                    if self._is_entry_valid(entry):
                        self._add_to_memory_cache(entry)
                        if self.persistent:
                            self._save_to_persistent_cache(entry)
                        imported_count += 1
                except Exception as e:
                    print(f"Warning: Could not import cache entry: {e}")
                    continue
            return imported_count
        except Exception as e:
            print(f"Error importing cache: {e}")
            return 0


# Global cache instance for use across the application
_cache_instance = None
_cache_lock = threading.Lock()


def get_cache_manager(max_memory_mb: int = 1024, persistent: bool = True,
                      cache_dir: str = None, ttl_hours: int = 24) -> CacheManager:
    """
    Get or create global cache manager instance.

    This ensures we have a single cache instance across the application
    while allowing configuration updates.  Note: cache_dir changes are
    ignored once the singleton exists (only limits/TTL are updated).
    """
    global _cache_instance

    with _cache_lock:
        if _cache_instance is None:
            _cache_instance = CacheManager(max_memory_mb, persistent, cache_dir, ttl_hours)
        else:
            # Update configuration if different.
            if (_cache_instance.max_memory_mb != max_memory_mb
                    or _cache_instance.persistent != persistent
                    or _cache_instance.ttl_hours != ttl_hours):
                _cache_instance.max_memory_mb = max_memory_mb
                _cache_instance.persistent = persistent
                _cache_instance.ttl_hours = ttl_hours
                _cache_instance.ttl_seconds = ttl_hours * 3600

        return _cache_instance