AgentGraph / agentgraph /input /parsers /parser_factory.py
wu981526092's picture
๐Ÿš€ Deploy AgentGraph: Complete agent monitoring and knowledge graph system
c2ea5ed
"""
Parser Factory and Source Detection
Factory for creating appropriate parsers based on trace source and automatically
injecting relevant context documents. This module provides the main integration
point between the parsing system and the knowledge extraction pipeline.
"""
from typing import Dict, List, Any, Optional, Type
import logging
import json
import re
from .base_parser import BaseTraceParser, ParsedMetadata
from .langsmith_parser import LangSmithParser
logger = logging.getLogger(__name__)
# Registry of available parsers
_PARSER_REGISTRY: Dict[str, Type[BaseTraceParser]] = {
'langsmith': LangSmithParser,
# Future parsers can be added here:
# 'langfuse': LangfuseParser,
# 'opentelemetry': OpenTelemetryParser,
}
# Default context documents for each platform
_DEFAULT_CONTEXT_DOCUMENTS = {
'langsmith': [
{
'document_type': 'schema',
'title': 'LangSmith Trace Structure',
'content': '''
LangSmith traces follow a standardized structure:
**Top-level fields:**
- run_id: Unique identifier for the run
- project_name: Name of the LangSmith project
- trace_id: Trace identifier linking related runs
- export_timestamp: When the trace was exported
- total_traces: Number of traces in the run
- traces: Array of individual trace objects
**Trace object fields:**
- run_type: Type of run ("llm", "chain", "tool")
- name: Human-readable name of the run
- inputs: Input data passed to the run
- outputs: Output data produced by the run
- start_time: ISO timestamp when run started
- end_time: ISO timestamp when run completed
- tags: Array of tags for categorization
- parent_run_id: ID of parent run (for nested calls)
**Run Types:**
- "llm": Language model invocations
- "chain": Composite operations with multiple steps
- "tool": External tool or function calls
''',
'tags': ['langsmith', 'schema', 'structure']
},
{
'document_type': 'guidelines',
'title': 'LangSmith Entity Extraction Guidelines',
'content': '''
**Entity Extraction Guidelines for LangSmith Traces:**
**Agents (run_type: "llm"):**
- Extract model name from extra.invocation_params.model_name
- Use run name as agent identifier
- Role can be inferred from tags or name patterns
- System prompts found in inputs.messages[0] where role="system"
**Tools (run_type: "tool"):**
- Tool name is in the "name" field
- Parameters are in inputs object
- Usage patterns can be tracked across multiple invocations
- Function signatures often in extra.function.name
**Tasks (run_type: "chain"):**
- Chain name represents the high-level task
- Sub-tasks found in child runs (parent_run_id references)
- Input/output flows show task dependencies
- Execution order determined by start_time
**Relationships:**
- PERFORMS: agent (llm) โ†’ task (chain)
- USES: agent (llm) โ†’ tool (tool)
- SUBTASK_OF: child chain โ†’ parent chain
- NEXT: sequential chains based on timestamps
- PRODUCES: task โ†’ output data structures
''',
'tags': ['langsmith', 'extraction', 'guidelines']
}
],
'langfuse': [
{
'document_type': 'schema',
'title': 'Langfuse Trace Structure',
'content': '''
Langfuse traces are organized into sessions and observations:
**Session level:**
- Session ID links related traces
- User information and metadata
- Timeline of interactions
**Trace level:**
- Trace ID for individual conversations
- Input/output pairs
- Model and generation information
- Scores and evaluations
**Observation types:**
- SPAN: Execution spans with timing
- GENERATION: LLM generations with prompts
- EVENT: Discrete events in the flow
''',
'tags': ['langfuse', 'schema', 'structure']
}
],
'default': [
{
'document_type': 'domain_knowledge',
'title': 'General Agent System Concepts',
'content': '''
**Common Agent System Patterns:**
**Agents:** Autonomous entities that can:
- Receive input and produce output
- Make decisions based on context
- Use tools to accomplish tasks
- Communicate with other agents
**Tools:** External capabilities that agents can invoke:
- APIs and web services
- Database queries
- File operations
- Computational functions
**Tasks:** Work units with:
- Clear objectives
- Input requirements
- Expected outputs
- Success criteria
**Workflows:** Orchestration patterns:
- Sequential: Steps execute in order
- Parallel: Steps execute simultaneously
- Conditional: Steps based on conditions
- Hierarchical: Nested task structures
''',
'tags': ['general', 'concepts', 'patterns']
}
]
}
def detect_trace_source(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> str:
"""
Detect the source platform of a trace.
Args:
trace_content: Raw trace content
trace_metadata: Optional database metadata
Returns:
Platform name (e.g., 'langsmith', 'langfuse', 'unknown')
"""
logger.debug("Detecting trace source")
# Check database metadata first (most reliable)
if trace_metadata:
# Check trace_source field
trace_source = trace_metadata.get('platform', '').lower()
if trace_source in _PARSER_REGISTRY:
logger.info(f"Source detected from metadata: {trace_source}")
return trace_source
# Check trace_type field for platform indicators
trace_type = trace_metadata.get('processing_type', '').lower()
for platform in _PARSER_REGISTRY:
if platform in trace_type:
logger.info(f"Source detected from trace type: {platform}")
return platform
# Try each parser to see which can handle the content
for platform_name, parser_class in _PARSER_REGISTRY.items():
try:
parser = parser_class()
if parser.can_parse(trace_content, trace_metadata):
logger.info(f"Source detected by parser: {platform_name}")
return platform_name
except Exception as e:
logger.debug(f"Parser {platform_name} failed detection: {e}")
continue
logger.warning("Could not detect trace source, using 'unknown'")
return 'unknown'
def create_parser(platform: str) -> Optional[BaseTraceParser]:
"""
Create a parser for the specified platform.
Args:
platform: Platform name
Returns:
Parser instance or None if platform not supported
"""
if platform in _PARSER_REGISTRY:
parser_class = _PARSER_REGISTRY[platform]
parser = parser_class()
logger.info(f"Created parser for platform: {platform}")
return parser
logger.warning(f"No parser available for platform: {platform}")
return None
def get_context_documents_for_source(platform: str, parsed_metadata: Optional[ParsedMetadata] = None) -> List[Dict[str, Any]]:
"""
Get appropriate context documents for a trace source.
Args:
platform: Platform name
parsed_metadata: Optional parsed metadata for customization
Returns:
List of context documents
"""
logger.debug(f"Getting context documents for platform: {platform}")
# Get default context documents for the platform
context_docs = _DEFAULT_CONTEXT_DOCUMENTS.get(platform, _DEFAULT_CONTEXT_DOCUMENTS['default']).copy()
# Add platform-specific customizations based on parsed metadata
if parsed_metadata and parsed_metadata.suggested_context_types:
for suggested_type in parsed_metadata.suggested_context_types:
if suggested_type not in [doc['document_type'] for doc in context_docs]:
# Add suggested context types (placeholder documents)
context_docs.append({
'document_type': suggested_type,
'title': f'{platform.title()} {suggested_type.replace("_", " ").title()}',
'content': f'Context document for {suggested_type} in {platform} traces.',
'tags': [platform, suggested_type, 'auto-generated']
})
logger.info(f"Providing {len(context_docs)} context documents for {platform}")
return context_docs
def parse_trace_with_context(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> tuple[Optional[ParsedMetadata], List[Dict[str, Any]]]:
"""
Parse a trace and return both structured metadata and appropriate context documents.
This is the main entry point for the parsing system, providing a complete
solution for trace analysis with automatic context injection.
Args:
trace_content: Raw trace content
trace_metadata: Optional database metadata
Returns:
Tuple of (parsed_metadata, context_documents)
"""
logger.info("Starting trace parsing with context injection")
# Detect source platform
platform = detect_trace_source(trace_content, trace_metadata)
# Create appropriate parser
parser = create_parser(platform)
# Parse the trace (if parser available)
parsed_metadata = None
if parser:
try:
parsed_metadata = parser.parse_trace(trace_content, trace_metadata)
logger.info(f"Successfully parsed {platform} trace")
except Exception as e:
logger.error(f"Failed to parse {platform} trace: {e}")
else:
logger.warning(f"No parser available for platform: {platform}")
# Get appropriate context documents
context_documents = get_context_documents_for_source(platform, parsed_metadata)
logger.info(f"Parsing complete: metadata={'available' if parsed_metadata else 'unavailable'}, context_docs={len(context_documents)}")
return parsed_metadata, context_documents
def register_parser(platform: str, parser_class: Type[BaseTraceParser]) -> None:
"""
Register a new parser for a platform.
Args:
platform: Platform name
parser_class: Parser class to register
"""
_PARSER_REGISTRY[platform] = parser_class
logger.info(f"Registered parser for platform: {platform}")
def get_supported_platforms() -> List[str]:
"""Get list of supported platforms."""
return list(_PARSER_REGISTRY.keys())
def add_context_document_template(platform: str, document: Dict[str, Any]) -> None:
"""
Add a context document template for a platform.
Args:
platform: Platform name
document: Context document dictionary
"""
if platform not in _DEFAULT_CONTEXT_DOCUMENTS:
_DEFAULT_CONTEXT_DOCUMENTS[platform] = []
_DEFAULT_CONTEXT_DOCUMENTS[platform].append(document)
logger.info(f"Added context document template for {platform}: {document.get('title', 'Untitled')}")