""" Universal Parser Service for automated context document generation. This service integrates the universal LangSmith trace parser to automatically generate context documents when traces are uploaded or updated. """ import json import logging from typing import List, Dict, Any, Optional from sqlalchemy.orm import Session from sqlalchemy.orm.attributes import flag_modified from backend.services.context_service import ContextService from backend.models import ContextDocumentType from backend.database.models import Trace logger = logging.getLogger("agent_monitoring_server.services.universal_parser") class UniversalParserService: """Service for automatically generating context documents from traces using universal parser.""" def __init__(self, db: Session): self.db = db self.context_service = ContextService(db) def generate_trace_context_documents(self, trace_id: str, trace_content: str) -> List[Dict[str, Any]]: """ Generate context documents for a trace using the universal parser. Args: trace_id: ID of the trace trace_content: Raw content of the trace Returns: List of created context document dictionaries """ try: # Import the universal parser from its proper location from agentgraph.input.parsers import GenericLangSmithParser import tempfile import os # Check if trace content looks like LangSmith format if not self._is_parseable_trace(trace_content): logger.info(f"Trace {trace_id} is not in a parseable format, skipping universal parser") return [] # Initialize the parser parser = GenericLangSmithParser() # Create a temporary file-like structure for the parser # Extract the actual trace data if it's wrapped in a fetched trace structure try: # Strip line numbers first if present cleaned_trace_content = self._strip_line_numbers(trace_content) data = json.loads(cleaned_trace_content) # If this is a fetched trace with nested data, extract just the trace data if isinstance(data, dict) and 'data' in data and 'platform' in data: logger.info(f"Detected fetched trace structure, extracting nested data for parsing") actual_trace_data = data['data'] trace_content_for_parser = json.dumps(actual_trace_data, indent=2) else: trace_content_for_parser = json.dumps(data, indent=2) except json.JSONDecodeError: # If not valid JSON after cleaning, try to use cleaned content as-is logger.info(f"Content is not valid JSON after cleaning, using cleaned content as-is") trace_content_for_parser = self._strip_line_numbers(trace_content) with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file: temp_file.write(trace_content_for_parser) temp_path = temp_file.name try: # Parse the trace logger.info(f"Running universal parser on trace {trace_id}") parsed_result = parser.parse_trace_file(temp_path) if 'error' in parsed_result: logger.warning(f"Parser error for trace {trace_id}: {parsed_result['error']}") return [] # Store global schema view data in trace metadata global_view = parsed_result.get('global_schema_view') if global_view: self._store_schema_metadata(trace_id, global_view) logger.info(f"Stored global schema view metadata for trace {trace_id}") # Generate context documents from the parsed results context_docs = parser.generate_universal_context_documents(parsed_result) if not context_docs: logger.info(f"No context documents generated for trace {trace_id}") return [] # Create context documents in the database created_docs = [] for doc in context_docs: try: # Map document types to our enum doc_type = self._map_document_type(doc.get('document_type', 'technical')) # Create the context document created_doc = self.context_service.create_context_document( trace_id=trace_id, title=doc.get('title', 'Universal Parser Analysis'), document_type=doc_type, content=doc.get('content', ''), file_name=f"universal_parser_{doc.get('document_type', 'analysis')}.md" ) created_docs.append(created_doc.dict()) logger.info(f"Created context document: {created_doc.title}") except ValueError as e: # Handle duplicate titles or other validation errors if "already exists" in str(e): logger.info(f"Context document already exists: {doc.get('title')}") else: logger.warning(f"Failed to create context document: {str(e)}") except Exception as e: logger.error(f"Error creating context document: {str(e)}") logger.info(f"Successfully created {len(created_docs)} context documents for trace {trace_id}") return created_docs finally: # Clean up temporary file if os.path.exists(temp_path): os.remove(temp_path) except ImportError: logger.warning("Universal parser not available - trace_schema_parser.py not found") return [] except Exception as e: logger.error(f"Error running universal parser on trace {trace_id}: {str(e)}") return [] def _store_schema_metadata(self, trace_id: str, global_view) -> None: """Store global schema view data in trace metadata for frontend access.""" try: # Get the trace trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first() if not trace: logger.warning(f"Trace {trace_id} not found for schema metadata storage") return # Ensure trace_metadata exists if not trace.trace_metadata: trace.trace_metadata = {} # Convert GlobalSchemaView to dictionary for JSON storage schema_data = { 'architecture_description': global_view.architecture_description, 'execution_flow_summary': global_view.execution_flow_summary, 'component_hierarchy': global_view.component_hierarchy, 'numerical_overview': global_view.numerical_overview, 'prompt_analytics': global_view.prompt_analytics, 'system_complexity_assessment': global_view.system_complexity_assessment } # Store in trace metadata trace.trace_metadata['schema_analytics'] = schema_data # Mark as modified for SQLAlchemy flag_modified(trace, "trace_metadata") self.db.commit() logger.info(f"Successfully stored schema analytics metadata for trace {trace_id}") except Exception as e: logger.error(f"Error storing schema metadata for trace {trace_id}: {str(e)}") def regenerate_context_documents(self, trace_id: str, trace_content: str, force: bool = False) -> List[Dict[str, Any]]: """ Regenerate context documents for a trace, optionally removing existing auto-generated ones. Args: trace_id: ID of the trace trace_content: Raw content of the trace force: Whether to remove existing auto-generated context documents first Returns: List of created context document dictionaries """ if force: try: # Remove existing auto-generated context documents existing_docs = self.context_service.get_context_documents(trace_id) for doc in existing_docs: if doc.title.startswith("Auto-generated:") or doc.file_name.startswith("universal_parser_"): try: self.context_service.delete_context_document(trace_id, doc.id) logger.info(f"Removed existing auto-generated context document: {doc.title}") except Exception as e: logger.warning(f"Failed to remove context document {doc.title}: {str(e)}") except Exception as e: logger.warning(f"Error removing existing context documents: {str(e)}") # Generate new context documents (this will also update schema metadata) return self.generate_trace_context_documents(trace_id, trace_content) def _is_parseable_trace(self, trace_content: str) -> bool: """ Check if trace content is in a format that the universal parser can handle. Args: trace_content: Raw trace content Returns: True if the trace appears to be parseable """ try: # Check if content has line numbers and strip them if present cleaned_content = self._strip_line_numbers(trace_content) # Try to parse as JSON data = json.loads(cleaned_content) # Check for LangSmith-like structure if isinstance(data, dict): # Check for common LangSmith fields at top level langsmith_indicators = ['traces', 'run_id', 'run_name', 'export_timestamp', 'trace_id', 'trace_name'] if any(field in data for field in langsmith_indicators): return True # Check for LangSmith fields nested in 'data' field (for fetched traces) if 'data' in data and isinstance(data['data'], dict): if any(field in data['data'] for field in langsmith_indicators): return True # Check for single trace structure trace_indicators = ['start_time', 'end_time', 'run_type', 'inputs', 'outputs'] if any(field in data for field in trace_indicators): return True # Check for trace indicators in nested data if 'data' in data and isinstance(data['data'], dict): if any(field in data['data'] for field in trace_indicators): return True # Check inside traces array in nested data nested_traces = data['data'].get('traces', []) if isinstance(nested_traces, list) and nested_traces: first_trace = nested_traces[0] if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators): return True # Check for traces array at top level traces = data.get('traces', []) if isinstance(traces, list) and traces: first_trace = traces[0] if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators): return True # Check for runs array (LangSmith format) runs = data.get('runs', []) if isinstance(runs, list) and runs: first_run = runs[0] if isinstance(first_run, dict) and any(field in first_run for field in trace_indicators): return True return False except json.JSONDecodeError: # Not JSON, check for other formats # Could add support for other trace formats here return False def _strip_line_numbers(self, content: str) -> str: """ Strip line numbers from content if present. Handles formats like: content content etc. Args: content: Content that may have line numbers Returns: Content with line numbers stripped """ import re # Check if content has line number pattern if '' in content: # Remove line numbers using regex # Pattern matches at the start of lines cleaned_content = re.sub(r'^\s*', '', content, flags=re.MULTILINE) return cleaned_content return content def _map_document_type(self, parser_doc_type: str) -> ContextDocumentType: """ Map universal parser document types to our ContextDocumentType enum. Args: parser_doc_type: Document type from the universal parser Returns: ContextDocumentType enum value """ # Map parser document types to our enum type_mapping = { 'component_structure': ContextDocumentType.SCHEMA, 'execution_pattern': ContextDocumentType.DOCUMENTATION, 'performance_profile': ContextDocumentType.DOCUMENTATION, 'system_indicators': ContextDocumentType.DOCUMENTATION, 'langgraph_workflow': ContextDocumentType.GUIDELINES, 'human_interaction': ContextDocumentType.GUIDELINES } return type_mapping.get(parser_doc_type, ContextDocumentType.DOCUMENTATION) def auto_generate_context_documents(trace_id: str, trace_content: str, db: Session) -> List[Dict[str, Any]]: """ Convenience function to auto-generate context documents for a trace. Args: trace_id: ID of the trace trace_content: Raw content of the trace db: Database session Returns: List of created context document dictionaries """ service = UniversalParserService(db) return service.generate_trace_context_documents(trace_id, trace_content) def regenerate_context_documents(trace_id: str, trace_content: str, db: Session, force: bool = False) -> List[Dict[str, Any]]: """ Convenience function to regenerate context documents for a trace. Args: trace_id: ID of the trace trace_content: Raw content of the trace db: Database session force: Whether to remove existing auto-generated context documents first Returns: List of created context document dictionaries """ service = UniversalParserService(db) return service.regenerate_context_documents(trace_id, trace_content, force)