# corpusdb / app / process_manager.py
# mrsavage1's picture
# Upload 52 files
# 723f9ab verified
"""
Process List and Query Killer
Monitor running queries and kill long-running or problematic queries
"""
import json
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import uuid
import threading
import time
class ProcessManager:
    """In-memory registry of running queries plus a bounded history of finished ones.

    Every query is represented by a plain dict (``id``, ``username``, ``sql``,
    ``database``, ``status``, ``started_at``, ``duration``, ``rows_affected``,
    ``error`` and, once finished, ``completed_at``).  Timestamps are naive
    local ``datetime.now()`` values stored as ISO-8601 strings.

    All access to ``active_queries`` and ``query_history`` is serialized
    through ``self.lock``.  Read methods hand out copies of the records so
    callers cannot modify the manager's internal state by accident.
    """

    def __init__(self):
        self.active_queries = {}  # {query_id: query_info}
        self.query_history = []   # finished query_info dicts, oldest first
        self.lock = threading.Lock()
        self.max_history = 1000   # upper bound on len(query_history)

    def _get_processes_path(self, user_store: Path) -> Path:
        """Return ``user_store / 'processes'``, creating the directory if needed.

        Args:
            user_store: Base directory of the user's store.

        Returns:
            Path of the (now existing) ``processes`` directory.
        """
        processes_dir = user_store / 'processes'
        # parents=True so a not-yet-created user_store does not raise.
        processes_dir.mkdir(parents=True, exist_ok=True)
        return processes_dir

    def _finish(self, query_id: str, status: str, error: Optional[str],
                rows_affected: Optional[int] = None) -> bool:
        """Move an active query into history with its final state.

        The caller must already hold ``self.lock``.

        Args:
            query_id: Id returned by ``register_query``.
            status: Final status to record ('COMPLETED', 'ERROR' or 'KILLED').
            error: Error text to store, or None.
            rows_affected: New row count; None leaves the stored value as is.

        Returns:
            True if the query was active and has been archived, else False.
        """
        query_info = self.active_queries.get(query_id)
        if query_info is None:
            return False

        finished = datetime.now()
        started = datetime.fromisoformat(query_info['started_at'])
        query_info['status'] = status
        query_info['completed_at'] = finished.isoformat()
        query_info['duration'] = (finished - started).total_seconds()
        if rows_affected is not None:
            query_info['rows_affected'] = rows_affected
        query_info['error'] = error

        del self.active_queries[query_id]
        self.query_history.append(query_info)
        # Enforce the cap on every path into history (completion and kill),
        # dropping the oldest entries first.
        overflow = len(self.query_history) - self.max_history
        if overflow > 0:
            del self.query_history[:overflow]
        return True

    def register_query(self, user_store: Path, username: str, sql: str,
                       database: Optional[str] = None) -> str:
        """Register a new query execution and return its generated id.

        Args:
            user_store: Accepted for interface compatibility; not used here.
            username: Owner of the query.
            sql: Statement text.
            database: Target database name, if any.

        Returns:
            A fresh UUID4 string identifying the query.
        """
        query_id = str(uuid.uuid4())
        query_info = {
            'id': query_id,
            'username': username,
            'sql': sql,
            'database': database,
            'status': 'RUNNING',
            'started_at': datetime.now().isoformat(),
            'duration': 0,
            'rows_affected': 0,
            'error': None,
        }
        with self.lock:
            self.active_queries[query_id] = query_info
        return query_id

    def complete_query(self, query_id: str, rows_affected: int = 0,
                       error: Optional[str] = None):
        """Mark a query as finished and move it to history.

        The status becomes 'ERROR' when ``error`` is truthy, otherwise
        'COMPLETED'.  Unknown ids (including queries that were already
        completed or killed) are ignored.

        Args:
            query_id: Id returned by ``register_query``.
            rows_affected: Row count to record.
            error: Error text, or None on success.
        """
        with self.lock:
            self._finish(query_id, 'ERROR' if error else 'COMPLETED',
                         error, rows_affected)

    def kill_query(self, query_id: str) -> Dict:
        """Mark an active query as killed and move it to history.

        This only updates the bookkeeping held by this manager; nothing in
        this class interrupts the statement that is executing.

        Args:
            query_id: Id returned by ``register_query``.

        Returns:
            ``{'ok': True, 'message': ...}`` on success, or
            ``{'ok': False, 'error': ...}`` if the id is not active.
        """
        with self.lock:
            killed = self._finish(query_id, 'KILLED', 'Query killed by user')
        if not killed:
            return {'ok': False, 'error': 'Query not found or already completed'}
        return {
            'ok': True,
            'message': f'Query {query_id} killed successfully'
        }

    def list_processes(self, username: Optional[str] = None,
                       status: Optional[str] = None) -> List[Dict]:
        """List active queries, optionally filtered by user and/or status.

        Args:
            username: Keep only this user's queries when truthy.
            status: Keep only queries with this status when truthy.

        Returns:
            Copies of the matching records; for 'RUNNING' entries
            ``duration`` is the elapsed time in seconds at call time.
        """
        with self.lock:
            # Copy under the lock so later edits never touch shared dicts.
            snapshot = [dict(info) for info in self.active_queries.values()]
        now = datetime.now()

        processes = []
        for proc in snapshot:
            if username and proc['username'] != username:
                continue
            if status and proc['status'] != status:
                continue
            if proc['status'] == 'RUNNING':
                started = datetime.fromisoformat(proc['started_at'])
                proc['duration'] = (now - started).total_seconds()
            processes.append(proc)
        return processes

    def get_query_history(self, username: Optional[str] = None,
                          limit: int = 100) -> List[Dict]:
        """Return finished queries, most recently completed first.

        Args:
            username: Keep only this user's queries when truthy.
            limit: Maximum number of records to return.

        Returns:
            Copies of up to ``limit`` history records.
        """
        with self.lock:
            history = [dict(h) for h in self.query_history]
        if username:
            history = [h for h in history if h['username'] == username]
        # ISO-8601 strings from the same clock sort chronologically.
        history.sort(key=lambda h: h.get('completed_at', ''), reverse=True)
        return history[:limit]

    def get_slow_queries(self, threshold_seconds: float = 5.0) -> List[Dict]:
        """Return copies of active queries running longer than the threshold.

        Args:
            threshold_seconds: Minimum elapsed time (exclusive) in seconds.

        Returns:
            Record copies with ``duration`` set to the elapsed seconds.
        """
        slow_queries = []
        with self.lock:
            now = datetime.now()
            for query_info in self.active_queries.values():
                started = datetime.fromisoformat(query_info['started_at'])
                duration = (now - started).total_seconds()
                if duration > threshold_seconds:
                    query_copy = query_info.copy()
                    query_copy['duration'] = duration
                    slow_queries.append(query_copy)
        return slow_queries

    def kill_slow_queries(self, threshold_seconds: float = 30.0) -> Dict:
        """Kill every active query running longer than the threshold.

        Args:
            threshold_seconds: Minimum elapsed time (exclusive) in seconds.

        Returns:
            Dict with ``ok``, a ``message`` and ``killed_count``.  Queries
            that finish between the scan and the kill are not counted.
        """
        killed_count = sum(
            1 for query in self.get_slow_queries(threshold_seconds)
            if self.kill_query(query['id'])['ok']
        )
        return {
            'ok': True,
            'message': f'Killed {killed_count} slow queries',
            'killed_count': killed_count
        }

    def get_query_stats(self, username: Optional[str] = None) -> Dict:
        """Summarize the query history.

        Args:
            username: Restrict the statistics to this user when truthy.

        Returns:
            Dict with ``ok``, ``total_queries``, ``completed``, ``errors``,
            ``killed``, ``avg_duration``, ``max_duration``, ``min_duration``
            and ``error_rate``.  The same keys are present (all zero) when
            there is no matching history.
        """
        with self.lock:
            history = [dict(h) for h in self.query_history]
        if username:
            history = [h for h in history if h['username'] == username]

        total = len(history)
        statuses = [h['status'] for h in history]
        errors = statuses.count('ERROR')
        durations = [h['duration'] for h in history
                     if h.get('duration') is not None]
        return {
            'ok': True,
            'total_queries': total,
            'completed': statuses.count('COMPLETED'),
            'errors': errors,
            'killed': statuses.count('KILLED'),
            'avg_duration': sum(durations) / len(durations) if durations else 0,
            'max_duration': max(durations) if durations else 0,
            'min_duration': min(durations) if durations else 0,
            'error_rate': errors / total if total else 0
        }

    def get_active_users(self) -> List[Dict]:
        """Return one summary per user that currently has active queries.

        Returns:
            Dicts with ``username``, ``active_queries`` (count) and
            ``oldest_query`` (earliest ``started_at`` string).
        """
        users = {}
        with self.lock:
            for query_info in self.active_queries.values():
                username = query_info['username']
                started_at = query_info['started_at']
                entry = users.setdefault(username, {
                    'username': username,
                    'active_queries': 0,
                    'oldest_query': None
                })
                entry['active_queries'] += 1
                # ISO strings compare chronologically, so plain '<' works.
                if not entry['oldest_query'] or started_at < entry['oldest_query']:
                    entry['oldest_query'] = started_at
        return list(users.values())

    def export_process_list(self) -> str:
        """Render the current process list as a plain-text report.

        Returns:
            Newline-joined report; SQL longer than 100 characters is cut and
            suffixed with '...', and a missing database is shown as 'N/A'.
        """
        processes = self.list_processes()
        lines = [
            '=' * 80,
            'CORPUSDB PROCESS LIST',
            f'Generated: {datetime.now().isoformat()}',
            '=' * 80,
            '',
        ]
        if not processes:
            lines.append('No active queries')
        for proc in processes:
            sql = proc['sql']
            # Only mark the statement as truncated when it really was.
            preview = sql if len(sql) <= 100 else f'{sql[:100]}...'
            lines.append(f"Query ID: {proc['id']}")
            lines.append(f"User: {proc['username']}")
            # The key always exists (possibly None), so fall back on falsy.
            lines.append(f"Database: {proc.get('database') or 'N/A'}")
            lines.append(f"Status: {proc['status']}")
            lines.append(f"Started: {proc['started_at']}")
            lines.append(f"Duration: {proc['duration']:.2f}s")
            lines.append(f"SQL: {preview}")
            lines.append('-' * 80)
        return '\n'.join(lines)

    def clear_history(self, older_than_hours: int = 24) -> Dict:
        """Drop history entries completed more than ``older_than_hours`` ago.

        Args:
            older_than_hours: Age limit in hours.

        Returns:
            Dict with ``ok``, a ``message``, ``removed`` and ``remaining``.
        """
        with self.lock:
            now = datetime.now()
            cutoff = now - timedelta(hours=older_than_hours)
            now_iso = now.isoformat()
            original_count = len(self.query_history)
            # Entries without 'completed_at' are treated as completed now.
            self.query_history = [
                h for h in self.query_history
                if datetime.fromisoformat(h.get('completed_at', now_iso)) > cutoff
            ]
            remaining = len(self.query_history)
        removed = original_count - remaining
        return {
            'ok': True,
            'message': f'Removed {removed} old queries from history',
            'removed': removed,
            'remaining': remaining
        }
# Module-level ProcessManager instance, created when this module is imported.
process_manager = ProcessManager()