""" Redis-based Response Caching System Implements intelligent caching for API responses and database queries """ import hashlib import json import logging from functools import wraps from typing import Any, Callable, Dict, Optional from redis import Redis from redis.exceptions import ConnectionError, TimeoutError logger = logging.getLogger(__name__) class CacheManager: """Redis-based caching manager for API responses and data""" def __init__(self, redis_client: Redis, default_ttl: int = 300): self.redis = redis_client self.default_ttl = default_ttl # 5 minutes default def _generate_cache_key(self, prefix: str, *args, **kwargs) -> str: """Generate a consistent cache key from function arguments""" # Sort kwargs for consistent key generation sorted_kwargs = sorted(kwargs.items()) # Create a unique string from all arguments key_components = [prefix] + [str(arg) for arg in args] key_components.extend([f"{k}:{v}" for k, v in sorted_kwargs]) key_string = "|".join(key_components) return f"cache:{hashlib.md5(key_string.encode()).hexdigest()}" def get(self, key: str) -> Optional[Any]: """Get value from cache""" try: data = self.redis.get(key) if data: return json.loads(data) return None except (ConnectionError, TimeoutError) as e: logger.warning(f"Cache get failed: {e}") return None def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool: """Set value in cache with TTL""" try: ttl = ttl or self.default_ttl data = json.dumps(value, default=str) # Handle datetime serialization return bool(self.redis.setex(key, ttl, data)) except (ConnectionError, TimeoutError, TypeError) as e: logger.warning(f"Cache set failed: {e}") return False def delete(self, key: str) -> bool: """Delete key from cache""" try: return bool(self.redis.delete(key)) except (ConnectionError, TimeoutError) as e: logger.warning(f"Cache delete failed: {e}") return False def delete_pattern(self, pattern: str) -> int: """Delete all keys matching pattern""" try: keys = self.redis.keys(pattern) if keys: return self.redis.delete(*keys) return 0 except (ConnectionError, TimeoutError) as e: logger.warning(f"Cache pattern delete failed: {e}") return 0 def clear_user_cache(self, user_id: str) -> None: """Clear all cache entries for a specific user""" patterns = [ f"cache:user:{user_id}:*", f"cache:cases:user:{user_id}:*", f"cache:activities:user:{user_id}:*" ] for pattern in patterns: self.delete_pattern(pattern) def clear_case_cache(self, case_id: str) -> None: """Clear all cache entries for a specific case""" patterns = [ f"cache:case:{case_id}:*", f"cache:case:{case_id}", "cache:cases:list:*" # Clear case listing caches ] for pattern in patterns: self.delete_pattern(pattern) def cached(self, ttl: Optional[int] = None, key_prefix: str = ""): """Decorator for caching function results""" def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs): # Generate cache key prefix = key_prefix or f"{func.__module__}.{func.__name__}" cache_key = self._generate_cache_key(prefix, *args, **kwargs) # Try to get from cache first cached_result = self.get(cache_key) if cached_result is not None: logger.debug(f"Cache hit for {cache_key}") return cached_result # Execute function and cache result logger.debug(f"Cache miss for {cache_key}, executing function") result = await func(*args, **kwargs) # Cache the result if result is not None: self.set(cache_key, result, ttl) return result return wrapper return decorator class APICacheManager(CacheManager): """Specialized cache manager for API responses""" def __init__(self, redis_client: Redis): super().__init__(redis_client, default_ttl=300) # 5 minutes # Define TTLs for different types of data self.ttls = { 'user_profile': 600, # 10 minutes 'case_list': 120, # 2 minutes 'case_detail': 300, # 5 minutes 'analytics': 1800, # 30 minutes 'stats': 60, # 1 minute 'public_data': 3600, # 1 hour } def get_case_list_cache_key(self, filters: Dict[str, Any], page: int = 1, limit: int = 20) -> str: """Generate cache key for case listings with filters""" # Create a sorted string of active filters filter_parts = [] for key, value in sorted(filters.items()): if value is not None: filter_parts.append(f"{key}:{value}") filter_string = "|".join(filter_parts) if filter_parts else "all" return f"cache:cases:list:{filter_string}:page:{page}:limit:{limit}" def get_case_detail_cache_key(self, case_id: str) -> str: """Generate cache key for case details""" return f"cache:case:{case_id}:detail" def get_user_cases_cache_key(self, user_id: str, status: Optional[str] = None) -> str: """Generate cache key for user's cases""" status_part = f":status:{status}" if status else "" return f"cache:user:{user_id}:cases{status_part}" def cache_case_list(self, filters: Dict[str, Any], page: int, limit: int, results: list) -> None: """Cache case listing results""" cache_key = self.get_case_list_cache_key(filters, page, limit) self.set(cache_key, results, self.ttls['case_list']) def get_cached_case_list(self, filters: Dict[str, Any], page: int = 1, limit: int = 20) -> Optional[list]: """Get cached case listing results""" cache_key = self.get_case_list_cache_key(filters, page, limit) return self.get(cache_key) def cache_case_detail(self, case_id: str, case_data: Dict[str, Any]) -> None: """Cache case detail data""" cache_key = self.get_case_detail_cache_key(case_id) self.set(cache_key, case_data, self.ttls['case_detail']) def get_cached_case_detail(self, case_id: str) -> Optional[Dict[str, Any]]: """Get cached case detail data""" cache_key = self.get_case_detail_cache_key(case_id) return self.get(cache_key) def invalidate_case_caches(self, case_id: str) -> None: """Invalidate all caches related to a case""" # Clear specific case caches self.clear_case_cache(case_id) # Clear case listing caches (they may be stale) self.delete_pattern("cache:cases:list:*") def invalidate_user_caches(self, user_id: str) -> None: """Invalidate all caches related to a user""" self.clear_user_cache(user_id) def warmup_popular_caches(self) -> None: """Warm up frequently accessed caches""" # This would be called periodically to ensure popular data is cached logger.info("Warming up popular caches...") # Example: Cache recent high-priority cases # This would be implemented based on actual usage patterns pass def get_cache_stats(self) -> Dict[str, Any]: """Get cache performance statistics""" try: info = self.redis.info() return { 'connected_clients': info.get('connected_clients', 0), 'used_memory': info.get('used_memory_human', '0B'), 'total_keys': self.redis.dbsize(), 'hit_rate': 'N/A', # Would need additional tracking 'evictions': info.get('evicted_keys', 0), } except Exception as e: logger.warning(f"Could not get cache stats: {e}") return {'error': str(e)} class DatabaseQueryCache(CacheManager): """Cache manager specifically for database queries""" def __init__(self, redis_client: Redis): super().__init__(redis_client, default_ttl=600) # 10 minutes for DB queries def cached_query(self, query_name: str, ttl: Optional[int] = None): """Decorator for caching database query results""" def decorator(func: Callable) -> Callable: @wraps(func) async def wrapper(*args, **kwargs): # Generate cache key based on query name and parameters cache_key = self._generate_cache_key(f"db:{query_name}", *args, **kwargs) # Try cache first cached_result = self.get(cache_key) if cached_result is not None: logger.debug(f"DB cache hit for {query_name}") return cached_result # Execute query logger.debug(f"DB cache miss for {query_name}, executing query") result = await func(*args, **kwargs) # Cache result if result is not None: self.set(cache_key, result, ttl) return result return wrapper return decorator def invalidate_query_cache(self, query_name: str, *args, **kwargs) -> None: """Invalidate specific query cache""" cache_key = self._generate_cache_key(f"db:{query_name}", *args, **kwargs) self.delete(cache_key) def invalidate_all_query_caches(self, query_name: str) -> None: """Invalidate all caches for a specific query type""" pattern = f"cache:db:{query_name}:*" self.delete_pattern(pattern) # Global instances (would be initialized in app startup) cache_manager = None api_cache_manager = None db_query_cache = None def init_cache_managers(redis_url: str = "redis://localhost:6379/0") -> None: """Initialize global cache managers""" global cache_manager, api_cache_manager, db_query_cache try: redis_client = Redis.from_url(redis_url, decode_responses=True) # Test connection redis_client.ping() cache_manager = CacheManager(redis_client) api_cache_manager = APICacheManager(redis_client) db_query_cache = DatabaseQueryCache(redis_client) logger.info("Cache managers initialized successfully") except Exception as e: logger.warning(f"Failed to initialize cache managers: {e}") # Continue without caching rather than failing cache_manager = None api_cache_manager = None db_query_cache = None def get_cache_manager() -> Optional[CacheManager]: """Get the global cache manager instance""" return cache_manager def get_api_cache_manager() -> Optional[APICacheManager]: """Get the global API cache manager instance""" return api_cache_manager def get_db_query_cache() -> Optional[DatabaseQueryCache]: """Get the global database query cache instance""" return db_query_cache