# ALM-2 / backend / core / performance_monitor.py
# ACA050's picture
# Upload 520 files
# 2ed8996 verified
"""
Database Performance Monitor for AegisLM SaaS Backend.
Production-ready performance monitoring with metrics collection,
query analysis, and performance optimization recommendations.
"""
import asyncio
import time
import json
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any, Tuple
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.pool import QueuePool
import logging
import psutil
from collections import defaultdict, deque
import statistics
from .database import async_engine, get_redis
from .config import settings
logger = logging.getLogger(__name__)
class QueryMetrics:
    """One recorded query execution: identity, kind, duration and row count."""

    def __init__(
        self,
        query_hash: str,
        query_type: str,
        execution_time: float,
        rows_returned: int,
        timestamp: datetime,
    ):
        """Store the sample fields exactly as supplied by the caller."""
        self.query_hash = query_hash
        self.query_type = query_type
        self.execution_time = execution_time
        self.rows_returned = rows_returned
        self.timestamp = timestamp

    def to_dict(self) -> Dict[str, Any]:
        """Return the sample as a mapping with the timestamp in ISO-8601 form."""
        plain_fields = ("query_hash", "query_type", "execution_time", "rows_returned")
        payload: Dict[str, Any] = {name: getattr(self, name) for name in plain_fields}
        # datetime objects are not JSON-serialisable, so emit the ISO string.
        payload["timestamp"] = self.timestamp.isoformat()
        return payload
class ConnectionPoolMetrics:
    """Snapshot of connection pool counters, timestamped at construction."""

    def __init__(self, pool_size: int, checked_in: int, checked_out: int,
                 overflow: int, invalid: int):
        """Store the supplied counters and record the current UTC time."""
        self.pool_size = pool_size
        self.checked_in = checked_in
        self.checked_out = checked_out
        self.overflow = overflow
        self.invalid = invalid
        self.timestamp = datetime.utcnow()

    @property
    def utilization_percent(self) -> float:
        """Checked-out connections as a percentage of ``pool_size``.

        Exposed as an attribute because the monitor reads
        ``metrics.utilization_percent`` directly. Returns ``0.0`` when
        ``pool_size`` is zero instead of raising ``ZeroDivisionError``.
        """
        if not self.pool_size:
            return 0.0
        return round((self.checked_out / self.pool_size) * 100, 2)

    def to_dict(self) -> Dict[str, Any]:
        """Return the counters, derived utilization and ISO timestamp as a dict."""
        return {
            "pool_size": self.pool_size,
            "checked_in": self.checked_in,
            "checked_out": self.checked_out,
            "overflow": self.overflow,
            "invalid": self.invalid,
            "utilization_percent": self.utilization_percent,
            "timestamp": self.timestamp.isoformat(),
        }
class DatabaseMetrics:
    """Container for database-level statistics, zero-initialised until filled in."""

    # Attributes copied verbatim into to_dict(), in output order.
    _EXPORTED_FIELDS = (
        "connections",
        "active_connections",
        "idle_connections",
        "transaction_count",
        "rollback_count",
        "deadlock_count",
        "cache_hit_ratio",
        "index_usage_stats",
        "table_sizes",
    )

    def __init__(self):
        """Create an empty metrics record stamped with the current UTC time."""
        # Connection counters.
        self.connections = self.active_connections = self.idle_connections = 0
        # Transaction counters.
        self.transaction_count = self.rollback_count = self.deadlock_count = 0
        self.cache_hit_ratio = 0.0
        # Each instance gets its own dicts; they are filled in by the collector.
        self.index_usage_stats = {}
        self.table_sizes = {}
        self.timestamp = datetime.utcnow()

    def to_dict(self) -> Dict[str, Any]:
        """Return all metrics as a dict, with the timestamp as an ISO string."""
        snapshot: Dict[str, Any] = {
            field: getattr(self, field) for field in self._EXPORTED_FIELDS
        }
        snapshot["timestamp"] = self.timestamp.isoformat()
        return snapshot
