| """ |
| Database Performance Monitor for AegisLM SaaS Backend. |
| |
| Production-ready performance monitoring with metrics collection, |
| query analysis, and performance optimization recommendations. |
| """ |
|
|
| import asyncio |
| import time |
| import json |
| from datetime import datetime, timedelta |
| from typing import List, Dict, Optional, Any, Tuple |
| from sqlalchemy import text |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy.pool import QueuePool |
| import logging |
| import psutil |
| from collections import defaultdict, deque |
| import statistics |
|
|
| from .database import async_engine, get_redis |
| from .config import settings |
|
|
# Module logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
class QueryMetrics:
    """Timing record for a single executed query."""

    def __init__(self, query_hash: str, query_type: str, execution_time: float,
                 rows_returned: int, timestamp: datetime):
        # Identity of the (normalized) query text and its leading SQL verb.
        self.query_hash = query_hash
        self.query_type = query_type
        # Measured cost of this execution.
        self.execution_time = execution_time
        self.rows_returned = rows_returned
        # When the execution was recorded.
        self.timestamp = timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dict; the timestamp is rendered as ISO-8601 text."""
        return dict(
            query_hash=self.query_hash,
            query_type=self.query_type,
            execution_time=self.execution_time,
            rows_returned=self.rows_returned,
            timestamp=self.timestamp.isoformat(),
        )
|
|
|
|
class ConnectionPoolMetrics:
    """Point-in-time snapshot of connection pool counters.

    ``timestamp`` is taken (naive UTC, via ``datetime.utcnow()``) when the
    snapshot object is constructed.
    """

    def __init__(self, pool_size: int, checked_in: int, checked_out: int,
                 overflow: int, invalid: int):
        self.pool_size = pool_size
        self.checked_in = checked_in
        self.checked_out = checked_out
        self.overflow = overflow
        self.invalid = invalid
        self.timestamp = datetime.utcnow()

    @property
    def utilization_percent(self) -> float:
        """Checked-out connections as a percentage of ``pool_size``, rounded to 2 places.

        Returns 0.0 when ``pool_size`` is 0 instead of raising ZeroDivisionError.
        The value is not clamped: if ``checked_out`` exceeds ``pool_size`` the
        result is above 100.
        """
        if not self.pool_size:
            return 0.0
        return round((self.checked_out / self.pool_size) * 100, 2)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to a JSON-serializable dictionary (timestamp as ISO-8601 text)."""
        return {
            "pool_size": self.pool_size,
            "checked_in": self.checked_in,
            "checked_out": self.checked_out,
            "overflow": self.overflow,
            "invalid": self.invalid,
            "utilization_percent": self.utilization_percent,
            "timestamp": self.timestamp.isoformat()
        }
