Spaces:
Sleeping
Sleeping
"""
Cache Manager for Smart Auto-Complete
Provides efficient caching of API responses to improve performance
"""
import hashlib
import json
import logging
import os
import threading
import time
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Union
logger = logging.getLogger(__name__)


class CacheManager:
    """
    In-memory cache manager with TTL (Time To Live) support.

    Uses an LRU (Least Recently Used) eviction policy: entries live in an
    OrderedDict and are moved to the end on every access, so the entry at
    the front is always the least recently used. All public methods are
    thread-safe (guarded by a reentrant lock). Expiry is tracked on the
    monotonic clock, so TTLs are immune to system clock adjustments.
    """

    def __init__(self, settings=None):
        """
        Initialize the cache manager.

        Args:
            settings: Optional application settings object; its
                CACHE_MAX_SIZE and CACHE_TTL attributes override the
                defaults (1000 entries / 3600 s) when present.
        """
        self.settings = settings

        # Cache configuration
        self.max_size = getattr(settings, 'CACHE_MAX_SIZE', 1000) if settings else 1000
        self.default_ttl = getattr(settings, 'CACHE_TTL', 3600) if settings else 3600  # 1 hour

        # Cache storage: _cache holds values in LRU order, _timestamps maps
        # each hash key to its absolute expiry time on the monotonic clock,
        # and _access_counts tracks per-key hit counts for diagnostics.
        self._cache = OrderedDict()
        self._timestamps = {}
        self._access_counts = {}

        # Reentrant so one public method may call another while holding it
        # (e.g. get_cache_info() -> get_stats()).
        self._lock = threading.RLock()

        # Running statistics
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'sets': 0
        }

        logger.info(f"Cache manager initialized with max_size={self.max_size}, ttl={self.default_ttl}s")

    def get(self, key: str) -> Optional[Any]:
        """
        Get a value from the cache.

        Args:
            key: Cache key

        Returns:
            Cached value, or None if not found or expired (both count as a
            miss in the statistics).
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)

                if hash_key not in self._cache:
                    self._stats['misses'] += 1
                    return None

                # Expired entries are removed lazily on first access.
                if self._is_expired(hash_key):
                    self._remove_key(hash_key)
                    self._stats['misses'] += 1
                    return None

                # Mark as most recently used.
                value = self._cache[hash_key]
                self._cache.move_to_end(hash_key)
                self._access_counts[hash_key] = self._access_counts.get(hash_key, 0) + 1

                self._stats['hits'] += 1
                logger.debug(f"Cache hit for key: {key[:50]}...")
                return value

            except Exception as e:
                # Defensive boundary: a malformed key (e.g. non-string) is
                # treated as a miss rather than propagating to the caller.
                logger.error(f"Error getting from cache: {str(e)}")
                self._stats['misses'] += 1
                return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Set a value in the cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (uses default if None)

        Returns:
            True if successfully cached, False otherwise
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)

                # Use default TTL if not specified.
                cache_ttl = ttl if ttl is not None else self.default_ttl

                # Evict until there is room for a new key. A loop (rather
                # than a single eviction) keeps the size bound correct even
                # if max_size was lowered after entries were added; the
                # emptiness guard prevents spinning when max_size <= 0.
                while self._cache and len(self._cache) >= self.max_size and hash_key not in self._cache:
                    self._evict_lru()

                # Store value with its absolute expiry time; mark as MRU.
                self._cache[hash_key] = value
                self._timestamps[hash_key] = time.monotonic() + cache_ttl
                self._access_counts[hash_key] = 1
                self._cache.move_to_end(hash_key)

                self._stats['sets'] += 1
                logger.debug(f"Cached value for key: {key[:50]}... (TTL: {cache_ttl}s)")
                return True

            except Exception as e:
                logger.error(f"Error setting cache: {str(e)}")
                return False

    def delete(self, key: str) -> bool:
        """
        Delete a key from the cache.

        Args:
            key: Cache key to delete

        Returns:
            True if key was deleted, False if not found
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)
                if hash_key in self._cache:
                    self._remove_key(hash_key)
                    logger.debug(f"Deleted cache key: {key[:50]}...")
                    return True
                return False
            except Exception as e:
                logger.error(f"Error deleting from cache: {str(e)}")
                return False

    def clear(self) -> bool:
        """
        Clear all items from the cache.

        Statistics counters are intentionally preserved so hit/miss history
        survives a clear.

        Returns:
            True if cache was cleared successfully
        """
        with self._lock:
            try:
                self._cache.clear()
                self._timestamps.clear()
                self._access_counts.clear()
                logger.info("Cache cleared")
                return True
            except Exception as e:
                logger.error(f"Error clearing cache: {str(e)}")
                return False

    def cleanup_expired(self) -> int:
        """
        Remove all expired items from the cache.

        Returns:
            Number of items removed (0 on error)
        """
        with self._lock:
            try:
                now = time.monotonic()
                # Collect first, then delete, to avoid mutating the dict
                # while iterating over it.
                expired_keys = [hash_key for hash_key, expiry_time in self._timestamps.items()
                                if now > expiry_time]
                for hash_key in expired_keys:
                    self._remove_key(hash_key)
                if expired_keys:
                    logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")
                return len(expired_keys)
            except Exception as e:
                logger.error(f"Error cleaning up expired items: {str(e)}")
                return 0

    def get_stats(self) -> Dict[str, Union[int, float]]:
        """
        Get cache statistics.

        Returns:
            Dictionary with size, max_size, hits, misses, hit_rate
            (percentage rounded to 2 decimals), evictions, and sets.
        """
        with self._lock:
            total_requests = self._stats['hits'] + self._stats['misses']
            hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0
            return {
                'size': len(self._cache),
                'max_size': self.max_size,
                'hits': self._stats['hits'],
                'misses': self._stats['misses'],
                'hit_rate': round(hit_rate, 2),
                'evictions': self._stats['evictions'],
                'sets': self._stats['sets']
            }

    def get_cache_info(self) -> Dict[str, Any]:
        """
        Get detailed cache information.

        Returns:
            Dictionary with item counts (total/expired/active), the five
            most-accessed keys (truncated internal hash keys, not the
            original caller keys), a rough memory estimate, and get_stats().
        """
        with self._lock:
            now = time.monotonic()

            # Count entries that have expired but not yet been evicted.
            expired_count = sum(1 for expiry_time in self._timestamps.values()
                                if now > expiry_time)

            # Most-accessed entries, by per-key hit count.
            top_keys = sorted(self._access_counts.items(),
                              key=lambda item: item[1], reverse=True)[:5]

            return {
                'total_items': len(self._cache),
                'expired_items': expired_count,
                'active_items': len(self._cache) - expired_count,
                'top_accessed_keys': [key[:20] + '...' for key, count in top_keys],
                'memory_usage_estimate': self._estimate_memory_usage(),
                'stats': self.get_stats()
            }

    def _generate_key_hash(self, key: str) -> str:
        """Generate a consistent hash for the cache key (MD5 is fine here:
        non-cryptographic use, just a fixed-length bucket name)."""
        return hashlib.md5(key.encode('utf-8')).hexdigest()

    def _is_expired(self, hash_key: str) -> bool:
        """Check if a cache entry is expired (missing timestamp counts as expired)."""
        if hash_key not in self._timestamps:
            return True
        return time.monotonic() > self._timestamps[hash_key]

    def _remove_key(self, hash_key: str):
        """Remove a key and all its associated bookkeeping data."""
        if hash_key in self._cache:
            del self._cache[hash_key]
        if hash_key in self._timestamps:
            del self._timestamps[hash_key]
        if hash_key in self._access_counts:
            del self._access_counts[hash_key]

    def _evict_lru(self):
        """Evict the least recently used item (front of the OrderedDict)."""
        if self._cache:
            lru_key = next(iter(self._cache))
            self._remove_key(lru_key)
            self._stats['evictions'] += 1
            logger.debug("Evicted LRU cache entry")

    def _estimate_memory_usage(self) -> str:
        """Estimate memory usage of the cache (rough: based on str() lengths)."""
        try:
            total_size = 0
            for key, value in self._cache.items():
                total_size += len(str(key)) + len(str(value))
            # Convert to a human-readable unit.
            if total_size < 1024:
                return f"{total_size} bytes"
            elif total_size < 1024 * 1024:
                return f"{total_size / 1024:.1f} KB"
            else:
                return f"{total_size / (1024 * 1024):.1f} MB"
        except Exception:
            return "Unknown"
