"""
Database Performance Monitor for AegisLM SaaS Backend.

Production-ready performance monitoring with metrics collection, query
analysis, and performance optimization recommendations.
"""

import asyncio
import hashlib
import time
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.pool import QueuePool
import logging
import psutil
from collections import defaultdict, deque
import statistics

from .database import async_engine, get_redis
from .config import settings

logger = logging.getLogger(__name__)


class QueryMetrics:
    """Performance sample for one executed query.

    Only a hash of the normalized SQL is stored, never the SQL text itself.
    """

    def __init__(self, query_hash: str, query_type: str, execution_time: float,
                 rows_returned: int, timestamp: datetime):
        self.query_hash = query_hash          # 16-hex-char digest of normalized SQL
        self.query_type = query_type          # leading SQL keyword or 'OTHER'
        self.execution_time = execution_time  # seconds
        self.rows_returned = rows_returned
        self.timestamp = timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict (timestamp as ISO-8601 string)."""
        return {
            "query_hash": self.query_hash,
            "query_type": self.query_type,
            "execution_time": self.execution_time,
            "rows_returned": self.rows_returned,
            "timestamp": self.timestamp.isoformat(),
        }


class ConnectionPoolMetrics:
    """Point-in-time snapshot of the engine's connection pool counters."""

    def __init__(self, pool_size: int, checked_in: int, checked_out: int,
                 overflow: int, invalid: int):
        self.pool_size = pool_size
        self.checked_in = checked_in
        self.checked_out = checked_out
        self.overflow = overflow
        self.invalid = invalid
        self.timestamp = datetime.utcnow()  # naive UTC

    @property
    def utilization_percent(self) -> float:
        """Checked-out connections as a percentage of ``pool_size`` (2 decimals).

        Returns 0.0 when ``pool_size`` is not positive instead of dividing by
        zero. Exceeds 100 when more connections are checked out than
        ``pool_size``.
        """
        if self.pool_size <= 0:
            return 0.0
        return round((self.checked_out / self.pool_size) * 100, 2)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict including ``utilization_percent``."""
        return {
            "pool_size": self.pool_size,
            "checked_in": self.checked_in,
            "checked_out": self.checked_out,
            "overflow": self.overflow,
            "invalid": self.invalid,
            "utilization_percent": self.utilization_percent,
            "timestamp": self.timestamp.isoformat(),
        }


class DatabaseMetrics:
    """Server-level PostgreSQL statistics collected by get_database_metrics()."""

    def __init__(self):
        self.connections = 0
        self.active_connections = 0
        self.idle_connections = 0
        self.transaction_count = 0
        self.rollback_count = 0
        self.deadlock_count = 0
        self.cache_hit_ratio = 0.0   # percent; stays 0.0 if it could not be computed
        self.index_usage_stats = {}  # "schema.table.index" -> tuple counters
        self.table_sizes = {}        # "schema.table" -> size info
        self.timestamp = datetime.utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict of all collected values."""
        return {
            "connections": self.connections,
            "active_connections": self.active_connections,
            "idle_connections": self.idle_connections,
            "transaction_count": self.transaction_count,
            "rollback_count": self.rollback_count,
            "deadlock_count": self.deadlock_count,
            "cache_hit_ratio": self.cache_hit_ratio,
            "index_usage_stats": self.index_usage_stats,
            "table_sizes": self.table_sizes,
            "timestamp": self.timestamp.isoformat(),
        }


