"""Redis clustering and caching management."""
import functools
import hashlib
import json
import logging
import os
from typing import Any, Optional

import redis.asyncio as redis
logger = logging.getLogger(__name__)

# Redis configuration: the first node is the primary; the remaining entries
# are replica candidates used only when clustering is enabled.
REDIS_NODES = [
    {"host": "cache-service", "port": 6379, "db": 0},
    # Additional Redis nodes for clustering; each defaults to the primary host
    # when its env var is unset.
    {"host": os.getenv("REDIS_SLAVE_1", "cache-service"), "port": 6379, "db": 0},
    {"host": os.getenv("REDIS_SLAVE_2", "cache-service"), "port": 6379, "db": 0},
]

# Default cache-entry lifetime in seconds (overridable via CACHE_TTL).
CACHE_TTL = int(os.getenv("CACHE_TTL", "3600"))

# Master/replica mode is opt-in via REDIS_CLUSTER_ENABLED=true.
CLUSTER_ENABLED = os.getenv("REDIS_CLUSTER_ENABLED", "false").lower() == "true"
class RedisManager:
    """Redis access layer supporting a single node or a master/replica setup.

    Connections are created by ``initialize_connections()``; until then
    ``self.master`` is ``None`` and every cache operation fails soft
    (the error is logged and a falsy value returned, never raised).
    """

    def __init__(self):
        # Master handles all reads/writes; replicas are currently only used
        # for health reporting (reads are not routed to them).
        self.master = None
        self.slaves = []
        self.cluster_mode = CLUSTER_ENABLED

    async def initialize_connections(self):
        """Initialize Redis connections based on configuration."""
        if self.cluster_mode:
            await self._setup_cluster()
        else:
            await self._setup_single_node()

    async def _setup_single_node(self):
        """Connect to the primary (first) node in REDIS_NODES."""
        node = REDIS_NODES[0]
        self.master = redis.from_url(
            f"redis://{node['host']}:{node['port']}/{node['db']}",
            decode_responses=True,
            socket_timeout=5,
            socket_connect_timeout=5,
            retry_on_timeout=True,
        )
        logger.info("Single Redis node initialized")

    async def _setup_cluster(self):
        """Connect to the master and any configured replica nodes.

        Falls back to single-node mode if cluster setup fails.
        """
        try:
            node = REDIS_NODES[0]
            self.master = redis.from_url(
                f"redis://{node['host']}:{node['port']}/{node['db']}",
                decode_responses=True,
            )
            # Add replica nodes if available.
            for node in REDIS_NODES[1:]:
                if node.get("host"):
                    slave = redis.from_url(
                        f"redis://{node['host']}:{node['port']}/{node['db']}",
                        decode_responses=True,
                    )
                    self.slaves.append(slave)
            logger.info(f"Redis cluster initialized with {len(self.slaves) + 1} nodes")
        except Exception as e:
            logger.error(f"Cluster setup failed: {e}")
            await self._setup_single_node()

    async def set_cache(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """JSON-serialize *value* and store it under *key* with a TTL.

        Falls back to CACHE_TTL when *ttl* is None (or 0). Returns True on
        success, False on any error.
        """
        ttl = ttl or CACHE_TTL
        try:
            result = await self.master.setex(key, ttl, json.dumps(value))
            # Coerce the SETEX reply so the declared bool return type holds.
            return bool(result)
        except Exception as e:
            logger.error(f"Cache set error: {e}")
            return False

    async def get_cache(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value for *key*, or None on miss/error."""
        try:
            result = await self.master.get(key)
            if result:
                return json.loads(result)
            return None
        except Exception as e:
            logger.error(f"Cache get error: {e}")
            return None

    async def delete_cache(self, key: str) -> bool:
        """Delete *key*; True if it existed, False otherwise or on error."""
        try:
            # DEL replies with the number of keys removed (0 or 1 here);
            # coerce to match the declared bool return type.
            result = await self.master.delete(key)
            return bool(result)
        except Exception as e:
            logger.error(f"Cache delete error: {e}")
            return False

    async def health_check(self) -> dict[str, Any]:
        """Report master and replica health plus basic INFO metrics.

        Never raises: per-node failures are recorded in the returned dict.
        """
        status = {
            "mode": "cluster" if self.cluster_mode else "single",
            "master": "unknown",
            "slaves": []
        }
        try:
            # Master health
            await self.master.ping()
            info = await self.master.info()
            status["master"] = {
                "status": "healthy",
                "memory": info.get("used_memory_human"),
                "connected_clients": info.get("connected_clients"),
                "uptime": info.get("uptime_in_seconds")
            }
        except Exception as e:
            status["master"] = {"status": f"unhealthy: {e}"}
        # Slave health checks
        for i, slave in enumerate(self.slaves):
            try:
                await slave.ping()
                status["slaves"].append({
                    "node": f"slave_{i+1}",
                    "status": "healthy",
                    "host": slave.connection_pool.connection_kwargs.get("host", "unknown")
                })
            except Exception as e:
                status["slaves"].append({
                    "node": f"slave_{i+1}",
                    "status": f"unhealthy: {e}",
                    "host": "unknown"
                })
        return status

    async def clear_cache_pattern(self, pattern: str) -> int:
        """Delete all keys matching *pattern*; return the number removed.

        Uses incremental SCAN (scan_iter) instead of KEYS so the Redis server
        is not blocked while a large keyspace is traversed.
        """
        try:
            keys = [key async for key in self.master.scan_iter(match=pattern)]
            if keys:
                return await self.master.delete(*keys)
            return 0
        except Exception as e:
            logger.error(f"Cache clear error: {e}")
            return 0
# Module-level singleton shared by the whole application.
redis_manager = RedisManager()


# Connection helper for async context
async def get_redis_connection():
    """Return the global manager's master Redis connection.

    NOTE(review): the original docstring mentioned "master or slave based on
    availability", but only the master is ever returned here — confirm
    whether replica routing was intended.
    """
    return redis_manager.master
# Cache decorators
def cache_result(ttl: int = CACHE_TTL):
    """Decorator caching an async function's JSON-serializable result in Redis.

    The cache key combines the function name with a SHA-256 digest of the
    arguments' reprs, so keys are stable across processes and restarts —
    builtin hash() is seed-randomized per interpreter (PYTHONHASHSEED) and
    would make a shared cache miss almost always.

    Note: a cached ``None`` result is indistinguishable from a miss and is
    recomputed on every call (pre-existing behavior, preserved here).
    """
    def decorator(func):
        @functools.wraps(func)  # preserve the wrapped function's metadata
        async def wrapper(*args, **kwargs):
            # Sort kwargs so keyword order doesn't produce distinct keys.
            digest = hashlib.sha256(
                (repr(args) + repr(sorted(kwargs.items()))).encode("utf-8")
            ).hexdigest()
            cache_key = f"cache:{func.__name__}:{digest}"
            cached = await redis_manager.get_cache(cache_key)
            if cached is not None:
                return cached
            result = await func(*args, **kwargs)
            await redis_manager.set_cache(cache_key, result, ttl)
            return result
        return wrapper
    return decorator