"""
Sheikh-Kitty Monitoring System
Real-time metrics aggregation and system health monitoring

Features:
- API request metrics tracking
- Sandbox execution monitoring
- System resource monitoring
- Security violation alerts
- Performance analytics
- Health check endpoints

Dependencies:
- psutil is used for system resource sampling. When it is not installed the
  module still imports; SystemMonitor.start() then logs a warning and does
  nothing, while API metrics and alerting keep working.

Author: MiniMax Agent
Date: 2025-11-14
"""

import json
import logging
import math
import operator
import os
import queue
import statistics
import threading
import time
from collections import defaultdict, deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

try:
    import psutil
except ImportError:  # system sampling is optional; see SystemMonitor.start()
    psutil = None

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class MetricType(Enum):
    """Types of metrics to track"""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    TIMER = "timer"


class AlertSeverity(Enum):
    """Alert severity levels"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"


@dataclass
class Metric:
    """Individual metric data point"""
    name: str
    value: float
    metric_type: MetricType
    timestamp: datetime
    labels: Optional[Dict[str, str]] = None
    tags: Optional[List[str]] = None


@dataclass
class Alert:
    """System alert"""
    id: str
    severity: AlertSeverity
    message: str
    timestamp: datetime
    metric_name: str
    threshold: float
    current_value: float
    resolved: bool = False
    resolved_at: Optional[datetime] = None


def _nearest_rank_percentile(values: List[float], percentile: float) -> float:
    """Return the nearest-rank percentile of *values* (0.0 when empty).

    The nearest-rank definition picks the value at 1-based rank
    ``ceil(n * percentile / 100)``, so the 95th percentile of 1..100 is 95.
    """
    if not values:
        return 0.0

    ordered = sorted(values)
    rank = math.ceil(len(ordered) * percentile / 100)
    # Clamp so percentile <= 0 yields the minimum and >= 100 the maximum.
    index = min(max(rank - 1, 0), len(ordered) - 1)
    return ordered[index]


def _alert_to_json_dict(alert: Alert) -> Dict[str, Any]:
    """Convert *alert* to a dict that ``json.dumps`` can serialize.

    ``dataclasses.asdict`` leaves the severity Enum and the datetime fields
    untouched, so they are converted to their string forms here.
    """
    data = asdict(alert)
    data['severity'] = getattr(alert.severity, 'value', alert.severity)
    data['timestamp'] = alert.timestamp.isoformat()
    data['resolved_at'] = (
        alert.resolved_at.isoformat() if alert.resolved_at else None
    )
    return data


class MetricCollector:
    """Collect and store metrics"""

    def __init__(self, max_history: int = 10000):
        """Create a collector keeping at most *max_history* data points."""
        self.max_history = max_history
        self.metrics = deque(maxlen=max_history)
        self.current_values = {}  # For gauge metrics
        self.counters = defaultdict(float)  # For counter metrics
        self.lock = threading.Lock()

    def record(self, metric: Metric):
        """Record a metric.

        Gauges overwrite the current value for their name; counters add
        ``metric.value`` (an increment) to the running total.
        """
        with self.lock:
            self.metrics.append(metric)

            if metric.metric_type == MetricType.GAUGE:
                self.current_values[metric.name] = metric.value
            elif metric.metric_type == MetricType.COUNTER:
                self.counters[metric.name] += metric.value

    def get_metrics(self, name: str = None,
                    since: datetime = None) -> List[Metric]:
        """Get metrics by name and time range.

        Args:
            name: Only return metrics with this name (all names if falsy).
            since: Only return metrics recorded at or after this time.
        """
        with self.lock:
            return [
                metric for metric in self.metrics
                if not (name and metric.name != name)
                and not (since and metric.timestamp < since)
            ]

    def get_current_value(self, name: str) -> Optional[float]:
        """Get current value for gauge metric (None if never recorded)."""
        with self.lock:
            return self.current_values.get(name)

    def get_counter(self, name: str) -> float:
        """Get counter value (0.0 if never recorded)."""
        with self.lock:
            return self.counters.get(name, 0.0)

    def get_stats(self, name: str,
                  window_minutes: int = 60) -> Dict[str, float]:
        """Get statistics for a metric over time window.

        Returns an empty dict when no data point falls inside the window.
        """
        since = datetime.now() - timedelta(minutes=window_minutes)
        values = [m.value for m in self.get_metrics(name, since)]

        if not values:
            return {}

        return {
            'count': len(values),
            'min': min(values),
            'max': max(values),
            'avg': statistics.mean(values),
            'median': statistics.median(values),
            'p95': self._percentile(values, 95),
            'p99': self._percentile(values, 99),
            'latest': values[-1]
        }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile (nearest-rank; 0.0 for empty input)"""
        return _nearest_rank_percentile(values, percentile)


