File size: 10,116 Bytes
723f9ab
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""
Process List and Query Killer
Monitor running queries and kill long-running or problematic queries
"""

import json
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import uuid
import threading
import time

class ProcessManager:
    """Track running queries in memory and keep a bounded history of finished ones.

    State lives in two attributes guarded by ``self.lock``:

    * ``active_queries`` -- dict mapping query id to its info dict.
    * ``query_history`` -- list of finished/killed info dicts, oldest first,
      trimmed to at most ``max_history`` entries.

    Timestamps are stored as naive local ``datetime.isoformat()`` strings.
    """

    def __init__(self):
        self.active_queries = {}  # {query_id: query_info}
        self.query_history = []
        self.lock = threading.Lock()
        self.max_history = 1000

    @staticmethod
    def _elapsed_seconds(started_at: str, now: datetime) -> float:
        """Return seconds between the ISO timestamp ``started_at`` and ``now``."""
        return (now - datetime.fromisoformat(started_at)).total_seconds()

    def _archive(self, query_info: Dict) -> None:
        """Append ``query_info`` to the history and drop the oldest overflow.

        Must be called with ``self.lock`` held.
        """
        self.query_history.append(query_info)
        overflow = len(self.query_history) - self.max_history
        if overflow > 0:
            del self.query_history[:overflow]

    def _get_processes_path(self, user_store: Path) -> Path:
        """Return ``user_store / 'processes'``, creating the directory if needed."""
        processes_dir = user_store / 'processes'
        # parents=True so a not-yet-created user_store does not raise.
        processes_dir.mkdir(parents=True, exist_ok=True)
        return processes_dir

    def register_query(self, user_store: Path, username: str, sql: str,
                       database: Optional[str] = None) -> str:
        """Register a new query execution and return its generated id.

        Args:
            user_store: Accepted for interface compatibility; not used here.
            username: Owner of the query.
            sql: Query text.
            database: Optional database name the query targets.

        Returns:
            A fresh UUID4 string identifying the query.
        """
        query_id = str(uuid.uuid4())

        query_info = {
            'id': query_id,
            'username': username,
            'sql': sql,
            'database': database,
            'status': 'RUNNING',
            'started_at': datetime.now().isoformat(),
            'duration': 0,
            'rows_affected': 0,
            'error': None,
        }

        with self.lock:
            self.active_queries[query_id] = query_info

        return query_id

    def complete_query(self, query_id: str, rows_affected: int = 0,
                       error: Optional[str] = None):
        """Mark an active query as finished and move it to the history.

        The status becomes ``'ERROR'`` when ``error`` is truthy, otherwise
        ``'COMPLETED'``. Unknown ids (already completed or killed) are ignored.
        """
        with self.lock:
            query_info = self.active_queries.pop(query_id, None)
            if query_info is None:
                return

            # One clock read so completed_at and duration agree.
            now = datetime.now()
            query_info['status'] = 'ERROR' if error else 'COMPLETED'
            query_info['completed_at'] = now.isoformat()
            query_info['duration'] = self._elapsed_seconds(
                query_info['started_at'], now)
            query_info['rows_affected'] = rows_affected
            query_info['error'] = error

            self._archive(query_info)

    def kill_query(self, query_id: str) -> Dict:
        """Mark an active query as killed and move it to the history.

        This only updates the bookkeeping held by this object; nothing in this
        class interrupts the underlying execution.

        Returns:
            ``{'ok': True, 'message': ...}`` on success, or
            ``{'ok': False, 'error': ...}`` when the id is not active.
        """
        with self.lock:
            query_info = self.active_queries.pop(query_id, None)
            if query_info is None:
                return {'ok': False, 'error': 'Query not found or already completed'}

            now = datetime.now()
            query_info['status'] = 'KILLED'
            query_info['completed_at'] = now.isoformat()
            # Record how long the query ran so killed queries count in stats.
            query_info['duration'] = self._elapsed_seconds(
                query_info['started_at'], now)
            query_info['error'] = 'Query killed by user'

            # Goes through the same bounded-history path as complete_query.
            self._archive(query_info)

            return {
                'ok': True,
                'message': f'Query {query_id} killed successfully'
            }

    def list_processes(self, username: Optional[str] = None,
                       status: Optional[str] = None) -> List[Dict]:
        """List active queries, optionally filtered by username and/or status.

        Returns shallow copies, so callers cannot mutate internal state; each
        ``'RUNNING'`` copy carries its duration as of this call.
        """
        with self.lock:
            now = datetime.now()
            processes = []
            for info in self.active_queries.values():
                if username and info['username'] != username:
                    continue
                if status and info['status'] != status:
                    continue
                snapshot = info.copy()
                if snapshot['status'] == 'RUNNING':
                    snapshot['duration'] = self._elapsed_seconds(
                        snapshot['started_at'], now)
                processes.append(snapshot)
            return processes

    def get_query_history(self, username: Optional[str] = None,
                          limit: int = 100) -> List[Dict]:
        """Return up to ``limit`` history entries, most recently completed first.

        Entries are shallow copies of the stored dicts.
        """
        with self.lock:
            history = [
                h.copy() for h in self.query_history
                if not username or h['username'] == username
            ]

        # Sort by completed_at descending (ISO strings sort chronologically).
        history.sort(key=lambda h: h.get('completed_at') or '', reverse=True)
        return history[:limit]

    def get_slow_queries(self, threshold_seconds: float = 5.0) -> List[Dict]:
        """Return copies of active queries running longer than the threshold."""
        with self.lock:
            now = datetime.now()
            slow_queries = []

            for query_info in self.active_queries.values():
                duration = self._elapsed_seconds(query_info['started_at'], now)
                if duration > threshold_seconds:
                    query_copy = query_info.copy()
                    query_copy['duration'] = duration
                    slow_queries.append(query_copy)

            return slow_queries

    def kill_slow_queries(self, threshold_seconds: float = 30.0) -> Dict:
        """Kill every active query running longer than the threshold.

        Queries that finish between the scan and the kill are simply not
        counted, because ``kill_query`` reports them as not found.
        """
        killed_count = sum(
            1
            for query in self.get_slow_queries(threshold_seconds)
            if self.kill_query(query['id'])['ok']
        )

        return {
            'ok': True,
            'message': f'Killed {killed_count} slow queries',
            'killed_count': killed_count
        }

    def get_query_stats(self, username: Optional[str] = None) -> Dict:
        """Return aggregate statistics over the history.

        The result always has the same keys, including when the (filtered)
        history is empty.
        """
        with self.lock:
            history = [
                h for h in self.query_history
                if not username or h['username'] == username
            ]

            status_counts = {'COMPLETED': 0, 'ERROR': 0, 'KILLED': 0}
            for h in history:
                if h['status'] in status_counts:
                    status_counts[h['status']] += 1

            # Keep zero durations; only skip entries with no duration at all.
            durations = [
                h['duration'] for h in history if h.get('duration') is not None
            ]

            return {
                'ok': True,
                'total_queries': len(history),
                'completed': status_counts['COMPLETED'],
                'errors': status_counts['ERROR'],
                'killed': status_counts['KILLED'],
                'avg_duration': sum(durations) / len(durations) if durations else 0,
                'max_duration': max(durations) if durations else 0,
                'min_duration': min(durations) if durations else 0,
                'error_rate': status_counts['ERROR'] / len(history) if history else 0
            }

    def get_active_users(self) -> List[Dict]:
        """Summarize active queries per user.

        Each entry has ``username``, ``active_queries`` (count) and
        ``oldest_query`` (earliest ``started_at`` among that user's queries).
        """
        with self.lock:
            users: Dict[str, Dict] = {}

            for query_info in self.active_queries.values():
                username = query_info['username']
                started_at = query_info['started_at']
                entry = users.setdefault(username, {
                    'username': username,
                    'active_queries': 0,
                    'oldest_query': started_at,
                })
                entry['active_queries'] += 1
                # ISO timestamps compare chronologically as strings.
                if started_at < entry['oldest_query']:
                    entry['oldest_query'] = started_at

            return list(users.values())

    def export_process_list(self) -> str:
        """Render the current active queries as a plain-text report."""
        processes = self.list_processes()

        lines = [
            '=' * 80,
            'CORPUSDB PROCESS LIST',
            f'Generated: {datetime.now().isoformat()}',
            '=' * 80,
            '',
        ]

        if not processes:
            lines.append('No active queries')
        else:
            for proc in processes:
                sql = proc['sql']
                # Only mark the SQL as truncated when it actually was.
                sql_preview = f'{sql[:100]}...' if len(sql) > 100 else sql
                lines.append(f"Query ID: {proc['id']}")
                lines.append(f"User: {proc['username']}")
                # 'database' is always present but may be None.
                lines.append(f"Database: {proc.get('database') or 'N/A'}")
                lines.append(f"Status: {proc['status']}")
                lines.append(f"Started: {proc['started_at']}")
                lines.append(f"Duration: {proc['duration']:.2f}s")
                lines.append(f"SQL: {sql_preview}")
                lines.append('-' * 80)

        return '\n'.join(lines)

    def clear_history(self, older_than_hours: int = 24) -> Dict:
        """Drop history entries completed more than ``older_than_hours`` ago.

        Entries without a ``completed_at`` value are kept.
        """
        with self.lock:
            cutoff = datetime.now() - timedelta(hours=older_than_hours)

            original_count = len(self.query_history)
            self.query_history = [
                h for h in self.query_history
                if 'completed_at' not in h
                or datetime.fromisoformat(h['completed_at']) > cutoff
            ]

            removed = original_count - len(self.query_history)

            return {
                'ok': True,
                'message': f'Removed {removed} old queries from history',
                'removed': removed,
                'remaining': len(self.query_history)
            }

# Module-level ProcessManager instance, created when this module is imported.
process_manager = ProcessManager()