class PerformanceMonitor:
    """Database performance monitor.

    Keeps bounded in-memory histories of query and connection-pool samples,
    mirrors them into Redis lists, and reads PostgreSQL statistics views to
    build summaries and alerts.
    """

    # Leading SQL keywords recognised by _extract_query_type, checked in order.
    _QUERY_KEYWORDS = ('SELECT', 'INSERT', 'UPDATE', 'DELETE', 'CREATE', 'DROP', 'ALTER')

    def __init__(self):
        """Initialise empty histories and read thresholds from ``settings``."""
        self.redis_client = None
        self.query_history = deque(maxlen=1000)  # Last 1000 queries
        self.pool_history = deque(maxlen=100)  # Last 100 pool metrics
        self.slow_query_threshold = getattr(settings, 'SLOW_QUERY_THRESHOLD', 1.0)  # seconds
        self.metrics_retention_hours = getattr(settings, 'METRICS_RETENTION_HOURS', 24)

    async def get_redis(self):
        """Return the Redis client, creating it on first use."""
        if self.redis_client is None:
            self.redis_client = await get_redis()
        return self.redis_client

    # ------------------------------------------------------------------
    # Small pure helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _pool_utilization(metrics) -> float:
        """Return checked-out connections as a percentage of the pool size.

        Computed from ``checked_out`` and ``pool_size`` so it does not rely on
        the sample object exposing a ``utilization_percent`` attribute.
        Returns ``0.0`` for a zero-sized pool.
        """
        if not metrics.pool_size:
            return 0.0
        return round((metrics.checked_out / metrics.pool_size) * 100, 2)

    @staticmethod
    def _pool_counter(pool, name: str) -> int:
        """Call the pool's counter method ``name``, or return 0 if it has none.

        Not every pool class provides every counter (``invalid`` in
        particular), so a missing method is treated as "nothing to report".
        """
        reader = getattr(pool, name, None)
        return reader() if callable(reader) else 0

    def _snapshot_pool(self) -> "ConnectionPoolMetrics":
        """Build a ConnectionPoolMetrics sample from the engine's pool."""
        pool = async_engine.pool
        return ConnectionPoolMetrics(
            pool_size=self._pool_counter(pool, "size"),
            checked_in=self._pool_counter(pool, "checkedin"),
            checked_out=self._pool_counter(pool, "checkedout"),
            overflow=self._pool_counter(pool, "overflow"),
            invalid=self._pool_counter(pool, "invalid"),
        )

    def _hash_query(self, query: str) -> str:
        """Return a 16-hex-character identifier for the normalized query text.

        Normalization lowercases the text and collapses whitespace runs, so
        queries differing only in case or spacing share one identifier.
        """
        import hashlib
        # Simple normalization - remove extra whitespace and convert to lowercase
        normalized = ' '.join(query.lower().split())
        return hashlib.md5(normalized.encode()).hexdigest()[:16]

    def _extract_query_type(self, query: str) -> str:
        """Return the leading SQL keyword of ``query``, or ``'OTHER'``."""
        statement = query.strip().upper()
        for keyword in self._QUERY_KEYWORDS:
            if statement.startswith(keyword):
                return keyword
        return 'OTHER'

    # ------------------------------------------------------------------
    # Query tracking
    # ------------------------------------------------------------------

    async def track_query(self, query: str, execution_time: float, rows_returned: int = 0):
        """Record one query execution in memory and in Redis.

        Args:
            query: SQL text; only its hash and leading keyword are stored.
            execution_time: Duration, compared against ``slow_query_threshold``.
            rows_returned: Row count reported by the caller.

        Redis failures are logged and swallowed so tracking never breaks the
        caller.
        """
        metrics = QueryMetrics(
            query_hash=self._hash_query(query),
            query_type=self._extract_query_type(query),
            execution_time=execution_time,
            rows_returned=rows_returned,
            timestamp=datetime.utcnow(),
        )
        self.query_history.append(metrics)

        is_slow = execution_time > self.slow_query_threshold
        if is_slow:
            # Warn before touching Redis so the alert is not lost when Redis is down.
            logger.warning("Slow query detected: %.2fs - %s...", execution_time, query[:100])

        # Store in Redis for analytics
        try:
            redis_client = await self.get_redis()
            retention_seconds = self.metrics_retention_hours * 3600
            payload = json.dumps(metrics.to_dict())
            query_key = f"queries:{metrics.query_hash}"
            await redis_client.lpush(query_key, payload)
            await redis_client.expire(query_key, retention_seconds)
            if is_slow:
                await redis_client.lpush("slow_queries", payload)
                await redis_client.expire("slow_queries", retention_seconds)
        except Exception as e:
            logger.error("Failed to track query: %s", e)

    # ------------------------------------------------------------------
    # Connection pool
    # ------------------------------------------------------------------

    async def get_detailed_connection_pool_metrics(self) -> Dict[str, Any]:
        """Get detailed connection pool metrics with historical data.

        Returns a dict with ``current``, ``historical``, ``health_indicators``
        and ``performance_impact`` sections. The current sample is NOT added
        to ``pool_history``; only ``get_connection_pool_metrics`` does that.
        """
        current_metrics = self._snapshot_pool()
        current_utilization = self._pool_utilization(current_metrics)
        history = list(self.pool_history)

        pool_metrics = {
            "current": current_metrics.to_dict(),
            "historical": {
                "last_10_samples": [m.to_dict() for m in history[-10:]],
                "avg_utilization": 0,
                "max_utilization": 0,
                "overflow_events": 0
            },
            "health_indicators": {
                "is_healthy": True,
                "warnings": [],
                "errors": []
            },
            "performance_impact": {
                "wait_time_estimate": 0,
                "throughput_impact": "low"
            }
        }

        # Calculate historical metrics
        if len(history) > 1:
            utilizations = [self._pool_utilization(m) for m in history]
            historical = pool_metrics["historical"]
            historical["avg_utilization"] = round(statistics.mean(utilizations), 2)
            historical["max_utilization"] = max(utilizations)
            historical["overflow_events"] = sum(1 for m in history if m.overflow > 0)

        # Health indicators
        health = pool_metrics["health_indicators"]
        if current_utilization > 90:
            health["warnings"].append("Very high pool utilization")
            health["is_healthy"] = False
        if current_metrics.overflow > 0:
            health["warnings"].append(f"Pool overflow: {current_metrics.overflow} connections")
        if current_metrics.invalid > 0:
            health["errors"].append(f"Invalid connections: {current_metrics.invalid}")
            health["is_healthy"] = False

        # Performance impact estimation
        impact = pool_metrics["performance_impact"]
        if current_utilization > 80:
            impact["wait_time_estimate"] = "high"
            impact["throughput_impact"] = "medium"
        elif current_utilization > 60:
            impact["wait_time_estimate"] = "medium"
            impact["throughput_impact"] = "low"

        return pool_metrics

    async def get_connection_pool_analysis(self) -> Dict[str, Any]:
        """Get comprehensive connection pool analysis.

        Wraps the detailed metrics with a summary, optimization suggestions
        and a capacity-planning section.
        """
        detailed_metrics = await self.get_detailed_connection_pool_metrics()
        current = detailed_metrics["current"]
        analysis = {
            "summary": {
                "current_status": "healthy" if detailed_metrics["health_indicators"]["is_healthy"] else "unhealthy",
                "utilization_trend": "stable",
                "recommendation": "no_action"
            },
            "metrics": detailed_metrics,
            "optimization_suggestions": [],
            "capacity_planning": {
                "current_capacity": current["pool_size"],
                "recommended_capacity": current["pool_size"],
                "scaling_needed": False
            }
        }

        # Generate optimization suggestions
        suggestions = analysis["optimization_suggestions"]
        if current["utilization_percent"] > 80:
            suggestions.append("Consider increasing pool size")
            analysis["capacity_planning"]["recommended_capacity"] = current["pool_size"] + 5
            analysis["capacity_planning"]["scaling_needed"] = True
        if detailed_metrics["historical"]["overflow_events"] > 0:
            suggestions.append("Pool overflow detected - increase max_overflow setting")
        if current["invalid"] > 0:
            suggestions.append("Investigate connection invalidation causes")

        # Determine utilization trend
        if len(self.pool_history) >= 5:
            recent_utilizations = [
                self._pool_utilization(m) for m in list(self.pool_history)[-5:]
            ]
            if all(u > 80 for u in recent_utilizations):
                analysis["summary"]["utilization_trend"] = "increasing"
            elif all(u < 20 for u in recent_utilizations):
                analysis["summary"]["utilization_trend"] = "decreasing"

        # Set recommendation. "urgent" must be tested first: utilization above
        # 90 always produces a suggestion too, so testing suggestions first
        # would make the urgent branch unreachable.
        if current["utilization_percent"] > 90:
            analysis["summary"]["recommendation"] = "urgent"
        elif suggestions:
            analysis["summary"]["recommendation"] = "optimize"

        return analysis

    async def get_connection_pool_metrics(self) -> "ConnectionPoolMetrics":
        """Sample the pool, append the sample to ``pool_history`` and push it to Redis.

        A Redis failure is logged; the sample is still returned.
        """
        metrics = self._snapshot_pool()
        self.pool_history.append(metrics)

        # Store in Redis
        try:
            redis_client = await self.get_redis()
            await redis_client.lpush("pool_metrics", json.dumps(metrics.to_dict()))
            await redis_client.expire("pool_metrics", self.metrics_retention_hours * 3600)
        except Exception as e:
            logger.error("Failed to store pool metrics: %s", e)

        return metrics

    # ------------------------------------------------------------------
    # Database statistics
    # ------------------------------------------------------------------

    async def get_database_metrics(self) -> "DatabaseMetrics":
        """Get comprehensive database metrics from PostgreSQL statistics views.

        All queries run in one transaction. If any of them fails, the error is
        logged and the metrics gathered so far are returned (remaining fields
        keep their zero/empty defaults).
        """
        metrics = DatabaseMetrics()
        try:
            async with async_engine.begin() as conn:
                # Connection statistics
                result = await conn.execute(text("""
                    SELECT
                        count(*) as total_connections,
                        count(*) FILTER (WHERE state = 'active') as active_connections,
                        count(*) FILTER (WHERE state = 'idle') as idle_connections
                    FROM pg_stat_activity
                    WHERE datname = current_database()
                """))
                row = result.fetchone()
                if row:
                    metrics.connections = row.total_connections
                    metrics.active_connections = row.active_connections
                    metrics.idle_connections = row.idle_connections

                # Transaction statistics
                result = await conn.execute(text("""
                    SELECT
                        xact_commit as transaction_count,
                        xact_rollback as rollback_count,
                        deadlocks as deadlock_count
                    FROM pg_stat_database
                    WHERE datname = current_database()
                """))
                row = result.fetchone()
                if row:
                    metrics.transaction_count = row.transaction_count
                    metrics.rollback_count = row.rollback_count
                    metrics.deadlock_count = row.deadlock_count

                # Cache hit ratio
                result = await conn.execute(text("""
                    SELECT
                        round(sum(heap_blks_hit) / nullif(sum(heap_blks_hit) + sum(heap_blks_read), 0) * 100, 2) as cache_hit_ratio
                    FROM pg_statio_user_tables
                """))
                row = result.fetchone()
                if row and row.cache_hit_ratio:
                    metrics.cache_hit_ratio = float(row.cache_hit_ratio)

                # Index usage statistics. pg_stat_user_indexes names its
                # columns relname / indexrelname; alias them to the names the
                # loop below reads.
                result = await conn.execute(text("""
                    SELECT
                        schemaname,
                        relname AS tablename,
                        indexrelname AS indexname,
                        idx_tup_read,
                        idx_tup_fetch
                    FROM pg_stat_user_indexes
                    WHERE idx_tup_read > 0
                    ORDER BY idx_tup_read DESC
                    LIMIT 20
                """))
                for row in result.fetchall():
                    key = f"{row.schemaname}.{row.tablename}.{row.indexname}"
                    metrics.index_usage_stats[key] = {
                        "tuples_read": row.idx_tup_read,
                        "tuples_fetched": row.idx_tup_fetch
                    }

                # Table sizes. quote_ident keeps mixed-case or otherwise
                # special table names resolvable when cast to a relation.
                result = await conn.execute(text("""
                    SELECT
                        schemaname,
                        tablename,
                        pg_size_pretty(pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(tablename))) as size,
                        pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(tablename)) as size_bytes
                    FROM pg_tables
                    WHERE schemaname = 'public'
                    ORDER BY pg_total_relation_size(quote_ident(schemaname)||'.'||quote_ident(tablename)) DESC
                """))
                for row in result.fetchall():
                    key = f"{row.schemaname}.{row.tablename}"
                    metrics.table_sizes[key] = {
                        "size_pretty": row.size,
                        "size_bytes": row.size_bytes
                    }
        except Exception as e:
            logger.error("Failed to collect database metrics: %s", e)

        return metrics

    # ------------------------------------------------------------------
    # Query statistics from Redis
    # ------------------------------------------------------------------

    async def get_slow_queries(self, limit: int = 50) -> List[Dict[str, Any]]:
        """Return up to ``limit`` entries from the head of the ``slow_queries`` list.

        Returns an empty list if Redis cannot be read.
        """
        try:
            redis_client = await self.get_redis()
            slow_queries = await redis_client.lrange("slow_queries", 0, limit - 1)
            return [json.loads(query) for query in slow_queries]
        except Exception as e:
            logger.error("Failed to get slow queries: %s", e)
            return []

    async def get_query_performance_stats(self, query_hash: str) -> Dict[str, Any]:
        """Get performance statistics for a specific query.

        Returns an empty dict when no samples exist for ``query_hash`` or when
        Redis cannot be read.
        """
        try:
            redis_client = await self.get_redis()
            query_data = await redis_client.lrange(f"queries:{query_hash}", 0, -1)
            if not query_data:
                return {}

            queries = [json.loads(q) for q in query_data]
            execution_times = [q['execution_time'] for q in queries]
            # Timestamps are ISO strings written by QueryMetrics.to_dict();
            # min/max below compare them as strings.
            timestamps = [q['timestamp'] for q in queries]
            return {
                "query_hash": query_hash,
                "query_type": queries[0]['query_type'],
                "total_executions": len(queries),
                "avg_execution_time": statistics.mean(execution_times),
                "min_execution_time": min(execution_times),
                "max_execution_time": max(execution_times),
                "median_execution_time": statistics.median(execution_times),
                "total_rows_returned": sum(q['rows_returned'] for q in queries),
                "first_seen": min(timestamps),
                "last_seen": max(timestamps)
            }
        except Exception as e:
            logger.error("Failed to get query performance stats: %s", e)
            return {}

    async def get_top_slow_queries(self, limit: int = 10) -> List[Dict[str, Any]]:
        """Get top slow queries by average execution time.

        Only queries with at least 5 recorded executions are considered.
        Returns an empty list if Redis cannot be read.
        """
        try:
            redis_client = await self.get_redis()

            # Collect every "queries:*" key. SCAN may return the same key more
            # than once, so de-duplicate while keeping first-seen order.
            query_hashes: Dict[str, None] = {}
            cursor = 0
            while True:
                cursor, keys = await redis_client.scan(cursor, match="queries:*", count=100)
                for key in keys:
                    # Clients not configured to decode responses return bytes.
                    if isinstance(key, bytes):
                        key = key.decode()
                    query_hashes[key.split(':', 1)[1]] = None
                if cursor == 0:
                    break

            # Get stats for each query
            query_stats = []
            for query_hash in query_hashes:
                stats = await self.get_query_performance_stats(query_hash)
                if stats and stats['total_executions'] >= 5:  # Only consider queries with sufficient data
                    query_stats.append(stats)

            # Sort by average execution time
            query_stats.sort(key=lambda x: x['avg_execution_time'], reverse=True)
            return query_stats[:limit]
        except Exception as e:
            logger.error("Failed to get top slow queries: %s", e)
            return []

    # ------------------------------------------------------------------
    # Summary and alerts
    # ------------------------------------------------------------------

    async def get_performance_summary(self) -> Dict[str, Any]:
        """Get performance summary dashboard.

        Combines a fresh pool sample, database metrics, slow-query lists,
        statistics over the last 100 in-memory queries, and generated alerts.
        """
        pool_metrics = await self.get_connection_pool_metrics()
        db_metrics = await self.get_database_metrics()
        slow_queries = await self.get_slow_queries(10)
        top_slow = await self.get_top_slow_queries(5)

        # Calculate query statistics from recent history
        recent_queries = list(self.query_history)[-100:]  # Last 100 queries
        if recent_queries:
            avg_time = statistics.mean(q.execution_time for q in recent_queries)
            slow_count = sum(
                1 for q in recent_queries if q.execution_time > self.slow_query_threshold
            )
        else:
            avg_time = 0
            slow_count = 0

        return {
            "timestamp": datetime.utcnow().isoformat(),
            "connection_pool": pool_metrics.to_dict(),
            "database": db_metrics.to_dict(),
            "query_performance": {
                "recent_queries_count": len(recent_queries),
                "avg_execution_time": round(avg_time, 4),
                "slow_queries_count": slow_count,
                "slow_query_threshold": self.slow_query_threshold
            },
            "recent_slow_queries": slow_queries[:5],
            "top_slow_queries": top_slow,
            "alerts": await self._generate_alerts(pool_metrics, db_metrics)
        }

    async def _generate_alerts(self, pool_metrics: "ConnectionPoolMetrics",
                               db_metrics: "DatabaseMetrics") -> List[Dict[str, Any]]:
        """Generate performance alerts.

        Each alert is a dict with ``type``, ``severity``, ``message`` and
        ``timestamp`` keys. Checks pool utilization and overflow, cache hit
        ratio, deadlocks, and slow queries among the last 10 tracked.
        """
        alerts = []

        def add_alert(alert_type: str, severity: str, message: str) -> None:
            alerts.append({
                "type": alert_type,
                "severity": severity,
                "message": message,
                "timestamp": datetime.utcnow().isoformat()
            })

        # Connection pool alerts
        utilization = self._pool_utilization(pool_metrics)
        if utilization > 80:
            add_alert("connection_pool", "warning",
                      f"High connection pool utilization: {utilization}%")
        if pool_metrics.overflow > 0:
            add_alert("connection_pool", "warning",
                      f"Connection pool overflow detected: {pool_metrics.overflow} connections")

        # Database performance alerts
        if db_metrics.cache_hit_ratio < 90:
            add_alert("database", "warning",
                      f"Low cache hit ratio: {db_metrics.cache_hit_ratio}%")
        if db_metrics.deadlock_count > 0:
            add_alert("database", "error",
                      f"Deadlocks detected: {db_metrics.deadlock_count}")

        # Recent slow queries alert
        recent_slow = sum(
            1 for q in list(self.query_history)[-10:]
            if q.execution_time > self.slow_query_threshold
        )
        if recent_slow >= 3:
            add_alert("queries", "warning",
                      f"Multiple slow queries detected: {recent_slow} in last 10 queries")

        return alerts

    async def cleanup_old_metrics(self):
        """Give every ``queries:*`` key lacking a TTL the retention expiry.

        Errors are logged and swallowed.
        """
        try:
            redis_client = await self.get_redis()
            retention_seconds = self.metrics_retention_hours * 3600

            # Clean up old query metrics
            cursor = 0
            while True:
                cursor, keys = await redis_client.scan(cursor, match="queries:*", count=100)
                for key in keys:
                    ttl = await redis_client.ttl(key)
                    if ttl == -1:  # No expiration set
                        await redis_client.expire(key, retention_seconds)
                if cursor == 0:
                    break

            logger.info("Metrics cleanup completed")
        except Exception as e:
            logger.error("Failed to cleanup old metrics: %s", e)
