# tsunami/src/tsuwave/database/redis_cache.py
# Author: Gitdeeper4
# Commit 12834b7: "Upload all TSU-WAVE files with YAML" (رفع جميع ملفات TSU-WAVE مع YAML)
"""Redis cache for real-time data"""
import json
import logging
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

import redis.asyncio as redis
# Module-level logger following the standard getLogger(__name__) pattern.
logger = logging.getLogger(__name__)
class RedisCache:
    """Async Redis cache handler for real-time data.

    Provides JSON-encoded caching helpers for zone parameters, CHI
    values, events, alerts and BECF values, plus rate limiting,
    pub/sub fan-out, cache management, and health/statistics queries.

    Call connect() before using any other method; every helper assumes
    self.client is a live redis.asyncio client.
    """

    def __init__(self, config: Dict[str, Any]):
        """Store connection settings; no I/O happens until connect().

        Args:
            config: must contain 'host' and 'port'; optional keys are
                'password' and 'db' (defaults to database 0).
        """
        self.config = config
        self.client = None  # populated by connect()
        self.default_ttl = 3600  # 1 hour default TTL, in seconds

    # --- Internal helpers ---------------------------------------------------

    def _now_iso(self) -> str:
        """Return the current UTC time as a timezone-aware ISO-8601 string.

        datetime.utcnow() is deprecated (and naive); now(timezone.utc)
        yields an unambiguous, offset-qualified timestamp.
        """
        return datetime.now(timezone.utc).isoformat()

    async def _set_json(self, key: str, payload: Any, ttl: int) -> None:
        """SETEX *key* to the JSON encoding of *payload*, expiring in *ttl* seconds."""
        await self.client.setex(key, ttl, json.dumps(payload))

    async def _get_json(self, key: str) -> Optional[Any]:
        """GET *key* and JSON-decode it; None when the key is absent."""
        data = await self.client.get(key)
        if data:
            return json.loads(data)
        return None

    # --- Connection lifecycle -----------------------------------------------

    async def connect(self):
        """Create the Redis client and verify connectivity with PING.

        Raises:
            Exception: any connection failure is logged and re-raised.
        """
        try:
            # BUG FIX: redis.asyncio.from_url() is a synchronous factory,
            # not a coroutine -- awaiting its return value raises
            # TypeError. Only the commands (ping, get, ...) are awaited.
            self.client = redis.from_url(
                f"redis://{self.config['host']}:{self.config['port']}",
                password=self.config.get('password'),
                db=self.config.get('db', 0),
                decode_responses=True,
            )
            await self.client.ping()
            logger.info("Connected to Redis")
        except Exception as e:
            logger.error(f"Redis connection failed: {e}")
            raise

    async def disconnect(self):
        """Close the client connection, if one was ever opened."""
        if self.client:
            await self.client.close()
            logger.info("Disconnected from Redis")

    # --- Parameter caching --------------------------------------------------

    async def cache_parameters(self, zone: str, parameters: Dict, ttl: int = 300):
        """Cache parameter data for a zone, wrapped with a UTC timestamp."""
        await self._set_json(
            f"params:{zone}",
            {'timestamp': self._now_iso(), 'parameters': parameters},
            ttl,
        )

    async def get_cached_parameters(self, zone: str) -> Optional[Dict]:
        """Get cached parameters for a zone; None on cache miss."""
        return await self._get_json(f"params:{zone}")

    # --- CHI caching --------------------------------------------------------

    async def cache_chi(self, zone: str, chi_data: Dict, ttl: int = 300):
        """Cache CHI data for a zone, wrapped with a UTC timestamp."""
        await self._set_json(
            f"chi:{zone}",
            {'timestamp': self._now_iso(), 'chi': chi_data},
            ttl,
        )

    async def get_cached_chi(self, zone: str) -> Optional[Dict]:
        """Get cached CHI for a zone; None on cache miss."""
        return await self._get_json(f"chi:{zone}")

    # --- Event caching ------------------------------------------------------

    async def cache_event(self, event_id: str, event_data: Dict, ttl: int = 3600):
        """Cache event data under event:<event_id>."""
        await self._set_json(f"event:{event_id}", event_data, ttl)

    async def get_cached_event(self, event_id: str) -> Optional[Dict]:
        """Get cached event data; None on cache miss."""
        return await self._get_json(f"event:{event_id}")

    async def cache_active_events(self, events: List[Dict], ttl: int = 300):
        """Cache the list of currently active events."""
        await self._set_json("events:active", events, ttl)

    async def get_cached_active_events(self) -> Optional[List[Dict]]:
        """Get cached active events; None on cache miss."""
        return await self._get_json("events:active")

    # --- Alert caching ------------------------------------------------------

    async def cache_alert(self, alert_id: str, alert_data: Dict, ttl: int = 3600):
        """Cache alert data under alert:<alert_id>."""
        await self._set_json(f"alert:{alert_id}", alert_data, ttl)

    async def get_cached_alert(self, alert_id: str) -> Optional[Dict]:
        """Get cached alert data; None on cache miss."""
        return await self._get_json(f"alert:{alert_id}")

    async def cache_active_alerts(self, alerts: List[Dict], ttl: int = 300):
        """Cache the list of currently active alerts."""
        await self._set_json("alerts:active", alerts, ttl)

    async def get_cached_active_alerts(self) -> Optional[List[Dict]]:
        """Get cached active alerts; None on cache miss."""
        return await self._get_json("alerts:active")

    # --- BECF caching (static data, long TTL) -------------------------------

    async def cache_becf_map(self, zone: str, becf_value: float, ttl: int = 86400):
        """Cache BECF value for a zone (24 hour TTL by default)."""
        await self._set_json(
            f"becf:{zone}",
            {'zone': zone, 'becf': becf_value, 'cached_at': self._now_iso()},
            ttl,
        )

    async def get_cached_becf(self, zone: str) -> Optional[float]:
        """Get cached BECF value for a zone; None on cache miss."""
        data = await self._get_json(f"becf:{zone}")
        return data['becf'] if data else None

    # --- Rate limiting ------------------------------------------------------

    async def check_rate_limit(self, key: str, limit: int, window: int = 60) -> bool:
        """Check rate limit for a key using INCR + EXPIRE.

        Args:
            key: rate limit key
            limit: maximum requests per window
            window: time window in seconds

        Returns:
            True if under limit, False if exceeded.

        Note:
            INCR and EXPIRE are two round trips; if the process dies
            between them the key never expires. A Lua script or
            SET ... NX EX ... would make this atomic -- TODO if this
            path becomes critical.
        """
        current = await self.client.incr(key)
        if current == 1:
            # First hit in this window: start the expiry clock.
            await self.client.expire(key, window)
        return current <= limit

    async def get_rate_limit_remaining(self, key: str, limit: int) -> int:
        """Get remaining requests before the limit is hit (never negative)."""
        current = await self.client.get(key)
        if current is None:
            return limit
        return max(0, limit - int(current))

    # --- Pub/Sub for real-time updates --------------------------------------

    async def publish_update(self, channel: str, data: Dict):
        """Publish a timestamped JSON update to a channel."""
        await self.client.publish(channel, json.dumps({
            'timestamp': self._now_iso(),
            'data': data
        }))

    async def subscribe(self, channel: str):
        """Subscribe to a channel and return the PubSub handle."""
        pubsub = self.client.pubsub()
        await pubsub.subscribe(channel)
        return pubsub

    # --- Cache management ---------------------------------------------------

    async def clear_zone_cache(self, zone: str):
        """Clear all cache entries for a zone with a single multi-key DEL."""
        # One round trip instead of one DEL per key.
        await self.client.delete(
            f"params:{zone}",
            f"chi:{zone}",
            f"becf:{zone}",
        )

    async def clear_all_cache(self):
        """Clear ALL keys in the current Redis database (FLUSHDB)."""
        await self.client.flushdb()

    # --- Health check -------------------------------------------------------

    async def health_check(self) -> bool:
        """Return True when Redis answers PING, False on any failure."""
        try:
            return await self.client.ping()
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt and
            # SystemExit still propagate. Also covers client being None.
            return False

    # --- Cache statistics ---------------------------------------------------

    async def get_stats(self) -> Dict:
        """Get a summary of Redis INFO server statistics."""
        info = await self.client.info()
        return {
            'connected_clients': info.get('connected_clients', 0),
            'used_memory_human': info.get('used_memory_human', '0'),
            'total_connections_received': info.get('total_connections_received', 0),
            'total_commands_processed': info.get('total_commands_processed', 0),
            'keyspace_hits': info.get('keyspace_hits', 0),
            'keyspace_misses': info.get('keyspace_misses', 0),
            'uptime_days': info.get('uptime_in_days', 0)
        }