# Source: zenith-backend/core/diagnostics/performance_profiler.py
# Uploaded via huggingface_hub (revision 4ae946d, verified) by teoat.
#!/usr/bin/env python3
"""
Advanced Performance Profiler
Real-time performance analysis and bottleneck detection
"""
import asyncio
import logging
import time
import tracemalloc
from contextlib import asynccontextmanager
from dataclasses import asdict, dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
import psutil
logger = logging.getLogger(__name__)
@dataclass
class PerformanceMetric:
    """Single performance measurement recorded by the profiler.

    ``value`` is usually a scalar reading (milliseconds, percent, MB/s),
    but the system collectors also store dicts of raw counters for
    composite metrics such as network I/O and process vitals, so it is
    typed ``Any`` rather than ``float``.
    """

    name: str
    category: str  # cpu, memory, io, network, api, database, system, general
    value: Any  # scalar reading, or a dict of counters for composite metrics
    unit: str  # e.g. "milliseconds", "percent", "mb/s", "bytes", "info"
    threshold: Optional[float]  # None when the metric is purely informational
    status: str  # optimal, warning, critical
    timestamp: datetime  # naive local time of collection
    details: Dict[str, Any]  # metric-specific context (endpoint, counters, ...)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (timestamp as an ISO-8601 string)."""
        result = asdict(self)
        result["timestamp"] = self.timestamp.isoformat()
        return result
