Spaces:
Sleeping
Sleeping
| """Redis cache for real-time data""" | |
| import redis.asyncio as redis | |
| import json | |
| from typing import Optional, Dict, Any, List | |
| import logging | |
| from datetime import datetime, timedelta | |
| logger = logging.getLogger(__name__) | |
| class RedisCache: | |
| """Redis cache handler for real-time data""" | |
| def __init__(self, config: Dict[str, Any]): | |
| self.config = config | |
| self.client = None | |
| self.default_ttl = 3600 # 1 hour default TTL | |
| async def connect(self): | |
| """Connect to Redis""" | |
| try: | |
| self.client = await redis.from_url( | |
| f"redis://{self.config['host']}:{self.config['port']}", | |
| password=self.config.get('password'), | |
| db=self.config.get('db', 0), | |
| decode_responses=True | |
| ) | |
| await self.client.ping() | |
| logger.info("Connected to Redis") | |
| except Exception as e: | |
| logger.error(f"Redis connection failed: {e}") | |
| raise | |
| async def disconnect(self): | |
| """Disconnect from Redis""" | |
| if self.client: | |
| await self.client.close() | |
| logger.info("Disconnected from Redis") | |
| # Parameter caching | |
| async def cache_parameters(self, zone: str, parameters: Dict, ttl: int = 300): | |
| """Cache parameter data for a zone""" | |
| key = f"params:{zone}" | |
| data = { | |
| 'timestamp': datetime.utcnow().isoformat(), | |
| 'parameters': parameters | |
| } | |
| await self.client.setex(key, ttl, json.dumps(data)) | |
| async def get_cached_parameters(self, zone: str) -> Optional[Dict]: | |
| """Get cached parameters for a zone""" | |
| key = f"params:{zone}" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data) | |
| return None | |
| # CHI caching | |
| async def cache_chi(self, zone: str, chi_data: Dict, ttl: int = 300): | |
| """Cache CHI data for a zone""" | |
| key = f"chi:{zone}" | |
| data = { | |
| 'timestamp': datetime.utcnow().isoformat(), | |
| 'chi': chi_data | |
| } | |
| await self.client.setex(key, ttl, json.dumps(data)) | |
| async def get_cached_chi(self, zone: str) -> Optional[Dict]: | |
| """Get cached CHI for a zone""" | |
| key = f"chi:{zone}" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data) | |
| return None | |
| # Event caching | |
| async def cache_event(self, event_id: str, event_data: Dict, ttl: int = 3600): | |
| """Cache event data""" | |
| key = f"event:{event_id}" | |
| await self.client.setex(key, ttl, json.dumps(event_data)) | |
| async def get_cached_event(self, event_id: str) -> Optional[Dict]: | |
| """Get cached event data""" | |
| key = f"event:{event_id}" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data) | |
| return None | |
| async def cache_active_events(self, events: List[Dict], ttl: int = 300): | |
| """Cache list of active events""" | |
| key = "events:active" | |
| await self.client.setex(key, ttl, json.dumps(events)) | |
| async def get_cached_active_events(self) -> Optional[List[Dict]]: | |
| """Get cached active events""" | |
| key = "events:active" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data) | |
| return None | |
| # Alert caching | |
| async def cache_alert(self, alert_id: str, alert_data: Dict, ttl: int = 3600): | |
| """Cache alert data""" | |
| key = f"alert:{alert_id}" | |
| await self.client.setex(key, ttl, json.dumps(alert_data)) | |
| async def get_cached_alert(self, alert_id: str) -> Optional[Dict]: | |
| """Get cached alert data""" | |
| key = f"alert:{alert_id}" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data) | |
| return None | |
| async def cache_active_alerts(self, alerts: List[Dict], ttl: int = 300): | |
| """Cache list of active alerts""" | |
| key = "alerts:active" | |
| await self.client.setex(key, ttl, json.dumps(alerts)) | |
| async def get_cached_active_alerts(self) -> Optional[List[Dict]]: | |
| """Get cached active alerts""" | |
| key = "alerts:active" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data) | |
| return None | |
| # BECF caching (static, long TTL) | |
| async def cache_becf_map(self, zone: str, becf_value: float, ttl: int = 86400): | |
| """Cache BECF value for a zone (24 hour TTL)""" | |
| key = f"becf:{zone}" | |
| await self.client.setex(key, ttl, json.dumps({ | |
| 'zone': zone, | |
| 'becf': becf_value, | |
| 'cached_at': datetime.utcnow().isoformat() | |
| })) | |
| async def get_cached_becf(self, zone: str) -> Optional[float]: | |
| """Get cached BECF value""" | |
| key = f"becf:{zone}" | |
| data = await self.client.get(key) | |
| if data: | |
| return json.loads(data)['becf'] | |
| return None | |
| # Rate limiting | |
| async def check_rate_limit(self, key: str, limit: int, window: int = 60) -> bool: | |
| """Check rate limit for a key | |
| Args: | |
| key: rate limit key | |
| limit: maximum requests per window | |
| window: time window in seconds | |
| Returns: | |
| True if under limit, False if exceeded | |
| """ | |
| current = await self.client.incr(key) | |
| if current == 1: | |
| await self.client.expire(key, window) | |
| return current <= limit | |
| async def get_rate_limit_remaining(self, key: str, limit: int) -> int: | |
| """Get remaining rate limit""" | |
| current = await self.client.get(key) | |
| if current is None: | |
| return limit | |
| return max(0, limit - int(current)) | |
| # Pub/Sub for real-time updates | |
| async def publish_update(self, channel: str, data: Dict): | |
| """Publish update to a channel""" | |
| await self.client.publish(channel, json.dumps({ | |
| 'timestamp': datetime.utcnow().isoformat(), | |
| 'data': data | |
| })) | |
| async def subscribe(self, channel: str): | |
| """Subscribe to a channel""" | |
| pubsub = self.client.pubsub() | |
| await pubsub.subscribe(channel) | |
| return pubsub | |
| # Cache management | |
| async def clear_zone_cache(self, zone: str): | |
| """Clear all cache entries for a zone""" | |
| patterns = [ | |
| f"params:{zone}", | |
| f"chi:{zone}", | |
| f"becf:{zone}" | |
| ] | |
| for pattern in patterns: | |
| await self.client.delete(pattern) | |
| async def clear_all_cache(self): | |
| """Clear all cache entries""" | |
| await self.client.flushdb() | |
| # Health check | |
| async def health_check(self) -> bool: | |
| """Check Redis health""" | |
| try: | |
| return await self.client.ping() | |
| except: | |
| return False | |
| # Cache statistics | |
| async def get_stats(self) -> Dict: | |
| """Get cache statistics""" | |
| info = await self.client.info() | |
| return { | |
| 'connected_clients': info.get('connected_clients', 0), | |
| 'used_memory_human': info.get('used_memory_human', '0'), | |
| 'total_connections_received': info.get('total_connections_received', 0), | |
| 'total_commands_processed': info.get('total_commands_processed', 0), | |
| 'keyspace_hits': info.get('keyspace_hits', 0), | |
| 'keyspace_misses': info.get('keyspace_misses', 0), | |
| 'uptime_days': info.get('uptime_in_days', 0) | |
| } | |