File size: 15,901 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
"""
Universal Parser Service for automated context document generation.

This service integrates the universal LangSmith trace parser to automatically
generate context documents when traces are uploaded or updated.
"""

import json
import logging
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from backend.services.context_service import ContextService
from backend.models import ContextDocumentType
from backend.database.models import Trace

logger = logging.getLogger("agent_monitoring_server.services.universal_parser")


class UniversalParserService:
    """Service for automatically generating context documents from traces using universal parser.

    Wraps the ``GenericLangSmithParser`` so that uploaded or updated traces are
    analyzed automatically and the resulting analysis documents are persisted
    through :class:`ContextService`. All public methods are best-effort: they
    log failures and return empty results rather than raising.
    """

    def __init__(self, db: Session):
        # Shared SQLAlchemy session; the context service receives the same
        # session so document creation participates in the same transaction.
        self.db = db
        self.context_service = ContextService(db)

    def generate_trace_context_documents(self, trace_id: str, trace_content: str) -> List[Dict[str, Any]]:
        """
        Generate context documents for a trace using the universal parser.

        Args:
            trace_id: ID of the trace
            trace_content: Raw content of the trace

        Returns:
            List of created context document dictionaries. Empty when the trace
            is not parseable, the parser package is unavailable, or any error
            occurs (this method never raises).
        """
        try:
            # Imported lazily so the service degrades gracefully when the
            # parser package is not installed.
            from agentgraph.input.parsers import GenericLangSmithParser
            import tempfile
            import os

            # Bail out early on content the parser cannot understand.
            if not self._is_parseable_trace(trace_content):
                logger.info("Trace %s is not in a parseable format, skipping universal parser", trace_id)
                return []

            parser = GenericLangSmithParser()

            # The parser consumes files, so normalize the content first and
            # then write it to a temporary JSON file.
            try:
                # Strip line-number prefixes (e.g. "<L1> ") if present.
                cleaned_trace_content = self._strip_line_numbers(trace_content)

                data = json.loads(cleaned_trace_content)

                # Fetched traces wrap the real payload under 'data'; unwrap it
                # so the parser sees the native LangSmith structure.
                if isinstance(data, dict) and 'data' in data and 'platform' in data:
                    logger.info("Detected fetched trace structure, extracting nested data for parsing")
                    actual_trace_data = data['data']
                    trace_content_for_parser = json.dumps(actual_trace_data, indent=2)
                else:
                    trace_content_for_parser = json.dumps(data, indent=2)

            except json.JSONDecodeError:
                # Not valid JSON even after cleaning; hand the cleaned text to
                # the parser as-is and let it decide.
                logger.info("Content is not valid JSON after cleaning, using cleaned content as-is")
                trace_content_for_parser = self._strip_line_numbers(trace_content)

            # delete=False because the parser reopens the file by path; we
            # remove it ourselves in the finally block below.
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
                temp_file.write(trace_content_for_parser)
                temp_path = temp_file.name

            try:
                logger.info("Running universal parser on trace %s", trace_id)
                parsed_result = parser.parse_trace_file(temp_path)

                if 'error' in parsed_result:
                    logger.warning("Parser error for trace %s: %s", trace_id, parsed_result['error'])
                    return []

                # Persist the global schema view on the trace row so the
                # frontend can render analytics without re-parsing.
                global_view = parsed_result.get('global_schema_view')
                if global_view:
                    self._store_schema_metadata(trace_id, global_view)
                    logger.info("Stored global schema view metadata for trace %s", trace_id)

                # Generate context documents from the parsed results.
                context_docs = parser.generate_universal_context_documents(parsed_result)

                if not context_docs:
                    logger.info("No context documents generated for trace %s", trace_id)
                    return []

                # Create context documents in the database. Failures on one
                # document must not prevent the others from being created.
                created_docs = []

                for doc in context_docs:
                    try:
                        # Map the parser's document type onto our enum.
                        doc_type = self._map_document_type(doc.get('document_type', 'technical'))

                        created_doc = self.context_service.create_context_document(
                            trace_id=trace_id,
                            title=doc.get('title', 'Universal Parser Analysis'),
                            document_type=doc_type,
                            content=doc.get('content', ''),
                            file_name=f"universal_parser_{doc.get('document_type', 'analysis')}.md"
                        )

                        created_docs.append(created_doc.dict())
                        logger.info("Created context document: %s", created_doc.title)

                    except ValueError as e:
                        # Duplicate titles are expected on re-runs; any other
                        # validation error is worth surfacing as a warning.
                        if "already exists" in str(e):
                            logger.info("Context document already exists: %s", doc.get('title'))
                        else:
                            logger.warning("Failed to create context document: %s", str(e))
                    except Exception as e:
                        logger.error("Error creating context document: %s", str(e))

                logger.info("Successfully created %d context documents for trace %s", len(created_docs), trace_id)
                return created_docs

            finally:
                # Always clean up the temporary file, even on parser failure.
                if os.path.exists(temp_path):
                    os.remove(temp_path)

        except ImportError:
            # BUGFIX: the old message referenced "trace_schema_parser.py"; the
            # parser actually lives in agentgraph.input.parsers (see import).
            logger.warning("Universal parser not available - agentgraph.input.parsers could not be imported")
            return []
        except Exception as e:
            logger.error("Error running universal parser on trace %s: %s", trace_id, str(e))
            return []

    def _store_schema_metadata(self, trace_id: str, global_view) -> None:
        """Store global schema view data in trace metadata for frontend access.

        Args:
            trace_id: ID of the trace row to update.
            global_view: GlobalSchemaView-like object produced by the parser
                (attributes are read directly; assumed JSON-serializable).
        """
        try:
            # Look up the trace row by its external trace_id.
            trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first()
            if not trace:
                logger.warning("Trace %s not found for schema metadata storage", trace_id)
                return

            # Ensure the JSON metadata column has a dict to write into.
            if not trace.trace_metadata:
                trace.trace_metadata = {}

            # Flatten the GlobalSchemaView into plain JSON-storable data.
            schema_data = {
                'architecture_description': global_view.architecture_description,
                'execution_flow_summary': global_view.execution_flow_summary,
                'component_hierarchy': global_view.component_hierarchy,
                'numerical_overview': global_view.numerical_overview,
                'prompt_analytics': global_view.prompt_analytics,
                'system_complexity_assessment': global_view.system_complexity_assessment
            }

            trace.trace_metadata['schema_analytics'] = schema_data

            # JSON columns are not change-tracked in place; tell SQLAlchemy the
            # attribute was mutated so the UPDATE is actually emitted.
            flag_modified(trace, "trace_metadata")

            self.db.commit()
            logger.info("Successfully stored schema analytics metadata for trace %s", trace_id)

        except Exception as e:
            # ROBUSTNESS: a failed flush/commit leaves the session unusable
            # until rolled back; roll back so later callers can keep using it.
            self.db.rollback()
            logger.error("Error storing schema metadata for trace %s: %s", trace_id, str(e))

    def regenerate_context_documents(self, trace_id: str, trace_content: str, force: bool = False) -> List[Dict[str, Any]]:
        """
        Regenerate context documents for a trace, optionally removing existing auto-generated ones.

        Args:
            trace_id: ID of the trace
            trace_content: Raw content of the trace
            force: Whether to remove existing auto-generated context documents first

        Returns:
            List of created context document dictionaries
        """
        if force:
            try:
                # Remove only documents this service created, identified by
                # title prefix or the universal_parser_* file-name convention.
                existing_docs = self.context_service.get_context_documents(trace_id)

                for doc in existing_docs:
                    if doc.title.startswith("Auto-generated:") or doc.file_name.startswith("universal_parser_"):
                        try:
                            self.context_service.delete_context_document(trace_id, doc.id)
                            logger.info("Removed existing auto-generated context document: %s", doc.title)
                        except Exception as e:
                            logger.warning("Failed to remove context document %s: %s", doc.title, str(e))

            except Exception as e:
                logger.warning("Error removing existing context documents: %s", str(e))

        # Generate new context documents (this also refreshes schema metadata).
        return self.generate_trace_context_documents(trace_id, trace_content)

    def _is_parseable_trace(self, trace_content: str) -> bool:
        """
        Check if trace content is in a format that the universal parser can handle.

        Args:
            trace_content: Raw trace content

        Returns:
            True if the trace appears to be parseable (valid JSON exhibiting a
            LangSmith-like structure at the top level, nested under 'data', or
            inside a 'traces'/'runs' array).
        """
        try:
            # Strip line-number prefixes before attempting to parse.
            cleaned_content = self._strip_line_numbers(trace_content)

            data = json.loads(cleaned_content)

            if isinstance(data, dict):
                # Common LangSmith export fields at the top level.
                langsmith_indicators = ['traces', 'run_id', 'run_name', 'export_timestamp', 'trace_id', 'trace_name']
                if any(field in data for field in langsmith_indicators):
                    return True

                # Same fields nested under 'data' (fetched-trace wrapper).
                if 'data' in data and isinstance(data['data'], dict):
                    if any(field in data['data'] for field in langsmith_indicators):
                        return True

                # Fields that identify a single run/trace object.
                trace_indicators = ['start_time', 'end_time', 'run_type', 'inputs', 'outputs']
                if any(field in data for field in trace_indicators):
                    return True

                # Single-run fields nested under 'data'.
                if 'data' in data and isinstance(data['data'], dict):
                    if any(field in data['data'] for field in trace_indicators):
                        return True

                    # Or inside a traces array nested under 'data'.
                    nested_traces = data['data'].get('traces', [])
                    if isinstance(nested_traces, list) and nested_traces:
                        first_trace = nested_traces[0]
                        if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators):
                            return True

                # Top-level traces array.
                traces = data.get('traces', [])
                if isinstance(traces, list) and traces:
                    first_trace = traces[0]
                    if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators):
                        return True

                # Top-level runs array (LangSmith export format).
                runs = data.get('runs', [])
                if isinstance(runs, list) and runs:
                    first_run = runs[0]
                    if isinstance(first_run, dict) and any(field in first_run for field in trace_indicators):
                        return True

            return False

        except json.JSONDecodeError:
            # Not JSON. Support for other trace formats could be added here.
            return False

    def _strip_line_numbers(self, content: str) -> str:
        """
        Strip line numbers from content if present.

        Handles formats like:
        <L1> content
        <L2> content
        etc.

        Args:
            content: Content that may have line numbers

        Returns:
            Content with line numbers stripped
        """
        import re

        # Cheap containment check before paying for the regex pass.
        if '<L' in content and '>' in content:
            # Remove "<L{number}>" plus trailing whitespace at line starts.
            cleaned_content = re.sub(r'^<L\d+>\s*', '', content, flags=re.MULTILINE)
            return cleaned_content

        return content

    def _map_document_type(self, parser_doc_type: str) -> ContextDocumentType:
        """
        Map universal parser document types to our ContextDocumentType enum.

        Args:
            parser_doc_type: Document type from the universal parser

        Returns:
            ContextDocumentType enum value (DOCUMENTATION for unknown types)
        """
        type_mapping = {
            'component_structure': ContextDocumentType.SCHEMA,
            'execution_pattern': ContextDocumentType.DOCUMENTATION,
            'performance_profile': ContextDocumentType.DOCUMENTATION,
            'system_indicators': ContextDocumentType.DOCUMENTATION,
            'langgraph_workflow': ContextDocumentType.GUIDELINES,
            'human_interaction': ContextDocumentType.GUIDELINES
        }

        return type_mapping.get(parser_doc_type, ContextDocumentType.DOCUMENTATION)


def auto_generate_context_documents(trace_id: str, trace_content: str, db: Session) -> List[Dict[str, Any]]:
    """
    Convenience function to auto-generate context documents for a trace.

    Builds a throwaway :class:`UniversalParserService` bound to ``db`` and
    delegates to its generation method.

    Args:
        trace_id: ID of the trace
        trace_content: Raw content of the trace
        db: Database session

    Returns:
        List of created context document dictionaries
    """
    return UniversalParserService(db).generate_trace_context_documents(trace_id, trace_content)


def regenerate_context_documents(trace_id: str, trace_content: str, db: Session, force: bool = False) -> List[Dict[str, Any]]:
    """
    Convenience function to regenerate context documents for a trace.

    Builds a throwaway :class:`UniversalParserService` bound to ``db`` and
    delegates to its regeneration method.

    Args:
        trace_id: ID of the trace
        trace_content: Raw content of the trace
        db: Database session
        force: Whether to remove existing auto-generated context documents first

    Returns:
        List of created context document dictionaries
    """
    return UniversalParserService(db).regenerate_context_documents(trace_id, trace_content, force)