zenith-backend / app /core /redis_manager.py
teoat's picture
fix(backend): fix port and health check robustness
d29a5a0 verified
"""
Redis clustering and caching management
"""
import functools
import hashlib
import json
import logging
import os
from typing import Any, Optional

import redis.asyncio as redis
logger = logging.getLogger(__name__)
# Redis configuration
# The first node is the one the manager connects to as master; the remaining
# entries are the additional nodes used for clustering, whose hosts can be
# overridden through the REDIS_SLAVE_1 / REDIS_SLAVE_2 environment variables.
REDIS_NODES = [{"host": "cache-service", "port": 6379, "db": 0}]
REDIS_NODES.extend(
    {"host": os.environ.get(env_name, "cache-service"), "port": 6379, "db": 0}
    for env_name in ("REDIS_SLAVE_1", "REDIS_SLAVE_2")
)

# Default expiry, in seconds, applied when a cache write gives no TTL.
CACHE_TTL = int(os.environ.get("CACHE_TTL", "3600"))

# Cluster mode is on only when the variable is "true" (case-insensitive).
CLUSTER_ENABLED = os.environ.get("REDIS_CLUSTER_ENABLED", "false").lower() == "true"
class RedisManager:
    """Redis manager holding one master client and optional slave clients.

    All cache reads and writes go through ``self.master``. The slave clients
    are only created in cluster mode and are only used by ``health_check``.
    Cache helpers never raise: failures are logged and reported through the
    return value (``False``, ``None`` or a count).
    """

    # Options shared by every client so that master and slave connections
    # time out and retry the same way in both single-node and cluster mode.
    _CLIENT_OPTIONS = {
        "decode_responses": True,
        "socket_timeout": 5,
        "socket_connect_timeout": 5,
        "retry_on_timeout": True,
    }

    # Number of keys removed per DELETE call by ``clear_cache_pattern``.
    _DELETE_BATCH_SIZE = 500

    def __init__(self):
        """Create an unconnected manager; no client exists until initialized."""
        self.master = None
        self.slaves = []
        self.cluster_mode = CLUSTER_ENABLED

    def _new_client(self, node):
        """Build a client for one ``REDIS_NODES`` entry (host/port/db dict)."""
        url = f"redis://{node['host']}:{node['port']}/{node['db']}"
        return redis.from_url(url, **self._CLIENT_OPTIONS)

    @staticmethod
    def _client_host(client) -> str:
        """Return the host a client was configured with, or ``"unknown"``."""
        pool = getattr(client, "connection_pool", None)
        connection_kwargs = getattr(pool, "connection_kwargs", None) or {}
        return connection_kwargs.get("host", "unknown")

    async def initialize_connections(self):
        """Initialize Redis connections based on configuration."""
        if self.cluster_mode:
            await self._setup_cluster()
        else:
            await self._setup_single_node()

    async def _setup_single_node(self):
        """Set up the master client from the first configured node."""
        self.master = self._new_client(REDIS_NODES[0])
        logger.info("Single Redis node initialized")

    async def _setup_cluster(self):
        """Set up the master plus one slave client per additional node.

        Clients are built into local variables first and assigned only once
        all of them were created, so a failure never leaves a half-built
        slave list and calling this twice does not duplicate slaves. On
        failure the manager falls back to single-node setup.
        """
        try:
            master = self._new_client(REDIS_NODES[0])
            slaves = [
                self._new_client(node)
                for node in REDIS_NODES[1:]
                if node.get("host")
            ]
        except Exception as e:
            logger.error("Cluster setup failed: %s", e)
            await self._setup_single_node()
            return

        self.master = master
        self.slaves = slaves
        logger.info(
            "Redis cluster initialized with %d nodes", len(self.slaves) + 1
        )

    async def set_cache(self, key: str, value: Any, ttl: Optional[int] = None) -> bool:
        """Store ``value`` as JSON under ``key`` with an expiry.

        Args:
            key: Redis key to write.
            value: JSON-serializable value.
            ttl: Expiry in seconds; a missing or zero value falls back to
                ``CACHE_TTL``.

        Returns:
            ``True`` when the write succeeded, ``False`` on any error
            (including a value that cannot be serialized to JSON).
        """
        ttl = ttl or CACHE_TTL
        try:
            result = await self.master.setex(key, ttl, json.dumps(value))
            return bool(result)
        except Exception as e:
            logger.error("Cache set error: %s", e)
            return False

    async def get_cache(self, key: str) -> Optional[Any]:
        """Return the JSON-decoded value stored under ``key``.

        Returns ``None`` when the key is missing, holds an empty value, or
        any error occurs while reading or decoding.
        """
        try:
            result = await self.master.get(key)
            if not result:
                return None
            return json.loads(result)
        except Exception as e:
            logger.error("Cache get error: %s", e)
            return None

    async def delete_cache(self, key: str) -> bool:
        """Delete ``key``; return ``True`` only if a key was actually removed."""
        try:
            # DELETE replies with the number of keys removed; convert it so
            # the return value matches the declared ``bool``.
            removed = await self.master.delete(key)
            return bool(removed)
        except Exception as e:
            logger.error("Cache delete error: %s", e)
            return False

    async def health_check(self) -> dict[str, Any]:
        """Report the health of the master and of every slave client.

        Returns:
            A dict with ``mode`` ("cluster" or "single"), ``master`` (a dict
            with ``status`` and, when healthy, memory/client/uptime figures
            from INFO) and ``slaves`` (one dict per slave with ``node``,
            ``status`` and ``host``). Never raises.
        """
        status = {
            "mode": "cluster" if self.cluster_mode else "single",
            "master": "unknown",
            "slaves": []
        }

        # Master health
        if self.master is None:
            # Say plainly that initialize_connections() has not run instead
            # of surfacing an AttributeError message.
            status["master"] = {"status": "unhealthy: not initialized"}
        else:
            try:
                await self.master.ping()
                info = await self.master.info()
                status["master"] = {
                    "status": "healthy",
                    "memory": info.get("used_memory_human"),
                    "connected_clients": info.get("connected_clients"),
                    "uptime": info.get("uptime_in_seconds")
                }
            except Exception as e:
                status["master"] = {"status": f"unhealthy: {e}"}

        # Slave health checks
        for i, slave in enumerate(self.slaves):
            # The configured host is known without a live connection, so it
            # is reported for unhealthy slaves too.
            host = self._client_host(slave)
            try:
                await slave.ping()
                slave_status = "healthy"
            except Exception as e:
                slave_status = f"unhealthy: {e}"
            status["slaves"].append({
                "node": f"slave_{i+1}",
                "status": slave_status,
                "host": host
            })
        return status

    async def clear_cache_pattern(self, pattern: str) -> int:
        """Delete every key matching the glob ``pattern``.

        Keys are discovered incrementally with SCAN and removed in batches,
        rather than with a single KEYS call followed by one DELETE of every
        match.

        Returns:
            The number of keys removed. If an error interrupts the scan, the
            count removed up to that point is returned.
        """
        deleted = 0
        try:
            batch = []
            async for key in self.master.scan_iter(
                match=pattern, count=self._DELETE_BATCH_SIZE
            ):
                batch.append(key)
                if len(batch) >= self._DELETE_BATCH_SIZE:
                    deleted += await self.master.delete(*batch)
                    batch = []
            if batch:
                deleted += await self.master.delete(*batch)
        except Exception as e:
            logger.error("Cache clear error: %s", e)
        return deleted
