"""Query performance logging and analysis on top of a per-user JSON log file."""

import re
import time
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from app.utils import read_json, write_json, utc_now

# Queries taking longer than this many seconds are flagged as slow when logged.
_SLOW_QUERY_THRESHOLD = 1.0
# Upper bound on retained log entries; the oldest entries are dropped first.
_MAX_LOG_ENTRIES = 1000
# Stored query text is truncated to this many characters.
_MAX_QUERY_LENGTH = 500
# Destination name handed to ``user_store.upload_file`` for the log file.
_LOG_REMOTE_PATH = "performance/query_log.json"

_WHERE_RE = re.compile(r"\bWHERE\b", re.IGNORECASE)
# Keywords that terminate a WHERE clause.
_CLAUSE_END_RE = re.compile(
    r"\b(?:GROUP\s+BY|ORDER\s+BY|HAVING|LIMIT|OFFSET|UNION)\b", re.IGNORECASE
)
# Whole-word AND/OR only, so identifiers such as BRAND or ORDERS are not split.
_CONDITION_SPLIT_RE = re.compile(r"\b(?:AND|OR)\b", re.IGNORECASE)
# "<optionally qualified/quoted name> <comparison operator>" at condition start.
_COMPARISON_RE = re.compile(r"^[\s(]*([\w.`\"\[\]]+?)\s*(?:<=|>=|<>|!=|=|<|>)")
_IDENTIFIER_RE = re.compile(r"[A-Za-z_]\w*")
_SELECT_STAR_RE = re.compile(r"\bSELECT\s+\*", re.IGNORECASE)


def _local_log_path(user_store):
    """Return the local path of the query log as reported by ``user_store``."""
    return user_store.local("performance", "query_log.json")


def _execution_time(entry: Dict[str, Any]) -> float:
    """Return the entry's execution time, or 0 when it is missing/non-numeric."""
    value = entry.get('execution_time', 0)
    return value if isinstance(value, (int, float)) else 0


def _next_log_id(logs: List[Dict[str, Any]]) -> int:
    """Return an id greater than every integer id already present in ``logs``."""
    known_ids = [entry.get('id') for entry in logs if isinstance(entry.get('id'), int)]
    return max(known_ids, default=0) + 1


def _parse_timestamp(value: Any) -> Optional[datetime]:
    """Parse a logged ISO-8601 timestamp into an aware UTC datetime.

    Naive timestamps are taken to be UTC; aware ones are converted to UTC.
    Returns None when the value is not a string or cannot be parsed.
    """
    if not isinstance(value, str):
        return None
    text = value.strip()
    # ``fromisoformat`` only accepts a trailing "Z" from Python 3.11 onwards.
    if text.endswith(("Z", "z")):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return None
    if parsed.tzinfo is None:
        return parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def _identifier_pattern(identifier: str):
    """Compile a pattern matching ``identifier`` as a whole SQL identifier."""
    return re.compile(
        r"(?<![A-Za-z0-9_])" + re.escape(identifier) + r"(?![A-Za-z0-9_])"
    )


def _queries_for_table(logs: List[Dict[str, Any]], database: str,
                       table: str) -> List[Dict[str, Any]]:
    """Select log entries whose query touches ``table`` in ``database``.

    An entry belongs to ``database`` when its logged ``database`` field equals
    it or the name is mentioned in the query text. The table must be mentioned
    as a whole identifier, so ``user`` does not match ``users_archive``.
    """
    if not table:
        return []
    table_re = _identifier_pattern(table)
    database_re = _identifier_pattern(database) if database else None

    selected = []
    for entry in logs:
        query = entry.get('query', '')
        if not isinstance(query, str) or not table_re.search(query):
            continue
        if entry.get('database') == database:
            selected.append(entry)
        elif database_re is not None and database_re.search(query):
            selected.append(entry)
    return selected


def _column_from_condition(condition: str) -> Optional[str]:
    """Return the column compared at the start of ``condition``, if any.

    Qualifiers (``t.col``) and quoting characters are stripped. Conditions that
    start with a literal, a function call or a non-comparison operator yield
    None.
    """
    match = _COMPARISON_RE.match(condition)
    if match is None:
        return None
    name = match.group(1).split('.')[-1].strip('`"[]')
    return name if _IDENTIFIER_RE.fullmatch(name) else None


def _where_columns(query: str) -> List[str]:
    """Return the columns compared in the first WHERE clause of ``query``."""
    where = _WHERE_RE.search(query)
    if where is None:
        return []
    clause = query[where.end():]
    clause_end = _CLAUSE_END_RE.search(clause)
    if clause_end is not None:
        clause = clause[:clause_end.start()]

    columns = []
    for condition in _CONDITION_SPLIT_RE.split(clause):
        column = _column_from_condition(condition)
        if column is not None:
            columns.append(column)
    return columns


