# Sandipan Haldar
# adding submission
# b309c22
"""
Cache Manager for Smart Auto-Complete
Provides efficient caching of API responses to improve performance
"""
import hashlib
import json
import logging
import os
import threading
import time
from collections import OrderedDict
from typing import Any, Dict, List, Optional, Union
# Module-level logger, named after this module per the logging convention.
logger = logging.getLogger(__name__)
class CacheManager:
    """
    In-memory cache manager with TTL (Time To Live) support.

    Implements LRU (Least Recently Used) eviction on top of an
    ``OrderedDict``: entries are moved to the end on every access, so the
    front of the dict is always the least recently used item.  All public
    methods are thread-safe (guarded by a reentrant lock).
    """

    def __init__(self, settings=None):
        """
        Initialize the cache manager.

        Args:
            settings: Application settings object; ``CACHE_MAX_SIZE`` and
                ``CACHE_TTL`` attributes are honored when present.
        """
        self.settings = settings

        # Cache configuration (sane defaults when settings is absent or
        # lacks the attributes).
        self.max_size = getattr(settings, 'CACHE_MAX_SIZE', 1000) if settings else 1000
        self.default_ttl = getattr(settings, 'CACHE_TTL', 3600) if settings else 3600  # 1 hour

        # Storage: _cache preserves access order for LRU; _timestamps maps
        # hash_key -> absolute expiry time (epoch seconds); _access_counts
        # tracks per-key hit counts for diagnostics.
        self._cache = OrderedDict()
        self._timestamps = {}
        self._access_counts = {}

        # Reentrant lock so public methods may call each other while held
        # (get_cache_info() calls get_stats() under the lock).
        self._lock = threading.RLock()

        # Running statistics exposed via get_stats().
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'sets': 0
        }

        logger.info("Cache manager initialized with max_size=%s, ttl=%ss",
                    self.max_size, self.default_ttl)

    def get(self, key: str) -> Optional[Any]:
        """
        Get a value from the cache.

        Args:
            key: Cache key

        Returns:
            Cached value, or None if the key is missing or expired.
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)

                # Missing key: plain miss.
                if hash_key not in self._cache:
                    self._stats['misses'] += 1
                    return None

                # Expired key: drop it eagerly so it frees a slot.
                if self._is_expired(hash_key):
                    self._remove_key(hash_key)
                    self._stats['misses'] += 1
                    return None

                # Hit: mark as most recently used and record the access.
                value = self._cache[hash_key]
                self._cache.move_to_end(hash_key)
                self._access_counts[hash_key] = self._access_counts.get(hash_key, 0) + 1
                self._stats['hits'] += 1

                logger.debug("Cache hit for key: %s...", key[:50])
                return value

            except Exception as e:
                # Best-effort cache: any lookup failure degrades to a miss.
                logger.error("Error getting from cache: %s", e)
                self._stats['misses'] += 1
                return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Set a value in the cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (uses default if None)

        Returns:
            True if successfully cached, False otherwise
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)
                cache_ttl = ttl if ttl is not None else self.default_ttl

                # Evict until there is room for a NEW key.  A while-loop keeps
                # the capacity invariant even if max_size was lowered after
                # construction; the non-empty guard prevents an infinite loop
                # for a non-positive max_size.
                if hash_key not in self._cache:
                    while self._cache and len(self._cache) >= self.max_size:
                        self._evict_lru()

                # Store and mark as most recently used (assigning an existing
                # key does not move it, so move_to_end is still required).
                self._cache[hash_key] = value
                self._cache.move_to_end(hash_key)
                self._timestamps[hash_key] = time.time() + cache_ttl
                self._access_counts[hash_key] = 1

                self._stats['sets'] += 1
                logger.debug("Cached value for key: %s... (TTL: %ss)", key[:50], cache_ttl)
                return True

            except Exception as e:
                logger.error("Error setting cache: %s", e)
                return False

    def delete(self, key: str) -> bool:
        """
        Delete a key from the cache.

        Args:
            key: Cache key to delete

        Returns:
            True if key was deleted, False if not found
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)
                if hash_key in self._cache:
                    self._remove_key(hash_key)
                    logger.debug("Deleted cache key: %s...", key[:50])
                    return True
                return False
            except Exception as e:
                logger.error("Error deleting from cache: %s", e)
                return False

    def clear(self) -> bool:
        """
        Clear all items from the cache.

        Returns:
            True if cache was cleared successfully
        """
        with self._lock:
            try:
                self._cache.clear()
                self._timestamps.clear()
                self._access_counts.clear()
                logger.info("Cache cleared")
                return True
            except Exception as e:
                logger.error("Error clearing cache: %s", e)
                return False

    def cleanup_expired(self) -> int:
        """
        Remove all expired items from the cache.

        Returns:
            Number of items removed
        """
        with self._lock:
            try:
                now = time.time()
                # Snapshot first: _remove_key mutates _timestamps, so we must
                # not delete while iterating it.
                expired_keys = [k for k, expiry in self._timestamps.items()
                                if now > expiry]
                for hash_key in expired_keys:
                    self._remove_key(hash_key)
                if expired_keys:
                    logger.info("Cleaned up %d expired cache entries", len(expired_keys))
                return len(expired_keys)
            except Exception as e:
                logger.error("Error cleaning up expired items: %s", e)
                return 0

    def get_stats(self) -> Dict[str, Union[int, float]]:
        """
        Get cache statistics.

        Returns:
            Dictionary with size, hit/miss counts, hit rate (percent),
            eviction count and set count.
        """
        with self._lock:
            total_requests = self._stats['hits'] + self._stats['misses']
            hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0
            return {
                'size': len(self._cache),
                'max_size': self.max_size,
                'hits': self._stats['hits'],
                'misses': self._stats['misses'],
                'hit_rate': round(hit_rate, 2),
                'evictions': self._stats['evictions'],
                'sets': self._stats['sets']
            }

    def get_cache_info(self) -> Dict[str, Any]:
        """
        Get detailed cache information.

        Returns:
            Dictionary with item counts, most-accessed keys (truncated
            hashes), an estimated memory footprint, and get_stats() output.
        """
        with self._lock:
            current_time = time.time()

            # Entries past their expiry that have not been reaped yet.
            expired_count = sum(1 for expiry_time in self._timestamps.values()
                                if current_time > expiry_time)

            # Five most frequently accessed keys (by internal hash).
            top_keys = sorted(self._access_counts.items(),
                              key=lambda x: x[1], reverse=True)[:5]

            return {
                'total_items': len(self._cache),
                'expired_items': expired_count,
                'active_items': len(self._cache) - expired_count,
                'top_accessed_keys': [key[:20] + '...' for key, count in top_keys],
                'memory_usage_estimate': self._estimate_memory_usage(),
                'stats': self.get_stats()
            }

    def _generate_key_hash(self, key: str) -> str:
        """Generate a consistent hash for the cache key (MD5 is fine here:
        it is used for bucketing, not security)."""
        return hashlib.md5(key.encode('utf-8')).hexdigest()

    def _is_expired(self, hash_key: str) -> bool:
        """Check if a cache entry is expired (unknown keys count as expired)."""
        if hash_key not in self._timestamps:
            return True
        return time.time() > self._timestamps[hash_key]

    def _remove_key(self, hash_key: str) -> None:
        """Remove a key and all bookkeeping associated with it."""
        self._cache.pop(hash_key, None)
        self._timestamps.pop(hash_key, None)
        self._access_counts.pop(hash_key, None)

    def _evict_lru(self) -> None:
        """Evict the least recently used item (front of the OrderedDict)."""
        if self._cache:
            lru_key, _ = self._cache.popitem(last=False)
            self._timestamps.pop(lru_key, None)
            self._access_counts.pop(lru_key, None)
            self._stats['evictions'] += 1
            logger.debug("Evicted LRU cache entry")

    def _estimate_memory_usage(self) -> str:
        """Estimate memory usage of the cache (rough, based on str() lengths)."""
        try:
            total_size = 0
            for key, value in self._cache.items():
                total_size += len(str(key)) + len(str(value))

            # Convert to human readable format.
            if total_size < 1024:
                return f"{total_size} bytes"
            elif total_size < 1024 * 1024:
                return f"{total_size / 1024:.1f} KB"
            else:
                return f"{total_size / (1024 * 1024):.1f} MB"
        except Exception:
            return "Unknown"
