Spaces:
Paused
Paused
File size: 5,045 Bytes
"""Redis caching service for database queries"""
import functools
import hashlib
import json
import logging
from collections.abc import Callable
from typing import Any, TypeVar
try:
import redis
REDIS_AVAILABLE = True
except ImportError:
REDIS_AVAILABLE = False
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Generic return type used in the annotations of the caching decorator below.
T = TypeVar("T")
class CacheService:
def __init__(self, host: str = "localhost", port: int = 6379, db: int = 0):
if REDIS_AVAILABLE:
try:
self.redis_client = redis.Redis(
host=host,
port=port,
db=db,
decode_responses=True,
socket_connect_timeout=1,
)
self.redis_client.ping()
self.enabled = True
except (redis.ConnectionError, Exception) as e:
logger.warning(f"Redis not available: {e}. Caching disabled.")
self.redis_client = None
self.enabled = False
else:
self.redis_client = None
self.enabled = False
logger.warning("Redis library not installed, caching disabled")
def _generate_key(self, prefix: str, *args, **kwargs) -> str:
"""Generate cache key from arguments"""
# Create a stable string representation of args and kwargs
key_parts = [prefix]
if args:
key_parts.append(str(args))
if kwargs:
key_parts.append(json.dumps(kwargs, sort_keys=True, default=str))
key_data = ":".join(key_parts)
return hashlib.md5(key_data.encode()).hexdigest()
def get(self, key: str) -> Any | None:
"""Get value from cache"""
if not self.enabled:
return None
try:
value = self.redis_client.get(key)
if value:
return json.loads(value)
except Exception as e:
logger.warning(f"Cache get error: {e}")
return None
def set(self, key: str, value: Any, ttl: int = 300):
"""Set value in cache with TTL (default 5 minutes)"""
if not self.enabled:
return
try:
self.redis_client.setex(key, ttl, json.dumps(value, default=str))
except Exception as e:
logger.warning(f"Cache set error: {e}")
def delete(self, key: str):
"""Delete key from cache"""
if not self.enabled:
return
try:
self.redis_client.delete(key)
except Exception as e:
logger.warning(f"Cache delete error: {e}")
def invalidate_pattern(self, pattern: str):
"""Invalidate all keys matching pattern"""
if not self.enabled:
return
try:
keys = list(self.redis_client.scan_iter(match=pattern))
if keys:
self.redis_client.delete(*keys)
except Exception as e:
logger.warning(f"Cache invalidation error: {e}")
# Global cache instance, created at import time with the default connection
# settings (localhost:6379, db 0). The constructor pings Redis once; if that
# fails the instance stays disabled and its methods become no-ops.
cache_service = CacheService()
def redis_cache(ttl: int = 300, prefix: str | None = None):
    """Decorator for caching function results in Redis.

    Works for both plain functions and ``async def`` coroutine functions.
    When the global ``cache_service`` is disabled the wrapped function is
    simply called.

    Notes:
        * A result of ``None`` is never cached, and a cached value is only
          used when it is not ``None``.
        * Cached values are returned as decoded by ``cache_service.get``
          (JSON), not as the original Python objects.
        * The default prefix is the bare ``func.__name__``, so two functions
          with the same name share one key space unless ``prefix`` is given.
        * The async wrapper calls the synchronous ``cache_service`` methods
          directly; only the wrapped coroutine itself is awaited.

    Args:
        ttl: Time to live in seconds (default 300)
        prefix: Optional prefix for cache key (default: function name)
    """

    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        # Function-scope import, like the original local asyncio import.
        # inspect.iscoroutinefunction replaces asyncio.iscoroutinefunction,
        # which is deprecated as of Python 3.14.
        import inspect

        key_prefix = prefix or func.__name__

        def _lookup(args, kwargs):
            """Return ``(cache_key, cached_value)`` for one call."""
            cache_key = cache_service._generate_key(
                key_prefix, *args, **kwargs
            )
            return cache_key, cache_service.get(cache_key)

        def _store(cache_key, result):
            """Cache ``result`` under ``cache_key`` unless it is ``None``."""
            if result is not None:
                cache_service.set(cache_key, result, ttl)

        if inspect.iscoroutinefunction(func):

            @functools.wraps(func)
            async def async_wrapper(*args, **kwargs) -> T:
                if not cache_service.enabled:
                    return await func(*args, **kwargs)
                cache_key, cached_value = _lookup(args, kwargs)
                if cached_value is not None:
                    return cached_value
                result = await func(*args, **kwargs)
                _store(cache_key, result)
                return result

            return async_wrapper

        @functools.wraps(func)
        def wrapper(*args, **kwargs) -> T:
            if not cache_service.enabled:
                return func(*args, **kwargs)
            cache_key, cached_value = _lookup(args, kwargs)
            if cached_value is not None:
                return cached_value
            result = func(*args, **kwargs)
            _store(cache_key, result)
            return result

        return wrapper

    return decorator
|