""" Process List and Query Killer Monitor running queries and kill long-running or problematic queries """ import json from pathlib import Path from datetime import datetime, timedelta from typing import Dict, List, Optional import uuid import threading import time class ProcessManager: """Manage running queries and processes""" def __init__(self): self.active_queries = {} # {query_id: query_info} self.query_history = [] self.lock = threading.Lock() self.max_history = 1000 def _get_processes_path(self, user_store: Path) -> Path: """Get path to processes directory""" processes_dir = user_store / 'processes' processes_dir.mkdir(exist_ok=True) return processes_dir def register_query(self, user_store: Path, username: str, sql: str, database: str = None) -> str: """Register a new query execution""" query_id = str(uuid.uuid4()) query_info = { 'id': query_id, 'username': username, 'sql': sql, 'database': database, 'status': 'RUNNING', 'started_at': datetime.now().isoformat(), 'duration': 0, 'rows_affected': 0, 'error': None } with self.lock: self.active_queries[query_id] = query_info return query_id def complete_query(self, query_id: str, rows_affected: int = 0, error: str = None): """Mark query as completed""" with self.lock: if query_id in self.active_queries: query_info = self.active_queries[query_id] started = datetime.fromisoformat(query_info['started_at']) duration = (datetime.now() - started).total_seconds() query_info['status'] = 'ERROR' if error else 'COMPLETED' query_info['completed_at'] = datetime.now().isoformat() query_info['duration'] = duration query_info['rows_affected'] = rows_affected query_info['error'] = error # Move to history self.query_history.append(query_info) if len(self.query_history) > self.max_history: self.query_history.pop(0) # Remove from active del self.active_queries[query_id] def kill_query(self, query_id: str) -> Dict: """Kill a running query""" with self.lock: if query_id not in self.active_queries: return {'ok': False, 'error': 'Query not found or already completed'} query_info = self.active_queries[query_id] query_info['status'] = 'KILLED' query_info['completed_at'] = datetime.now().isoformat() query_info['error'] = 'Query killed by user' # Move to history self.query_history.append(query_info) del self.active_queries[query_id] return { 'ok': True, 'message': f'Query {query_id} killed successfully' } def list_processes(self, username: str = None, status: str = None) -> List[Dict]: """List all active queries""" with self.lock: processes = list(self.active_queries.values()) # Filter by username if username: processes = [p for p in processes if p['username'] == username] # Filter by status if status: processes = [p for p in processes if p['status'] == status] # Calculate current duration for running queries for process in processes: if process['status'] == 'RUNNING': started = datetime.fromisoformat(process['started_at']) process['duration'] = (datetime.now() - started).total_seconds() return processes def get_query_history(self, username: str = None, limit: int = 100) -> List[Dict]: """Get query execution history""" with self.lock: history = self.query_history.copy() if username: history = [h for h in history if h['username'] == username] # Sort by completed_at descending history.sort(key=lambda x: x.get('completed_at', ''), reverse=True) return history[:limit] def get_slow_queries(self, threshold_seconds: float = 5.0) -> List[Dict]: """Get queries running longer than threshold""" with self.lock: slow_queries = [] now = datetime.now() for query_info in self.active_queries.values(): started = datetime.fromisoformat(query_info['started_at']) duration = (now - started).total_seconds() if duration > threshold_seconds: query_copy = query_info.copy() query_copy['duration'] = duration slow_queries.append(query_copy) return slow_queries def kill_slow_queries(self, threshold_seconds: float = 30.0) -> Dict: """Kill all queries running longer than threshold""" slow_queries = self.get_slow_queries(threshold_seconds) killed_count = 0 for query in slow_queries: result = self.kill_query(query['id']) if result['ok']: killed_count += 1 return { 'ok': True, 'message': f'Killed {killed_count} slow queries', 'killed_count': killed_count } def get_query_stats(self, username: str = None) -> Dict: """Get query execution statistics""" with self.lock: history = self.query_history.copy() if username: history = [h for h in history if h['username'] == username] if not history: return { 'ok': True, 'total_queries': 0, 'avg_duration': 0, 'max_duration': 0, 'min_duration': 0, 'error_rate': 0 } durations = [h['duration'] for h in history if h.get('duration')] errors = [h for h in history if h['status'] == 'ERROR'] return { 'ok': True, 'total_queries': len(history), 'completed': len([h for h in history if h['status'] == 'COMPLETED']), 'errors': len(errors), 'killed': len([h for h in history if h['status'] == 'KILLED']), 'avg_duration': sum(durations) / len(durations) if durations else 0, 'max_duration': max(durations) if durations else 0, 'min_duration': min(durations) if durations else 0, 'error_rate': len(errors) / len(history) if history else 0 } def get_active_users(self) -> List[Dict]: """Get list of users with active queries""" with self.lock: users = {} for query_info in self.active_queries.values(): username = query_info['username'] if username not in users: users[username] = { 'username': username, 'active_queries': 0, 'oldest_query': None } users[username]['active_queries'] += 1 # Track oldest query if not users[username]['oldest_query']: users[username]['oldest_query'] = query_info['started_at'] else: if query_info['started_at'] < users[username]['oldest_query']: users[username]['oldest_query'] = query_info['started_at'] return list(users.values()) def export_process_list(self) -> str: """Export current process list as text""" processes = self.list_processes() lines = [] lines.append('=' * 80) lines.append('CORPUSDB PROCESS LIST') lines.append(f'Generated: {datetime.now().isoformat()}') lines.append('=' * 80) lines.append('') if not processes: lines.append('No active queries') else: for proc in processes: lines.append(f"Query ID: {proc['id']}") lines.append(f"User: {proc['username']}") lines.append(f"Database: {proc.get('database', 'N/A')}") lines.append(f"Status: {proc['status']}") lines.append(f"Started: {proc['started_at']}") lines.append(f"Duration: {proc['duration']:.2f}s") lines.append(f"SQL: {proc['sql'][:100]}...") lines.append('-' * 80) return '\n'.join(lines) def clear_history(self, older_than_hours: int = 24) -> Dict: """Clear old query history""" with self.lock: cutoff = datetime.now() - timedelta(hours=older_than_hours) original_count = len(self.query_history) self.query_history = [ h for h in self.query_history if datetime.fromisoformat(h.get('completed_at', datetime.now().isoformat())) > cutoff ] removed = original_count - len(self.query_history) return { 'ok': True, 'message': f'Removed {removed} old queries from history', 'removed': removed, 'remaining': len(self.query_history) } process_manager = ProcessManager()