Spaces:
Running
Running
File size: 15,114 Bytes
c2ea5ed |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 |
"""
LangSmith Trace Parser
Rule-based parser for extracting structured metadata from LangSmith traces.
This parser identifies and extracts the guaranteed structural elements that
every LangSmith trace contains, providing reliable metadata to enhance
the multi-agent knowledge extraction process.
LangSmith traces typically contain:
- Project information (project_name)
- Run hierarchy (run_id, trace_id, parent runs)
- Agent/LLM information (run_type: "llm", "chain", "tool")
- Input/Output data structures
- Timing information (start_time, end_time)
- Tool usage patterns
- Nested execution flows
"""
from typing import Dict, List, Any, Optional
from datetime import datetime
import json
import re
import logging
from .base_parser import (
BaseTraceParser, ParsedMetadata, AgentInfo, ToolInfo,
WorkflowInfo, DataFlowInfo
)
logger = logging.getLogger(__name__)
class LangSmithParser(BaseTraceParser):
    """
    Parser for LangSmith observability platform traces.

    Extracts structural metadata that is guaranteed to be present in LangSmith
    traces, including project information, run hierarchy, agent types, and
    execution patterns.  Supports both the current export layout (top-level
    'runs' array) and the legacy layout (top-level 'traces' array).
    """

    @property
    def platform_name(self) -> str:
        """Canonical identifier of the platform this parser handles."""
        return "langsmith"

    @property
    def supported_trace_types(self) -> List[str]:
        """Trace-type labels this parser accepts."""
        return ["langsmith_processed_import", "langsmith_export", "langsmith"]

    @staticmethod
    def _get_runs(data: Dict[str, Any]) -> List[Any]:
        """
        Return the run list from *data*, preferring the modern 'runs' key and
        falling back to the legacy 'traces' key.

        Returns [] when neither key holds a list (including when 'runs' is
        present but None), so callers can iterate unconditionally.
        """
        runs = data.get('runs', data.get('traces'))
        return runs if isinstance(runs, list) else []

    def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Determine if this trace is from the LangSmith platform.

        Checks, in decreasing order of reliability:
        1. Database metadata indicating a LangSmith source
        2. LangSmith-specific JSON structure markers
        3. LangSmith field patterns in the raw content
        """
        # Check database metadata first (most reliable).
        if trace_metadata:
            # `or ''` guards against keys that are present but map to None.
            trace_source = trace_metadata.get('platform') or ''
            trace_type = trace_metadata.get('processing_type') or ''
            if trace_source == 'langsmith' or 'langsmith' in trace_type.lower():
                return True

        # Check for LangSmith JSON structure markers.
        try:
            parsed_content = self._safe_json_parse(trace_content)
            if parsed_content and self._has_langsmith_structure(parsed_content):
                return True
        except Exception:
            pass  # fall through to text-pattern detection

        # Check for LangSmith field patterns in content.
        return self._has_langsmith_patterns(trace_content)

    def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata:
        """
        Parse a LangSmith trace and extract structured metadata.

        Args:
            trace_content: Raw trace content (typically JSON)
            trace_metadata: Database metadata about the trace

        Returns:
            ParsedMetadata with LangSmith-specific structural information;
            a minimal, low-confidence ParsedMetadata if the content is not
            parseable JSON.
        """
        self.logger.info("Starting LangSmith trace parsing")

        parsed_content = self._safe_json_parse(trace_content)
        if not parsed_content:
            return self._create_minimal_metadata(trace_metadata)

        # Extract core components.
        agents = self._extract_agents(parsed_content)
        tools = self._extract_tools(parsed_content)
        workflow = self._extract_workflow_info(parsed_content, trace_metadata)
        data_flow = self._extract_data_flow_info(parsed_content)

        metadata = ParsedMetadata(
            platform="langsmith",
            trace_source="langsmith",
            confidence=self._calculate_confidence(parsed_content),
            agents=agents,
            tools=tools,
            workflow=workflow,
            data_flow=data_flow,
            raw_platform_data=parsed_content,
            suggested_context_types=self._suggest_context_types(parsed_content)
        )

        # Generic hints from the base class, augmented with platform-specific ones.
        metadata.extraction_hints = self.generate_extraction_hints(metadata)
        metadata.extraction_hints.update(self._generate_langsmith_specific_hints(parsed_content))

        self.logger.info(f"LangSmith parsing complete: {len(agents)} agents, {len(tools)} tools")
        return metadata

    def _has_langsmith_structure(self, data: Dict[str, Any]) -> bool:
        """
        Check if JSON data has LangSmith-specific structure markers.

        Imported traces carry: trace_id, trace_name, project_name, runs,
        export_time, total_runs.  Requires both mandatory fields plus at
        least one optional field; when a non-empty 'runs' array is present,
        its first entry must additionally look like a LangSmith run.
        """
        required_fields = ['trace_id', 'project_name']
        optional_fields = ['runs', 'traces', 'trace_name', 'export_time', 'total_runs']

        has_required = all(field in data for field in required_fields)
        has_optional = any(field in data for field in optional_fields)

        if has_required and has_optional:
            if 'runs' in data and isinstance(data['runs'], list) and data['runs']:
                first_run = data['runs'][0]
                if isinstance(first_run, dict):
                    # Validate against known LangSmith run fields.
                    run_fields = ['id', 'name', 'run_type', 'start_time']
                    return any(field in first_run for field in run_fields)
            elif 'traces' in data:
                # Support the legacy 'traces' structure too.
                return True

        return has_required and has_optional

    def _has_langsmith_patterns(self, content: str) -> bool:
        """
        Check for LangSmith-specific patterns in raw text content.

        Looks for characteristic JSON field signatures: run types, project
        names, 36-char trace UUIDs, and ISO-8601 start timestamps.
        """
        langsmith_indicators = [
            r'"run_type":\s*"(llm|chain|tool)"',
            r'"project_name":\s*"[^"]+',
            r'"trace_id":\s*"[a-f0-9-]{36}"',
            # ISO-8601 timestamp characters: digits, 'T', ':', '.', 'Z', '-'.
            # (The previous class [\d-T:\.Z] was an invalid character range.)
            r'"start_time":\s*"[\dT:.Z-]+"',
        ]
        return any(re.search(pattern, content) for pattern in langsmith_indicators)

    def _extract_agents(self, data: Dict[str, Any]) -> List[AgentInfo]:
        """Extract agent (LLM run) information from LangSmith trace data."""
        agents: List[AgentInfo] = []
        for run in self._get_runs(data):
            if not (isinstance(run, dict) and run.get('run_type') == 'llm'):
                continue
            # Model name lives under the optional 'extra' mapping when present.
            extra = run.get('extra')
            model = extra.get('model') if isinstance(extra, dict) else None
            agents.append(AgentInfo(
                name=run.get('name', 'Unknown Agent'),
                agent_type='llm',
                model=model,
                agent_id=run.get('id', 'unknown'),
            ))
        return agents

    def _extract_tools(self, data: Dict[str, Any]) -> List[ToolInfo]:
        """Extract tool usage information from LangSmith trace data."""
        tools: List[ToolInfo] = []
        for run in self._get_runs(data):
            if not (isinstance(run, dict) and run.get('run_type') == 'tool'):
                continue
            tools.append(ToolInfo(
                name=run.get('name', 'Unknown Tool'),
                tool_type='external',
                tool_id=run.get('id', 'unknown'),
                inputs=run.get('inputs', {}),
                outputs=run.get('outputs', {}),
            ))
        return tools

    def _extract_workflow_info(self, data: Dict[str, Any], trace_metadata: Optional[Dict[str, Any]] = None) -> Optional[WorkflowInfo]:
        """Extract workflow and execution information from LangSmith trace data."""
        runs_data = self._get_runs(data)
        # Prefer the explicit 'total_runs' field; fall back to counting runs.
        total_steps = data.get('total_runs', len(runs_data))

        start_time, end_time = self._extract_timestamps_from_runs(runs_data)
        duration_ms = self._calculate_duration(start_time, end_time)

        return WorkflowInfo(
            project_name=data.get('project_name'),
            run_id=data.get('trace_id'),
            total_steps=total_steps,
            start_time=start_time,
            end_time=end_time,
            duration_ms=duration_ms,
            workflow_type='sequential',  # Default for LangSmith traces
            workflow_name=data.get('trace_name'),
        )

    def _extract_timestamps_from_runs(self, runs_data: List[Dict[str, Any]]) -> tuple[Optional[str], Optional[str]]:
        """
        Extract overall start/end timestamps from a runs array.

        Returns (earliest start_time, latest end_time); either element is
        None when no run carries the corresponding field.  Assumes ISO-8601
        timestamp strings, for which lexical min/max matches chronological
        order.
        """
        start_times = [run['start_time'] for run in runs_data or []
                       if isinstance(run, dict) and 'start_time' in run]
        end_times = [run['end_time'] for run in runs_data or []
                     if isinstance(run, dict) and 'end_time' in run]
        start_time = min(start_times) if start_times else None
        end_time = max(end_times) if end_times else None
        return start_time, end_time

    def _extract_data_flow_info(self, data: Dict[str, Any]) -> Optional[DataFlowInfo]:
        """Extract data flow patterns (distinct input/output keys across runs)."""
        input_types: List[str] = []
        output_types: List[str] = []
        transformations: List[str] = []  # Not derivable from structure alone yet.

        for run in self._get_runs(data):
            if not isinstance(run, dict):
                continue
            inputs = run.get('inputs')
            if isinstance(inputs, dict) and inputs:
                input_types.extend(inputs.keys())
            outputs = run.get('outputs')
            if isinstance(outputs, dict) and outputs:
                output_types.extend(outputs.keys())

        return DataFlowInfo(
            input_types=list(set(input_types)),
            output_types=list(set(output_types)),
            transformation_patterns=transformations,
        )

    def _suggest_context_types(self, data: Dict[str, Any]) -> List[str]:
        """
        Suggest relevant context document types for this LangSmith trace.

        TODO: richer suggestion logic — "schema" for API/database operations,
        "guidelines" for specific domains, "examples" for similar patterns.
        """
        suggestions = ["domain_knowledge"]  # Always-useful default.
        if data.get('project_name'):
            suggestions.append("documentation")
        return suggestions

    def _generate_langsmith_specific_hints(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Generate LangSmith-specific extraction hints (run counts and the set
        of run types) for the downstream knowledge extractor.

        TODO: hint at relationship patterns in run hierarchies and expected
        data-flow shapes.
        """
        hints: Dict[str, Any] = {}
        # Handle both 'runs' and legacy 'traces', consistent with extraction.
        runs_data = self._get_runs(data)
        if runs_data:
            hints['langsmith_trace_count'] = len(runs_data)
            hints['run_types'] = list({run.get('run_type', 'unknown')
                                       for run in runs_data if isinstance(run, dict)})
        return hints

    def _calculate_confidence(self, data: Dict[str, Any]) -> float:
        """
        Calculate a confidence score (0.1-1.0) for parsing accuracy based on
        how many expected LangSmith structural elements are present.

        Accepts both the modern field names (trace_id, runs, export_time) and
        the legacy ones (run_id, traces, export_timestamp).
        """
        confidence = 0.5  # Base confidence for parseable JSON.
        if data.get('project_name'):
            confidence += 0.1
        if data.get('trace_id') or data.get('run_id'):
            confidence += 0.1
        if self._get_runs(data):
            confidence += 0.2
        if data.get('export_time') or data.get('export_timestamp'):
            confidence += 0.1
        return min(confidence, 1.0)

    def _create_minimal_metadata(self, trace_metadata: Optional[Dict[str, Any]]) -> ParsedMetadata:
        """Create minimal, low-confidence metadata when parsing fails."""
        return ParsedMetadata(
            platform="langsmith",
            trace_source="langsmith",
            confidence=0.1,
            suggested_context_types=["domain_knowledge"],
        )