# Knowledge-Universe/src/cache/redis_manager.py
# vlsiddarth's picture
# fix: Optimized Code performance
# 2549784
"""
Knowledge Universe - Redis Cache Manager
High-performance Redis operations with connection pooling
RICK'S FIX: Added Sorted Set (ZSET) operations (zadd, zrangebyscore, zrem)
to support scheduling for the Freshness Webhook daemon.
"""
import json
import logging
from typing import Any, Optional, Dict, List
import redis.asyncio as aioredis
from redis.asyncio.connection import ConnectionPool
from config.settings import get_settings
# Application settings, loaded once at import time. RedisManager.connect()
# reads the REDIS_* values (host, port, password, db, pool size, socket
# timeout and the optional REDIS_SSL flag) from this object.
settings = get_settings()
# Module-level logger named after this module; used for all Redis diagnostics.
logger = logging.getLogger(__name__)
class RedisManager:
    """
    Async Redis manager with connection pooling and utilities.

    Wraps a ``redis.asyncio.Redis`` client created with
    ``decode_responses=False``, so raw reads return ``bytes``.

    Apart from ``connect`` (which re-raises), every operation is best-effort:
    a failure is logged and a neutral default (``None``, ``False``, ``0``,
    ``[]`` or ``{}``) is returned instead of raising.
    """

    # Maximum number of keys sent per round trip by the bulk helpers
    # (delete_pattern, _get_ttl_distribution).
    _BATCH_SIZE = 500

    def __init__(self):
        """Create an unconnected manager; call ``connect()`` before use."""
        self.pool: Optional[ConnectionPool] = None
        self.client: Optional[aioredis.Redis] = None
        # In-process counters only: they reset with the process and are not
        # shared between workers.
        self._stats = {
            'hits': 0,
            'misses': 0,
            'evictions': 0
        }

    async def connect(self):
        """
        Initialize the Redis connection and verify it with a PING.

        SSL is enabled when ``settings.REDIS_SSL`` is truthy, or when the host
        is not local and a password is configured (treated as cloud Redis).

        Raises:
            Exception: whatever the client raised; it is logged and re-raised.
        """
        try:
            host = settings.REDIS_HOST
            is_local = "127.0.0.1" in host or "localhost" in host
            # bool() so the client receives a real flag, never the password
            # string or None that the bare `and`/`or` chain would yield.
            use_ssl = bool(
                getattr(settings, 'REDIS_SSL', False)
                or (not is_local and settings.REDIS_PASSWORD)
            )
            self.client = aioredis.Redis(
                host=host,
                port=settings.REDIS_PORT,
                password=settings.REDIS_PASSWORD,
                db=settings.REDIS_DB,
                max_connections=settings.REDIS_MAX_CONNECTIONS,
                socket_timeout=settings.REDIS_SOCKET_TIMEOUT,
                decode_responses=False,
                ssl=use_ssl
            )
            # Expose the client's pool through the attribute declared in
            # __init__ (it was previously never populated).
            self.pool = self.client.connection_pool
            await self.client.ping()
            logger.info("Redis connection established")
        except Exception as e:
            logger.error("Redis connection failed: %s", e)
            raise

    async def close(self):
        """Close the client and disconnect its connection pool (no-op if unconnected)."""
        if self.client is None:
            return
        # Newer redis-py deprecates close() on asyncio clients in favour of
        # aclose(); fall back to close() on older versions.
        closer = getattr(self.client, "aclose", None) or self.client.close
        await closer()
        await self.client.connection_pool.disconnect()
        logger.info("Redis connection closed")

    async def ping(self) -> bool:
        """Check if Redis is alive; returns False on any error or if unconnected."""
        try:
            return await self.client.ping()
        except Exception:
            # Not a bare `except:` so task cancellation still propagates.
            return False

    # ============ GET OPERATIONS ============
    async def get(self, key: str) -> Optional[bytes]:
        """
        Fetch the raw value stored at ``key``.

        Updates the hit/miss counters. Returns ``None`` when the key is
        missing or the call fails (failures are not counted as misses).
        """
        try:
            value = await self.client.get(key)
        except Exception as e:
            logger.error("Redis GET failed for %s: %s", key, e)
            return None
        # An existing key holding an empty value is still a hit.
        if value is not None:
            self._stats['hits'] += 1
        else:
            self._stats['misses'] += 1
        return value

    async def get_json(self, key: str) -> Optional[Dict]:
        """
        Fetch ``key`` and decode it as UTF-8 JSON.

        Returns ``None`` when the key is missing, empty, or does not contain
        valid UTF-8 JSON (the decode failure is logged).
        """
        value = await self.get(key)
        if not value:
            return None
        try:
            return json.loads(value.decode('utf-8'))
        except (UnicodeDecodeError, json.JSONDecodeError) as e:
            logger.error("JSON decode failed for %s: %s", key, e)
            return None

    async def get_many(self, keys: List[str]) -> List[Optional[bytes]]:
        """
        Fetch several keys with one MGET.

        Returns one entry per key, in order (``None`` for missing keys); a
        list of ``None`` on failure. Does not touch the hit/miss counters.
        """
        if not keys:
            # MGET with no keys is a server-side error; nothing to fetch.
            return []
        try:
            return await self.client.mget(keys)
        except Exception as e:
            logger.error("Redis MGET failed: %s", e)
            return [None] * len(keys)

    # ============ SET OPERATIONS ============
    @staticmethod
    def _encode(value: Any) -> Any:
        """Serialize dict/list as UTF-8 JSON and str as UTF-8; pass anything else through."""
        if isinstance(value, (dict, list)):
            return json.dumps(value).encode('utf-8')
        if isinstance(value, str):
            return value.encode('utf-8')
        return value

    async def set(
        self,
        key: str,
        value: Any,
        ttl: Optional[int] = None
    ) -> bool:
        """
        Store ``value`` at ``key``.

        dict/list values are JSON-encoded, str values UTF-8 encoded, other
        types are handed to the client unchanged. A falsy ``ttl`` (``None``
        or ``0``) stores the key without expiry. Returns True on success.
        """
        try:
            payload = self._encode(value)
            if ttl:
                await self.client.setex(key, ttl, payload)
            else:
                await self.client.set(key, payload)
            return True
        except Exception as e:
            logger.error("Redis SET failed for %s: %s", key, e)
            return False

    async def set_json(
        self,
        key: str,
        value: Dict,
        ttl: Optional[int] = None
    ) -> bool:
        """Store a JSON-serializable value; thin alias for ``set``."""
        return await self.set(key, value, ttl)

    async def set_many(
        self,
        mapping: Dict[str, Any],
        ttl: Optional[int] = None
    ) -> bool:
        """
        Store every ``key -> value`` pair of ``mapping`` through one pipeline.

        Values are encoded exactly like ``set`` does (lists included), and the
        same ``ttl`` is applied to every key. Returns True on success.
        """
        try:
            pipeline = self.client.pipeline()
            for key, value in mapping.items():
                payload = self._encode(value)
                if ttl:
                    pipeline.setex(key, ttl, payload)
                else:
                    pipeline.set(key, payload)
            await pipeline.execute()
            return True
        except Exception as e:
            logger.error("Redis MSET failed: %s", e)
            return False

    # ============ DELETE OPERATIONS ============
    async def delete(self, key: str) -> bool:
        """Delete ``key``; True if it existed. Counts towards 'evictions'."""
        try:
            removed = await self.client.delete(key)
        except Exception as e:
            logger.error("Redis DELETE failed for %s: %s", key, e)
            return False
        if removed > 0:
            self._stats['evictions'] += 1
        return removed > 0

    async def delete_many(self, keys: List[str]) -> int:
        """Delete several keys with one DEL; returns how many were removed."""
        if not keys:
            # DEL with no keys is a server-side error; nothing to delete.
            return 0
        try:
            count = await self.client.delete(*keys)
        except Exception as e:
            logger.error("Redis DELETE many failed: %s", e)
            return 0
        self._stats['evictions'] += count
        return count

    async def delete_pattern(self, pattern: str) -> int:
        """
        Delete every key matching the glob ``pattern``.

        Keys are discovered with SCAN and deleted in batches of
        ``_BATCH_SIZE`` so neither memory nor a single DEL grows with the
        number of matches. Returns the number of keys actually removed, even
        if the scan fails part-way.
        """
        deleted = 0
        try:
            batch = []
            async for key in self.client.scan_iter(match=pattern):
                batch.append(key)
                if len(batch) >= self._BATCH_SIZE:
                    deleted += await self.delete_many(batch)
                    batch = []
            if batch:
                deleted += await self.delete_many(batch)
        except Exception as e:
            logger.error("Redis DELETE pattern failed for %s: %s", pattern, e)
        return deleted

    # ============ TTL OPERATIONS ============
    async def ttl(self, key: str) -> int:
        """
        Remaining TTL of ``key`` in seconds, as reported by Redis.

        Returns -2 on failure, the same value Redis uses for a missing key.
        """
        try:
            return await self.client.ttl(key)
        except Exception as e:
            logger.error("Redis TTL failed for %s: %s", key, e)
            return -2

    async def expire(self, key: str, seconds: int) -> bool:
        """Set an expiry of ``seconds`` on ``key``; False on failure."""
        try:
            return await self.client.expire(key, seconds)
        except Exception as e:
            logger.error("Redis EXPIRE failed for %s: %s", key, e)
            return False

    # ============ UTILITY OPERATIONS ============
    async def exists(self, key: str) -> bool:
        """True if ``key`` exists; False if it does not or the call fails."""
        try:
            return await self.client.exists(key) > 0
        except Exception as e:
            logger.error("Redis EXISTS failed for %s: %s", key, e)
            return False

    async def keys(self, pattern: str = "*") -> List[str]:
        """List keys matching ``pattern`` (via SCAN), decoded to str; [] on failure."""
        try:
            return [
                key.decode('utf-8') if isinstance(key, bytes) else key
                async for key in self.client.scan_iter(match=pattern)
            ]
        except Exception as e:
            logger.error("Redis KEYS failed for %s: %s", pattern, e)
            return []

    async def memory_usage(self, key: str) -> int:
        """Bytes used by ``key`` per MEMORY USAGE; 0 if missing or unavailable."""
        try:
            return await self.client.memory_usage(key) or 0
        except Exception:
            logger.debug("Memory usage unavailable for %s", key)
            return 0

    # ============ HASH OPERATIONS ============
    async def hset(self, key: str, field: str, value: str) -> bool:
        """Set ``field`` of hash ``key`` to ``value``; True on success."""
        try:
            await self.client.hset(key, field, value)
            return True
        except Exception as e:
            logger.error("Redis HSET failed for %s: %s", key, e)
            return False

    async def hget(self, key: str, field: str) -> Optional[str]:
        """
        Read ``field`` of hash ``key`` as a UTF-8 string.

        Returns ``None`` when the field is missing or the call fails; a field
        holding an empty value yields ``""``.
        """
        try:
            value = await self.client.hget(key, field)
            return value.decode('utf-8') if value is not None else None
        except Exception as e:
            logger.error("Redis HGET failed for %s.%s: %s", key, field, e)
            return None

    async def hgetall(self, key: str) -> Dict[str, str]:
        """Read the whole hash at ``key`` with names and values decoded; {} on failure."""
        try:
            data = await self.client.hgetall(key)
            return {
                k.decode('utf-8'): v.decode('utf-8')
                for k, v in data.items()
            }
        except Exception as e:
            logger.error("Redis HGETALL failed for %s: %s", key, e)
            return {}

    # ============ SORTED SET OPERATIONS (NEW: FEATURE 2) ============
    async def zadd(self, key: str, mapping: Dict[str, float]) -> int:
        """
        Add members to a sorted set, used for scheduling webhooks.

        ``mapping`` maps member -> score. Returns the client's ZADD result,
        or 0 when ``mapping`` is empty or the call fails.
        """
        if not mapping:
            # Nothing to add; skip a call that could only fail.
            return 0
        try:
            return await self.client.zadd(key, mapping)
        except Exception as e:
            logger.error("Redis ZADD failed for %s: %s", key, e)
            return 0

    async def zrangebyscore(self, key: str, min_val: float, max_val: float) -> List[Any]:
        """Get members in a sorted set by score (timestamp); [] on failure."""
        try:
            return await self.client.zrangebyscore(key, min_val, max_val)
        except Exception as e:
            logger.error("Redis ZRANGEBYSCORE failed for %s: %s", key, e)
            return []

    async def zrem(self, key: str, *values) -> int:
        """Remove members from a sorted set; returns how many were removed."""
        if not values:
            # ZREM with no members is a server-side error; nothing to remove.
            return 0
        try:
            return await self.client.zrem(key, *values)
        except Exception as e:
            logger.error("Redis ZREM failed for %s: %s", key, e)
            return 0

    # ============ STATISTICS ============
    async def get_stats(self) -> Dict[str, Any]:
        """
        Combine server memory info with this process's hit/miss counters.

        Scans the whole keyspace to build ``ttl_distribution``. Returns ``{}``
        if any part fails.
        """
        try:
            info = await self.client.info('memory')
            total_hits = self._stats['hits']
            total_misses = self._stats['misses']
            total_requests = total_hits + total_misses
            max_memory = info.get('maxmemory')
            # NOTE(review): this is an RSS/maxmemory *ratio*, not a percentage,
            # despite the key name; kept as-is for callers - confirm intent.
            memory_ratio = (
                info.get('used_memory_rss', 0) / max_memory if max_memory else 0
            )
            return {
                'total_keys': await self.client.dbsize(),
                'memory_used_mb': info['used_memory'] / (1024 * 1024),
                'memory_used_percent': memory_ratio,
                'hit_rate': total_hits / total_requests if total_requests > 0 else 0,
                'hits': total_hits,
                'misses': total_misses,
                'evictions': self._stats['evictions'],
                'ttl_distribution': await self._get_ttl_distribution()
            }
        except Exception as e:
            logger.error("Failed to get stats: %s", e)
            return {}

    @staticmethod
    def _ttl_bucket(ttl: int) -> Optional[str]:
        """Map a TTL reply to its histogram bucket; None for -2 (key no longer exists)."""
        if ttl == -2:
            return None
        if ttl == -1:
            return 'no_ttl'
        if ttl < 3600:
            return '0-1h'
        if ttl < 21600:
            return '1-6h'
        if ttl < 86400:
            return '6-24h'
        return '24h+'

    async def _tally_ttls(self, keys: List[Any], distribution: Dict[str, int]) -> None:
        """Fetch the TTLs of ``keys`` in one pipelined round trip and add them to ``distribution``."""
        pipeline = self.client.pipeline(transaction=False)
        for key in keys:
            pipeline.ttl(key)
        for ttl in await pipeline.execute():
            bucket = self._ttl_bucket(ttl)
            if bucket is not None:
                distribution[bucket] += 1

    async def _get_ttl_distribution(self) -> Dict[str, int]:
        """
        Histogram of remaining TTLs over every key in the database.

        TTLs are fetched in pipelined batches of ``_BATCH_SIZE`` rather than
        one round trip per key. Keys that disappear between SCAN and TTL are
        skipped. On failure the partial histogram gathered so far is returned.
        """
        distribution = {
            '0-1h': 0, '1-6h': 0, '6-24h': 0, '24h+': 0, 'no_ttl': 0
        }
        try:
            batch = []
            async for key in self.client.scan_iter():
                batch.append(key)
                if len(batch) >= self._BATCH_SIZE:
                    await self._tally_ttls(batch, distribution)
                    batch = []
            if batch:
                await self._tally_ttls(batch, distribution)
        except Exception as e:
            logger.warning("TTL distribution scan incomplete: %s", e)
        return distribution