import re
import time
from datetime import datetime, timedelta, timezone
from typing import List, Dict, Any

from app.utils import read_json, write_json, utc_now
|
|
class PerformanceMonitor:
    """Monitor query performance and detect slow queries.

    The query log is a JSON list of entry dicts read with ``read_json`` from
    ``user_store.local("performance", "query_log.json")``. Methods that change
    the log write it back with ``write_json`` and publish it through
    ``user_store.upload_file``.
    """

    # Execution time (seconds) above which a query is flagged ``is_slow``.
    SLOW_QUERY_THRESHOLD = 1.0
    # Only this many most-recent entries are kept in the log.
    MAX_LOG_ENTRIES = 1000
    # Stored query text is truncated to this many characters.
    MAX_QUERY_LENGTH = 500

    # Single-quoted SQL string literals (with '' as the escaped quote).
    _STRING_LITERAL_RE = re.compile(r"'(?:''|[^'])*'")
    _WHERE_RE = re.compile(r"\bWHERE\b", re.IGNORECASE)
    # Keywords (or a statement terminator) that end a WHERE clause.
    _CLAUSE_END_RE = re.compile(
        r"\b(?:GROUP\s+BY|ORDER\s+BY|HAVING|LIMIT|UNION)\b|;", re.IGNORECASE
    )
    _CONDITION_SPLIT_RE = re.compile(r"\b(?:AND|OR)\b", re.IGNORECASE)
    # "<operand> <comparison operator>" at the start of one condition.
    _COMPARISON_RE = re.compile(r"^[\s(]*([^\s=<>!()]+)\s*(?:!=|[=<>])")
    _IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
    _SELECT_STAR_RE = re.compile(r"\bSELECT\s+\*", re.IGNORECASE)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _load_logs(user_store):
        """Return ``(log_path, logs)`` for the user's query log."""
        log_path = user_store.local("performance", "query_log.json")
        return log_path, read_json(log_path, [])

    @staticmethod
    def _next_log_id(logs) -> int:
        """Return an id greater than every integer id already in ``logs``.

        ``len(logs) + 1`` cannot be used: once the log is capped at
        ``MAX_LOG_ENTRIES`` its length stops growing and ids would repeat.
        """
        known_ids = [
            entry['id'] for entry in logs
            if isinstance(entry, dict) and isinstance(entry.get('id'), int)
        ]
        return max(known_ids, default=0) + 1

    @staticmethod
    def _parse_timestamp(value):
        """Parse a log timestamp into an aware UTC ``datetime``.

        Accepts ISO-8601 strings (a trailing ``Z`` is treated as UTC) and
        ``datetime`` objects. Naive values are assumed to already be in UTC;
        aware values are converted to UTC. Returns ``None`` when the value is
        missing or cannot be parsed.
        """
        if isinstance(value, datetime):
            parsed = value
        elif isinstance(value, str):
            text = value.strip()
            # datetime.fromisoformat only accepts "Z" from Python 3.11 on.
            if text.endswith(('Z', 'z')):
                text = text[:-1] + '+00:00'
            try:
                parsed = datetime.fromisoformat(text)
            except ValueError:
                return None
        else:
            return None

        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @classmethod
    def _entries_since(cls, logs, cutoff, keep_undated):
        """Return entries of ``logs`` whose timestamp is later than ``cutoff``.

        Entries without a parseable timestamp are included only when
        ``keep_undated`` is true.
        """
        kept = []
        for entry in logs:
            stamp = cls._parse_timestamp(entry.get('timestamp'))
            if stamp is None:
                if keep_undated:
                    kept.append(entry)
            elif stamp > cutoff:
                kept.append(entry)
        return kept

    @staticmethod
    def _mentions(text: str, name: str) -> bool:
        """Return True if ``name`` occurs in ``text`` as a whole identifier.

        The match is case-insensitive. An empty ``name`` matches everything,
        which is what a plain substring test would do.
        """
        if not name:
            return True
        pattern = r"(?<![A-Za-z0-9_])" + re.escape(name) + r"(?![A-Za-z0-9_])"
        return re.search(pattern, text, re.IGNORECASE) is not None

    @classmethod
    def _matches_table(cls, entry, database: str, table: str) -> bool:
        """Return True if a log entry is a query on ``database``.``table``.

        The database matches when the entry's ``database`` field equals it or
        when the query text names it; the table must appear in the query text
        as a whole identifier.
        """
        query = entry.get('query') or ''
        database_ok = (
            entry.get('database') == database or cls._mentions(query, database)
        )
        return database_ok and cls._mentions(query, table)

    @classmethod
    def _where_columns(cls, query: str):
        """Return the columns compared in the first WHERE clause of ``query``.

        Only conditions of the form ``column <op> ...`` with ``<op>`` one of
        ``=``, ``!=``, ``<>``, ``<``, ``<=``, ``>``, ``>=`` are recognised.
        Names are stripped of any table qualifier and quoting, and upper-cased
        so the same column written in different cases is counted once.
        """
        # Blank out string literals so keywords/operators inside them are ignored.
        text = cls._STRING_LITERAL_RE.sub("''", query or '')
        where = cls._WHERE_RE.search(text)
        if where is None:
            return []

        clause = text[where.end():]
        clause_end = cls._CLAUSE_END_RE.search(clause)
        if clause_end is not None:
            clause = clause[:clause_end.start()]

        columns = []
        for condition in cls._CONDITION_SPLIT_RE.split(clause):
            comparison = cls._COMPARISON_RE.match(condition)
            if comparison is None:
                continue
            name = comparison.group(1).split('.')[-1].strip('`"[]')
            # Skip anything that is not a plain identifier (literals, expressions).
            if cls._IDENTIFIER_RE.match(name):
                columns.append(name.upper())
        return columns

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def log_query(self, user_store, query: str, execution_time: float, database: str, username: str) -> Dict:
        """Log query execution for performance tracking.

        Args:
            user_store: Store providing ``local(...)`` and ``upload_file(...)``.
            query: SQL text; truncated to ``MAX_QUERY_LENGTH`` characters.
            execution_time: Execution time in seconds.
            database: Database the query ran against.
            username: User who ran the query.

        Returns:
            ``{'ok': True, 'log_id': <id of the new entry>}``.
        """
        log_path, logs = self._load_logs(user_store)

        log_entry = {
            'id': self._next_log_id(logs),
            'query': (query or '')[:self.MAX_QUERY_LENGTH],
            'execution_time': execution_time,
            'database': database,
            'username': username,
            'timestamp': utc_now(),
            'is_slow': execution_time > self.SLOW_QUERY_THRESHOLD,
        }
        logs.append(log_entry)

        # Keep only the most recent entries so the log file stays bounded.
        if len(logs) > self.MAX_LOG_ENTRIES:
            logs = logs[-self.MAX_LOG_ENTRIES:]

        write_json(log_path, logs)
        user_store.upload_file(log_path, "performance/query_log.json", "Log query performance")

        return {'ok': True, 'log_id': log_entry['id']}

    def get_slow_queries(self, user_store, threshold: float = 1.0, limit: int = 50) -> Dict:
        """Get slow queries above threshold.

        Args:
            user_store: Store providing ``local(...)``.
            threshold: Minimum execution time in seconds (exclusive).
            limit: Maximum number of entries returned; negative values are
                treated as 0.

        Returns:
            Dict with the slowest entries first under ``slow_queries``, the
            count of all matching entries under ``total`` and the
            ``threshold`` used.
        """
        _, logs = self._load_logs(user_store)

        slow_queries = [
            entry for entry in logs
            if entry.get('execution_time', 0) > threshold
        ]
        slow_queries.sort(key=lambda entry: entry.get('execution_time', 0), reverse=True)

        return {
            'ok': True,
            'slow_queries': slow_queries[:max(limit, 0)],
            'total': len(slow_queries),
            'threshold': threshold,
        }

    def get_query_stats(self, user_store, hours: int = 24) -> Dict:
        """Get query statistics for last N hours.

        Entries whose timestamp is missing or unparseable are ignored.

        Args:
            user_store: Store providing ``local(...)``.
            hours: Size of the look-back window in hours.

        Returns:
            Dict with ``period_hours``, ``total_queries``,
            ``avg_execution_time``, ``slow_queries``, ``slow_percentage``,
            ``queries_per_hour`` and ``top_databases`` (up to five
            ``{'database', 'count'}`` dicts, most used first).
        """
        _, logs = self._load_logs(user_store)

        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        recent_logs = self._entries_since(logs, cutoff, keep_undated=False)

        if not recent_logs:
            # Same keys as the populated result so callers need no special case.
            return {
                'ok': True,
                'period_hours': hours,
                'total_queries': 0,
                'avg_execution_time': 0,
                'slow_queries': 0,
                'slow_percentage': 0,
                'queries_per_hour': 0,
                'top_databases': [],
            }

        total = len(recent_logs)
        avg_time = sum(entry.get('execution_time', 0) for entry in recent_logs) / total
        slow_count = sum(1 for entry in recent_logs if entry.get('is_slow', False))
        queries_per_hour = total / hours if hours > 0 else 0

        db_counts = {}
        for entry in recent_logs:
            db = entry.get('database', 'unknown')
            db_counts[db] = db_counts.get(db, 0) + 1
        top_databases = sorted(db_counts.items(), key=lambda item: item[1], reverse=True)[:5]

        return {
            'ok': True,
            'period_hours': hours,
            'total_queries': total,
            'avg_execution_time': round(avg_time, 3),
            'slow_queries': slow_count,
            'slow_percentage': round((slow_count / total) * 100, 1),
            'queries_per_hour': round(queries_per_hour, 1),
            'top_databases': [{'database': db, 'count': count} for db, count in top_databases],
        }

    def analyze_table_performance(self, user_store, database: str, table: str) -> Dict:
        """Analyze performance for specific table.

        A log entry counts as a query on the table when its ``database``
        field equals ``database`` (or the query text names it) and the query
        text mentions ``table`` as a whole identifier.

        Args:
            user_store: Store providing ``local(...)``.
            database: Database name.
            table: Table name.

        Returns:
            Dict with ``database``, ``table``, ``total_queries`` and
            ``recommendations``; when matching queries exist it also carries
            ``avg_execution_time`` and ``slow_queries``.
        """
        _, logs = self._load_logs(user_store)

        table_queries = [
            entry for entry in logs
            if self._matches_table(entry, database, table)
        ]

        if not table_queries:
            return {
                'ok': True,
                'database': database,
                'table': table,
                'total_queries': 0,
                'recommendations': [],
            }

        total = len(table_queries)
        avg_time = sum(entry.get('execution_time', 0) for entry in table_queries) / total
        slow_count = sum(1 for entry in table_queries if entry.get('is_slow', False))

        recommendations = []

        if avg_time > 0.5:
            recommendations.append({
                'type': 'performance',
                'severity': 'high',
                'message': f'Average query time is {avg_time:.2f}s. Consider adding indexes.',
                'action': 'Add indexes on frequently queried columns',
            })

        # More than 30% of the table's queries were flagged slow when logged.
        if slow_count > total * 0.3:
            recommendations.append({
                'type': 'optimization',
                'severity': 'medium',
                'message': f'{slow_count} out of {total} queries are slow (>{self.SLOW_QUERY_THRESHOLD}s)',
                'action': 'Review and optimize slow queries',
            })

        select_all_count = sum(
            1 for entry in table_queries
            if self._SELECT_STAR_RE.search(entry.get('query') or '')
        )
        if select_all_count > 0:
            recommendations.append({
                'type': 'best_practice',
                'severity': 'low',
                'message': f'{select_all_count} queries use SELECT *',
                'action': 'Specify only needed columns instead of SELECT *',
            })

        return {
            'ok': True,
            'database': database,
            'table': table,
            'total_queries': total,
            'avg_execution_time': round(avg_time, 3),
            'slow_queries': slow_count,
            'recommendations': recommendations,
        }

    def get_index_recommendations(self, user_store, database: str, table: str) -> Dict:
        """Suggest indexes based on query patterns.

        Counts how often each column is compared in the WHERE clause of the
        logged queries on the table and proposes an index for the five most
        used columns. The generated SQL is a suggestion only; it is not
        executed here.

        Args:
            user_store: Store providing ``local(...)``.
            database: Database name.
            table: Table name.

        Returns:
            Dict with ``database``, ``table`` and ``recommendations`` (each
            with ``column``, ``usage_count``, ``sql`` and ``reason``).
        """
        _, logs = self._load_logs(user_store)

        where_columns = {}
        for entry in logs:
            if not self._matches_table(entry, database, table):
                continue
            for column in self._where_columns(entry.get('query') or ''):
                where_columns[column] = where_columns.get(column, 0) + 1

        ranked = sorted(where_columns.items(), key=lambda item: item[1], reverse=True)[:5]
        recommendations = [
            {
                'column': col,
                'usage_count': count,
                'sql': f'CREATE INDEX idx_{table}_{col} ON {database}.{table}({col});',
                'reason': f'Column "{col}" is used in WHERE clause {count} times',
            }
            for col, count in ranked
        ]

        return {
            'ok': True,
            'database': database,
            'table': table,
            'recommendations': recommendations,
        }

    def clear_old_logs(self, user_store, days: int = 30) -> Dict:
        """Clear logs older than N days.

        Entries whose timestamp is missing or unparseable are kept, since
        their age cannot be determined.

        Args:
            user_store: Store providing ``local(...)`` and ``upload_file(...)``.
            days: Entries older than this many days are removed.

        Returns:
            Dict with the number of entries ``removed`` and ``remaining``.
        """
        log_path, logs = self._load_logs(user_store)

        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        filtered_logs = self._entries_since(logs, cutoff, keep_undated=True)
        removed = len(logs) - len(filtered_logs)

        write_json(log_path, filtered_logs)
        user_store.upload_file(log_path, "performance/query_log.json", f"Clear logs older than {days} days")

        return {
            'ok': True,
            'removed': removed,
            'remaining': len(filtered_logs),
        }
|
|
# Shared module-level instance of the monitor for callers to import.
performance_monitor = PerformanceMonitor()
|
|