"""
Redis-based Response Caching System
Implements intelligent caching for API responses and database queries
"""
import hashlib
import json
import logging
from functools import wraps
from typing import Any, Callable, Dict, Optional
from redis import Redis
from redis.exceptions import ConnectionError, TimeoutError
# Module-level logger named after this module, per the stdlib logging convention.
logger = logging.getLogger(__name__)
class CacheManager:
    """Redis-based caching manager for API responses and data.

    Values are serialized to JSON on write and parsed on read, so only
    JSON-representable data (plus anything ``str()`` can render, via
    ``default=str``) should be cached. All Redis failures are logged and
    degrade to cache misses / no-ops rather than propagating to callers.
    """

    def __init__(self, redis_client: "Redis", default_ttl: int = 300):
        """Store the Redis client and the fallback TTL (seconds)."""
        self.redis = redis_client
        self.default_ttl = default_ttl  # 5 minutes default

    def _generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate a consistent cache key from function arguments.

        The same call always hashes to the same key: kwargs are sorted so
        keyword-argument order does not matter.
        """
        sorted_kwargs = sorted(kwargs.items())
        key_components = [prefix] + [str(arg) for arg in args]
        key_components.extend(f"{k}:{v}" for k, v in sorted_kwargs)
        key_string = "|".join(key_components)
        # md5 is acceptable here: the digest is only a compact fingerprint
        # for a cache key, not a security boundary.
        return f"cache:{hashlib.md5(key_string.encode()).hexdigest()}"

    def get(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None on miss or error."""
        try:
            data = self.redis.get(key)
            if data is None:
                return None
            return json.loads(data)
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Cache get failed: {e}")
            return None
        except json.JSONDecodeError as e:
            # A corrupted/non-JSON entry is treated as a miss instead of
            # crashing the caller (previously this exception escaped).
            logger.warning(f"Cache entry for {key} is not valid JSON: {e}")
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` under ``key`` with a TTL; return True on success."""
        try:
            ttl = ttl or self.default_ttl
            # default=str handles datetimes and other stringifiable objects;
            # note they come back as strings, not their original types.
            data = json.dumps(value, default=str)
            return bool(self.redis.setex(key, ttl, data))
        except (ConnectionError, TimeoutError, TypeError) as e:
            logger.warning(f"Cache set failed: {e}")
            return False

    def delete(self, key: str) -> bool:
        """Delete ``key`` from the cache; return True if a key was removed."""
        try:
            return bool(self.redis.delete(key))
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Cache delete failed: {e}")
            return False

    def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching a glob pattern; return the count deleted.

        Uses SCAN (``scan_iter``) rather than KEYS: KEYS walks the entire
        keyspace in one blocking call and can stall a production Redis server.
        """
        try:
            keys = list(self.redis.scan_iter(match=pattern))
            if keys:
                return self.redis.delete(*keys)
            return 0
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Cache pattern delete failed: {e}")
            return 0

    def clear_user_cache(self, user_id: str) -> None:
        """Clear all cache entries for a specific user."""
        patterns = [
            f"cache:user:{user_id}:*",
            f"cache:cases:user:{user_id}:*",
            f"cache:activities:user:{user_id}:*"
        ]
        for pattern in patterns:
            self.delete_pattern(pattern)

    def clear_case_cache(self, case_id: str) -> None:
        """Clear all cache entries for a specific case."""
        patterns = [
            f"cache:case:{case_id}:*",
            f"cache:case:{case_id}",
            "cache:cases:list:*"  # Case listings may embed this case; drop them too
        ]
        for pattern in patterns:
            self.delete_pattern(pattern)

    def cached(self, ttl: Optional[int] = None, key_prefix: str = ""):
        """Decorator caching the results of an *async* function.

        NOTE: the wrapper awaits the target, so this only supports coroutine
        functions. ``None`` results are deliberately not cached, so a miss
        is re-executed next call.
        """
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            async def wrapper(*args, **kwargs):
                # Key defaults to the function's qualified name when no
                # explicit prefix is supplied.
                prefix = key_prefix or f"{func.__module__}.{func.__name__}"
                cache_key = self._generate_cache_key(prefix, *args, **kwargs)
                cached_result = self.get(cache_key)
                if cached_result is not None:
                    logger.debug(f"Cache hit for {cache_key}")
                    return cached_result
                logger.debug(f"Cache miss for {cache_key}, executing function")
                result = await func(*args, **kwargs)
                if result is not None:
                    self.set(cache_key, result, ttl)
                return result
            return wrapper
        return decorator
class APICacheManager(CacheManager):
    """Cache manager specialised for API response payloads."""

    def __init__(self, redis_client: Redis):
        super().__init__(redis_client, default_ttl=300)  # 5 minutes
        # Per-category TTLs (seconds), tuned to how quickly each kind of
        # data goes stale.
        self.ttls = {
            'user_profile': 600,   # 10 minutes
            'case_list': 120,      # 2 minutes
            'case_detail': 300,    # 5 minutes
            'analytics': 1800,     # 30 minutes
            'stats': 60,           # 1 minute
            'public_data': 3600,   # 1 hour
        }

    def get_case_list_cache_key(self, filters: Dict[str, Any], page: int = 1, limit: int = 20) -> str:
        """Build the cache key for a filtered, paginated case listing."""
        # Only non-None filters participate; sorting keeps the key stable
        # regardless of the dict's insertion order.
        active = [f"{key}:{value}" for key, value in sorted(filters.items()) if value is not None]
        filter_string = "|".join(active) or "all"
        return f"cache:cases:list:{filter_string}:page:{page}:limit:{limit}"

    def get_case_detail_cache_key(self, case_id: str) -> str:
        """Build the cache key for a single case's detail payload."""
        return f"cache:case:{case_id}:detail"

    def get_user_cases_cache_key(self, user_id: str, status: Optional[str] = None) -> str:
        """Build the cache key for one user's cases, optionally per status."""
        if status:
            return f"cache:user:{user_id}:cases:status:{status}"
        return f"cache:user:{user_id}:cases"

    def cache_case_list(self, filters: Dict[str, Any], page: int, limit: int, results: list) -> None:
        """Store a page of case-listing results under its derived key."""
        self.set(self.get_case_list_cache_key(filters, page, limit), results, self.ttls['case_list'])

    def get_cached_case_list(self, filters: Dict[str, Any], page: int = 1, limit: int = 20) -> Optional[list]:
        """Fetch a previously cached case-listing page, or None on miss."""
        return self.get(self.get_case_list_cache_key(filters, page, limit))

    def cache_case_detail(self, case_id: str, case_data: Dict[str, Any]) -> None:
        """Store one case's detail payload under its derived key."""
        self.set(self.get_case_detail_cache_key(case_id), case_data, self.ttls['case_detail'])

    def get_cached_case_detail(self, case_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a previously cached case detail payload, or None on miss."""
        return self.get(self.get_case_detail_cache_key(case_id))

    def invalidate_case_caches(self, case_id: str) -> None:
        """Drop every cache entry that might embed this case."""
        self.clear_case_cache(case_id)
        # Listings may include the case in any page, so drop them wholesale.
        self.delete_pattern("cache:cases:list:*")

    def invalidate_user_caches(self, user_id: str) -> None:
        """Drop every cache entry scoped to this user."""
        self.clear_user_cache(user_id)

    def warmup_popular_caches(self) -> None:
        """Pre-populate frequently accessed caches (placeholder).

        Intended to be run periodically; the actual warm-up strategy would
        be driven by observed usage patterns.
        """
        logger.info("Warming up popular caches...")
        pass

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return a snapshot of Redis server statistics for monitoring."""
        try:
            info = self.redis.info()
            return {
                'connected_clients': info.get('connected_clients', 0),
                'used_memory': info.get('used_memory_human', '0B'),
                'total_keys': self.redis.dbsize(),
                'hit_rate': 'N/A',  # would require explicit hit/miss counters
                'evictions': info.get('evicted_keys', 0),
            }
        except Exception as e:
            logger.warning(f"Could not get cache stats: {e}")
            return {'error': str(e)}
class DatabaseQueryCache(CacheManager):
    """Cache manager specifically for database query results."""

    def __init__(self, redis_client: "Redis"):
        # DB results tolerate more staleness than API payloads: 10 minutes.
        super().__init__(redis_client, default_ttl=600)

    def _query_cache_key(self, query_name: str, *args, **kwargs) -> str:
        """Build a key of the form ``cache:db:<query_name>:<digest>``.

        BUG FIX: the previous implementation delegated to the inherited
        ``_generate_cache_key``, which hashes the ``db:<query_name>`` prefix
        into the md5 digest and returns ``cache:<digest>``. The pattern used
        by ``invalidate_all_query_caches`` (``cache:db:<query_name>:*``)
        could therefore never match a stored key, so bulk invalidation
        silently deleted nothing. Keeping the query name in plain text and
        hashing only the arguments makes the pattern match.
        """
        sorted_kwargs = sorted(kwargs.items())
        components = [str(arg) for arg in args]
        components.extend(f"{k}:{v}" for k, v in sorted_kwargs)
        digest = hashlib.md5("|".join(components).encode()).hexdigest()
        return f"cache:db:{query_name}:{digest}"

    def cached_query(self, query_name: str, ttl: Optional[int] = None):
        """Decorator caching results of an *async* database query function.

        ``None`` results are not cached, so an empty outcome is re-queried
        on the next call.
        """
        def decorator(func: Callable) -> Callable:
            @wraps(func)
            async def wrapper(*args, **kwargs):
                cache_key = self._query_cache_key(query_name, *args, **kwargs)
                cached_result = self.get(cache_key)
                if cached_result is not None:
                    logger.debug(f"DB cache hit for {query_name}")
                    return cached_result
                logger.debug(f"DB cache miss for {query_name}, executing query")
                result = await func(*args, **kwargs)
                if result is not None:
                    self.set(cache_key, result, ttl)
                return result
            return wrapper
        return decorator

    def invalidate_query_cache(self, query_name: str, *args, **kwargs) -> None:
        """Invalidate the cache entry for one specific query invocation."""
        self.delete(self._query_cache_key(query_name, *args, **kwargs))

    def invalidate_all_query_caches(self, query_name: str) -> None:
        """Invalidate every cached result for a given query type."""
        # Matches the plain-text prefix produced by _query_cache_key above.
        self.delete_pattern(f"cache:db:{query_name}:*")
# Global instances (would be initialized in app startup)
# These remain None until init_cache_managers() succeeds; callers must treat
# None as "caching disabled" and fall through to the underlying data source.
cache_manager = None
api_cache_manager = None
db_query_cache = None
def init_cache_managers(redis_url: str = "redis://localhost:6379/0") -> None:
    """Create and wire up the module-level cache managers.

    On any failure (bad URL, unreachable server, ...) the globals are reset
    to None so the application keeps running without caching instead of
    failing at startup.
    """
    global cache_manager, api_cache_manager, db_query_cache
    try:
        client = Redis.from_url(redis_url, decode_responses=True)
        client.ping()  # fail fast if the server is unreachable
        cache_manager = CacheManager(client)
        api_cache_manager = APICacheManager(client)
        db_query_cache = DatabaseQueryCache(client)
        logger.info("Cache managers initialized successfully")
    except Exception as e:
        logger.warning(f"Failed to initialize cache managers: {e}")
        # Degrade gracefully: run uncached rather than crash the app.
        cache_manager = api_cache_manager = db_query_cache = None
def get_cache_manager() -> Optional[CacheManager]:
    """Accessor for the module-level CacheManager (None when caching is disabled)."""
    return cache_manager


def get_api_cache_manager() -> Optional[APICacheManager]:
    """Accessor for the module-level APICacheManager (None when caching is disabled)."""
    return api_cache_manager


def get_db_query_cache() -> Optional[DatabaseQueryCache]:
    """Accessor for the module-level DatabaseQueryCache (None when caching is disabled)."""
    return db_query_cache