File size: 19,618 Bytes
2ed8996
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
"""
Queue Monitor for AegisLM Framework

Production-ready Celery queue monitoring with bottleneck detection,
auto-scaling recommendations, and comprehensive performance tracking.
"""

import asyncio
import time
import logging
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from collections import deque
from dataclasses import dataclass
import json

logger = logging.getLogger(__name__)


@dataclass
class QueueMetrics:
    """Queue performance metrics."""
    queue_name: str
    current_length: int
    max_length: int
    avg_processing_time: float
    throughput_per_minute: float
    error_rate: float
    worker_count: int
    last_updated: datetime


@dataclass
class BottleneckAlert:
    """Bottleneck alert information."""
    queue_name: str
    severity: str  # "low", "medium", "high", "critical"
    message: str
    current_length: int
    threshold: int
    recommended_workers: int
    timestamp: datetime


class QueueMonitor:
    """
    Monitor Celery queue health and performance with auto-scaling recommendations.
    
    Provides bottleneck detection, performance tracking, and intelligent
    scaling recommendations for optimal queue processing.
    """
    
    def __init__(self, celery_app, monitoring_interval: int = 30, bottleneck_threshold: int = 100):
        """
        Initialize queue monitor.
        
        Args:
            celery_app: Celery application instance
            monitoring_interval: Monitoring interval in seconds
            bottleneck_threshold: Queue length threshold for bottleneck detection
        """
        self.celery_app = celery_app
        self.monitoring_interval = monitoring_interval
        self.bottleneck_threshold = bottleneck_threshold
        self.queue_metrics: Dict[str, QueueMetrics] = {}
        self.bottleneck_history: deque = deque(maxlen=100)
        self._monitoring_task = None
        self._stats = {
            'total_checks': 0,
            'bottlenecks_detected': 0,
            'auto_scaling_recommendations': 0,
            'alerts_sent': 0
        }
        self._scaling_rules = {
            'evaluation': {'threshold': 50, 'max_workers': 8, 'scale_factor': 2},
            'benchmark': {'threshold': 30, 'max_workers': 6, 'scale_factor': 1.5},
            'default': {'threshold': 100, 'max_workers': 4, 'scale_factor': 2}
        }
    
    async def start_monitoring(self, interval: Optional[int] = None):
        """
        Start queue monitoring.
        
        Args:
            interval: Override default monitoring interval
        """
        if self._monitoring_task is None:
            monitor_interval = interval or self.monitoring_interval
            self._monitoring_task = asyncio.create_task(self._monitor_queues(monitor_interval))
            logger.info(f"Queue monitoring started with interval: {monitor_interval}s")
    
    async def stop_monitoring(self):
        """Stop queue monitoring."""
        if self._monitoring_task:
            self._monitoring_task.cancel()
            try:
                await self._monitoring_task
            except asyncio.CancelledError:
                pass
            self._monitoring_task = None
            logger.info("Queue monitoring stopped")
    
    async def _monitor_queues(self, interval: int):
        """Monitor queue statistics and detect bottlenecks."""
        while True:
            try:
                await asyncio.sleep(interval)
                await self.update_queue_stats()
                await self._detect_bottlenecks()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error(f"Queue monitoring error: {e}")
    
    async def update_queue_stats(self):
        """Update queue statistics and performance metrics."""
        try:
            # Get queue inspector
            inspector = self.celery_app.control.inspect()
            
            # Get active queues and stats
            active_queues = inspector.active_queues()
            stats = inspector.stats()
            
            if not active_queues or not stats:
                logger.warning("Unable to get queue statistics")
                return
            
            # Process each queue
            for worker_name, worker_info in (stats or {}).items():
                queue_names = self._extract_queue_names(worker_info)
                
                for queue_name in queue_names:
                    await self._update_single_queue_stats(queue_name, active_queues, stats)
            
            self._stats['total_checks'] += 1
            
        except Exception as e:
            logger.error(f"Failed to update queue stats: {e}")
    
    async def _update_single_queue_stats(self, queue_name: str, active_queues: Dict, stats: Dict):
        """Update statistics for a single queue."""
        try:
            # Get current queue length
            current_length = await self._get_queue_length(queue_name)
            
            # Get worker count
            worker_count = self._get_worker_count_for_queue(queue_name, stats)
            
            # Calculate throughput and processing time
            throughput, processing_time = await self._calculate_performance_metrics(queue_name)
            
            # Get error rate
            error_rate = await self._calculate_error_rate(queue_name)
            
            # Get or create metrics
            if queue_name not in self.queue_metrics:
                self.queue_metrics[queue_name] = QueueMetrics(
                    queue_name=queue_name,
                    current_length=0,
                    max_length=0,
                    avg_processing_time=0.0,
                    throughput_per_minute=0.0,
                    error_rate=0.0,
                    worker_count=0,
                    last_updated=datetime.utcnow()
                )
            
            # Update metrics
            metrics = self.queue_metrics[queue_name]
            metrics.current_length = current_length
            metrics.max_length = max(metrics.max_length, current_length)
            metrics.avg_processing_time = processing_time
            metrics.throughput_per_minute = throughput
            metrics.error_rate = error_rate
            metrics.worker_count = worker_count
            metrics.last_updated = datetime.utcnow()
            
        except Exception as e:
            logger.error(f"Failed to update stats for queue {queue_name}: {e}")
    
    async def _get_queue_length(self, queue_name: str) -> int:
        """Get current queue length."""
        try:
            with self.celery_app.pool.acquire() as conn:
                queue_length = conn.default_channel.client.llen(queue_name)
                return queue_length
        except Exception as e:
            logger.warning(f"Failed to get queue length for {queue_name}: {e}")
            return 0
    
    def _extract_queue_names(self, worker_info: Dict) -> List[str]:
        """Extract queue names from worker info."""
        try:
            pool_info = worker_info.get('pool', {})
            if isinstance(pool_info, dict):
                return list(pool_info.keys())
            return []
        except Exception:
            return []
    
    def _get_worker_count_for_queue(self, queue_name: str, stats: Dict) -> int:
        """Get number of workers processing a specific queue."""
        try:
            worker_count = 0
            for worker_name, worker_info in (stats or {}).items():
                pool_info = worker_info.get('pool', {})
                if isinstance(pool_info, dict) and queue_name in pool_info:
                    worker_count += 1
            return worker_count
        except Exception:
            return 0
    
    async def _calculate_performance_metrics(self, queue_name: str) -> tuple[float, float]:
        """Calculate throughput and processing time metrics."""
        try:
            # This would integrate with your task tracking system
            # For now, return estimated values based on historical data
            throughput = 10.0  # tasks per minute (placeholder)
            processing_time = 5.0  # seconds per task (placeholder)
            
            # In a real implementation, you would:
            # 1. Query task completion rates
            # 2. Calculate average processing times
            # 3. Track success/failure rates
            
            return throughput, processing_time
            
        except Exception as e:
            logger.warning(f"Failed to calculate performance metrics for {queue_name}: {e}")
            return 0.0, 0.0
    
    async def _calculate_error_rate(self, queue_name: str) -> float:
        """Calculate error rate for queue."""
        try:
            # This would integrate with your error tracking system
            # For now, return estimated value
            error_rate = 0.05  # 5% error rate (placeholder)
            
            # In a real implementation, you would:
            # 1. Query failed task counts
            # 2. Calculate error rate based on total tasks
            # 3. Track error trends
            
            return error_rate
            
        except Exception as e:
            logger.warning(f"Failed to calculate error rate for {queue_name}: {e}")
            return 0.0
    
    async def _detect_bottlenecks(self):
        """Detect queue bottlenecks and trigger alerts."""
        for queue_name, metrics in self.queue_metrics.items():
            # Get threshold for this queue type
            rule = self._scaling_rules.get(queue_name, self._scaling_rules['default'])
            threshold = rule['threshold']
            
            # Check for bottleneck
            if metrics.current_length > threshold:
                await self._handle_queue_bottleneck(queue_name, metrics, threshold)
    
    async def _handle_queue_bottleneck(self, queue_name: str, metrics: QueueMetrics, threshold: int):
        """Handle queue bottleneck situation."""
        # Determine severity
        severity = self._calculate_bottleneck_severity(metrics.current_length, threshold)
        
        # Calculate recommended workers
        recommended_workers = self._calculate_recommended_workers(queue_name, metrics.current_length)
        
        # Create alert
        alert = BottleneckAlert(
            queue_name=queue_name,
            severity=severity,
            message=f"Queue bottleneck detected: {queue_name} has {metrics.current_length} pending tasks",
            current_length=metrics.current_length,
            threshold=threshold,
            recommended_workers=recommended_workers,
            timestamp=datetime.utcnow()
        )
        
        # Store alert
        self.bottleneck_history.append(alert)
        self._stats['bottlenecks_detected'] += 1
        
        # Log alert
        logger.warning(f"Queue bottleneck alert: {alert}")
        
        # Send notification
        await self._send_bottleneck_alert(alert)
        
        # Attempt auto-scaling if configured
        await self._attempt_auto_scaling(queue_name, recommended_workers)
    
    def _calculate_bottleneck_severity(self, current_length: int, threshold: int) -> str:
        """Calculate bottleneck severity based on queue length."""
        ratio = current_length / threshold
        
        if ratio >= 3.0:
            return "critical"
        elif ratio >= 2.0:
            return "high"
        elif ratio >= 1.5:
            return "medium"
        else:
            return "low"
    
    def _calculate_recommended_workers(self, queue_name: str, current_length: int) -> int:
        """Calculate recommended number of workers."""
        rule = self._scaling_rules.get(queue_name, self._scaling_rules['default'])
        max_workers = rule['max_workers']
        scale_factor = rule['scale_factor']
        
        # Calculate based on current load
        current_workers = self.queue_metrics.get(queue_name, QueueMetrics(queue_name, 0, 0, 0, 0, 0, 0, datetime.utcnow())).worker_count
        
        # Scale based on queue length
        if current_length > rule['threshold']:
            recommended = int(current_workers * scale_factor)
            return min(recommended, max_workers)
        
        return current_workers
    
    async def _send_bottleneck_alert(self, alert: BottleneckAlert):
        """Send bottleneck alert notification."""
        try:
            # This would integrate with your notification system
            # For now, just log the alert
            alert_data = {
                'type': 'queue_bottleneck',
                'queue_name': alert.queue_name,
                'severity': alert.severity,
                'current_length': alert.current_length,
                'threshold': alert.threshold,
                'recommended_workers': alert.recommended_workers,
                'timestamp': alert.timestamp.isoformat()
            }
            
            logger.error(f"Queue bottleneck alert: {alert_data}")
            self._stats['alerts_sent'] += 1
            
        except Exception as e:
            logger.error(f"Failed to send bottleneck alert: {e}")
    
    async def _attempt_auto_scaling(self, queue_name: str, recommended_workers: int):
        """Attempt automatic scaling of workers."""
        try:
            # This would integrate with your container orchestration or worker management system
            # For now, just log the recommendation
            
            current_workers = self.queue_metrics.get(queue_name, QueueMetrics(queue_name, 0, 0, 0, 0, 0, 0, datetime.utcnow())).worker_count
            
            if recommended_workers > current_workers:
                logger.info(f"Auto-scaling recommendation for {queue_name}: {current_workers} -> {recommended_workers} workers")
                self._stats['auto_scaling_recommendations'] += 1
                
                # In a real implementation, you would:
                # 1. Call your orchestration API (Kubernetes, Docker Swarm, etc.)
                # 2. Scale worker pods/containers
                # 3. Monitor scaling progress
                # 4. Update worker count in metrics
            
        except Exception as e:
            logger.error(f"Failed to attempt auto-scaling for {queue_name}: {e}")
    
    async def get_queue_health(self) -> Dict[str, Any]:
        """Get overall queue health status."""
        if not self.queue_metrics:
            return {
                'status': 'no_data',
                'timestamp': datetime.utcnow().isoformat(),
                'total_queues': 0
            }
        
        total_pending = sum(metrics.current_length for metrics in self.queue_metrics.values())
        bottlenecks = [
            queue_name for queue_name, metrics in self.queue_metrics.items()
            if metrics.current_length > self._scaling_rules.get(queue_name, self._scaling_rules['default'])['threshold']
        ]
        
        avg_throughput = sum(metrics.throughput_per_minute for metrics in self.queue_metrics.values()) / len(self.queue_metrics)
        avg_error_rate = sum(metrics.error_rate for metrics in self.queue_metrics.values()) / len(self.queue_metrics)
        
        return {
            'status': 'degraded' if bottlenecks else 'healthy',
            'total_queues': len(self.queue_metrics),
            'total_pending_tasks': total_pending,
            'bottlenecks': bottlenecks,
            'average_throughput': avg_throughput,
            'average_error_rate': avg_error_rate,
            'monitoring_stats': self._stats.copy(),
            'timestamp': datetime.utcnow().isoformat()
        }
    
    async def get_queue_details(self, queue_name: str) -> Optional[Dict[str, Any]]:
        """Get detailed information for a specific queue."""
        if queue_name not in self.queue_metrics:
            return None
        
        metrics = self.queue_metrics[queue_name]
        rule = self._scaling_rules.get(queue_name, self._scaling_rules['default'])
        
        return {
            'queue_name': metrics.queue_name,
            'current_length': metrics.current_length,
            'max_length': metrics.max_length,
            'worker_count': metrics.worker_count,
            'throughput_per_minute': metrics.throughput_per_minute,
            'avg_processing_time': metrics.avg_processing_time,
            'error_rate': metrics.error_rate,
            'threshold': rule['threshold'],
            'max_workers': rule['max_workers'],
            'utilization': (metrics.current_length / rule['threshold']) * 100 if rule['threshold'] > 0 else 0,
            'last_updated': metrics.last_updated.isoformat()
        }
    
    async def get_bottleneck_history(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Get recent bottleneck history."""
        history = list(self.bottleneck_history)[-limit:]
        return [
            {
                'queue_name': alert.queue_name,
                'severity': alert.severity,
                'message': alert.message,
                'current_length': alert.current_length,
                'threshold': alert.threshold,
                'recommended_workers': alert.recommended_workers,
                'timestamp': alert.timestamp.isoformat()
            }
            for alert in history
        ]
    
    async def force_queue_check(self) -> Dict[str, Any]:
        """Force immediate queue check."""
        try:
            await self.update_queue_stats()
            await self._detect_bottlenecks()
            
            return {
                'status': 'completed',
                'queues_checked': len(self.queue_metrics),
                'bottlenecks_found': len([
                    queue_name for queue_name, metrics in self.queue_metrics.items()
                    if metrics.current_length > self._scaling_rules.get(queue_name, self._scaling_rules['default'])['threshold']
                ]),
                'timestamp': datetime.utcnow().isoformat()
            }
            
        except Exception as e:
            return {
                'status': 'failed',
                'error': str(e),
                'timestamp': datetime.utcnow().isoformat()
            }
    
    def update_scaling_rule(self, queue_name: str, threshold: int, max_workers: int, scale_factor: float):
        """Update scaling rule for a queue."""
        self._scaling_rules[queue_name] = {
            'threshold': threshold,
            'max_workers': max_workers,
            'scale_factor': scale_factor
        }
        logger.info(f"Updated scaling rule for {queue_name}: threshold={threshold}, max_workers={max_workers}")
    
    async def reset_statistics(self):
        """Reset monitoring statistics."""
        self._stats = {
            'total_checks': 0,
            'bottlenecks_detected': 0,
            'auto_scaling_recommendations': 0,
            'alerts_sent': 0
        }
        self.bottleneck_history.clear()
        logger.info("Queue monitoring statistics reset")


# Factory function
def create_queue_monitor(celery_app, monitoring_interval: int = 30, bottleneck_threshold: int = 100) -> QueueMonitor:
    """
    Create a queue monitor instance.
    
    Args:
        celery_app: Celery application instance
        monitoring_interval: Monitoring interval in seconds
        bottleneck_threshold: Default bottleneck threshold
        
    Returns:
        QueueMonitor: Configured monitor
    """
    return QueueMonitor(celery_app, monitoring_interval, bottleneck_threshold)