class PerformanceMonitor:
    """Monitor query performance and detect slow queries.

    Every method takes a ``user_store`` object that must provide
    ``local(*parts)`` (path of the local log file) and
    ``upload_file(path, remote_name, message)``.
    """

    def log_query(self, user_store, query: str, execution_time: float,
                  database: str, username: str) -> Dict:
        """Log query execution for performance tracking.

        Args:
            user_store: Store used to locate and upload the log file.
            query: SQL text; only the first 500 characters are stored.
            execution_time: Duration of the query in seconds.
            database: Name of the database the query ran against.
            username: User who ran the query.

        Returns:
            ``{'ok': True, 'log_id': <id of the new entry>}``.
        """
        log_path = _local_log_path(user_store)
        logs = read_json(log_path, [])

        log_entry = {
            # Derived from the highest existing id, not len(logs): the log is
            # capped and pruned, so a length-based id would repeat.
            'id': _next_log_id(logs),
            'query': query[:_MAX_QUERY_LENGTH],
            'execution_time': execution_time,
            'database': database,
            'username': username,
            'timestamp': utc_now(),
            'is_slow': execution_time > _SLOW_QUERY_THRESHOLD,
        }
        logs.append(log_entry)

        # Keep only the most recent entries.
        if len(logs) > _MAX_LOG_ENTRIES:
            logs = logs[-_MAX_LOG_ENTRIES:]

        write_json(log_path, logs)
        user_store.upload_file(log_path, _LOG_REMOTE_PATH, "Log query performance")

        return {'ok': True, 'log_id': log_entry['id']}

    def get_slow_queries(self, user_store, threshold: float = 1.0,
                         limit: int = 50) -> Dict:
        """Get slow queries above threshold.

        Args:
            user_store: Store used to locate the log file.
            threshold: Minimum execution time (seconds, exclusive).
            limit: Maximum number of entries returned; negative values are
                treated as 0 and ``None`` means no limit.

        Returns:
            Dict with the slowest entries first under ``slow_queries``, the
            number of matching entries before limiting under ``total``, and
            the ``threshold`` used.
        """
        logs = read_json(_local_log_path(user_store), [])

        slow_queries = sorted(
            (entry for entry in logs if _execution_time(entry) > threshold),
            key=_execution_time,
            reverse=True,
        )
        if limit is not None:
            limit = max(limit, 0)

        return {
            'ok': True,
            'slow_queries': slow_queries[:limit],
            'total': len(slow_queries),
            'threshold': threshold,
        }

    def get_query_stats(self, user_store, hours: int = 24) -> Dict:
        """Get query statistics for last N hours.

        Entries whose timestamp is missing or unparseable are ignored.

        Args:
            user_store: Store used to locate the log file.
            hours: Size of the look-back window in hours.

        Returns:
            Dict with ``period_hours``, ``total_queries``,
            ``avg_execution_time``, ``slow_queries``, ``slow_percentage``,
            ``queries_per_hour`` and ``top_databases`` (up to five
            ``{'database', 'count'}`` dicts). The same keys are present when no
            query falls inside the window.
        """
        logs = read_json(_local_log_path(user_store), [])

        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        recent_logs = []
        for entry in logs:
            logged_at = _parse_timestamp(entry.get('timestamp'))
            if logged_at is not None and logged_at > cutoff:
                recent_logs.append(entry)

        if not recent_logs:
            return {
                'ok': True,
                'period_hours': hours,
                'total_queries': 0,
                'avg_execution_time': 0,
                'slow_queries': 0,
                'slow_percentage': 0,
                'queries_per_hour': 0,
                'top_databases': [],
            }

        total = len(recent_logs)
        avg_time = sum(_execution_time(entry) for entry in recent_logs) / total
        slow_count = sum(1 for entry in recent_logs if entry.get('is_slow', False))
        queries_per_hour = total / hours if hours > 0 else 0

        # Most queried databases; ties keep first-seen order.
        db_counts = Counter(entry.get('database', 'unknown') for entry in recent_logs)

        return {
            'ok': True,
            'period_hours': hours,
            'total_queries': total,
            'avg_execution_time': round(avg_time, 3),
            'slow_queries': slow_count,
            'slow_percentage': round((slow_count / total) * 100, 1),
            'queries_per_hour': round(queries_per_hour, 1),
            'top_databases': [
                {'database': db, 'count': count}
                for db, count in db_counts.most_common(5)
            ],
        }

    def analyze_table_performance(self, user_store, database: str,
                                  table: str) -> Dict:
        """Analyze performance for specific table.

        Args:
            user_store: Store used to locate the log file.
            database: Database the table belongs to.
            table: Table name to look for in logged queries.

        Returns:
            Dict with ``total_queries``, ``avg_execution_time``,
            ``slow_queries`` and a list of ``recommendations`` (each with
            ``type``, ``severity``, ``message`` and ``action``).
        """
        logs = read_json(_local_log_path(user_store), [])
        table_queries = _queries_for_table(logs, database, table)

        if not table_queries:
            return {
                'ok': True,
                'database': database,
                'table': table,
                'total_queries': 0,
                'avg_execution_time': 0,
                'slow_queries': 0,
                'recommendations': [],
            }

        total = len(table_queries)
        avg_time = sum(_execution_time(entry) for entry in table_queries) / total
        slow_count = sum(1 for entry in table_queries if entry.get('is_slow', False))

        recommendations = []

        if avg_time > 0.5:
            recommendations.append({
                'type': 'performance',
                'severity': 'high',
                'message': f'Average query time is {avg_time:.2f}s. Consider adding indexes.',
                'action': 'Add indexes on frequently queried columns',
            })

        if slow_count > total * 0.3:
            recommendations.append({
                'type': 'optimization',
                'severity': 'medium',
                'message': f'{slow_count} out of {total} queries are slow (>{_SLOW_QUERY_THRESHOLD}s)',
                'action': 'Review and optimize slow queries',
            })

        # SELECT * in any letter case and with any whitespace before the star.
        select_all_count = sum(
            1 for entry in table_queries
            if _SELECT_STAR_RE.search(entry.get('query', ''))
        )
        if select_all_count > 0:
            recommendations.append({
                'type': 'best_practice',
                'severity': 'low',
                'message': f'{select_all_count} queries use SELECT *',
                'action': 'Specify only needed columns instead of SELECT *',
            })

        return {
            'ok': True,
            'database': database,
            'table': table,
            'total_queries': total,
            'avg_execution_time': round(avg_time, 3),
            'slow_queries': slow_count,
            'recommendations': recommendations,
        }

    def get_index_recommendations(self, user_store, database: str,
                                  table: str) -> Dict:
        """Suggest indexes based on query patterns.

        This is a heuristic: only the first WHERE clause of each query is
        inspected, and only conditions of the form ``column <op> ...`` count.

        Args:
            user_store: Store used to locate the log file.
            database: Database the table belongs to.
            table: Table name to look for in logged queries.

        Returns:
            Dict whose ``recommendations`` list holds up to five entries with
            ``column``, ``usage_count``, a ``CREATE INDEX`` statement under
            ``sql``, and a ``reason``, most used column first.
        """
        logs = read_json(_local_log_path(user_store), [])
        table_queries = _queries_for_table(logs, database, table)

        # Count case-insensitively but report the first spelling seen, so the
        # generated DDL keeps the column's original case.
        usage = Counter()
        spelling = {}
        for entry in table_queries:
            for column in _where_columns(entry.get('query', '')):
                key = column.lower()
                spelling.setdefault(key, column)
                usage[key] += 1

        recommendations = []
        for key, count in usage.most_common(5):
            col = spelling[key]
            recommendations.append({
                'column': col,
                'usage_count': count,
                'sql': f'CREATE INDEX idx_{table}_{col} ON {database}.{table}({col});',
                'reason': f'Column "{col}" is used in WHERE clause {count} times',
            })

        return {
            'ok': True,
            'database': database,
            'table': table,
            'recommendations': recommendations,
        }

    def clear_old_logs(self, user_store, days: int = 30) -> Dict:
        """Clear logs older than N days.

        Entries whose timestamp is missing or unparseable are kept, because
        their age cannot be established.

        Args:
            user_store: Store used to locate and upload the log file.
            days: Entries logged more than this many days ago are removed.

        Returns:
            ``{'ok': True, 'removed': <count>, 'remaining': <count>}``.
        """
        log_path = _local_log_path(user_store)
        logs = read_json(log_path, [])

        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        filtered_logs = []
        for entry in logs:
            logged_at = _parse_timestamp(entry.get('timestamp'))
            if logged_at is None or logged_at > cutoff:
                filtered_logs.append(entry)

        removed = len(logs) - len(filtered_logs)

        write_json(log_path, filtered_logs)
        user_store.upload_file(log_path, _LOG_REMOTE_PATH,
                               f"Clear logs older than {days} days")

        return {
            'ok': True,
            'removed': removed,
            'remaining': len(filtered_logs),
        }


performance_monitor = PerformanceMonitor()