class AlertManager:
    """Manage system alerts and notifications"""

    # Supported rule comparisons, keyed by the name used in add_rule().
    _COMPARISONS = {
        "greater_than": operator.gt,
        "less_than": operator.lt,
        "equals": lambda value, threshold: abs(value - threshold) < 0.001,
        "greater_equal": operator.ge,
        "less_equal": operator.le,
    }

    def __init__(self, storage_path: str = "logs/alerts.jsonl"):
        """Create a manager appending alert events to *storage_path*.

        The parent directory of *storage_path* is created if missing.
        """
        self.storage_path = Path(storage_path)
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)

        self.active_alerts = {}
        self.alert_history = deque(maxlen=1000)
        self.rules = []  # Alert rules
        self.lock = threading.Lock()

    def add_rule(self, name: str, metric_name: str, threshold: float,
                 comparison: str = "greater_than",
                 severity: AlertSeverity = AlertSeverity.WARNING):
        """Add alert rule.

        Args:
            name: Rule name; also used as the id of alerts it raises.
            metric_name: Gauge metric the rule watches.
            threshold: Value the gauge is compared against.
            comparison: One of "greater_than", "less_than", "equals",
                "greater_equal", "less_equal". Unknown names are accepted
                but never trigger; a warning is logged for them.
            severity: Severity attached to alerts raised by this rule.
        """
        if comparison not in self._COMPARISONS:
            logger.warning(
                "Alert rule %s uses unknown comparison %r and will never "
                "trigger", name, comparison
            )

        rule = {
            'name': name,
            'metric_name': metric_name,
            'threshold': threshold,
            'comparison': comparison,
            'severity': severity,
            'enabled': True
        }

        with self.lock:
            self.rules.append(rule)
        logger.info(f"Added alert rule: {name}")

    def check_alerts(self, metric_collector: MetricCollector):
        """Check metrics against alert rules.

        For each enabled rule whose gauge has a value, triggers the alert
        when the condition holds and resolves it otherwise. A failure in one
        rule is logged and does not stop the remaining rules.
        """
        # Snapshot so add_rule() from another thread cannot disturb iteration.
        with self.lock:
            rules = list(self.rules)

        for rule in rules:
            if not rule['enabled']:
                continue

            try:
                current_value = metric_collector.get_current_value(
                    rule['metric_name']
                )
                if current_value is None:
                    continue

                triggered = self._evaluate_condition(
                    current_value, rule['threshold'], rule['comparison']
                )

                if triggered:
                    self._trigger_alert(rule, current_value, metric_collector)
                else:
                    self._resolve_alert(rule['name'], metric_collector)

            except Exception as e:
                logger.error(f"Alert check failed for {rule['name']}: {e}")

    def _evaluate_condition(self, value: float, threshold: float,
                            comparison: str) -> bool:
        """Evaluate if condition is met (False for unknown comparisons)"""
        compare = self._COMPARISONS.get(comparison)
        if compare is None:
            return False
        return bool(compare(value, threshold))

    def _trigger_alert(self, rule: Dict[str, Any], current_value: float,
                       metric_collector: MetricCollector):
        """Trigger an alert unless one with the rule's name is already active.

        *metric_collector* is accepted for signature compatibility and is
        not used.
        """
        alert_id = rule['name']

        # The membership check and the insert share one critical section so
        # two concurrent checks cannot both create the alert.
        with self.lock:
            if alert_id in self.active_alerts:
                return

            alert = Alert(
                id=alert_id,
                severity=rule['severity'],
                message=f"{rule['metric_name']} is {current_value:.2f} (threshold: {rule['threshold']})",
                timestamp=datetime.now(),
                metric_name=rule['metric_name'],
                threshold=rule['threshold'],
                current_value=current_value
            )
            self.active_alerts[alert_id] = alert
            self.alert_history.append(alert)
            self._save_alert(alert)

        logger.warning(f"Alert triggered: {alert.message}")

    def _resolve_alert(self, alert_id: str,
                       metric_collector: MetricCollector):
        """Resolve an active alert; a no-op when *alert_id* is not active.

        *metric_collector* is accepted for signature compatibility and is
        not used.
        """
        with self.lock:
            alert = self.active_alerts.pop(alert_id, None)
            if alert is None:
                return

            # The same object lives in alert_history, so history reflects
            # the resolution too.
            alert.resolved = True
            alert.resolved_at = datetime.now()
            self._save_alert(alert)

        logger.info(f"Alert resolved: {alert_id}")

    def _save_alert(self, alert: Alert):
        """Append *alert* as one JSON line to persistent storage.

        Failures are logged, never raised, so alerting keeps working when
        the log file is not writable.
        """
        try:
            line = json.dumps(_alert_to_json_dict(alert))
            with open(self.storage_path, 'a', encoding='utf-8') as f:
                f.write(line + '\n')
        except Exception as e:
            logger.error(f"Failed to save alert: {e}")

    def get_active_alerts(self) -> List[Alert]:
        """Get currently active alerts"""
        with self.lock:
            return list(self.active_alerts.values())

    def get_alert_history(self, limit: int = 100) -> List[Alert]:
        """Get the most recent *limit* alerts (empty list if limit <= 0)"""
        # A plain [-limit:] slice would return everything for limit == 0.
        if limit <= 0:
            return []
        with self.lock:
            return list(self.alert_history)[-limit:]


