""" LangSmith Trace Parser Rule-based parser for extracting structured metadata from LangSmith traces. This parser identifies and extracts the guaranteed structural elements that every LangSmith trace contains, providing reliable metadata to enhance the multi-agent knowledge extraction process. LangSmith traces typically contain: - Project information (project_name) - Run hierarchy (run_id, trace_id, parent runs) - Agent/LLM information (run_type: "llm", "chain", "tool") - Input/Output data structures - Timing information (start_time, end_time) - Tool usage patterns - Nested execution flows """ from typing import Dict, List, Any, Optional from datetime import datetime import json import re import logging from .base_parser import ( BaseTraceParser, ParsedMetadata, AgentInfo, ToolInfo, WorkflowInfo, DataFlowInfo ) logger = logging.getLogger(__name__) class LangSmithParser(BaseTraceParser): """ Parser for LangSmith observability platform traces. Extracts structural metadata that is guaranteed to be present in LangSmith traces, including project information, run hierarchy, agent types, and execution patterns. """ @property def platform_name(self) -> str: return "langsmith" @property def supported_trace_types(self) -> List[str]: return ["langsmith_processed_import", "langsmith_export", "langsmith"] def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool: """ Determine if this trace is from LangSmith platform. Checks for: 1. Database metadata indicating LangSmith source 2. LangSmith-specific JSON structure markers 3. LangSmith field patterns in content """ # Check database metadata first (most reliable) if trace_metadata: trace_source = trace_metadata.get('platform', '') trace_type = trace_metadata.get('processing_type', '') if trace_source == 'langsmith' or 'langsmith' in trace_type.lower(): return True # Check for LangSmith JSON structure markers try: parsed_content = self._safe_json_parse(trace_content) if parsed_content and self._has_langsmith_structure(parsed_content): return True except Exception: pass # Check for LangSmith field patterns in content return self._has_langsmith_patterns(trace_content) def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata: """ Parse LangSmith trace and extract structured metadata. Args: trace_content: Raw trace content (typically JSON) trace_metadata: Database metadata about the trace Returns: ParsedMetadata with LangSmith-specific structural information """ self.logger.info("Starting LangSmith trace parsing") # Parse JSON content parsed_content = self._safe_json_parse(trace_content) if not parsed_content: return self._create_minimal_metadata(trace_metadata) # Extract core components (parsing logic to be implemented) agents = self._extract_agents(parsed_content) tools = self._extract_tools(parsed_content) workflow = self._extract_workflow_info(parsed_content, trace_metadata) data_flow = self._extract_data_flow_info(parsed_content) # Create parsed metadata metadata = ParsedMetadata( platform="langsmith", trace_source="langsmith", confidence=self._calculate_confidence(parsed_content), agents=agents, tools=tools, workflow=workflow, data_flow=data_flow, raw_platform_data=parsed_content, suggested_context_types=self._suggest_context_types(parsed_content) ) # Generate extraction hints metadata.extraction_hints = self.generate_extraction_hints(metadata) metadata.extraction_hints.update(self._generate_langsmith_specific_hints(parsed_content)) self.logger.info(f"LangSmith parsing complete: {len(agents)} agents, {len(tools)} tools") return metadata def _has_langsmith_structure(self, data: Dict[str, Any]) -> bool: """Check if JSON data has LangSmith-specific structure markers""" # Check for LangSmith export structure fields # Our imported traces have: trace_id, trace_name, project_name, runs, export_time, total_runs required_fields = ['trace_id', 'project_name'] optional_fields = ['runs', 'traces', 'trace_name', 'export_time', 'total_runs'] # Must have required fields has_required = all(field in data for field in required_fields) # Must have at least one optional field has_optional = any(field in data for field in optional_fields) if has_required and has_optional: # Additional validation: check if runs array contains LangSmith run structure if 'runs' in data and isinstance(data['runs'], list) and data['runs']: first_run = data['runs'][0] if isinstance(first_run, dict): # Check for LangSmith run fields run_fields = ['id', 'name', 'run_type', 'start_time'] has_run_structure = any(field in first_run for field in run_fields) return has_run_structure elif 'traces' in data: # Support the old 'traces' structure too return True return has_required and has_optional def _has_langsmith_patterns(self, content: str) -> bool: """Check for LangSmith-specific patterns in text content""" # TODO: Implement pattern-based detection # Look for LangSmith-specific keywords, UUID patterns, etc. # PLACEHOLDER - Replace with actual implementation langsmith_indicators = [ r'"run_type":\s*"(llm|chain|tool)"', r'"project_name":\s*"[^"]+', r'"trace_id":\s*"[a-f0-9-]{36}"', r'"start_time":\s*"[\d-T:\.Z]+"' ] return any(re.search(pattern, content) for pattern in langsmith_indicators) def _extract_agents(self, data: Dict[str, Any]) -> List[AgentInfo]: """Extract agent information from LangSmith trace data""" agents = [] # Extract from both 'runs' and 'traces' arrays for compatibility runs_data = data.get('runs', data.get('traces', [])) if runs_data: for run in runs_data: if isinstance(run, dict) and run.get('run_type') == 'llm': agent_name = run.get('name', 'Unknown Agent') agent_id = run.get('id', 'unknown') # Extract model information if available model = None if 'extra' in run and isinstance(run['extra'], dict): model = run['extra'].get('model') agent = AgentInfo( name=agent_name, agent_type='llm', model=model, agent_id=agent_id ) agents.append(agent) return agents def _extract_tools(self, data: Dict[str, Any]) -> List[ToolInfo]: """Extract tool usage information from LangSmith trace data""" tools = [] # Extract from both 'runs' and 'traces' arrays for compatibility runs_data = data.get('runs', data.get('traces', [])) if runs_data: for run in runs_data: if isinstance(run, dict) and run.get('run_type') == 'tool': tool_name = run.get('name', 'Unknown Tool') tool_id = run.get('id', 'unknown') # Extract input/output information if available inputs = run.get('inputs', {}) outputs = run.get('outputs', {}) tool = ToolInfo( name=tool_name, tool_type='external', tool_id=tool_id, inputs=inputs, outputs=outputs ) tools.append(tool) return tools def _extract_workflow_info(self, data: Dict[str, Any], trace_metadata: Optional[Dict[str, Any]] = None) -> Optional[WorkflowInfo]: """Extract workflow and execution information from LangSmith trace data""" # Extract basic workflow information from top-level fields project_name = data.get('project_name') trace_id = data.get('trace_id') trace_name = data.get('trace_name') # Get run count from runs array or total_runs field runs_data = data.get('runs', data.get('traces', [])) total_steps = data.get('total_runs', len(runs_data) if runs_data else 0) # Extract timestamps from runs start_time, end_time = self._extract_timestamps_from_runs(runs_data) duration_ms = self._calculate_duration(start_time, end_time) return WorkflowInfo( project_name=project_name, run_id=trace_id, total_steps=total_steps, start_time=start_time, end_time=end_time, duration_ms=duration_ms, workflow_type='sequential', # Default for LangSmith traces workflow_name=trace_name ) def _extract_timestamps_from_runs(self, runs_data: List[Dict[str, Any]]) -> tuple[Optional[str], Optional[str]]: """Extract start and end timestamps from runs array""" start_time = None end_time = None if runs_data: start_times = [] end_times = [] for run in runs_data: if isinstance(run, dict): if 'start_time' in run: start_times.append(run['start_time']) if 'end_time' in run: end_times.append(run['end_time']) # Get earliest start time and latest end time if start_times: start_time = min(start_times) if end_times: end_time = max(end_times) return start_time, end_time def _extract_data_flow_info(self, data: Dict[str, Any]) -> Optional[DataFlowInfo]: """Extract data flow and transformation patterns""" input_types = [] output_types = [] transformations = [] # Extract from both 'runs' and 'traces' arrays for compatibility runs_data = data.get('runs', data.get('traces', [])) if runs_data: for run in runs_data: if isinstance(run, dict): # Analyze inputs and outputs if 'inputs' in run and run['inputs']: input_data = run['inputs'] if isinstance(input_data, dict): input_types.extend(list(input_data.keys())) if 'outputs' in run and run['outputs']: output_data = run['outputs'] if isinstance(output_data, dict): output_types.extend(list(output_data.keys())) # Remove duplicates input_types = list(set(input_types)) output_types = list(set(output_types)) return DataFlowInfo( input_types=input_types, output_types=output_types, transformation_patterns=transformations ) def _suggest_context_types(self, data: Dict[str, Any]) -> List[str]: """Suggest relevant context document types for this LangSmith trace""" # TODO: Implement context type suggestion logic # Based on detected patterns, suggest relevant context documents: # - "schema" for API/database operations # - "documentation" for complex workflows # - "guidelines" for specific domains # - "examples" for similar patterns # PLACEHOLDER - Replace with actual implementation suggestions = ["domain_knowledge"] # Default suggestion # Add more specific suggestions based on detected patterns if data.get('project_name'): suggestions.append("documentation") return suggestions def _generate_langsmith_specific_hints(self, data: Dict[str, Any]) -> Dict[str, Any]: """Generate LangSmith-specific extraction hints""" # TODO: Implement LangSmith-specific hint generation # Provide specific guidance for knowledge extractor: # - Expected entity patterns based on run_types # - Relationship patterns in LangSmith hierarchies # - Tool usage patterns # - Data flow expectations # PLACEHOLDER - Replace with actual implementation hints = {} if 'traces' in data: traces = data['traces'] if isinstance(traces, list): hints['langsmith_trace_count'] = len(traces) hints['run_types'] = list(set(trace.get('run_type', 'unknown') for trace in traces if isinstance(trace, dict))) return hints def _calculate_confidence(self, data: Dict[str, Any]) -> float: """Calculate confidence score for parsing accuracy""" # TODO: Implement confidence calculation # Base confidence on: # - Presence of required LangSmith fields # - Data structure completeness # - Successful parsing of nested elements # PLACEHOLDER - Replace with actual implementation confidence = 0.5 # Base confidence # Increase confidence based on detected elements if data.get('project_name'): confidence += 0.1 if data.get('run_id'): confidence += 0.1 if data.get('traces'): confidence += 0.2 if data.get('export_timestamp'): confidence += 0.1 return min(confidence, 1.0) def _create_minimal_metadata(self, trace_metadata: Optional[Dict[str, Any]]) -> ParsedMetadata: """Create minimal metadata when parsing fails""" return ParsedMetadata( platform="langsmith", trace_source="langsmith", confidence=0.1, suggested_context_types=["domain_knowledge"] )