Spaces:
Running
Running
| """ | |
| Knowledge Universe - Redis Cache Manager | |
| High-performance Redis operations with connection pooling | |
| RICK'S FIX: Added Sorted Set (ZSET) operations (zadd, zrangebyscore, zrem) | |
| to support scheduling for the Freshness Webhook daemon. | |
| """ | |
| import json | |
| import logging | |
| from typing import Any, Optional, Dict, List | |
| import redis.asyncio as aioredis | |
| from redis.asyncio.connection import ConnectionPool | |
| from config.settings import get_settings | |
# Shared module-level singletons: application settings and this module's logger.
settings = get_settings()
logger = logging.getLogger(__name__)
class RedisManager:
    """
    Async Redis manager with connection pooling and utilities
    """

    def __init__(self):
        # Both are populated lazily by connect(); None until then.
        self.pool: Optional[ConnectionPool] = None
        self.client: Optional[aioredis.Redis] = None
        # In-process cache counters, surfaced via get_stats().
        self._stats = {'hits': 0, 'misses': 0, 'evictions': 0}
| async def connect(self): | |
| """Initialize Redis connection""" | |
| try: | |
| use_ssl = getattr(settings, 'REDIS_SSL', False) or ( | |
| not ("127.0.0.1" in settings.REDIS_HOST or "localhost" in settings.REDIS_HOST) | |
| and settings.REDIS_PASSWORD # Cloud Redis with password = assume SSL | |
| ) | |
| self.client = aioredis.Redis( | |
| host=settings.REDIS_HOST, | |
| port=settings.REDIS_PORT, | |
| password=settings.REDIS_PASSWORD, | |
| db=settings.REDIS_DB, | |
| max_connections=settings.REDIS_MAX_CONNECTIONS, | |
| socket_timeout=settings.REDIS_SOCKET_TIMEOUT, | |
| decode_responses=False, | |
| ssl=use_ssl | |
| ) | |
| await self.client.ping() | |
| logger.info("Redis connection established") | |
| except Exception as e: | |
| logger.error(f"Redis connection failed: {e}") | |
| raise | |
| async def close(self): | |
| """Close Redis connections""" | |
| if self.client: | |
| await self.client.close() | |
| await self.client.connection_pool.disconnect() | |
| logger.info("Redis connection closed") | |
| async def ping(self) -> bool: | |
| """Check if Redis is alive""" | |
| try: | |
| return await self.client.ping() | |
| except: | |
| return False | |
| # ============ GET OPERATIONS ============ | |
| async def get(self, key: str) -> Optional[bytes]: | |
| try: | |
| value = await self.client.get(key) | |
| if value: | |
| self._stats['hits'] += 1 | |
| else: | |
| self._stats['misses'] += 1 | |
| return value | |
| except Exception as e: | |
| logger.error(f"Redis GET failed for {key}: {e}") | |
| return None | |
| async def get_json(self, key: str) -> Optional[Dict]: | |
| value = await self.get(key) | |
| if value: | |
| try: | |
| return json.loads(value.decode('utf-8')) | |
| except json.JSONDecodeError as e: | |
| logger.error(f"JSON decode failed for {key}: {e}") | |
| return None | |
| return None | |
| async def get_many(self, keys: List[str]) -> List[Optional[bytes]]: | |
| try: | |
| return await self.client.mget(keys) | |
| except Exception as e: | |
| logger.error(f"Redis MGET failed: {e}") | |
| return [None] * len(keys) | |
| # ============ SET OPERATIONS ============ | |
| async def set( | |
| self, | |
| key: str, | |
| value: Any, | |
| ttl: Optional[int] = None | |
| ) -> bool: | |
| try: | |
| if isinstance(value, dict) or isinstance(value, list): | |
| value = json.dumps(value).encode('utf-8') | |
| elif isinstance(value, str): | |
| value = value.encode('utf-8') | |
| if ttl: | |
| await self.client.setex(key, ttl, value) | |
| else: | |
| await self.client.set(key, value) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Redis SET failed for {key}: {e}") | |
| return False | |
    async def set_json(
        self,
        key: str,
        value: Dict,
        ttl: Optional[int] = None
    ) -> bool:
        """Convenience alias for storing a dict as JSON; delegates to set()."""
        return await self.set(key, value, ttl)
