File size: 17,161 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7bc750c
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7bc750c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
"""
Service for test-related operations
"""

import os
import json
import time
import logging
import traceback
import threading
from typing import Dict, List, Any, Optional
from pathlib import Path

from backend.server_config import TEST_RESULTS_FILE, PROJECT_ROOT

logger = logging.getLogger("agent_monitoring_server.services.test")

class TestService:
    """Service for test-related operations"""
    
    @staticmethod
    def test_relation(tester, relation_id: str, model: str, system_prompt: Optional[str] = None) -> Dict[str, Any]:
        """Test a specific relation"""
        try:
            result = tester.test_relation(relation_id, model, system_prompt)
            return result
        except Exception as e:
            logger.error(f"Error testing relation: {str(e)}")
            logger.error(traceback.format_exc())
            raise
            
    @staticmethod
    def test_relation_with_jailbreak(
        tester, 
        relation_id: str, 
        model: str, 
        jailbreak_index: int, 
        system_prompt: Optional[str] = None
    ) -> Dict[str, Any]:
        """Test a relation with a jailbreak attack"""
        try:
            result = tester.test_relation_with_jailbreak(relation_id, model, jailbreak_index, system_prompt)
            return result
        except Exception as e:
            logger.error(f"Error testing relation with jailbreak: {str(e)}")
            logger.error(traceback.format_exc())
            raise
            
    @staticmethod
    def test_relations_by_type(
        tester,
        relation_type: str,
        model: str,
        system_prompt: Optional[str] = None
    ) -> Dict[str, Any]:
        """Test all relations of a specific type"""
        try:
            # Get relations of the specified type
            relations = tester.list_relations(relation_type)
            if not relations:
                return {
                    "message": "No relations found of the specified type", 
                    "results": {}
                }
                
            # Get relation IDs
            relation_ids = [r["id"] for r in relations]
            
            # Test the relations and save results
            results = tester.test_multiple_relations(relation_ids, model, system_prompt)
            tester.save_results(TEST_RESULTS_FILE, results)
            
            return {
                "message": f"Tested {len(relation_ids)} relations of type {relation_type}",
                "relation_count": len(relation_ids),
                "results": results
            }
        except Exception as e:
            logger.error(f"Error testing relations by type: {str(e)}")
            logger.error(traceback.format_exc())
            raise
    
    @staticmethod
    def run_perturbation_test(
        tester,
        knowledge_graph: str,
        model: str = "gpt-5-mini",
        perturbation_type: str = "jailbreak",
        relation_type: str = "",
        max_jailbreaks: int = 5,
        relation_limit: int = 5
    ) -> Dict[str, Any]:
        """Start perturbation tests for a stored knowledge graph in a background thread.

        The knowledge graph is looked up in the database, its ``graph_data`` is
        handed to ``tester.run_tests`` on a new thread, and this method returns
        immediately with an "in_progress" payload. After the run, the worker
        re-opens the output file to add a ``test_metadata`` section; if the run
        itself fails, the worker writes an error document to the same path.

        Args:
            tester: Object exposing ``run_tests(...)`` and a ``jailbreak_techniques``
                sequence.
            knowledge_graph: Identifier passed to ``get_knowledge_graph``.
            model: Model name forwarded to the tester.
            perturbation_type: Perturbation kind forwarded to the tester.
            relation_type: Relation type filter forwarded to the tester.
            max_jailbreaks: Upper bound on jailbreak techniques; when positive and
                ``perturbation_type`` is ``'jailbreak'``, ``tester.jailbreak_techniques``
                is truncated in place to this length.
            relation_limit: Relation limit forwarded to the tester.

        Returns:
            Dict with ``status``, ``message``, ``output_file``, ``timestamp`` and
            ``knowledge_graph_id``.

        Raises:
            FileNotFoundError: If the knowledge graph is not found in the database.
            Exception: Any other start-up error is logged and re-raised.
        """
        try:
            # Get a database session
            from backend.database.utils import get_db, get_knowledge_graph
            session = next(get_db())
            
            try:
                # Get the knowledge graph from database
                kg = get_knowledge_graph(session, knowledge_graph)
                if not kg:
                    raise FileNotFoundError(f"Knowledge graph '{knowledge_graph}' not found in database")
                
                # Copy everything the worker needs off the ORM object while the
                # session is still open: the thread below outlives session.close(),
                # and attribute access on a detached instance is not guaranteed to work.
                knowledge_graph_content = kg.graph_data
                kg_id = kg.id
                logger.info(f"Retrieved knowledge graph {knowledge_graph} from database for testing")
                
                # Generate timestamp for this test run
                timestamp = int(time.time())
                output_path = f"perturbation_results_{timestamp}.json"

                # Single source of truth for the metadata written on success and failure
                test_metadata = {
                    "timestamp": timestamp,
                    "knowledge_graph_id": kg_id,
                    "knowledge_graph_file": knowledge_graph,
                    "model": model, 
                    "perturbation_type": perturbation_type,
                    "relation_type": relation_type,
                    "max_jailbreaks": max_jailbreaks,
                    "relation_limit": relation_limit
                }
                    
                # Limit jailbreak techniques if specified
                # NOTE(review): this truncates the tester's list in place, so the
                # limit persists for later runs that reuse the same tester.
                if perturbation_type == 'jailbreak' and max_jailbreaks > 0:
                    if len(tester.jailbreak_techniques) > max_jailbreaks:
                        logger.info(f"Limiting jailbreak techniques to {max_jailbreaks}")
                        tester.jailbreak_techniques = tester.jailbreak_techniques[:max_jailbreaks]
                
                # Start the test in a separate thread
                def run_tests():
                    try:
                        # Run the tests
                        tester.run_tests(
                            knowledge_graph=knowledge_graph_content,
                            output_file=output_path,
                            model=model,
                            perturbation_type=perturbation_type,
                            relation_type=relation_type,
                            relation_limit=relation_limit,
                            max_jailbreaks=max_jailbreaks
                        )
                        
                        # Add metadata to the results file (best effort: a failure
                        # here is logged but does not mark the run as failed)
                        try:
                            with open(output_path, 'r') as f:
                                results = json.load(f)
                            
                            results["test_metadata"] = dict(test_metadata)
                            
                            # Write back the updated results
                            with open(output_path, 'w') as f:
                                json.dump(results, f, indent=2)
                                
                            logger.info(f"Added metadata to results file {output_path}")
                        except Exception as e:
                            logger.error(f"Error adding metadata to results: {str(e)}")
                        
                        logger.info(f"Perturbation tests completed. Results saved to {output_path}")
                    except Exception as e:
                        logger.error(f"Error running perturbation tests: {str(e)}")
                        logger.error(traceback.format_exc())
                        
                        # Create an error results file
                        error_results = {
                            "error": True,
                            "error_message": str(e),
                            "timestamp": timestamp,
                            "test_metadata": {**test_metadata, "status": "failed"}
                        }
                        
                        # This is the thread's last line of defence: if the error
                        # document cannot be written, log it instead of letting the
                        # thread die with an unhandled exception.
                        try:
                            with open(output_path, 'w') as f:
                                json.dump(error_results, f, indent=2)
                        except Exception as write_error:
                            logger.error(f"Error writing error results to {output_path}: {str(write_error)}")
                
                # Start the thread
                threading.Thread(target=run_tests).start()
                
                # Return response with test information
                return {
                    "status": "in_progress",
                    "message": f"Perturbation testing started. Results will be saved to {output_path}",
                    "output_file": output_path,
                    "timestamp": timestamp,
                    "knowledge_graph_id": kg_id
                }
            finally:
                session.close()
        except Exception as e:
            logger.error(f"Error starting perturbation test: {str(e)}")
            logger.error(traceback.format_exc())
            raise
            
    @staticmethod
    def get_test_results() -> Dict[str, Any]:
        """Load the contents of ``TEST_RESULTS_FILE`` and attach a download link when possible.

        Returns:
            - A ``{"status": "no_results", ...}`` payload when the file does not exist.
            - An ``{"error": True, ...}`` payload when the file is not valid JSON.
            - Otherwise the parsed JSON. Unless it is flagged with a truthy
              ``"error"``, a ``download_url`` is added from its ``output_file``
              entry, or from a file name derived from its ``timestamp`` if that
              file exists in the current working directory.

        Raises:
            Exception: Any other error is logged and re-raised.
        """
        try:
            if not os.path.exists(TEST_RESULTS_FILE):
                # Nothing has been written yet: answer with a status payload
                return {
                    "status": "no_results",
                    "message": "No test results are currently available. Run a perturbation test first."
                }

            try:
                with open(TEST_RESULTS_FILE, 'r') as handle:
                    payload = json.load(handle)
            except json.JSONDecodeError as decode_error:
                # The file is present but its contents cannot be parsed
                logger.error(f"Invalid JSON in test results file: {decode_error!s}")
                return {
                    "error": True,
                    "error_message": f"Test results file contains invalid JSON: {decode_error!s}",
                    "timestamp": time.time()
                }

            # Error documents are returned untouched, without a download link
            if payload.get("error", False):
                return payload

            if "output_file" in payload:
                # The results name their own output file: link to it directly
                named_file = payload["output_file"]
                payload["download_url"] = f"/download/{named_file}"
                logger.info(f"Added download URL for {named_file}")
            elif "timestamp" in payload:
                # No explicit file: derive the conventional name from the timestamp
                stamp = int(payload["timestamp"])
                candidate = f"perturbation_results_{stamp}.json"
                if os.path.exists(candidate):
                    payload["output_file"] = candidate
                    payload["download_url"] = f"/download/{candidate}"
                    logger.info(f"Added download URL for guessed file {candidate}")

            return payload
        except Exception as failure:
            logger.error(f"Error retrieving test results: {failure!s}")
            raise
            
    @staticmethod
    def get_test_history() -> Dict[str, List[Dict[str, Any]]]:
        """Get history of all tests"""
        try:
            # Get all perturbation result files
            test_dir = 'datasets/test_results'
            Path(test_dir).mkdir(parents=True, exist_ok=True)
                
            result_files = [f for f in os.listdir(test_dir) 
                         if f.startswith('perturbation_results_') and f.endswith('.json')]
            
            # Collect metadata from each file
            tests = []
            for file in result_files:
                try:
                    with open(os.path.join(test_dir, file), 'r') as f:
                        data = json.load(f)
                        # Add output file name to the data
                        data['output_file'] = file
                        tests.append(data)
                except Exception as e:
                    logger.error(f"Error reading test result file {file}: {str(e)}")
                    
            return {"tests": tests}
        except Exception as e:
            logger.error(f"Error getting test history: {str(e)}")
            return {"tests": [], "error": str(e)}
            
    @staticmethod
    def get_specific_test_result(test_id: Optional[str] = None, test_file: Optional[str] = None) -> Dict[str, Any]:
        """Get a specific test result by ID or filename"""
        try:
            # If file is specified, load directly
            if test_file:
                file_path = str(PROJECT_ROOT / 'datasets' / 'test_results' / test_file)
                if not os.path.exists(file_path):
                    raise FileNotFoundError(f"Test result file {test_file} not found")
                    
                with open(file_path, 'r') as f:
                    result = json.load(f)
                    # Add download URL
                    result['download_url'] = f"/download/{test_file}"
                    return result
            # Otherwise find by ID
            elif test_id:
                test_dir = 'datasets/test_results'
                result_files = [f for f in os.listdir(test_dir) 
                             if f.startswith('perturbation_results_') and f.endswith('.json')]
                
                for file in result_files:
                    try:
                        with open(os.path.join(test_dir, file), 'r') as f:
                            data = json.load(f)
                            # Check if this is the test we're looking for
                            if str(data.get('timestamp', '')) == test_id or data.get('id', '') == test_id:
                                result = data
                                result['download_url'] = f"/download/{file}"
                                result['output_file'] = file
                                return result
                    except Exception as e:
                        logger.error(f"Error reading test result file {file}: {str(e)}")
                
                raise FileNotFoundError(f"Test result with ID {test_id} not found")
            else:
                raise ValueError("Either test_id or test_file must be provided")
        except Exception as e:
            logger.error(f"Error getting specific test result: {str(e)}")
            raise
            
    @staticmethod
    def get_progress_status(output_file: str) -> Dict[str, Any]:
        """Get progress status for a test"""
        try:
            # Construct the progress file path
            progress_file = f"progress_{output_file}"
            test_dir = 'datasets/test_results'
            
            # Make sure the test_results directory exists
            Path(test_dir).mkdir(parents=True, exist_ok=True)
            
            progress_path = os.path.join(test_dir, progress_file)
            
            if not os.path.exists(progress_path):
                # Create an empty progress file as a placeholder
                try:
                    with open(progress_path, 'w') as f:
                        json.dump({
                            "status": "initializing",
                            "overall_progress_percentage": 0,
                            "current_jailbreak": "Preparing...",
                            "last_tested_relation": "Preparing...",
                            "created_at": time.time()
                        }, f)
                    logger.info(f"Created placeholder progress file: {progress_path}")
                except Exception as e:
                    logger.error(f"Failed to create progress file: {str(e)}")
                    raise
            
            # Read the progress file
            with open(progress_path, 'r') as f:
                progress_data = json.load(f)
            
            # Add metadata about the file itself
            progress_data["progress_file"] = progress_file
            progress_data["last_updated"] = os.path.getmtime(progress_path)
            
            return progress_data
        except Exception as e:
            logger.error(f"Error getting progress status: {str(e)}")
            logger.error(traceback.format_exc())
            raise
            
    @staticmethod
    def check_progress_file(output_file: str) -> Dict[str, Any]:
        """Check if a progress file exists without returning 404"""
        try:
            # Construct the progress file path
            progress_file = f"progress_{output_file}"
            test_dir = 'datasets/test_results'
            
            # Make sure the test_results directory exists
            Path(test_dir).mkdir(parents=True, exist_ok=True)
            
            progress_path = os.path.join(test_dir, progress_file)
            
            exists = os.path.exists(progress_path)
            
            return {
                "exists": exists,
                "file": progress_file,
                "path": progress_path,
                "last_modified": os.path.getmtime(progress_path) if exists else None,
                "status": "ready_to_create" if not exists else "exists"
            }
        except Exception as e:
            logger.error(f"Error checking progress file: {str(e)}")
            logger.error(traceback.format_exc())
            return {
                "exists": False,
                "error": str(e)
            }