#!/usr/bin/env python3
"""
Advanced Performance Profiler

Real-time performance analysis and bottleneck detection.
"""

import asyncio
import logging
import time
import tracemalloc
from contextlib import asynccontextmanager
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

import psutil

logger = logging.getLogger(__name__)


@dataclass
class PerformanceMetric:
    """Performance metric data structure."""

    name: str
    category: str  # cpu, memory, io, network, api, database
    # Usually a float measurement; the network/process collectors store a
    # dict of raw counters here instead (stats code skips dict values).
    value: Any
    unit: str
    threshold: Optional[float]
    status: str  # optimal, warning, critical
    timestamp: datetime
    details: Dict[str, Any]

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        result = asdict(self)
        result["timestamp"] = self.timestamp.isoformat()
        return result


class AdvancedPerformanceProfiler:
    """Advanced performance monitoring and profiling.

    Collects function/query/API timings and system resource metrics,
    keeps a bounded in-memory history, and produces bottleneck reports.
    """

    def __init__(self):
        self.start_time = datetime.now()
        self.metrics_history: List[PerformanceMetric] = []
        self.profilers: Dict[str, Any] = {}
        self.memory_snapshots: List[Any] = []

        # Previous disk I/O counter snapshot, used to turn psutil's
        # cumulative since-boot counters into an actual MB/s rate.
        self._last_disk_io = None
        self._last_disk_sample_time: Optional[float] = None

        # Performance thresholds
        self.thresholds = {
            "api_response_time": 500.0,  # ms
            "database_query_time": 100.0,  # ms
            "memory_usage": 512.0,  # MB
            "cpu_usage": 70.0,  # %
            "disk_io": 50.0,  # MB/s
            "network_latency": 100.0,  # ms
        }

        # Enable memory tracing (module-wide side effect; needed so the
        # profiling context managers can read traced allocation sizes).
        tracemalloc.start()

    def _threshold_status(self, value: float, threshold: float) -> str:
        """Map a measured value to a status string.

        optimal below *threshold*, warning below 2x *threshold*,
        critical otherwise.
        """
        if value < threshold:
            return "optimal"
        if value < threshold * 2:
            return "warning"
        return "critical"

    @asynccontextmanager
    async def profile_function(self, function_name: str, category: str = "general"):
        """Context manager for profiling individual functions.

        Records execution time (ms) and traced-memory delta (MB) as a
        PerformanceMetric appended to the history, even on exception.
        """
        start_time = time.time()
        start_memory = (
            tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
        )

        try:
            yield
        finally:
            end_time = time.time()
            end_memory = (
                tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
            )

            execution_time_ms = (end_time - start_time) * 1000
            memory_diff_mb = (end_memory - start_memory) / (1024 * 1024)

            # Determine status based on thresholds
            if category == "api":
                status = self._threshold_status(
                    execution_time_ms, self.thresholds["api_response_time"]
                )
            elif category == "database":
                status = self._threshold_status(
                    execution_time_ms, self.thresholds["database_query_time"]
                )
            else:
                status = "optimal"  # Default for general functions

            metric = PerformanceMetric(
                name=f"{function_name}_execution_time",
                category=category,
                value=execution_time_ms,
                unit="milliseconds",
                threshold=self.thresholds.get(
                    f"{category}_response_time", self.thresholds["api_response_time"]
                ),
                status=status,
                timestamp=datetime.now(),
                details={
                    "memory_diff_mb": memory_diff_mb,
                    "start_memory": start_memory,
                    "end_memory": end_memory,
                },
            )

            self.metrics_history.append(metric)

    @asynccontextmanager
    async def profile_database_query(self, query_type: str, query: str):
        """Profile database query performance.

        Records the wall-clock query time and a 100-char preview of the
        SQL text; appended to the history even on exception.
        """
        start_time = time.time()

        try:
            yield
        finally:
            end_time = time.time()
            execution_time_ms = (end_time - start_time) * 1000

            status = self._threshold_status(
                execution_time_ms, self.thresholds["database_query_time"]
            )

            metric = PerformanceMetric(
                name=f"database_query_{query_type}",
                category="database",
                value=execution_time_ms,
                unit="milliseconds",
                threshold=self.thresholds["database_query_time"],
                status=status,
                timestamp=datetime.now(),
                details={
                    "query_type": query_type,
                    "query_preview": query[:100] + "..."
                    if len(query) > 100
                    else query,
                    "query_length": len(query),
                },
            )

            self.metrics_history.append(metric)

    async def collect_system_performance(self) -> List[PerformanceMetric]:
        """Collect real-time system performance metrics.

        Returns CPU, memory, disk-I/O, network-I/O and process metrics.
        Disk and network collection are best-effort and silently skipped
        on platforms where psutil does not expose them.
        """
        metrics: List[PerformanceMetric] = []
        current_time = datetime.now()

        # CPU Usage with breakdown
        cpu_percent = psutil.cpu_percent(interval=0.1)
        cpu_per_core = psutil.cpu_percent(interval=0.1, percpu=True)

        metrics.append(
            PerformanceMetric(
                name="cpu_usage",
                category="cpu",
                value=cpu_percent,
                unit="percent",
                threshold=self.thresholds["cpu_usage"],
                status=(
                    "optimal"
                    if cpu_percent < self.thresholds["cpu_usage"]
                    else "warning" if cpu_percent < 90 else "critical"
                ),
                timestamp=current_time,
                details={
                    "cores": psutil.cpu_count(),
                    "usage_per_core": cpu_per_core,
                    "load_avg": (
                        psutil.getloadavg() if hasattr(psutil, "getloadavg") else None
                    ),
                },
            )
        )

        # Memory Usage with breakdown
        memory = psutil.virtual_memory()
        metrics.append(
            PerformanceMetric(
                name="memory_usage",
                category="memory",
                value=memory.percent,
                unit="percent",
                # Dynamic threshold: MB budget expressed as a percentage of
                # total physical memory (GB).
                threshold=self.thresholds["memory_usage"]
                * 100
                / (memory.total / (1024 * 1024 * 1024)),
                status=(
                    "optimal"
                    if memory.percent < 80
                    else "warning" if memory.percent < 90 else "critical"
                ),
                timestamp=current_time,
                details={
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "used_gb": round(memory.used / (1024**3), 2),
                    "swap_total_gb": round(psutil.swap_memory().total / (1024**3), 2),
                    "swap_used_gb": round(psutil.swap_memory().used / (1024**3), 2),
                },
            )
        )

        # Disk I/O. psutil's counters are cumulative since boot, so a
        # rate requires a delta against the previous sample; the first
        # call has no baseline and reports 0.0 MB/s.
        try:
            disk_io = psutil.disk_io_counters()
            now_ts = time.time()
            if (
                disk_io is not None
                and self._last_disk_io is not None
                and self._last_disk_sample_time is not None
            ):
                elapsed = max(now_ts - self._last_disk_sample_time, 1e-6)
                read_mb_s = (
                    (disk_io.read_bytes - self._last_disk_io.read_bytes)
                    / (1024 * 1024)
                    / elapsed
                )
                write_mb_s = (
                    (disk_io.write_bytes - self._last_disk_io.write_bytes)
                    / (1024 * 1024)
                    / elapsed
                )
            else:
                read_mb_s = 0.0
                write_mb_s = 0.0
            self._last_disk_io = disk_io
            self._last_disk_sample_time = now_ts
            total_io_mb_s = read_mb_s + write_mb_s

            metrics.append(
                PerformanceMetric(
                    name="disk_io",
                    category="io",
                    value=total_io_mb_s,
                    unit="mb/s",
                    threshold=self.thresholds["disk_io"],
                    status=self._threshold_status(
                        total_io_mb_s, self.thresholds["disk_io"]
                    ),
                    timestamp=current_time,
                    details={
                        "read_mb_s": read_mb_s,
                        "write_mb_s": write_mb_s,
                        "read_count": getattr(disk_io, "read_count", 0),
                        "write_count": getattr(disk_io, "write_count", 0),
                        "read_time_ms": getattr(disk_io, "read_time", 0),
                        "write_time_ms": getattr(disk_io, "write_time", 0),
                    },
                )
            )
        except Exception:
            pass  # Skip if not available

        # Network I/O (informational only: raw cumulative counters)
        try:
            network_io = psutil.net_io_counters()
            metrics.append(
                PerformanceMetric(
                    name="network_io",
                    category="network",
                    value={
                        "bytes_sent": network_io.bytes_sent,
                        "bytes_recv": network_io.bytes_recv,
                        "packets_sent": network_io.packets_sent,
                        "packets_recv": network_io.packets_recv,
                        "errin": network_io.errin,
                        "errout": network_io.errout,
                        "dropin": network_io.dropin,
                        "dropout": network_io.dropout,
                    },
                    unit="bytes",
                    threshold=None,  # Network I/O is informational
                    status="optimal",
                    timestamp=current_time,
                    details={},
                )
            )
        except Exception:
            pass  # Skip if not available

        # Process Information for the current process
        process = psutil.Process()
        metrics.append(
            PerformanceMetric(
                name="process_performance",
                category="system",
                value={
                    "cpu_percent": process.cpu_percent(),
                    "memory_percent": process.memory_percent(),
                    "num_threads": process.num_threads(),
                    "file_descriptors": (
                        process.num_fds() if hasattr(process, "num_fds") else None
                    ),
                    "context_switches": (
                        process.num_ctx_switches()
                        if hasattr(process, "num_ctx_switches")
                        else None
                    ),
                },
                unit="info",
                threshold=None,
                status="optimal",
                timestamp=current_time,
                details={
                    "pid": process.pid,
                    "create_time": process.create_time(),
                    "status": process.status(),
                    "cmdline": process.cmdline(),
                },
            )
        )

        return metrics

    async def collect_api_performance(
        self, request_data: Dict[str, Any]
    ) -> PerformanceMetric:
        """Collect API request performance metrics.

        *request_data* is expected to carry endpoint/method/response_time
        keys (missing keys fall back to defaults). The metric is appended
        to the history and returned.
        """
        current_time = datetime.now()

        # Extract performance data from request
        endpoint = request_data.get("endpoint", "unknown")
        method = request_data.get("method", "GET")
        response_time = request_data.get("response_time", 0)
        status_code = request_data.get("status_code", 200)

        # Determine status based on response time
        status = self._threshold_status(
            response_time, self.thresholds["api_response_time"]
        )

        metric = PerformanceMetric(
            name=f"api_request_{endpoint}_{method}",
            category="api",
            value=response_time,
            unit="milliseconds",
            threshold=self.thresholds["api_response_time"],
            status=status,
            timestamp=current_time,
            details={
                "endpoint": endpoint,
                "method": method,
                "status_code": status_code,
                "request_size": request_data.get("request_size", 0),
                "response_size": request_data.get("response_size", 0),
                "user_agent": request_data.get("user_agent", ""),
                "ip_address": request_data.get("ip_address", ""),
            },
        )

        self.metrics_history.append(metric)
        return metric

    def generate_performance_report(self) -> Dict[str, Any]:
        """Generate comprehensive performance analysis report.

        Returns a dict with an overall score, per-category statistics,
        detected bottlenecks, deduplicated recommendations, optimization
        opportunities, and recent trends.
        """
        now = datetime.now()

        # Categorize metrics
        api_metrics = [m for m in self.metrics_history if m.category == "api"]
        database_metrics = [m for m in self.metrics_history if m.category == "database"]
        cpu_metrics = [m for m in self.metrics_history if m.category == "cpu"]
        memory_metrics = [m for m in self.metrics_history if m.category == "memory"]

        # Calculate statistics (single sort; dict-valued metrics skipped)
        def calculate_stats(
            metrics: List[PerformanceMetric], key: str = "value"
        ) -> Dict[str, float]:
            if not metrics:
                return {}

            values = [getattr(m, key) for m in metrics]
            if isinstance(values[0], dict):
                # Handle complex values (like network I/O)
                return {}

            ordered = sorted(values)
            n = len(ordered)
            return {
                "count": n,
                "avg": sum(ordered) / n,
                "min": ordered[0],
                "max": ordered[-1],
                "median": ordered[n // 2],
                "p95": ordered[int(n * 0.95)] if n > 20 else ordered[-1],
                "p99": ordered[int(n * 0.99)] if n > 20 else ordered[-1],
            }

        # API Performance Analysis
        api_stats = calculate_stats(api_metrics)

        # Database Performance Analysis
        db_stats = calculate_stats(database_metrics)

        # System Performance Analysis
        cpu_stats = calculate_stats(cpu_metrics)
        memory_stats = calculate_stats(memory_metrics)

        # Identify bottlenecks
        bottlenecks = []

        # API bottlenecks
        if api_metrics:
            slow_requests = [
                m for m in api_metrics if m.status in ["warning", "critical"]
            ]
            if slow_requests:
                bottlenecks.append(
                    {
                        "type": "api_performance",
                        "severity": (
                            "high"
                            if any(m.status == "critical" for m in slow_requests)
                            else "medium"
                        ),
                        "description": f"{len(slow_requests)} slow API requests detected",
                        "affected_endpoints": list(
                            set(
                                m.details.get("endpoint", "unknown")
                                for m in slow_requests
                            )
                        ),
                        "recommendation": "Optimize slow endpoints and add caching",
                    }
                )

        # Database bottlenecks
        if database_metrics:
            slow_queries = [
                m for m in database_metrics if m.status in ["warning", "critical"]
            ]
            if slow_queries:
                bottlenecks.append(
                    {
                        "type": "database_performance",
                        "severity": (
                            "high"
                            if any(m.status == "critical" for m in slow_queries)
                            else "medium"
                        ),
                        "description": f"{len(slow_queries)} slow database queries detected",
                        "affected_queries": list(
                            set(
                                m.details.get("query_type", "unknown")
                                for m in slow_queries
                            )
                        ),
                        "recommendation": "Add database indexes and optimize queries",
                    }
                )

        # Resource bottlenecks
        if cpu_metrics:
            high_cpu = [
                m for m in cpu_metrics if m.value > self.thresholds["cpu_usage"]
            ]
            if high_cpu:
                bottlenecks.append(
                    {
                        "type": "cpu_usage",
                        "severity": "high",
                        "description": f"CPU usage exceeds {self.thresholds['cpu_usage']}% threshold",
                        "max_cpu": max(m.value for m in high_cpu),
                        "recommendation": "Scale horizontally or optimize CPU-intensive operations",
                    }
                )

        if memory_metrics:
            high_memory = [m for m in memory_metrics if m.value > 80]  # 80% threshold
            if high_memory:
                bottlenecks.append(
                    {
                        "type": "memory_usage",
                        "severity": "high",
                        "description": "Memory usage exceeds 80%",
                        "max_memory": max(m.value for m in high_memory),
                        "recommendation": "Optimize memory usage or increase available memory",
                    }
                )

        # Generate optimization recommendations
        recommendations = []

        if bottlenecks:
            for bottleneck in bottlenecks:
                recommendations.append(bottleneck.get("recommendation", ""))

        # Memory efficiency recommendations
        if memory_metrics:
            avg_memory = sum(m.value for m in memory_metrics) / len(memory_metrics)
            if avg_memory > 60:
                recommendations.append(
                    "Implement memory pooling and optimize data structures"
                )

        # API caching recommendations
        if api_metrics:
            avg_response = sum(m.value for m in api_metrics) / len(api_metrics)
            if avg_response > self.thresholds["api_response_time"]:
                recommendations.append(
                    "Implement API response caching and query optimization"
                )

        report = {
            "overall_performance_score": self._calculate_performance_score(),
            "timestamp": now.isoformat(),
            "analysis_period_hours": (now - self.start_time).total_seconds() / 3600,
            "summary": {
                "total_metrics_collected": len(self.metrics_history),
                "api_requests": len(api_metrics),
                "database_queries": len(database_metrics),
                "bottlenecks_detected": len(bottlenecks),
                "critical_issues": len(
                    [b for b in bottlenecks if b.get("severity") == "high"]
                ),
            },
            "performance_by_category": {
                "api": {
                    "statistics": api_stats,
                    "slow_requests": len(
                        [m for m in api_metrics if m.status in ["warning", "critical"]]
                    ),
                    "avg_response_time": api_stats.get("avg", 0),
                },
                "database": {
                    "statistics": db_stats,
                    "slow_queries": len(
                        [
                            m
                            for m in database_metrics
                            if m.status in ["warning", "critical"]
                        ]
                    ),
                    "avg_query_time": db_stats.get("avg", 0),
                },
                "system": {
                    "cpu": cpu_stats,
                    "memory": memory_stats,
                    "current_cpu": cpu_metrics[-1].to_dict() if cpu_metrics else None,
                    "current_memory": (
                        memory_metrics[-1].to_dict() if memory_metrics else None
                    ),
                },
            },
            "bottlenecks": bottlenecks,
            "recommendations": list(set(recommendations)),
            "optimization_opportunities": self._identify_optimization_opportunities(),
            "historical_trends": self._analyze_trends(),
        }

        return report

    def _calculate_performance_score(self) -> float:
        """Calculate overall performance score (0-100).

        Weighted by status over the last 100 metrics: optimal=100,
        warning=50, critical=0. Empty history scores 100.
        """
        if not self.metrics_history:
            return 100.0

        recent_metrics = self.metrics_history[-100:]  # Last 100 metrics

        # Count metrics by status
        optimal_count = sum(1 for m in recent_metrics if m.status == "optimal")
        warning_count = sum(1 for m in recent_metrics if m.status == "warning")
        critical_count = sum(1 for m in recent_metrics if m.status == "critical")

        total = len(recent_metrics)

        # Calculate weighted score
        score = (optimal_count * 100 + warning_count * 50 + critical_count * 0) / total

        return round(score, 2)

    def _identify_optimization_opportunities(self) -> List[Dict[str, Any]]:
        """Identify specific optimization opportunities.

        Flags endpoints whose average response time exceeds the API
        threshold, and memory growth >10 points over the last 20 samples.
        """
        opportunities = []

        # Analyze API patterns
        api_metrics = [m for m in self.metrics_history if m.category == "api"]
        if api_metrics:
            endpoints: Dict[str, List[float]] = {}
            for metric in api_metrics:
                endpoint = metric.details.get("endpoint", "unknown")
                if endpoint not in endpoints:
                    endpoints[endpoint] = []
                endpoints[endpoint].append(metric.value)

            # Find slow endpoints
            slow_endpoints = []
            for endpoint, times in endpoints.items():
                avg_time = sum(times) / len(times)
                if avg_time > self.thresholds["api_response_time"]:
                    slow_endpoints.append(
                        {
                            "endpoint": endpoint,
                            "avg_response_time": avg_time,
                            "request_count": len(times),
                            "optimization": (
                                "add_caching" if avg_time > 1000 else "optimize_query"
                            ),
                        }
                    )

            if slow_endpoints:
                opportunities.append(
                    {
                        "category": "api_optimization",
                        "description": f"{len(slow_endpoints)} endpoints need optimization",
                        "details": slow_endpoints,
                        "potential_impact": "high",
                    }
                )

        # Analyze memory patterns
        memory_metrics = [m for m in self.metrics_history if m.category == "memory"]
        if memory_metrics:
            memory_trend = [
                m.value for m in memory_metrics[-20:]
            ]  # Last 20 memory metrics
            if len(memory_trend) > 1:
                memory_growth = memory_trend[-1] - memory_trend[0]
                if memory_growth > 10:  # 10% growth
                    opportunities.append(
                        {
                            "category": "memory_optimization",
                            "description": f"Memory usage increased by {memory_growth:.1f}%",
                            "details": {
                                "growth_percent": memory_growth,
                                "period": "last 20 samples",
                            },
                            "potential_impact": "medium",
                        }
                    )

        return opportunities

    def _analyze_trends(self) -> Dict[str, Any]:
        """Analyze performance trends over time.

        Uses a simple least-squares slope over the last hour of metrics
        to label each category increasing/decreasing/stable.
        """
        if len(self.metrics_history) < 10:
            return {"message": "Insufficient data for trend analysis"}

        # Analyze last hour of data
        now = datetime.now()
        one_hour_ago = now - timedelta(hours=1)
        recent_metrics = [m for m in self.metrics_history if m.timestamp > one_hour_ago]

        if not recent_metrics:
            return {"message": "No recent data available"}

        # Group by category
        api_trends = [m for m in recent_metrics if m.category == "api"]
        db_trends = [m for m in recent_metrics if m.category == "database"]
        cpu_trends = [m for m in recent_metrics if m.category == "cpu"]

        # Calculate trend direction
        def calculate_trend(values: List[float]) -> str:
            if len(values) < 2:
                return "stable"

            # Simple linear regression to determine trend
            x = list(range(len(values)))
            n = len(values)

            sum_x = sum(x)
            sum_y = sum(values)
            sum_xy = sum(x[i] * values[i] for i in range(n))
            sum_x2 = sum(x[i] * x[i] for i in range(n))

            # Degenerate denominator -> no meaningful slope
            if n * sum_x2 - sum_x * sum_x == 0:
                return "stable"

            slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x)

            if abs(slope) < 0.01:
                return "stable"
            elif slope > 0:
                return "increasing"
            else:
                return "decreasing"

        trends = {
            "analysis_period": "last_hour",
            "metrics_analyzed": len(recent_metrics),
            "api_response_trend": (
                calculate_trend([m.value for m in api_trends])
                if api_trends
                else "no_data"
            ),
            "database_query_trend": (
                calculate_trend([m.value for m in db_trends])
                if db_trends
                else "no_data"
            ),
            "cpu_usage_trend": (
                calculate_trend([m.value for m in cpu_trends])
                if cpu_trends
                else "no_data"
            ),
        }

        return trends

    async def start_continuous_monitoring(self, interval: int = 60):
        """Start continuous performance monitoring.

        Loops forever: collects system metrics every *interval* seconds,
        caps history at 1000 entries, and logs critical bottlenecks every
        60 cycles. Uses an explicit cycle counter because the history
        length is capped and can no longer be used as a schedule.
        """
        logger.info(
            f"Starting continuous performance monitoring (interval: {interval}s)"
        )

        cycles = 0
        while True:
            try:
                # Collect system metrics
                system_metrics = await self.collect_system_performance()
                self.metrics_history.extend(system_metrics)

                # Keep only last 1000 metrics to prevent memory bloat
                if len(self.metrics_history) > 1000:
                    self.metrics_history = self.metrics_history[-1000:]

                # Generate and log summary every 60 cycles
                cycles += 1
                if cycles % 60 == 0:
                    report = self.generate_performance_report()

                    # Log critical issues
                    critical_bottlenecks = [
                        b
                        for b in report["bottlenecks"]
                        if b.get("severity") == "high"
                    ]
                    if critical_bottlenecks:
                        logger.warning(
                            f"CRITICAL PERFORMANCE ISSUES DETECTED: {len(critical_bottlenecks)}"
                        )
                        for bottleneck in critical_bottlenecks:
                            logger.warning(
                                f"  - {bottleneck['type']}: {bottleneck['description']}"
                            )

                await asyncio.sleep(interval)

            except Exception as e:
                # Keep the monitor alive on transient failures.
                logger.error(f"Error in performance monitoring: {e}")
                await asyncio.sleep(interval)


# Global profiler instance
performance_profiler = AdvancedPerformanceProfiler()