| async def set_many( | |
| self, | |
| mapping: Dict[str, Any], | |
| ttl: Optional[int] = None | |
| ) -> bool: | |
| try: | |
| pipeline = self.client.pipeline() | |
| for key, value in mapping.items(): | |
| if isinstance(value, dict): | |
| value = json.dumps(value).encode('utf-8') | |
| elif isinstance(value, str): | |
| value = value.encode('utf-8') | |
| if ttl: | |
| pipeline.setex(key, ttl, value) | |
| else: | |
| pipeline.set(key, value) | |
| await pipeline.execute() | |
| return True | |
| except Exception as e: | |
| logger.error(f"Redis MSET failed: {e}") | |
| return False | |
| # ============ DELETE OPERATIONS ============ | |
| async def delete(self, key: str) -> bool: | |
| try: | |
| result = await self.client.delete(key) | |
| if result > 0: | |
| self._stats['evictions'] += 1 | |
| return result > 0 | |
| except Exception as e: | |
| logger.error(f"Redis DELETE failed for {key}: {e}") | |
| return False | |
| async def delete_many(self, keys: List[str]) -> int: | |
| try: | |
| count = await self.client.delete(*keys) | |
| self._stats['evictions'] += count | |
| return count | |
| except Exception as e: | |
| logger.error(f"Redis DELETE many failed: {e}") | |
| return 0 | |
| async def delete_pattern(self, pattern: str) -> int: | |
| try: | |
| keys = [] | |
| async for key in self.client.scan_iter(match=pattern): | |
| keys.append(key) | |
| if keys: | |
| return await self.delete_many(keys) | |
| return 0 | |
| except Exception as e: | |
| logger.error(f"Redis DELETE pattern failed for {pattern}: {e}") | |
| return 0 | |
| # ============ TTL OPERATIONS ============ | |
| async def ttl(self, key: str) -> int: | |
| try: | |
| return await self.client.ttl(key) | |
| except Exception as e: | |
| logger.error(f"Redis TTL failed for {key}: {e}") | |
| return -2 | |
| async def expire(self, key: str, seconds: int) -> bool: | |
| try: | |
| return await self.client.expire(key, seconds) | |
| except Exception as e: | |
| logger.error(f"Redis EXPIRE failed for {key}: {e}") | |
| return False | |
| # ============ UTILITY OPERATIONS ============ | |
| async def exists(self, key: str) -> bool: | |
| try: | |
| return await self.client.exists(key) > 0 | |
| except Exception as e: | |
| logger.error(f"Redis EXISTS failed for {key}: {e}") | |
| return False | |
| async def keys(self, pattern: str = "*") -> List[str]: | |
| try: | |
| keys = [] | |
| async for key in self.client.scan_iter(match=pattern): | |
| keys.append(key.decode('utf-8') if isinstance(key, bytes) else key) | |
| return keys | |
| except Exception as e: | |
| logger.error(f"Redis KEYS failed for {pattern}: {e}") | |
| return [] | |
| async def memory_usage(self, key: str) -> int: | |
| try: | |
| return await self.client.memory_usage(key) or 0 | |
| except Exception as e: | |
| logger.debug(f"Memory usage unavailable for {key}") | |
| return 0 | |
| # ============ HASH OPERATIONS ============ | |
| async def hset(self, key: str, field: str, value: str) -> bool: | |
| try: | |
| await self.client.hset(key, field, value) | |
| return True | |
| except Exception as e: | |
| logger.error(f"Redis HSET failed for {key}: {e}") | |
| return False | |
| async def hget(self, key: str, field: str) -> Optional[str]: | |
| try: | |
| value = await self.client.hget(key, field) | |
| return value.decode('utf-8') if value else None | |
| except Exception as e: | |
| logger.error(f"Redis HGET failed for {key}.{field}: {e}") | |
| return None | |
| async def hgetall(self, key: str) -> Dict[str, str]: | |
| try: | |
| data = await self.client.hgetall(key) | |
| return { | |
| k.decode('utf-8'): v.decode('utf-8') | |
| for k, v in data.items() | |
| } | |
| except Exception as e: | |
| logger.error(f"Redis HGETALL failed for {key}: {e}") | |
| return {} | |
| # ============ SORTED SET OPERATIONS (NEW: FEATURE 2) ============ | |
| async def zadd(self, key: str, mapping: Dict[str, float]) -> int: | |
| """Add members to a sorted set, used for scheduling webhooks""" | |
| try: | |
| return await self.client.zadd(key, mapping) | |
| except Exception as e: | |
| logger.error(f"Redis ZADD failed for {key}: {e}") | |
| return 0 | |
| async def zrangebyscore(self, key: str, min_val: float, max_val: float) -> List[Any]: | |
| """Get members in a sorted set by score (timestamp)""" | |
| try: | |
| return await self.client.zrangebyscore(key, min_val, max_val) | |
| except Exception as e: | |
| logger.error(f"Redis ZRANGEBYSCORE failed for {key}: {e}") | |
| return [] | |
| async def zrem(self, key: str, *values) -> int: | |
| """Remove members from a sorted set""" | |
| try: | |
| return await self.client.zrem(key, *values) | |
| except Exception as e: | |
| logger.error(f"Redis ZREM failed for {key}: {e}") | |
| return 0 | |
| # ============ STATISTICS ============ | |
    async def get_stats(self) -> Dict[str, Any]:
        """Snapshot of cache health: key count, memory usage, hit rate, TTL histogram.

        Combines server-side data (INFO memory, DBSIZE) with the in-process
        hit/miss/eviction counters kept by this manager. Returns {} on error.
        """
        try:
            info = await self.client.info('memory')
            total_hits = self._stats['hits']
            total_misses = self._stats['misses']
            total_requests = total_hits + total_misses
            return {
                'total_keys': await self.client.dbsize(),
                'memory_used_mb': info['used_memory'] / (1024 * 1024),
                # NOTE(review): the numerator here is RSS while 'memory_used_mb'
                # above uses used_memory — mixing the two metrics looks
                # unintentional; confirm which one this ratio should report.
                # 0 when maxmemory is unset (unlimited), avoiding division issues.
                'memory_used_percent': info.get('used_memory_rss', 0) / info.get('maxmemory', 1) if info.get('maxmemory') else 0,
                'hit_rate': total_hits / total_requests if total_requests > 0 else 0,
                'hits': total_hits,
                'misses': total_misses,
                'evictions': self._stats['evictions'],
                'ttl_distribution': await self._get_ttl_distribution()
            }
        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            return {}
| async def _get_ttl_distribution(self) -> Dict[str, int]: | |
| distribution = { | |
| '0-1h': 0, '1-6h': 0, '6-24h': 0, '24h+': 0, 'no_ttl': 0 | |
| } | |
| try: | |
| async for key in self.client.scan_iter(): | |
| ttl = await self.client.ttl(key) | |
| if ttl == -1: distribution['no_ttl'] += 1 | |
| elif ttl < 3600: distribution['0-1h'] += 1 | |
| elif ttl < 21600: distribution['1-6h'] += 1 | |
| elif ttl < 86400: distribution['6-24h'] += 1 | |
| else: distribution['24h+'] += 1 | |
| return distribution | |
| except: | |
| return distribution |