""" Knowledge Universe - Redis Cache Manager High-performance Redis operations with connection pooling RICK'S FIX: Added Sorted Set (ZSET) operations (zadd, zrangebyscore, zrem) to support scheduling for the Freshness Webhook daemon. """ import json import logging from typing import Any, Optional, Dict, List import redis.asyncio as aioredis from redis.asyncio.connection import ConnectionPool from config.settings import get_settings settings = get_settings() logger = logging.getLogger(__name__) class RedisManager: """ Async Redis manager with connection pooling and utilities """ def __init__(self): self.pool: Optional[ConnectionPool] = None self.client: Optional[aioredis.Redis] = None self._stats = { 'hits': 0, 'misses': 0, 'evictions': 0 } async def connect(self): """Initialize Redis connection""" try: use_ssl = getattr(settings, 'REDIS_SSL', False) or ( not ("127.0.0.1" in settings.REDIS_HOST or "localhost" in settings.REDIS_HOST) and settings.REDIS_PASSWORD # Cloud Redis with password = assume SSL ) self.client = aioredis.Redis( host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD, db=settings.REDIS_DB, max_connections=settings.REDIS_MAX_CONNECTIONS, socket_timeout=settings.REDIS_SOCKET_TIMEOUT, decode_responses=False, ssl=use_ssl ) await self.client.ping() logger.info("Redis connection established") except Exception as e: logger.error(f"Redis connection failed: {e}") raise async def close(self): """Close Redis connections""" if self.client: await self.client.close() await self.client.connection_pool.disconnect() logger.info("Redis connection closed") async def ping(self) -> bool: """Check if Redis is alive""" try: return await self.client.ping() except: return False # ============ GET OPERATIONS ============ async def get(self, key: str) -> Optional[bytes]: try: value = await self.client.get(key) if value: self._stats['hits'] += 1 else: self._stats['misses'] += 1 return value except Exception as e: logger.error(f"Redis GET failed for {key}: {e}") return None async def get_json(self, key: str) -> Optional[Dict]: value = await self.get(key) if value: try: return json.loads(value.decode('utf-8')) except json.JSONDecodeError as e: logger.error(f"JSON decode failed for {key}: {e}") return None return None async def get_many(self, keys: List[str]) -> List[Optional[bytes]]: try: return await self.client.mget(keys) except Exception as e: logger.error(f"Redis MGET failed: {e}") return [None] * len(keys) # ============ SET OPERATIONS ============ async def set( self, key: str, value: Any, ttl: Optional[int] = None ) -> bool: try: if isinstance(value, dict) or isinstance(value, list): value = json.dumps(value).encode('utf-8') elif isinstance(value, str): value = value.encode('utf-8') if ttl: await self.client.setex(key, ttl, value) else: await self.client.set(key, value) return True except Exception as e: logger.error(f"Redis SET failed for {key}: {e}") return False async def set_json( self, key: str, value: Dict, ttl: Optional[int] = None ) -> bool: return await self.set(key, value, ttl) async def set_many( self, mapping: Dict[str, Any], ttl: Optional[int] = None ) -> bool: try: pipeline = self.client.pipeline() for key, value in mapping.items(): if isinstance(value, dict): value = json.dumps(value).encode('utf-8') elif isinstance(value, str): value = value.encode('utf-8') if ttl: pipeline.setex(key, ttl, value) else: pipeline.set(key, value) await pipeline.execute() return True except Exception as e: logger.error(f"Redis MSET failed: {e}") return False # ============ DELETE OPERATIONS ============ async def delete(self, key: str) -> bool: try: result = await self.client.delete(key) if result > 0: self._stats['evictions'] += 1 return result > 0 except Exception as e: logger.error(f"Redis DELETE failed for {key}: {e}") return False async def delete_many(self, keys: List[str]) -> int: try: count = await self.client.delete(*keys) self._stats['evictions'] += count return count except Exception as e: logger.error(f"Redis DELETE many failed: {e}") return 0 async def delete_pattern(self, pattern: str) -> int: try: keys = [] async for key in self.client.scan_iter(match=pattern): keys.append(key) if keys: return await self.delete_many(keys) return 0 except Exception as e: logger.error(f"Redis DELETE pattern failed for {pattern}: {e}") return 0 # ============ TTL OPERATIONS ============ async def ttl(self, key: str) -> int: try: return await self.client.ttl(key) except Exception as e: logger.error(f"Redis TTL failed for {key}: {e}") return -2 async def expire(self, key: str, seconds: int) -> bool: try: return await self.client.expire(key, seconds) except Exception as e: logger.error(f"Redis EXPIRE failed for {key}: {e}") return False # ============ UTILITY OPERATIONS ============ async def exists(self, key: str) -> bool: try: return await self.client.exists(key) > 0 except Exception as e: logger.error(f"Redis EXISTS failed for {key}: {e}") return False async def keys(self, pattern: str = "*") -> List[str]: try: keys = [] async for key in self.client.scan_iter(match=pattern): keys.append(key.decode('utf-8') if isinstance(key, bytes) else key) return keys except Exception as e: logger.error(f"Redis KEYS failed for {pattern}: {e}") return [] async def memory_usage(self, key: str) -> int: try: return await self.client.memory_usage(key) or 0 except Exception as e: logger.debug(f"Memory usage unavailable for {key}") return 0 # ============ HASH OPERATIONS ============ async def hset(self, key: str, field: str, value: str) -> bool: try: await self.client.hset(key, field, value) return True except Exception as e: logger.error(f"Redis HSET failed for {key}: {e}") return False async def hget(self, key: str, field: str) -> Optional[str]: try: value = await self.client.hget(key, field) return value.decode('utf-8') if value else None except Exception as e: logger.error(f"Redis HGET failed for {key}.{field}: {e}") return None async def hgetall(self, key: str) -> Dict[str, str]: try: data = await self.client.hgetall(key) return { k.decode('utf-8'): v.decode('utf-8') for k, v in data.items() } except Exception as e: logger.error(f"Redis HGETALL failed for {key}: {e}") return {} # ============ SORTED SET OPERATIONS (NEW: FEATURE 2) ============ async def zadd(self, key: str, mapping: Dict[str, float]) -> int: """Add members to a sorted set, used for scheduling webhooks""" try: return await self.client.zadd(key, mapping) except Exception as e: logger.error(f"Redis ZADD failed for {key}: {e}") return 0 async def zrangebyscore(self, key: str, min_val: float, max_val: float) -> List[Any]: """Get members in a sorted set by score (timestamp)""" try: return await self.client.zrangebyscore(key, min_val, max_val) except Exception as e: logger.error(f"Redis ZRANGEBYSCORE failed for {key}: {e}") return [] async def zrem(self, key: str, *values) -> int: """Remove members from a sorted set""" try: return await self.client.zrem(key, *values) except Exception as e: logger.error(f"Redis ZREM failed for {key}: {e}") return 0 # ============ STATISTICS ============ async def get_stats(self) -> Dict[str, Any]: try: info = await self.client.info('memory') total_hits = self._stats['hits'] total_misses = self._stats['misses'] total_requests = total_hits + total_misses return { 'total_keys': await self.client.dbsize(), 'memory_used_mb': info['used_memory'] / (1024 * 1024), 'memory_used_percent': info.get('used_memory_rss', 0) / info.get('maxmemory', 1) if info.get('maxmemory') else 0, 'hit_rate': total_hits / total_requests if total_requests > 0 else 0, 'hits': total_hits, 'misses': total_misses, 'evictions': self._stats['evictions'], 'ttl_distribution': await self._get_ttl_distribution() } except Exception as e: logger.error(f"Failed to get stats: {e}") return {} async def _get_ttl_distribution(self) -> Dict[str, int]: distribution = { '0-1h': 0, '1-6h': 0, '6-24h': 0, '24h+': 0, 'no_ttl': 0 } try: async for key in self.client.scan_iter(): ttl = await self.client.ttl(key) if ttl == -1: distribution['no_ttl'] += 1 elif ttl < 3600: distribution['0-1h'] += 1 elif ttl < 21600: distribution['1-6h'] += 1 elif ttl < 86400: distribution['6-24h'] += 1 else: distribution['24h+'] += 1 return distribution except: return distribution