| """ | |
| Performance Baselines and Regression Detection System | |
| Automated performance monitoring with baseline establishment and regression detection | |
| """ | |
| import asyncio | |
| import json | |
| import os | |
| import statistics | |
| from datetime import datetime, timedelta | |
| from typing import Dict, List | |
| import aiohttp | |
| import asyncpg | |
| # Simplified version without scipy dependency | |
| try: | |
| import numpy as np | |
| HAS_NUMPY = True | |
| except ImportError: | |
| HAS_NUMPY = False | |
| import statistics | |
| from core.config import settings | |
| from core.logging import logger | |
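
# Example CLI usage (a minimal sketch; the module filename below is an
# assumption, adjust it to this file's actual name/path in the repo):
#
#   python performance_baselines.py monitor               # continuous monitoring loop
#   python performance_baselines.py baseline --period 24  # build a 24-hour baseline
#   python performance_baselines.py status                # print current monitoring summary
#   python performance_baselines.py report --output perf_report.json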


class PerformanceMetrics:
    """Performance metrics container"""

    def __init__(self):
        self.response_time = 0.0
        self.throughput = 0.0
        self.error_rate = 0.0
        self.cpu_usage = 0.0
        self.memory_usage = 0.0
        self.database_query_time = 0.0
        self.cache_hit_rate = 0.0
        self.timestamp = datetime.now()

    def to_dict(self) -> Dict:
        return {
            "response_time_ms": self.response_time,
            "throughput_rps": self.throughput,
            "error_rate_percent": self.error_rate,
            "cpu_usage_percent": self.cpu_usage,
            "memory_usage_percent": self.memory_usage,
            "database_query_time_ms": self.database_query_time,
            "cache_hit_rate_percent": self.cache_hit_rate,
            "timestamp": self.timestamp.isoformat(),
        }


class PerformanceBaseline:
    """Performance baseline with statistical properties"""

    def __init__(self):
        self.response_time_baseline = BaselineStats()
        self.throughput_baseline = BaselineStats()
        self.error_rate_baseline = BaselineStats()
        self.cpu_usage_baseline = BaselineStats()
        self.memory_usage_baseline = BaselineStats()
        self.database_query_time_baseline = BaselineStats()
        self.cache_hit_rate_baseline = BaselineStats()
        self.established_at = None
        self.sample_size = 0
        self.confidence_interval = 0.95

    def to_dict(self) -> Dict:
        return {
            "response_time": self.response_time_baseline.to_dict(),
            "throughput": self.throughput_baseline.to_dict(),
            "error_rate": self.error_rate_baseline.to_dict(),
            "cpu_usage": self.cpu_usage_baseline.to_dict(),
            "memory_usage": self.memory_usage_baseline.to_dict(),
            "database_query_time": self.database_query_time_baseline.to_dict(),
            "cache_hit_rate": self.cache_hit_rate_baseline.to_dict(),
            "established_at": self.established_at.isoformat() if self.established_at else None,
            "sample_size": self.sample_size,
            "confidence_interval": self.confidence_interval,
        }


class BaselineStats:
    """Statistical baseline for a single metric"""

    def __init__(self):
        self.mean = 0.0
        self.median = 0.0
        self.p95 = 0.0
        self.p99 = 0.0
        self.std_dev = 0.0
        self.min_value = float("inf")
        self.max_value = float("-inf")
        self.outliers_removed = 0

    def to_dict(self) -> Dict:
        return {
            "mean": self.mean,
            "median": self.median,
            "p95": self.p95,
            "p99": self.p99,
            "std_dev": self.std_dev,
            "min": self.min_value,
            "max": self.max_value,
            "outliers_removed": self.outliers_removed,
        }


class PerformanceRegressionDetector:
    """Detects performance regressions using statistical methods"""

    def __init__(self, baseline: PerformanceBaseline):
        self.baseline = baseline
        self.regression_threshold = 0.15  # 15% degradation threshold

    def detect_regression(self, current_metrics: PerformanceMetrics) -> List[Dict]:
        """Detect performance regressions compared to baseline"""
        regressions = []

        # Response time regression: slower than the baseline p95 and more than
        # regression_threshold above the baseline mean
        rt_baseline = self.baseline.response_time_baseline
        if rt_baseline.mean > 0 and current_metrics.response_time > rt_baseline.p95:
            degradation_pct = (current_metrics.response_time - rt_baseline.mean) / rt_baseline.mean
            if degradation_pct > self.regression_threshold:
                regressions.append(
                    {
                        "metric": "response_time",
                        "severity": self._calculate_severity(degradation_pct),
                        "current_value": current_metrics.response_time,
                        "baseline_value": rt_baseline.p95,
                        "degradation_percent": degradation_pct * 100,
                        "confidence": self._calculate_confidence(),
                    }
                )

        # Throughput regression: more than a 20% drop below the baseline p95
        tp_baseline = self.baseline.throughput_baseline
        if tp_baseline.mean > 0 and current_metrics.throughput < tp_baseline.p95 * 0.8:
            degradation_pct = (tp_baseline.mean - current_metrics.throughput) / tp_baseline.mean
            if degradation_pct > self.regression_threshold:
                regressions.append(
                    {
                        "metric": "throughput",
                        "severity": self._calculate_severity(degradation_pct),
                        "current_value": current_metrics.throughput,
                        "baseline_value": tp_baseline.p95,
                        "degradation_percent": degradation_pct * 100,
                        "confidence": self._calculate_confidence(),
                    }
                )

        # Error rate regression: more than double the baseline p95
        er_baseline = self.baseline.error_rate_baseline
        if er_baseline.mean > 0 and current_metrics.error_rate > er_baseline.p95 * 2:
            degradation_pct = (current_metrics.error_rate - er_baseline.mean) / er_baseline.mean
            if degradation_pct > self.regression_threshold:
                regressions.append(
                    {
                        "metric": "error_rate",
                        "severity": self._calculate_severity(degradation_pct),
                        "current_value": current_metrics.error_rate,
                        "baseline_value": er_baseline.p95,
                        "degradation_percent": degradation_pct * 100,
                        "confidence": self._calculate_confidence(),
                    }
                )

        return regressions

    def _calculate_severity(self, degradation_pct: float) -> str:
        """Calculate regression severity based on degradation percentage"""
        if degradation_pct > 0.5:
            return "critical"
        elif degradation_pct > 0.3:
            return "high"
        elif degradation_pct > 0.15:
            return "medium"
        else:
            return "low"

    def _calculate_confidence(self) -> float:
        """Calculate confidence level based on baseline sample size"""
        if self.baseline.sample_size >= 100:
            return 0.95
        elif self.baseline.sample_size >= 50:
            return 0.85
        elif self.baseline.sample_size >= 20:
            return 0.70
        else:
            return 0.50
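

# A minimal usage sketch for the detector on its own (illustrative only; in
# practice PerformanceMonitor below wires these pieces together):
#
#   baseline = PerformanceBaseline()          # normally filled by establish_baseline()
#   detector = PerformanceRegressionDetector(baseline)
#   regressions = detector.detect_regression(current_metrics)
#   for r in regressions:
#       print(r["metric"], r["severity"], f'{r["degradation_percent"]:.1f}%')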


class PerformanceMonitor:
    """
    Main performance monitoring system
    """

    def __init__(self):
        self.baseline = PerformanceBaseline()
        self.detector = PerformanceRegressionDetector(self.baseline)
        self.metrics_history: List[PerformanceMetrics] = []
        self.session = None
        self.baseline_window_hours = 24  # 24 hours for baseline establishment
        self.max_history_size = 1000

    async def __aenter__(self):
        """Async context manager entry"""
        self.session = aiohttp.ClientSession(
            timeout=aiohttp.ClientTimeout(total=30), connector=aiohttp.TCPConnector(limit=10)
        )
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Async context manager exit"""
        if self.session:
            await self.session.close()

    async def collect_current_metrics(self) -> PerformanceMetrics:
        """Collect current performance metrics from multiple sources"""
        metrics = PerformanceMetrics()

        def _scalar(data: Dict) -> float:
            """Extract the numeric value from a Prometheus instant-query response."""
            result = data.get("data", {}).get("result", [])
            try:
                return float(result[0]["value"][1]) if result else 0.0
            except (KeyError, IndexError, TypeError, ValueError):
                return 0.0

        # Collect application metrics from Prometheus
        try:
            prometheus_url = "http://localhost:9090/api/v1/query"

            # Response time: p95 request latency (seconds -> milliseconds)
            async with self.session.get(
                prometheus_url,
                params={"query": "histogram_quantile(0.95, rate(http_request_duration_seconds_bucket[5m]))"},
            ) as response:
                if response.status == 200:
                    metrics.response_time = _scalar(await response.json()) * 1000

            # Throughput: requests per second
            async with self.session.get(
                prometheus_url, params={"query": "sum(rate(http_requests_total[5m]))"}
            ) as response:
                if response.status == 200:
                    metrics.throughput = _scalar(await response.json())

            # Error rate: fraction of 5xx responses (converted to percent)
            async with self.session.get(
                prometheus_url,
                params={
                    "query": 'sum(rate(http_requests_total{status=~"5.."}[5m])) / sum(rate(http_requests_total[5m]))'
                },
            ) as response:
                if response.status == 200:
                    metrics.error_rate = _scalar(await response.json()) * 100

            # CPU usage: node_cpu_seconds_total is a cumulative counter, so usage has
            # to be derived from a rate (assumes node_exporter is scraped by Prometheus)
            async with self.session.get(
                prometheus_url,
                params={"query": '100 * (1 - avg(rate(node_cpu_seconds_total{mode="idle"}[5m])))'},
            ) as response:
                if response.status == 200:
                    metrics.cpu_usage = _scalar(await response.json())
        except Exception as e:
            logger.error(f"Failed to collect Prometheus metrics: {e}")

        # Collect memory usage directly from node_exporter (MemAvailable and MemTotal
        # are gauges, so a single scrape is sufficient)
        try:
            system_metrics_url = "http://localhost:9100/metrics"
            async with self.session.get(system_metrics_url) as response:
                if response.status == 200:
                    data = await response.text()
                    mem_available = mem_total = None
                    for line in data.split("\n"):
                        if line.startswith("node_memory_MemAvailable_bytes"):
                            mem_available = float(line.split()[-1])
                        elif line.startswith("node_memory_MemTotal_bytes"):
                            mem_total = float(line.split()[-1])
                    if mem_available is not None and mem_total:
                        metrics.memory_usage = ((mem_total - mem_available) / mem_total) * 100
        except Exception as e:
            logger.error(f"Failed to collect system metrics: {e}")

        # Collect database metrics
        try:
            db_url = settings.DATABASE_URL
            conn = await asyncio.wait_for(asyncpg.connect(db_url), timeout=10)
            # Mean query execution time in ms from pg_stat_statements (cumulative since
            # the last stats reset; column names assume PostgreSQL 13+, where
            # total_exec_time replaced total_time)
            query_time = await conn.fetchval(
                """
                SELECT sum(total_exec_time) / NULLIF(sum(calls), 0)
                FROM pg_stat_statements
                """
            )
            if query_time:
                metrics.database_query_time = float(query_time)
            await conn.close()
        except Exception as e:
            logger.error(f"Failed to collect database metrics: {e}")

        metrics.timestamp = datetime.now()
        return metrics

    async def establish_baseline(self, hours: int = 24) -> PerformanceBaseline:
        """Establish performance baseline from historical data"""
        logger.info(f"Establishing performance baseline from last {hours} hours...")
        baseline = PerformanceBaseline()

        # Filter existing history down to the baseline period
        cutoff_time = datetime.now() - timedelta(hours=hours)
        recent_metrics = [m for m in self.metrics_history if m.timestamp > cutoff_time]

        if len(recent_metrics) < 30:
            logger.warning(f"Insufficient data for baseline (need 30 samples, have {len(recent_metrics)})")
            return baseline

        # Extract metric arrays
        response_times = [m.response_time for m in recent_metrics]
        throughputs = [m.throughput for m in recent_metrics]
        error_rates = [m.error_rate for m in recent_metrics]
        cpu_usages = [m.cpu_usage for m in recent_metrics]
        memory_usages = [m.memory_usage for m in recent_metrics]
        db_query_times = [m.database_query_time for m in recent_metrics if m.database_query_time > 0]

        # Calculate baseline statistics
        if response_times:
            self._calculate_stats(baseline.response_time_baseline, response_times)
        if throughputs:
            self._calculate_stats(baseline.throughput_baseline, throughputs)
        if error_rates:
            self._calculate_stats(baseline.error_rate_baseline, error_rates)
        if cpu_usages:
            self._calculate_stats(baseline.cpu_usage_baseline, cpu_usages)
        if memory_usages:
            self._calculate_stats(baseline.memory_usage_baseline, memory_usages)
        if db_query_times:
            self._calculate_stats(baseline.database_query_time_baseline, db_query_times)

        baseline.established_at = datetime.now()
        baseline.sample_size = len(recent_metrics)
        self.baseline = baseline
        # Keep the detector pointed at the freshly established baseline
        self.detector.baseline = baseline
        return baseline

    def _calculate_stats(self, baseline_stats: BaselineStats, values: List[float]):
        """Calculate statistical properties for baseline"""
        if not values:
            return

        if HAS_NUMPY:
            # Remove outliers using the IQR method before computing statistics
            q1 = np.percentile(values, 25)
            q3 = np.percentile(values, 75)
            iqr = q3 - q1
            lower_bound = q1 - 1.5 * iqr
            upper_bound = q3 + 1.5 * iqr
            filtered_values = [v for v in values if lower_bound <= v <= upper_bound]
            outliers_removed = len(values) - len(filtered_values)
            if filtered_values:
                baseline_stats.mean = float(np.mean(filtered_values))
                baseline_stats.median = float(np.median(filtered_values))
                baseline_stats.p95 = float(np.percentile(filtered_values, 95))
                baseline_stats.p99 = float(np.percentile(filtered_values, 99))
                baseline_stats.std_dev = float(np.std(filtered_values))
                baseline_stats.min_value = min(filtered_values)
                baseline_stats.max_value = max(filtered_values)
        else:
            # Fallback to basic statistics (no outlier removal)
            sorted_values = sorted(values)
            baseline_stats.mean = statistics.mean(values)
            baseline_stats.median = statistics.median(values)
            baseline_stats.p95 = sorted_values[int(len(sorted_values) * 0.95)]
            baseline_stats.p99 = sorted_values[int(len(sorted_values) * 0.99)]
            baseline_stats.std_dev = statistics.stdev(values) if len(values) > 1 else 0.0
            baseline_stats.min_value = min(values)
            baseline_stats.max_value = max(values)
            outliers_removed = 0

        baseline_stats.outliers_removed = outliers_removed

    async def monitor_performance(self):
        """Continuous performance monitoring with regression detection"""
        logger.info("Starting performance monitoring with regression detection...")
        while True:
            try:
                # Collect current metrics and store them in history
                current_metrics = await self.collect_current_metrics()
                self.metrics_history.append(current_metrics)

                # Keep history size manageable
                if len(self.metrics_history) > self.max_history_size:
                    self.metrics_history = self.metrics_history[-self.max_history_size:]

                # Detect regressions once a baseline exists
                regressions = []
                if self.baseline.established_at:
                    regressions = self.detector.detect_regression(current_metrics)

                    # Send alerts for regressions
                    for regression in regressions:
                        await self.send_regression_alert(regression)

                    # Re-establish baseline periodically (daily)
                    if datetime.now() - self.baseline.established_at >= timedelta(hours=24):
                        logger.info("Re-establishing performance baseline...")
                        await self.establish_baseline()

                logger.info(f"Performance check completed. Regressions: {len(regressions)}")
            except Exception as e:
                logger.error(f"Error in performance monitoring: {e}")

            # Wait before next check
            await asyncio.sleep(300)  # Check every 5 minutes

    async def send_regression_alert(self, regression: Dict):
        """Send regression alert"""
        alert_data = {
            "alert_type": "performance_regression",
            "timestamp": datetime.now().isoformat(),
            "severity": regression["severity"],
            "metric": regression,
            "baseline": self.baseline.to_dict(),
            "environment": os.getenv("ENVIRONMENT", "production"),
        }

        # Log regression
        logger.warning(f"Performance regression detected: {regression}")

        # Send to alerting system
        webhook_url = os.getenv("PERFORMANCE_WEBHOOK_URL")
        if webhook_url:
            try:
                async with self.session.post(webhook_url, json=alert_data) as response:
                    if response.status == 200:
                        logger.info(f"Regression alert sent for {regression['metric']}")
            except Exception as e:
                logger.error(f"Failed to send regression alert: {e}")

    def get_performance_summary(self) -> Dict:
        """Get current performance monitoring summary"""
        if not self.metrics_history:
            return {"status": "no_data"}

        current_metrics = self.metrics_history[-1]
        return {
            "status": "monitoring",
            "current_metrics": current_metrics.to_dict(),
            "baseline": self.baseline.to_dict(),
            "history_size": len(self.metrics_history),
            "baseline_established": self.baseline.established_at.isoformat() if self.baseline.established_at else None,
        }
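

# A minimal sketch of programmatic (non-CLI) use, assuming the Prometheus,
# node_exporter, and database endpoints referenced in collect_current_metrics
# are reachable:
#
#   async def check_once():
#       async with PerformanceMonitor() as monitor:
#           metrics = await monitor.collect_current_metrics()
#           print(json.dumps(metrics.to_dict(), indent=2))
#
#   asyncio.run(check_once())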


# CLI interface
async def main():
    import argparse

    parser = argparse.ArgumentParser(description="Performance Monitoring System")
    parser.add_argument("action", choices=["monitor", "baseline", "status", "report"])
    parser.add_argument("--period", type=int, default=24, help="Baseline period in hours")
    parser.add_argument("--output", help="Output file for reports")
    args = parser.parse_args()

    monitor = PerformanceMonitor()

    if args.action == "monitor":
        async with monitor:
            await monitor.monitor_performance()

    elif args.action == "baseline":
        async with monitor:
            baseline = await monitor.establish_baseline(args.period)
            if args.output:
                with open(args.output, "w") as f:
                    json.dump(baseline.to_dict(), f, indent=2)
                print(f"Performance baseline saved to {args.output}")
            else:
                print(json.dumps(baseline.to_dict(), indent=2))

    elif args.action == "status":
        async with monitor:
            summary = monitor.get_performance_summary()
            print(json.dumps(summary, indent=2))

    elif args.action == "report":
        async with monitor:
            # Generate comprehensive performance report
            summary = monitor.get_performance_summary()
            report = {
                "report_type": "performance_analysis",
                "generated_at": datetime.now().isoformat(),
                "summary": summary,
                "recommendations": [],
            }
            if args.output:
                with open(args.output, "w") as f:
                    json.dump(report, f, indent=2)
                print(f"Performance report saved to {args.output}")
            else:
                print(json.dumps(report, indent=2))


if __name__ == "__main__":
    asyncio.run(main())