# corpusdb/app/performance_monitor.py
# mrsavage1's picture
# Upload 62 files
# 177164c verified
import re
import time
from collections import Counter
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Optional

from app.utils import read_json, write_json, utc_now
class PerformanceMonitor:
    """Record query timings in a per-user JSON log and report on them.

    Every public method receives a ``user_store`` and relies on two of its
    methods: ``local("performance", "query_log.json")`` to locate the log
    file, and ``upload_file(local_path, remote_path, message)`` after the
    log has been rewritten.  The log itself is read and written with
    ``read_json`` / ``write_json`` and holds a list of dict entries shaped
    like the one built in :meth:`log_query`.
    """

    # Seconds above which ``log_query`` flags an entry as ``is_slow``.
    SLOW_QUERY_THRESHOLD = 1.0
    # ``log_query`` keeps only this many of the most recent entries.
    MAX_LOG_ENTRIES = 1000
    # Stored query text is cut to this many characters.
    MAX_QUERY_LENGTH = 500
    # Average time (seconds) above which indexes are suggested for a table.
    SLOW_AVERAGE_THRESHOLD = 0.5
    # Share of slow queries above which a review of slow queries is suggested.
    SLOW_RATIO_THRESHOLD = 0.3
    # How many databases / columns the reports list at most.
    TOP_N = 5

    # Single-quoted SQL string literal ('' is an escaped quote inside it).
    _STRING_LITERAL_RE = re.compile(r"'(?:''|[^'])*'")
    # Text after WHERE up to the next clause keyword, ';' or end of query.
    _WHERE_CLAUSE_RE = re.compile(
        r'\bWHERE\b(.*?)'
        r'(?=\bGROUP\s+BY\b|\bORDER\s+BY\b|\bHAVING\b|\bLIMIT\b|\bUNION\b|;|\Z)',
        re.IGNORECASE | re.DOTALL,
    )
    # Identifier (optionally closed by a quote char) directly left of a
    # comparison operator; for "t.col" only "col" is captured.
    _COMPARED_COLUMN_RE = re.compile(
        r'(?<![\w$])([A-Za-z_][\w$]*)[`"\]]?\s*(?:<=|>=|<>|!=|=|<|>)'
    )
    _SELECT_STAR_RE = re.compile(r'\bSELECT\s*\*', re.IGNORECASE)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _load_logs(user_store):
        """Return ``(log_path, entries)`` for the store's query log."""
        log_path = user_store.local("performance", "query_log.json")
        return log_path, read_json(log_path, [])

    @staticmethod
    def _next_log_id(logs) -> int:
        """Return an id one above the highest integer id already in *logs*.

        ``len(logs) + 1`` is not usable here: once the log is capped or
        pruned, the length stops growing and ids would repeat.
        """
        known_ids = [
            entry.get('id') for entry in logs
            if isinstance(entry.get('id'), int)
        ]
        return max(known_ids, default=0) + 1

    @staticmethod
    def _parse_timestamp(value) -> Optional[datetime]:
        """Parse an ISO-8601 string into an aware UTC datetime, or ``None``.

        A trailing ``Z`` is accepted, naive values are taken to be UTC, and
        values carrying another offset are converted to UTC.  Anything that
        is not a parseable string yields ``None`` instead of raising.
        """
        if not isinstance(value, str):
            return None
        text = value.strip()
        if text.endswith(('Z', 'z')):
            # datetime.fromisoformat() only understands "Z" from 3.11 on.
            text = text[:-1] + '+00:00'
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is None:
            return parsed.replace(tzinfo=timezone.utc)
        return parsed.astimezone(timezone.utc)

    @staticmethod
    def _identifier_pattern(name: str):
        """Compile a case-insensitive whole-identifier matcher for *name*.

        An empty *name* compiles to a pattern that matches any text, i.e.
        it places no constraint on the query.
        """
        if not name:
            return re.compile('')
        return re.compile(
            r'(?<![A-Za-z0-9_])' + re.escape(name) + r'(?![A-Za-z0-9_])',
            re.IGNORECASE,
        )

    @classmethod
    def _queries_for_table(cls, logs, database: str, table: str) -> List[Dict[str, Any]]:
        """Return the entries of *logs* that concern ``database``/``table``.

        An entry belongs to the database when its recorded ``database``
        field equals *database* or when the query text names it.  The table
        must appear in the query text as a whole identifier, so ``users``
        does not match ``users_archive``.
        """
        db_pattern = cls._identifier_pattern(database)
        table_pattern = cls._identifier_pattern(table)
        selected = []
        for entry in logs:
            query = entry.get('query') or ''
            in_database = (
                entry.get('database') == database
                or db_pattern.search(query) is not None
            )
            if in_database and table_pattern.search(query) is not None:
                selected.append(entry)
        return selected

    @classmethod
    def _extract_where_columns(cls, query: str) -> List[str]:
        """Return column names compared in the WHERE clause(s) of *query*.

        Names are returned in order of appearance, with their original
        spelling and without any table qualifier.  Only columns standing
        directly left of ``=``, ``!=``, ``<>``, ``<``, ``>``, ``<=`` or
        ``>=`` are reported; this is a heuristic, not a SQL parser.
        """
        # Blank out string literals so their content is never read as SQL.
        sanitized = cls._STRING_LITERAL_RE.sub("''", query)
        columns = []
        for clause in cls._WHERE_CLAUSE_RE.finditer(sanitized):
            columns.extend(
                match.group(1)
                for match in cls._COMPARED_COLUMN_RE.finditer(clause.group(1))
            )
        return columns

    @classmethod
    def _summarize(cls, entries, hours) -> Dict[str, Any]:
        """Build the statistics dict returned by :meth:`get_query_stats`."""
        total = len(entries)
        summary = {
            'ok': True,
            'period_hours': hours,
            'total_queries': total,
            'avg_execution_time': 0,
            'slow_queries': 0,
            'slow_percentage': 0,
            'queries_per_hour': 0,
            'top_databases': [],
        }
        if not total:
            return summary

        total_time = sum(entry.get('execution_time', 0) for entry in entries)
        slow_count = sum(1 for entry in entries if entry.get('is_slow', False))
        db_counts = Counter(entry.get('database', 'unknown') for entry in entries)
        summary.update(
            avg_execution_time=round(total_time / total, 3),
            slow_queries=slow_count,
            slow_percentage=round((slow_count / total) * 100, 1),
            # A non-positive window has no meaningful rate.
            queries_per_hour=round(total / hours, 1) if hours > 0 else 0,
            top_databases=[
                {'database': db, 'count': count}
                for db, count in db_counts.most_common(cls.TOP_N)
            ],
        )
        return summary

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def log_query(self, user_store, query: str, execution_time: float, database: str, username: str) -> Dict:
        """Append one query execution to the log and upload the log.

        Args:
            user_store: Store providing ``local`` and ``upload_file``.
            query: SQL text; only the first ``MAX_QUERY_LENGTH`` characters
                are stored.
            execution_time: Duration in seconds.
            database: Name of the database the query ran against.
            username: Name of the user who ran the query.

        Returns:
            ``{'ok': True, 'log_id': <id of the new entry>}``.
        """
        log_path, logs = self._load_logs(user_store)
        log_entry = {
            'id': self._next_log_id(logs),
            'query': query[:self.MAX_QUERY_LENGTH],
            'execution_time': execution_time,
            'database': database,
            'username': username,
            'timestamp': utc_now(),
            'is_slow': execution_time > self.SLOW_QUERY_THRESHOLD,
        }
        logs.append(log_entry)
        # Keep only the most recent entries.
        logs = logs[-self.MAX_LOG_ENTRIES:]
        write_json(log_path, logs)
        user_store.upload_file(log_path, "performance/query_log.json", "Log query performance")
        return {'ok': True, 'log_id': log_entry['id']}

    def get_slow_queries(self, user_store, threshold: float = 1.0, limit: int = 50) -> Dict:
        """Return logged queries slower than *threshold*, slowest first.

        The comparison uses each entry's ``execution_time``; the stored
        ``is_slow`` flag is not consulted.

        Returns:
            Dict with ``ok``, ``slow_queries`` (at most *limit* entries),
            ``total`` (number of matches before the limit) and ``threshold``.
        """
        _, logs = self._load_logs(user_store)
        slow_queries = sorted(
            (entry for entry in logs if entry.get('execution_time', 0) > threshold),
            key=lambda entry: entry.get('execution_time', 0),
            reverse=True,
        )
        return {
            'ok': True,
            'slow_queries': slow_queries[:limit],
            'total': len(slow_queries),
            'threshold': threshold,
        }

    def get_query_stats(self, user_store, hours: int = 24) -> Dict:
        """Return aggregate statistics for the last *hours* hours.

        Entries whose ``timestamp`` is missing or unparseable are left out
        instead of aborting the whole report.

        Returns:
            Dict with ``ok``, ``period_hours``, ``total_queries``,
            ``avg_execution_time``, ``slow_queries``, ``slow_percentage``,
            ``queries_per_hour`` and ``top_databases``.  The same keys are
            present (with zero / empty values) when no entry qualifies.
        """
        _, logs = self._load_logs(user_store)
        cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
        recent_logs = []
        for entry in logs:
            stamp = self._parse_timestamp(entry.get('timestamp'))
            if stamp is not None and stamp > cutoff:
                recent_logs.append(entry)
        return self._summarize(recent_logs, hours)

    def analyze_table_performance(self, user_store, database: str, table: str) -> Dict:
        """Summarise logged queries touching ``database``.``table``.

        Returns:
            Dict with ``ok``, ``database``, ``table``, ``total_queries``,
            ``avg_execution_time``, ``slow_queries`` and a list of
            ``recommendations`` (each with ``type``, ``severity``,
            ``message`` and ``action``).
        """
        _, logs = self._load_logs(user_store)
        table_queries = self._queries_for_table(logs, database, table)
        total = len(table_queries)
        if not total:
            return {
                'ok': True,
                'database': database,
                'table': table,
                'total_queries': 0,
                'avg_execution_time': 0,
                'slow_queries': 0,
                'recommendations': [],
            }

        avg_time = sum(entry.get('execution_time', 0) for entry in table_queries) / total
        slow_count = sum(1 for entry in table_queries if entry.get('is_slow', False))

        recommendations = []
        if avg_time > self.SLOW_AVERAGE_THRESHOLD:
            recommendations.append({
                'type': 'performance',
                'severity': 'high',
                'message': f'Average query time is {avg_time:.2f}s. Consider adding indexes.',
                'action': 'Add indexes on frequently queried columns',
            })
        if slow_count > total * self.SLOW_RATIO_THRESHOLD:
            recommendations.append({
                'type': 'optimization',
                'severity': 'medium',
                'message': f'{slow_count} out of {total} queries are slow (>{self.SLOW_QUERY_THRESHOLD}s)',
                'action': 'Review and optimize slow queries',
            })
        # Match "SELECT *" regardless of letter case or spacing.
        select_all_count = sum(
            1 for entry in table_queries
            if self._SELECT_STAR_RE.search(entry.get('query') or '')
        )
        if select_all_count > 0:
            recommendations.append({
                'type': 'best_practice',
                'severity': 'low',
                'message': f'{select_all_count} queries use SELECT *',
                'action': 'Specify only needed columns instead of SELECT *',
            })

        return {
            'ok': True,
            'database': database,
            'table': table,
            'total_queries': total,
            'avg_execution_time': round(avg_time, 3),
            'slow_queries': slow_count,
            'recommendations': recommendations,
        }

    def get_index_recommendations(self, user_store, database: str, table: str) -> Dict:
        """Suggest indexes from the columns compared in WHERE clauses.

        Columns are counted case-insensitively across the logged queries
        for the table and reported with the spelling first seen.  The SQL
        in each recommendation is a suggestion string only; nothing is
        executed here.

        Returns:
            Dict with ``ok``, ``database``, ``table`` and up to ``TOP_N``
            ``recommendations`` (``column``, ``usage_count``, ``sql``,
            ``reason``), most used column first.
        """
        _, logs = self._load_logs(user_store)
        usage = Counter()
        spelling = {}
        for entry in self._queries_for_table(logs, database, table):
            for column in self._extract_where_columns(entry.get('query') or ''):
                key = column.lower()
                spelling.setdefault(key, column)
                usage[key] += 1

        recommendations = []
        for key, count in usage.most_common(self.TOP_N):
            col = spelling[key]
            recommendations.append({
                'column': col,
                'usage_count': count,
                'sql': f'CREATE INDEX idx_{table}_{col} ON {database}.{table}({col});',
                'reason': f'Column "{col}" is used in WHERE clause {count} times',
            })
        return {
            'ok': True,
            'database': database,
            'table': table,
            'recommendations': recommendations,
        }

    def clear_old_logs(self, user_store, days: int = 30) -> Dict:
        """Drop log entries older than *days* days and upload the result.

        Entries whose ``timestamp`` is missing or unparseable are kept:
        their age is unknown, so they are not treated as expired.

        Returns:
            Dict with ``ok``, ``removed`` and ``remaining`` entry counts.
        """
        log_path, logs = self._load_logs(user_store)
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        filtered_logs = []
        for entry in logs:
            stamp = self._parse_timestamp(entry.get('timestamp'))
            if stamp is None or stamp > cutoff:
                filtered_logs.append(entry)
        removed = len(logs) - len(filtered_logs)
        write_json(log_path, filtered_logs)
        user_store.upload_file(log_path, "performance/query_log.json", f"Clear logs older than {days} days")
        return {
            'ok': True,
            'removed': removed,
            'remaining': len(filtered_logs),
        }
# Module-level PerformanceMonitor instance, created when this module is imported.
performance_monitor = PerformanceMonitor()