# Global performance monitor instance: a single module-level PerformanceMonitor
# used by QueryPerformanceTracker and by the monitoring task, health check and
# CLI code further down in this module.
performance_monitor = PerformanceMonitor()
# Query performance tracking middleware
class QueryPerformanceTracker:
    """Async context manager that times its body and reports it as a query.

    On exit the elapsed time is passed to ``performance_monitor.track_query``
    together with the query text given at construction. Exceptions raised in
    the body are not suppressed; the timing is still reported.
    """

    def __init__(self, query: str):
        """Remember the query text; timing starts on ``__aenter__``."""
        self.query = query
        self.start_time = None

    async def __aenter__(self):
        # perf_counter is monotonic, so the measured duration cannot go
        # negative or jump when the system clock is adjusted.
        self.start_time = time.perf_counter()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Compare against None explicitly: a perf_counter reading of 0.0 is a
        # valid start time and must not be treated as "never started".
        if self.start_time is not None:
            execution_time = time.perf_counter() - self.start_time
            await performance_monitor.track_query(self.query, execution_time)
# Scheduled performance monitoring task
async def performance_monitoring_task():
    """Collect one performance summary, cache it in Redis, log alerts, run cleanup.

    Any exception is logged and swallowed so the caller is never interrupted.
    """
    logger.info("Starting performance monitoring...")
    try:
        # Gather the summary first, then obtain the Redis handle.
        report = await performance_monitor.get_performance_summary()
        cache = await performance_monitor.get_redis()

        # Cache the serialised summary for one hour.
        one_hour = 3600
        serialized = json.dumps(report, default=str)
        await cache.setex("performance_summary", one_hour, serialized)

        # Surface every alert in the log.
        active_alerts = report["alerts"]
        if active_alerts:
            logger.warning(f"Performance alerts: {len(active_alerts)} issues detected")
            for entry in active_alerts:
                logger.warning(f"Alert: {entry['message']}")

        # Make sure stored query metrics carry an expiry.
        await performance_monitor.cleanup_old_metrics()
        logger.info("Performance monitoring completed")
    except Exception as e:
        logger.error(f"Performance monitoring failed: {e}")
