"""Redis cache for real-time data""" import redis.asyncio as redis import json from typing import Optional, Dict, Any, List import logging from datetime import datetime, timedelta logger = logging.getLogger(__name__) class RedisCache: """Redis cache handler for real-time data""" def __init__(self, config: Dict[str, Any]): self.config = config self.client = None self.default_ttl = 3600 # 1 hour default TTL async def connect(self): """Connect to Redis""" try: self.client = await redis.from_url( f"redis://{self.config['host']}:{self.config['port']}", password=self.config.get('password'), db=self.config.get('db', 0), decode_responses=True ) await self.client.ping() logger.info("Connected to Redis") except Exception as e: logger.error(f"Redis connection failed: {e}") raise async def disconnect(self): """Disconnect from Redis""" if self.client: await self.client.close() logger.info("Disconnected from Redis") # Parameter caching async def cache_parameters(self, zone: str, parameters: Dict, ttl: int = 300): """Cache parameter data for a zone""" key = f"params:{zone}" data = { 'timestamp': datetime.utcnow().isoformat(), 'parameters': parameters } await self.client.setex(key, ttl, json.dumps(data)) async def get_cached_parameters(self, zone: str) -> Optional[Dict]: """Get cached parameters for a zone""" key = f"params:{zone}" data = await self.client.get(key) if data: return json.loads(data) return None # CHI caching async def cache_chi(self, zone: str, chi_data: Dict, ttl: int = 300): """Cache CHI data for a zone""" key = f"chi:{zone}" data = { 'timestamp': datetime.utcnow().isoformat(), 'chi': chi_data } await self.client.setex(key, ttl, json.dumps(data)) async def get_cached_chi(self, zone: str) -> Optional[Dict]: """Get cached CHI for a zone""" key = f"chi:{zone}" data = await self.client.get(key) if data: return json.loads(data) return None # Event caching async def cache_event(self, event_id: str, event_data: Dict, ttl: int = 3600): """Cache event data""" key = f"event:{event_id}" await self.client.setex(key, ttl, json.dumps(event_data)) async def get_cached_event(self, event_id: str) -> Optional[Dict]: """Get cached event data""" key = f"event:{event_id}" data = await self.client.get(key) if data: return json.loads(data) return None async def cache_active_events(self, events: List[Dict], ttl: int = 300): """Cache list of active events""" key = "events:active" await self.client.setex(key, ttl, json.dumps(events)) async def get_cached_active_events(self) -> Optional[List[Dict]]: """Get cached active events""" key = "events:active" data = await self.client.get(key) if data: return json.loads(data) return None # Alert caching async def cache_alert(self, alert_id: str, alert_data: Dict, ttl: int = 3600): """Cache alert data""" key = f"alert:{alert_id}" await self.client.setex(key, ttl, json.dumps(alert_data)) async def get_cached_alert(self, alert_id: str) -> Optional[Dict]: """Get cached alert data""" key = f"alert:{alert_id}" data = await self.client.get(key) if data: return json.loads(data) return None async def cache_active_alerts(self, alerts: List[Dict], ttl: int = 300): """Cache list of active alerts""" key = "alerts:active" await self.client.setex(key, ttl, json.dumps(alerts)) async def get_cached_active_alerts(self) -> Optional[List[Dict]]: """Get cached active alerts""" key = "alerts:active" data = await self.client.get(key) if data: return json.loads(data) return None # BECF caching (static, long TTL) async def cache_becf_map(self, zone: str, becf_value: float, ttl: int = 86400): """Cache BECF value for a zone (24 hour TTL)""" key = f"becf:{zone}" await self.client.setex(key, ttl, json.dumps({ 'zone': zone, 'becf': becf_value, 'cached_at': datetime.utcnow().isoformat() })) async def get_cached_becf(self, zone: str) -> Optional[float]: """Get cached BECF value""" key = f"becf:{zone}" data = await self.client.get(key) if data: return json.loads(data)['becf'] return None # Rate limiting async def check_rate_limit(self, key: str, limit: int, window: int = 60) -> bool: """Check rate limit for a key Args: key: rate limit key limit: maximum requests per window window: time window in seconds Returns: True if under limit, False if exceeded """ current = await self.client.incr(key) if current == 1: await self.client.expire(key, window) return current <= limit async def get_rate_limit_remaining(self, key: str, limit: int) -> int: """Get remaining rate limit""" current = await self.client.get(key) if current is None: return limit return max(0, limit - int(current)) # Pub/Sub for real-time updates async def publish_update(self, channel: str, data: Dict): """Publish update to a channel""" await self.client.publish(channel, json.dumps({ 'timestamp': datetime.utcnow().isoformat(), 'data': data })) async def subscribe(self, channel: str): """Subscribe to a channel""" pubsub = self.client.pubsub() await pubsub.subscribe(channel) return pubsub # Cache management async def clear_zone_cache(self, zone: str): """Clear all cache entries for a zone""" patterns = [ f"params:{zone}", f"chi:{zone}", f"becf:{zone}" ] for pattern in patterns: await self.client.delete(pattern) async def clear_all_cache(self): """Clear all cache entries""" await self.client.flushdb() # Health check async def health_check(self) -> bool: """Check Redis health""" try: return await self.client.ping() except: return False # Cache statistics async def get_stats(self) -> Dict: """Get cache statistics""" info = await self.client.info() return { 'connected_clients': info.get('connected_clients', 0), 'used_memory_human': info.get('used_memory_human', '0'), 'total_connections_received': info.get('total_connections_received', 0), 'total_commands_processed': info.get('total_commands_processed', 0), 'keyspace_hits': info.get('keyspace_hits', 0), 'keyspace_misses': info.get('keyspace_misses', 0), 'uptime_days': info.get('uptime_in_days', 0) }