"""
Redis Caching Utility
Provides caching decorator and Redis connection management
"""
import hashlib
import json
import logging
import os
from functools import wraps
from typing import Optional, Any
logger = logging.getLogger(__name__)
# Redis client (lazy initialization)
_redis_client: Optional[Any] = None
def get_redis_client():
"""Get Redis client instance (singleton with graceful fallback)"""
global _redis_client
if _redis_client is None:
try:
from redis import Redis
_redis_client = Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True,
socket_connect_timeout=2,
socket_timeout=2
)
# Test connection
_redis_client.ping()
logger.info("[CACHE] [SUCCESS] Redis connection established")
except Exception as e:
logger.warning(f"[CACHE] Redis not available: {e}. Caching disabled.")
_redis_client = False # Mark as unavailable to avoid retries
return _redis_client if _redis_client is not False else None
def generate_cache_key(func_name: str, args: tuple, kwargs: dict) -> str:
"""Generate a unique cache key from function name and arguments"""
# Convert args and kwargs to a stable string representation
args_str = str(args)
kwargs_str = str(sorted(kwargs.items()))
combined = f"{func_name}:{args_str}:{kwargs_str}"
# Hash for shorter keys (avoid Redis key length limits)
key_hash = hashlib.md5(combined.encode()).hexdigest()
return f"cache:{func_name}:{key_hash}"
def cache(ttl: int = 300):
"""
Caching decorator with Redis backend
Args:
ttl: Time to live in seconds (default 5 minutes)
Usage:
@cache(ttl=60)
def expensive_function(arg1, arg2):
# ... expensive operation
return result
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
redis_client = get_redis_client()
# If Redis is not available, execute function normally
if redis_client is None:
return func(*args, **kwargs)
try:
# Generate cache key
cache_key = generate_cache_key(func.__name__, args, kwargs)
# Try to get from cache
cached_value = redis_client.get(cache_key)
if cached_value is not None:
logger.debug(f"[CACHE] HIT: {func.__name__}")
return json.loads(cached_value)
# Cache miss - execute function
logger.debug(f"[CACHE] MISS: {func.__name__}")
result = func(*args, **kwargs)
# Store in cache
try:
serialized = json.dumps(result, default=str)
redis_client.setex(cache_key, ttl, serialized)
except (TypeError, ValueError) as e:
logger.warning(f"[CACHE] Failed to serialize result for {func.__name__}: {e}")
return result
except Exception as e:
# If caching fails, log and execute function normally
logger.warning(f"[CACHE] Error in cache decorator for {func.__name__}: {e}")
return func(*args, **kwargs)
return wrapper
return decorator
def invalidate_cache(pattern: str = None):
"""
Invalidate cache entries matching pattern
Args:
pattern: Redis key pattern (e.g., "cache:list_properties:*")
If None, clears all cache entries
"""
redis_client = get_redis_client()
if redis_client is None:
return
try:
if pattern:
keys = redis_client.keys(pattern)
if keys:
redis_client.delete(*keys)
logger.info(f"[CACHE] Invalidated {len(keys)} keys matching '{pattern}'")
else:
redis_client.flushdb()
logger.info("[CACHE] Cleared all cache entries")
except Exception as e:
logger.error(f"[CACHE] Error invalidating cache: {e}")
def cache_property_data(property_id: str, data: dict, ttl: int = 300):
"""Cache property data manually (for fine-grained control)"""
redis_client = get_redis_client()
if redis_client is None:
return
try:
cache_key = f"cache:property:{property_id}"
redis_client.setex(cache_key, ttl, json.dumps(data, default=str))
logger.debug(f"[CACHE] Cached property data: {property_id}")
except Exception as e:
logger.warning(f"[CACHE] Failed to cache property data: {e}")
def get_cached_property(property_id: str) -> Optional[dict]:
"""Get cached property data"""
redis_client = get_redis_client()
if redis_client is None:
return None
try:
cache_key = f"cache:property:{property_id}"
cached = redis_client.get(cache_key)
if cached:
return json.loads(cached)
except Exception as e:
logger.warning(f"[CACHE] Failed to get cached property: {e}")
return None
def cache_xrp_balance(address: str, balance: float, ttl: int = 300):
"""Cache XRP balance for an address"""
redis_client = get_redis_client()
if redis_client is None:
return
try:
cache_key = f"cache:xrp_balance:{address}"
redis_client.setex(cache_key, ttl, str(balance))
logger.debug(f"[CACHE] Cached XRP balance for {address}: {balance}")
except Exception as e:
logger.warning(f"[CACHE] Failed to cache XRP balance: {e}")
def get_cached_xrp_balance(address: str) -> Optional[float]:
"""Get cached XRP balance"""
redis_client = get_redis_client()
if redis_client is None:
return None
try:
cache_key = f"cache:xrp_balance:{address}"
cached = redis_client.get(cache_key)
if cached:
return float(cached)
except Exception as e:
logger.warning(f"[CACHE] Failed to get cached XRP balance: {e}")
return None
def cache_token_balance(xrp_address: str, currency_code: str, issuer_address: str, balance: float, ttl: int = 300):
"""Cache IOU token balance for an address"""
redis_client = get_redis_client()
if redis_client is None:
return
try:
cache_key = f"cache:token_balance:{xrp_address}:{currency_code}:{issuer_address}"
redis_client.setex(cache_key, ttl, str(balance))
logger.debug(f"[CACHE] Cached token balance for {xrp_address}: {balance} {currency_code}")
except Exception as e:
logger.warning(f"[CACHE] Failed to cache token balance: {e}")
def get_cached_token_balance(xrp_address: str, currency_code: str, issuer_address: str) -> Optional[float]:
"""Get cached IOU token balance"""
redis_client = get_redis_client()
if redis_client is None:
return None
try:
cache_key = f"cache:token_balance:{xrp_address}:{currency_code}:{issuer_address}"
cached = redis_client.get(cache_key)
if cached:
return float(cached)
except Exception as e:
logger.warning(f"[CACHE] Failed to get cached token balance: {e}")
return None