# Health check for performance monitoring
async def performance_health_check() -> Dict[str, Any]:
    """Run a trivial tracked query and sample the pool to verify monitoring works.

    Returns a dict with ``healthy`` set to True plus the pool sample on
    success, or ``healthy`` set to False plus the error text on any exception.
    """
    try:
        # Exercise query tracking end to end with a no-op statement.
        async with QueryPerformanceTracker("SELECT 1 as test"), async_engine.begin() as conn:
            await conn.execute(text("SELECT 1"))

        # Sample the pool; serialising it stays inside the try so that a
        # failure there is also reported as unhealthy.
        pool_sample = await performance_monitor.get_connection_pool_metrics()
        report = {
            "healthy": True,
            "query_tracking": "working",
            "connection_pool": pool_sample.to_dict()
        }
        return report
    except Exception as e:
        failure = {
            "healthy": False,
            "error": str(e)
        }
        return failure
if __name__ == "__main__":
    import sys

    async def main():
        """Dispatch the CLI command given as the first argument (default: help)."""
        cli_args = sys.argv[1:]
        command = cli_args[0] if cli_args else "help"

        if command == "summary":
            report = await performance_monitor.get_performance_summary()
            print(json.dumps(report, indent=2, default=str))
            return

        if command == "slow-queries":
            entries = await performance_monitor.get_slow_queries(20)
            print(f"Recent slow queries: {len(entries)}")
            for query in entries:
                print(f" - {query['execution_time']:.2f}s: {query['query_type']}")
            return

        if command == "top-slow":
            ranked = await performance_monitor.get_top_slow_queries(10)
            print("Top slow queries by average execution time:")
            for query in ranked:
                print(f" - {query['avg_execution_time']:.2f}s avg: {query['query_type']} ({query['total_executions']} executions)")
            return

        if command == "health":
            health = await performance_health_check()
            if not health["healthy"]:
                print("❌ Performance monitoring health check failed")
                print(f"Error: {health.get('error', 'Unknown error')}")
            else:
                print("✅ Performance monitoring is healthy")
            return

        # Unknown command (or explicit "help"): show usage.
        print("Usage: python performance_monitor.py <command>")
        print("Commands: summary, slow-queries, top-slow, health")

    asyncio.run(main())