class SystemMonitor:
    """Monitor system resources and health"""

    def __init__(self, check_interval: int = 30):
        """Create a monitor sampling every *check_interval* seconds."""
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Set by stop() so the loop wakes immediately instead of sleeping on.
        self._stop_event = threading.Event()
        # Last cumulative reading per counter metric, for delta computation.
        self._last_counter_totals = {}

        # System thresholds
        self.thresholds = {
            'cpu_usage': 80.0,  # %
            'memory_usage': 85.0,  # %
            'disk_usage': 90.0,  # %
            'temperature': 70.0,  # Celsius
            'load_average': 2.0  # per CPU core
        }

    def start(self, metric_collector: MetricCollector):
        """Start system monitoring in a daemon thread.

        Does nothing when already running, or when psutil is unavailable
        (a warning is logged in that case).
        """
        if self.running:
            return

        if psutil is None:
            logger.warning(
                "psutil is not installed; system monitoring is disabled"
            )
            return

        self._stop_event.clear()
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(metric_collector,),
            daemon=True
        )
        self.monitor_thread.start()
        logger.info("System monitoring started")

    def stop(self):
        """Stop system monitoring and wait for the thread to finish."""
        self.running = False
        self._stop_event.set()  # interrupt the inter-sample wait
        if self.monitor_thread:
            self.monitor_thread.join()
            self.monitor_thread = None
        logger.info("System monitoring stopped")

    def _monitor_loop(self, metric_collector: MetricCollector):
        """Main monitoring loop: sample, then wait until the next interval."""
        while self.running:
            try:
                self._collect_system_metrics(metric_collector)
                pause = self.check_interval
            except Exception as e:
                logger.error(f"System monitoring error: {e}")
                pause = 5  # Brief pause on error

            # wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(pause):
                break

    def _counter_delta(self, name: str, total: float) -> Optional[float]:
        """Return the increase of cumulative counter *name* since last call.

        Returns None on the first reading (baseline only). When the total
        went backwards (counter reset or wrap), the new total is taken as
        the increase.
        """
        previous = self._last_counter_totals.get(name)
        self._last_counter_totals[name] = total
        if previous is None:
            return None
        if total < previous:
            return total
        return total - previous

    def _collect_system_metrics(self, metric_collector: MetricCollector):
        """Collect system resource metrics.

        Records CPU, load, memory, disk and process-count gauges plus
        network byte counters. Any failure is logged and ends this sample.
        """
        timestamp = datetime.now()

        def gauge(name, value, labels=None):
            metric_collector.record(Metric(
                name=name,
                value=value,
                metric_type=MetricType.GAUGE,
                timestamp=timestamp,
                labels=labels
            ))

        try:
            # CPU metrics (cpu_percent blocks for the 1 s sampling interval)
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()
            load_avg = (
                psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg')
                else 0.0
            )

            gauge("system.cpu.usage", cpu_percent, {"core": "total"})
            if cpu_count is not None:  # psutil may be unable to determine it
                gauge("system.cpu.count", cpu_count)
            if load_avg > 0:
                gauge("system.load.average", load_avg)

            # Memory metrics
            memory = psutil.virtual_memory()
            gauge("system.memory.usage", memory.percent)
            gauge("system.memory.available", memory.available / (1024**3))  # GB

            # Disk metrics
            disk = psutil.disk_usage('/')
            gauge("system.disk.usage", (disk.used / disk.total) * 100)

            # Network metrics (if available). psutil reports cumulative
            # totals, while MetricCollector sums COUNTER values, so only
            # the increase since the previous sample is recorded.
            try:
                network = psutil.net_io_counters()
                totals = (
                    ("system.network.bytes_sent", network.bytes_sent),
                    ("system.network.bytes_recv", network.bytes_recv),
                )
                for name, total in totals:
                    delta = self._counter_delta(name, total)
                    if delta is None:
                        continue
                    metric_collector.record(Metric(
                        name=name,
                        value=delta,
                        metric_type=MetricType.COUNTER,
                        timestamp=timestamp
                    ))
            except Exception as e:
                logger.debug("Network metrics unavailable: %s", e)

            # Process metrics
            gauge("system.processes.count", len(psutil.pids()))

        except Exception as e:
            logger.error(f"Failed to collect system metrics: {e}")


