| """ | |
| Sheikh-Kitty Monitoring System | |
| Real-time metrics aggregation and system health monitoring | |
| Features: | |
| - API request metrics tracking | |
| - Sandbox execution monitoring | |
| - System resource monitoring | |
| - Security violation alerts | |
| - Performance analytics | |
| - Health check endpoints | |
| Author: MiniMax Agent | |
| Date: 2025-11-14 | |
| """ | |
import json
import logging
import statistics
import threading
import time
from collections import defaultdict, deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional

import psutil

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class MetricType(Enum):
    """Types of metrics to track"""
    COUNTER = "counter"
    GAUGE = "gauge"
    HISTOGRAM = "histogram"
    TIMER = "timer"


class AlertSeverity(Enum):
    """Alert severity levels"""
    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"

@dataclass
class Metric:
    """Individual metric data point"""
    name: str
    value: float
    metric_type: MetricType
    timestamp: datetime
    labels: Optional[Dict[str, str]] = None
    tags: Optional[List[str]] = None


@dataclass
class Alert:
    """System alert"""
    id: str
    severity: AlertSeverity
    message: str
    timestamp: datetime
    metric_name: str
    threshold: float
    current_value: float
    resolved: bool = False
    resolved_at: Optional[datetime] = None

class MetricCollector:
    """Collect and store metrics"""

    def __init__(self, max_history: int = 10000):
        self.max_history = max_history
        self.metrics = deque(maxlen=max_history)
        self.current_values = {}  # Latest value per gauge metric
        self.counters = defaultdict(float)  # Running totals per counter metric
        self.lock = threading.Lock()

    def record(self, metric: Metric):
        """Record a metric"""
        with self.lock:
            self.metrics.append(metric)
            # Update current values for gauge metrics
            if metric.metric_type == MetricType.GAUGE:
                self.current_values[metric.name] = metric.value
            elif metric.metric_type == MetricType.COUNTER:
                self.counters[metric.name] += metric.value

    def get_metrics(self, name: Optional[str] = None,
                    since: Optional[datetime] = None) -> List[Metric]:
        """Get metrics filtered by name and time range"""
        with self.lock:
            filtered_metrics = []
            for metric in self.metrics:
                # Filter by name
                if name and metric.name != name:
                    continue
                # Filter by time
                if since and metric.timestamp < since:
                    continue
                filtered_metrics.append(metric)
            return filtered_metrics

    def get_current_value(self, name: str) -> Optional[float]:
        """Get the current value of a gauge metric"""
        with self.lock:
            return self.current_values.get(name)

    def get_counter(self, name: str) -> float:
        """Get a counter value"""
        with self.lock:
            return self.counters.get(name, 0.0)

    def get_stats(self, name: str, window_minutes: int = 60) -> Dict[str, float]:
        """Get statistics for a metric over a time window"""
        since = datetime.now() - timedelta(minutes=window_minutes)
        metrics = self.get_metrics(name, since)
        if not metrics:
            return {}
        values = [m.value for m in metrics]
        return {
            'count': len(values),
            'min': min(values),
            'max': max(values),
            'avg': statistics.mean(values),
            'median': statistics.median(values),
            'p95': self._percentile(values, 95),
            'p99': self._percentile(values, 99),
            'latest': values[-1],
        }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate a percentile (nearest-rank method)"""
        if not values:
            return 0.0
        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        return sorted_values[min(index, len(sorted_values) - 1)]

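# Usage sketch (illustrative, commented out so importing the module has no
# side effects; the metric name below is hypothetical):
#
#     collector = MetricCollector()
#     collector.record(Metric(
#         name="demo.queue.depth",
#         value=42.0,
#         metric_type=MetricType.GAUGE,
#         timestamp=datetime.now(),
#     ))
#     collector.get_current_value("demo.queue.depth")             # 42.0
#     collector.get_stats("demo.queue.depth", window_minutes=5)   # windowed stats
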
class AlertManager:
    """Manage system alerts and notifications"""

    def __init__(self, storage_path: str = "logs/alerts.jsonl"):
        self.storage_path = Path(storage_path)
        self.storage_path.parent.mkdir(parents=True, exist_ok=True)
        self.active_alerts = {}
        self.alert_history = deque(maxlen=1000)
        self.rules = []  # Alert rules
        self.lock = threading.Lock()

    def add_rule(self, name: str, metric_name: str, threshold: float,
                 comparison: str = "greater_than",
                 severity: AlertSeverity = AlertSeverity.WARNING):
        """Add an alert rule"""
        rule = {
            'name': name,
            'metric_name': metric_name,
            'threshold': threshold,
            'comparison': comparison,
            'severity': severity,
            'enabled': True,
        }
        self.rules.append(rule)
        logger.info(f"Added alert rule: {name}")

    def check_alerts(self, metric_collector: MetricCollector):
        """Check metrics against alert rules"""
        for rule in self.rules:
            if not rule['enabled']:
                continue
            try:
                current_value = metric_collector.get_current_value(rule['metric_name'])
                if current_value is None:
                    continue
                triggered = self._evaluate_condition(
                    current_value, rule['threshold'], rule['comparison']
                )
                if triggered:
                    self._trigger_alert(rule, current_value, metric_collector)
                else:
                    self._resolve_alert(rule['name'], metric_collector)
            except Exception as e:
                logger.error(f"Alert check failed for {rule['name']}: {e}")

    def _evaluate_condition(self, value: float, threshold: float, comparison: str) -> bool:
        """Evaluate whether an alert condition is met"""
        if comparison == "greater_than":
            return value > threshold
        elif comparison == "less_than":
            return value < threshold
        elif comparison == "equals":
            return abs(value - threshold) < 0.001
        elif comparison == "greater_equal":
            return value >= threshold
        elif comparison == "less_equal":
            return value <= threshold
        else:
            return False

    def _trigger_alert(self, rule: Dict[str, Any], current_value: float,
                       metric_collector: MetricCollector):
        """Trigger an alert"""
        alert_id = rule['name']
        # Skip if this alert is already active
        if alert_id in self.active_alerts:
            return
        # Create new alert
        alert = Alert(
            id=alert_id,
            severity=rule['severity'],
            message=f"{rule['metric_name']} is {current_value:.2f} (threshold: {rule['threshold']})",
            timestamp=datetime.now(),
            metric_name=rule['metric_name'],
            threshold=rule['threshold'],
            current_value=current_value
        )
        with self.lock:
            self.active_alerts[alert_id] = alert
            self.alert_history.append(alert)
        self._save_alert(alert)
        logger.warning(f"Alert triggered: {alert.message}")

    def _resolve_alert(self, alert_id: str, metric_collector: MetricCollector):
        """Resolve an active alert"""
        with self.lock:
            # Check membership under the lock to avoid racing check_alerts
            if alert_id not in self.active_alerts:
                return
            alert = self.active_alerts[alert_id]
            alert.resolved = True
            alert.resolved_at = datetime.now()
            # Remove from the active set; it remains in alert_history
            del self.active_alerts[alert_id]
        self._save_alert(alert)
        logger.info(f"Alert resolved: {alert_id}")

    def _save_alert(self, alert: Alert):
        """Append an alert to persistent JSONL storage"""
        try:
            with open(self.storage_path, 'a') as f:
                alert_data = asdict(alert)
                # Convert enum and datetime fields to JSON-serializable values
                alert_data['severity'] = alert.severity.value
                alert_data['timestamp'] = alert.timestamp.isoformat()
                if alert.resolved_at:
                    alert_data['resolved_at'] = alert.resolved_at.isoformat()
                f.write(json.dumps(alert_data) + '\n')
        except Exception as e:
            logger.error(f"Failed to save alert: {e}")

    def get_active_alerts(self) -> List[Alert]:
        """Get currently active alerts"""
        with self.lock:
            return list(self.active_alerts.values())

    def get_alert_history(self, limit: int = 100) -> List[Alert]:
        """Get recent alert history"""
        with self.lock:
            return list(self.alert_history)[-limit:]

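# Usage sketch (illustrative; rule and metric names are hypothetical). A rule
# fires when the gauge crosses its threshold and resolves once it drops back:
#
#     alerts = AlertManager(storage_path="logs/demo_alerts.jsonl")
#     alerts.add_rule(
#         name="queue_backlog",
#         metric_name="demo.queue.depth",
#         threshold=100.0,
#         comparison="greater_than",
#         severity=AlertSeverity.ERROR,
#     )
#     alerts.check_alerts(collector)  # 'collector' as in the sketch above
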
class SystemMonitor:
    """Monitor system resources and health"""

    def __init__(self, check_interval: int = 30):
        self.check_interval = check_interval
        self.running = False
        self.monitor_thread = None
        # Reference thresholds for system health
        self.thresholds = {
            'cpu_usage': 80.0,     # percent
            'memory_usage': 85.0,  # percent
            'disk_usage': 90.0,    # percent
            'temperature': 70.0,   # degrees Celsius
            'load_average': 2.0,   # per CPU core
        }

    def start(self, metric_collector: MetricCollector):
        """Start system monitoring"""
        if self.running:
            return
        self.running = True
        self.monitor_thread = threading.Thread(
            target=self._monitor_loop,
            args=(metric_collector,),
            daemon=True
        )
        self.monitor_thread.start()
        logger.info("System monitoring started")

    def stop(self):
        """Stop system monitoring"""
        self.running = False
        if self.monitor_thread:
            self.monitor_thread.join()
        logger.info("System monitoring stopped")

    def _monitor_loop(self, metric_collector: MetricCollector):
        """Main monitoring loop"""
        while self.running:
            try:
                self._collect_system_metrics(metric_collector)
                time.sleep(self.check_interval)
            except Exception as e:
                logger.error(f"System monitoring error: {e}")
                time.sleep(5)  # Brief pause on error

    def _collect_system_metrics(self, metric_collector: MetricCollector):
        """Collect system resource metrics"""
        timestamp = datetime.now()
        try:
            # CPU metrics
            cpu_percent = psutil.cpu_percent(interval=1)
            cpu_count = psutil.cpu_count()
            load_avg = psutil.getloadavg()[0] if hasattr(psutil, 'getloadavg') else 0.0
            metric_collector.record(Metric(
                name="system.cpu.usage",
                value=cpu_percent,
                metric_type=MetricType.GAUGE,
                timestamp=timestamp,
                labels={"core": "total"}
            ))
            metric_collector.record(Metric(
                name="system.cpu.count",
                value=cpu_count,
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))
            if load_avg > 0:
                metric_collector.record(Metric(
                    name="system.load.average",
                    value=load_avg,
                    metric_type=MetricType.GAUGE,
                    timestamp=timestamp
                ))
            # Memory metrics
            memory = psutil.virtual_memory()
            metric_collector.record(Metric(
                name="system.memory.usage",
                value=memory.percent,
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))
            metric_collector.record(Metric(
                name="system.memory.available",
                value=memory.available / (1024 ** 3),  # GB
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))
            # Disk metrics
            disk = psutil.disk_usage('/')
            metric_collector.record(Metric(
                name="system.disk.usage",
                value=(disk.used / disk.total) * 100,
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))
            # Network metrics (if available). psutil reports cumulative totals
            # since boot, so record them as gauges rather than counters to
            # avoid re-adding the running total on every sample.
            try:
                network = psutil.net_io_counters()
                metric_collector.record(Metric(
                    name="system.network.bytes_sent",
                    value=network.bytes_sent,
                    metric_type=MetricType.GAUGE,
                    timestamp=timestamp
                ))
                metric_collector.record(Metric(
                    name="system.network.bytes_recv",
                    value=network.bytes_recv,
                    metric_type=MetricType.GAUGE,
                    timestamp=timestamp
                ))
            except Exception:
                pass
            # Process metrics
            process_count = len(psutil.pids())
            metric_collector.record(Metric(
                name="system.processes.count",
                value=process_count,
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))
        except Exception as e:
            logger.error(f"Failed to collect system metrics: {e}")

class APIMonitor:
    """Monitor API performance and usage"""

    def __init__(self):
        self.request_times = deque(maxlen=1000)
        # Note: endpoint_stats and error_counts accumulate over the process
        # lifetime, while request_times is a bounded sliding window.
        self.endpoint_stats = defaultdict(list)
        self.error_counts = defaultdict(int)
        self.lock = threading.Lock()

    def record_request(self, endpoint: str, response_time: float, status_code: int):
        """Record API request metrics"""
        timestamp = datetime.now()
        with self.lock:
            self.request_times.append({
                'timestamp': timestamp,
                'endpoint': endpoint,
                'response_time': response_time,
                'status_code': status_code
            })
            self.endpoint_stats[endpoint].append(response_time)
            if status_code >= 400:
                self.error_counts[endpoint] += 1

    def get_api_stats(self, window_minutes: int = 60) -> Dict[str, Any]:
        """Get API statistics over the given window"""
        since = datetime.now() - timedelta(minutes=window_minutes)
        with self.lock:
            recent_requests = [
                req for req in self.request_times
                if req['timestamp'] >= since
            ]
            if not recent_requests:
                return {}
            response_times = [req['response_time'] for req in recent_requests]
            error_requests = [req for req in recent_requests if req['status_code'] >= 400]
            return {
                'total_requests': len(recent_requests),
                'error_requests': len(error_requests),
                'error_rate': len(error_requests) / len(recent_requests),
                'avg_response_time': statistics.mean(response_times),
                'p95_response_time': self._percentile(response_times, 95),
                'endpoints': {
                    endpoint: {
                        'count': len(times),
                        'avg_time': statistics.mean(times),
                        'errors': self.error_counts.get(endpoint, 0)
                    }
                    for endpoint, times in self.endpoint_stats.items()
                    if any(req['endpoint'] == endpoint for req in recent_requests)
                }
            }

    def _percentile(self, values: List[float], percentile: int) -> float:
        """Calculate a percentile (nearest-rank method)"""
        if not values:
            return 0.0
        sorted_values = sorted(values)
        index = int(len(sorted_values) * percentile / 100)
        return sorted_values[min(index, len(sorted_values) - 1)]

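# Usage sketch (illustrative; endpoints and timings are made up). One call is
# recorded per request, typically from a web-framework hook; times are in
# seconds:
#
#     api = APIMonitor()
#     api.record_request("/health", 0.012, 200)
#     api.record_request("/generate", 1.430, 200)
#     api.record_request("/generate", 0.950, 503)
#     api.get_api_stats(window_minutes=5)
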
class MonitoringDashboard:
    """Real-time monitoring dashboard"""

    def __init__(self, data_dir: str = "logs"):
        self.data_dir = Path(data_dir)
        self.data_dir.mkdir(exist_ok=True)
        self.metric_collector = MetricCollector()
        self.alert_manager = AlertManager(str(self.data_dir / "alerts.jsonl"))
        self.system_monitor = SystemMonitor()
        self.api_monitor = APIMonitor()
        # Set up default alert rules
        self._setup_default_alerts()
        self.running = False
        self.dashboard_thread = None

    def _setup_default_alerts(self):
        """Set up default alert rules"""
        # High CPU usage
        self.alert_manager.add_rule(
            name="high_cpu_usage",
            metric_name="system.cpu.usage",
            threshold=80.0,
            comparison="greater_than",
            severity=AlertSeverity.WARNING
        )
        # High memory usage
        self.alert_manager.add_rule(
            name="high_memory_usage",
            metric_name="system.memory.usage",
            threshold=85.0,
            comparison="greater_than",
            severity=AlertSeverity.WARNING
        )
        # High disk usage
        self.alert_manager.add_rule(
            name="high_disk_usage",
            metric_name="system.disk.usage",
            threshold=90.0,
            comparison="greater_than",
            severity=AlertSeverity.CRITICAL
        )
        # High API response time
        self.alert_manager.add_rule(
            name="high_api_response_time",
            metric_name="api.response.time",
            threshold=5.0,
            comparison="greater_than",
            severity=AlertSeverity.WARNING
        )
        # High error rate
        self.alert_manager.add_rule(
            name="high_error_rate",
            metric_name="api.error.rate",
            threshold=0.1,  # 10%
            comparison="greater_than",
            severity=AlertSeverity.ERROR
        )
    def start(self):
        """Start the monitoring dashboard"""
        if self.running:
            return
        self.running = True
        # Start system monitoring
        self.system_monitor.start(self.metric_collector)
        # Start dashboard update thread
        self.dashboard_thread = threading.Thread(
            target=self._dashboard_loop,
            daemon=True
        )
        self.dashboard_thread.start()
        logger.info("Monitoring dashboard started")

    def stop(self):
        """Stop the monitoring dashboard"""
        self.running = False
        self.system_monitor.stop()
        if self.dashboard_thread:
            self.dashboard_thread.join()
        logger.info("Monitoring dashboard stopped")

    def _dashboard_loop(self):
        """Main dashboard update loop"""
        while self.running:
            try:
                # Update metrics
                self._update_api_metrics()
                # Check alerts
                self.alert_manager.check_alerts(self.metric_collector)
                # Save dashboard state
                self._save_dashboard_state()
                time.sleep(30)  # Update every 30 seconds
            except Exception as e:
                logger.error(f"Dashboard update error: {e}")
                time.sleep(10)

    def _update_api_metrics(self):
        """Update API-related metrics"""
        timestamp = datetime.now()
        # Get API stats over a short window
        api_stats = self.api_monitor.get_api_stats(window_minutes=5)
        if 'avg_response_time' in api_stats:
            self.metric_collector.record(Metric(
                name="api.response.time",
                value=api_stats['avg_response_time'],
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))
        if 'error_rate' in api_stats:
            self.metric_collector.record(Metric(
                name="api.error.rate",
                value=api_stats['error_rate'],
                metric_type=MetricType.GAUGE,
                timestamp=timestamp
            ))

    def _save_dashboard_state(self):
        """Save current dashboard state to file"""
        try:
            state = {
                'timestamp': datetime.now().isoformat(),
                'active_alerts': [
                    asdict(alert) for alert in self.alert_manager.get_active_alerts()
                ],
                'system_metrics': {
                    name: self.metric_collector.get_current_value(name)
                    for name in [
                        'system.cpu.usage',
                        'system.memory.usage',
                        'system.disk.usage'
                    ]
                },
                'api_stats': self.api_monitor.get_api_stats()
            }
            # Convert enum and datetime fields to JSON-serializable values
            for alert in state['active_alerts']:
                alert['severity'] = alert['severity'].value
                alert['timestamp'] = alert['timestamp'].isoformat()
                if alert['resolved_at']:
                    alert['resolved_at'] = alert['resolved_at'].isoformat()
            state_file = self.data_dir / "dashboard_state.json"
            with open(state_file, 'w') as f:
                json.dump(state, f, indent=2)
        except Exception as e:
            logger.error(f"Failed to save dashboard state: {e}")
    def record_api_request(self, endpoint: str, response_time: float, status_code: int):
        """Record an API request for monitoring"""
        self.api_monitor.record_request(endpoint, response_time, status_code)

    def get_dashboard_data(self) -> Dict[str, Any]:
        """Get current dashboard data"""
        return {
            'active_alerts': [
                asdict(alert) for alert in self.alert_manager.get_active_alerts()
            ],
            'system_health': {
                'cpu_usage': self.metric_collector.get_current_value('system.cpu.usage'),
                'memory_usage': self.metric_collector.get_current_value('system.memory.usage'),
                'disk_usage': self.metric_collector.get_current_value('system.disk.usage'),
            },
            'api_performance': self.api_monitor.get_api_stats(),
            'recent_alerts': self.alert_manager.get_alert_history(limit=10)
        }

    def export_metrics(self, format: str = "json", hours: int = 24) -> str:
        """Export metrics in the specified format"""
        since = datetime.now() - timedelta(hours=hours)
        if format.lower() == "json":
            metrics_data = {
                'export_timestamp': datetime.now().isoformat(),
                'time_range': f"last_{hours}_hours",
                'metrics': [
                    {
                        'name': metric.name,
                        'value': metric.value,
                        'timestamp': metric.timestamp.isoformat(),
                        'labels': metric.labels,
                        'type': metric.metric_type.value
                    }
                    for metric in self.metric_collector.get_metrics(since=since)
                ]
            }
            return json.dumps(metrics_data, indent=2)
        else:
            raise ValueError(f"Unsupported export format: {format}")

# Global dashboard instance
monitoring_dashboard = MonitoringDashboard()

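# Integration sketch (assumption: the host app uses FastAPI; adapt the hook
# for other frameworks). A timing middleware can feed the global dashboard
# without touching individual route handlers:
#
#     from fastapi import FastAPI, Request
#
#     app = FastAPI()
#
#     @app.middleware("http")
#     async def track_requests(request: Request, call_next):
#         start = time.time()
#         response = await call_next(request)
#         monitoring_dashboard.record_api_request(
#             request.url.path, time.time() - start, response.status_code
#         )
#         return response
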
# Utility functions
def test_monitoring_system():
    """Exercise the monitoring system end to end"""
    print("Testing monitoring system...")
    dashboard = MonitoringDashboard()
    # Record some test metrics
    dashboard.record_api_request('/generate', 1.5, 200)
    dashboard.record_api_request('/generate', 2.1, 200)
    dashboard.record_api_request('/generate', 0.8, 500)
    # Get dashboard data
    data = dashboard.get_dashboard_data()
    print(f"Active alerts: {len(data['active_alerts'])}")
    print(f"API performance: {data['api_performance']}")
    # Export metrics
    exported = dashboard.export_metrics(format="json", hours=1)
    print(f"Exported metrics: {len(exported)} characters")
    print("Monitoring system test complete")


if __name__ == "__main__":
    # Create logs directory
    Path("logs").mkdir(exist_ok=True)
    # Test monitoring functionality
    test_monitoring_system()
    print("\nMonitoring system ready for integration")