class SimpleDiskCache:
    """
    Simple disk-based cache for persistence (optional enhancement).

    Each entry is serialized to a JSON file named after the MD5 hash of its
    key, together with wall-clock creation/expiry timestamps. Wall-clock
    time (time.time()) is used deliberately — unlike a monotonic clock, it
    stays meaningful across process restarts.
    This is a basic implementation - in production, consider using Redis
    or similar.
    """

    # Class-level handle to the module logger keeps the class self-contained.
    _log = logging.getLogger(__name__)

    def __init__(self, cache_dir: str = "./cache"):
        """
        Initialize disk cache.

        Args:
            cache_dir: Directory to store cache files (created if missing).
        """
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
        self._log.info(f"Disk cache initialized at: {cache_dir}")

    def _get_file_path(self, key: str) -> str:
        """Return the JSON file path for a key (MD5 keeps names filesystem-safe)."""
        hash_key = hashlib.md5(key.encode('utf-8')).hexdigest()
        return os.path.join(self.cache_dir, f"{hash_key}.json")

    def get(self, key: str) -> Optional[Any]:
        """
        Get value from disk cache.

        Returns:
            The cached value, or None if the entry is missing, expired, or
            unreadable. Expired files are deleted on access.
        """
        file_path = self._get_file_path(key)
        try:
            # EAFP: open directly rather than an exists() pre-check, which
            # would race with concurrent deletion by another process.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            # Missing entry is a normal miss, not an error worth logging.
            return None
        except Exception as e:
            self._log.error(f"Error reading from disk cache: {str(e)}")
            return None

        # Check expiry against the persisted wall-clock timestamp.
        if time.time() > data.get('expires_at', 0):
            try:
                os.remove(file_path)
            except OSError:
                pass  # Another process may have removed it already.
            return None

        return data.get('value')

    def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """
        Set value in disk cache.

        Args:
            key: Cache key
            value: JSON-serializable value to cache
            ttl: Time to live in seconds

        Returns:
            True on success, False on any I/O or serialization error.
        """
        try:
            file_path = self._get_file_path(key)
            data = {
                'value': value,
                'created_at': time.time(),
                'expires_at': time.time() + ttl
            }
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            return True
        except Exception as e:
            self._log.error(f"Error writing to disk cache: {str(e)}")
            return False