from typing import Optional, Any, Dict, Callable, TypeVar import json import inspect import redis.asyncio as redis from ..core.config import settings from ..utils.logger import logger T = TypeVar('T') class RedisCache: _instance = None def __new__(cls): if cls._instance is None: cls._instance = super(RedisCache, cls).__new__(cls) cls._instance.is_connected = False cls._instance.fallback_cache = {} cls._instance.redis = redis.Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD, username=settings.REDIS_USERNAME, decode_responses=True ) return cls._instance async def initialize(self): """Initialize Redis connection with fallback to dummy cache""" try: await self.redis.ping() self.is_connected = True logger.info("Redis cache initialized successfully") except Exception as e: self.is_connected = False self.fallback_cache = {} logger.warning(f"Redis connection failed, using in-memory fallback: {str(e)}") async def set_cache(self, key: str, value: Any, expire: int = 3600): """Set a cache entry with optional expiration time (default 1 hour)""" try: if not self.is_connected: self.fallback_cache[key] = value return await self.redis.set(key, json.dumps(value), ex=expire) except Exception as e: logger.error(f"Cache set error: {str(e)}") self.fallback_cache[key] = value async def get_cache(self, key: str) -> Optional[Any]: """Get a cached value by key""" try: if not self.is_connected: return self.fallback_cache.get(key) value = await self.redis.get(key) if value: return json.loads(value) except Exception as e: logger.error(f"Cache get error: {str(e)}") return self.fallback_cache.get(key) return None async def delete_cache(self, key: str) -> bool: """Delete a cache entry by key""" try: if not self.is_connected: return bool(self.fallback_cache.pop(key, None)) return bool(await self.redis.delete(key)) except Exception as e: logger.error(f"Cache delete error: {str(e)}") return False async def clear_cache_pattern(self, pattern: str) -> bool: """Clear all cache entries matching a pattern""" try: if not self.is_connected: # Basic pattern matching for fallback cache removed = 0 for key in list(self.fallback_cache.keys()): if pattern in key: del self.fallback_cache[key] removed += 1 return removed > 0 # Get all keys matching pattern keys = [key async for key in self.redis.scan_iter(pattern)] if keys: await self.redis.delete(*keys) return bool(keys) except Exception as e: logger.error(f"Cache pattern clear error: {str(e)}") return False async def check_connection(self) -> bool: """Check if Redis connection is alive""" try: await self.redis.ping() self.is_connected = True return True except Exception: self.is_connected = False return False async def cleanup_expired(self): """Clean up expired cache entries""" # Redis automatically handles expiration, only need to clean fallback if not self.is_connected: self.fallback_cache = {} def cached(ttl_seconds: int): """Decorator to cache function results""" def decorator(func: Callable[..., T]) -> Callable[..., T]: async def async_wrapper(*args, **kwargs) -> T: # Create cache key from function name and arguments key = f"{func.__name__}:{str(args)}:{str(kwargs)}" # Try to get from cache first cached_value = await cache.get_cache(key) if cached_value is not None: return cached_value # If not in cache, execute function result = await func(*args, **kwargs) # Cache the result await cache.set_cache(key, result, expire=ttl_seconds) return result def sync_wrapper(*args, **kwargs) -> T: # Create cache key from function name and arguments key = f"{func.__name__}:{str(args)}:{str(kwargs)}" # For sync functions, we can't use async cache directly # So we use the fallback cache if key in cache.fallback_cache: return cache.fallback_cache[key] # If not in cache, execute function result = func(*args, **kwargs) # Cache the result in fallback cache.fallback_cache[key] = result return result return async_wrapper if inspect.iscoroutinefunction(func) else sync_wrapper return decorator cache = RedisCache()