Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| Generic LangSmith Trace Schema Parser | |
| This script parses ANY LangSmith trace and extracts universal structural/schema | |
| information without making assumptions about specific workflows or domains. | |
| """ | |
| import json | |
| import sys | |
| from typing import Dict, List, Any, Set, Optional | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
@dataclass
class ComponentInfo:
    """Universal schema information about a single trace component (one run).

    Instances are created with keyword arguments, which requires the
    ``@dataclass``-generated ``__init__``.
    """

    name: str  # run name as found in the trace
    run_type: str  # e.g. "llm", "chain", "tool"
    depth: int  # nesting depth estimate
    has_children: bool  # True when child_count > 0
    child_count: int  # number of child run ids
    execution_time_ms: int  # end_time - start_time in ms (0 when unparseable)
    token_usage: Dict[str, int]  # prompt_tokens / completion_tokens / total_tokens
    status: str  # run status string, e.g. "success" or "error"
@dataclass
class TraceSchema:
    """Universal high-level schema of any trace.

    Placeholder schemas are built positionally, so the field order below is
    part of this class's interface.
    """

    total_components: int
    max_depth: int
    component_types: Set[str]  # distinct run_type values
    # "flat", "shallow", "moderate" or "deep" for analysed traces; placeholder
    # schemas use other labels such as "empty" or "metadata_only".
    execution_topology: str
    parallelism_detected: bool
    error_components: int
    performance_metrics: Dict[str, Any]
    metadata_keys: Set[str]
@dataclass
class GlobalSchemaView:
    """Comprehensive global view of trace schema and architecture.

    Instances are created with keyword arguments, which requires the
    ``@dataclass``-generated ``__init__``.
    """

    architecture_description: str  # markdown-ish architecture summary
    execution_flow_summary: str  # newline-separated flow characteristics
    component_hierarchy: Dict[str, Any]  # grouping by depth / type / branching
    numerical_overview: Dict[str, Any]  # component, timing and token statistics
    prompt_analytics: Dict[str, Any]  # LLM/prompt call statistics
    system_complexity_assessment: str  # joined complexity descriptors
| class GenericLangSmithParser: | |
| """Generic parser for any LangSmith trace - no domain assumptions""" | |
| def __init__(self): | |
| pass | |
| def parse_trace_file(self, file_path: str) -> Dict[str, Any]: | |
| """Parse any trace file and extract universal schema information""" | |
| with open(file_path, 'r') as f: | |
| data = json.load(f) | |
| # Handle different trace formats universally | |
| if 'traces' in data: | |
| return self._parse_trace_export(data) | |
| elif 'runs' in data: | |
| # Handle LangSmith export format with 'runs' array | |
| return self._parse_trace_export(data) | |
| elif 'trace' in data: | |
| return self._parse_metadata_format(data) | |
| else: | |
| raise ValueError(f"Unknown trace format in {file_path}") | |
| def parse_directory(self, directory_path: str, max_files: int = 5) -> Dict[str, Any]: | |
| """Parse multiple trace files from a directory""" | |
| directory = Path(directory_path) | |
| if not directory.exists(): | |
| return {"error": f"Directory {directory_path} not found"} | |
| trace_files = list(directory.glob("*.json"))[:max_files] | |
| results = [] | |
| print(f"π Processing {len(trace_files)} files from {directory_path}") | |
| for i, file_path in enumerate(trace_files, 1): | |
| print(f" {i}/{len(trace_files)}: {file_path.name}") | |
| try: | |
| result = self.parse_trace_file(str(file_path)) | |
| result['filename'] = file_path.name | |
| results.append(result) | |
| except Exception as e: | |
| print(f" β Error: {str(e)}") | |
| results.append({ | |
| 'filename': file_path.name, | |
| 'error': str(e) | |
| }) | |
| return { | |
| "directory": directory_path, | |
| "files_processed": len(results), | |
| "results": results, | |
| "summary": self._generate_directory_summary(results) | |
| } | |
| def _parse_trace_export(self, data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Parse any trace export format universally""" | |
| # Handle both 'traces' and 'runs' arrays for compatibility | |
| traces = data.get('traces', data.get('runs', [])) | |
| if not traces: | |
| return {"error": "No traces/runs found"} | |
| # Extract universal metadata without assumptions | |
| metadata = { | |
| "trace_id": data.get('trace_id'), | |
| "run_name": data.get('run_name', data.get('trace_name')), | |
| "total_traces": data.get('total_traces', data.get('total_runs', len(traces))), | |
| "export_timestamp": data.get('export_timestamp', data.get('export_time')), | |
| "project_name": data.get('project_name') | |
| } | |
| # Detect system type generically | |
| system_indicators = self._detect_system_type(traces) | |
| metadata.update(system_indicators) | |
| # Parse component structure universally | |
| components = self._extract_components(traces) | |
| schema = self._analyze_universal_schema(components, traces) | |
| # Generate comprehensive global schema view | |
| global_view = self._generate_global_schema_view(components, schema, traces) | |
| return { | |
| "metadata": metadata, | |
| "schema": schema, | |
| "components": components, | |
| "global_schema_view": global_view | |
| } | |
| def _parse_metadata_format(self, data: Dict[str, Any]) -> Dict[str, Any]: | |
| """Parse backend metadata format""" | |
| trace_info = data.get('trace', {}) | |
| return { | |
| "metadata": { | |
| "trace_id": trace_info.get('trace_id'), | |
| "filename": trace_info.get('filename'), | |
| "trace_source": trace_info.get('trace_source'), | |
| "character_count": trace_info.get('character_count'), | |
| "turn_count": trace_info.get('turn_count'), | |
| "processing_method": trace_info.get('metadata', {}).get('preprocessing_method') | |
| }, | |
| "schema": TraceSchema(0, 0, set(), "metadata_only", False, 0, {}, set()), | |
| "components": [], | |
| "global_schema_view": None | |
| } | |
| def _detect_system_type(self, traces: List[Dict[str, Any]]) -> Dict[str, Any]: | |
| """Detect system type based on universal indicators""" | |
| indicators = { | |
| "langgraph_detected": False, | |
| "langchain_detected": False, | |
| "custom_system_detected": False | |
| } | |
| metadata_keys = set() | |
| for trace in traces: | |
| extra = trace.get('extra', {}) | |
| metadata = extra.get('metadata', {}) | |
| # Collect all metadata keys for pattern analysis | |
| metadata_keys.update(metadata.keys()) | |
| # Universal detection without hardcoded assumptions | |
| if any(key.startswith('langgraph') for key in metadata.keys()): | |
| indicators["langgraph_detected"] = True | |
| if any(key.startswith('langchain') for key in metadata.keys()): | |
| indicators["langchain_detected"] = True | |
| # Check for custom/unknown systems | |
| if metadata and not indicators["langgraph_detected"] and not indicators["langchain_detected"]: | |
| indicators["custom_system_detected"] = True | |
| indicators["metadata_keys"] = list(metadata_keys) | |
| return indicators | |
| def _extract_components(self, traces: List[Dict[str, Any]]) -> List[ComponentInfo]: | |
| """Extract universal component information""" | |
| components = [] | |
| for trace in traces: | |
| # Universal timing calculation | |
| start_time = trace.get('start_time', '') | |
| end_time = trace.get('end_time', '') | |
| exec_time = self._calculate_execution_time(start_time, end_time) | |
| # Universal token extraction | |
| tokens = { | |
| 'prompt_tokens': trace.get('prompt_tokens', 0), | |
| 'completion_tokens': trace.get('completion_tokens', 0), | |
| 'total_tokens': trace.get('total_tokens', 0) | |
| } | |
| # Universal depth detection | |
| depth = self._extract_depth(trace) | |
| # Universal child counting | |
| child_ids = trace.get('child_run_ids', []) | |
| child_count = len(child_ids) if child_ids else 0 | |
| component = ComponentInfo( | |
| name=trace.get('name', 'Unknown'), | |
| run_type=trace.get('run_type', 'unknown'), | |
| depth=depth, | |
| has_children=child_count > 0, | |
| child_count=child_count, | |
| execution_time_ms=exec_time, | |
| token_usage=tokens, | |
| status=trace.get('status', 'unknown') | |
| ) | |
| components.append(component) | |
| return components | |
| def _extract_depth(self, trace: Dict[str, Any]) -> int: | |
| """Extract depth using universal methods""" | |
| # Try multiple depth indicators | |
| depth_indicators = [ | |
| trace.get('extra', {}).get('metadata', {}).get('ls_run_depth'), | |
| trace.get('extra', {}).get('metadata', {}).get('langgraph_step'), | |
| len(trace.get('parent_run_ids', [])), # Use parent chain length | |
| ] | |
| for depth in depth_indicators: | |
| if depth is not None and isinstance(depth, int): | |
| return depth | |
| return 0 | |
| def _analyze_universal_schema(self, components: List[ComponentInfo], traces: List[Dict[str, Any]]) -> TraceSchema: | |
| """Analyze components to determine universal schema patterns""" | |
| if not components: | |
| return TraceSchema(0, 0, set(), "empty", False, 0, {}, set()) | |
| # Universal component analysis | |
| component_types = {comp.run_type for comp in components} | |
| max_depth = max(comp.depth for comp in components) if components else 0 | |
| error_count = sum(1 for comp in components if comp.status in ['error', 'failed', 'interrupted']) | |
| # Universal topology detection | |
| topology = self._detect_execution_topology(components) | |
| # Universal parallelism detection | |
| parallelism = self._detect_parallelism(components) | |
| # Universal metadata collection | |
| metadata_keys = set() | |
| for trace in traces: | |
| metadata = trace.get('extra', {}).get('metadata', {}) | |
| metadata_keys.update(metadata.keys()) | |
| # Enhanced performance metrics with detailed analytics | |
| total_time = sum(comp.execution_time_ms for comp in components) | |
| total_tokens = sum(comp.token_usage['total_tokens'] for comp in components) | |
| total_prompt_tokens = sum(comp.token_usage['prompt_tokens'] for comp in components) | |
| total_completion_tokens = sum(comp.token_usage['completion_tokens'] for comp in components) | |
| # Calculate prompt call analytics | |
| llm_components = [comp for comp in components if comp.run_type in ['llm', 'chat_model', 'language_model']] | |
| prompt_calls = len(llm_components) | |
| # Calculate depth distribution | |
| depth_distribution = {} | |
| for comp in components: | |
| depth_distribution[comp.depth] = depth_distribution.get(comp.depth, 0) + 1 | |
| performance_metrics = { | |
| "total_execution_time_ms": total_time, | |
| "avg_execution_time_ms": total_time / len(components) if components else 0, | |
| "min_execution_time_ms": min(comp.execution_time_ms for comp in components) if components else 0, | |
| "max_execution_time_ms": max(comp.execution_time_ms for comp in components) if components else 0, | |
| "total_tokens": total_tokens, | |
| "total_prompt_tokens": total_prompt_tokens, | |
| "total_completion_tokens": total_completion_tokens, | |
| "avg_prompt_tokens": total_prompt_tokens / prompt_calls if prompt_calls > 0 else 0, | |
| "avg_completion_tokens": total_completion_tokens / prompt_calls if prompt_calls > 0 else 0, | |
| "avg_tokens_per_component": total_tokens / len(components) if components else 0, | |
| "prompt_calls_count": prompt_calls, | |
| "token_efficiency": total_tokens / total_time if total_time > 0 else 0, | |
| "depth_distribution": depth_distribution, | |
| "components_with_children": sum(1 for comp in components if comp.has_children), | |
| "avg_children_per_component": sum(comp.child_count for comp in components) / len(components) if components else 0 | |
| } | |
| return TraceSchema( | |
| total_components=len(components), | |
| max_depth=max_depth, | |
| component_types=component_types, | |
| execution_topology=topology, | |
| parallelism_detected=parallelism, | |
| error_components=error_count, | |
| performance_metrics=performance_metrics, | |
| metadata_keys=metadata_keys | |
| ) | |
| def _generate_global_schema_view(self, components: List[ComponentInfo], schema: TraceSchema, traces: List[Dict[str, Any]]) -> GlobalSchemaView: | |
| """Generate comprehensive global view of the trace schema""" | |
| # Architecture Description | |
| architecture_desc = self._generate_architecture_description(schema, components) | |
| # Execution Flow Summary | |
| flow_summary = self._generate_execution_flow_summary(schema, components) | |
| # Component Hierarchy | |
| hierarchy = self._build_component_hierarchy(components) | |
| # Numerical Overview | |
| numerical_overview = self._compile_numerical_overview(schema) | |
| # Prompt Analytics | |
| prompt_analytics = self._analyze_prompt_patterns(components, schema) | |
| # System Complexity Assessment | |
| complexity_assessment = self._assess_system_complexity(schema, components) | |
| return GlobalSchemaView( | |
| architecture_description=architecture_desc, | |
| execution_flow_summary=flow_summary, | |
| component_hierarchy=hierarchy, | |
| numerical_overview=numerical_overview, | |
| prompt_analytics=prompt_analytics, | |
| system_complexity_assessment=complexity_assessment | |
| ) | |
| def _generate_architecture_description(self, schema: TraceSchema, components: List[ComponentInfo]) -> str: | |
| """Generate high-level architecture description""" | |
| component_types = list(schema.component_types) | |
| # Categorize components | |
| processing_components = [t for t in component_types if t in ['llm', 'chat_model', 'chain', 'agent']] | |
| data_components = [t for t in component_types if t in ['retriever', 'vectorstore', 'document_loader']] | |
| tool_components = [t for t in component_types if t in ['tool', 'function', 'api']] | |
| control_components = [t for t in component_types if t in ['router', 'conditional', 'parallel']] | |
| description_parts = [] | |
| if processing_components: | |
| description_parts.append(f"**Processing Layer:** {len(processing_components)} type(s) - {', '.join(processing_components)}") | |
| if data_components: | |
| description_parts.append(f"**Data Layer:** {len(data_components)} type(s) - {', '.join(data_components)}") | |
| if tool_components: | |
| description_parts.append(f"**Tool Layer:** {len(tool_components)} type(s) - {', '.join(tool_components)}") | |
| if control_components: | |
| description_parts.append(f"**Control Layer:** {len(control_components)} type(s) - {', '.join(control_components)}") | |
| # Architecture pattern detection | |
| if len(processing_components) > 0 and len(data_components) > 0: | |
| pattern = "**Architecture Pattern:** Retrieval-Augmented Generation (RAG) System" | |
| elif len(tool_components) > 2: | |
| pattern = "**Architecture Pattern:** Multi-Tool Agent System" | |
| elif schema.max_depth > 3: | |
| pattern = "**Architecture Pattern:** Hierarchical Processing Pipeline" | |
| else: | |
| pattern = "**Architecture Pattern:** Linear Processing Chain" | |
| return f"{pattern}\n\n" + "\n".join(description_parts) | |
| def _generate_execution_flow_summary(self, schema: TraceSchema, components: List[ComponentInfo]) -> str: | |
| """Generate execution flow summary""" | |
| perf = schema.performance_metrics | |
| flow_characteristics = [] | |
| # Execution topology description | |
| if schema.execution_topology == "flat": | |
| flow_characteristics.append("**Flow Type:** Flat execution (all components at same level)") | |
| elif schema.execution_topology == "shallow": | |
| flow_characteristics.append(f"**Flow Type:** Shallow hierarchy ({schema.max_depth} levels)") | |
| elif schema.execution_topology == "moderate": | |
| flow_characteristics.append(f"**Flow Type:** Moderate hierarchy ({schema.max_depth} levels)") | |
| else: | |
| flow_characteristics.append(f"**Flow Type:** Deep hierarchy ({schema.max_depth} levels)") | |
| # Parallelism description | |
| if schema.parallelism_detected: | |
| parallel_components = max(perf['depth_distribution'].values()) if perf['depth_distribution'] else 1 | |
| flow_characteristics.append(f"**Concurrency:** Parallel execution detected (max {parallel_components} concurrent components)") | |
| else: | |
| flow_characteristics.append("**Concurrency:** Sequential execution") | |
| # Error handling | |
| if schema.error_components > 0: | |
| error_rate = (schema.error_components / schema.total_components) * 100 | |
| flow_characteristics.append(f"**Error Handling:** {schema.error_components} failed components ({error_rate:.1f}% failure rate)") | |
| else: | |
| flow_characteristics.append("**Error Handling:** Clean execution (no failures detected)") | |
| return "\n".join(flow_characteristics) | |
| def _build_component_hierarchy(self, components: List[ComponentInfo]) -> Dict[str, Any]: | |
| """Build component hierarchy structure""" | |
| hierarchy = { | |
| "total_components": len(components), | |
| "by_depth": {}, | |
| "by_type": {}, | |
| "branching_factor": {} | |
| } | |
| # Group by depth | |
| for comp in components: | |
| depth = comp.depth | |
| if depth not in hierarchy["by_depth"]: | |
| hierarchy["by_depth"][depth] = [] | |
| hierarchy["by_depth"][depth].append({ | |
| "name": comp.name, | |
| "type": comp.run_type, | |
| "children": comp.child_count, | |
| "execution_time": comp.execution_time_ms | |
| }) | |
| # Group by type | |
| for comp in components: | |
| comp_type = comp.run_type | |
| if comp_type not in hierarchy["by_type"]: | |
| hierarchy["by_type"][comp_type] = 0 | |
| hierarchy["by_type"][comp_type] += 1 | |
| # Calculate branching factors | |
| for comp in components: | |
| if comp.has_children: | |
| if comp.child_count not in hierarchy["branching_factor"]: | |
| hierarchy["branching_factor"][comp.child_count] = 0 | |
| hierarchy["branching_factor"][comp.child_count] += 1 | |
| return hierarchy | |
| def _compile_numerical_overview(self, schema: TraceSchema) -> Dict[str, Any]: | |
| """Compile comprehensive numerical overview""" | |
| perf = schema.performance_metrics | |
| return { | |
| # Component Statistics | |
| "component_stats": { | |
| "total_components": schema.total_components, | |
| "unique_component_types": len(schema.component_types), | |
| "max_depth": schema.max_depth, | |
| "components_with_children": perf['components_with_children'], | |
| "avg_children_per_parent": perf['avg_children_per_component'], | |
| "error_components": schema.error_components, | |
| "success_rate": ((schema.total_components - schema.error_components) / schema.total_components * 100) if schema.total_components > 0 else 0 | |
| }, | |
| # Execution Time Analytics | |
| "timing_analytics": { | |
| "total_execution_time_ms": perf['total_execution_time_ms'], | |
| "total_execution_time_seconds": perf['total_execution_time_ms'] / 1000, | |
| "avg_execution_time_ms": perf['avg_execution_time_ms'], | |
| "min_execution_time_ms": perf['min_execution_time_ms'], | |
| "max_execution_time_ms": perf['max_execution_time_ms'], | |
| "execution_time_variance": perf['max_execution_time_ms'] - perf['min_execution_time_ms'] | |
| }, | |
| # Token Analytics | |
| "token_analytics": { | |
| "total_tokens": perf['total_tokens'], | |
| "total_prompt_tokens": perf['total_prompt_tokens'], | |
| "total_completion_tokens": perf['total_completion_tokens'], | |
| "avg_tokens_per_component": perf['avg_tokens_per_component'], | |
| "token_efficiency_per_ms": perf['token_efficiency'], | |
| "prompt_to_completion_ratio": perf['total_prompt_tokens'] / perf['total_completion_tokens'] if perf['total_completion_tokens'] > 0 else 0 | |
| }, | |
| # Depth Distribution | |
| "depth_distribution": perf['depth_distribution'] | |
| } | |
| def _analyze_prompt_patterns(self, components: List[ComponentInfo], schema: TraceSchema) -> Dict[str, Any]: | |
| """Analyze prompt call patterns and statistics""" | |
| perf = schema.performance_metrics | |
| # Find LLM/prompt components | |
| llm_components = [comp for comp in components if comp.run_type in ['llm', 'chat_model', 'language_model', 'prompt']] | |
| if not llm_components: | |
| return { | |
| "prompt_calls_detected": 0, | |
| "message": "No LLM/prompt components detected in trace" | |
| } | |
| # Calculate prompt statistics | |
| prompt_tokens = [comp.token_usage['prompt_tokens'] for comp in llm_components if comp.token_usage['prompt_tokens'] > 0] | |
| completion_tokens = [comp.token_usage['completion_tokens'] for comp in llm_components if comp.token_usage['completion_tokens'] > 0] | |
| execution_times = [comp.execution_time_ms for comp in llm_components if comp.execution_time_ms > 0] | |
| analytics = { | |
| "prompt_calls_detected": len(llm_components), | |
| "successful_calls": len([comp for comp in llm_components if comp.status == 'success']), | |
| "failed_calls": len([comp for comp in llm_components if comp.status in ['error', 'failed']]), | |
| # Token statistics | |
| "token_statistics": { | |
| "avg_prompt_tokens": perf['avg_prompt_tokens'], | |
| "avg_completion_tokens": perf['avg_completion_tokens'], | |
| "total_prompt_tokens": perf['total_prompt_tokens'], | |
| "total_completion_tokens": perf['total_completion_tokens'], | |
| "min_prompt_tokens": min(prompt_tokens) if prompt_tokens else 0, | |
| "max_prompt_tokens": max(prompt_tokens) if prompt_tokens else 0, | |
| "min_completion_tokens": min(completion_tokens) if completion_tokens else 0, | |
| "max_completion_tokens": max(completion_tokens) if completion_tokens else 0 | |
| }, | |
| # Performance statistics | |
| "performance_statistics": { | |
| "avg_llm_execution_time_ms": sum(execution_times) / len(execution_times) if execution_times else 0, | |
| "min_llm_execution_time_ms": min(execution_times) if execution_times else 0, | |
| "max_llm_execution_time_ms": max(execution_times) if execution_times else 0, | |
| "total_llm_execution_time_ms": sum(execution_times), | |
| "llm_time_percentage": (sum(execution_times) / schema.performance_metrics['total_execution_time_ms'] * 100) if schema.performance_metrics['total_execution_time_ms'] > 0 else 0 | |
| }, | |
| # Call pattern analysis | |
| "call_patterns": { | |
| "depth_distribution": {depth: len([comp for comp in llm_components if comp.depth == depth]) for depth in set(comp.depth for comp in llm_components)}, | |
| "component_types": list(set(comp.run_type for comp in llm_components)), | |
| "parallel_llm_calls": len([depth for depth, count in {depth: len([comp for comp in llm_components if comp.depth == depth]) for depth in set(comp.depth for comp in llm_components)}.items() if count > 1]) | |
| } | |
| } | |
| return analytics | |
| def _assess_system_complexity(self, schema: TraceSchema, components: List[ComponentInfo]) -> str: | |
| """Assess overall system complexity""" | |
| complexity_factors = [] | |
| # Component count factor | |
| if schema.total_components < 5: | |
| complexity_factors.append("Simple (few components)") | |
| elif schema.total_components < 20: | |
| complexity_factors.append("Moderate (medium component count)") | |
| else: | |
| complexity_factors.append("Complex (many components)") | |
| # Depth factor | |
| if schema.max_depth <= 1: | |
| complexity_factors.append("Flat architecture") | |
| elif schema.max_depth <= 3: | |
| complexity_factors.append("Moderate hierarchy") | |
| else: | |
| complexity_factors.append("Deep hierarchical structure") | |
| # Type diversity factor | |
| type_count = len(schema.component_types) | |
| if type_count <= 2: | |
| complexity_factors.append("Homogeneous components") | |
| elif type_count <= 5: | |
| complexity_factors.append("Diverse component types") | |
| else: | |
| complexity_factors.append("Highly diverse component ecosystem") | |
| # Error factor | |
| if schema.error_components > 0: | |
| error_rate = (schema.error_components / schema.total_components) * 100 | |
| if error_rate > 20: | |
| complexity_factors.append("High error rate (system instability)") | |
| elif error_rate > 5: | |
| complexity_factors.append("Moderate error rate") | |
| else: | |
| complexity_factors.append("Low error rate") | |
| else: | |
| complexity_factors.append("Error-free execution") | |
| # Parallelism factor | |
| if schema.parallelism_detected: | |
| complexity_factors.append("Concurrent execution patterns") | |
| else: | |
| complexity_factors.append("Sequential execution") | |
| return " β’ ".join(complexity_factors) | |
| def _detect_execution_topology(self, components: List[ComponentInfo]) -> str: | |
| """Detect execution topology without domain assumptions""" | |
| if not components: | |
| return "empty" | |
| depths = [comp.depth for comp in components] | |
| max_depth = max(depths) | |
| if max_depth == 0: | |
| return "flat" | |
| elif max_depth <= 2: | |
| return "shallow" | |
| elif max_depth <= 5: | |
| return "moderate" | |
| else: | |
| return "deep" | |
| def _detect_parallelism(self, components: List[ComponentInfo]) -> bool: | |
| """Detect parallel execution patterns universally""" | |
| depth_groups = {} | |
| for comp in components: | |
| if comp.depth not in depth_groups: | |
| depth_groups[comp.depth] = 0 | |
| depth_groups[comp.depth] += 1 | |
| # If any depth level has multiple components, parallelism is detected | |
| return any(count > 1 for count in depth_groups.values()) | |
| def _calculate_execution_time(self, start_time: str, end_time: str) -> int: | |
| """Universal execution time calculation""" | |
| try: | |
| from datetime import datetime | |
| if not start_time or not end_time: | |
| return 0 | |
| # Try multiple timestamp formats | |
| formats = [ | |
| '%Y-%m-%d %H:%M:%S.%f', | |
| '%Y-%m-%dT%H:%M:%S.%f+00:00', | |
| '%Y-%m-%dT%H:%M:%S.%fZ', | |
| '%Y-%m-%dT%H:%M:%S+00:00' | |
| ] | |
| for fmt in formats: | |
| try: | |
| start = datetime.strptime(start_time.replace('Z', ''), fmt.replace('+00:00', '')) | |
| end = datetime.strptime(end_time.replace('Z', ''), fmt.replace('+00:00', '')) | |
| return int((end - start).total_seconds() * 1000) | |
| except ValueError: | |
| continue | |
| return 0 | |
| except Exception: | |
| return 0 | |
| def generate_universal_context_documents(self, parsed_trace: Dict[str, Any]) -> List[Dict[str, str]]: | |
| """Generate focused context documents that directly assist knowledge extraction""" | |
| documents = [] | |
| schema = parsed_trace.get('schema') | |
| global_view = parsed_trace.get('global_schema_view') | |
| if not schema or not hasattr(schema, 'total_components'): | |
| return documents | |
| metadata = parsed_trace.get('metadata', {}) | |
| # Document 1: Global Schema Overview (NEW) | |
| if global_view: | |
| overview_content = f"""**GLOBAL TRACE SCHEMA OVERVIEW** | |
| {global_view.architecture_description} | |
| **EXECUTION CHARACTERISTICS:** | |
| {global_view.execution_flow_summary} | |
| **NUMERICAL OVERVIEW:** | |
| β’ **Components:** {global_view.numerical_overview['component_stats']['total_components']} total ({global_view.numerical_overview['component_stats']['unique_component_types']} types) | |
| β’ **Depth:** {global_view.numerical_overview['component_stats']['max_depth']} levels maximum | |
| β’ **Execution Time:** {global_view.numerical_overview['timing_analytics']['total_execution_time_seconds']:.2f}s total (avg: {global_view.numerical_overview['timing_analytics']['avg_execution_time_ms']:.1f}ms per component) | |
| β’ **Token Usage:** {global_view.numerical_overview['token_analytics']['total_tokens']} total tokens ({global_view.numerical_overview['token_analytics']['total_prompt_tokens']} input, {global_view.numerical_overview['token_analytics']['total_completion_tokens']} output) | |
| β’ **Prompt Calls:** {global_view.prompt_analytics.get('prompt_calls_detected', 0)} LLM calls (avg: {global_view.prompt_analytics.get('token_statistics', {}).get('avg_prompt_tokens', 0):.0f} input, {global_view.prompt_analytics.get('token_statistics', {}).get('avg_completion_tokens', 0):.0f} output tokens) | |
| β’ **Success Rate:** {global_view.numerical_overview['component_stats']['success_rate']:.1f}% | |
| **SYSTEM COMPLEXITY:** {global_view.system_complexity_assessment} | |
| **Context for Current Processing Window:** This global overview provides architectural context for understanding the trace structure during chunked processing.""" | |
| documents.append({ | |
| "document_type": "global_schema", | |
| "title": "Global Schema Architecture Overview", | |
| "content": overview_content | |
| }) | |
| # Document 2: Component Entity Mapping Guide (Enhanced) | |
| component_types = list(schema.component_types) | |
| # Create entity type guidance based on detected components | |
| entity_guidance = [] | |
| if 'chain' in component_types: | |
| entity_guidance.append("β’ **Chain components** β Extract as Task entities (workflow steps)") | |
| if 'llm' in component_types: | |
| entity_guidance.append("β’ **LLM components** β Extract as Agent entities (AI processing units)") | |
| if 'prompt' in component_types: | |
| entity_guidance.append("β’ **Prompt components** β Extract content as Agent system prompts") | |
| if 'tool' in component_types: | |
| entity_guidance.append("β’ **Tool components** β Extract as Tool entities with function definitions") | |
| if 'retriever' in component_types: | |
| entity_guidance.append("β’ **Retriever components** β Extract as Tool entities (data access tools)") | |
| if 'parser' in component_types: | |
| entity_guidance.append("β’ **Parser components** β Extract as Tool entities (data processing tools)") | |
| component_content = f"""**Entity Extraction Guidance for Detected Components:** | |
| {chr(10).join(entity_guidance)} | |
| **Expected Entity Count:** {schema.total_components} total components detected | |
| **System Complexity:** {'Simple' if schema.total_components < 10 else 'Moderate' if schema.total_components < 50 else 'Complex'} - expect {'basic entity types' if schema.total_components < 10 else 'diverse entity types and relationships'}""" | |
| documents.append({ | |
| "document_type": "component_structure", | |
| "title": "Component-to-Entity Mapping Guide", | |
| "content": component_content | |
| }) | |
| # Document 3: Relationship Pattern Guide (Enhanced) | |
| relationship_guidance = [] | |
| if schema.parallelism_detected: | |
| relationship_guidance.append("β’ **Parallel execution detected** β Look for multiple PERFORMS relationships from different agents to concurrent tasks") | |
| relationship_guidance.append("β’ **Expect USES relationships** β Multiple agents likely share common tools") | |
| else: | |
| relationship_guidance.append("β’ **Sequential execution detected** β Look for NEXT relationships between sequential tasks") | |
| relationship_guidance.append("β’ **Linear workflow** β Expect simple AgentβTaskβOutput chains") | |
| if schema.max_depth > 2: | |
| relationship_guidance.append("β’ **Deep nesting detected** β Look for SUBTASK_OF relationships in hierarchical workflows") | |
| if schema.error_components > 0: | |
| relationship_guidance.append(f"β’ **{schema.error_components} error(s) detected** β Look for INTERVENES relationships where agents correct failed tasks") | |
| # Add prompt call patterns | |
| if global_view and global_view.prompt_analytics.get('prompt_calls_detected', 0) > 0: | |
| prompt_calls = global_view.prompt_analytics['prompt_calls_detected'] | |
| relationship_guidance.append(f"β’ **{prompt_calls} prompt calls detected** β Look for QUERIES relationships between agents and LLM services") | |
| relationship_content = f"""**Relationship Detection Guidance:** | |
| {chr(10).join(relationship_guidance)} | |
| **Expected Relationship Density:** {schema.max_depth} execution layer{'s' if schema.max_depth != 1 else ''} suggests {'simple linear relationships' if schema.max_depth <= 1 else 'complex multi-layered relationships'}""" | |
| documents.append({ | |
| "document_type": "execution_pattern", | |
| "title": "Relationship Pattern Guide", | |
| "content": relationship_content | |
| }) | |
| # Document 4: System Domain Classification (Enhanced) | |
| system_classification = "Unknown Domain" | |
| domain_guidance = [] | |
| # Classify based on component patterns | |
| if 'retriever' in component_types and 'llm' in component_types: | |
| system_classification = "RAG (Retrieval-Augmented Generation) System" | |
| domain_guidance.extend([ | |
| "β’ **Expected entities:** Search agents, knowledge retrieval tools, document processing tasks", | |
| "β’ **Expected relationships:** Agents USE retrieval tools, tasks CONSUME search results", | |
| "β’ **Common patterns:** Query processing β Knowledge retrieval β Answer generation" | |
| ]) | |
| elif 'tool' in component_types and len(component_types) > 3: | |
| system_classification = "Multi-Agent Tool-Using System" | |
| domain_guidance.extend([ | |
| "β’ **Expected entities:** Specialized agents for different tools, coordination tasks", | |
| "β’ **Expected relationships:** Agents USE specific tools, tasks may be ASSIGNED_TO specialist agents", | |
| "β’ **Common patterns:** Task delegation β Tool usage β Result aggregation" | |
| ]) | |
| elif 'chain' in component_types and 'llm' in component_types: | |
| system_classification = "LLM Chain Processing System" | |
| domain_guidance.extend([ | |
| "β’ **Expected entities:** Processing stages as tasks, LLM agents, data transformation tools", | |
| "β’ **Expected relationships:** Sequential NEXT relationships, agents PERFORM processing tasks", | |
| "β’ **Common patterns:** Input processing β LLM reasoning β Output formatting" | |
| ]) | |
| if not domain_guidance: | |
| domain_guidance.append("β’ **Generic system** β Look for standard agent-task-tool patterns") | |
| domain_content = f"""**System Type:** {system_classification} | |
| **Domain-Specific Extraction Guidance:** | |
| {chr(10).join(domain_guidance)} | |
| **Error Context:** {'No errors detected - expect clean entity/relationship extraction' if schema.error_components == 0 else f'{schema.error_components} error(s) present - look for failure handling and recovery patterns'}""" | |
| documents.append({ | |
| "document_type": "system_indicators", | |
| "title": "Domain Classification Guide", | |
| "content": domain_content | |
| }) | |
| return documents | |
def _generate_directory_summary(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Generate universal summary of all traces in a directory.

    Args:
        results: One dict per processed file. Entries containing an ``'error'``
            key count as failed parses; the remaining entries may carry a
            ``'schema'`` object whose ``total_components``,
            ``component_types``, ``execution_topology`` and
            ``error_components`` attributes are aggregated.

    Returns:
        ``{"error": "No successful parses"}`` when no entry parsed
        successfully. Otherwise a dict with ``total_files``,
        ``successful_parses``, ``failed_parses``, ``total_components``,
        ``unique_component_types`` and ``execution_topologies`` (both sorted
        so the output is deterministic) and ``total_error_components``.
    """
    successful_results = [r for r in results if 'error' not in r]
    if not successful_results:
        return {"error": "No successful parses"}

    # Single pass over the successful results with one consistent guard,
    # instead of a separate sum that crashed on a ``None`` schema.
    total_components = 0
    all_component_types = set()
    all_topologies = set()
    total_errors = 0
    for result in successful_results:
        schema = result.get('schema')
        if schema is None:
            # A successful parse without a schema contributes nothing to the
            # aggregates rather than failing the whole directory summary.
            continue
        total_components += getattr(schema, 'total_components', 0)
        if hasattr(schema, 'component_types'):
            all_component_types.update(schema.component_types)
            all_topologies.add(schema.execution_topology)
            total_errors += schema.error_components

    return {
        "total_files": len(results),
        "successful_parses": len(successful_results),
        "failed_parses": len(results) - len(successful_results),
        "total_components": total_components,
        # Set iteration order for strings varies between interpreter runs
        # (hash randomization); sorting keeps summaries and printed previews
        # stable. key=str tolerates non-string entries such as None.
        "unique_component_types": sorted(all_component_types, key=str),
        "execution_topologies": sorted(all_topologies, key=str),
        "total_error_components": total_errors,
    }
def _preview(items, limit: Optional[int] = 5) -> str:
    """Join up to ``limit`` items (all when ``None``) in sorted, stable order."""
    # Sorting by str gives run-independent output even for set inputs and
    # tolerates non-string entries that a bare ', '.join would reject.
    return ', '.join(str(item) for item in sorted(items, key=str)[:limit])


def _print_trace_report(parser: "GenericLangSmithParser", file_path: Path, trace_name: str) -> None:
    """Parse one trace file and print its metadata, schema, performance and context documents.

    Any exception raised while parsing or reporting is printed instead of
    propagated, so one bad trace does not stop the remaining demos.
    """
    print(f"\nπ ANALYZING: {trace_name}")
    print("-" * 50)
    try:
        result = parser.parse_trace_file(str(file_path))
        # Display universal metadata (scalar values only)
        metadata = result.get('metadata', {})
        print("π Universal Metadata:")
        for key, value in metadata.items():
            if isinstance(value, (str, int, float)):
                print(f" {key}: {value}")
        # Display universal schema
        schema = result.get('schema')
        if schema and hasattr(schema, 'total_components'):
            print("\nποΈ Universal Schema:")
            print(f" Components: {schema.total_components}")
            print(f" Component Types: {_preview(schema.component_types)}")
            print(f" Execution Topology: {schema.execution_topology}")
            print(f" Max Depth: {schema.max_depth}")
            print(f" Parallelism: {'Yes' if schema.parallelism_detected else 'No'}")
            print(f" Error Components: {schema.error_components}")
            print("\nβ‘ Performance:")
            # Missing or non-numeric metrics print as N/A instead of raising
            # and skipping the context documents below.
            perf = schema.performance_metrics or {}
            total_time = perf.get('total_execution_time_ms')
            print(f" Total Time: {total_time}ms" if total_time is not None else " Total Time: N/A")
            print(f" Total Tokens: {perf.get('total_tokens', 'N/A')}")
            efficiency = perf.get('token_efficiency')
            if isinstance(efficiency, (int, float)):
                print(f" Token Efficiency: {efficiency:.2f} tokens/ms")
            else:
                print(" Token Efficiency: N/A")
        # Generate universal context documents
        context_docs = parser.generate_universal_context_documents(result)
        if context_docs:
            print(f"\nπ Universal Context Documents ({len(context_docs)}):")
            for i, doc in enumerate(context_docs, 1):
                print(f" {i}. [{doc['document_type']}] {doc['title']}")
                print(f" {doc['content'][:100]}...")
    except Exception as e:  # demo boundary: report and continue with the next trace
        print(f"β Error parsing {trace_name}: {e}")


def _print_directory_report(parser: "GenericLangSmithParser", directory: Path) -> None:
    """Parse up to two traces from ``directory`` and print the aggregated summary.

    Exceptions, and summaries that are missing or carry an ``'error'`` key,
    are reported on stdout rather than raised.
    """
    print(f"\nποΈ ANALYZING DIRECTORY: {directory}")
    print("=" * 50)
    try:
        batch_result = parser.parse_directory(str(directory), max_files=2)
        summary = batch_result.get('summary') or {}
        # An empty or error summary used to be skipped silently (or hit a
        # KeyError on the empty default); report it explicitly instead.
        if not summary or 'error' in summary:
            print(f"β Error processing directory: {summary.get('error', 'no summary returned')}")
            return
        # Display universal directory summary
        print("\nπ Universal Directory Summary:")
        print(f" Files Processed: {summary['successful_parses']}/{summary['total_files']}")
        print(f" Total Components: {summary['total_components']}")
        print(f" Component Types: {_preview(summary['unique_component_types'])}")
        print(f" Execution Topologies: {_preview(summary['execution_topologies'], limit=None)}")
        print(f" Error Components: {summary['total_error_components']}")
    except Exception as e:  # demo boundary: report instead of crashing main()
        print(f"β Error processing directory: {e}")


def main(project_root: Optional[Path] = None) -> None:
    """Main function to demonstrate universal parsing.

    Parses two sample trace files and, if present, the ``logs/open_deepresearch``
    directory, printing a report for each.

    Args:
        project_root: Directory containing the ``logs`` folder. Defaults to
            four levels above this file, matching a run from the parsers
            directory of the original project layout.
    """
    parser = GenericLangSmithParser()
    print("=" * 80)
    print("UNIVERSAL LANGSMITH TRACE PARSER")
    print("=" * 80)
    if project_root is None:
        # Test files - adjusted for when running from parsers directory
        project_root = Path(__file__).parent.parent.parent.parent
    else:
        project_root = Path(project_root)
    test_files = [
        (project_root / "logs" / "archive" / "RunnableSequence_RunnableSequence_09d61de5-06df-43cb-9542-30bb50052015_raw.json (1).json", "LCEL Chain"),
        (project_root / "logs" / "Sample Agent Trace_Sample Agent Trace_89561000-079d-4d24-9c72-7c8f88b6d579_raw.json.json", "Agent Trace"),
    ]
    # Parse individual files
    for file_path, trace_name in test_files:
        if not file_path.exists():
            print(f"\nβ File not found: {file_path}")
            continue
        _print_trace_report(parser, file_path, trace_name)
    # Parse open_deepresearch directory
    open_deepresearch_dir = project_root / "logs" / "open_deepresearch"
    if open_deepresearch_dir.exists():
        _print_directory_report(parser, open_deepresearch_dir)
    print("\n" + "=" * 80)


if __name__ == "__main__":
    main()