class AdvancedPerformanceProfiler:
    """Advanced performance monitoring and profiling.

    Aggregates timing metrics from profiled functions, database queries
    and API requests, plus periodic system samples (via psutil), and
    produces bottleneck reports, optimization suggestions and trend
    analysis.

    Notes:
        * ``tracemalloc`` tracing is enabled on construction so profiled
          functions can report memory deltas.
        * All timestamps are naive local time (``datetime.now()``) for
          consistency across the metric history.
    """

    #: Hard cap on retained metrics so long-running processes don't grow
    #: the history without bound (previously only the monitoring loop
    #: trimmed it; the profiling context managers appended unbounded).
    MAX_HISTORY = 1000

    def __init__(self):
        self.start_time = datetime.now()
        self.metrics_history: List["PerformanceMetric"] = []
        self.profilers = {}  # reserved for named sub-profilers
        self.memory_snapshots = []  # reserved for tracemalloc snapshots
        # Thresholds above which a metric is flagged (warning at 1x,
        # critical at 2x — see _classify)
        self.thresholds = {
            "api_response_time": 500.0,  # ms
            "database_query_time": 100.0,  # ms
            "memory_usage": 512.0,  # MB
            "cpu_usage": 70.0,  # %
            "disk_io": 50.0,  # MB/s
            "network_latency": 100.0,  # ms
        }
        # Previous disk-I/O sample (monotonic time, read_bytes, write_bytes),
        # used to turn psutil's cumulative since-boot counters into a MB/s rate.
        self._last_disk_sample: Optional[tuple] = None
        # Enable memory tracing for per-function memory deltas
        tracemalloc.start()

    def _classify(self, value: float, threshold: float) -> str:
        """Classify *value* against *threshold*.

        Returns "optimal" below 1x the threshold, "warning" between 1x
        and 2x, "critical" at or above 2x.
        """
        if value < threshold:
            return "optimal"
        if value < threshold * 2:
            return "warning"
        return "critical"

    def _record(self, metric: "PerformanceMetric") -> None:
        """Append *metric* to history, keeping only the newest MAX_HISTORY entries."""
        self.metrics_history.append(metric)
        if len(self.metrics_history) > self.MAX_HISTORY:
            del self.metrics_history[: -self.MAX_HISTORY]

    @asynccontextmanager
    async def profile_function(self, function_name: str, category: str = "general"):
        """Async context manager recording wall time and memory delta of a block.

        Args:
            function_name: Logical name embedded in the recorded metric name.
            category: "api" and "database" are judged against their SLA
                thresholds; any other category is recorded as "optimal".
        """
        start_time = time.time()
        start_memory = (
            tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
        )
        try:
            yield
        finally:
            elapsed_ms = (time.time() - start_time) * 1000
            end_memory = (
                tracemalloc.get_traced_memory()[0] if tracemalloc.is_tracing() else 0
            )
            memory_diff_mb = (end_memory - start_memory) / (1024 * 1024)
            # Category-specific SLA thresholds.  NOTE: the previous lookup
            # used f"{category}_response_time", which never matched the
            # "database_query_time" key, so database metrics carried the
            # API threshold; this maps categories explicitly.
            sla_thresholds = {
                "api": self.thresholds["api_response_time"],
                "database": self.thresholds["database_query_time"],
            }
            threshold = sla_thresholds.get(
                category, self.thresholds["api_response_time"]
            )
            status = (
                self._classify(elapsed_ms, threshold)
                if category in sla_thresholds
                else "optimal"  # general functions have no SLA
            )
            self._record(
                PerformanceMetric(
                    name=f"{function_name}_execution_time",
                    category=category,
                    value=elapsed_ms,
                    unit="milliseconds",
                    threshold=threshold,
                    status=status,
                    timestamp=datetime.now(),
                    details={
                        "memory_diff_mb": memory_diff_mb,
                        "start_memory": start_memory,
                        "end_memory": end_memory,
                    },
                )
            )

    @asynccontextmanager
    async def profile_database_query(self, query_type: str, query: str):
        """Async context manager timing one database query.

        Args:
            query_type: Short label (e.g. "select", "insert") for grouping.
            query: Full SQL text; only a 100-character preview is stored.
        """
        start_time = time.time()
        try:
            yield
        finally:
            elapsed_ms = (time.time() - start_time) * 1000
            db_threshold = self.thresholds["database_query_time"]
            self._record(
                PerformanceMetric(
                    name=f"database_query_{query_type}",
                    category="database",
                    value=elapsed_ms,
                    unit="milliseconds",
                    threshold=db_threshold,
                    status=self._classify(elapsed_ms, db_threshold),
                    timestamp=datetime.now(),
                    details={
                        "query_type": query_type,
                        "query_preview": (
                            query[:100] + "..." if len(query) > 100 else query
                        ),
                        "query_length": len(query),
                    },
                )
            )

    async def collect_system_performance(self) -> List["PerformanceMetric"]:
        """Sample CPU, memory, disk, network and process metrics.

        The CPU sampling call blocks ~0.1s, so it runs in a worker thread
        instead of stalling the event loop (the original made two such
        blocking calls inline).
        """
        current_time = datetime.now()
        metrics: List["PerformanceMetric"] = []

        # CPU: one sampled per-core call; overall usage is the mean across
        # cores (equivalent to psutil's aggregate, half the sampling time).
        cpu_per_core = await asyncio.get_running_loop().run_in_executor(
            None, lambda: psutil.cpu_percent(interval=0.1, percpu=True)
        )
        cpu_percent = sum(cpu_per_core) / len(cpu_per_core) if cpu_per_core else 0.0
        metrics.append(
            PerformanceMetric(
                name="cpu_usage",
                category="cpu",
                value=cpu_percent,
                unit="percent",
                threshold=self.thresholds["cpu_usage"],
                status=(
                    "optimal"
                    if cpu_percent < self.thresholds["cpu_usage"]
                    else "warning" if cpu_percent < 90 else "critical"
                ),
                timestamp=current_time,
                details={
                    "cores": psutil.cpu_count(),
                    "usage_per_core": cpu_per_core,
                    "load_avg": (
                        psutil.getloadavg() if hasattr(psutil, "getloadavg") else None
                    ),
                },
            )
        )

        # Memory: threshold is the configured MB budget expressed as a
        # percentage of total RAM.  (The previous formula divided the MB
        # budget by total *GB*, producing values far above 100%.)
        memory = psutil.virtual_memory()
        swap = psutil.swap_memory()
        memory_budget_percent = (
            self.thresholds["memory_usage"] / (memory.total / (1024 * 1024)) * 100
        )
        metrics.append(
            PerformanceMetric(
                name="memory_usage",
                category="memory",
                value=memory.percent,
                unit="percent",
                threshold=memory_budget_percent,
                status=(
                    "optimal"
                    if memory.percent < 80
                    else "warning" if memory.percent < 90 else "critical"
                ),
                timestamp=current_time,
                details={
                    "total_gb": round(memory.total / (1024**3), 2),
                    "available_gb": round(memory.available / (1024**3), 2),
                    "used_gb": round(memory.used / (1024**3), 2),
                    "swap_total_gb": round(swap.total / (1024**3), 2),
                    "swap_used_gb": round(swap.used / (1024**3), 2),
                },
            )
        )

        # Disk I/O: psutil counters are cumulative since boot, so a MB/s
        # rate needs a delta against the previous sample.  (The previous
        # code reported total bytes-ever-read as "MB/s".)  The first
        # sample has no baseline and reports 0.
        try:
            disk_io = psutil.disk_io_counters()
            now_mono = time.monotonic()
            read_bytes = getattr(disk_io, "read_bytes", 0)
            write_bytes = getattr(disk_io, "write_bytes", 0)
            if self._last_disk_sample is not None:
                last_t, last_read, last_write = self._last_disk_sample
                elapsed = max(now_mono - last_t, 1e-6)  # guard divide-by-zero
                read_mb_s = (read_bytes - last_read) / (1024 * 1024) / elapsed
                write_mb_s = (write_bytes - last_write) / (1024 * 1024) / elapsed
            else:
                read_mb_s = write_mb_s = 0.0
            self._last_disk_sample = (now_mono, read_bytes, write_bytes)
            total_io_mb_s = read_mb_s + write_mb_s
            metrics.append(
                PerformanceMetric(
                    name="disk_io",
                    category="io",
                    value=total_io_mb_s,
                    unit="mb/s",
                    threshold=self.thresholds["disk_io"],
                    status=self._classify(total_io_mb_s, self.thresholds["disk_io"]),
                    timestamp=current_time,
                    details={
                        "read_mb_s": read_mb_s,
                        "write_mb_s": write_mb_s,
                        "read_count": getattr(disk_io, "read_count", 0),
                        "write_count": getattr(disk_io, "write_count", 0),
                        "read_time_ms": getattr(disk_io, "read_time", 0),
                        "write_time_ms": getattr(disk_io, "write_time", 0),
                    },
                )
            )
        except Exception:
            # Best-effort: some platforms/containers expose no disk counters
            logger.debug("Disk I/O counters unavailable", exc_info=True)

        # Network I/O: informational only (cumulative counters, no threshold)
        try:
            network_io = psutil.net_io_counters()
            metrics.append(
                PerformanceMetric(
                    name="network_io",
                    category="network",
                    value={
                        "bytes_sent": network_io.bytes_sent,
                        "bytes_recv": network_io.bytes_recv,
                        "packets_sent": network_io.packets_sent,
                        "packets_recv": network_io.packets_recv,
                        "errin": network_io.errin,
                        "errout": network_io.errout,
                        "dropin": network_io.dropin,
                        "dropout": network_io.dropout,
                    },
                    unit="bytes",
                    threshold=None,  # informational, never flagged
                    status="optimal",
                    timestamp=current_time,
                    details={},
                )
            )
        except Exception:
            # Best-effort: network counters may be unavailable in sandboxes
            logger.debug("Network I/O counters unavailable", exc_info=True)

        # Current process vitals (informational; some fields are
        # platform-specific and reported as None when unsupported)
        process = psutil.Process()
        metrics.append(
            PerformanceMetric(
                name="process_performance",
                category="system",
                value={
                    "cpu_percent": process.cpu_percent(),
                    "memory_percent": process.memory_percent(),
                    "num_threads": process.num_threads(),
                    "file_descriptors": (
                        process.num_fds() if hasattr(process, "num_fds") else None
                    ),
                    "context_switches": (
                        process.num_ctx_switches()
                        if hasattr(process, "num_ctx_switches")
                        else None
                    ),
                },
                unit="info",
                threshold=None,
                status="optimal",
                timestamp=current_time,
                details={
                    "pid": process.pid,
                    "create_time": process.create_time(),
                    "status": process.status(),
                    "cmdline": process.cmdline(),
                },
            )
        )
        return metrics

    async def collect_api_performance(
        self, request_data: Dict[str, Any]
    ) -> "PerformanceMetric":
        """Record one API request's performance and return the metric.

        Args:
            request_data: Request info; recognized keys (all optional,
                with defaults): endpoint, method, response_time (ms),
                status_code, request_size, response_size, user_agent,
                ip_address.
        """
        endpoint = request_data.get("endpoint", "unknown")
        method = request_data.get("method", "GET")
        response_time = request_data.get("response_time", 0)
        api_threshold = self.thresholds["api_response_time"]
        metric = PerformanceMetric(
            name=f"api_request_{endpoint}_{method}",
            category="api",
            value=response_time,
            unit="milliseconds",
            threshold=api_threshold,
            status=self._classify(response_time, api_threshold),
            timestamp=datetime.now(),
            details={
                "endpoint": endpoint,
                "method": method,
                "status_code": request_data.get("status_code", 200),
                "request_size": request_data.get("request_size", 0),
                "response_size": request_data.get("response_size", 0),
                "user_agent": request_data.get("user_agent", ""),
                "ip_address": request_data.get("ip_address", ""),
            },
        )
        self._record(metric)
        return metric

    def generate_performance_report(self) -> Dict[str, Any]:
        """Build a comprehensive, JSON-serializable performance report.

        Includes an overall 0-100 score, per-category statistics,
        detected bottlenecks, deduplicated recommendations, optimization
        opportunities and short-term trends.
        """
        now = datetime.now()
        # Partition history by category once
        api_metrics = [m for m in self.metrics_history if m.category == "api"]
        database_metrics = [
            m for m in self.metrics_history if m.category == "database"
        ]
        cpu_metrics = [m for m in self.metrics_history if m.category == "cpu"]
        memory_metrics = [m for m in self.metrics_history if m.category == "memory"]

        def calculate_stats(metrics: List["PerformanceMetric"]) -> Dict[str, float]:
            """Summary statistics for scalar-valued metrics (empty otherwise)."""
            if not metrics:
                return {}
            values = [m.value for m in metrics]
            if isinstance(values[0], dict):
                # Composite metrics (e.g. network I/O) have no scalar stats
                return {}
            ordered = sorted(values)
            count = len(ordered)
            return {
                "count": count,
                "avg": sum(ordered) / count,
                "min": ordered[0],
                "max": ordered[-1],
                "median": ordered[count // 2],
                # Percentiles are only meaningful with enough samples;
                # below 21 samples fall back to the maximum
                "p95": ordered[int(count * 0.95)] if count > 20 else ordered[-1],
                "p99": ordered[int(count * 0.99)] if count > 20 else ordered[-1],
            }

        bottlenecks: List[Dict[str, Any]] = []

        # API bottleneck: any request classified warning/critical
        slow_requests = [m for m in api_metrics if m.status in ("warning", "critical")]
        if slow_requests:
            bottlenecks.append(
                {
                    "type": "api_performance",
                    "severity": (
                        "high"
                        if any(m.status == "critical" for m in slow_requests)
                        else "medium"
                    ),
                    "description": f"{len(slow_requests)} slow API requests detected",
                    "affected_endpoints": list(
                        {m.details.get("endpoint", "unknown") for m in slow_requests}
                    ),
                    "recommendation": "Optimize slow endpoints and add caching",
                }
            )

        # Database bottleneck: any query classified warning/critical
        slow_queries = [
            m for m in database_metrics if m.status in ("warning", "critical")
        ]
        if slow_queries:
            bottlenecks.append(
                {
                    "type": "database_performance",
                    "severity": (
                        "high"
                        if any(m.status == "critical" for m in slow_queries)
                        else "medium"
                    ),
                    "description": f"{len(slow_queries)} slow database queries detected",
                    "affected_queries": list(
                        {m.details.get("query_type", "unknown") for m in slow_queries}
                    ),
                    "recommendation": "Add database indexes and optimize queries",
                }
            )

        # Resource bottlenecks
        high_cpu = [m for m in cpu_metrics if m.value > self.thresholds["cpu_usage"]]
        if high_cpu:
            bottlenecks.append(
                {
                    "type": "cpu_usage",
                    "severity": "high",
                    "description": f"CPU usage exceeds {self.thresholds['cpu_usage']}% threshold",
                    "max_cpu": max(m.value for m in high_cpu),
                    "recommendation": "Scale horizontally or optimize CPU-intensive operations",
                }
            )
        high_memory = [m for m in memory_metrics if m.value > 80]  # 80% threshold
        if high_memory:
            bottlenecks.append(
                {
                    "type": "memory_usage",
                    "severity": "high",
                    "description": "Memory usage exceeds 80%",
                    "max_memory": max(m.value for m in high_memory),
                    "recommendation": "Optimize memory usage or increase available memory",
                }
            )

        # Recommendations: one per bottleneck plus pattern-based extras,
        # deduplicated below via set()
        recommendations = [b["recommendation"] for b in bottlenecks]
        if memory_metrics:
            avg_memory = sum(m.value for m in memory_metrics) / len(memory_metrics)
            if avg_memory > 60:
                recommendations.append(
                    "Implement memory pooling and optimize data structures"
                )
        if api_metrics:
            avg_response = sum(m.value for m in api_metrics) / len(api_metrics)
            if avg_response > self.thresholds["api_response_time"]:
                recommendations.append(
                    "Implement API response caching and query optimization"
                )

        api_stats = calculate_stats(api_metrics)
        db_stats = calculate_stats(database_metrics)
        cpu_stats = calculate_stats(cpu_metrics)
        memory_stats = calculate_stats(memory_metrics)

        return {
            "overall_performance_score": self._calculate_performance_score(),
            "timestamp": now.isoformat(),
            "analysis_period_hours": (now - self.start_time).total_seconds() / 3600,
            "summary": {
                "total_metrics_collected": len(self.metrics_history),
                "api_requests": len(api_metrics),
                "database_queries": len(database_metrics),
                "bottlenecks_detected": len(bottlenecks),
                "critical_issues": len(
                    [b for b in bottlenecks if b.get("severity") == "high"]
                ),
            },
            "performance_by_category": {
                "api": {
                    "statistics": api_stats,
                    "slow_requests": len(slow_requests),
                    "avg_response_time": api_stats.get("avg", 0),
                },
                "database": {
                    "statistics": db_stats,
                    "slow_queries": len(slow_queries),
                    "avg_query_time": db_stats.get("avg", 0),
                },
                "system": {
                    "cpu": cpu_stats,
                    "memory": memory_stats,
                    "current_cpu": cpu_metrics[-1].to_dict() if cpu_metrics else None,
                    "current_memory": (
                        memory_metrics[-1].to_dict() if memory_metrics else None
                    ),
                },
            },
            "bottlenecks": bottlenecks,
            "recommendations": list(set(recommendations)),
            "optimization_opportunities": self._identify_optimization_opportunities(),
            "historical_trends": self._analyze_trends(),
        }

    def _calculate_performance_score(self) -> float:
        """Overall score 0-100 from the last 100 metrics.

        optimal counts 100, warning 50, critical (or unknown status) 0.
        An empty history scores a perfect 100.
        """
        if not self.metrics_history:
            return 100.0
        recent = self.metrics_history[-100:]
        weights = {"optimal": 100, "warning": 50, "critical": 0}
        score = sum(weights.get(m.status, 0) for m in recent) / len(recent)
        return round(score, 2)

    def _identify_optimization_opportunities(self) -> List[Dict[str, Any]]:
        """Identify concrete optimization targets from recorded metrics."""
        opportunities: List[Dict[str, Any]] = []

        # Group API response times by endpoint and flag slow averages
        api_metrics = [m for m in self.metrics_history if m.category == "api"]
        if api_metrics:
            endpoint_times: Dict[str, List[float]] = {}
            for metric in api_metrics:
                endpoint = metric.details.get("endpoint", "unknown")
                endpoint_times.setdefault(endpoint, []).append(metric.value)
            slow_endpoints = []
            for endpoint, times in endpoint_times.items():
                avg_time = sum(times) / len(times)
                if avg_time > self.thresholds["api_response_time"]:
                    slow_endpoints.append(
                        {
                            "endpoint": endpoint,
                            "avg_response_time": avg_time,
                            "request_count": len(times),
                            # Very slow endpoints (>1s) warrant caching;
                            # moderately slow ones query tuning
                            "optimization": (
                                "add_caching" if avg_time > 1000 else "optimize_query"
                            ),
                        }
                    )
            if slow_endpoints:
                opportunities.append(
                    {
                        "category": "api_optimization",
                        "description": f"{len(slow_endpoints)} endpoints need optimization",
                        "details": slow_endpoints,
                        "potential_impact": "high",
                    }
                )

        # Detect sustained memory growth over the last 20 samples
        memory_metrics = [m for m in self.metrics_history if m.category == "memory"]
        if memory_metrics:
            recent_usage = [m.value for m in memory_metrics[-20:]]
            if len(recent_usage) > 1:
                growth = recent_usage[-1] - recent_usage[0]
                if growth > 10:  # more than 10 percentage points of growth
                    opportunities.append(
                        {
                            "category": "memory_optimization",
                            "description": f"Memory usage increased by {growth:.1f}%",
                            "details": {
                                "growth_percent": growth,
                                "period": "last 20 samples",
                            },
                            "potential_impact": "medium",
                        }
                    )
        return opportunities

    def _analyze_trends(self) -> Dict[str, Any]:
        """Analyze the last hour of metrics for direction of change."""
        if len(self.metrics_history) < 10:
            return {"message": "Insufficient data for trend analysis"}
        one_hour_ago = datetime.now() - timedelta(hours=1)
        recent = [m for m in self.metrics_history if m.timestamp > one_hour_ago]
        if not recent:
            return {"message": "No recent data available"}

        def trend_of(values: List[float]) -> str:
            """Sign of the least-squares slope: increasing/decreasing/stable."""
            n = len(values)
            if n < 2:
                return "stable"
            xs = range(n)
            sum_x = sum(xs)
            sum_y = sum(values)
            sum_xy = sum(i * v for i, v in enumerate(values))
            sum_x2 = sum(i * i for i in xs)
            denom = n * sum_x2 - sum_x * sum_x
            if denom == 0:
                return "stable"
            slope = (n * sum_xy - sum_x * sum_y) / denom
            if abs(slope) < 0.01:  # near-zero slope is treated as stable
                return "stable"
            return "increasing" if slope > 0 else "decreasing"

        def category_trend(category: str) -> str:
            values = [m.value for m in recent if m.category == category]
            return trend_of(values) if values else "no_data"

        return {
            "analysis_period": "last_hour",
            "metrics_analyzed": len(recent),
            "api_response_trend": category_trend("api"),
            "database_query_trend": category_trend("database"),
            "cpu_usage_trend": category_trend("cpu"),
        }

    async def start_continuous_monitoring(self, interval: int = 60):
        """Continuously sample system metrics every *interval* seconds.

        Emits an hourly summary report and logs any high-severity
        bottlenecks.  Runs until cancelled; errors are logged and the
        loop continues.
        """
        logger.info(
            "Starting continuous performance monitoring (interval: %ss)", interval
        )
        next_report_at = time.monotonic() + 3600  # first summary after one hour
        while True:
            try:
                for metric in await self.collect_system_performance():
                    self._record(metric)  # _record also caps history size
                # Hourly summary.  (The original gated this on
                # len(history) % 60 == 0, which rarely fires once several
                # metrics are appended per cycle; wall-clock is exact.)
                if time.monotonic() >= next_report_at:
                    next_report_at = time.monotonic() + 3600
                    report = self.generate_performance_report()
                    critical = [
                        b for b in report["bottlenecks"] if b.get("severity") == "high"
                    ]
                    if critical:
                        logger.warning(
                            "CRITICAL PERFORMANCE ISSUES DETECTED: %d", len(critical)
                        )
                        for bottleneck in critical:
                            logger.warning(
                                " - %s: %s",
                                bottleneck["type"],
                                bottleneck["description"],
                            )
            except Exception as e:
                logger.error("Error in performance monitoring: %s", e)
            await asyncio.sleep(interval)
# Global profiler instance shared by importers of this module.
# NOTE: constructing it here runs at import time and starts tracemalloc
# (see AdvancedPerformanceProfiler.__init__) as a side effect.
performance_profiler = AdvancedPerformanceProfiler()