Spaces:
Paused
Paused
| """ | |
| Redis-based Response Caching System | |
| Implements intelligent caching for API responses and database queries | |
| """ | |
| import hashlib | |
| import json | |
| import logging | |
| from functools import wraps | |
| from typing import Any, Callable, Dict, Optional | |
| from redis import Redis | |
| from redis.exceptions import ConnectionError, TimeoutError | |
# Module-level logger named after this module; every cache manager below
# reports cache failures and hit/miss events through it.
logger = logging.getLogger(__name__)
class CacheManager:
    """Redis-based caching manager for API responses and data.

    Values are stored as JSON text. Objects that JSON cannot encode are
    written via ``str()`` (see ``set``), so they are read back as strings.
    Redis connection/timeout failures are logged and reported as a cache
    miss (``get``) or an unsuccessful write/delete instead of being raised.
    """

    # Keys requested per SCAN round-trip and removed per DEL call in
    # ``delete_pattern``.
    SCAN_BATCH_SIZE = 500

    def __init__(self, redis_client: Redis, default_ttl: int = 300):
        """Store the Redis client and the default TTL (seconds)."""
        self.redis = redis_client
        self.default_ttl = default_ttl  # 300 seconds == 5 minutes

    def _generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate a consistent cache key from function arguments.

        Positional arguments are rendered with ``str()``, keyword arguments
        as ``name:value`` in sorted-name order; the ``|``-joined string is
        hashed with MD5. The result has the form ``cache:<hexdigest>``, so
        the prefix is NOT visible in the key and cannot be glob-matched.
        """
        # Sort kwargs so call-site keyword order does not change the key.
        sorted_kwargs = sorted(kwargs.items())
        key_components = [prefix] + [str(arg) for arg in args]
        key_components.extend(f"{k}:{v}" for k, v in sorted_kwargs)
        key_string = "|".join(key_components)
        return f"cache:{hashlib.md5(key_string.encode()).hexdigest()}"

    def get(self, key: str) -> Optional[Any]:
        """Return the decoded value stored under ``key``, or None.

        None is returned when the key is absent, when Redis raises a
        connection/timeout error, or when the stored payload is not valid
        JSON (a corrupt entry is treated as a miss rather than raised).
        """
        try:
            data = self.redis.get(key)
        except (ConnectionError, TimeoutError) as e:
            logger.warning("Cache get failed: %s", e)
            return None
        if data is None:
            return None
        try:
            return json.loads(data)
        except (TypeError, ValueError) as e:
            # json.JSONDecodeError is a ValueError subclass.
            logger.warning("Cache entry %s is not valid JSON: %s", key, e)
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` under ``key`` with a TTL in seconds.

        A falsy ``ttl`` (None or 0) falls back to ``default_ttl``.
        Returns True when Redis acknowledged the write, False when the
        value could not be serialised or Redis was unreachable.
        """
        try:
            ttl = ttl or self.default_ttl
            # default=str stringifies anything JSON cannot encode natively
            # (e.g. datetime objects).
            data = json.dumps(value, default=str)
            return bool(self.redis.setex(key, ttl, data))
        except (ConnectionError, TimeoutError, TypeError, ValueError) as e:
            # ValueError covers circular references reported by json.dumps.
            logger.warning("Cache set failed: %s", e)
            return False

    def delete(self, key: str) -> bool:
        """Delete ``key``; return True if Redis reports a key was removed."""
        try:
            return bool(self.redis.delete(key))
        except (ConnectionError, TimeoutError) as e:
            logger.warning("Cache delete failed: %s", e)
            return False

    def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching the glob ``pattern``.

        Keys are discovered incrementally with SCAN and deleted in batches
        of ``SCAN_BATCH_SIZE``, instead of a single KEYS call that walks
        the whole keyspace in one blocking command.

        Returns the number of keys deleted. If Redis fails part-way, the
        count deleted before the failure is returned.
        """
        deleted = 0
        try:
            batch = []
            for key in self.redis.scan_iter(
                match=pattern, count=self.SCAN_BATCH_SIZE
            ):
                batch.append(key)
                if len(batch) >= self.SCAN_BATCH_SIZE:
                    deleted += self.redis.delete(*batch)
                    batch = []
            if batch:
                deleted += self.redis.delete(*batch)
        except (ConnectionError, TimeoutError) as e:
            logger.warning("Cache pattern delete failed: %s", e)
        return deleted

    def clear_user_cache(self, user_id: str) -> None:
        """Clear all cache entries for a specific user.

        ``user_id`` is interpolated verbatim into glob patterns, so glob
        metacharacters in it (``*``, ``?``, ``[``) widen or break the match.
        """
        patterns = [
            f"cache:user:{user_id}:*",
            f"cache:cases:user:{user_id}:*",
            f"cache:activities:user:{user_id}:*",
        ]
        for pattern in patterns:
            self.delete_pattern(pattern)

    def clear_case_cache(self, case_id: str) -> None:
        """Clear all cache entries for a specific case and all case listings.

        ``case_id`` is interpolated verbatim into glob patterns.
        """
        patterns = [
            f"cache:case:{case_id}:*",
            f"cache:case:{case_id}",
            "cache:cases:list:*",  # listings may contain the changed case
        ]
        for pattern in patterns:
            self.delete_pattern(pattern)

    def cached(self, ttl: Optional[int] = None, key_prefix: str = ""):
        """Decorator for caching function results.

        The decorated callable is always a coroutine function and must be
        awaited. The wrapped function may be ``async def`` or a plain
        function; an awaitable result is awaited before being cached.

        Args:
            ttl: TTL in seconds; falsy values use ``default_ttl``.
            key_prefix: key namespace; defaults to ``module.function``.

        ``None`` results are never cached. On a hit the JSON-decoded value
        is returned, which may differ in type from the original result
        (e.g. tuples come back as lists).

        NOTE(review): ``get``/``set`` are called without ``await``; with a
        blocking Redis client they run on the event-loop thread - confirm
        this is acceptable for the callers.
        """
        import inspect  # local import: only this decorator factory needs it

        def decorator(func: Callable) -> Callable:
            @wraps(func)  # keep the wrapped function's name and docstring
            async def wrapper(*args, **kwargs):
                prefix = key_prefix or f"{func.__module__}.{func.__name__}"
                cache_key = self._generate_cache_key(prefix, *args, **kwargs)

                cached_result = self.get(cache_key)
                if cached_result is not None:
                    logger.debug("Cache hit for %s", cache_key)
                    return cached_result

                logger.debug(
                    "Cache miss for %s, executing function", cache_key
                )
                result = func(*args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result

                if result is not None:
                    self.set(cache_key, result, ttl)
                return result

            return wrapper

        return decorator
class APICacheManager(CacheManager):
    """Specialized cache manager for API responses.

    Adds readable (non-hashed) key builders for case listings, case details
    and per-user case lists, plus a table of per-data-type TTLs.
    """

    def __init__(self, redis_client: Redis):
        """Initialise with a 5-minute default TTL and per-type TTLs."""
        super().__init__(redis_client, default_ttl=300)  # 5 minutes
        # TTLs in seconds for different types of data
        self.ttls = {
            'user_profile': 600,   # 10 minutes
            'case_list': 120,      # 2 minutes
            'case_detail': 300,    # 5 minutes
            'analytics': 1800,     # 30 minutes
            'stats': 60,           # 1 minute
            'public_data': 3600,   # 1 hour
        }

    def get_case_list_cache_key(
        self, filters: Dict[str, Any], page: int = 1, limit: int = 20
    ) -> str:
        """Generate the cache key for a case listing.

        Filters whose value is None are ignored; the rest are rendered as
        ``name:value`` in sorted-name order. An empty or missing filter
        mapping yields the literal segment ``all``.
        """
        filter_parts = [
            f"{key}:{value}"
            for key, value in sorted((filters or {}).items())
            if value is not None
        ]
        filter_string = "|".join(filter_parts) if filter_parts else "all"
        return f"cache:cases:list:{filter_string}:page:{page}:limit:{limit}"

    def get_case_detail_cache_key(self, case_id: str) -> str:
        """Generate the cache key for a case's detail record."""
        return f"cache:case:{case_id}:detail"

    def get_user_cases_cache_key(
        self, user_id: str, status: Optional[str] = None
    ) -> str:
        """Generate the cache key for a user's cases.

        A truthy ``status`` appends a ``:status:<status>`` suffix.
        """
        status_part = f":status:{status}" if status else ""
        return f"cache:user:{user_id}:cases{status_part}"

    def cache_case_list(
        self, filters: Dict[str, Any], page: int, limit: int, results: list
    ) -> None:
        """Cache case listing results using the ``case_list`` TTL."""
        cache_key = self.get_case_list_cache_key(filters, page, limit)
        self.set(cache_key, results, self.ttls['case_list'])

    def get_cached_case_list(
        self, filters: Dict[str, Any], page: int = 1, limit: int = 20
    ) -> Optional[list]:
        """Return cached case listing results, or None on a miss."""
        cache_key = self.get_case_list_cache_key(filters, page, limit)
        return self.get(cache_key)

    def cache_case_detail(self, case_id: str, case_data: Dict[str, Any]) -> None:
        """Cache case detail data using the ``case_detail`` TTL."""
        cache_key = self.get_case_detail_cache_key(case_id)
        self.set(cache_key, case_data, self.ttls['case_detail'])

    def get_cached_case_detail(self, case_id: str) -> Optional[Dict[str, Any]]:
        """Return cached case detail data, or None on a miss."""
        cache_key = self.get_case_detail_cache_key(case_id)
        return self.get(cache_key)

    def invalidate_case_caches(self, case_id: str) -> None:
        """Invalidate all caches related to a case.

        ``clear_case_cache`` already removes the case's own keys and every
        ``cache:cases:list:*`` listing, so no second listing sweep is
        issued here.
        """
        self.clear_case_cache(case_id)

    def invalidate_user_caches(self, user_id: str) -> None:
        """Invalidate all caches related to a user."""
        self.clear_user_cache(user_id)

    def warmup_popular_caches(self) -> None:
        """Warm up frequently accessed caches.

        Placeholder: currently only logs; no keys are populated.
        """
        # This would be called periodically to ensure popular data is cached
        logger.info("Warming up popular caches...")
        # Example: Cache recent high-priority cases
        # This would be implemented based on actual usage patterns

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return cache statistics read from the Redis server.

        On success the dict holds ``connected_clients``, ``used_memory``,
        ``total_keys``, ``hit_rate`` (always ``'N/A'``) and ``evictions``.
        On any failure the dict holds a single ``error`` message instead.
        """
        try:
            info = self.redis.info()
            return {
                'connected_clients': info.get('connected_clients', 0),
                'used_memory': info.get('used_memory_human', '0B'),
                'total_keys': self.redis.dbsize(),
                'hit_rate': 'N/A',  # Would need additional tracking
                'evictions': info.get('evicted_keys', 0),
            }
        except Exception as e:
            # Best-effort diagnostics: report the failure instead of raising.
            logger.warning("Could not get cache stats: %s", e)
            return {'error': str(e)}
class DatabaseQueryCache(CacheManager):
    """Cache manager specifically for database queries.

    Keys have the form ``cache:db:<query_name>:<digest>`` so that every
    entry of one query type can be removed with a single glob pattern.
    """

    def __init__(self, redis_client: Redis):
        """Initialise with a 10-minute default TTL for DB queries."""
        super().__init__(redis_client, default_ttl=600)

    def _query_cache_key(self, query_name: str, *args, **kwargs) -> str:
        """Build the namespaced key for one query invocation.

        The digest comes from ``_generate_cache_key``; only its final
        ``:``-separated segment is kept and re-prefixed with the query
        name, which is what ``invalidate_all_query_caches`` matches on.
        """
        hashed = self._generate_cache_key(f"db:{query_name}", *args, **kwargs)
        digest = hashed.rsplit(":", 1)[-1]
        return f"cache:db:{query_name}:{digest}"

    def cached_query(self, query_name: str, ttl: Optional[int] = None):
        """Decorator for caching database query results.

        The decorated callable is always a coroutine function and must be
        awaited. The wrapped function may be ``async def`` or a plain
        function; an awaitable result is awaited before being cached.
        ``None`` results are never cached.

        Args:
            query_name: namespace for this query type's cache keys.
            ttl: TTL in seconds; falsy values use the default TTL.
        """
        import inspect  # local import: only this decorator factory needs it

        def decorator(func: Callable) -> Callable:
            @wraps(func)  # keep the wrapped function's name and docstring
            async def wrapper(*args, **kwargs):
                cache_key = self._query_cache_key(query_name, *args, **kwargs)

                cached_result = self.get(cache_key)
                if cached_result is not None:
                    logger.debug("DB cache hit for %s", query_name)
                    return cached_result

                logger.debug(
                    "DB cache miss for %s, executing query", query_name
                )
                result = func(*args, **kwargs)
                if inspect.isawaitable(result):
                    result = await result

                if result is not None:
                    self.set(cache_key, result, ttl)
                return result

            return wrapper

        return decorator

    def invalidate_query_cache(self, query_name: str, *args, **kwargs) -> None:
        """Invalidate the cache entry for one query and argument set.

        The arguments must match those of the cached call, since they are
        hashed into the key.
        """
        self.delete(self._query_cache_key(query_name, *args, **kwargs))

    def invalidate_all_query_caches(self, query_name: str) -> None:
        """Invalidate all cache entries for a specific query type."""
        pattern = f"cache:db:{query_name}:*"
        self.delete_pattern(pattern)
# Module-wide singletons. They stay None until init_cache_managers() succeeds
# (intended to be called during app startup).
cache_manager = api_cache_manager = db_query_cache = None
def init_cache_managers(redis_url: str = "redis://localhost:6379/0") -> None:
    """Create the three global cache managers from a single Redis client.

    The client is built from ``redis_url`` with ``decode_responses=True``
    and pinged once. If anything raises, a warning is logged and all three
    globals are set to None so the application runs without caching.
    """
    global cache_manager, api_cache_manager, db_query_cache

    try:
        client = Redis.from_url(redis_url, decode_responses=True)
        client.ping()  # verify the server is reachable before going on
        managers = (
            CacheManager(client),
            APICacheManager(client),
            DatabaseQueryCache(client),
        )
        logger.info("Cache managers initialized successfully")
    except Exception as exc:
        logger.warning(f"Failed to initialize cache managers: {exc}")
        # Continue without caching rather than failing
        managers = (None, None, None)

    cache_manager, api_cache_manager, db_query_cache = managers
def get_cache_manager() -> Optional[CacheManager]:
    """Return the global CacheManager.

    The value is None until init_cache_managers() has run successfully.
    """
    return cache_manager
def get_api_cache_manager() -> Optional[APICacheManager]:
    """Return the global APICacheManager.

    The value is None until init_cache_managers() has run successfully.
    """
    return api_cache_manager
def get_db_query_cache() -> Optional[DatabaseQueryCache]:
    """Return the global DatabaseQueryCache.

    The value is None until init_cache_managers() has run successfully.
    """
    return db_query_cache