"""
Query Analysis and Optimization System for AegisLM SaaS Backend.

Production-ready query analysis with optimization recommendations,
index suggestions, and performance improvements.
"""
|
|
import asyncio
import json
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import List, Dict, Optional, Any, Tuple, Set

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from .database import async_engine
from .performance_monitor import performance_monitor
from .config import settings
|
|
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
|
|
|
|
class QueryType(Enum):
    """Categories a SQL statement can be assigned during analysis.

    Each member's value is the upper-case name of the category, so
    ``QueryType("JOIN")`` and ``QueryType.JOIN`` refer to the same member.
    """

    # Statement kinds identified by their leading keyword.
    SELECT = "SELECT"
    INSERT = "INSERT"
    UPDATE = "UPDATE"
    DELETE = "DELETE"

    # Structural kinds identified by constructs inside the statement.
    JOIN = "JOIN"
    AGGREGATE = "AGGREGATE"
    SUBQUERY = "SUBQUERY"
|
|
|
|
@dataclass
class QueryAnalysis:
    """Result record produced by analysing a single SQL query.

    Field order is part of the constructor signature and must not change.
    """

    # The SQL text that was analysed.
    query: str
    # Primary category assigned to the query.
    query_type: QueryType
    # Heuristic complexity score.
    complexity_score: float
    # Names of the tables referenced by the query.
    tables_involved: List[str]
    # Names of the indexes found in the execution plan.
    indexes_used: List[str]
    # Suggested indexes, one dict per suggestion.
    missing_indexes: List[Dict[str, Any]]
    # Human-readable optimization hints.
    optimization_suggestions: List[str]
    # Planner cost estimate, or None when no plan was available.
    estimated_cost: Optional[float]
    # Execution plan as returned by the database, or None when unavailable.
    execution_plan: Optional[Dict[str, Any]]
|
|
|
|
@dataclass
class IndexRecommendation:
    """A single suggested index for one table.

    Field order is part of the constructor signature and must not change.
    """

    # Table the index would be created on.
    table_name: str
    # Column names the index would cover, in index order.
    columns: List[str]
    # Index access method, e.g. 'btree'.
    index_type: str
    # Coarse impact label, e.g. 'medium' or 'high'.
    estimated_impact: str
    # Human-readable justification for the recommendation.
    reason: str