# Global Redis manager instance used by the helpers in this module.
# Constructing it creates no Redis client: ``master`` stays ``None`` until
# ``await redis_manager.initialize_connections()`` has been called.
redis_manager = RedisManager()
# Connection helper for async context
async def get_redis_connection():
    """Return the master client held by the global ``redis_manager``.

    The value is whatever ``redis_manager.master`` currently is, so it is
    ``None`` until the manager's connections have been initialized. Slave
    clients are never returned by this helper.
    """
    primary_client = redis_manager.master
    return primary_client
# Cache decorators
def _make_cache_key(func, args, kwargs) -> str:
    """Build a deterministic cache key for one call of ``func``.

    The built-in ``hash()`` is salted per interpreter process for strings, so
    it cannot be used for keys shared through Redis between workers or across
    restarts. A SHA-256 digest of the argument reprs is stable everywhere.
    Keyword arguments are sorted so ``f(a=1, b=2)`` and ``f(b=2, a=1)`` share
    one entry. The ``cache:<function name>:`` prefix is kept so pattern-based
    invalidation on that prefix keeps working.
    """
    call_signature = f"{args!r}{sorted(kwargs.items())!r}"
    digest = hashlib.sha256(call_signature.encode("utf-8")).hexdigest()
    return f"cache:{func.__name__}:{digest}"


def cache_result(ttl: int = CACHE_TTL):
    """Decorator that caches an async function's result in Redis.

    Args:
        ttl: Expiry of the cached entry in seconds (defaults to ``CACHE_TTL``).

    Returns:
        A decorator for coroutine functions. On a cache hit the stored value
        is returned without calling the function; on a miss the function is
        awaited and its result stored through ``redis_manager.set_cache``.

    Notes:
        * A result of ``None`` is indistinguishable from a miss, so such
          calls are executed every time.
        * Values are stored through ``redis_manager``, which serializes them
          as JSON; a hit therefore returns the JSON round-tripped value.
        * Keys are derived from ``repr()`` of the arguments, so arguments
          need a stable ``repr`` for calls to share a cache entry.
    """
    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name/docstring
        async def wrapper(*args, **kwargs):
            cache_key = _make_cache_key(func, args, kwargs)
            cached = await redis_manager.get_cache(cache_key)
            if cached is not None:
                return cached
            result = await func(*args, **kwargs)
            await redis_manager.set_cache(cache_key, result, ttl)
            return result
        return wrapper
    return decorator