# zenith-backend/core/cache/advanced_cache.py
# (uploaded via huggingface_hub; revision 075b8c4)
"""
Redis-based Response Caching System
Implements intelligent caching for API responses and database queries
"""
import asyncio
import hashlib
import json
import logging
from functools import wraps
from typing import Any, Callable, Dict, Optional

from redis import Redis
from redis.exceptions import ConnectionError, TimeoutError
logger = logging.getLogger(__name__)
class CacheManager:
    """Redis-based caching manager for API responses and data.

    Values are serialized to JSON before storage, so only JSON-serializable
    data (plus anything ``str()`` can render, via ``default=str``) can be
    cached. All cache failures are logged and degrade to "no cache" rather
    than raising to callers.
    """

    def __init__(self, redis_client: "Redis", default_ttl: int = 300):
        """
        Args:
            redis_client: Connected redis-py client instance.
            default_ttl: Fallback expiry in seconds (default 5 minutes).
        """
        self.redis = redis_client
        self.default_ttl = default_ttl  # 5 minutes default

    def _generate_cache_key(self, prefix: str, *args, **kwargs) -> str:
        """Generate a consistent cache key from function arguments.

        The prefix is kept verbatim in the key (``cache:<prefix>:<md5>``) so
        pattern-based invalidation such as ``delete_pattern("cache:db:x:*")``
        can actually match keys produced here.

        NOTE: positional args are rendered with str(); objects without a
        stable __str__ (e.g. instances with the default repr) will not
        produce repeatable keys across processes.
        """
        # Sort kwargs so {a:1, b:2} and {b:2, a:1} map to the same key.
        sorted_kwargs = sorted(kwargs.items())
        key_components = [prefix] + [str(arg) for arg in args]
        key_components.extend([f"{k}:{v}" for k, v in sorted_kwargs])
        key_string = "|".join(key_components)
        # md5 is fine here: we need uniform short keys, not crypto strength.
        return f"cache:{prefix}:{hashlib.md5(key_string.encode()).hexdigest()}"

    def get(self, key: str) -> Optional[Any]:
        """Get a value from cache.

        Returns:
            The deserialized value, or None on a miss, a connection problem,
            or a corrupt (non-JSON) payload.
        """
        try:
            data = self.redis.get(key)
            if data is None:
                return None
            return json.loads(data)
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Cache get failed: {e}")
            return None
        except (json.JSONDecodeError, TypeError) as e:
            # Corrupt or foreign payload under this key: treat as a miss
            # instead of propagating a decode error to the caller.
            logger.warning(f"Cache get failed: {e}")
            return None

    def set(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Set value in cache with TTL.

        Args:
            key: Cache key.
            value: JSON-serializable value (datetimes etc. fall back to str()).
            ttl: Expiry in seconds; defaults to ``default_ttl``.

        Returns:
            True if the value was stored, False on any failure.
        """
        try:
            ttl = ttl or self.default_ttl
            data = json.dumps(value, default=str)  # Handle datetime serialization
            return bool(self.redis.setex(key, ttl, data))
        # ValueError covers circular references raised by json.dumps.
        except (ConnectionError, TimeoutError, TypeError, ValueError) as e:
            logger.warning(f"Cache set failed: {e}")
            return False

    def delete(self, key: str) -> bool:
        """Delete key from cache; returns True if a key was removed."""
        try:
            return bool(self.redis.delete(key))
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Cache delete failed: {e}")
            return False

    def delete_pattern(self, pattern: str) -> int:
        """Delete all keys matching pattern; returns the number deleted.

        Uses SCAN (scan_iter) rather than KEYS so large keyspaces do not
        block the Redis server.
        """
        try:
            keys = list(self.redis.scan_iter(match=pattern))
            if keys:
                return self.redis.delete(*keys)
            return 0
        except (ConnectionError, TimeoutError) as e:
            logger.warning(f"Cache pattern delete failed: {e}")
            return 0

    def clear_user_cache(self, user_id: str) -> None:
        """Clear all cache entries for a specific user."""
        patterns = [
            f"cache:user:{user_id}:*",
            f"cache:cases:user:{user_id}:*",
            f"cache:activities:user:{user_id}:*"
        ]
        for pattern in patterns:
            self.delete_pattern(pattern)

    def clear_case_cache(self, case_id: str) -> None:
        """Clear all cache entries for a specific case."""
        patterns = [
            f"cache:case:{case_id}:*",
            f"cache:case:{case_id}",
            "cache:cases:list:*"  # Clear case listing caches
        ]
        for pattern in patterns:
            self.delete_pattern(pattern)

    def cached(self, ttl: Optional[int] = None, key_prefix: str = ""):
        """Decorator for caching function results.

        Works for both sync and async callables (the original version only
        handled coroutines and silently returned an unawaited coroutine for
        plain functions). None results are never cached so failures can be
        retried on the next call.

        Args:
            ttl: Optional expiry override in seconds.
            key_prefix: Cache key prefix; defaults to module.qualname.
        """
        def decorator(func: Callable) -> Callable:
            # Hoisted: the prefix is invariant across calls.
            prefix = key_prefix or f"{func.__module__}.{func.__name__}"

            if asyncio.iscoroutinefunction(func):
                @wraps(func)
                async def async_wrapper(*args, **kwargs):
                    cache_key = self._generate_cache_key(prefix, *args, **kwargs)
                    cached_result = self.get(cache_key)
                    if cached_result is not None:
                        logger.debug(f"Cache hit for {cache_key}")
                        return cached_result
                    logger.debug(f"Cache miss for {cache_key}, executing function")
                    result = await func(*args, **kwargs)
                    if result is not None:
                        self.set(cache_key, result, ttl)
                    return result
                return async_wrapper

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                cache_key = self._generate_cache_key(prefix, *args, **kwargs)
                cached_result = self.get(cache_key)
                if cached_result is not None:
                    logger.debug(f"Cache hit for {cache_key}")
                    return cached_result
                logger.debug(f"Cache miss for {cache_key}, executing function")
                result = func(*args, **kwargs)
                if result is not None:
                    self.set(cache_key, result, ttl)
                return result
            return sync_wrapper
        return decorator
class APICacheManager(CacheManager):
    """Cache manager specialized for API response payloads.

    Adds per-category TTLs and helpers for building and invalidating the
    cache keys used by case and user endpoints.
    """

    def __init__(self, redis_client: Redis):
        super().__init__(redis_client, default_ttl=300)  # default: 5 minutes
        # Expiry (seconds) per data category.
        self.ttls = {
            'user_profile': 600,   # 10 minutes
            'case_list': 120,      # 2 minutes
            'case_detail': 300,    # 5 minutes
            'analytics': 1800,     # 30 minutes
            'stats': 60,           # 1 minute
            'public_data': 3600,   # 1 hour
        }

    def get_case_list_cache_key(self, filters: Dict[str, Any], page: int = 1, limit: int = 20) -> str:
        """Build the cache key for a filtered, paginated case listing."""
        # Only non-None filters participate; sorted for key stability.
        active = [f"{k}:{v}" for k, v in sorted(filters.items()) if v is not None]
        filter_string = "|".join(active) if active else "all"
        return f"cache:cases:list:{filter_string}:page:{page}:limit:{limit}"

    def get_case_detail_cache_key(self, case_id: str) -> str:
        """Build the cache key for a single case's detail payload."""
        return f"cache:case:{case_id}:detail"

    def get_user_cases_cache_key(self, user_id: str, status: Optional[str] = None) -> str:
        """Build the cache key for a user's cases, optionally scoped by status."""
        status_part = f":status:{status}" if status else ""
        return f"cache:user:{user_id}:cases{status_part}"

    def cache_case_list(self, filters: Dict[str, Any], page: int, limit: int, results: list) -> None:
        """Store case listing results under the listing key."""
        key = self.get_case_list_cache_key(filters, page, limit)
        self.set(key, results, self.ttls['case_list'])

    def get_cached_case_list(self, filters: Dict[str, Any], page: int = 1, limit: int = 20) -> Optional[list]:
        """Fetch cached case listing results, or None on a miss."""
        return self.get(self.get_case_list_cache_key(filters, page, limit))

    def cache_case_detail(self, case_id: str, case_data: Dict[str, Any]) -> None:
        """Store a case's detail payload."""
        self.set(self.get_case_detail_cache_key(case_id), case_data, self.ttls['case_detail'])

    def get_cached_case_detail(self, case_id: str) -> Optional[Dict[str, Any]]:
        """Fetch a cached case detail payload, or None on a miss."""
        return self.get(self.get_case_detail_cache_key(case_id))

    def invalidate_case_caches(self, case_id: str) -> None:
        """Drop every cache entry related to one case, plus listing caches."""
        self.clear_case_cache(case_id)
        # Listings may embed the changed case, so clear them wholesale.
        self.delete_pattern("cache:cases:list:*")

    def invalidate_user_caches(self, user_id: str) -> None:
        """Drop every cache entry related to one user."""
        self.clear_user_cache(user_id)

    def warmup_popular_caches(self) -> None:
        """Pre-populate frequently accessed caches (placeholder).

        Intended to be called periodically; the actual warmup logic depends
        on observed usage patterns and is not implemented yet.
        """
        logger.info("Warming up popular caches...")

    def get_cache_stats(self) -> Dict[str, Any]:
        """Return basic Redis performance statistics, or an error marker."""
        try:
            info = self.redis.info()
            return {
                'connected_clients': info.get('connected_clients', 0),
                'used_memory': info.get('used_memory_human', '0B'),
                'total_keys': self.redis.dbsize(),
                'hit_rate': 'N/A',  # would need explicit hit/miss counters
                'evictions': info.get('evicted_keys', 0),
            }
        except Exception as e:
            logger.warning(f"Could not get cache stats: {e}")
            return {'error': str(e)}
class DatabaseQueryCache(CacheManager):
    """Cache manager specifically for database queries.

    Query results are cached under keys derived from the query name and its
    parameters; entries can be invalidated individually or per query type.
    """

    def __init__(self, redis_client: Redis):
        """
        Args:
            redis_client: Connected redis-py client instance.
        """
        super().__init__(redis_client, default_ttl=600)  # 10 minutes for DB queries

    def cached_query(self, query_name: str, ttl: Optional[int] = None):
        """Decorator for caching database query results.

        Supports both sync and async query functions (the original version
        only handled coroutines, and would hand back an unawaited coroutine
        for a plain function). None results are never cached, so failed
        lookups are retried on the next call.

        Args:
            query_name: Logical name of the query; part of the cache key.
            ttl: Optional expiry override in seconds.
        """
        def decorator(func: Callable) -> Callable:
            if asyncio.iscoroutinefunction(func):
                @wraps(func)
                async def async_wrapper(*args, **kwargs):
                    cache_key = self._generate_cache_key(f"db:{query_name}", *args, **kwargs)
                    cached_result = self.get(cache_key)
                    if cached_result is not None:
                        logger.debug(f"DB cache hit for {query_name}")
                        return cached_result
                    logger.debug(f"DB cache miss for {query_name}, executing query")
                    result = await func(*args, **kwargs)
                    if result is not None:
                        self.set(cache_key, result, ttl)
                    return result
                return async_wrapper

            @wraps(func)
            def sync_wrapper(*args, **kwargs):
                cache_key = self._generate_cache_key(f"db:{query_name}", *args, **kwargs)
                cached_result = self.get(cache_key)
                if cached_result is not None:
                    logger.debug(f"DB cache hit for {query_name}")
                    return cached_result
                logger.debug(f"DB cache miss for {query_name}, executing query")
                result = func(*args, **kwargs)
                if result is not None:
                    self.set(cache_key, result, ttl)
                return result
            return sync_wrapper
        return decorator

    def invalidate_query_cache(self, query_name: str, *args, **kwargs) -> None:
        """Invalidate the cache entry for one specific query invocation."""
        cache_key = self._generate_cache_key(f"db:{query_name}", *args, **kwargs)
        self.delete(cache_key)

    def invalidate_all_query_caches(self, query_name: str) -> None:
        """Invalidate every cached entry for a given query type.

        NOTE(review): this assumes _generate_cache_key embeds the
        "db:<query_name>" prefix in the stored key. If keys are purely
        hashed (``cache:<md5>``) this pattern matches nothing — verify
        against the key scheme in CacheManager._generate_cache_key.
        """
        pattern = f"cache:db:{query_name}:*"
        self.delete_pattern(pattern)
# Global instances (would be initialized in app startup)
# All three stay None until init_cache_managers() runs successfully;
# consumers must treat None as "caching disabled".
cache_manager: Optional[CacheManager] = None
api_cache_manager: Optional[APICacheManager] = None
db_query_cache: Optional[DatabaseQueryCache] = None
def init_cache_managers(redis_url: str = "redis://localhost:6379/0") -> None:
    """Initialize the module-level cache manager singletons.

    Connects to Redis at *redis_url* and wires up all three managers. On
    any failure the globals are reset to None so the application starts
    with caching disabled instead of crashing.
    """
    global cache_manager, api_cache_manager, db_query_cache
    try:
        client = Redis.from_url(redis_url, decode_responses=True)
        client.ping()  # fail fast if Redis is unreachable
        cache_manager = CacheManager(client)
        api_cache_manager = APICacheManager(client)
        db_query_cache = DatabaseQueryCache(client)
        logger.info("Cache managers initialized successfully")
    except Exception as e:
        logger.warning(f"Failed to initialize cache managers: {e}")
        # Continue without caching rather than failing
        cache_manager = None
        api_cache_manager = None
        db_query_cache = None
def get_cache_manager() -> Optional[CacheManager]:
    """Get the global cache manager instance.

    Returns:
        The module-level CacheManager, or None if init_cache_managers()
        has not been called or failed to connect to Redis.
    """
    return cache_manager
def get_api_cache_manager() -> Optional[APICacheManager]:
    """Get the global API cache manager instance.

    Returns:
        The module-level APICacheManager, or None if init_cache_managers()
        has not been called or failed to connect to Redis.
    """
    return api_cache_manager
def get_db_query_cache() -> Optional[DatabaseQueryCache]:
    """Get the global database query cache instance.

    Returns:
        The module-level DatabaseQueryCache, or None if init_cache_managers()
        has not been called or failed to connect to Redis.
    """
    return db_query_cache