"""
Parser Factory and Source Detection

Factory for creating appropriate parsers based on trace source and
automatically injecting relevant context documents. This module provides
the main integration point between the parsing system and the knowledge
extraction pipeline.
"""

from typing import Dict, List, Any, Optional, Tuple, Type
import copy
import logging
import json
import re

from .base_parser import BaseTraceParser, ParsedMetadata
from .langsmith_parser import LangSmithParser

logger = logging.getLogger(__name__)

# Registry of available parsers, keyed by platform name. Lookups against this
# registry are exact (case-sensitive) key matches.
_PARSER_REGISTRY: Dict[str, Type[BaseTraceParser]] = {
    'langsmith': LangSmithParser,
    # Future parsers can be added here:
    # 'langfuse': LangfuseParser,
    # 'opentelemetry': OpenTelemetryParser,
}

# Default context documents for each platform. The 'default' entry is the
# fallback used by get_context_documents_for_source() for any platform that
# has no entry of its own (including 'unknown'). Note that 'langfuse' has
# documents here even though no langfuse parser is registered above.
_DEFAULT_CONTEXT_DOCUMENTS: Dict[str, List[Dict[str, Any]]] = {
    'langsmith': [
        {
            'document_type': 'schema',
            'title': 'LangSmith Trace Structure',
            'content': '''
            LangSmith traces follow a standardized structure:

            **Top-level fields:**
            - run_id: Unique identifier for the run
            - project_name: Name of the LangSmith project
            - trace_id: Trace identifier linking related runs
            - export_timestamp: When the trace was exported
            - total_traces: Number of traces in the run
            - traces: Array of individual trace objects

            **Trace object fields:**
            - run_type: Type of run ("llm", "chain", "tool")
            - name: Human-readable name of the run
            - inputs: Input data passed to the run
            - outputs: Output data produced by the run
            - start_time: ISO timestamp when run started
            - end_time: ISO timestamp when run completed
            - tags: Array of tags for categorization
            - parent_run_id: ID of parent run (for nested calls)

            **Run Types:**
            - "llm": Language model invocations
            - "chain": Composite operations with multiple steps
            - "tool": External tool or function calls
            ''',
            'tags': ['langsmith', 'schema', 'structure'],
        },
        {
            'document_type': 'guidelines',
            'title': 'LangSmith Entity Extraction Guidelines',
            'content': '''
            **Entity Extraction Guidelines for LangSmith Traces:**

            **Agents (run_type: "llm"):**
            - Extract model name from extra.invocation_params.model_name
            - Use run name as agent identifier
            - Role can be inferred from tags or name patterns
            - System prompts found in inputs.messages[0] where role="system"

            **Tools (run_type: "tool"):**
            - Tool name is in the "name" field
            - Parameters are in inputs object
            - Usage patterns can be tracked across multiple invocations
            - Function signatures often in extra.function.name

            **Tasks (run_type: "chain"):**
            - Chain name represents the high-level task
            - Sub-tasks found in child runs (parent_run_id references)
            - Input/output flows show task dependencies
            - Execution order determined by start_time

            **Relationships:**
            - PERFORMS: agent (llm) → task (chain)
            - USES: agent (llm) → tool (tool)
            - SUBTASK_OF: child chain → parent chain
            - NEXT: sequential chains based on timestamps
            - PRODUCES: task → output data structures
            ''',
            'tags': ['langsmith', 'extraction', 'guidelines'],
        },
    ],
    'langfuse': [
        {
            'document_type': 'schema',
            'title': 'Langfuse Trace Structure',
            'content': '''
            Langfuse traces are organized into sessions and observations:

            **Session level:**
            - Session ID links related traces
            - User information and metadata
            - Timeline of interactions

            **Trace level:**
            - Trace ID for individual conversations
            - Input/output pairs
            - Model and generation information
            - Scores and evaluations

            **Observation types:**
            - SPAN: Execution spans with timing
            - GENERATION: LLM generations with prompts
            - EVENT: Discrete events in the flow
            ''',
            'tags': ['langfuse', 'schema', 'structure'],
        },
    ],
    'default': [
        {
            'document_type': 'domain_knowledge',
            'title': 'General Agent System Concepts',
            'content': '''
            **Common Agent System Patterns:**

            **Agents:** Autonomous entities that can:
            - Receive input and produce output
            - Make decisions based on context
            - Use tools to accomplish tasks
            - Communicate with other agents

            **Tools:** External capabilities that agents can invoke:
            - APIs and web services
            - Database queries
            - File operations
            - Computational functions

            **Tasks:** Work units with:
            - Clear objectives
            - Input requirements
            - Expected outputs
            - Success criteria

            **Workflows:** Orchestration patterns:
            - Sequential: Steps execute in order
            - Parallel: Steps execute simultaneously
            - Conditional: Steps based on conditions
            - Hierarchical: Nested task structures
            ''',
            'tags': ['general', 'concepts', 'patterns'],
        },
    ],
}


