Spaces:
Running
Running
| """ | |
| LangSmith Trace Parser | |
| Rule-based parser for extracting structured metadata from LangSmith traces. | |
| This parser identifies and extracts the guaranteed structural elements that | |
| every LangSmith trace contains, providing reliable metadata to enhance | |
| the multi-agent knowledge extraction process. | |
| LangSmith traces typically contain: | |
| - Project information (project_name) | |
| - Run hierarchy (run_id, trace_id, parent runs) | |
| - Agent/LLM information (run_type: "llm", "chain", "tool") | |
| - Input/Output data structures | |
| - Timing information (start_time, end_time) | |
| - Tool usage patterns | |
| - Nested execution flows | |
| """ | |
| from typing import Dict, List, Any, Optional | |
| from datetime import datetime | |
| import json | |
| import re | |
| import logging | |
| from .base_parser import ( | |
| BaseTraceParser, ParsedMetadata, AgentInfo, ToolInfo, | |
| WorkflowInfo, DataFlowInfo | |
| ) | |
| logger = logging.getLogger(__name__) | |
class LangSmithParser(BaseTraceParser):
    """
    Parser for LangSmith observability platform traces.

    Extracts structural metadata that is guaranteed to be present in LangSmith
    traces, including project information, run hierarchy, agent types, and
    execution patterns.
    """

    def platform_name(self) -> str:
        # Platform identifier used to tag all metadata produced by this parser.
        return "langsmith"

    def supported_trace_types(self) -> List[str]:
        # Trace-type labels this parser handles; presumably matched against
        # database metadata by the parser registry — TODO confirm caller.
        return ["langsmith_processed_import", "langsmith_export", "langsmith"]