class PerformanceMonitor:
    """Collects query, connection-pool and PostgreSQL server metrics.

    Recent samples are kept in bounded in-memory deques. Per-query samples,
    slow queries and pool snapshots are also pushed to Redis lists that are
    capped at ``max_list_entries`` and expire after
    ``metrics_retention_hours`` without writes.
    """

    # Leading SQL keywords recognised by _extract_query_type, checked in order.
    _QUERY_TYPES = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER')

    # Minimum change (percentage points) across recent pool snapshots before
    # utilization is reported as "increasing"/"decreasing".
    _TREND_DELTA_PERCENT = 10.0

    def __init__(self):
        self.redis_client = None
        self.query_history = deque(maxlen=1000)  # Last 1000 queries
        self.pool_history = deque(maxlen=100)    # Last 100 pool metrics
        self.slow_query_threshold = getattr(settings, 'SLOW_QUERY_THRESHOLD', 1.0)  # seconds
        self.metrics_retention_hours = getattr(settings, 'METRICS_RETENTION_HOURS', 24)
        # LPUSH + EXPIRE alone refreshes the TTL on every write, so a busy
        # key would never expire and its list would grow without bound.
        self.max_list_entries = max(1, int(getattr(settings, 'METRICS_MAX_LIST_ENTRIES', 1000)))

    async def get_redis(self):
        """Return the cached Redis client, creating it on first use."""
        if not self.redis_client:
            self.redis_client = await get_redis()
        return self.redis_client

    def _retention_seconds(self) -> int:
        """Redis TTL in whole seconds derived from ``metrics_retention_hours``."""
        return int(self.metrics_retention_hours * 3600)

    async def _push_capped(self, redis_client, key: str, payload: str) -> None:
        """LPUSH *payload* onto *key*, keep only the newest entries, refresh TTL."""
        await redis_client.lpush(key, payload)
        await redis_client.ltrim(key, 0, self.max_list_entries - 1)
        await redis_client.expire(key, self._retention_seconds())

    def _hash_query(self, query: str) -> str:
        """Return a 16-char hex fingerprint of *query*.

        The query is lower-cased and whitespace-collapsed first, so formatting
        differences map to the same hash. MD5 is used only as a cheap
        fingerprint, not for security.
        """
        normalized = ' '.join(query.lower().split())
        return hashlib.md5(normalized.encode()).hexdigest()[:16]

    async def track_query(self, query: str, execution_time: float, rows_returned: int = 0):
        """Record one query execution in memory and (best-effort) in Redis.

        Args:
            query: SQL text; only its hash and leading keyword are stored.
            execution_time: Duration in seconds.
            rows_returned: Number of rows produced, if known.
        """
        query_hash = self._hash_query(query)
        metrics = QueryMetrics(
            query_hash=query_hash,
            query_type=self._extract_query_type(query),
            execution_time=execution_time,
            rows_returned=rows_returned,
            timestamp=datetime.utcnow(),
        )
        self.query_history.append(metrics)

        is_slow = execution_time > self.slow_query_threshold
        if is_slow:
            # Logged before touching Redis so slow queries are still reported
            # when Redis is unavailable.
            logger.warning(f"Slow query detected: {execution_time:.2f}s - {query[:100]}...")

        try:
            redis_client = await self.get_redis()
            payload = json.dumps(metrics.to_dict())
            await self._push_capped(redis_client, f"queries:{query_hash}", payload)
            if is_slow:
                await self._push_capped(redis_client, "slow_queries", payload)
        except Exception as e:
            logger.error(f"Failed to track query: {e}")

    def _extract_query_type(self, query: str) -> str:
        """Return the leading SQL keyword of *query* (upper-case) or 'OTHER'."""
        query_upper = query.strip().upper()
        for query_type in self._QUERY_TYPES:
            if query_upper.startswith(query_type):
                return query_type
        return 'OTHER'

    @staticmethod
    def _pool_stat(pool: Any, name: str) -> int:
        """Return ``pool.<name>()`` as int, or 0 if the pool lacks that method.

        NOTE(review): not every SQLAlchemy pool class exposes every counter
        (``invalid()`` does not appear to be part of QueuePool's API) —
        verify against the installed SQLAlchemy version.
        """
        method = getattr(pool, name, None)
        return int(method()) if callable(method) else 0

    def _snapshot_pool(self) -> ConnectionPoolMetrics:
        """Build a ConnectionPoolMetrics from the engine pool's current counters."""
        pool = async_engine.pool
        return ConnectionPoolMetrics(
            pool_size=self._pool_stat(pool, 'size'),
            checked_in=self._pool_stat(pool, 'checkedin'),
            checked_out=self._pool_stat(pool, 'checkedout'),
            overflow=self._pool_stat(pool, 'overflow'),
            invalid=self._pool_stat(pool, 'invalid'),
        )

    async def get_detailed_connection_pool_metrics(self) -> Dict[str, Any]:
        """Return the current pool snapshot plus history, health and impact estimates.

        The snapshot taken here is NOT added to ``pool_history``; only
        get_connection_pool_metrics() records history.
        """
        current_metrics = self._snapshot_pool()
        utilization = current_metrics.utilization_percent
        history = list(self.pool_history)

        pool_metrics = {
            "current": current_metrics.to_dict(),
            "historical": {
                "last_10_samples": [m.to_dict() for m in history[-10:]],
                "avg_utilization": 0,
                "max_utilization": 0,
                "overflow_events": 0,
            },
            "health_indicators": {
                "is_healthy": True,
                "warnings": [],
                "errors": [],
            },
            "performance_impact": {
                # String levels throughout ("low"/"medium"/"high").
                "wait_time_estimate": "low",
                "throughput_impact": "low",
            },
        }

        # A single recorded sample is enough for meaningful averages.
        if history:
            utilizations = [m.utilization_percent for m in history]
            historical = pool_metrics["historical"]
            historical["avg_utilization"] = round(statistics.mean(utilizations), 2)
            historical["max_utilization"] = max(utilizations)
            historical["overflow_events"] = sum(1 for m in history if m.overflow > 0)

        health = pool_metrics["health_indicators"]
        if utilization > 90:
            health["warnings"].append("Very high pool utilization")
            health["is_healthy"] = False
        if current_metrics.overflow > 0:
            health["warnings"].append(f"Pool overflow: {current_metrics.overflow} connections")
        if current_metrics.invalid > 0:
            health["errors"].append(f"Invalid connections: {current_metrics.invalid}")
            health["is_healthy"] = False

        impact = pool_metrics["performance_impact"]
        if utilization > 80:
            impact["wait_time_estimate"] = "high"
            impact["throughput_impact"] = "medium"
        elif utilization > 60:
            impact["wait_time_estimate"] = "medium"
            impact["throughput_impact"] = "low"

        return pool_metrics

    async def get_connection_pool_analysis(self) -> Dict[str, Any]:
        """Summarize pool health with optimization and capacity suggestions.

        Returns a dict with:
        - ``summary``: status, utilization trend and recommendation.
        - ``metrics``: the output of get_detailed_connection_pool_metrics().
        - ``optimization_suggestions``.
        - ``capacity_planning``.
        """
        detailed_metrics = await self.get_detailed_connection_pool_metrics()
        current = detailed_metrics["current"]
        utilization = current["utilization_percent"]

        analysis = {
            "summary": {
                "current_status": "healthy" if detailed_metrics["health_indicators"]["is_healthy"] else "unhealthy",
                "utilization_trend": "stable",
                "recommendation": "no_action",
            },
            "metrics": detailed_metrics,
            "optimization_suggestions": [],
            "capacity_planning": {
                "current_capacity": current["pool_size"],
                "recommended_capacity": current["pool_size"],
                "scaling_needed": False,
            },
        }
        suggestions = analysis["optimization_suggestions"]

        if utilization > 80:
            suggestions.append("Consider increasing pool size")
            analysis["capacity_planning"]["recommended_capacity"] = current["pool_size"] + 5
            analysis["capacity_planning"]["scaling_needed"] = True
        if detailed_metrics["historical"]["overflow_events"] > 0:
            suggestions.append("Pool overflow detected - increase max_overflow setting")
        if current["invalid"] > 0:
            suggestions.append("Investigate connection invalidation causes")

        # Trend = direction of change between the oldest and newest of the
        # last five recorded snapshots (not the absolute utilization level).
        if len(self.pool_history) >= 5:
            recent = [m.utilization_percent for m in list(self.pool_history)[-5:]]
            delta = recent[-1] - recent[0]
            if delta > self._TREND_DELTA_PERCENT:
                analysis["summary"]["utilization_trend"] = "increasing"
            elif delta < -self._TREND_DELTA_PERCENT:
                analysis["summary"]["utilization_trend"] = "decreasing"

        # Check "urgent" first: >90% utilization always also triggers the
        # >80% suggestion above, so testing suggestions first made it unreachable.
        if utilization > 90:
            analysis["summary"]["recommendation"] = "urgent"
        elif suggestions:
            analysis["summary"]["recommendation"] = "optimize"

        return analysis

    async def get_connection_pool_metrics(self) -> ConnectionPoolMetrics:
        """Snapshot the pool, append it to ``pool_history`` and push it to Redis.

        Redis failures are logged; the snapshot is returned regardless.
        """
        metrics = self._snapshot_pool()
        self.pool_history.append(metrics)

        try:
            redis_client = await self.get_redis()
            await self._push_capped(redis_client, "pool_metrics", json.dumps(metrics.to_dict()))
        except Exception as e:
            logger.error(f"Failed to store pool metrics: {e}")

        return metrics

    async def get_database_metrics(self) -> DatabaseMetrics:
        """Collect server statistics from PostgreSQL system views.

        Reads pg_stat_activity, pg_stat_database, pg_statio_user_tables,
        pg_stat_user_indexes and pg_tables for the current database. All
        queries share one transaction, so the first failure aborts the rest.
        The error is logged and whatever was collected is returned.
        """
        metrics = DatabaseMetrics()

        try:
            async with async_engine.begin() as conn:
                # Connection statistics
                result = await conn.execute(text("""
                    SELECT
                        count(*) AS total_connections,
                        count(*) FILTER (WHERE state = 'active') AS active_connections,
                        count(*) FILTER (WHERE state = 'idle') AS idle_connections
                    FROM pg_stat_activity
                    WHERE datname = current_database()
                """))
                row = result.fetchone()
                if row:
                    metrics.connections = row.total_connections
                    metrics.active_connections = row.active_connections
                    metrics.idle_connections = row.idle_connections

                # Transaction statistics. NOTE(review): these pg_stat_database
                # counters appear to be cumulative since the last stats reset.
                result = await conn.execute(text("""
                    SELECT
                        xact_commit AS transaction_count,
                        xact_rollback AS rollback_count,
                        deadlocks AS deadlock_count
                    FROM pg_stat_database
                    WHERE datname = current_database()
                """))
                row = result.fetchone()
                if row:
                    metrics.transaction_count = row.transaction_count
                    metrics.rollback_count = row.rollback_count
                    metrics.deadlock_count = row.deadlock_count

                # Cache hit ratio (NULL when no heap blocks have been accessed)
                result = await conn.execute(text("""
                    SELECT
                        round(sum(heap_blks_hit) / nullif(sum(heap_blks_hit) + sum(heap_blks_read), 0) * 100, 2)
                            AS cache_hit_ratio
                    FROM pg_statio_user_tables
                """))
                row = result.fetchone()
                if row and row.cache_hit_ratio is not None:
                    metrics.cache_hit_ratio = float(row.cache_hit_ratio)

                # Index usage. pg_stat_user_indexes names these columns
                # relname/indexrelname; alias them for .tablename/.indexname.
                result = await conn.execute(text("""
                    SELECT
                        schemaname,
                        relname AS tablename,
                        indexrelname AS indexname,
                        idx_tup_read,
                        idx_tup_fetch
                    FROM pg_stat_user_indexes
                    WHERE idx_tup_read > 0
                    ORDER BY idx_tup_read DESC
                    LIMIT 20
                """))
                for row in result.fetchall():
                    key = f"{row.schemaname}.{row.tablename}.{row.indexname}"
                    metrics.index_usage_stats[key] = {
                        "tuples_read": row.idx_tup_read,
                        "tuples_fetched": row.idx_tup_fetch,
                    }

                # Table sizes. quote_ident keeps mixed-case / special-character
                # names valid when building the regclass reference.
                result = await conn.execute(text("""
                    SELECT
                        schemaname,
                        tablename,
                        pg_size_pretty(pg_total_relation_size(
                            (quote_ident(schemaname) || '.' || quote_ident(tablename))::regclass
                        )) AS size,
                        pg_total_relation_size(
                            (quote_ident(schemaname) || '.' || quote_ident(tablename))::regclass
                        ) AS size_bytes
                    FROM pg_tables
                    WHERE schemaname = 'public'
                    ORDER BY size_bytes DESC
                """))
                for row in result.fetchall():
                    key = f"{row.schemaname}.{row.tablename}"
                    metrics.table_sizes[key] = {
                        "size_pretty": row.size,
                        "size_bytes": row.size_bytes,
                    }

        except Exception as e:
            logger.error(f"Failed to collect database metrics: {e}")

        return metrics

    async def get_slow_queries(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to *limit* most recent slow-query records from Redis.

        Returns [] for a non-positive *limit* or on Redis failure (logged).
        """
        if limit <= 0:
            # LRANGE 0 -1 would otherwise return the whole list.
            return []
        try:
            redis_client = await self.get_redis()
            slow_queries = await redis_client.lrange("slow_queries", 0, limit - 1)
            return [json.loads(query) for query in slow_queries]
        except Exception as e:
            logger.error(f"Failed to get slow queries: {e}")
            return []

    async def get_query_performance_stats(self, query_hash: str) -> Dict[str, Any]:
        """Aggregate the Redis samples stored for *query_hash*.

        Only the newest ``max_list_entries`` samples are retained, so
        ``total_executions`` is capped at that value. Returns {} if no samples
        exist or on error (logged).
        """
        try:
            redis_client = await self.get_redis()
            query_data = await redis_client.lrange(f"queries:{query_hash}", 0, -1)
            if not query_data:
                return {}

            queries = [json.loads(q) for q in query_data]
            execution_times = [q['execution_time'] for q in queries]
            timestamps = [q['timestamp'] for q in queries]

            return {
                "query_hash": query_hash,
                "query_type": queries[0]['query_type'],
                "total_executions": len(queries),
                "avg_execution_time": statistics.mean(execution_times),
                "min_execution_time": min(execution_times),
                "max_execution_time": max(execution_times),
                "median_execution_time": statistics.median(execution_times),
                "total_rows_returned": sum(q['rows_returned'] for q in queries),
                # ISO-8601 strings of the same format sort chronologically.
                "first_seen": min(timestamps),
                "last_seen": max(timestamps),
            }
        except Exception as e:
            logger.error(f"Failed to get query performance stats: {e}")
            return {}

    async def get_top_slow_queries(self, limit: int = 10, min_executions: int = 5) -> List[Dict[str, Any]]:
        """Return the *limit* queries with the highest average execution time.

        Args:
            limit: Maximum number of entries returned.
            min_executions: Skip queries with fewer stored samples than this.
        """
        try:
            redis_client = await self.get_redis()

            # SCAN may yield the same key more than once; dict keeps order, dedupes.
            query_keys: Dict[Any, None] = {}
            cursor = 0
            while True:
                cursor, keys = await redis_client.scan(cursor, match="queries:*", count=100)
                query_keys.update(dict.fromkeys(keys))
                if cursor == 0:
                    break

            query_stats = []
            for key in query_keys:
                # Keys arrive as bytes when the client does not decode responses.
                if isinstance(key, bytes):
                    key = key.decode()
                query_hash = key.partition(':')[2]
                stats = await self.get_query_performance_stats(query_hash)
                if stats and stats['total_executions'] >= min_executions:
                    query_stats.append(stats)

            query_stats.sort(key=lambda s: s['avg_execution_time'], reverse=True)
            return query_stats[:limit]
        except Exception as e:
            logger.error(f"Failed to get top slow queries: {e}")
            return []

    async def get_performance_summary(self) -> Dict[str, Any]:
        """Build the dashboard summary: pool, database, query stats and alerts.

        Side effect: records a new pool snapshot via get_connection_pool_metrics().
        """
        pool_metrics = await self.get_connection_pool_metrics()
        db_metrics = await self.get_database_metrics()
        slow_queries = await self.get_slow_queries(10)
        top_slow = await self.get_top_slow_queries(5)

        recent_queries = list(self.query_history)[-100:]  # Last 100 queries
        if recent_queries:
            avg_time = statistics.mean(q.execution_time for q in recent_queries)
            slow_count = sum(1 for q in recent_queries if q.execution_time > self.slow_query_threshold)
        else:
            avg_time = 0
            slow_count = 0

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "connection_pool": pool_metrics.to_dict(),
            "database": db_metrics.to_dict(),
            "query_performance": {
                "recent_queries_count": len(recent_queries),
                "avg_execution_time": round(avg_time, 4),
                "slow_queries_count": slow_count,
                "slow_query_threshold": self.slow_query_threshold,
            },
            "recent_slow_queries": slow_queries[:5],
            "top_slow_queries": top_slow,
            "alerts": await self._generate_alerts(pool_metrics, db_metrics),
        }

    async def _generate_alerts(self, pool_metrics: ConnectionPoolMetrics,
                               db_metrics: DatabaseMetrics) -> List[Dict[str, Any]]:
        """Derive alert dicts (type, severity, message, timestamp) from metrics."""
        alerts = []

        def _alert(alert_type: str, severity: str, message: str) -> None:
            alerts.append({
                "type": alert_type,
                "severity": severity,
                "message": message,
                "timestamp": datetime.utcnow().isoformat(),
            })

        if pool_metrics.utilization_percent > 80:
            _alert("connection_pool", "warning",
                   f"High connection pool utilization: {pool_metrics.utilization_percent}%")
        if pool_metrics.overflow > 0:
            _alert("connection_pool", "warning",
                   f"Connection pool overflow detected: {pool_metrics.overflow} connections")

        # NOTE(review): cache_hit_ratio stays 0.0 when it could not be measured,
        # which also triggers this alert.
        if db_metrics.cache_hit_ratio < 90:
            _alert("database", "warning", f"Low cache hit ratio: {db_metrics.cache_hit_ratio}%")
        # NOTE(review): deadlock_count comes from a cumulative counter, so this
        # presumably stays on once any deadlock occurred; consider alerting on deltas.
        if db_metrics.deadlock_count > 0:
            _alert("database", "error", f"Deadlocks detected: {db_metrics.deadlock_count}")

        recent_slow = sum(
            1 for q in list(self.query_history)[-10:]
            if q.execution_time > self.slow_query_threshold
        )
        if recent_slow >= 3:
            _alert("queries", "warning",
                   f"Multiple slow queries detected: {recent_slow} in last 10 queries")

        return alerts

    async def cleanup_old_metrics(self):
        """Set the retention TTL on any ``queries:*`` key that has none."""
        try:
            redis_client = await self.get_redis()
            cursor = 0
            while True:
                cursor, keys = await redis_client.scan(cursor, match="queries:*", count=100)
                for key in keys:
                    if await redis_client.ttl(key) == -1:  # No expiration set
                        await redis_client.expire(key, self._retention_seconds())
                if cursor == 0:
                    break
            logger.info("Metrics cleanup completed")
        except Exception as e:
            logger.error(f"Failed to cleanup old metrics: {e}")


# Global performance monitor instance
performance_monitor = PerformanceMonitor()


class QueryPerformanceTracker:
    """Async context manager that times its body and records it via track_query().

    Timing is recorded even if the body raises; exceptions are not suppressed.
    """

    def __init__(self, query: str):
        self.query = query
        self.start_time: Optional[float] = None

    async def __aenter__(self):
        # perf_counter is monotonic, so wall-clock adjustments cannot skew durations.
        self.start_time = time.perf_counter()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.start_time is not None:
            execution_time = time.perf_counter() - self.start_time
            await performance_monitor.track_query(self.query, execution_time)


async def performance_monitoring_task():
    """Scheduled job: build the summary, cache it in Redis, log alerts, clean up."""
    logger.info("Starting performance monitoring...")

    try:
        summary = await performance_monitor.get_performance_summary()

        redis_client = await performance_monitor.get_redis()
        await redis_client.setex(
            "performance_summary",
            3600,  # 1 hour
            json.dumps(summary, default=str),
        )

        if summary["alerts"]:
            logger.warning(f"Performance alerts: {len(summary['alerts'])} issues detected")
            for alert in summary["alerts"]:
                logger.warning(f"Alert: {alert['message']}")

        await performance_monitor.cleanup_old_metrics()

        logger.info("Performance monitoring completed")

    except Exception as e:
        # Top-level job boundary: keep the traceback for diagnosis.
        logger.exception(f"Performance monitoring failed: {e}")


async def performance_health_check() -> Dict[str, Any]:
    """Run a tracked ``SELECT 1`` and take a pool snapshot.

    Returns ``{"healthy": True, ...}`` on success, otherwise
    ``{"healthy": False, "error": str}``.
    """
    try:
        async with QueryPerformanceTracker("SELECT 1 as test"):
            async with async_engine.begin() as conn:
                await conn.execute(text("SELECT 1"))

        pool_metrics = await performance_monitor.get_connection_pool_metrics()

        return {
            "healthy": True,
            "query_tracking": "working",
            "connection_pool": pool_metrics.to_dict(),
        }
    except Exception as e:
        return {
            "healthy": False,
            "error": str(e),
        }


if __name__ == "__main__":
    import sys

    async def main():
        """CLI entry point: summary | slow-queries | top-slow | health."""
        command = sys.argv[1] if len(sys.argv) > 1 else "help"

        if command == "summary":
            summary = await performance_monitor.get_performance_summary()
            print(json.dumps(summary, indent=2, default=str))

        elif command == "slow-queries":
            slow_queries = await performance_monitor.get_slow_queries(20)
            print(f"Recent slow queries: {len(slow_queries)}")
            for query in slow_queries:
                print(f"  - {query['execution_time']:.2f}s: {query['query_type']}")

        elif command == "top-slow":
            top_slow = await performance_monitor.get_top_slow_queries(10)
            print("Top slow queries by average execution time:")
            for query in top_slow:
                print(f"  - {query['avg_execution_time']:.2f}s avg: {query['query_type']} ({query['total_executions']} executions)")

        elif command == "health":
            health = await performance_health_check()
            if health["healthy"]:
                print("✅ Performance monitoring is healthy")
            else:
                print("❌ Performance monitoring health check failed")
                print(f"Error: {health.get('error', 'Unknown error')}")

        else:
            print("Usage: python performance_monitor.py <command>")
            print("Commands: summary, slow-queries, top-slow, health")

    asyncio.run(main())