File size: 16,395 Bytes
50a7bf0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
"""

Cache monitoring and metrics collection for Redis caching strategies.



This module provides comprehensive monitoring, alerting, and analytics

for cache performance and usage patterns.

"""

import asyncio
import logging
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from dataclasses import dataclass, asdict
from collections import defaultdict, deque

from .redis import redis_manager, RedisKeyManager
from .cache import cache_manager, CacheConfig
from .config import get_settings

logger = logging.getLogger(__name__)
settings = get_settings()


@dataclass
class CacheMetrics:
    """Data class for cache metrics."""
    timestamp: datetime
    hits: int
    misses: int
    sets: int
    deletes: int
    invalidations: int
    hit_rate: float
    memory_usage: int
    key_count: int
    avg_ttl: float
    top_keys: List[str]
    slow_operations: List[Dict[str, Any]]


@dataclass
class CacheAlert:
    """Data class for cache alerts."""
    alert_type: str
    severity: str  # low, medium, high, critical
    message: str
    timestamp: datetime
    metrics: Dict[str, Any]
    threshold: Optional[float] = None
    current_value: Optional[float] = None


class CacheMonitor:
    """Advanced cache monitoring with alerting and analytics."""
    
    def __init__(self):
        self._metrics_history = deque(maxlen=1000)  # Keep last 1000 metric snapshots
        self._alerts = deque(maxlen=100)  # Keep last 100 alerts
        self._slow_operations = deque(maxlen=50)  # Track slow operations
        self._key_access_patterns = defaultdict(int)
        self._monitoring_enabled = True
        
        # Alert thresholds
        self._thresholds = {
            "hit_rate_low": 70.0,  # Alert if hit rate below 70%
            "memory_usage_high": 80.0,  # Alert if memory usage above 80%
            "slow_operation_threshold": 1.0,  # Alert if operation takes > 1 second
            "error_rate_high": 5.0,  # Alert if error rate above 5%
            "key_count_high": 10000,  # Alert if key count above 10k
        }
    
    async def collect_metrics(self) -> CacheMetrics:
        """

        Collect comprehensive cache metrics.

        

        Returns:

            CacheMetrics object with current metrics

        """
        try:
            redis_client = redis_manager.redis
            
            # Get basic cache stats
            cache_stats = cache_manager.get_stats()
            
            # Get Redis memory info
            redis_info = await redis_client.info("memory")
            memory_usage = redis_info.get("used_memory", 0)
            
            # Get key count and sample keys
            key_count = await redis_client.dbsize()
            sample_keys = await redis_client.keys("cache:*", count=10)
            
            # Calculate average TTL for sample keys
            avg_ttl = await self._calculate_avg_ttl(sample_keys)
            
            # Get top accessed keys
            top_keys = await self._get_top_keys()
            
            # Get recent slow operations
            slow_operations = list(self._slow_operations)
            
            metrics = CacheMetrics(
                timestamp=datetime.utcnow(),
                hits=cache_stats["hits"],
                misses=cache_stats["misses"],
                sets=cache_stats["sets"],
                deletes=cache_stats["deletes"],
                invalidations=cache_stats["invalidations"],
                hit_rate=cache_stats["hit_rate"],
                memory_usage=memory_usage,
                key_count=key_count,
                avg_ttl=avg_ttl,
                top_keys=top_keys,
                slow_operations=slow_operations
            )
            
            # Store metrics in history
            self._metrics_history.append(metrics)
            
            # Check for alerts
            await self._check_alerts(metrics)
            
            return metrics
            
        except Exception as e:
            logger.error(f"Failed to collect cache metrics: {e}")
            # Return empty metrics on error
            return CacheMetrics(
                timestamp=datetime.utcnow(),
                hits=0, misses=0, sets=0, deletes=0, invalidations=0,
                hit_rate=0.0, memory_usage=0, key_count=0, avg_ttl=0.0,
                top_keys=[], slow_operations=[]
            )
    
    async def _calculate_avg_ttl(self, keys: List[str]) -> float:
        """Calculate average TTL for a sample of keys."""
        if not keys:
            return 0.0
        
        try:
            redis_client = redis_manager.redis
            ttls = []
            
            for key in keys:
                ttl = await redis_client.ttl(key)
                if ttl > 0:  # Only include keys with positive TTL
                    ttls.append(ttl)
            
            return sum(ttls) / len(ttls) if ttls else 0.0
            
        except Exception as e:
            logger.error(f"Failed to calculate average TTL: {e}")
            return 0.0
    
    async def _get_top_keys(self, limit: int = 10) -> List[str]:
        """Get top accessed keys based on access patterns."""
        try:
            # Sort by access count and return top keys
            sorted_keys = sorted(
                self._key_access_patterns.items(),
                key=lambda x: x[1],
                reverse=True
            )
            return [key for key, _ in sorted_keys[:limit]]
            
        except Exception as e:
            logger.error(f"Failed to get top keys: {e}")
            return []
    
    async def _check_alerts(self, metrics: CacheMetrics) -> None:
        """Check metrics against thresholds and generate alerts."""
        alerts = []
        
        # Check hit rate
        if metrics.hit_rate < self._thresholds["hit_rate_low"]:
            alerts.append(CacheAlert(
                alert_type="low_hit_rate",
                severity="medium",
                message=f"Cache hit rate is low: {metrics.hit_rate}%",
                timestamp=datetime.utcnow(),
                metrics={"hit_rate": metrics.hit_rate},
                threshold=self._thresholds["hit_rate_low"],
                current_value=metrics.hit_rate
            ))
        
        # Check memory usage (if we can get max memory)
        try:
            redis_client = redis_manager.redis
            redis_config = await redis_client.config_get("maxmemory")
            max_memory = int(redis_config.get("maxmemory", 0))
            
            if max_memory > 0:
                memory_usage_percent = (metrics.memory_usage / max_memory) * 100
                if memory_usage_percent > self._thresholds["memory_usage_high"]:
                    alerts.append(CacheAlert(
                        alert_type="high_memory_usage",
                        severity="high",
                        message=f"Cache memory usage is high: {memory_usage_percent:.1f}%",
                        timestamp=datetime.utcnow(),
                        metrics={"memory_usage_percent": memory_usage_percent},
                        threshold=self._thresholds["memory_usage_high"],
                        current_value=memory_usage_percent
                    ))
        except Exception as e:
            logger.debug(f"Could not check memory usage: {e}")
        
        # Check key count
        if metrics.key_count > self._thresholds["key_count_high"]:
            alerts.append(CacheAlert(
                alert_type="high_key_count",
                severity="medium",
                message=f"High number of cache keys: {metrics.key_count}",
                timestamp=datetime.utcnow(),
                metrics={"key_count": metrics.key_count},
                threshold=self._thresholds["key_count_high"],
                current_value=metrics.key_count
            ))
        
        # Check for slow operations
        recent_slow_ops = [
            op for op in metrics.slow_operations
            if datetime.fromisoformat(op["timestamp"]) > datetime.utcnow() - timedelta(minutes=5)
        ]
        
        if len(recent_slow_ops) > 3:  # More than 3 slow operations in 5 minutes
            alerts.append(CacheAlert(
                alert_type="frequent_slow_operations",
                severity="medium",
                message=f"Frequent slow cache operations detected: {len(recent_slow_ops)} in last 5 minutes",
                timestamp=datetime.utcnow(),
                metrics={"slow_operations_count": len(recent_slow_ops)},
                threshold=3,
                current_value=len(recent_slow_ops)
            ))
        
        # Store alerts
        for alert in alerts:
            self._alerts.append(alert)
            logger.warning(f"Cache alert: {alert.message}")
    
    def track_key_access(self, key: str) -> None:
        """Track key access patterns for analytics."""
        if self._monitoring_enabled:
            self._key_access_patterns[key] += 1
    
    def track_slow_operation(

        self,

        operation: str,

        duration: float,

        key: str = None,

        details: Dict[str, Any] = None

    ) -> None:
        """Track slow cache operations."""
        if duration > self._thresholds["slow_operation_threshold"]:
            slow_op = {
                "operation": operation,
                "duration": duration,
                "key": key,
                "details": details or {},
                "timestamp": datetime.utcnow().isoformat()
            }
            self._slow_operations.append(slow_op)
            logger.warning(f"Slow cache operation: {operation} took {duration:.2f}s")
    
    def get_metrics_history(

        self,

        hours: int = 24,

        limit: int = 100

    ) -> List[CacheMetrics]:
        """

        Get historical metrics data.

        

        Args:

            hours: Number of hours of history to return

            limit: Maximum number of data points

            

        Returns:

            List of historical metrics

        """
        cutoff_time = datetime.utcnow() - timedelta(hours=hours)
        
        filtered_metrics = [
            metrics for metrics in self._metrics_history
            if metrics.timestamp > cutoff_time
        ]
        
        # Return most recent metrics if we have more than limit
        if len(filtered_metrics) > limit:
            return filtered_metrics[-limit:]
        
        return filtered_metrics
    
    def get_alerts(

        self,

        severity: Optional[str] = None,

        hours: int = 24,

        limit: int = 50

    ) -> List[CacheAlert]:
        """

        Get recent alerts.

        

        Args:

            severity: Filter by severity level

            hours: Number of hours of history

            limit: Maximum number of alerts

            

        Returns:

            List of alerts

        """
        cutoff_time = datetime.utcnow() - timedelta(hours=hours)
        
        filtered_alerts = [
            alert for alert in self._alerts
            if alert.timestamp > cutoff_time
        ]
        
        if severity:
            filtered_alerts = [
                alert for alert in filtered_alerts
                if alert.severity == severity
            ]
        
        # Return most recent alerts
        if len(filtered_alerts) > limit:
            return filtered_alerts[-limit:]
        
        return filtered_alerts
    
    def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary and recommendations."""
        if not self._metrics_history:
            return {"error": "No metrics data available"}
        
        recent_metrics = list(self._metrics_history)[-10:]  # Last 10 snapshots
        
        # Calculate averages
        avg_hit_rate = sum(m.hit_rate for m in recent_metrics) / len(recent_metrics)
        avg_memory = sum(m.memory_usage for m in recent_metrics) / len(recent_metrics)
        avg_keys = sum(m.key_count for m in recent_metrics) / len(recent_metrics)
        
        # Generate recommendations
        recommendations = []
        
        if avg_hit_rate < 80:
            recommendations.append("Consider increasing cache TTL for frequently accessed data")
        
        if avg_keys > 5000:
            recommendations.append("Consider implementing cache key cleanup policies")
        
        if len(self._slow_operations) > 10:
            recommendations.append("Investigate slow cache operations and optimize queries")
        
        # Get top cache patterns
        top_patterns = {}
        for key in self._key_access_patterns:
            pattern = key.split(':')[1] if ':' in key else 'unknown'
            top_patterns[pattern] = top_patterns.get(pattern, 0) + self._key_access_patterns[key]
        
        return {
            "performance_summary": {
                "avg_hit_rate": round(avg_hit_rate, 2),
                "avg_memory_usage": int(avg_memory),
                "avg_key_count": int(avg_keys),
                "total_slow_operations": len(self._slow_operations),
                "active_alerts": len([a for a in self._alerts if a.timestamp > datetime.utcnow() - timedelta(hours=1)])
            },
            "top_cache_patterns": dict(sorted(top_patterns.items(), key=lambda x: x[1], reverse=True)[:5]),
            "recommendations": recommendations,
            "timestamp": datetime.utcnow().isoformat()
        }
    
    def set_threshold(self, threshold_name: str, value: float) -> bool:
        """Update alert threshold."""
        if threshold_name in self._thresholds:
            self._thresholds[threshold_name] = value
            logger.info(f"Updated threshold {threshold_name} to {value}")
            return True
        return False
    
    def enable_monitoring(self) -> None:
        """Enable cache monitoring."""
        self._monitoring_enabled = True
        logger.info("Cache monitoring enabled")
    
    def disable_monitoring(self) -> None:
        """Disable cache monitoring."""
        self._monitoring_enabled = False
        logger.info("Cache monitoring disabled")
    
    def reset_metrics(self) -> None:
        """Reset all metrics and history."""
        self._metrics_history.clear()
        self._alerts.clear()
        self._slow_operations.clear()
        self._key_access_patterns.clear()
        cache_manager.reset_stats()
        logger.info("Cache metrics reset")


# Global cache monitor instance
cache_monitor = CacheMonitor()


# Monitoring utilities
async def start_metrics_collection(interval: int = 60) -> None:
    """

    Start periodic metrics collection.

    

    Args:

        interval: Collection interval in seconds

    """
    logger.info(f"Starting cache metrics collection with {interval}s interval")
    
    while True:
        try:
            await cache_monitor.collect_metrics()
            await asyncio.sleep(interval)
        except Exception as e:
            logger.error(f"Metrics collection error: {e}")
            await asyncio.sleep(interval)


async def generate_cache_report() -> Dict[str, Any]:
    """Generate comprehensive cache performance report."""
    try:
        current_metrics = await cache_monitor.collect_metrics()
        performance_summary = cache_monitor.get_performance_summary()
        recent_alerts = cache_monitor.get_alerts(hours=24)
        
        return {
            "report_timestamp": datetime.utcnow().isoformat(),
            "current_metrics": asdict(current_metrics),
            "performance_summary": performance_summary,
            "recent_alerts": [asdict(alert) for alert in recent_alerts],
            "cache_health": "healthy" if current_metrics.hit_rate > 70 else "needs_attention"
        }
        
    except Exception as e:
        logger.error(f"Failed to generate cache report: {e}")
        return {
            "error": str(e),
            "report_timestamp": datetime.utcnow().isoformat()
        }