|
|
|
|
class DatabaseMetrics:
    """Mutable container for server-side database statistics.

    Counters start at zero and the per-index / per-table dicts start empty;
    collectors fill them in after construction. ``timestamp`` records when
    the container was created (naive UTC).
    """

    # Field order used by to_dict (timestamp is appended last, as ISO text).
    _EXPORTED_FIELDS = (
        "connections",
        "active_connections",
        "idle_connections",
        "transaction_count",
        "rollback_count",
        "deadlock_count",
        "cache_hit_ratio",
        "index_usage_stats",
        "table_sizes",
    )

    def __init__(self):
        # Session counts.
        self.connections = 0
        self.active_connections = 0
        self.idle_connections = 0
        # Transaction outcome counters.
        self.transaction_count = 0
        self.rollback_count = 0
        self.deadlock_count = 0
        # Buffer cache hit ratio, expressed as a percentage.
        self.cache_hit_ratio = 0.0
        # Fresh dicts per instance.
        self.index_usage_stats = {}
        self.table_sizes = {}
        self.timestamp = datetime.utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Return every field as a dict; the nested dicts are returned by reference."""
        exported = {name: getattr(self, name) for name in self._EXPORTED_FIELDS}
        exported["timestamp"] = self.timestamp.isoformat()
        return exported
|
|
|
|
class PerformanceMonitor:
    """Database performance monitor.

    Keeps bounded in-process histories of query timings and connection-pool
    snapshots, mirrors samples into Redis lists that expire after
    ``metrics_retention_hours``, and derives summaries and alerts from them.
    """

    # Leading SQL keywords recognised by _extract_query_type, checked in order.
    _QUERY_TYPES = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER')

    def __init__(self):
        self.redis_client = None
        # Bounded in-process histories; the oldest entries are dropped first.
        self.query_history = deque(maxlen=1000)
        self.pool_history = deque(maxlen=100)
        # Queries whose execution time (seconds) exceeds this count as slow.
        self.slow_query_threshold = getattr(settings, 'SLOW_QUERY_THRESHOLD', 1.0)
        self.metrics_retention_hours = getattr(settings, 'METRICS_RETENTION_HOURS', 24)
        # Maximum entries kept per Redis metrics list. Every push refreshes the
        # key's TTL, so a frequently written key would otherwise never expire
        # and grow without bound. A value <= 0 disables the cap.
        self.max_samples_per_key = getattr(settings, 'METRICS_MAX_SAMPLES_PER_KEY', 1000)

    async def get_redis(self):
        """Return the Redis client, creating it on first use."""
        if self.redis_client is None:
            self.redis_client = await get_redis()
        return self.redis_client

    def _retention_seconds(self) -> int:
        """Return the retention window in whole seconds (Redis EXPIRE takes integer seconds)."""
        return int(self.metrics_retention_hours * 3600)

    async def _push_capped(self, redis_client, key: str, payload: str) -> None:
        """Prepend *payload* to the Redis list *key*, cap its length and refresh its TTL."""
        await redis_client.lpush(key, payload)
        if self.max_samples_per_key > 0:
            # LPUSH puts the newest entry at index 0, so this keeps the newest N.
            await redis_client.ltrim(key, 0, self.max_samples_per_key - 1)
        await redis_client.expire(key, self._retention_seconds())

    def _hash_query(self, query: str) -> str:
        """Return a 16-hex-char fingerprint of *query*, ignoring case and whitespace layout.

        MD5 serves only as a fingerprint here, not for security.
        """
        import hashlib

        normalized = ' '.join(query.lower().split())
        return hashlib.md5(normalized.encode()).hexdigest()[:16]

    async def track_query(self, query: str, execution_time: float, rows_returned: int = 0):
        """Record one query execution in memory and in Redis.

        Executions slower than ``slow_query_threshold`` are logged and also
        pushed to the ``slow_queries`` list. Redis failures are logged and
        swallowed so monitoring never breaks the caller.
        """
        query_hash = self._hash_query(query)
        metrics = QueryMetrics(
            query_hash=query_hash,
            query_type=self._extract_query_type(query),
            execution_time=execution_time,
            rows_returned=rows_returned,
            timestamp=datetime.utcnow()
        )
        self.query_history.append(metrics)

        is_slow = execution_time > self.slow_query_threshold
        if is_slow:
            # Log before touching Redis so slow queries are still reported
            # when Redis is unavailable.
            logger.warning("Slow query detected: %.2fs - %s...", execution_time, query[:100])

        try:
            redis_client = await self.get_redis()
            payload = json.dumps(metrics.to_dict())
            await self._push_capped(redis_client, f"queries:{query_hash}", payload)
            if is_slow:
                await self._push_capped(redis_client, "slow_queries", payload)
        except Exception as e:
            logger.error("Failed to track query: %s", e)

    def _extract_query_type(self, query: str) -> str:
        """Return the leading SQL keyword of *query* (prefix match), or 'OTHER'."""
        query_upper = query.strip().upper()
        for keyword in self._QUERY_TYPES:
            if query_upper.startswith(keyword):
                return keyword
        return 'OTHER'

    @staticmethod
    def _utilization_percent(metrics) -> float:
        """Return checked-out connections as a percentage of pool size (0.0 for an empty pool)."""
        if not metrics.pool_size:
            return 0.0
        return round((metrics.checked_out / metrics.pool_size) * 100, 2)

    @staticmethod
    def _read_pool_stat(pool, name: str) -> int:
        """Call ``pool.<name>()`` when the pool implementation provides it, else return 0.

        NOTE(review): SQLAlchemy's documented QueuePool API offers size(),
        checkedin(), checkedout() and overflow() but no invalid(), and
        NullPool/StaticPool offer none of them — confirm against the
        installed SQLAlchemy version.
        """
        accessor = getattr(pool, name, None)
        return int(accessor()) if callable(accessor) else 0

    def _snapshot_pool(self) -> "ConnectionPoolMetrics":
        """Read the engine's pool counters into a new ConnectionPoolMetrics."""
        pool = async_engine.pool
        return ConnectionPoolMetrics(
            pool_size=self._read_pool_stat(pool, 'size'),
            checked_in=self._read_pool_stat(pool, 'checkedin'),
            checked_out=self._read_pool_stat(pool, 'checkedout'),
            overflow=self._read_pool_stat(pool, 'overflow'),
            invalid=self._read_pool_stat(pool, 'invalid')
        )

    def _build_pool_report(self, current) -> Dict[str, Any]:
        """Build the detailed pool report for snapshot *current* against ``pool_history``."""
        utilization = self._utilization_percent(current)
        history = list(self.pool_history)

        historical = {
            "last_10_samples": [m.to_dict() for m in history[-10:]],
            "avg_utilization": 0,
            "max_utilization": 0,
            "overflow_events": 0
        }
        if history:
            utilizations = [self._utilization_percent(m) for m in history]
            historical["avg_utilization"] = round(statistics.mean(utilizations), 2)
            historical["max_utilization"] = max(utilizations)
            historical["overflow_events"] = sum(1 for m in history if m.overflow > 0)

        warnings: List[str] = []
        errors: List[str] = []
        if utilization > 90:
            warnings.append("Very high pool utilization")
        if current.overflow > 0:
            warnings.append(f"Pool overflow: {current.overflow} connections")
        if current.invalid > 0:
            errors.append(f"Invalid connections: {current.invalid}")

        if utilization > 80:
            wait_time, throughput = "high", "medium"
        elif utilization > 60:
            wait_time, throughput = "medium", "low"
        else:
            # Keep the same string scale as the other branches.
            wait_time, throughput = "low", "low"

        return {
            "current": current.to_dict(),
            "historical": historical,
            "health_indicators": {
                # Overflow alone is only a warning; very high utilization or
                # invalid connections mark the pool unhealthy.
                "is_healthy": utilization <= 90 and current.invalid <= 0,
                "warnings": warnings,
                "errors": errors
            },
            "performance_impact": {
                "wait_time_estimate": wait_time,
                "throughput_impact": throughput
            }
        }

    async def get_detailed_connection_pool_metrics(self) -> Dict[str, Any]:
        """Return the current pool snapshot plus history-based health indicators.

        Unlike get_connection_pool_metrics, this neither appends the snapshot
        to ``pool_history`` nor stores it in Redis.
        """
        return self._build_pool_report(self._snapshot_pool())

    def _analyze_pool_report(self, detailed: Dict[str, Any]) -> Dict[str, Any]:
        """Derive suggestions, trend and capacity advice from a detailed pool report."""
        current = detailed["current"]
        utilization = current["utilization_percent"]
        pool_size = current["pool_size"]

        suggestions: List[str] = []
        recommended_capacity = pool_size
        scaling_needed = False
        if utilization > 80:
            suggestions.append("Consider increasing pool size")
            recommended_capacity = pool_size + 5
            scaling_needed = True
        if detailed["historical"]["overflow_events"] > 0:
            suggestions.append("Pool overflow detected - increase max_overflow setting")
        if current["invalid"] > 0:
            suggestions.append("Investigate connection invalidation causes")

        # NOTE(review): these labels describe sustained high/low utilization
        # over the last 5 samples rather than a direction of change; kept
        # as-is for existing consumers.
        trend = "stable"
        recent = list(self.pool_history)[-5:]
        if len(recent) == 5:
            recent_utilizations = [self._utilization_percent(m) for m in recent]
            if all(u > 80 for u in recent_utilizations):
                trend = "increasing"
            elif all(u < 20 for u in recent_utilizations):
                trend = "decreasing"

        # Check "urgent" first: utilization above 90 always also triggers the
        # > 80 suggestion, so testing suggestions first made it unreachable.
        if utilization > 90:
            recommendation = "urgent"
        elif suggestions:
            recommendation = "optimize"
        else:
            recommendation = "no_action"

        return {
            "summary": {
                "current_status": "healthy" if detailed["health_indicators"]["is_healthy"] else "unhealthy",
                "utilization_trend": trend,
                "recommendation": recommendation
            },
            "metrics": detailed,
            "optimization_suggestions": suggestions,
            "capacity_planning": {
                "current_capacity": pool_size,
                "recommended_capacity": recommended_capacity,
                "scaling_needed": scaling_needed
            }
        }

    async def get_connection_pool_analysis(self) -> Dict[str, Any]:
        """Return a pool analysis: status summary, suggestions and capacity plan."""
        detailed_metrics = await self.get_detailed_connection_pool_metrics()
        return self._analyze_pool_report(detailed_metrics)

    async def get_connection_pool_metrics(self) -> "ConnectionPoolMetrics":
        """Snapshot the pool, append it to ``pool_history`` and store it in Redis.

        Redis failures are logged; the snapshot is returned regardless.
        """
        metrics = self._snapshot_pool()
        self.pool_history.append(metrics)

        try:
            redis_client = await self.get_redis()
            await self._push_capped(redis_client, "pool_metrics", json.dumps(metrics.to_dict()))
        except Exception as e:
            logger.error("Failed to store pool metrics: %s", e)

        return metrics

    async def get_database_metrics(self) -> "DatabaseMetrics":
        """Collect PostgreSQL statistics for the current database.

        Reads pg_stat_activity, pg_stat_database, pg_statio_user_tables,
        pg_stat_user_indexes (top 20 by tuples read) and pg_tables (``public``
        schema sizes). All queries share one transaction, so the first failure
        stops collection; it is logged and the partially filled metrics are
        returned.
        """
        metrics = DatabaseMetrics()

        try:
            async with async_engine.begin() as conn:
                result = await conn.execute(text("""
                    SELECT
                        count(*) AS total_connections,
                        count(*) FILTER (WHERE state = 'active') AS active_connections,
                        count(*) FILTER (WHERE state = 'idle') AS idle_connections
                    FROM pg_stat_activity
                    WHERE datname = current_database()
                """))
                row = result.fetchone()
                if row:
                    metrics.connections = row.total_connections
                    metrics.active_connections = row.active_connections
                    metrics.idle_connections = row.idle_connections

                result = await conn.execute(text("""
                    SELECT
                        xact_commit AS transaction_count,
                        xact_rollback AS rollback_count,
                        deadlocks AS deadlock_count
                    FROM pg_stat_database
                    WHERE datname = current_database()
                """))
                row = result.fetchone()
                if row:
                    metrics.transaction_count = row.transaction_count
                    metrics.rollback_count = row.rollback_count
                    metrics.deadlock_count = row.deadlock_count

                # NULL (no buffer activity yet) leaves the 0.0 default in place.
                result = await conn.execute(text("""
                    SELECT
                        round(sum(heap_blks_hit) / nullif(sum(heap_blks_hit) + sum(heap_blks_read), 0) * 100, 2) AS cache_hit_ratio
                    FROM pg_statio_user_tables
                """))
                row = result.fetchone()
                if row and row.cache_hit_ratio is not None:
                    metrics.cache_hit_ratio = float(row.cache_hit_ratio)

                # pg_stat_user_indexes names its columns relname/indexrelname;
                # alias them to the names used for the stats keys.
                result = await conn.execute(text("""
                    SELECT
                        schemaname,
                        relname AS tablename,
                        indexrelname AS indexname,
                        idx_tup_read,
                        idx_tup_fetch
                    FROM pg_stat_user_indexes
                    WHERE idx_tup_read > 0
                    ORDER BY idx_tup_read DESC
                    LIMIT 20
                """))
                for row in result.fetchall():
                    key = f"{row.schemaname}.{row.tablename}.{row.indexname}"
                    metrics.index_usage_stats[key] = {
                        "tuples_read": row.idx_tup_read,
                        "tuples_fetched": row.idx_tup_fetch
                    }

                # quote_ident keeps mixed-case / special-character table names
                # resolvable when the text is cast to regclass.
                result = await conn.execute(text("""
                    SELECT
                        schemaname,
                        tablename,
                        pg_size_pretty(size_bytes) AS size,
                        size_bytes
                    FROM (
                        SELECT
                            schemaname,
                            tablename,
                            pg_total_relation_size(
                                CAST(quote_ident(schemaname) || '.' || quote_ident(tablename) AS regclass)
                            ) AS size_bytes
                        FROM pg_tables
                        WHERE schemaname = 'public'
                    ) AS table_sizes
                    ORDER BY size_bytes DESC
                """))
                for row in result.fetchall():
                    key = f"{row.schemaname}.{row.tablename}"
                    metrics.table_sizes[key] = {
                        "size_pretty": row.size,
                        "size_bytes": row.size_bytes
                    }

        except Exception as e:
            logger.error("Failed to collect database metrics: %s", e)

        return metrics

    async def get_slow_queries(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to *limit* most recent slow-query records, newest first.

        A non-positive *limit* returns ``[]``; otherwise LRANGE 0 -1 would
        return the whole list.
        """
        if limit <= 0:
            return []
        try:
            redis_client = await self.get_redis()
            slow_queries = await redis_client.lrange("slow_queries", 0, limit - 1)
            return [json.loads(query) for query in slow_queries]
        except Exception as e:
            logger.error("Failed to get slow queries: %s", e)
            return []

    async def get_query_performance_stats(self, query_hash: str) -> Dict[str, Any]:
        """Return aggregate timing statistics for the samples stored under *query_hash*.

        Returns an empty dict when there are no samples or on any failure.
        """
        try:
            redis_client = await self.get_redis()
            query_data = await redis_client.lrange(f"queries:{query_hash}", 0, -1)
            if not query_data:
                return {}

            samples = [json.loads(raw) for raw in query_data]
            execution_times = [s['execution_time'] for s in samples]
            # Compared as strings; the naive isoformat() values written by
            # track_query sort chronologically.
            timestamps = [s['timestamp'] for s in samples]

            return {
                "query_hash": query_hash,
                "query_type": samples[0]['query_type'],
                "total_executions": len(samples),
                "avg_execution_time": statistics.mean(execution_times),
                "min_execution_time": min(execution_times),
                "max_execution_time": max(execution_times),
                "median_execution_time": statistics.median(execution_times),
                "total_rows_returned": sum(s['rows_returned'] for s in samples),
                "first_seen": min(timestamps),
                "last_seen": max(timestamps)
            }
        except Exception as e:
            logger.error("Failed to get query performance stats: %s", e)
            return {}

    async def get_top_slow_queries(self, limit: int = 10, min_executions: int = 5) -> List[Dict[str, Any]]:
        """Return up to *limit* queries ranked by average execution time (slowest first).

        Only queries with at least *min_executions* stored samples are ranked.
        """
        try:
            redis_client = await self.get_redis()

            # SCAN may return the same key more than once; collect into a set.
            query_hashes = set()
            cursor = 0
            while True:
                cursor, keys = await redis_client.scan(cursor, match="queries:*", count=100)
                for key in keys:
                    if isinstance(key, bytes):
                        key = key.decode()
                    query_hashes.add(key.split(':', 1)[1])
                if cursor == 0:
                    break

            query_stats = []
            for query_hash in sorted(query_hashes):
                stats = await self.get_query_performance_stats(query_hash)
                if stats and stats['total_executions'] >= min_executions:
                    query_stats.append(stats)

            query_stats.sort(key=lambda s: s['avg_execution_time'], reverse=True)
            return query_stats[:limit]

        except Exception as e:
            logger.error("Failed to get top slow queries: %s", e)
            return []

    async def get_performance_summary(self) -> Dict[str, Any]:
        """Assemble the dashboard: pool snapshot, DB stats, query timings and alerts.

        Side effect: records a new pool snapshot via get_connection_pool_metrics.
        """
        pool_metrics = await self.get_connection_pool_metrics()
        db_metrics = await self.get_database_metrics()
        # Only the newest 5 are reported, so fetch just those.
        slow_queries = await self.get_slow_queries(5)
        top_slow = await self.get_top_slow_queries(5)

        recent_queries = list(self.query_history)[-100:]
        if recent_queries:
            avg_time = statistics.mean(q.execution_time for q in recent_queries)
            slow_count = sum(1 for q in recent_queries if q.execution_time > self.slow_query_threshold)
        else:
            avg_time = 0
            slow_count = 0

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "connection_pool": pool_metrics.to_dict(),
            "database": db_metrics.to_dict(),
            "query_performance": {
                "recent_queries_count": len(recent_queries),
                "avg_execution_time": round(avg_time, 4),
                "slow_queries_count": slow_count,
                "slow_query_threshold": self.slow_query_threshold
            },
            "recent_slow_queries": slow_queries,
            "top_slow_queries": top_slow,
            "alerts": await self._generate_alerts(pool_metrics, db_metrics)
        }

    async def _generate_alerts(self, pool_metrics: "ConnectionPoolMetrics",
                               db_metrics: "DatabaseMetrics") -> List[Dict[str, Any]]:
        """Return alert dicts for pool pressure, cache misses, deadlocks and slow queries."""
        alerts: List[Dict[str, Any]] = []

        def add(alert_type: str, severity: str, message: str) -> None:
            alerts.append({
                "type": alert_type,
                "severity": severity,
                "message": message,
                "timestamp": datetime.utcnow().isoformat()
            })

        utilization = self._utilization_percent(pool_metrics)
        if utilization > 80:
            add("connection_pool", "warning", f"High connection pool utilization: {utilization}%")
        if pool_metrics.overflow > 0:
            add("connection_pool", "warning",
                f"Connection pool overflow detected: {pool_metrics.overflow} connections")

        # 0.0 is DatabaseMetrics' default, and it is left in place when the
        # ratio is NULL or collection fails, so treat it as "no data".
        if 0 < db_metrics.cache_hit_ratio < 90:
            add("database", "warning", f"Low cache hit ratio: {db_metrics.cache_hit_ratio}%")
        if db_metrics.deadlock_count > 0:
            add("database", "error", f"Deadlocks detected: {db_metrics.deadlock_count}")

        threshold = self.slow_query_threshold
        recent_slow = sum(1 for q in list(self.query_history)[-10:] if q.execution_time > threshold)
        if recent_slow >= 3:
            add("queries", "warning", f"Multiple slow queries detected: {recent_slow} in last 10 queries")

        return alerts

    async def cleanup_old_metrics(self):
        """Set an expiry on metrics lists that currently have none.

        Covers the per-query ``queries:*`` lists and the shared
        ``slow_queries`` / ``pool_metrics`` lists. Failures are logged.
        """
        try:
            redis_client = await self.get_redis()
            ttl_seconds = self._retention_seconds()

            async def ensure_expiry(key) -> None:
                # TTL -1: key exists without an expiry (-2 means it is missing).
                if await redis_client.ttl(key) == -1:
                    await redis_client.expire(key, ttl_seconds)

            for key in ("slow_queries", "pool_metrics"):
                await ensure_expiry(key)

            cursor = 0
            while True:
                cursor, keys = await redis_client.scan(cursor, match="queries:*", count=100)
                for key in keys:
                    await ensure_expiry(key)
                if cursor == 0:
                    break

            logger.info("Metrics cleanup completed")

        except Exception as e:
            logger.error("Failed to cleanup old metrics: %s", e)
|
|
|
|
| |
# Shared monitor instance used by QueryPerformanceTracker, performance_monitoring_task,
# performance_health_check and the command-line entry point below. It is created at
# import time, so PerformanceMonitor.__init__ reads ``settings`` when this module loads.
performance_monitor = PerformanceMonitor()
|
|
|
|
| |
class QueryPerformanceTracker:
    """Async context manager that times its body and reports it as *query*.

    On exit the elapsed time is passed to ``performance_monitor.track_query``,
    including when the body raised; exceptions are never suppressed.

    Example::

        async with QueryPerformanceTracker("SELECT 1"):
            await conn.execute(text("SELECT 1"))
    """

    def __init__(self, query: str):
        self.query = query
        # perf_counter() reading taken on entry; None until the block is entered.
        self.start_time: Optional[float] = None

    async def __aenter__(self):
        # perf_counter is monotonic, so wall-clock adjustments cannot yield
        # negative or inflated durations the way time.time() differences can.
        self.start_time = time.perf_counter()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if self.start_time is not None:
            execution_time = time.perf_counter() - self.start_time
            await performance_monitor.track_query(self.query, execution_time)
        # Implicit None (falsy) return lets any exception from the body propagate.
|
|
|
|
| |
async def performance_monitoring_task():
    """Run one scheduled monitoring pass.

    Builds the performance summary, caches it in Redis under
    ``performance_summary`` for an hour, logs every alert it contains and
    then runs the metrics cleanup. Any failure is logged, not raised.
    """
    logger.info("Starting performance monitoring...")

    summary_ttl_seconds = 3600
    try:
        summary = await performance_monitor.get_performance_summary()

        client = await performance_monitor.get_redis()
        serialized = json.dumps(summary, default=str)
        await client.setex("performance_summary", summary_ttl_seconds, serialized)

        pending_alerts = summary["alerts"]
        if pending_alerts:
            logger.warning(f"Performance alerts: {len(pending_alerts)} issues detected")
            for entry in pending_alerts:
                logger.warning(f"Alert: {entry['message']}")

        await performance_monitor.cleanup_old_metrics()
        logger.info("Performance monitoring completed")
    except Exception as e:
        logger.error(f"Performance monitoring failed: {e}")
|
|
|
|
| |
async def performance_health_check() -> Dict[str, Any]:
    """Verify that a tracked round-trip query and a pool snapshot both succeed.

    Returns ``{"healthy": True, "query_tracking": "working", "connection_pool": {...}}``
    on success, or ``{"healthy": False, "error": "<message>"}`` if any step raises.
    """
    try:
        async with QueryPerformanceTracker("SELECT 1 as test"):
            async with async_engine.begin() as connection:
                await connection.execute(text("SELECT 1"))

        snapshot = await performance_monitor.get_connection_pool_metrics()
        pool_state = snapshot.to_dict()
    except Exception as exc:
        return {"healthy": False, "error": str(exc)}

    return {
        "healthy": True,
        "query_tracking": "working",
        "connection_pool": pool_state,
    }
|
|
|
|
if __name__ == "__main__":
    import sys

    async def _show_summary():
        """Print the full performance summary as indented JSON."""
        report = await performance_monitor.get_performance_summary()
        print(json.dumps(report, indent=2, default=str))

    async def _show_slow_queries():
        """Print the 20 most recent slow queries."""
        entries = await performance_monitor.get_slow_queries(20)
        print(f"Recent slow queries: {len(entries)}")
        for entry in entries:
            print(f" - {entry['execution_time']:.2f}s: {entry['query_type']}")

    async def _show_top_slow():
        """Print the 10 queries with the highest average execution time."""
        ranked = await performance_monitor.get_top_slow_queries(10)
        print("Top slow queries by average execution time:")
        for entry in ranked:
            print(f" - {entry['avg_execution_time']:.2f}s avg: {entry['query_type']} ({entry['total_executions']} executions)")

    async def _show_health():
        """Run the health check and print a pass/fail line (plus the error on failure)."""
        status = await performance_health_check()
        if not status["healthy"]:
            print("❌ Performance monitoring health check failed")
            print(f"Error: {status.get('error', 'Unknown error')}")
            return
        print("✅ Performance monitoring is healthy")

    async def _show_usage():
        """Print the command-line usage text."""
        print("Usage: python performance_monitor.py <command>")
        print("Commands: summary, slow-queries, top-slow, health")

    async def main():
        """Dispatch sys.argv[1] to its handler; a missing or unknown command prints usage."""
        command = sys.argv[1] if len(sys.argv) > 1 else "help"
        handlers = {
            "summary": _show_summary,
            "slow-queries": _show_slow_queries,
            "top-slow": _show_top_slow,
            "health": _show_health,
        }
        await handlers.get(command, _show_usage)()

    asyncio.run(main())
|
|