class SimpleDiskCache:
    """
    Simple disk-based cache for persistence (optional enhancement).

    Each entry is one JSON file named by the MD5 of its key.  This is a
    basic implementation - in production, consider using Redis or similar.
    """

    def __init__(self, cache_dir: str = "./cache"):
        """
        Initialize disk cache.

        Args:
            cache_dir: Directory to store cache files (created if absent).
        """
        self.cache_dir = cache_dir

        # Create cache directory if it doesn't exist.
        os.makedirs(cache_dir, exist_ok=True)
        logger.info("Disk cache initialized at: %s", cache_dir)

    def _get_file_path(self, key: str) -> str:
        """Map a cache key to its on-disk JSON file path."""
        hash_key = hashlib.md5(key.encode('utf-8')).hexdigest()
        return os.path.join(self.cache_dir, f"{hash_key}.json")

    def get(self, key: str) -> Optional[Any]:
        """Get value from disk cache, or None if missing, expired or unreadable."""
        file_path = self._get_file_path(key)
        try:
            # EAFP: open directly instead of exists()+open, which is racy
            # if another process removes the file in between.
            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except FileNotFoundError:
            return None
        except Exception as e:
            logger.error("Error reading from disk cache: %s", e)
            return None

        # Check expiry; missing 'expires_at' is treated as already expired.
        if time.time() > data.get('expires_at', 0):
            try:
                os.remove(file_path)
            except OSError:
                # Best-effort cleanup: a failed unlink still means "expired".
                pass
            return None
        return data.get('value')

    def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """Set value in disk cache.  Returns True on success, False otherwise
        (e.g. value is not JSON-serializable or the disk write fails)."""
        try:
            file_path = self._get_file_path(key)
            now = time.time()
            data = {
                'value': value,
                'created_at': now,
                'expires_at': now + ttl
            }
            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)
            return True
        except Exception as e:
            logger.error("Error writing to disk cache: %s", e)
            return False