| def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool: | |
| """ | |
| Determine if this trace is from LangSmith platform. | |
| Checks for: | |
| 1. Database metadata indicating LangSmith source | |
| 2. LangSmith-specific JSON structure markers | |
| 3. LangSmith field patterns in content | |
| """ | |
| # Check database metadata first (most reliable) | |
| if trace_metadata: | |
| trace_source = trace_metadata.get('platform', '') | |
| trace_type = trace_metadata.get('processing_type', '') | |
| if trace_source == 'langsmith' or 'langsmith' in trace_type.lower(): | |
| return True | |
| # Check for LangSmith JSON structure markers | |
| try: | |
| parsed_content = self._safe_json_parse(trace_content) | |
| if parsed_content and self._has_langsmith_structure(parsed_content): | |
| return True | |
| except Exception: | |
| pass | |
| # Check for LangSmith field patterns in content | |
| return self._has_langsmith_patterns(trace_content) | |
| def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata: | |
| """ | |
| Parse LangSmith trace and extract structured metadata. | |
| Args: | |
| trace_content: Raw trace content (typically JSON) | |
| trace_metadata: Database metadata about the trace | |
| Returns: | |
| ParsedMetadata with LangSmith-specific structural information | |
| """ | |
| self.logger.info("Starting LangSmith trace parsing") | |
| # Parse JSON content | |
| parsed_content = self._safe_json_parse(trace_content) | |
| if not parsed_content: | |
| return self._create_minimal_metadata(trace_metadata) | |
| # Extract core components (parsing logic to be implemented) | |
| agents = self._extract_agents(parsed_content) | |
| tools = self._extract_tools(parsed_content) | |
| workflow = self._extract_workflow_info(parsed_content, trace_metadata) | |
| data_flow = self._extract_data_flow_info(parsed_content) | |
| # Create parsed metadata | |
| metadata = ParsedMetadata( | |
| platform="langsmith", | |
| trace_source="langsmith", | |
| confidence=self._calculate_confidence(parsed_content), | |
| agents=agents, | |
| tools=tools, | |
| workflow=workflow, | |
| data_flow=data_flow, | |
| raw_platform_data=parsed_content, | |
| suggested_context_types=self._suggest_context_types(parsed_content) | |
| ) | |
| # Generate extraction hints | |
| metadata.extraction_hints = self.generate_extraction_hints(metadata) | |
| metadata.extraction_hints.update(self._generate_langsmith_specific_hints(parsed_content)) | |
| self.logger.info(f"LangSmith parsing complete: {len(agents)} agents, {len(tools)} tools") | |
| return metadata | |
| def _has_langsmith_structure(self, data: Dict[str, Any]) -> bool: | |
| """Check if JSON data has LangSmith-specific structure markers""" | |
| # Check for LangSmith export structure fields | |
| # Our imported traces have: trace_id, trace_name, project_name, runs, export_time, total_runs | |
| required_fields = ['trace_id', 'project_name'] | |
| optional_fields = ['runs', 'traces', 'trace_name', 'export_time', 'total_runs'] | |
| # Must have required fields | |
| has_required = all(field in data for field in required_fields) | |
| # Must have at least one optional field | |
| has_optional = any(field in data for field in optional_fields) | |
| if has_required and has_optional: | |
| # Additional validation: check if runs array contains LangSmith run structure | |
| if 'runs' in data and isinstance(data['runs'], list) and data['runs']: | |
| first_run = data['runs'][0] | |
| if isinstance(first_run, dict): | |
| # Check for LangSmith run fields | |
| run_fields = ['id', 'name', 'run_type', 'start_time'] | |
| has_run_structure = any(field in first_run for field in run_fields) | |
| return has_run_structure | |
| elif 'traces' in data: | |
| # Support the old 'traces' structure too | |
| return True | |
| return has_required and has_optional | |
| def _has_langsmith_patterns(self, content: str) -> bool: | |
| """Check for LangSmith-specific patterns in text content""" | |
| # TODO: Implement pattern-based detection | |
| # Look for LangSmith-specific keywords, UUID patterns, etc. | |
| # PLACEHOLDER - Replace with actual implementation | |
| langsmith_indicators = [ | |
| r'"run_type":\s*"(llm|chain|tool)"', | |
| r'"project_name":\s*"[^"]+', | |
| r'"trace_id":\s*"[a-f0-9-]{36}"', | |
| r'"start_time":\s*"[\d-T:\.Z]+"' | |
| ] | |
| return any(re.search(pattern, content) for pattern in langsmith_indicators) | |
| def _extract_agents(self, data: Dict[str, Any]) -> List[AgentInfo]: | |
| """Extract agent information from LangSmith trace data""" | |
| agents = [] | |
| # Extract from both 'runs' and 'traces' arrays for compatibility | |
| runs_data = data.get('runs', data.get('traces', [])) | |
| if runs_data: | |
| for run in runs_data: | |
| if isinstance(run, dict) and run.get('run_type') == 'llm': | |
| agent_name = run.get('name', 'Unknown Agent') | |
| agent_id = run.get('id', 'unknown') | |
| # Extract model information if available | |
| model = None | |
| if 'extra' in run and isinstance(run['extra'], dict): | |
| model = run['extra'].get('model') | |
| agent = AgentInfo( | |
| name=agent_name, | |
| agent_type='llm', | |
| model=model, | |
| agent_id=agent_id | |
| ) | |
| agents.append(agent) | |
| return agents | |
| def _extract_tools(self, data: Dict[str, Any]) -> List[ToolInfo]: | |
| """Extract tool usage information from LangSmith trace data""" | |
| tools = [] | |
| # Extract from both 'runs' and 'traces' arrays for compatibility | |
| runs_data = data.get('runs', data.get('traces', [])) | |
| if runs_data: | |
| for run in runs_data: | |
| if isinstance(run, dict) and run.get('run_type') == 'tool': | |
| tool_name = run.get('name', 'Unknown Tool') | |
| tool_id = run.get('id', 'unknown') | |
| # Extract input/output information if available | |
| inputs = run.get('inputs', {}) | |
| outputs = run.get('outputs', {}) | |
| tool = ToolInfo( | |
| name=tool_name, | |
| tool_type='external', | |
| tool_id=tool_id, | |
| inputs=inputs, | |
| outputs=outputs | |
| ) | |
| tools.append(tool) | |
| return tools | |
| def _extract_workflow_info(self, data: Dict[str, Any], trace_metadata: Optional[Dict[str, Any]] = None) -> Optional[WorkflowInfo]: | |
| """Extract workflow and execution information from LangSmith trace data""" | |
| # Extract basic workflow information from top-level fields | |
| project_name = data.get('project_name') | |
| trace_id = data.get('trace_id') | |
| trace_name = data.get('trace_name') | |
| # Get run count from runs array or total_runs field | |
| runs_data = data.get('runs', data.get('traces', [])) | |
| total_steps = data.get('total_runs', len(runs_data) if runs_data else 0) | |
| # Extract timestamps from runs | |
| start_time, end_time = self._extract_timestamps_from_runs(runs_data) | |
| duration_ms = self._calculate_duration(start_time, end_time) | |
| return WorkflowInfo( | |
| project_name=project_name, | |
| run_id=trace_id, | |
| total_steps=total_steps, | |
| start_time=start_time, | |
| end_time=end_time, | |
| duration_ms=duration_ms, | |
| workflow_type='sequential', # Default for LangSmith traces | |
| workflow_name=trace_name | |
| ) | |
| def _extract_timestamps_from_runs(self, runs_data: List[Dict[str, Any]]) -> tuple[Optional[str], Optional[str]]: | |
| """Extract start and end timestamps from runs array""" | |
| start_time = None | |
| end_time = None | |
| if runs_data: | |
| start_times = [] | |
| end_times = [] | |
| for run in runs_data: | |
| if isinstance(run, dict): | |
| if 'start_time' in run: | |
| start_times.append(run['start_time']) | |
| if 'end_time' in run: | |
| end_times.append(run['end_time']) | |
| # Get earliest start time and latest end time | |
| if start_times: | |
| start_time = min(start_times) | |
| if end_times: | |
| end_time = max(end_times) | |
| return start_time, end_time | |
| def _extract_data_flow_info(self, data: Dict[str, Any]) -> Optional[DataFlowInfo]: | |
| """Extract data flow and transformation patterns""" | |
| input_types = [] | |
| output_types = [] | |
| transformations = [] | |
| # Extract from both 'runs' and 'traces' arrays for compatibility | |
| runs_data = data.get('runs', data.get('traces', [])) | |
| if runs_data: | |
| for run in runs_data: | |
| if isinstance(run, dict): | |
| # Analyze inputs and outputs | |
| if 'inputs' in run and run['inputs']: | |
| input_data = run['inputs'] | |
| if isinstance(input_data, dict): | |
| input_types.extend(list(input_data.keys())) | |
| if 'outputs' in run and run['outputs']: | |
| output_data = run['outputs'] | |
| if isinstance(output_data, dict): | |
| output_types.extend(list(output_data.keys())) | |
| # Remove duplicates | |
| input_types = list(set(input_types)) | |
| output_types = list(set(output_types)) | |
| return DataFlowInfo( | |
| input_types=input_types, | |
| output_types=output_types, | |
| transformation_patterns=transformations | |
| ) | |
| def _suggest_context_types(self, data: Dict[str, Any]) -> List[str]: | |
| """Suggest relevant context document types for this LangSmith trace""" | |
| # TODO: Implement context type suggestion logic | |
| # Based on detected patterns, suggest relevant context documents: | |
| # - "schema" for API/database operations | |
| # - "documentation" for complex workflows | |
| # - "guidelines" for specific domains | |
| # - "examples" for similar patterns | |
| # PLACEHOLDER - Replace with actual implementation | |
| suggestions = ["domain_knowledge"] # Default suggestion | |
| # Add more specific suggestions based on detected patterns | |
| if data.get('project_name'): | |
| suggestions.append("documentation") | |
| return suggestions | |
| def _generate_langsmith_specific_hints(self, data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Generate LangSmith-specific extraction hints""" | |
| # TODO: Implement LangSmith-specific hint generation | |
| # Provide specific guidance for knowledge extractor: | |
| # - Expected entity patterns based on run_types | |
| # - Relationship patterns in LangSmith hierarchies | |
| # - Tool usage patterns | |
| # - Data flow expectations | |
| # PLACEHOLDER - Replace with actual implementation | |
| hints = {} | |
| if 'traces' in data: | |
| traces = data['traces'] | |
| if isinstance(traces, list): | |
| hints['langsmith_trace_count'] = len(traces) | |
| hints['run_types'] = list(set(trace.get('run_type', 'unknown') | |
| for trace in traces if isinstance(trace, dict))) | |
| return hints | |
| def _calculate_confidence(self, data: Dict[str, Any]) -> float: | |
| """Calculate confidence score for parsing accuracy""" | |
| # TODO: Implement confidence calculation | |
| # Base confidence on: | |
| # - Presence of required LangSmith fields | |
| # - Data structure completeness | |
| # - Successful parsing of nested elements | |
| # PLACEHOLDER - Replace with actual implementation | |
| confidence = 0.5 # Base confidence | |
| # Increase confidence based on detected elements | |
| if data.get('project_name'): | |
| confidence += 0.1 | |
| if data.get('run_id'): | |
| confidence += 0.1 | |
| if data.get('traces'): | |
| confidence += 0.2 | |
| if data.get('export_timestamp'): | |
| confidence += 0.1 | |
| return min(confidence, 1.0) | |
| def _create_minimal_metadata(self, trace_metadata: Optional[Dict[str, Any]]) -> ParsedMetadata: | |
| """Create minimal metadata when parsing fails""" | |
| return ParsedMetadata( | |
| platform="langsmith", | |
| trace_source="langsmith", | |
| confidence=0.1, | |
| suggested_context_types=["domain_knowledge"] | |
| ) |