"""
Cache Manager for Smart Auto-Complete
Provides efficient caching of API responses to improve performance
"""
import hashlib
import json
import logging
import time
from typing import Any, Dict, List, Optional, Union
import threading
from collections import OrderedDict
logger = logging.getLogger(__name__)


class CacheManager:
    """
    In-memory cache manager with TTL (Time To Live) support.
    Uses an LRU (Least Recently Used) eviction policy.
    """

    def __init__(self, settings=None):
        """
        Initialize the cache manager.

        Args:
            settings: Application settings object
        """
        self.settings = settings

        # Cache configuration
        self.max_size = getattr(settings, 'CACHE_MAX_SIZE', 1000) if settings else 1000
        self.default_ttl = getattr(settings, 'CACHE_TTL', 3600) if settings else 3600  # 1 hour

        # Cache storage
        self._cache = OrderedDict()
        self._timestamps = {}  # hash_key -> absolute expiry time (epoch seconds)
        self._access_counts = {}

        # Thread safety
        self._lock = threading.RLock()

        # Statistics
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0,
            'sets': 0
        }

        logger.info(f"Cache manager initialized with max_size={self.max_size}, ttl={self.default_ttl}s")

    def get(self, key: str) -> Optional[Any]:
        """
        Get a value from the cache.

        Args:
            key: Cache key

        Returns:
            Cached value, or None if not found or expired
        """
        with self._lock:
            try:
                # Generate hash key for consistency
                hash_key = self._generate_key_hash(key)

                # Check if key exists
                if hash_key not in self._cache:
                    self._stats['misses'] += 1
                    return None

                # Check if expired
                if self._is_expired(hash_key):
                    self._remove_key(hash_key)
                    self._stats['misses'] += 1
                    return None

                # Move to end (mark as recently used)
                value = self._cache[hash_key]
                self._cache.move_to_end(hash_key)
                self._access_counts[hash_key] = self._access_counts.get(hash_key, 0) + 1

                self._stats['hits'] += 1
                logger.debug(f"Cache hit for key: {key[:50]}...")
                return value

            except Exception as e:
                logger.error(f"Error getting from cache: {str(e)}")
                self._stats['misses'] += 1
                return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """
        Set a value in the cache.

        Args:
            key: Cache key
            value: Value to cache
            ttl: Time to live in seconds (uses default if None)

        Returns:
            True if successfully cached, False otherwise
        """
        with self._lock:
            try:
                # Generate hash key
                hash_key = self._generate_key_hash(key)

                # Use default TTL if not specified
                cache_ttl = ttl if ttl is not None else self.default_ttl

                # Evict the LRU entry if adding a new key would exceed max_size
                if len(self._cache) >= self.max_size and hash_key not in self._cache:
                    self._evict_lru()

                # Store the value with its absolute expiry time
                self._cache[hash_key] = value
                self._timestamps[hash_key] = time.time() + cache_ttl
                self._access_counts[hash_key] = 1

                # Move to end (mark as recently used)
                self._cache.move_to_end(hash_key)

                self._stats['sets'] += 1
                logger.debug(f"Cached value for key: {key[:50]}... (TTL: {cache_ttl}s)")
                return True

            except Exception as e:
                logger.error(f"Error setting cache: {str(e)}")
                return False

    def delete(self, key: str) -> bool:
        """
        Delete a key from the cache.

        Args:
            key: Cache key to delete

        Returns:
            True if key was deleted, False if not found
        """
        with self._lock:
            try:
                hash_key = self._generate_key_hash(key)

                if hash_key in self._cache:
                    self._remove_key(hash_key)
                    logger.debug(f"Deleted cache key: {key[:50]}...")
                    return True

                return False

            except Exception as e:
                logger.error(f"Error deleting from cache: {str(e)}")
                return False

    def clear(self) -> bool:
        """
        Clear all items from the cache.

        Returns:
            True if cache was cleared successfully
        """
        with self._lock:
            try:
                self._cache.clear()
                self._timestamps.clear()
                self._access_counts.clear()
                logger.info("Cache cleared")
                return True

            except Exception as e:
                logger.error(f"Error clearing cache: {str(e)}")
                return False

    def cleanup_expired(self) -> int:
        """
        Remove all expired items from the cache.

        Returns:
            Number of items removed
        """
        with self._lock:
            try:
                current_time = time.time()
                expired_keys = []

                for hash_key, expiry_time in self._timestamps.items():
                    if current_time > expiry_time:
                        expired_keys.append(hash_key)

                for hash_key in expired_keys:
                    self._remove_key(hash_key)

                if expired_keys:
                    logger.info(f"Cleaned up {len(expired_keys)} expired cache entries")

                return len(expired_keys)

            except Exception as e:
                logger.error(f"Error cleaning up expired items: {str(e)}")
                return 0

    def get_stats(self) -> Dict[str, Union[int, float]]:
        """
        Get cache statistics.

        Returns:
            Dictionary with cache statistics
        """
        with self._lock:
            total_requests = self._stats['hits'] + self._stats['misses']
            hit_rate = (self._stats['hits'] / total_requests * 100) if total_requests > 0 else 0

            return {
                'size': len(self._cache),
                'max_size': self.max_size,
                'hits': self._stats['hits'],
                'misses': self._stats['misses'],
                'hit_rate': round(hit_rate, 2),
                'evictions': self._stats['evictions'],
                'sets': self._stats['sets']
            }

    def get_cache_info(self) -> Dict[str, Any]:
        """
        Get detailed cache information.

        Returns:
            Dictionary with detailed cache info
        """
        with self._lock:
            current_time = time.time()

            # Count expired items
            expired_count = sum(1 for expiry_time in self._timestamps.values()
                                if current_time > expiry_time)

            # Get the most accessed keys (note: these are hashed keys,
            # since original keys are not retained)
            top_keys = sorted(self._access_counts.items(),
                              key=lambda x: x[1], reverse=True)[:5]

            return {
                'total_items': len(self._cache),
                'expired_items': expired_count,
                'active_items': len(self._cache) - expired_count,
                'top_accessed_keys': [key[:20] + '...' for key, count in top_keys],
                'memory_usage_estimate': self._estimate_memory_usage(),
                'stats': self.get_stats()
            }

    def _generate_key_hash(self, key: str) -> str:
        """Generate a consistent hash for the cache key"""
        # MD5 is acceptable here: the hash is used for key normalization,
        # not for security.
        return hashlib.md5(key.encode('utf-8')).hexdigest()

    def _is_expired(self, hash_key: str) -> bool:
        """Check if a cache entry is expired"""
        if hash_key not in self._timestamps:
            return True
        return time.time() > self._timestamps[hash_key]

    def _remove_key(self, hash_key: str):
        """Remove a key and its associated data"""
        if hash_key in self._cache:
            del self._cache[hash_key]
        if hash_key in self._timestamps:
            del self._timestamps[hash_key]
        if hash_key in self._access_counts:
            del self._access_counts[hash_key]

    def _evict_lru(self):
        """Evict the least recently used item"""
        if self._cache:
            # The first item in the OrderedDict is the least recently used
            lru_key = next(iter(self._cache))
            self._remove_key(lru_key)
            self._stats['evictions'] += 1
            logger.debug("Evicted LRU cache entry")

    def _estimate_memory_usage(self) -> str:
        """Estimate memory usage of the cache"""
        try:
            # Rough estimate based on string representations; this ignores
            # Python object overhead and is only indicative.
            total_size = 0
            for key, value in self._cache.items():
                total_size += len(str(key)) + len(str(value))

            # Convert to a human-readable format
            if total_size < 1024:
                return f"{total_size} bytes"
            elif total_size < 1024 * 1024:
                return f"{total_size / 1024:.1f} KB"
            else:
                return f"{total_size / (1024 * 1024):.1f} MB"
        except Exception:
            return "Unknown"


class SimpleDiskCache:
    """
    Simple disk-based cache for persistence (optional enhancement).
    This is a basic implementation - in production, consider using Redis
    or similar. Values must be JSON-serializable.
    """

    def __init__(self, cache_dir: str = "./cache"):
        """
        Initialize disk cache.

        Args:
            cache_dir: Directory to store cache files
        """
        self.cache_dir = cache_dir

        # Create cache directory if it doesn't exist
        os.makedirs(cache_dir, exist_ok=True)
        logger.info(f"Disk cache initialized at: {cache_dir}")

    def _get_file_path(self, key: str) -> str:
        """Get the file path for a cache key"""
        hash_key = hashlib.md5(key.encode('utf-8')).hexdigest()
        return os.path.join(self.cache_dir, f"{hash_key}.json")

    def get(self, key: str) -> Optional[Any]:
        """Get a value from the disk cache"""
        try:
            file_path = self._get_file_path(key)

            if not os.path.exists(file_path):
                return None

            with open(file_path, 'r', encoding='utf-8') as f:
                data = json.load(f)

            # Check expiry and remove the file if stale
            if time.time() > data.get('expires_at', 0):
                os.remove(file_path)
                return None

            return data.get('value')

        except Exception as e:
            logger.error(f"Error reading from disk cache: {str(e)}")
            return None

    def set(self, key: str, value: Any, ttl: int = 3600) -> bool:
        """Set a value in the disk cache (value must be JSON-serializable)"""
        try:
            file_path = self._get_file_path(key)

            data = {
                'value': value,
                'created_at': time.time(),
                'expires_at': time.time() + ttl
            }

            with open(file_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, indent=2)

            return True

        except Exception as e:
            logger.error(f"Error writing to disk cache: {str(e)}")
            return False
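

# ---------------------------------------------------------------------------
# Combined usage sketch (illustrative only). Layering the in-memory cache in
# front of the disk cache is an assumption about how a caller might compose
# the two classes; nothing in this module mandates it.
# ---------------------------------------------------------------------------

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    memory_cache = CacheManager()
    disk_cache = SimpleDiskCache(cache_dir="./cache")

    key = "demo:autocomplete"
    value = memory_cache.get(key)
    if value is None:
        # Fall back to disk, then repopulate both layers
        value = disk_cache.get(key)
        if value is None:
            value = ["alpha", "beta", "gamma"]  # stand-in for computed results
            disk_cache.set(key, value, ttl=3600)
        memory_cache.set(key, value)

    print(value)
    print(memory_cache.get_stats())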