# zenith-backend/core/cache.py
# deploy: sync from main, Sun Jan 11 18:43:53 WIT 2026 (commit 4a2ab42, teoat)
"""Redis caching service for database queries"""
import asyncio
import functools
import hashlib
import json
import logging
from collections.abc import Callable
from typing import Any, TypeVar
try:
import redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
logger = logging.getLogger(__name__)
T = TypeVar("T")
class CacheService:
def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
if REDIS_AVAILABLE:
try:
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True,
socket_connect_timeout=1,
)
self.redis_client.ping()
self.enabled = True
except (redis.ConnectionError, Exception) as e:
logger.warning(f"Redis not available: {e}. Caching disabled.")
self.redis_client = None
self.enabled = False
else:
self.redis_client = None
self.enabled = False
logger.warning("Redis library not installed, caching disabled")
def _generate_key(self, prefix: str, *args, **kwargs) -> str:
"""Generate cache key from arguments"""
# Create a stable string representation of args and kwargs
key_parts = [prefix]
if args:
key_parts.append(str(args))
if kwargs:
key_parts.append(json.dumps(kwargs, sort_keys=True, default=str))
key_data = ":".join(key_parts)
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Any | None:
"""Get value from cache"""
if not self.enabled:
return None
try:
value = self.redis_client.get(key)
if value:
return json.loads(value)
except Exception as e:
logger.warning(f"Cache get error: {e}")
return None
def set(self, key: str, value: Any, ttl: int = 300):
"""Set value in cache with TTL (default 5 minutes)"""
if not self.enabled:
return
try:
self.redis_client.setex(key, ttl, json.dumps(value, default=str))
except Exception as e:
logger.warning(f"Cache set error: {e}")
def delete(self, key: str):
"""Delete key from cache"""
if not self.enabled:
return
try:
self.redis_client.delete(key)
except Exception as e:
logger.warning(f"Cache delete error: {e}")
def invalidate_pattern(self, pattern: str):
"""Invalidate all keys matching pattern"""
if not self.enabled:
return
try:
keys = list(self.redis_client.scan_iter(match=pattern))
if keys:
self.redis_client.delete(*keys)
except Exception as e:
logger.warning(f"Cache invalidation error: {e}")
# Module-level singleton shared by all callers (and by the redis_cache
# decorator). Constructed at import time, so importing this module attempts a
# Redis connection with a 1-second timeout; on failure caching is disabled.
cache_service = CacheService()
def redis_cache(ttl: int = 300, prefix: str | None = None):
    """Decorator that caches a function's JSON-serializable result in Redis.

    Works on both sync and async callables (detected at decoration time).
    A ``None`` result is never cached, because a cached JSON ``null`` would be
    indistinguishable from a cache miss. Values round-trip through JSON, so
    tuples come back as lists and non-string dict keys as strings.

    NOTE(review): the underlying redis client is synchronous, so the async
    wrapper's cache reads/writes block the event loop — presumably acceptable
    for a low-latency local Redis; confirm for remote deployments.

    Args:
        ttl: Time to live in seconds (default 300).
        prefix: Optional prefix for cache key (default: function name).
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        def _cache_key(args, kwargs) -> str:
            # One key per (prefix, positional args, keyword args) combination.
            return cache_service._generate_key(prefix or func.__name__, *args, **kwargs)

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            # Bypass entirely when Redis is unavailable.
            if not cache_service.enabled:
                return func(*args, **kwargs)
            cache_key = _cache_key(args, kwargs)
            cached_value = cache_service.get(cache_key)
            if cached_value is not None:
                return cached_value
            result = func(*args, **kwargs)
            if result is not None:
                cache_service.set(cache_key, result, ttl)
            return result

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> T:
            if not cache_service.enabled:
                return await func(*args, **kwargs)
            cache_key = _cache_key(args, kwargs)
            cached_value = cache_service.get(cache_key)
            if cached_value is not None:
                return cached_value
            result = await func(*args, **kwargs)
            if result is not None:
                cache_service.set(cache_key, result, ttl)
            return result

        # asyncio is imported at module level (the original re-imported it
        # inside the decorator on every decoration).
        return async_wrapper if asyncio.iscoroutinefunction(func) else wrapper

    return decorator