AgentGraph / backend /services /universal_parser_service.py
wu981526092's picture
🚀 Deploy AgentGraph: Complete agent monitoring and knowledge graph system
c2ea5ed
"""
Universal Parser Service for automated context document generation.
This service integrates the universal LangSmith trace parser to automatically
generate context documents when traces are uploaded or updated.
"""
import json
import logging
from typing import List, Dict, Any, Optional
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified
from backend.services.context_service import ContextService
from backend.models import ContextDocumentType
from backend.database.models import Trace
logger = logging.getLogger("agent_monitoring_server.services.universal_parser")
class UniversalParserService:
"""Service for automatically generating context documents from traces using universal parser."""
def __init__(self, db: Session):
self.db = db
self.context_service = ContextService(db)
def generate_trace_context_documents(self, trace_id: str, trace_content: str) -> List[Dict[str, Any]]:
"""
Generate context documents for a trace using the universal parser.
Args:
trace_id: ID of the trace
trace_content: Raw content of the trace
Returns:
List of created context document dictionaries
"""
try:
# Import the universal parser from its proper location
from agentgraph.input.parsers import GenericLangSmithParser
import tempfile
import os
# Check if trace content looks like LangSmith format
if not self._is_parseable_trace(trace_content):
logger.info(f"Trace {trace_id} is not in a parseable format, skipping universal parser")
return []
# Initialize the parser
parser = GenericLangSmithParser()
# Create a temporary file-like structure for the parser
# Extract the actual trace data if it's wrapped in a fetched trace structure
try:
# Strip line numbers first if present
cleaned_trace_content = self._strip_line_numbers(trace_content)
data = json.loads(cleaned_trace_content)
# If this is a fetched trace with nested data, extract just the trace data
if isinstance(data, dict) and 'data' in data and 'platform' in data:
logger.info(f"Detected fetched trace structure, extracting nested data for parsing")
actual_trace_data = data['data']
trace_content_for_parser = json.dumps(actual_trace_data, indent=2)
else:
trace_content_for_parser = json.dumps(data, indent=2)
except json.JSONDecodeError:
# If not valid JSON after cleaning, try to use cleaned content as-is
logger.info(f"Content is not valid JSON after cleaning, using cleaned content as-is")
trace_content_for_parser = self._strip_line_numbers(trace_content)
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.json') as temp_file:
temp_file.write(trace_content_for_parser)
temp_path = temp_file.name
try:
# Parse the trace
logger.info(f"Running universal parser on trace {trace_id}")
parsed_result = parser.parse_trace_file(temp_path)
if 'error' in parsed_result:
logger.warning(f"Parser error for trace {trace_id}: {parsed_result['error']}")
return []
# Store global schema view data in trace metadata
global_view = parsed_result.get('global_schema_view')
if global_view:
self._store_schema_metadata(trace_id, global_view)
logger.info(f"Stored global schema view metadata for trace {trace_id}")
# Generate context documents from the parsed results
context_docs = parser.generate_universal_context_documents(parsed_result)
if not context_docs:
logger.info(f"No context documents generated for trace {trace_id}")
return []
# Create context documents in the database
created_docs = []
for doc in context_docs:
try:
# Map document types to our enum
doc_type = self._map_document_type(doc.get('document_type', 'technical'))
# Create the context document
created_doc = self.context_service.create_context_document(
trace_id=trace_id,
title=doc.get('title', 'Universal Parser Analysis'),
document_type=doc_type,
content=doc.get('content', ''),
file_name=f"universal_parser_{doc.get('document_type', 'analysis')}.md"
)
created_docs.append(created_doc.dict())
logger.info(f"Created context document: {created_doc.title}")
except ValueError as e:
# Handle duplicate titles or other validation errors
if "already exists" in str(e):
logger.info(f"Context document already exists: {doc.get('title')}")
else:
logger.warning(f"Failed to create context document: {str(e)}")
except Exception as e:
logger.error(f"Error creating context document: {str(e)}")
logger.info(f"Successfully created {len(created_docs)} context documents for trace {trace_id}")
return created_docs
finally:
# Clean up temporary file
if os.path.exists(temp_path):
os.remove(temp_path)
except ImportError:
logger.warning("Universal parser not available - trace_schema_parser.py not found")
return []
except Exception as e:
logger.error(f"Error running universal parser on trace {trace_id}: {str(e)}")
return []
def _store_schema_metadata(self, trace_id: str, global_view) -> None:
"""Store global schema view data in trace metadata for frontend access."""
try:
# Get the trace
trace = self.db.query(Trace).filter(Trace.trace_id == trace_id).first()
if not trace:
logger.warning(f"Trace {trace_id} not found for schema metadata storage")
return
# Ensure trace_metadata exists
if not trace.trace_metadata:
trace.trace_metadata = {}
# Convert GlobalSchemaView to dictionary for JSON storage
schema_data = {
'architecture_description': global_view.architecture_description,
'execution_flow_summary': global_view.execution_flow_summary,
'component_hierarchy': global_view.component_hierarchy,
'numerical_overview': global_view.numerical_overview,
'prompt_analytics': global_view.prompt_analytics,
'system_complexity_assessment': global_view.system_complexity_assessment
}
# Store in trace metadata
trace.trace_metadata['schema_analytics'] = schema_data
# Mark as modified for SQLAlchemy
flag_modified(trace, "trace_metadata")
self.db.commit()
logger.info(f"Successfully stored schema analytics metadata for trace {trace_id}")
except Exception as e:
logger.error(f"Error storing schema metadata for trace {trace_id}: {str(e)}")
def regenerate_context_documents(self, trace_id: str, trace_content: str, force: bool = False) -> List[Dict[str, Any]]:
"""
Regenerate context documents for a trace, optionally removing existing auto-generated ones.
Args:
trace_id: ID of the trace
trace_content: Raw content of the trace
force: Whether to remove existing auto-generated context documents first
Returns:
List of created context document dictionaries
"""
if force:
try:
# Remove existing auto-generated context documents
existing_docs = self.context_service.get_context_documents(trace_id)
for doc in existing_docs:
if doc.title.startswith("Auto-generated:") or doc.file_name.startswith("universal_parser_"):
try:
self.context_service.delete_context_document(trace_id, doc.id)
logger.info(f"Removed existing auto-generated context document: {doc.title}")
except Exception as e:
logger.warning(f"Failed to remove context document {doc.title}: {str(e)}")
except Exception as e:
logger.warning(f"Error removing existing context documents: {str(e)}")
# Generate new context documents (this will also update schema metadata)
return self.generate_trace_context_documents(trace_id, trace_content)
def _is_parseable_trace(self, trace_content: str) -> bool:
"""
Check if trace content is in a format that the universal parser can handle.
Args:
trace_content: Raw trace content
Returns:
True if the trace appears to be parseable
"""
try:
# Check if content has line numbers and strip them if present
cleaned_content = self._strip_line_numbers(trace_content)
# Try to parse as JSON
data = json.loads(cleaned_content)
# Check for LangSmith-like structure
if isinstance(data, dict):
# Check for common LangSmith fields at top level
langsmith_indicators = ['traces', 'run_id', 'run_name', 'export_timestamp', 'trace_id', 'trace_name']
if any(field in data for field in langsmith_indicators):
return True
# Check for LangSmith fields nested in 'data' field (for fetched traces)
if 'data' in data and isinstance(data['data'], dict):
if any(field in data['data'] for field in langsmith_indicators):
return True
# Check for single trace structure
trace_indicators = ['start_time', 'end_time', 'run_type', 'inputs', 'outputs']
if any(field in data for field in trace_indicators):
return True
# Check for trace indicators in nested data
if 'data' in data and isinstance(data['data'], dict):
if any(field in data['data'] for field in trace_indicators):
return True
# Check inside traces array in nested data
nested_traces = data['data'].get('traces', [])
if isinstance(nested_traces, list) and nested_traces:
first_trace = nested_traces[0]
if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators):
return True
# Check for traces array at top level
traces = data.get('traces', [])
if isinstance(traces, list) and traces:
first_trace = traces[0]
if isinstance(first_trace, dict) and any(field in first_trace for field in trace_indicators):
return True
# Check for runs array (LangSmith format)
runs = data.get('runs', [])
if isinstance(runs, list) and runs:
first_run = runs[0]
if isinstance(first_run, dict) and any(field in first_run for field in trace_indicators):
return True
return False
except json.JSONDecodeError:
# Not JSON, check for other formats
# Could add support for other trace formats here
return False
def _strip_line_numbers(self, content: str) -> str:
"""
Strip line numbers from content if present.
Handles formats like:
<L1> content
<L2> content
etc.
Args:
content: Content that may have line numbers
Returns:
Content with line numbers stripped
"""
import re
# Check if content has line number pattern
if '<L' in content and '>' in content:
# Remove line numbers using regex
# Pattern matches <L{number}> at the start of lines
cleaned_content = re.sub(r'^<L\d+>\s*', '', content, flags=re.MULTILINE)
return cleaned_content
return content
def _map_document_type(self, parser_doc_type: str) -> ContextDocumentType:
"""
Map universal parser document types to our ContextDocumentType enum.
Args:
parser_doc_type: Document type from the universal parser
Returns:
ContextDocumentType enum value
"""
# Map parser document types to our enum
type_mapping = {
'component_structure': ContextDocumentType.SCHEMA,
'execution_pattern': ContextDocumentType.DOCUMENTATION,
'performance_profile': ContextDocumentType.DOCUMENTATION,
'system_indicators': ContextDocumentType.DOCUMENTATION,
'langgraph_workflow': ContextDocumentType.GUIDELINES,
'human_interaction': ContextDocumentType.GUIDELINES
}
return type_mapping.get(parser_doc_type, ContextDocumentType.DOCUMENTATION)
def auto_generate_context_documents(trace_id: str, trace_content: str, db: Session) -> List[Dict[str, Any]]:
"""
Convenience function to auto-generate context documents for a trace.
Args:
trace_id: ID of the trace
trace_content: Raw content of the trace
db: Database session
Returns:
List of created context document dictionaries
"""
service = UniversalParserService(db)
return service.generate_trace_context_documents(trace_id, trace_content)
def regenerate_context_documents(trace_id: str, trace_content: str, db: Session, force: bool = False) -> List[Dict[str, Any]]:
"""
Convenience function to regenerate context documents for a trace.
Args:
trace_id: ID of the trace
trace_content: Raw content of the trace
db: Database session
force: Whether to remove existing auto-generated context documents first
Returns:
List of created context document dictionaries
"""
service = UniversalParserService(db)
return service.regenerate_context_documents(trace_id, trace_content, force)