# agentgraph/input/parsers/langsmith_parser.py
"""
LangSmith Trace Parser
Rule-based parser for extracting structured metadata from LangSmith traces.
This parser identifies and extracts the guaranteed structural elements that
every LangSmith trace contains, providing reliable metadata to enhance
the multi-agent knowledge extraction process.
LangSmith traces typically contain:
- Project information (project_name)
- Run hierarchy (run_id, trace_id, parent runs)
- Agent/LLM information (run_type: "llm", "chain", "tool")
- Input/Output data structures
- Timing information (start_time, end_time)
- Tool usage patterns
- Nested execution flows
"""
from typing import Dict, List, Any, Optional
from datetime import datetime
import json
import re
import logging
from .base_parser import (
BaseTraceParser, ParsedMetadata, AgentInfo, ToolInfo,
WorkflowInfo, DataFlowInfo
)
logger = logging.getLogger(__name__)
class LangSmithParser(BaseTraceParser):
    """
    Parser for LangSmith observability platform traces.

    Extracts structural metadata that is guaranteed to be present in LangSmith
    traces, including project information, run hierarchy, agent types, and
    execution patterns.
    """

    @property
    def platform_name(self) -> str:
        """Identifier of the observability platform this parser handles."""
        return "langsmith"

    @property
    def supported_trace_types(self) -> List[str]:
        """Trace-type labels (from database metadata) this parser accepts."""
        return ["langsmith_processed_import", "langsmith_export", "langsmith"]

    def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Determine if this trace is from the LangSmith platform.

        Checks, in decreasing order of reliability:
        1. Database metadata indicating a LangSmith source
        2. LangSmith-specific JSON structure markers
        3. LangSmith field patterns in the raw text content

        Args:
            trace_content: Raw trace content (typically JSON).
            trace_metadata: Optional database metadata about the trace.

        Returns:
            True if the trace appears to come from LangSmith.
        """
        # Database metadata is the most reliable signal.
        if trace_metadata:
            trace_source = trace_metadata.get('platform', '')
            trace_type = trace_metadata.get('processing_type', '')
            if trace_source == 'langsmith' or 'langsmith' in trace_type.lower():
                return True

        # Next, look for LangSmith structure markers in parsed JSON.
        try:
            parsed_content = self._safe_json_parse(trace_content)
            if parsed_content and self._has_langsmith_structure(parsed_content):
                return True
        except Exception:
            # Parsing problems are not fatal; fall back to text patterns.
            pass

        # Finally, scan the raw text for LangSmith field patterns.
        return self._has_langsmith_patterns(trace_content)

    def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata:
        """
        Parse a LangSmith trace and extract structured metadata.

        Args:
            trace_content: Raw trace content (typically JSON).
            trace_metadata: Database metadata about the trace.

        Returns:
            ParsedMetadata with LangSmith-specific structural information.
            If the content cannot be parsed as JSON, minimal low-confidence
            metadata is returned instead.
        """
        self.logger.info("Starting LangSmith trace parsing")

        parsed_content = self._safe_json_parse(trace_content)
        if not parsed_content:
            return self._create_minimal_metadata(trace_metadata)

        # Extract the core structural components.
        agents = self._extract_agents(parsed_content)
        tools = self._extract_tools(parsed_content)
        workflow = self._extract_workflow_info(parsed_content, trace_metadata)
        data_flow = self._extract_data_flow_info(parsed_content)

        metadata = ParsedMetadata(
            platform="langsmith",
            trace_source="langsmith",
            confidence=self._calculate_confidence(parsed_content),
            agents=agents,
            tools=tools,
            workflow=workflow,
            data_flow=data_flow,
            raw_platform_data=parsed_content,
            suggested_context_types=self._suggest_context_types(parsed_content)
        )

        # Generic hints first, then LangSmith-specific hints layered on top.
        metadata.extraction_hints = self.generate_extraction_hints(metadata)
        metadata.extraction_hints.update(self._generate_langsmith_specific_hints(parsed_content))

        self.logger.info(f"LangSmith parsing complete: {len(agents)} agents, {len(tools)} tools")
        return metadata

    def _has_langsmith_structure(self, data: Dict[str, Any]) -> bool:
        """Check if JSON data has LangSmith-specific structure markers.

        Imported traces have top-level fields: trace_id, trace_name,
        project_name, runs, export_time, total_runs.
        """
        required_fields = ['trace_id', 'project_name']
        optional_fields = ['runs', 'traces', 'trace_name', 'export_time', 'total_runs']

        # Must have all required fields and at least one optional field.
        has_required = all(field in data for field in required_fields)
        has_optional = any(field in data for field in optional_fields)

        if has_required and has_optional:
            # Additional validation: a non-empty runs array should contain
            # LangSmith run objects (id/name/run_type/start_time).
            if 'runs' in data and isinstance(data['runs'], list) and data['runs']:
                first_run = data['runs'][0]
                if isinstance(first_run, dict):
                    run_fields = ['id', 'name', 'run_type', 'start_time']
                    return any(field in first_run for field in run_fields)
            elif 'traces' in data:
                # Support the legacy 'traces' structure too.
                return True

        return has_required and has_optional

    def _has_langsmith_patterns(self, content: str) -> bool:
        """Check for LangSmith-specific field patterns in raw text content."""
        # Field/value shapes that LangSmith exports contain.  In the
        # timestamp class the literal '-' is placed last so it cannot be
        # misread as a range boundary.
        langsmith_indicators = [
            r'"run_type":\s*"(llm|chain|tool)"',
            r'"project_name":\s*"[^"]+',
            r'"trace_id":\s*"[a-f0-9-]{36}"',
            r'"start_time":\s*"[\dT:.Z-]+"'
        ]
        return any(re.search(pattern, content) for pattern in langsmith_indicators)

    def _extract_agents(self, data: Dict[str, Any]) -> List[AgentInfo]:
        """Extract agent information (runs with run_type == 'llm')."""
        agents: List[AgentInfo] = []
        # Accept both the current 'runs' and the legacy 'traces' arrays.
        runs_data = data.get('runs', data.get('traces', []))
        for run in runs_data or []:
            if not (isinstance(run, dict) and run.get('run_type') == 'llm'):
                continue
            # The model name, when present, lives in the 'extra' mapping.
            model = None
            extra = run.get('extra')
            if isinstance(extra, dict):
                model = extra.get('model')
            agents.append(AgentInfo(
                name=run.get('name', 'Unknown Agent'),
                agent_type='llm',
                model=model,
                agent_id=run.get('id', 'unknown')
            ))
        return agents

    def _extract_tools(self, data: Dict[str, Any]) -> List[ToolInfo]:
        """Extract tool usage information (runs with run_type == 'tool')."""
        tools: List[ToolInfo] = []
        # Accept both the current 'runs' and the legacy 'traces' arrays.
        runs_data = data.get('runs', data.get('traces', []))
        for run in runs_data or []:
            if not (isinstance(run, dict) and run.get('run_type') == 'tool'):
                continue
            tools.append(ToolInfo(
                name=run.get('name', 'Unknown Tool'),
                tool_type='external',
                tool_id=run.get('id', 'unknown'),
                inputs=run.get('inputs', {}),
                outputs=run.get('outputs', {})
            ))
        return tools

    def _extract_workflow_info(self, data: Dict[str, Any], trace_metadata: Optional[Dict[str, Any]] = None) -> Optional[WorkflowInfo]:
        """Extract workflow and execution summary from top-level trace fields.

        Args:
            data: Parsed LangSmith export.
            trace_metadata: Database metadata (currently unused; kept for
                interface compatibility).
        """
        runs_data = data.get('runs', data.get('traces', []))
        # Prefer the export's own counter; fall back to counting the runs.
        total_steps = data.get('total_runs', len(runs_data) if runs_data else 0)

        start_time, end_time = self._extract_timestamps_from_runs(runs_data)
        duration_ms = self._calculate_duration(start_time, end_time)

        return WorkflowInfo(
            project_name=data.get('project_name'),
            run_id=data.get('trace_id'),
            total_steps=total_steps,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
            workflow_type='sequential',  # default assumption for LangSmith traces
            workflow_name=data.get('trace_name')
        )

    def _extract_timestamps_from_runs(self, runs_data: List[Dict[str, Any]]) -> tuple[Optional[str], Optional[str]]:
        """Return (earliest start_time, latest end_time) across all runs.

        Uses lexicographic min/max, which is correct for uniformly formatted
        ISO-8601 timestamp strings — presumably what LangSmith exports emit.
        Either element is None when no run carries that field.
        """
        start_times = [run['start_time'] for run in runs_data or []
                       if isinstance(run, dict) and 'start_time' in run]
        end_times = [run['end_time'] for run in runs_data or []
                     if isinstance(run, dict) and 'end_time' in run]
        return (min(start_times) if start_times else None,
                max(end_times) if end_times else None)

    def _extract_data_flow_info(self, data: Dict[str, Any]) -> Optional[DataFlowInfo]:
        """Extract data flow patterns (distinct input/output keys across runs)."""
        input_keys: set = set()
        output_keys: set = set()

        # Accept both the current 'runs' and the legacy 'traces' arrays.
        runs_data = data.get('runs', data.get('traces', []))
        for run in runs_data or []:
            if not isinstance(run, dict):
                continue
            if isinstance(run.get('inputs'), dict):
                input_keys.update(run['inputs'].keys())
            if isinstance(run.get('outputs'), dict):
                output_keys.update(run['outputs'].keys())

        # Sorted so the result is deterministic run-to-run.
        return DataFlowInfo(
            input_types=sorted(input_keys),
            output_types=sorted(output_keys),
            transformation_patterns=[]
        )

    def _suggest_context_types(self, data: Dict[str, Any]) -> List[str]:
        """Suggest relevant context document types for this LangSmith trace."""
        # TODO: richer suggestions based on detected patterns, e.g.
        # "schema" for API/database operations, "guidelines" for specific
        # domains, "examples" for similar patterns.
        suggestions = ["domain_knowledge"]  # always-useful default
        if data.get('project_name'):
            suggestions.append("documentation")
        return suggestions

    def _generate_langsmith_specific_hints(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """Generate LangSmith-specific extraction hints for the knowledge extractor."""
        # TODO: expand with expected entity patterns per run_type, hierarchy
        # relationship patterns, and data-flow expectations.
        hints: Dict[str, Any] = {}
        # Accept both 'runs' and 'traces', matching the rest of the parser.
        runs_data = data.get('runs', data.get('traces', []))
        if isinstance(runs_data, list) and runs_data:
            hints['langsmith_trace_count'] = len(runs_data)
            # Sorted for a deterministic hint value.
            hints['run_types'] = sorted({run.get('run_type', 'unknown')
                                         for run in runs_data if isinstance(run, dict)})
        return hints

    def _calculate_confidence(self, data: Dict[str, Any]) -> float:
        """Calculate a confidence score (0.5-1.0) for parsing accuracy.

        Scores the presence of expected top-level fields.  Both the current
        export field names (trace_id/runs/export_time) and the legacy names
        (run_id/traces/export_timestamp) count, so real imported traces are
        no longer stuck at the base score.
        """
        confidence = 0.5  # base confidence for any parseable JSON

        if data.get('project_name'):
            confidence += 0.1
        if data.get('trace_id') or data.get('run_id'):
            confidence += 0.1
        if data.get('runs') or data.get('traces'):
            confidence += 0.2
        if data.get('export_time') or data.get('export_timestamp'):
            confidence += 0.1

        return min(confidence, 1.0)

    def _create_minimal_metadata(self, trace_metadata: Optional[Dict[str, Any]]) -> ParsedMetadata:
        """Create minimal, low-confidence metadata when JSON parsing fails."""
        return ParsedMetadata(
            platform="langsmith",
            trace_source="langsmith",
            confidence=0.1,
            suggested_context_types=["domain_knowledge"]
        )