""" Redis Caching Utility Provides caching decorator and Redis connection management """ import json import hashlib from functools import wraps from typing import Optional, Any import logging logger = logging.getLogger(__name__) # Redis client (lazy initialization) _redis_client: Optional[Any] = None def get_redis_client(): """Get Redis client instance (singleton with graceful fallback)""" global _redis_client if _redis_client is None: try: from redis import Redis _redis_client = Redis( host='localhost', port=6379, db=0, decode_responses=True, socket_connect_timeout=2, socket_timeout=2 ) # Test connection _redis_client.ping() logger.info("[CACHE] [SUCCESS] Redis connection established") except Exception as e: logger.warning(f"[CACHE] Redis not available: {e}. Caching disabled.") _redis_client = False # Mark as unavailable to avoid retries return _redis_client if _redis_client is not False else None def generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str: """Generate a unique cache key from function name and arguments""" # Convert args and kwargs to a stable string representation args_str = str(args) kwargs_str = str(sorted(kwargs.items())) combined = f"{func_name}:{args_str}:{kwargs_str}" # Hash for shorter keys (avoid Redis key length limits) key_hash = hashlib.md5(combined.encode()).hexdigest() return f"cache:{func_name}:{key_hash}" def cache(ttl: int = 300): """ Caching decorator with Redis backend Args: ttl: Time to live in seconds (default 5 minutes) Usage: @cache(ttl=60) def expensive_function(arg1, arg2): # ... expensive operation return result """ def decorator(func): @wraps(func) def wrapper(*args, **kwargs): redis_client = get_redis_client() # If Redis is not available, execute function normally if redis_client is None: return func(*args, **kwargs) try: # Generate cache key cache_key = generate_cache_key(func.__name__, args, kwargs) # Try to get from cache cached_value = redis_client.get(cache_key) if cached_value is not None: logger.debug(f"[CACHE] HIT: {func.__name__}") return json.loads(cached_value) # Cache miss - execute function logger.debug(f"[CACHE] MISS: {func.__name__}") result = func(*args, **kwargs) # Store in cache try: serialized = json.dumps(result, default=str) redis_client.setex(cache_key, ttl, serialized) except (TypeError, ValueError) as e: logger.warning(f"[CACHE] Failed to serialize result for {func.__name__}: {e}") return result except Exception as e: # If caching fails, log and execute function normally logger.warning(f"[CACHE] Error in cache decorator for {func.__name__}: {e}") return func(*args, **kwargs) return wrapper return decorator def invalidate_cache(pattern: str = None): """ Invalidate cache entries matching pattern Args: pattern: Redis key pattern (e.g., "cache:list_properties:*") If None, clears all cache entries """ redis_client = get_redis_client() if redis_client is None: return try: if pattern: keys = redis_client.keys(pattern) if keys: redis_client.delete(*keys) logger.info(f"[CACHE] Invalidated {len(keys)} keys matching '{pattern}'") else: redis_client.flushdb() logger.info("[CACHE] Cleared all cache entries") except Exception as e: logger.error(f"[CACHE] Error invalidating cache: {e}") def cache_property_data(property_id: str, data: dict, ttl: int = 300): """Cache property data manually (for fine-grained control)""" redis_client = get_redis_client() if redis_client is None: return try: cache_key = f"cache:property:{property_id}" redis_client.setex(cache_key, ttl, json.dumps(data, default=str)) logger.debug(f"[CACHE] Cached property data: {property_id}") except Exception as e: logger.warning(f"[CACHE] Failed to cache property data: {e}") def get_cached_property(property_id: str) -> Optional[dict]: """Get cached property data""" redis_client = get_redis_client() if redis_client is None: return None try: cache_key = f"cache:property:{property_id}" cached = redis_client.get(cache_key) if cached: return json.loads(cached) except Exception as e: logger.warning(f"[CACHE] Failed to get cached property: {e}") return None def cache_xrp_balance(address: str, balance: float, ttl: int = 300): """Cache XRP balance for an address""" redis_client = get_redis_client() if redis_client is None: return try: cache_key = f"cache:xrp_balance:{address}" redis_client.setex(cache_key, ttl, str(balance)) logger.debug(f"[CACHE] Cached XRP balance for {address}: {balance}") except Exception as e: logger.warning(f"[CACHE] Failed to cache XRP balance: {e}") def get_cached_xrp_balance(address: str) -> Optional[float]: """Get cached XRP balance""" redis_client = get_redis_client() if redis_client is None: return None try: cache_key = f"cache:xrp_balance:{address}" cached = redis_client.get(cache_key) if cached: return float(cached) except Exception as e: logger.warning(f"[CACHE] Failed to get cached XRP balance: {e}") return None def cache_token_balance(xrp_address: str, currency_code: str, issuer_address: str, balance: float, ttl: int = 300): """Cache IOU token balance for an address""" redis_client = get_redis_client() if redis_client is None: return try: cache_key = f"cache:token_balance:{xrp_address}:{currency_code}:{issuer_address}" redis_client.setex(cache_key, ttl, str(balance)) logger.debug(f"[CACHE] Cached token balance for {xrp_address}: {balance} {currency_code}") except Exception as e: logger.warning(f"[CACHE] Failed to cache token balance: {e}") def get_cached_token_balance(xrp_address: str, currency_code: str, issuer_address: str) -> Optional[float]: """Get cached IOU token balance""" redis_client = get_redis_client() if redis_client is None: return None try: cache_key = f"cache:token_balance:{xrp_address}:{currency_code}:{issuer_address}" cached = redis_client.get(cache_key) if cached: return float(cached) except Exception as e: logger.warning(f"[CACHE] Failed to get cached token balance: {e}") return None