def _metadata_field(trace_metadata: Dict[str, Any], key: str) -> str:
    """Return ``trace_metadata[key]`` lower-cased, or ``''`` if it is missing or not a string."""
    value = trace_metadata.get(key)
    # A key that is present but None (or a non-string) must not crash
    # detection; it simply carries no platform information.
    return value.lower() if isinstance(value, str) else ''


def detect_trace_source(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Detect the source platform of a trace.

    Detection order:
      1. ``trace_metadata['platform']`` (lower-cased) if it is a registered platform name.
      2. Any registered platform name that appears as a substring of
         ``trace_metadata['processing_type']`` (lower-cased).
      3. The first registered parser whose ``can_parse`` accepts the content.
      Otherwise ``'unknown'`` is returned.

    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata

    Returns:
        Platform name (e.g., 'langsmith', 'langfuse', 'unknown')
    """
    logger.debug("Detecting trace source")

    # Check database metadata first (most reliable)
    if trace_metadata:
        # Explicit 'platform' field: exact match against the registry.
        trace_source = _metadata_field(trace_metadata, 'platform')
        if trace_source in _PARSER_REGISTRY:
            logger.info(f"Source detected from metadata: {trace_source}")
            return trace_source

        # 'processing_type' field: look for a platform name embedded in it.
        trace_type = _metadata_field(trace_metadata, 'processing_type')
        for platform in _PARSER_REGISTRY:
            if platform in trace_type:
                logger.info(f"Source detected from trace type: {platform}")
                return platform

    # Try each parser to see which can handle the content
    for platform_name, parser_class in _PARSER_REGISTRY.items():
        try:
            parser = parser_class()
            if parser.can_parse(trace_content, trace_metadata):
                logger.info(f"Source detected by parser: {platform_name}")
                return platform_name
        except Exception as e:
            # Best-effort probing: one misbehaving parser must not prevent
            # the remaining parsers from being tried.
            logger.debug(f"Parser {platform_name} failed detection: {e}")
            continue

    logger.warning("Could not detect trace source, using 'unknown'")
    return 'unknown'


def create_parser(platform: str) -> Optional[BaseTraceParser]:
    """
    Create a parser for the specified platform.

    Args:
        platform: Platform name (exact registry key)

    Returns:
        A new parser instance, or None if the platform is not registered
    """
    parser_class = _PARSER_REGISTRY.get(platform)
    if parser_class is None:
        logger.warning(f"No parser available for platform: {platform}")
        return None

    parser = parser_class()
    logger.info(f"Created parser for platform: {platform}")
    return parser


def get_context_documents_for_source(platform: str, parsed_metadata: Optional[ParsedMetadata] = None) -> List[Dict[str, Any]]:
    """
    Get appropriate context documents for a trace source.

    Starts from the platform's template documents (falling back to the
    'default' set) and appends a placeholder document for every entry in
    ``parsed_metadata.suggested_context_types`` whose type is not already
    present.

    Args:
        platform: Platform name
        parsed_metadata: Optional parsed metadata for customization

    Returns:
        List of context documents. The list and its documents are
        independent copies; mutating them does not affect the templates.
    """
    logger.debug(f"Getting context documents for platform: {platform}")

    templates = _DEFAULT_CONTEXT_DOCUMENTS.get(platform, _DEFAULT_CONTEXT_DOCUMENTS['default'])
    # Deep copy: a shallow list copy would hand callers the shared template
    # dicts (and their 'tags' lists), so any caller-side mutation would
    # silently change the defaults for every subsequent call.
    context_docs = copy.deepcopy(templates)

    # Add platform-specific customizations based on parsed metadata
    if parsed_metadata and parsed_metadata.suggested_context_types:
        # .get(): templates registered via add_context_document_template()
        # are not guaranteed to carry a 'document_type' key.
        present_types = {doc.get('document_type') for doc in context_docs}
        for suggested_type in parsed_metadata.suggested_context_types:
            if suggested_type in present_types:
                continue
            present_types.add(suggested_type)
            # Add suggested context types (placeholder documents)
            context_docs.append({
                'document_type': suggested_type,
                'title': f'{platform.title()} {suggested_type.replace("_", " ").title()}',
                'content': f'Context document for {suggested_type} in {platform} traces.',
                'tags': [platform, suggested_type, 'auto-generated'],
            })

    logger.info(f"Providing {len(context_docs)} context documents for {platform}")
    return context_docs


def parse_trace_with_context(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> Tuple[Optional[ParsedMetadata], List[Dict[str, Any]]]:
    """
    Parse a trace and return both structured metadata and appropriate context documents.

    This is the main entry point for the parsing system, providing a complete
    solution for trace analysis with automatic context injection. Parse
    failures are logged and yield ``None`` metadata rather than raising;
    context documents are always returned.

    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata

    Returns:
        Tuple of (parsed_metadata, context_documents)
    """
    logger.info("Starting trace parsing with context injection")

    # Detect source platform
    platform = detect_trace_source(trace_content, trace_metadata)

    # Create appropriate parser
    parser = create_parser(platform)

    # Parse the trace (if parser available)
    parsed_metadata = None
    if parser:
        try:
            parsed_metadata = parser.parse_trace(trace_content, trace_metadata)
            logger.info(f"Successfully parsed {platform} trace")
        except Exception as e:
            # Deliberately non-fatal: fall back to context documents only,
            # but keep the traceback so parser bugs remain diagnosable.
            logger.error(f"Failed to parse {platform} trace: {e}", exc_info=True)
    else:
        logger.warning(f"No parser available for platform: {platform}")

    # Get appropriate context documents
    context_documents = get_context_documents_for_source(platform, parsed_metadata)

    logger.info(f"Parsing complete: metadata={'available' if parsed_metadata else 'unavailable'}, context_docs={len(context_documents)}")

    return parsed_metadata, context_documents


def register_parser(platform: str, parser_class: Type[BaseTraceParser]) -> None:
    """
    Register a new parser for a platform.

    An existing registration under the same name is replaced.

    Args:
        platform: Platform name
        parser_class: Parser class to register
    """
    _PARSER_REGISTRY[platform] = parser_class
    logger.info(f"Registered parser for platform: {platform}")


def get_supported_platforms() -> List[str]:
    """Get list of supported platforms (registered parser names, in registration order)."""
    return list(_PARSER_REGISTRY.keys())


def add_context_document_template(platform: str, document: Dict[str, Any]) -> None:
    """
    Add a context document template for a platform.

    Creates the platform's template list on first use.

    Args:
        platform: Platform name
        document: Context document dictionary
    """
    _DEFAULT_CONTEXT_DOCUMENTS.setdefault(platform, []).append(document)
    logger.info(f"Added context document template for {platform}: {document.get('title', 'Untitled')}")