import asyncio
import json
import logging
import os
import time
from collections import deque
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import psutil

# Configure structured logging.
# logging.FileHandler opens its file eagerly and raises FileNotFoundError when
# the parent directory is missing, so make sure 'logs/' exists first.
os.makedirs('logs', exist_ok=True)
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('logs/chatbot.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


@dataclass
class RequestMetric:
    """One handled request, as recorded by ``PerformanceMonitor.log_request``."""

    timestamp: datetime   # compared against datetime.now() (naive) for the per-minute rate
    endpoint: str
    response_time: float  # reported with an "s" suffix in the performance report
    status_code: int      # exactly 200 counts as success; anything else as failure
    prompt_length: int
    response_length: int
    cached: bool          # True counts as a cache hit, False as a cache miss
    session_id: str


class PerformanceMonitor:
    """Collect per-request metrics and derive aggregate/system health figures.

    Counters (totals, cache hits, sessions) cover the monitor's whole
    lifetime; response-time statistics and requests-per-minute are computed
    over the most recent ``window_size`` requests only.
    """

    def __init__(self, window_size: int = 1000):
        """Initialize performance monitoring.

        Args:
            window_size: Maximum number of recent requests kept for the
                response-time statistics.
        """
        self.window_size = window_size
        self.request_metrics = deque(maxlen=window_size)
        self.start_time = datetime.now()

        # Cached psutil handle; created lazily by _get_process().
        self._process = None

        # Real-time metrics
        self.metrics = {
            "total_requests": 0,
            "successful_requests": 0,
            "failed_requests": 0,
            "cache_hits": 0,
            "cache_misses": 0,
            "average_response_time": 0,
            "p95_response_time": 0,
            "p99_response_time": 0,
            "requests_per_minute": 0,
            "active_sessions": set(),
            "uptime_hours": 0
        }

        # System metrics
        self.system_metrics = {
            "cpu_percent": 0,
            "memory_mb": 0,
            "memory_percent": 0,
            "disk_usage_percent": 0
        }

    def log_request(self, metric: RequestMetric):
        """Record one request, log it, and refresh the aggregated metrics."""
        self.request_metrics.append(metric)
        self.metrics["total_requests"] += 1

        if metric.status_code == 200:
            self.metrics["successful_requests"] += 1
        else:
            self.metrics["failed_requests"] += 1

        if metric.cached:
            self.metrics["cache_hits"] += 1
        else:
            self.metrics["cache_misses"] += 1

        self.metrics["active_sessions"].add(metric.session_id)

        # Log to file. Skip the serialization work entirely when INFO is off.
        if logger.isEnabledFor(logging.INFO):
            logger.info("Request: %s", json.dumps(asdict(metric), default=str))

        # Update aggregated metrics
        self._update_aggregates()

    def _update_aggregates(self):
        """Recompute the windowed statistics, uptime and cache hit rate."""
        if not self.request_metrics:
            return

        # Response time percentiles
        response_times = sorted(m.response_time for m in self.request_metrics)
        count = len(response_times)
        self.metrics["average_response_time"] = sum(response_times) / count

        p95_idx = int(count * 0.95)
        p99_idx = int(count * 0.99)
        # Clamp so the index stays valid for small windows.
        self.metrics["p95_response_time"] = response_times[min(p95_idx, count - 1)]
        self.metrics["p99_response_time"] = response_times[min(p99_idx, count - 1)]

        # Requests per minute. Only requests still inside the window are
        # visible here, so the figure cannot exceed window_size.
        now = datetime.now()
        self.metrics["requests_per_minute"] = sum(
            1 for m in self.request_metrics
            if (now - m.timestamp).total_seconds() < 60
        )

        # Uptime
        self.metrics["uptime_hours"] = (now - self.start_time).total_seconds() / 3600

        # Cache hit rate (the key only exists once a request has been logged)
        if self.metrics["total_requests"] > 0:
            self.metrics["cache_hit_rate"] = (
                self.metrics["cache_hits"] / self.metrics["total_requests"]
            )

    def _get_process(self):
        """Return a cached psutil handle for the current process.

        ``Process.cpu_percent()`` measures CPU time since the previous call on
        the *same* object, so the handle has to be reused between samples.
        It is rebuilt when the PID changes (e.g. after a fork) so the figures
        never describe a parent process.
        """
        process = self._process
        if process is None or process.pid != os.getpid():
            process = psutil.Process()
            self._process = process
        return process

    def update_system_metrics(self):
        """Sample CPU, memory and disk usage and return the updated dict.

        The first CPU reading taken from a fresh process handle is 0.0 by
        psutil's design; later readings cover the time since the previous
        call.
        """
        process = self._get_process()

        self.system_metrics["cpu_percent"] = process.cpu_percent()
        self.system_metrics["memory_mb"] = process.memory_info().rss / 1024 / 1024
        self.system_metrics["memory_percent"] = process.memory_percent()

        disk = psutil.disk_usage('/')
        self.system_metrics["disk_usage_percent"] = disk.percent

        return self.system_metrics

    def get_dashboard_metrics(self) -> Dict[str, Any]:
        """Get metrics for dashboard display.

        Refreshes the system metrics first. The returned "performance" and
        "system" values are the monitor's live dicts, and
        ``performance["active_sessions"]`` is a ``set``; callers that
        serialize the result to JSON must convert it themselves.
        """
        self.update_system_metrics()

        return {
            "performance": self.metrics,
            "system": self.system_metrics,
            "health_score": self._calculate_health_score()
        }

    def _calculate_health_score(self) -> float:
        """Calculate overall system health score (0-100).

        Starts at 100 and deducts for slow average response time, error
        rate, high memory usage and a cache hit rate below 30%. Uses the
        most recently sampled system metrics; it does not re-sample them.
        """
        score = 100.0

        # Deduct for high response times
        if self.metrics["average_response_time"] > 5:
            score -= 20
        elif self.metrics["average_response_time"] > 2:
            score -= 10

        # Deduct for errors (max(..., 1) avoids division by zero)
        error_rate = self.metrics["failed_requests"] / max(self.metrics["total_requests"], 1)
        score -= error_rate * 50

        # Deduct for high memory usage
        if self.system_metrics["memory_percent"] > 90:
            score -= 30
        elif self.system_metrics["memory_percent"] > 70:
            score -= 10

        # Deduct for low cache hit rate (treated as 0 before any request)
        cache_hit_rate = self.metrics.get("cache_hit_rate", 0)
        if cache_hit_rate < 0.3:
            score -= 10

        return max(0, min(100, score))

    def generate_report(self) -> str:
        """Generate a plain-text performance report from the current metrics.

        System figures are whatever was last stored by
        ``update_system_metrics``; this method does not re-sample them.
        """
        report = f"""
CYBERSECURITY CHATBOT PERFORMANCE REPORT
=========================================
Generated: {datetime.now().isoformat()}
Uptime: {self.metrics['uptime_hours']:.2f} hours

REQUEST METRICS
---------------
Total Requests: {self.metrics['total_requests']}
Successful: {self.metrics['successful_requests']}
Failed: {self.metrics['failed_requests']}
Error Rate: {(self.metrics['failed_requests'] / max(self.metrics['total_requests'], 1) * 100):.2f}%

PERFORMANCE
-----------
Average Response Time: {self.metrics['average_response_time']:.3f}s
P95 Response Time: {self.metrics['p95_response_time']:.3f}s
P99 Response Time: {self.metrics['p99_response_time']:.3f}s
Requests/Minute: {self.metrics['requests_per_minute']}

CACHE PERFORMANCE
-----------------
Cache Hits: {self.metrics['cache_hits']}
Cache Misses: {self.metrics['cache_misses']}
Hit Rate: {self.metrics.get('cache_hit_rate', 0) * 100:.2f}%

SYSTEM RESOURCES
----------------
CPU Usage: {self.system_metrics['cpu_percent']:.1f}%
Memory Usage: {self.system_metrics['memory_mb']:.2f} MB ({self.system_metrics['memory_percent']:.1f}%)
Disk Usage: {self.system_metrics['disk_usage_percent']:.1f}%

HEALTH SCORE: {self._calculate_health_score():.1f}/100
"""
        return report


# Alert system
class AlertManager:
    """Turn dashboard metrics into rate-limited alerts.

    Alerts are logged, kept in a bounded history, and handed to
    ``self._send_webhook`` when a webhook URL is configured.
    """

    def __init__(self, webhook_url: Optional[str] = None):
        """Initialize alert manager.

        Args:
            webhook_url: Destination for alert notifications; when falsy,
                alerts are only logged and stored in the history.
        """
        self.webhook_url = webhook_url
        self.alert_thresholds = {
            "response_time": 5.0,  # seconds
            "error_rate": 0.1,  # 10%
            "memory_percent": 85,
            "cpu_percent": 90
        }
        self.alert_history = deque(maxlen=100)
        self.last_alert_time = {}

    def check_alerts(self, metrics: Dict[str, Any]):
        """Check if any alerts should be triggered and send those that are.

        Args:
            metrics: A dict with "performance" and "system" sub-dicts, as
                produced by ``PerformanceMonitor.get_dashboard_metrics``.
        """
        alerts = []
        performance = metrics["performance"]
        system = metrics["system"]

        # Check response time
        if performance["average_response_time"] > self.alert_thresholds["response_time"]:
            alerts.append({
                "level": "warning",
                "type": "response_time",
                "message": f"High response time: {performance['average_response_time']:.2f}s"
            })

        # Check error rate (max(..., 1) avoids division by zero)
        error_rate = performance["failed_requests"] / max(performance["total_requests"], 1)
        if error_rate > self.alert_thresholds["error_rate"]:
            alerts.append({
                "level": "critical",
                "type": "error_rate",
                "message": f"High error rate: {error_rate * 100:.2f}%"
            })

        # Check memory
        if system["memory_percent"] > self.alert_thresholds["memory_percent"]:
            alerts.append({
                "level": "warning",
                "type": "memory",
                "message": f"High memory usage: {system['memory_percent']:.1f}%"
            })

        # Check CPU
        if system["cpu_percent"] > self.alert_thresholds["cpu_percent"]:
            alerts.append({
                "level": "warning",
                "type": "cpu",
                "message": f"High CPU usage: {system['cpu_percent']:.1f}%"
            })

        # Send alerts
        for alert in alerts:
            self._send_alert(alert)

    def _send_alert(self, alert: Dict[str, Any]):
        """Record, log and forward one alert, at most once per 5 minutes per kind.

        Args:
            alert: Dict with "type", "level" and "message" keys.
        """
        # Rate limiting - don't send same alert more than once per 5 minutes
        alert_key = f"{alert['type']}_{alert['level']}"
        now = datetime.now()

        last_sent = self.last_alert_time.get(alert_key)
        # total_seconds() is required here: timedelta.seconds drops whole
        # days, which would wrongly suppress an alert that recurs e.g.
        # 24h + 10s after the previous one.
        if last_sent is not None and (now - last_sent).total_seconds() < 300:
            return

        self.last_alert_time[alert_key] = now
        self.alert_history.append({
            "timestamp": now.isoformat(),
            **alert
        })

        # Log alert
        if alert["level"] == "critical":
            logger.error(f"ALERT: {alert['message']}")
        else:
            logger.warning(f"ALERT: {alert['message']}")

        # Send to webhook if configured
        if self.webhook_url:
            self._send_webhook(alert)