|
|
|
|
class QueryAnalyzer:
    """Heuristic SQL query analyzer and index advisor for PostgreSQL.

    Combines regex-based inspection of the SQL text (query type, complexity
    score, referenced tables, WHERE-clause columns) with information read
    from the database (``EXPLAIN`` output, ``pg_indexes``,
    ``pg_stat_user_tables``, ``information_schema``).

    The text inspection is not a SQL parser: keywords that appear inside
    string literals or comments are treated like real keywords, so all
    results are best-effort hints.
    """

    # Aggregate constructs that mark a query as an aggregation. Applied to
    # upper-cased SQL text.
    _AGGREGATE_RE = re.compile(r'\b(?:COUNT|SUM|AVG)\s*\(|\bGROUP\s+BY\b')

    # Words the WHERE-clause column regexes can capture that are not columns
    # (e.g. the NOT of "x NOT IN (...)").
    _NON_COLUMN_WORDS = frozenset({'is', 'are', 'not', 'null'})

    def __init__(self):
        """Create the analyzer and its catalogue of regex patterns.

        ``query_patterns`` groups regexes by concern. None of the methods of
        this class read it; it is kept as a public attribute.
        """
        self.query_patterns = {
            'slow_patterns': [
                r'SELECT.*\s+FROM\s+\w+\s+WHERE\s+.*LIKE\s+.*%',
                r'SELECT.*\s+FROM\s+\w+\s+WHERE\s+.*OR\s+',
                r'SELECT.*\s+FROM\s+\w+\s+ORDER BY\s+.*\s+LIMIT\s+',
                r'SELECT.*\s+FROM\s+\w+\s+WHERE\s+.*IN\s+\(.*SELECT',
                r'SELECT.*\s+FROM\s+\w+\s+WHERE\s+.*NOT\s+IN',
                r'SELECT.*\s+FROM\s+\w+\s+WHERE\s+.*!=\s*',
            ],
            'join_patterns': [
                r'JOIN\s+\w+\s+ON\s+.*=.*',
                r'LEFT\s+JOIN',
                r'RIGHT\s+JOIN',
                r'FULL\s+OUTER\s+JOIN',
            ],
            'aggregate_patterns': [
                r'COUNT\(', r'SUM\(', r'AVG\(', r'MIN\(', r'MAX\(',
                r'GROUP\s+BY',
                r'HAVING\s+',
            ]
        }

    async def analyze_query(self, query: str) -> QueryAnalysis:
        """Run every analysis step on ``query`` and bundle the results.

        Args:
            query: SQL statement to analyse.

        Returns:
            A ``QueryAnalysis``. ``execution_plan`` and ``estimated_cost``
            are None when the plan could not be obtained.
        """
        normalized_query = self._normalize_query(query)

        query_type = self._determine_query_type(normalized_query)
        complexity_score = self._calculate_complexity(normalized_query)
        tables = self._extract_tables(normalized_query)

        # The plan is requested for the query exactly as supplied.
        execution_plan = await self._get_execution_plan(query)

        indexes_used, missing_indexes = await self._analyze_indexes(
            normalized_query, tables, execution_plan
        )

        suggestions = self._generate_optimization_suggestions(
            normalized_query, query_type, complexity_score, execution_plan, missing_indexes
        )

        estimated_cost = self._extract_cost(execution_plan)

        return QueryAnalysis(
            query=query,
            query_type=query_type,
            complexity_score=complexity_score,
            tables_involved=tables,
            indexes_used=indexes_used,
            missing_indexes=missing_indexes,
            optimization_suggestions=suggestions,
            estimated_cost=estimated_cost,
            execution_plan=execution_plan
        )

    def _normalize_query(self, query: str) -> str:
        """Collapse every whitespace run to one space and trim both ends."""
        return ' '.join(query.split())

    def _determine_query_type(self, query: str) -> QueryType:
        """Classify ``query`` into its primary ``QueryType``.

        Precedence: JOIN, then AGGREGATE, then the leading statement keyword.
        A SELECT that contains a parenthesised SELECT is reported as
        SUBQUERY. Anything unrecognised falls back to SELECT.
        """
        query_upper = query.strip().upper()

        # Whole-word match so identifiers such as "joined_at" do not count.
        if re.search(r'\bJOIN\b', query_upper):
            return QueryType.JOIN
        if self._AGGREGATE_RE.search(query_upper):
            return QueryType.AGGREGATE
        if query_upper.startswith('SELECT'):
            if re.search(r'\(\s*SELECT\b', query_upper):
                return QueryType.SUBQUERY
            return QueryType.SELECT
        if query_upper.startswith('INSERT'):
            return QueryType.INSERT
        if query_upper.startswith('UPDATE'):
            return QueryType.UPDATE
        if query_upper.startswith('DELETE'):
            return QueryType.DELETE
        if 'SELECT' in query_upper and '(' in query_upper:
            return QueryType.SUBQUERY
        return QueryType.SELECT

    def _calculate_complexity(self, query: str) -> float:
        """Score the structural complexity of ``query`` on a 0-10 scale.

        Weights: JOIN +2.0, parenthesised sub-SELECT +1.5, aggregation +1.0;
        within the WHERE clause each AND +0.5, OR +0.8, LIKE +0.3, IN +0.4;
        each listed function call +0.2. The total is capped at 10.0.
        """
        upper = query.upper()
        score = 0.0

        if re.search(r'\bJOIN\b', upper):
            score += 2.0
        if re.search(r'\(\s*SELECT\b', upper):
            score += 1.5
        if self._AGGREGATE_RE.search(upper):
            score += 1.0

        where_parts = re.split(r'\bWHERE\b', upper, maxsplit=1)
        if len(where_parts) == 2:
            # Only the filter conditions count, not ORDER BY / GROUP BY lists.
            conditions = re.split(
                r'\b(?:ORDER|GROUP)\s+BY\b', where_parts[1], maxsplit=1
            )[0]
            # Whole-word counts: a column named "origin" must not be counted
            # as one OR plus one IN.
            for keyword, weight in (('AND', 0.5), ('OR', 0.8), ('LIKE', 0.3), ('IN', 0.4)):
                score += len(re.findall(rf'\b{keyword}\b', conditions)) * weight

        functions = ['COUNT(', 'SUM(', 'AVG(', 'MIN(', 'MAX(', 'COALESCE(', 'CASE WHEN']
        for func in functions:
            score += upper.count(func) * 0.2

        return min(score, 10.0)

    def _extract_tables(self, query: str) -> List[str]:
        """Return the table names that follow FROM or JOIN in ``query``.

        A schema prefix (``public.users``) is dropped so the name can be
        compared with ``pg_indexes.tablename``. Names are de-duplicated and
        returned in order of first appearance.
        """
        names = re.findall(
            r'\b(?:FROM|JOIN)\s+(?:\w+\.)?(\w+)', query, re.IGNORECASE
        )
        return list(dict.fromkeys(names))

    async def _get_execution_plan(self, query: str) -> Optional[Dict[str, Any]]:
        """Return the JSON execution plan of ``query``, or None on failure.

        ``EXPLAIN (ANALYZE ...)`` really executes the statement. The
        transaction is therefore always rolled back, so analysing an
        INSERT/UPDATE/DELETE does not change stored data.

        Returns:
            The first element of the EXPLAIN JSON output. For PostgreSQL
            this is a dict whose ``"Plan"`` key holds the root plan node.
        """
        try:
            async with async_engine.connect() as conn:
                try:
                    explain_query = f"EXPLAIN (ANALYZE, BUFFERS, FORMAT JSON) {query}"
                    result = await conn.execute(text(explain_query))
                    row = result.fetchone()
                finally:
                    # Undo whatever the analysed statement did.
                    await conn.rollback()

            if row is None:
                return None

            plan_data = row[0]
            # Some drivers hand JSON columns back as text.
            if isinstance(plan_data, str):
                plan_data = json.loads(plan_data)

            if plan_data and len(plan_data) > 0:
                return plan_data[0]
        except Exception as e:
            logger.error(f"Failed to get execution plan: {e}")

        return None

    async def _analyze_indexes(self, query: str, tables: List[str],
                               execution_plan: Optional[Dict[str, Any]]) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Report indexes used by the plan and suggest missing ones.

        Args:
            query: SQL text whose WHERE clause is inspected.
            tables: Table names to look up in ``pg_indexes``.
            execution_plan: Plan from ``_get_execution_plan``, or None.

        Returns:
            ``(indexes_used, missing_indexes)``. Both are empty, or partially
            filled, when a database error occurs; the error is logged.
        """
        indexes_used: List[str] = []
        missing_indexes: List[Dict[str, Any]] = []

        # The WHERE clause does not depend on the table being inspected.
        where_clause = self._extract_where_clause(query)

        try:
            async with async_engine.begin() as conn:
                for table in tables:
                    result = await conn.execute(text("""
                        SELECT indexname, indexdef
                        FROM pg_indexes
                        WHERE tablename = :table_name
                    """), {"table_name": table})

                    table_indexes = {row.indexname: row.indexdef for row in result.fetchall()}

                    if execution_plan:
                        indexes_used.extend(
                            self._extract_indexes_from_plan(execution_plan, table)
                        )

                    if where_clause:
                        missing_indexes.extend(
                            self._suggest_indexes_for_where(table, where_clause, table_indexes)
                        )

        except Exception as e:
            logger.error(f"Failed to analyze indexes: {e}")

        return list(dict.fromkeys(indexes_used)), missing_indexes

    def _extract_indexes_from_plan(self, plan: Dict[str, Any], table_name: str) -> List[str]:
        """Collect the names of indexes the plan uses on ``table_name``.

        Walks the ``"Plan"`` / ``"Plans"`` children depth-first. A node
        without its own ``"Relation Name"`` inherits the one of its nearest
        ancestor, so an index node nested under a node that names the table
        is attributed to that table.
        """
        indexes: List[str] = []

        def visit(node, inherited_relation):
            if not isinstance(node, dict):
                return

            relation = node.get('Relation Name', inherited_relation)
            # Exact comparison: table "user" must not match "users".
            if 'Index Name' in node and relation == table_name:
                indexes.append(node['Index Name'])

            for key in ('Plan', 'Plans'):
                child = node.get(key)
                if isinstance(child, list):
                    for item in child:
                        visit(item, relation)
                elif child is not None:
                    visit(child, relation)

        visit(plan, None)
        return indexes

    def _extract_where_clause(self, query: str) -> Optional[str]:
        """Return the text of the first WHERE clause, or None if absent.

        The clause ends at ORDER BY, GROUP BY, HAVING, LIMIT, or the end of
        the statement (an optional trailing semicolon is excluded).
        """
        where_match = re.search(
            r'\bWHERE\s+(.+?)'
            r'(?:\s+ORDER\s+BY\b|\s+GROUP\s+BY\b|\s+HAVING\b|\s+LIMIT\b|\s*;?\s*$)',
            query,
            re.IGNORECASE | re.DOTALL,
        )
        return where_match.group(1) if where_match else None

    @staticmethod
    def _index_covers_column(column: str, index_definition: str) -> bool:
        """Tell whether ``column`` appears in the column list of an index.

        Only the text after the first ``(`` and before any partial-index
        ``WHERE`` predicate is searched, and the name must match as a whole
        word, so "id" is not considered covered by an index on "user_id".
        """
        definition = re.split(
            r'\sWHERE\s', index_definition, maxsplit=1, flags=re.IGNORECASE
        )[0]
        _, _, column_part = definition.partition('(')
        pattern = rf'\b{re.escape(column)}\b'
        return re.search(pattern, column_part, re.IGNORECASE) is not None

    def _suggest_indexes_for_where(self, table: str, where_clause: str,
                                   existing_indexes: Dict[str, str]) -> List[Dict[str, Any]]:
        """Suggest indexes for columns filtered on in ``where_clause``.

        Args:
            table: Table the suggestions are attributed to.
            where_clause: Text of the WHERE clause.
            existing_indexes: Mapping of index name to its CREATE INDEX
                definition.

        Returns:
            One 'medium' suggestion per filtered column not covered by an
            existing index, plus one 'high' composite suggestion on the first
            two columns when at least two columns are filtered on.
        """
        suggestions: List[Dict[str, Any]] = []

        # Columns compared with "=", or tested with IN / LIKE. Identifiers
        # must start with a letter or underscore so "1=1" is not a column.
        candidates = re.findall(r'\b([A-Za-z_]\w*)\s*=', where_clause)
        candidates += re.findall(r'\b([A-Za-z_]\w*)\s+IN\b', where_clause, re.IGNORECASE)
        candidates += re.findall(r'\b([A-Za-z_]\w*)\s+LIKE\b', where_clause, re.IGNORECASE)

        # De-duplicate case-insensitively, keeping first-appearance order so
        # the composite suggestion below is deterministic.
        columns: List[str] = []
        seen = set()
        for name in candidates:
            key = name.lower()
            if key in self._NON_COLUMN_WORDS or key in seen:
                continue
            seen.add(key)
            columns.append(name)

        if not columns:
            return suggestions

        for col in columns:
            covered = any(
                self._index_covers_column(col, idx_def)
                for idx_def in existing_indexes.values()
            )
            if not covered:
                suggestions.append({
                    'table': table,
                    'columns': [col],
                    'type': 'btree',
                    'reason': f'Column "{col}" used in WHERE clause but no suitable index found',
                    'estimated_impact': 'medium'
                })

        if len(columns) >= 2:
            suggestions.append({
                'table': table,
                'columns': columns[:2],
                'type': 'btree',
                'reason': 'Multiple columns in WHERE clause could benefit from composite index',
                'estimated_impact': 'high'
            })

        return suggestions

    def _generate_optimization_suggestions(self, query: str, query_type: QueryType,
                                           complexity: float, execution_plan: Optional[Dict[str, Any]],
                                           missing_indexes: List[Dict[str, Any]]) -> List[str]:
        """Build the list of human-readable optimization hints.

        Hints come from the complexity score, the high-impact entries of
        ``missing_indexes``, textual anti-patterns in ``query``, and node
        names found in ``execution_plan``.
        """
        suggestions: List[str] = []

        if complexity > 7.0:
            suggestions.append("Consider breaking this complex query into simpler parts")

        for index_rec in missing_indexes:
            if index_rec['estimated_impact'] == 'high':
                suggestions.append(
                    f"Create index on {index_rec['table']}.{', '.join(index_rec['columns'])}"
                )

        query_upper = query.upper()

        if re.search(r"LIKE\s+'%", query_upper):
            suggestions.append("Avoid leading wildcards in LIKE queries - consider full-text search")

        if query_upper.count(' OR ') > 2:
            suggestions.append("Multiple OR conditions - consider using UNION ALL or IN clauses")

        if 'NOT IN' in query_upper:
            suggestions.append("NOT IN can be slow - consider LEFT JOIN/IS NULL pattern")

        if 'SELECT *' in query_upper:
            suggestions.append("Avoid SELECT * - specify only needed columns")

        if execution_plan:
            # The plan is searched as text, so node names match at any depth.
            plan_text = str(execution_plan)
            if 'Seq Scan' in plan_text:
                suggestions.append("Sequential scan detected - consider adding indexes")
            if 'Sort' in plan_text:
                suggestions.append("Sorting operation detected - ensure proper indexes for ORDER BY")

        if query_type == QueryType.JOIN:
            if 'LEFT JOIN' in query_upper:
                suggestions.append("LEFT JOIN can be expensive - ensure it's necessary")

            if query_upper.count('JOIN') > 3:
                suggestions.append("Multiple JOINs - consider query restructuring")

        return suggestions

    def _extract_cost(self, execution_plan: Optional[Dict[str, Any]]) -> Optional[float]:
        """Return the planner's total cost estimate, or None if unavailable.

        ``_get_execution_plan`` returns the EXPLAIN wrapper object, in which
        the root node sits under ``"Plan"``. A bare plan node that carries
        ``"Total Cost"`` itself is accepted as well.
        """
        if not execution_plan:
            return None

        root = execution_plan.get('Plan', execution_plan)
        if not isinstance(root, dict):
            return None

        cost = root.get('Total Cost')
        return float(cost) if cost is not None else None

    async def generate_index_recommendations(self, table_name: Optional[str] = None) -> List[IndexRecommendation]:
        """Generate index recommendations for one table or the whole schema.

        Args:
            table_name: Table to inspect. When None, every table of the
                ``public`` schema is inspected in name order.

        Returns:
            The collected recommendations. On a database error the error is
            logged and whatever was collected so far is returned.
        """
        recommendations: List[IndexRecommendation] = []

        try:
            async with async_engine.begin() as conn:
                if table_name:
                    tables = [table_name]
                else:
                    result = await conn.execute(text("""
                        SELECT tablename FROM pg_tables
                        WHERE schemaname = 'public'
                        ORDER BY tablename
                    """))
                    tables = [row.tablename for row in result.fetchall()]

                for table in tables:
                    table_recommendations = await self._analyze_table_for_indexes(table, conn)
                    recommendations.extend(table_recommendations)

        except Exception as e:
            logger.error(f"Failed to generate index recommendations: {e}")

        return recommendations

    async def _analyze_table_for_indexes(self, table: str, conn) -> List[IndexRecommendation]:
        """Derive index recommendations for ``table`` from catalog data.

        Args:
            table: Name of a table in the ``public`` schema.
            conn: Open connection used to run the catalog queries.

        Returns:
            A 'medium' recommendation on ``id`` when updates exceed twice the
            inserts, plus a 'high' recommendation for each foreign-key
            column. Empty when the table has no statistics row or a query
            fails (the failure is logged).
        """
        recommendations: List[IndexRecommendation] = []

        try:
            # pg_stat_user_tables names the table in "relname". The value is
            # passed as a bind parameter rather than spliced into the SQL.
            result = await conn.execute(text("""
                SELECT
                    n_tup_ins as inserts,
                    n_tup_upd as updates,
                    n_tup_del as deletes,
                    n_live_tup as live_tuples,
                    n_dead_tup as dead_tuples
                FROM pg_stat_user_tables
                WHERE schemaname = 'public' AND relname = :table_name
            """), {"table_name": table})

            stats = result.fetchone()
            if not stats:
                return recommendations

            if stats.updates > stats.inserts * 2:
                recommendations.append(IndexRecommendation(
                    table_name=table,
                    columns=['id'],
                    index_type='btree',
                    estimated_impact='medium',
                    reason='High update activity - ensure primary key index is optimized'
                ))

            result = await conn.execute(text("""
                SELECT
                    tc.constraint_name,
                    kcu.column_name
                FROM information_schema.table_constraints AS tc
                JOIN information_schema.key_column_usage AS kcu
                    ON tc.constraint_name = kcu.constraint_name
                    AND tc.table_schema = kcu.table_schema
                WHERE tc.constraint_type = 'FOREIGN KEY'
                    AND tc.table_schema = 'public'
                    AND tc.table_name = :table_name
            """), {"table_name": table})

            for row in result.fetchall():
                recommendations.append(IndexRecommendation(
                    table_name=table,
                    columns=[row.column_name],
                    index_type='btree',
                    estimated_impact='high',
                    reason=f'Foreign key column {row.column_name} should be indexed'
                ))

        except Exception as e:
            logger.error(f"Failed to analyze table {table}: {e}")

        return recommendations
|
|
|
|
| |
# Shared module-level analyzer instance.
query_analyzer = QueryAnalyzer()
|
|
|
|
| |
class QueryOptimizationService:
    """Service layer that turns query analysis into reports and advice.

    Wraps the module-level ``query_analyzer`` and the
    ``performance_monitor`` to produce JSON-friendly dictionaries.
    """

    # "((expr))" -> "(expr)". Expressions containing commas or quotes are
    # left alone: "((1, 2))" is a row value, and quotes may hide parentheses.
    _DOUBLE_PARENS_RE = re.compile(r"\(\s*\(([^()',\"]*)\)\s*\)")

    # "WHERE 1=1 AND x" -> "WHERE x"
    _LEADING_TAUTOLOGY_RE = re.compile(
        r'(\bWHERE\s+)1\s*=\s*1\s+AND\s+', re.IGNORECASE
    )
    # "x AND 1=1" -> "x"
    _TRAILING_TAUTOLOGY_RE = re.compile(
        r'\s+AND\s+1\s*=\s*1(?![\w.])', re.IGNORECASE
    )
    # "... WHERE 1=1" with nothing else in the clause -> clause removed
    _ONLY_TAUTOLOGY_RE = re.compile(
        r'\s+WHERE\s+1\s*=\s*1'
        r'(?=\s*(?:;|\)|$|(?:ORDER|GROUP|HAVING|LIMIT|OFFSET|UNION)\b))',
        re.IGNORECASE,
    )

    def __init__(self):
        """Bind the service to the shared module-level analyzer."""
        self.analyzer = query_analyzer

    async def optimize_query(self, query: str) -> Dict[str, Any]:
        """Analyse ``query`` and return recommendations for it.

        Args:
            query: SQL statement to analyse.

        Returns:
            A dict with the original query, an optional rewritten query, an
            analysis summary, suggestions, missing indexes, and at most ten
            index recommendations. The recommendations cover the tables the
            query references, or the whole schema when none were detected.
            On failure the dict holds ``"error"`` and ``"original_query"``.
        """
        try:
            analysis = await self.analyzer.analyze_query(query)
            optimized_query = self._generate_optimized_query(query, analysis)

            # Recommend indexes for the tables this query touches, not for
            # unrelated tables of the database.
            if analysis.tables_involved:
                index_recs = []
                for table in analysis.tables_involved:
                    index_recs.extend(
                        await self.analyzer.generate_index_recommendations(table)
                    )
            else:
                index_recs = await self.analyzer.generate_index_recommendations()

            return {
                "original_query": query,
                "optimized_query": optimized_query,
                "analysis": {
                    "query_type": analysis.query_type.value,
                    "complexity_score": analysis.complexity_score,
                    "tables_involved": analysis.tables_involved,
                    "estimated_cost": analysis.estimated_cost
                },
                "optimization_suggestions": analysis.optimization_suggestions,
                "missing_indexes": analysis.missing_indexes,
                "index_recommendations": [rec.__dict__ for rec in index_recs[:10]]
            }

        except Exception as e:
            logger.error(f"Query optimization failed: {e}")
            return {
                "error": str(e),
                "original_query": query
            }

    def _generate_optimized_query(self, query: str, analysis: QueryAnalysis) -> Optional[str]:
        """Return a semantically equivalent, tidied query, or None.

        Two rewrites are applied:
          * doubled parentheses ``((expr))`` collapse to ``(expr)``;
          * the tautology ``1=1`` is removed from AND chains and from a WHERE
            clause that contains nothing else.

        ``SELECT *`` is not rewritten: the needed columns cannot be known
        here, and substituting a guessed column would change the result set.

        Args:
            query: SQL statement to rewrite.
            analysis: Analysis of ``query``; accepted for interface
                compatibility, not consulted by the current rewrites.

        Returns:
            The rewritten query, or None when nothing changed or the rewrite
            failed.
        """
        try:
            optimized = query

            # Repeat until stable so "(((a)))" ends up as "(a)".
            while True:
                collapsed = self._DOUBLE_PARENS_RE.sub(r'(\1)', optimized)
                if collapsed == optimized:
                    break
                optimized = collapsed

            optimized = self._LEADING_TAUTOLOGY_RE.sub(r'\1', optimized)
            optimized = self._TRAILING_TAUTOLOGY_RE.sub('', optimized)
            optimized = self._ONLY_TAUTOLOGY_RE.sub('', optimized)

            return optimized if optimized != query else None

        except Exception as e:
            logger.error(f"Failed to generate optimized query: {e}")
            return None

    async def analyze_slow_queries(self, limit: int = 20) -> List[Dict[str, Any]]:
        """Analyse the slow queries reported by the performance monitor.

        Args:
            limit: Passed to ``performance_monitor.get_slow_queries``.

        Returns:
            One dict per successfully analysed query, holding the monitor's
            record under ``"query_data"`` and the analysis fields under
            ``"analysis"``. Queries whose analysis fails are logged and
            skipped.
        """
        slow_queries = await performance_monitor.get_slow_queries(limit)

        analyzed_queries = []
        for query_data in slow_queries:
            try:
                analysis = await self.analyzer.analyze_query(query_data['query'])
                analyzed_queries.append({
                    "query_data": query_data,
                    "analysis": analysis.__dict__
                })
            except Exception as e:
                logger.error(f"Failed to analyze slow query: {e}")

        return analyzed_queries

    async def get_database_optimization_report(self) -> Dict[str, Any]:
        """Generate a database-wide optimization report.

        Returns:
            A dict with a Unix ``timestamp`` (seconds since the epoch), all
            index recommendations, the analysis of up to ten slow queries,
            the monitor's performance summary and a prioritised action list.
            On failure, a dict holding only ``"error"``.
        """
        try:
            index_recs = await self.analyzer.generate_index_recommendations()
            slow_analysis = await self.analyze_slow_queries(10)
            perf_summary = await performance_monitor.get_performance_summary()

            return {
                # Wall-clock time; the event loop's clock is monotonic and
                # has no defined epoch.
                "timestamp": time.time(),
                "index_recommendations": [rec.__dict__ for rec in index_recs],
                "slow_queries_analysis": slow_analysis,
                "performance_summary": perf_summary,
                "optimization_priority": self._calculate_optimization_priority(index_recs, slow_analysis)
            }

        except Exception as e:
            logger.error(f"Failed to generate optimization report: {e}")
            return {"error": str(e)}

    def _calculate_optimization_priority(self, index_recs: List[IndexRecommendation],
                                         slow_analysis: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Sort recommendations into high / medium / low priority buckets.

        High-impact foreign-key indexes and queries slower than five seconds
        are high priority; other high-impact indexes are medium; everything
        else is low. The buckets are capped at 5, 10 and 10 entries.

        Args:
            index_recs: Index recommendations to classify.
            slow_analysis: Entries as produced by ``analyze_slow_queries``.
                A record without ``execution_time`` is treated as not slow,
                and a missing ``query_type`` is reported as "unknown".
        """
        high_priority = []
        medium_priority = []
        low_priority = []

        for rec in index_recs:
            action = f"Create index: {rec.table_name}({', '.join(rec.columns)})"
            if rec.estimated_impact == 'high' and 'foreign key' in rec.reason.lower():
                high_priority.append(action)
            elif rec.estimated_impact == 'high':
                medium_priority.append(action)
            else:
                low_priority.append(action)

        for analysis in slow_analysis:
            query_data = analysis.get('query_data') or {}
            # Incomplete monitor records must not abort the whole report.
            execution_time = query_data.get('execution_time') or 0.0
            if execution_time > 5.0:
                query_type = query_data.get('query_type', 'unknown')
                high_priority.append(
                    f"Optimize slow query: {execution_time:.2f}s {query_type}"
                )

        return {
            "high_priority": high_priority[:5],
            "medium_priority": medium_priority[:10],
            "low_priority": low_priority[:10]
        }
|
|
|
|
| |
# Shared module-level optimization service instance.
query_optimizer = QueryOptimizationService()
|
|
|
|
if __name__ == "__main__":
    import sys

    async def main():
        """Command-line entry point: ``analyze <query>``, ``report`` or ``indexes``.

        Any other command, or no command at all, prints the usage text.
        """
        cli_args = sys.argv[1:]
        command = cli_args[0] if cli_args else "help"

        if command == "analyze":
            query_words = cli_args[1:]
            if not query_words:
                print("Error: analyze requires a query string")
                sys.exit(1)

            # All remaining arguments together form the query text.
            outcome = await query_optimizer.optimize_query(' '.join(query_words))
            print(json.dumps(outcome, indent=2, default=str))
            return

        if command == "report":
            outcome = await query_optimizer.get_database_optimization_report()
            print(json.dumps(outcome, indent=2, default=str))
            return

        if command == "indexes":
            found = await query_analyzer.generate_index_recommendations()
            print(f"Index recommendations: {len(found)}")
            for rec in found:
                print(f"  - {rec.table_name}({', '.join(rec.columns)}) - {rec.reason}")
            return

        print("Usage: python query_optimizer.py <command> [args]")
        print("Commands: analyze <query>, report, indexes")

    asyncio.run(main())
|
|