Spaces:
Running
Running
| """ | |
| Universal Parser Service for automated context document generation. | |
| This service integrates the universal LangSmith trace parser to automatically | |
| generate context documents when traces are uploaded or updated. | |
| """ | |
| import json | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| from sqlalchemy.orm import Session | |
| from sqlalchemy.orm.attributes import flag_modified | |
| from backend.services.context_service import ContextService | |
| from backend.models import ContextDocumentType | |
| from backend.database.models import Trace | |
| logger = logging.getLogger("agent_monitoring_server.services.universal_parser") | |
class UniversalParserService:
    """Service that auto-generates context documents from uploaded traces.

    Runs the universal LangSmith trace parser (imported lazily, since it is
    an optional dependency) over trace content and persists the resulting
    documents through ``ContextService``. The parser's global schema
    analytics are additionally stored on the trace row's metadata so the
    frontend can read them without re-parsing.
    """

    def __init__(self, db: "Session"):
        # One SQLAlchemy session is shared with the context service so all
        # writes happen in the same transaction scope.
        self.db = db
        self.context_service = ContextService(db)

    def generate_trace_context_documents(self, trace_id: str, trace_content: str) -> List[Dict[str, Any]]:
        """
        Generate context documents for a trace using the universal parser.

        Args:
            trace_id: ID of the trace
            trace_content: Raw content of the trace (possibly line-numbered
                and/or wrapped in a fetched-trace envelope)

        Returns:
            List of created context document dictionaries; empty on any
            failure — this method is deliberately best-effort and never raises.
        """
        try:
            # Lazy import: the parser package may be absent in some deployments.
            from agentgraph.input.parsers import GenericLangSmithParser
            import tempfile
            import os

            if not self._is_parseable_trace(trace_content):
                logger.info(f"Trace {trace_id} is not in a parseable format, skipping universal parser")
                return []

            parser = GenericLangSmithParser()
            trace_content_for_parser = self._normalize_trace_content(trace_content)

            # The parser only accepts file paths, so stage the content in a
            # temporary file (delete=False: it must outlive the `with` block).
            with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
                temp_file.write(trace_content_for_parser)
                temp_path = temp_file.name

            try:
                logger.info(f"Running universal parser on trace {trace_id}")
                parsed_result = parser.parse_trace_file(temp_path)
                if 'error' in parsed_result:
                    logger.warning(f"Parser error for trace {trace_id}: {parsed_result['error']}")
                    return []

                # Store global schema view data in trace metadata for the frontend.
                global_view = parsed_result.get('global_schema_view')
                if global_view:
                    self._store_schema_metadata(trace_id, global_view)
                    logger.info(f"Stored global schema view metadata for trace {trace_id}")

                context_docs = parser.generate_universal_context_documents(parsed_result)
                if not context_docs:
                    logger.info(f"No context documents generated for trace {trace_id}")
                    return []

                created_docs = self._persist_context_documents(trace_id, context_docs)
                logger.info(f"Successfully created {len(created_docs)} context documents for trace {trace_id}")
                return created_docs
            finally:
                # Always clean up the staging file, even on parser errors.
                if os.path.exists(temp_path):
                    os.remove(temp_path)
        except ImportError:
            # Bug fix: the old message referenced "trace_schema_parser.py",
            # which is not what is actually imported above.
            logger.warning("Universal parser not available - agentgraph.input.parsers could not be imported")
            return []
        except Exception as e:
            logger.error(f"Error running universal parser on trace {trace_id}: {str(e)}")
            return []

    def _normalize_trace_content(self, trace_content: str) -> str:
        """Clean raw trace content into JSON text suitable for the parser.

        Strips ``<Ln>`` line-number prefixes, unwraps the nested ``data``
        payload of fetched traces (identified by sibling ``platform`` and
        ``data`` keys), and re-serializes valid JSON with stable indentation.
        Content that is not valid JSON is returned as-is after stripping.
        """
        cleaned = self._strip_line_numbers(trace_content)
        try:
            data = json.loads(cleaned)
        except json.JSONDecodeError:
            logger.info("Content is not valid JSON after cleaning, using cleaned content as-is")
            return cleaned
        if isinstance(data, dict) and 'data' in data and 'platform' in data:
            logger.info("Detected fetched trace structure, extracting nested data for parsing")
            return json.dumps(data['data'], indent=2)
        return json.dumps(data, indent=2)

    def _persist_context_documents(self, trace_id: str, context_docs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Create DB records for parser-produced documents, skipping failures.

        Duplicate-title errors are logged at info level (re-runs are
        expected); any other failure is logged and the remaining documents
        are still attempted.
        """
        created_docs: List[Dict[str, Any]] = []
        for doc in context_docs:
            try:
                doc_type = self._map_document_type(doc.get('document_type', 'technical'))
                created_doc = self.context_service.create_context_document(
                    trace_id=trace_id,
                    title=doc.get('title', 'Universal Parser Analysis'),
                    document_type=doc_type,
                    content=doc.get('content', ''),
                    file_name=f"universal_parser_{doc.get('document_type', 'analysis')}.md"
                )
                created_docs.append(created_doc.dict())
                logger.info(f"Created context document: {created_doc.title}")
            except ValueError as e:
                # Handle duplicate titles or other validation errors.
                if "already exists" in str(e):
                    logger.info(f"Context document already exists: {doc.get('title')}")
                else:
                    logger.warning(f"Failed to create context document: {str(e)}")
            except Exception as e:
                logger.error(f"Error creating context document: {str(e)}")
        return created_docs

    def _store_schema_metadata(self, trace_id: str, global_view) -> None:
        """Store global schema view data in trace metadata for frontend access.

        ``global_view`` is the parser's GlobalSchemaView object; only the
        attributes serialized below are persisted. Errors are logged, never
        raised — metadata storage must not block document generation.
        """
        try:
            trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first()
            if not trace:
                logger.warning(f"Trace {trace_id} not found for schema metadata storage")
                return
            # Ensure the JSON column has a dict to write into.
            if not trace.trace_metadata:
                trace.trace_metadata = {}
            schema_data = {
                'architecture_description': global_view.architecture_description,
                'execution_flow_summary': global_view.execution_flow_summary,
                'component_hierarchy': global_view.component_hierarchy,
                'numerical_overview': global_view.numerical_overview,
                'prompt_analytics': global_view.prompt_analytics,
                'system_complexity_assessment': global_view.system_complexity_assessment
            }
            trace.trace_metadata['schema_analytics'] = schema_data
            # In-place mutation of a JSON column is invisible to SQLAlchemy's
            # change tracking, so flag it explicitly before committing.
            flag_modified(trace, "trace_metadata")
            self.db.commit()
            logger.info(f"Successfully stored schema analytics metadata for trace {trace_id}")
        except Exception as e:
            logger.error(f"Error storing schema metadata for trace {trace_id}: {str(e)}")

    def regenerate_context_documents(self, trace_id: str, trace_content: str, force: bool = False) -> List[Dict[str, Any]]:
        """
        Regenerate context documents for a trace, optionally removing existing auto-generated ones.

        Args:
            trace_id: ID of the trace
            trace_content: Raw content of the trace
            force: Whether to remove existing auto-generated context documents first

        Returns:
            List of created context document dictionaries
        """
        if force:
            try:
                existing_docs = self.context_service.get_context_documents(trace_id)
                for doc in existing_docs:
                    # Only remove documents this service created (identified by
                    # prefix); user-authored documents are left untouched.
                    if doc.title.startswith("Auto-generated:") or doc.file_name.startswith("universal_parser_"):
                        try:
                            self.context_service.delete_context_document(trace_id, doc.id)
                            logger.info(f"Removed existing auto-generated context document: {doc.title}")
                        except Exception as e:
                            logger.warning(f"Failed to remove context document {doc.title}: {str(e)}")
            except Exception as e:
                logger.warning(f"Error removing existing context documents: {str(e)}")
        # Generating also refreshes the schema metadata on the trace row.
        return self.generate_trace_context_documents(trace_id, trace_content)

    def _is_parseable_trace(self, trace_content: str) -> bool:
        """
        Check if trace content is in a format that the universal parser can handle.

        Returns True when the (line-number-stripped) content is JSON whose top
        level, nested ``data`` payload, or first element of a ``traces``/``runs``
        array carries LangSmith-style keys.
        """
        langsmith_indicators = ['traces', 'run_id', 'run_name', 'export_timestamp', 'trace_id', 'trace_name']
        trace_indicators = ['start_time', 'end_time', 'run_type', 'inputs', 'outputs']
        try:
            data = json.loads(self._strip_line_numbers(trace_content))
        except json.JSONDecodeError:
            # Not JSON; other trace formats are not supported yet.
            return False
        if not isinstance(data, dict):
            return False
        # LangSmith fields at the top level.
        if any(field in data for field in langsmith_indicators):
            return True
        nested = data.get('data')
        # LangSmith fields nested under 'data' (fetched traces).
        if isinstance(nested, dict) and any(field in nested for field in langsmith_indicators):
            return True
        # Single-trace structure at the top level.
        if any(field in data for field in trace_indicators):
            return True
        if isinstance(nested, dict):
            if any(field in nested for field in trace_indicators):
                return True
            # Inspect the first element of a traces array nested under 'data'.
            nested_traces = nested.get('traces', [])
            if isinstance(nested_traces, list) and nested_traces:
                first_trace = nested_traces[0]
                if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators):
                    return True
        # Top-level 'traces' or 'runs' arrays (LangSmith export formats).
        for key in ('traces', 'runs'):
            entries = data.get(key, [])
            if isinstance(entries, list) and entries:
                first = entries[0]
                if isinstance(first, dict) and any(field in first for field in trace_indicators):
                    return True
        return False

    def _strip_line_numbers(self, content: str) -> str:
        """
        Strip line numbers from content if present.

        Handles formats like::

            <L1> content
            <L2> content

        Content without the marker pattern is returned unchanged.
        """
        import re
        # Cheap substring pre-check avoids running the regex on marker-free content.
        if '<L' in content and '>' in content:
            # Pattern matches <L{number}> (plus trailing whitespace) at line starts.
            return re.sub(r'^<L\d+>\s*', '', content, flags=re.MULTILINE)
        return content

    def _map_document_type(self, parser_doc_type: str) -> "ContextDocumentType":
        """
        Map universal parser document types to our ContextDocumentType enum.

        Unknown parser types default to DOCUMENTATION.
        """
        type_mapping = {
            'component_structure': ContextDocumentType.SCHEMA,
            'execution_pattern': ContextDocumentType.DOCUMENTATION,
            'performance_profile': ContextDocumentType.DOCUMENTATION,
            'system_indicators': ContextDocumentType.DOCUMENTATION,
            'langgraph_workflow': ContextDocumentType.GUIDELINES,
            'human_interaction': ContextDocumentType.GUIDELINES
        }
        return type_mapping.get(parser_doc_type, ContextDocumentType.DOCUMENTATION)
def auto_generate_context_documents(trace_id: str, trace_content: str, db: Session) -> List[Dict[str, Any]]:
    """
    Convenience function to auto-generate context documents for a trace.

    Args:
        trace_id: ID of the trace
        trace_content: Raw content of the trace
        db: Database session

    Returns:
        List of created context document dictionaries
    """
    return UniversalParserService(db).generate_trace_context_documents(trace_id, trace_content)
def regenerate_context_documents(trace_id: str, trace_content: str, db: Session, force: bool = False) -> List[Dict[str, Any]]:
    """
    Convenience function to regenerate context documents for a trace.

    Args:
        trace_id: ID of the trace
        trace_content: Raw content of the trace
        db: Database session
        force: Whether to remove existing auto-generated context documents first

    Returns:
        List of created context document dictionaries
    """
    return UniversalParserService(db).regenerate_context_documents(trace_id, trace_content, force)