| """ |
| Process List and Query Killer |
| Monitor running queries and kill long-running or problematic queries |
| """ |
|
|
| import json |
| from pathlib import Path |
| from datetime import datetime, timedelta |
| from typing import Dict, List, Optional |
| import uuid |
| import threading |
| import time |
|
|
class ProcessManager:
    """Manage running queries and processes.

    Keeps an in-memory registry of active queries (``active_queries``) and a
    bounded list of finished ones (``query_history``, capped at
    ``max_history`` records, oldest dropped first). Both are guarded by
    ``self.lock``.

    Query records handed back to callers are shallow copies, so mutating them
    does not affect the manager's internal state.

    Note: "killing" a query here only marks its record as KILLED and moves it
    to history; nothing in this class interrupts the code that is actually
    executing the query. Timestamps come from ``datetime.now()`` (naive local
    wall-clock time), so durations can be skewed by clock adjustments.
    """

    def __init__(self):
        # query_id -> record dict for queries that have not finished yet.
        self.active_queries: Dict[str, Dict] = {}
        # Finished records (COMPLETED / ERROR / KILLED), oldest first.
        self.query_history: List[Dict] = []
        self.lock = threading.Lock()
        # Maximum number of records retained in query_history.
        self.max_history = 1000

    # -- internal helpers ---------------------------------------------------

    @staticmethod
    def _elapsed_seconds(query_info: Dict, now: datetime) -> float:
        """Return seconds elapsed between the record's ``started_at`` and *now*."""
        started = datetime.fromisoformat(query_info['started_at'])
        return (now - started).total_seconds()

    @staticmethod
    def _matches_user(record: Dict, username: Optional[str]) -> bool:
        """Return True if *username* is falsy (no filter) or equals the record's user."""
        return not username or record['username'] == username

    def _append_history(self, query_info: Dict) -> None:
        """Append a finished record and trim history to ``max_history`` entries.

        The caller must already hold ``self.lock``.
        """
        self.query_history.append(query_info)
        overflow = len(self.query_history) - self.max_history
        if overflow > 0:
            # Drop the oldest records in one slice delete instead of pop(0) loops.
            del self.query_history[:overflow]

    def _get_processes_path(self, user_store: Path) -> Path:
        """Return ``user_store / 'processes'``, creating that directory if missing.

        ``mkdir`` is called without ``parents=True``, so this raises
        ``FileNotFoundError`` if *user_store* itself does not exist.
        """
        processes_dir = user_store / 'processes'
        processes_dir.mkdir(exist_ok=True)
        return processes_dir

    # -- lifecycle ----------------------------------------------------------

    def register_query(self, user_store: Path, username: str, sql: str,
                       database: Optional[str] = None) -> str:
        """Register a new query execution and return its generated id.

        Args:
            user_store: Accepted for API compatibility; not used here.
            username: Owner of the query.
            sql: Query text.
            database: Optional target database name.

        Returns:
            A new UUID4 string identifying the query.
        """
        query_id = str(uuid.uuid4())

        query_info = {
            'id': query_id,
            'username': username,
            'sql': sql,
            'database': database,
            'status': 'RUNNING',
            'started_at': datetime.now().isoformat(),
            'duration': 0,
            'rows_affected': 0,
            'error': None,
        }

        with self.lock:
            self.active_queries[query_id] = query_info

        return query_id

    def complete_query(self, query_id: str, rows_affected: int = 0,
                       error: Optional[str] = None):
        """Mark an active query as finished and move it to history.

        The status becomes ``'ERROR'`` when *error* is truthy, else
        ``'COMPLETED'``. Unknown ids (including queries already killed or
        completed) are ignored.
        """
        with self.lock:
            query_info = self.active_queries.pop(query_id, None)
            if query_info is None:
                return

            # Use a single timestamp so completed_at and duration agree.
            now = datetime.now()
            query_info['status'] = 'ERROR' if error else 'COMPLETED'
            query_info['completed_at'] = now.isoformat()
            query_info['duration'] = self._elapsed_seconds(query_info, now)
            query_info['rows_affected'] = rows_affected
            query_info['error'] = error

            self._append_history(query_info)

    def kill_query(self, query_id: str) -> Dict:
        """Mark an active query as KILLED and move it to history.

        Returns:
            ``{'ok': True, 'message': ...}`` on success, or
            ``{'ok': False, 'error': ...}`` if the id is not active.
        """
        with self.lock:
            query_info = self.active_queries.pop(query_id, None)
            if query_info is None:
                return {'ok': False, 'error': 'Query not found or already completed'}

            now = datetime.now()
            query_info['status'] = 'KILLED'
            query_info['completed_at'] = now.isoformat()
            # Record how long the query ran before being killed.
            query_info['duration'] = self._elapsed_seconds(query_info, now)
            query_info['error'] = 'Query killed by user'

            # Goes through the same capped append as complete_query, so
            # killed queries cannot grow history past max_history.
            self._append_history(query_info)

        return {
            'ok': True,
            'message': f'Query {query_id} killed successfully'
        }

    # -- inspection ---------------------------------------------------------

    def list_processes(self, username: Optional[str] = None,
                       status: Optional[str] = None) -> List[Dict]:
        """Return copies of active query records, optionally filtered.

        Records with status ``'RUNNING'`` get ``duration`` set to the seconds
        elapsed so far. The copies are taken under the lock, and the internal
        records are never mutated here.
        """
        now = datetime.now()
        with self.lock:
            processes = [dict(p) for p in self.active_queries.values()]

        if username:
            processes = [p for p in processes if p['username'] == username]

        if status:
            processes = [p for p in processes if p['status'] == status]

        for process in processes:
            if process['status'] == 'RUNNING':
                process['duration'] = self._elapsed_seconds(process, now)

        return processes

    def get_query_history(self, username: Optional[str] = None,
                          limit: int = 100) -> List[Dict]:
        """Return up to *limit* finished query records, newest first (by completed_at).

        Returned records are copies; mutating them does not affect history.
        """
        with self.lock:
            history = [h for h in self.query_history
                       if self._matches_user(h, username)]

        history.sort(key=lambda x: x.get('completed_at', ''), reverse=True)

        return [dict(h) for h in history[:limit]]

    def get_slow_queries(self, threshold_seconds: float = 5.0) -> List[Dict]:
        """Return copies of active queries running longer than *threshold_seconds*.

        Each copy carries an up-to-date ``duration``.
        """
        with self.lock:
            now = datetime.now()
            slow_queries = []
            for query_info in self.active_queries.values():
                duration = self._elapsed_seconds(query_info, now)
                if duration > threshold_seconds:
                    query_copy = dict(query_info)
                    query_copy['duration'] = duration
                    slow_queries.append(query_copy)

        return slow_queries

    def kill_slow_queries(self, threshold_seconds: float = 30.0) -> Dict:
        """Kill every active query running longer than *threshold_seconds*.

        Queries that finish between the scan and the kill are skipped and not
        counted.
        """
        slow_queries = self.get_slow_queries(threshold_seconds)
        killed_count = sum(
            1 for query in slow_queries if self.kill_query(query['id'])['ok']
        )

        return {
            'ok': True,
            'message': f'Killed {killed_count} slow queries',
            'killed_count': killed_count
        }

    def get_query_stats(self, username: Optional[str] = None) -> Dict:
        """Return aggregate statistics over finished queries.

        The result always has the same keys (``completed``/``errors``/
        ``killed`` are included even when history is empty). Durations of
        ``0.0`` are counted; only missing/None durations are skipped.
        """
        with self.lock:
            history = [h for h in self.query_history
                       if self._matches_user(h, username)]

        total = len(history)
        completed = sum(1 for h in history if h['status'] == 'COMPLETED')
        errors = sum(1 for h in history if h['status'] == 'ERROR')
        killed = sum(1 for h in history if h['status'] == 'KILLED')
        durations = [h['duration'] for h in history
                     if h.get('duration') is not None]

        return {
            'ok': True,
            'total_queries': total,
            'completed': completed,
            'errors': errors,
            'killed': killed,
            'avg_duration': sum(durations) / len(durations) if durations else 0,
            'max_duration': max(durations, default=0),
            'min_duration': min(durations, default=0),
            'error_rate': errors / total if total else 0,
        }

    def get_active_users(self) -> List[Dict]:
        """Return one summary per user with active queries.

        Each summary has ``username``, ``active_queries`` (count) and
        ``oldest_query`` (earliest ``started_at`` string).
        """
        with self.lock:
            users: Dict[str, Dict] = {}
            for query_info in self.active_queries.values():
                name = query_info['username']
                entry = users.setdefault(name, {
                    'username': name,
                    'active_queries': 0,
                    'oldest_query': None,
                })
                entry['active_queries'] += 1

                # started_at values are all datetime.now().isoformat() strings,
                # which compare chronologically as plain strings.
                started = query_info['started_at']
                if entry['oldest_query'] is None or started < entry['oldest_query']:
                    entry['oldest_query'] = started

            return list(users.values())

    def export_process_list(self, sql_preview_chars: int = 100) -> str:
        """Render the current active queries as a human-readable text report.

        Args:
            sql_preview_chars: Maximum number of SQL characters shown per
                query; longer SQL is truncated and suffixed with ``...``.
        """
        processes = self.list_processes()

        lines = [
            '=' * 80,
            'CORPUSDB PROCESS LIST',
            f'Generated: {datetime.now().isoformat()}',
            '=' * 80,
            '',
        ]

        if not processes:
            lines.append('No active queries')

        for proc in processes:
            sql = proc.get('sql') or ''
            # Only mark the SQL as truncated when it actually was.
            if len(sql) > sql_preview_chars:
                preview = sql[:sql_preview_chars] + '...'
            else:
                preview = sql
            lines.extend([
                f"Query ID: {proc['id']}",
                f"User: {proc['username']}",
                # The 'database' key is always present but may hold None.
                f"Database: {proc.get('database') or 'N/A'}",
                f"Status: {proc['status']}",
                f"Started: {proc['started_at']}",
                f"Duration: {proc['duration']:.2f}s",
                f"SQL: {preview}",
                '-' * 80,
            ])

        return '\n'.join(lines)

    def clear_history(self, older_than_hours: int = 24) -> Dict:
        """Drop history records completed more than *older_than_hours* ago.

        Records without a ``completed_at`` timestamp are kept.
        """
        cutoff = datetime.now() - timedelta(hours=older_than_hours)

        with self.lock:
            original_count = len(self.query_history)
            self.query_history = [
                h for h in self.query_history
                if h.get('completed_at') is None
                or datetime.fromisoformat(h['completed_at']) > cutoff
            ]
            remaining = len(self.query_history)

        removed = original_count - remaining

        return {
            'ok': True,
            'message': f'Removed {removed} old queries from history',
            'removed': removed,
            'remaining': remaining
        }
|
|
# Shared module-level ProcessManager instance; importers use this object
# rather than constructing their own.
process_manager: ProcessManager = ProcessManager()
|
|