class APIMonitor:
    """Monitor API performance and usage"""

    def __init__(self, max_requests: int = 1000):
        """Create a monitor remembering the last *max_requests* requests.

        The same bound applies to each endpoint's response-time history so
        memory use does not grow with uptime.
        """
        self.request_times = deque(maxlen=max_requests)
        self.endpoint_stats = defaultdict(lambda: deque(maxlen=max_requests))
        self.error_counts = defaultdict(int)  # cumulative, per endpoint
        self.lock = threading.Lock()

    def record_request(self, endpoint: str, response_time: float,
                       status_code: int):
        """Record API request metrics.

        Status codes >= 400 count as errors.
        """
        timestamp = datetime.now()

        with self.lock:
            self.request_times.append({
                'timestamp': timestamp,
                'endpoint': endpoint,
                'response_time': response_time,
                'status_code': status_code
            })

            self.endpoint_stats[endpoint].append(response_time)

            if status_code >= 400:
                self.error_counts[endpoint] += 1

    def get_api_stats(self, window_minutes: int = 60) -> Dict[str, Any]:
        """Get API statistics for the last *window_minutes* minutes.

        Returns an empty dict when no request falls inside the window.
        Otherwise returns totals, error rate, mean and p95 response time,
        and an 'endpoints' mapping whose 'count', 'avg_time' and 'errors'
        are computed from the same window.
        """
        since = datetime.now() - timedelta(minutes=window_minutes)

        # Request dicts are never mutated after insertion, so a snapshot
        # taken under the lock can be processed without holding it.
        with self.lock:
            recent_requests = [
                req for req in self.request_times
                if req['timestamp'] >= since
            ]

        if not recent_requests:
            return {}

        response_times = []
        error_total = 0
        per_endpoint = {}
        for req in recent_requests:
            response_times.append(req['response_time'])
            bucket = per_endpoint.setdefault(
                req['endpoint'], {'times': [], 'errors': 0}
            )
            bucket['times'].append(req['response_time'])
            if req['status_code'] >= 400:
                error_total += 1
                bucket['errors'] += 1

        return {
            'total_requests': len(recent_requests),
            'error_requests': error_total,
            'error_rate': error_total / len(recent_requests),
            'avg_response_time': statistics.mean(response_times),
            'p95_response_time': self._percentile(response_times, 95),
            'endpoints': {
                endpoint: {
                    'count': len(bucket['times']),
                    'avg_time': statistics.mean(bucket['times']),
                    'errors': bucket['errors']
                }
                for endpoint, bucket in per_endpoint.items()
            }
        }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate percentile (nearest-rank; 0.0 for empty input)"""
        return _nearest_rank_percentile(values, percentile)


class MonitoringDashboard:
    """Real-time monitoring dashboard"""

    def __init__(self, data_dir: str = "logs", update_interval: int = 30):
        """Create the dashboard and its collectors.

        Args:
            data_dir: Directory for alerts.jsonl and dashboard_state.json;
                created (including parents) if missing.
            update_interval: Seconds between dashboard update cycles.
        """
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.update_interval = update_interval

        self.metric_collector = MetricCollector()
        self.alert_manager = AlertManager(str(self.data_dir / "alerts.jsonl"))
        self.system_monitor = SystemMonitor()
        self.api_monitor = APIMonitor()

        # Setup default alert rules
        self._setup_default_alerts()

        self.running = False
        self.dashboard_thread = None
        # Set by stop() so the update loop wakes immediately.
        self._stop_event = threading.Event()

    def _setup_default_alerts(self):
        """Setup default alert rules"""
        # High CPU usage
        self.alert_manager.add_rule(
            name="high_cpu_usage",
            metric_name="system.cpu.usage",
            threshold=80.0,
            comparison="greater_than",
            severity=AlertSeverity.WARNING
        )

        # High memory usage
        self.alert_manager.add_rule(
            name="high_memory_usage",
            metric_name="system.memory.usage",
            threshold=85.0,
            comparison="greater_than",
            severity=AlertSeverity.WARNING
        )

        # High disk usage
        self.alert_manager.add_rule(
            name="high_disk_usage",
            metric_name="system.disk.usage",
            threshold=90.0,
            comparison="greater_than",
            severity=AlertSeverity.CRITICAL
        )

        # High API response time
        self.alert_manager.add_rule(
            name="high_api_response_time",
            metric_name="api.response.time",
            threshold=5.0,
            comparison="greater_than",
            severity=AlertSeverity.WARNING
        )

        # High error rate
        self.alert_manager.add_rule(
            name="high_error_rate",
            metric_name="api.error.rate",
            threshold=0.1,  # 10%
            comparison="greater_than",
            severity=AlertSeverity.ERROR
        )

    def start(self):
        """Start monitoring dashboard (no-op when already running)"""
        if self.running:
            return

        self._stop_event.clear()
        self.running = True

        # Start system monitoring
        self.system_monitor.start(self.metric_collector)

        # Start dashboard update thread
        self.dashboard_thread = threading.Thread(
            target=self._dashboard_loop,
            daemon=True
        )
        self.dashboard_thread.start()

        logger.info("Monitoring dashboard started")

    def stop(self):
        """Stop monitoring dashboard and wait for its threads to finish"""
        self.running = False
        self._stop_event.set()  # interrupt the inter-update wait
        self.system_monitor.stop()

        if self.dashboard_thread:
            self.dashboard_thread.join()
            self.dashboard_thread = None

        logger.info("Monitoring dashboard stopped")

    def _dashboard_loop(self):
        """Main dashboard update loop"""
        while self.running:
            try:
                # Update metrics
                self._update_api_metrics()

                # Check alerts
                self.alert_manager.check_alerts(self.metric_collector)

                # Save dashboard state
                self._save_dashboard_state()

                pause = self.update_interval
            except Exception as e:
                logger.error(f"Dashboard update error: {e}")
                pause = 10

            # wait() returns True as soon as stop() sets the event.
            if self._stop_event.wait(pause):
                break

    def _update_api_metrics(self):
        """Publish 5-minute API averages as gauges for the alert rules"""
        timestamp = datetime.now()

        # Get API stats
        api_stats = self.api_monitor.get_api_stats(window_minutes=5)

        gauges = (
            ("api.response.time", 'avg_response_time'),
            ("api.error.rate", 'error_rate'),
        )
        for metric_name, stats_key in gauges:
            if stats_key in api_stats:
                self.metric_collector.record(Metric(
                    name=metric_name,
                    value=api_stats[stats_key],
                    metric_type=MetricType.GAUGE,
                    timestamp=timestamp
                ))

    def _save_dashboard_state(self):
        """Save current dashboard state to dashboard_state.json.

        The state is serialized before the file is touched and swapped in
        with os.replace, so a failure never leaves a truncated file behind.
        Failures are logged, never raised.
        """
        try:
            state = {
                'timestamp': datetime.now().isoformat(),
                'active_alerts': [
                    _alert_to_json_dict(alert)
                    for alert in self.alert_manager.get_active_alerts()
                ],
                'system_metrics': {
                    name: self.metric_collector.get_current_value(name)
                    for name in [
                        'system.cpu.usage',
                        'system.memory.usage',
                        'system.disk.usage'
                    ]
                },
                'api_stats': self.api_monitor.get_api_stats()
            }
            payload = json.dumps(state, indent=2)

            state_file = self.data_dir / "dashboard_state.json"
            tmp_file = state_file.with_name(state_file.name + ".tmp")
            with open(tmp_file, 'w', encoding='utf-8') as f:
                f.write(payload)
            os.replace(str(tmp_file), str(state_file))

        except Exception as e:
            logger.error(f"Failed to save dashboard state: {e}")

    def record_api_request(self, endpoint: str, response_time: float,
                           status_code: int):
        """Record API request for monitoring"""
        self.api_monitor.record_request(endpoint, response_time, status_code)

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get current dashboard data.

        'active_alerts' holds dataclass dicts (severity and timestamps keep
        their Enum/datetime types) and 'recent_alerts' holds Alert objects,
        so the result is not directly JSON-serializable.
        """
        collector = self.metric_collector
        return {
            'active_alerts': [
                asdict(alert)
                for alert in self.alert_manager.get_active_alerts()
            ],
            'system_health': {
                'cpu_usage': collector.get_current_value('system.cpu.usage'),
                'memory_usage': collector.get_current_value('system.memory.usage'),
                'disk_usage': collector.get_current_value('system.disk.usage'),
            },
            'api_performance': self.api_monitor.get_api_stats(),
            'recent_alerts': self.alert_manager.get_alert_history(limit=10)
        }

    def export_metrics(self, format: str = "json", hours: int = 24) -> str:
        """Export metrics recorded in the last *hours* hours.

        Args:
            format: Export format; only "json" (case-insensitive) is
                supported.
            hours: Size of the export window.

        Raises:
            ValueError: If *format* is not supported.
        """
        if format.lower() != "json":
            raise ValueError(f"Unsupported export format: {format}")

        since = datetime.now() - timedelta(hours=hours)
        metrics_data = {
            'export_timestamp': datetime.now().isoformat(),
            'time_range': f"last_{hours}_hours",
            'metrics': [
                {
                    'name': metric.name,
                    'value': metric.value,
                    'timestamp': metric.timestamp.isoformat(),
                    'labels': metric.labels,
                    'type': metric.metric_type.value
                }
                for metric in self.metric_collector.get_metrics(since=since)
            ]
        }
        return json.dumps(metrics_data, indent=2)


# Global dashboard instance
monitoring_dashboard = MonitoringDashboard()


# Utility functions
def test_monitoring_system():
    """Test the monitoring system"""
    print("Testing monitoring system...")

    dashboard = MonitoringDashboard()

    # Record some test metrics
    dashboard.record_api_request('/generate', 1.5, 200)
    dashboard.record_api_request('/generate', 2.1, 200)
    dashboard.record_api_request('/generate', 0.8, 500)

    # Get dashboard data
    data = dashboard.get_dashboard_data()
    print(f"Active alerts: {len(data['active_alerts'])}")
    print(f"API performance: {data['api_performance']}")

    # Export metrics
    exported = dashboard.export_metrics(format="json", hours=1)
    print(f"Exported metrics: {len(exported)} characters")

    print("Monitoring system test complete")


if __name__ == "__main__":
    # Create logs directory
    Path("logs").mkdir(exist_ok=True)

    # Test monitoring functionality
    test_monitoring_system()

    print("\nMonitoring system ready for integration")