Spaces:
Running
Running
| """ | |
| Parser Factory and Source Detection | |
| Factory for creating appropriate parsers based on trace source and automatically | |
| injecting relevant context documents. This module provides the main integration | |
| point between the parsing system and the knowledge extraction pipeline. | |
| """ | |
| from typing import Dict, List, Any, Optional, Type | |
| import logging | |
| import json | |
| import re | |
| from .base_parser import BaseTraceParser, ParsedMetadata | |
| from .langsmith_parser import LangSmithParser | |
logger = logging.getLogger(__name__)

# Registry of available parsers, keyed by lowercase platform name.
# Extended at runtime via register_parser().
_PARSER_REGISTRY: Dict[str, Type[BaseTraceParser]] = {
    'langsmith': LangSmithParser,
    # Future parsers can be added here:
    # 'langfuse': LangfuseParser,
    # 'opentelemetry': OpenTelemetryParser,
}
# Default context documents for each platform.
# Maps platform name -> list of context-document dicts, each carrying the
# keys 'document_type', 'title', 'content', and 'tags'. The 'default' entry
# is the fallback for platforms without dedicated templates (see
# get_context_documents_for_source). Content strings are delivered
# verbatim downstream; keep their wording stable.
_DEFAULT_CONTEXT_DOCUMENTS: Dict[str, List[Dict[str, Any]]] = {
    'langsmith': [
        {
            'document_type': 'schema',
            'title': 'LangSmith Trace Structure',
            'content': '''
LangSmith traces follow a standardized structure:
**Top-level fields:**
- run_id: Unique identifier for the run
- project_name: Name of the LangSmith project
- trace_id: Trace identifier linking related runs
- export_timestamp: When the trace was exported
- total_traces: Number of traces in the run
- traces: Array of individual trace objects
**Trace object fields:**
- run_type: Type of run ("llm", "chain", "tool")
- name: Human-readable name of the run
- inputs: Input data passed to the run
- outputs: Output data produced by the run
- start_time: ISO timestamp when run started
- end_time: ISO timestamp when run completed
- tags: Array of tags for categorization
- parent_run_id: ID of parent run (for nested calls)
**Run Types:**
- "llm": Language model invocations
- "chain": Composite operations with multiple steps
- "tool": External tool or function calls
''',
            'tags': ['langsmith', 'schema', 'structure']
        },
        {
            'document_type': 'guidelines',
            'title': 'LangSmith Entity Extraction Guidelines',
            'content': '''
**Entity Extraction Guidelines for LangSmith Traces:**
**Agents (run_type: "llm"):**
- Extract model name from extra.invocation_params.model_name
- Use run name as agent identifier
- Role can be inferred from tags or name patterns
- System prompts found in inputs.messages[0] where role="system"
**Tools (run_type: "tool"):**
- Tool name is in the "name" field
- Parameters are in inputs object
- Usage patterns can be tracked across multiple invocations
- Function signatures often in extra.function.name
**Tasks (run_type: "chain"):**
- Chain name represents the high-level task
- Sub-tasks found in child runs (parent_run_id references)
- Input/output flows show task dependencies
- Execution order determined by start_time
**Relationships:**
- PERFORMS: agent (llm) โ task (chain)
- USES: agent (llm) โ tool (tool)
- SUBTASK_OF: child chain โ parent chain
- NEXT: sequential chains based on timestamps
- PRODUCES: task โ output data structures
''',
            'tags': ['langsmith', 'extraction', 'guidelines']
        }
    ],
    'langfuse': [
        {
            'document_type': 'schema',
            'title': 'Langfuse Trace Structure',
            'content': '''
Langfuse traces are organized into sessions and observations:
**Session level:**
- Session ID links related traces
- User information and metadata
- Timeline of interactions
**Trace level:**
- Trace ID for individual conversations
- Input/output pairs
- Model and generation information
- Scores and evaluations
**Observation types:**
- SPAN: Execution spans with timing
- GENERATION: LLM generations with prompts
- EVENT: Discrete events in the flow
''',
            'tags': ['langfuse', 'schema', 'structure']
        }
    ],
    'default': [
        {
            'document_type': 'domain_knowledge',
            'title': 'General Agent System Concepts',
            'content': '''
**Common Agent System Patterns:**
**Agents:** Autonomous entities that can:
- Receive input and produce output
- Make decisions based on context
- Use tools to accomplish tasks
- Communicate with other agents
**Tools:** External capabilities that agents can invoke:
- APIs and web services
- Database queries
- File operations
- Computational functions
**Tasks:** Work units with:
- Clear objectives
- Input requirements
- Expected outputs
- Success criteria
**Workflows:** Orchestration patterns:
- Sequential: Steps execute in order
- Parallel: Steps execute simultaneously
- Conditional: Steps based on conditions
- Hierarchical: Nested task structures
''',
            'tags': ['general', 'concepts', 'patterns']
        }
    ]
}
def detect_trace_source(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Detect the source platform of a trace.

    Detection strategies, in order of reliability:
    1. Explicit 'platform' field in the database metadata.
    2. Platform name embedded in the metadata 'processing_type' field.
    3. Content sniffing via each registered parser's can_parse().

    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata

    Returns:
        Platform name (e.g., 'langsmith', 'langfuse', 'unknown')
    """
    logger.debug("Detecting trace source")

    # Check database metadata first (most reliable)
    if trace_metadata:
        # Check trace_source field.
        # `or ''` guards against a key that is present with value None,
        # which would otherwise raise AttributeError on .lower().
        trace_source = (trace_metadata.get('platform') or '').lower()
        if trace_source in _PARSER_REGISTRY:
            logger.info(f"Source detected from metadata: {trace_source}")
            return trace_source

        # Check trace_type field for platform indicators
        trace_type = (trace_metadata.get('processing_type') or '').lower()
        for platform in _PARSER_REGISTRY:
            if platform in trace_type:
                logger.info(f"Source detected from trace type: {platform}")
                return platform

    # Try each parser to see which can handle the content
    for platform_name, parser_class in _PARSER_REGISTRY.items():
        try:
            parser = parser_class()
            if parser.can_parse(trace_content, trace_metadata):
                logger.info(f"Source detected by parser: {platform_name}")
                return platform_name
        except Exception as e:
            # A sniff failure is expected for foreign formats; log quietly
            # and keep probing the remaining parsers.
            logger.debug(f"Parser {platform_name} failed detection: {e}")
            continue

    logger.warning("Could not detect trace source, using 'unknown'")
    return 'unknown'
def create_parser(platform: str) -> Optional[BaseTraceParser]:
    """
    Instantiate the parser registered for the given platform.

    Args:
        platform: Platform name

    Returns:
        A fresh parser instance, or None when no parser is registered
        for the platform.
    """
    parser_class = _PARSER_REGISTRY.get(platform)
    if parser_class is None:
        logger.warning(f"No parser available for platform: {platform}")
        return None

    instance = parser_class()
    logger.info(f"Created parser for platform: {platform}")
    return instance
def get_context_documents_for_source(platform: str, parsed_metadata: Optional[ParsedMetadata] = None) -> List[Dict[str, Any]]:
    """
    Get appropriate context documents for a trace source.

    Args:
        platform: Platform name
        parsed_metadata: Optional parsed metadata for customization

    Returns:
        List of context documents
    """
    logger.debug(f"Getting context documents for platform: {platform}")

    # Shallow-copy the platform templates (falling back to 'default') so
    # appended placeholders never mutate the shared module-level list.
    context_docs = _DEFAULT_CONTEXT_DOCUMENTS.get(platform, _DEFAULT_CONTEXT_DOCUMENTS['default']).copy()

    # Add platform-specific customizations based on parsed metadata
    if parsed_metadata and parsed_metadata.suggested_context_types:
        # Maintain the set of present document types incrementally instead
        # of rebuilding a list per iteration; updating it as we append also
        # prevents duplicate placeholders when a type is suggested twice.
        existing_types = {doc['document_type'] for doc in context_docs}
        for suggested_type in parsed_metadata.suggested_context_types:
            if suggested_type not in existing_types:
                existing_types.add(suggested_type)
                # Add suggested context types (placeholder documents)
                context_docs.append({
                    'document_type': suggested_type,
                    'title': f'{platform.title()} {suggested_type.replace("_", " ").title()}',
                    'content': f'Context document for {suggested_type} in {platform} traces.',
                    'tags': [platform, suggested_type, 'auto-generated']
                })

    logger.info(f"Providing {len(context_docs)} context documents for {platform}")
    return context_docs
def parse_trace_with_context(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> tuple[Optional[ParsedMetadata], List[Dict[str, Any]]]:
    """
    Parse a trace and return both structured metadata and appropriate context documents.

    This is the main entry point for the parsing system, providing a complete
    solution for trace analysis with automatic context injection.

    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata

    Returns:
        Tuple of (parsed_metadata, context_documents)
    """
    logger.info("Starting trace parsing with context injection")

    # Identify the platform, then build its parser (may be None).
    platform = detect_trace_source(trace_content, trace_metadata)
    parser = create_parser(platform)

    parsed_metadata: Optional[ParsedMetadata] = None
    if parser is None:
        logger.warning(f"No parser available for platform: {platform}")
    else:
        try:
            parsed_metadata = parser.parse_trace(trace_content, trace_metadata)
        except Exception as e:
            # Parsing failure is non-fatal: context documents are still
            # returned for the detected platform.
            logger.error(f"Failed to parse {platform} trace: {e}")
        else:
            logger.info(f"Successfully parsed {platform} trace")

    # Context documents are selected even when parsing failed.
    context_documents = get_context_documents_for_source(platform, parsed_metadata)

    logger.info(f"Parsing complete: metadata={'available' if parsed_metadata else 'unavailable'}, context_docs={len(context_documents)}")
    return parsed_metadata, context_documents
def register_parser(platform: str, parser_class: Type[BaseTraceParser]) -> None:
    """
    Register a new parser for a platform.

    Any parser previously registered under the same platform name is
    replaced.

    Args:
        platform: Platform name
        parser_class: Parser class to register
    """
    _PARSER_REGISTRY[platform] = parser_class
    logger.info(f"Registered parser for platform: {platform}")
def get_supported_platforms() -> List[str]:
    """Return the platform names that currently have a registered parser."""
    return [name for name in _PARSER_REGISTRY]
def add_context_document_template(platform: str, document: Dict[str, Any]) -> None:
    """
    Add a context document template for a platform.

    Args:
        platform: Platform name
        document: Context document dictionary
    """
    # setdefault creates the platform's template list on first use.
    _DEFAULT_CONTEXT_DOCUMENTS.setdefault(platform, []).append(document)
    logger.info(f"Added context document template for {platform}: {document.get('title', 'Untitled')}")