Spaces:
Paused
Paused
import fnmatch
import functools
import inspect
import json
from typing import Optional, Any, Dict, Callable, TypeVar

import redis.asyncio as redis

from ..core.config import settings
from ..utils.logger import logger
# Generic return type of the functions wrapped by the `cached` decorator below.
T = TypeVar('T')
class RedisCache:
    """Singleton JSON cache backed by Redis with an in-memory dict fallback.

    Every ``RedisCache()`` call returns the same instance. Values written to
    Redis are serialized with ``json.dumps`` and read back with
    ``json.loads``. While ``is_connected`` is False (its initial state, until
    ``initialize()`` or ``check_connection()`` pings successfully) all
    operations use ``fallback_cache``, a plain dict that stores the raw Python
    values and applies no expiration.
    """

    _instance = None

    def __new__(cls):
        if cls._instance is None:
            instance = super().__new__(cls)
            instance.is_connected = False
            instance.fallback_cache = {}
            instance.redis = redis.Redis(
                host=settings.REDIS_HOST,
                port=settings.REDIS_PORT,
                password=settings.REDIS_PASSWORD,
                username=settings.REDIS_USERNAME,
                decode_responses=True,
            )
            # Publish the singleton only once it is fully built, so a failure
            # while creating the client cannot leave a half-initialized
            # instance behind for later callers.
            cls._instance = instance
        return cls._instance

    async def initialize(self):
        """Ping Redis and record whether it is reachable.

        On failure the in-memory fallback cache is reset to an empty dict and
        a warning is logged; no exception is propagated.
        """
        try:
            await self.redis.ping()
            self.is_connected = True
            logger.info("Redis cache initialized successfully")
        except Exception as e:
            self.is_connected = False
            self.fallback_cache = {}
            logger.warning(f"Redis connection failed, using in-memory fallback: {str(e)}")

    async def set_cache(self, key: str, value: Any, expire: int = 3600):
        """Store ``value`` under ``key``.

        Args:
            key: Cache key.
            value: Value to store; must be JSON-serializable for the Redis path.
            expire: Time-to-live in seconds for the Redis entry (default 1 hour).
                The in-memory fallback ignores it.

        Any error on the Redis path (including serialization errors) is logged
        and the raw value is kept in the fallback cache instead.
        """
        try:
            if not self.is_connected:
                self.fallback_cache[key] = value
                return
            await self.redis.set(key, json.dumps(value), ex=expire)
        except Exception as e:
            logger.error(f"Cache set error: {str(e)}")
            self.fallback_cache[key] = value

    async def get_cache(self, key: str) -> Optional[Any]:
        """Return the cached value for ``key``, or None when there is none.

        When connected, the value is read from Redis and JSON-decoded; an
        empty or missing Redis value yields None. When disconnected, or when
        the Redis read/decode raises, the fallback cache is consulted.
        """
        try:
            if not self.is_connected:
                return self.fallback_cache.get(key)
            value = await self.redis.get(key)
            if value:
                return json.loads(value)
        except Exception as e:
            logger.error(f"Cache get error: {str(e)}")
            return self.fallback_cache.get(key)
        return None

    async def delete_cache(self, key: str) -> bool:
        """Delete ``key`` and return True if an entry was actually removed.

        Errors are logged and reported as False.
        """
        try:
            if not self.is_connected:
                # Test membership rather than the popped value's truthiness:
                # deleting a stored 0, "", [] or False must still report True.
                if key in self.fallback_cache:
                    del self.fallback_cache[key]
                    return True
                return False
            return bool(await self.redis.delete(key))
        except Exception as e:
            logger.error(f"Cache delete error: {str(e)}")
            return False

    async def clear_cache_pattern(self, pattern: str) -> bool:
        """Delete every entry whose key matches the glob ``pattern``.

        Returns True if at least one key was removed, False if nothing matched
        or an error occurred (errors are logged).
        """
        try:
            if not self.is_connected:
                # Use glob matching so that patterns such as "user:*" behave
                # the same way here as they do on the Redis path.
                # NOTE(review): fnmatch and Redis glob syntax may differ in
                # corner cases (e.g. negated character classes) - confirm if
                # such patterns are used.
                matching = [
                    key for key in self.fallback_cache
                    if fnmatch.fnmatchcase(key, pattern)
                ]
                for key in matching:
                    del self.fallback_cache[key]
                return bool(matching)
            # Collect all matching keys first, then delete them in one call.
            keys = [key async for key in self.redis.scan_iter(match=pattern)]
            if keys:
                await self.redis.delete(*keys)
            return bool(keys)
        except Exception as e:
            logger.error(f"Cache pattern clear error: {str(e)}")
            return False

    async def check_connection(self) -> bool:
        """Ping Redis, update ``is_connected`` and return the result."""
        try:
            await self.redis.ping()
            self.is_connected = True
            return True
        except Exception:
            self.is_connected = False
            return False

    async def cleanup_expired(self):
        """Drop stale entries from the in-memory fallback.

        Redis expires its own keys, so nothing is done while connected. The
        fallback cache keeps no expiry information, so while disconnected the
        whole fallback dict is discarded.
        """
        if not self.is_connected:
            self.fallback_cache = {}
def cached(ttl_seconds: int):
    """Decorator factory that caches a function's results in the shared cache.

    Args:
        ttl_seconds: Expiration passed to ``cache.set_cache`` for coroutine
            functions. It is not applied to plain (sync) functions, whose
            results go straight into ``cache.fallback_cache``.

    Coroutine functions are cached through ``cache.get_cache`` /
    ``cache.set_cache``; a result of None is never served from the cache, so
    such calls are re-executed every time. Sync functions cannot await the
    async cache API, so they read and write ``cache.fallback_cache`` directly.

    The cache key is built from the function's ``__name__`` and the string
    forms of its positional and keyword arguments.
    """
    def decorator(func: Callable[..., T]) -> Callable[..., T]:
        def build_key(args, kwargs) -> str:
            # Sort keyword arguments so that f(a=1, b=2) and f(b=2, a=1)
            # share one entry; the rendered format is otherwise unchanged.
            ordered_kwargs = dict(sorted(kwargs.items()))
            return f"{func.__name__}:{str(args)}:{str(ordered_kwargs)}"

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs) -> T:
            key = build_key(args, kwargs)
            # Serve from the cache when a non-None value is stored.
            cached_value = await cache.get_cache(key)
            if cached_value is not None:
                return cached_value
            # Cache miss: run the coroutine and store its result.
            result = await func(*args, **kwargs)
            await cache.set_cache(key, result, expire=ttl_seconds)
            return result

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs) -> T:
            key = build_key(args, kwargs)
            # The async cache API cannot be awaited here, so sync functions
            # use the in-memory fallback dict directly.
            try:
                return cache.fallback_cache[key]
            except KeyError:
                pass
            result = func(*args, **kwargs)
            cache.fallback_cache[key] = result
            return result

        return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper
    return decorator
# Shared module-level cache instance; the wrappers created by `cached` look it
# up as the global name `cache` when they run.
cache = RedisCache()