| """ | |
| Base Trace Parser Interface | |
| Defines the common interface and data structures for platform-specific trace parsers. | |
| Each parser extracts structured metadata that is guaranteed to be present in traces | |
| from that specific platform. | |
| """ | |
| from abc import ABC, abstractmethod | |
| from typing import Dict, List, Any, Optional, Union | |
| from dataclasses import dataclass, field | |
| from datetime import datetime | |
| import json | |
| import logging | |
| logger = logging.getLogger(__name__) | |

@dataclass
class AgentInfo:
    """Information about an agent found in the trace"""
    name: str
    role: Optional[str] = None
    agent_type: Optional[str] = None  # e.g., "llm", "tool", "chain"
    model: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    first_appearance_line: Optional[int] = None

@dataclass
class ToolInfo:
    """Information about a tool found in the trace"""
    name: str
    tool_type: Optional[str] = None  # e.g., "function", "api", "external"
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    usage_count: int = 0
    first_appearance_line: Optional[int] = None

@dataclass
class WorkflowInfo:
    """Information about the workflow structure"""
    workflow_type: Optional[str] = None  # e.g., "sequential", "parallel", "hierarchical"
    total_steps: Optional[int] = None
    project_name: Optional[str] = None
    run_id: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None

@dataclass
class DataFlowInfo:
    """Information about data flows and transformations"""
    input_types: List[str] = field(default_factory=list)
    output_types: List[str] = field(default_factory=list)
    intermediate_data_types: List[str] = field(default_factory=list)
    transformation_patterns: List[str] = field(default_factory=list)

@dataclass
class ParsedMetadata:
    """
    Structured metadata extracted from a platform-specific trace.

    This complements the multi-agent knowledge extractor by providing
    reliable structural information that can guide the extraction process.
    """
    # Source information
    platform: str
    trace_source: str
    confidence: float  # 0.0-1.0 confidence in parsing accuracy

    # Core structural information
    agents: List[AgentInfo] = field(default_factory=list)
    tools: List[ToolInfo] = field(default_factory=list)
    workflow: Optional[WorkflowInfo] = None
    data_flow: Optional[DataFlowInfo] = None

    # Platform-specific raw data (preserved for reference)
    raw_platform_data: Optional[Dict[str, Any]] = None

    # Extraction hints for the knowledge extractor
    extraction_hints: Dict[str, Any] = field(default_factory=dict)

    # Context document suggestions
    suggested_context_types: List[str] = field(default_factory=list)
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization"""
        return {
            'platform': self.platform,
            'trace_source': self.trace_source,
            'confidence': self.confidence,
            'agents': [
                {
                    'name': agent.name,
                    'role': agent.role,
                    'agent_type': agent.agent_type,
                    'model': agent.model,
                    'parameters': agent.parameters,
                    'first_appearance_line': agent.first_appearance_line
                }
                for agent in self.agents
            ],
            'tools': [
                {
                    'name': tool.name,
                    'tool_type': tool.tool_type,
                    'description': tool.description,
                    'parameters': tool.parameters,
                    'usage_count': tool.usage_count,
                    'first_appearance_line': tool.first_appearance_line
                }
                for tool in self.tools
            ],
            # The outer conditional already guards against a missing workflow,
            # so the per-field None checks are unnecessary inside this branch.
            'workflow': {
                'workflow_type': self.workflow.workflow_type,
                'total_steps': self.workflow.total_steps,
                'project_name': self.workflow.project_name,
                'run_id': self.workflow.run_id,
                'start_time': self.workflow.start_time.isoformat() if self.workflow.start_time else None,
                'end_time': self.workflow.end_time.isoformat() if self.workflow.end_time else None,
                'duration_ms': self.workflow.duration_ms
            } if self.workflow else None,
            'data_flow': {
                'input_types': self.data_flow.input_types,
                'output_types': self.data_flow.output_types,
                'intermediate_data_types': self.data_flow.intermediate_data_types,
                'transformation_patterns': self.data_flow.transformation_patterns
            } if self.data_flow else None,
            'extraction_hints': self.extraction_hints,
            'suggested_context_types': self.suggested_context_types
        }
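

# Illustrative sketch (not part of the original interface): one way a caller
# might assemble a ParsedMetadata and round-trip it through to_dict(). All
# concrete values here ("langsmith", "planner", "web_search", ...) are
# hypothetical placeholders, not platform requirements.
def _example_parsed_metadata() -> Dict[str, Any]:
    """Build a small sample ParsedMetadata and serialize it."""
    metadata = ParsedMetadata(
        platform="langsmith",            # hypothetical platform name
        trace_source="run_export.json",  # hypothetical source identifier
        confidence=0.9,
        agents=[AgentInfo(name="planner", agent_type="llm", model="gpt-4")],
        tools=[ToolInfo(name="web_search", tool_type="api", usage_count=3)],
        workflow=WorkflowInfo(workflow_type="sequential", total_steps=5),
    )
    return metadata.to_dict()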


class BaseTraceParser(ABC):
    """
    Abstract base class for platform-specific trace parsers.

    Each parser is responsible for extracting structured metadata that is
    guaranteed to be present in traces from that specific platform.
    """

    def __init__(self):
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")

    @abstractmethod
    def platform_name(self) -> str:
        """Return the name of the platform this parser handles"""
        pass

    @abstractmethod
    def supported_trace_types(self) -> List[str]:
        """Return a list of trace types this parser can handle"""
        pass

    @abstractmethod
    def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Determine whether this parser can handle the given trace.

        Args:
            trace_content: Raw trace content
            trace_metadata: Optional metadata from the database

        Returns:
            True if this parser can handle the trace
        """
        pass

    @abstractmethod
    def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata:
        """
        Parse the trace and extract structured metadata.

        Args:
            trace_content: Raw trace content
            trace_metadata: Optional metadata from the database

        Returns:
            ParsedMetadata object with the extracted information
        """
        pass

    def _safe_json_parse(self, content: str) -> Optional[Dict[str, Any]]:
        """Safely parse JSON content, returning None if invalid"""
        try:
            return json.loads(content)
        except (json.JSONDecodeError, TypeError):
            return None

    def _extract_timestamps(self, data: Dict[str, Any]) -> tuple[Optional[datetime], Optional[datetime]]:
        """Extract start and end timestamps from platform data"""
        start_time = None
        end_time = None

        # Common timestamp field names
        start_fields = ['start_time', 'startTime', 'created_at', 'createdAt', 'timestamp']
        end_fields = ['end_time', 'endTime', 'finished_at', 'finishedAt', 'completed_at']

        # The loop variable is named field_name to avoid shadowing
        # dataclasses.field, which is imported at module level. We only
        # break once a timestamp has actually been extracted, so a field
        # holding an unusable value type does not end the search early.
        for field_name in start_fields:
            value = data.get(field_name)
            if not value:
                continue
            try:
                if isinstance(value, str):
                    start_time = datetime.fromisoformat(value.replace('Z', '+00:00'))
                elif isinstance(value, (int, float)):
                    start_time = datetime.fromtimestamp(value)
                if start_time:
                    break
            except (ValueError, TypeError):
                continue

        for field_name in end_fields:
            value = data.get(field_name)
            if not value:
                continue
            try:
                if isinstance(value, str):
                    end_time = datetime.fromisoformat(value.replace('Z', '+00:00'))
                elif isinstance(value, (int, float)):
                    end_time = datetime.fromtimestamp(value)
                if end_time:
                    break
            except (ValueError, TypeError):
                continue

        return start_time, end_time

    def _calculate_duration(self, start_time: Optional[datetime], end_time: Optional[datetime]) -> Optional[float]:
        """Calculate duration in milliseconds between start and end times"""
        if start_time and end_time:
            try:
                delta = end_time - start_time
                return delta.total_seconds() * 1000
            except (TypeError, ValueError):
                return None
        return None

    def generate_extraction_hints(self, parsed_metadata: ParsedMetadata) -> Dict[str, Any]:
        """
        Generate hints for the multi-agent knowledge extractor based on parsed metadata.

        This method can be overridden by specific parsers to provide platform-specific hints.
        """
        hints = {}

        # Agent-related hints
        if parsed_metadata.agents:
            hints['expected_agent_count'] = len(parsed_metadata.agents)
            hints['agent_types'] = list(set(agent.agent_type for agent in parsed_metadata.agents if agent.agent_type))
            hints['agent_names'] = [agent.name for agent in parsed_metadata.agents]

        # Tool-related hints
        if parsed_metadata.tools:
            hints['expected_tool_count'] = len(parsed_metadata.tools)
            hints['tool_types'] = list(set(tool.tool_type for tool in parsed_metadata.tools if tool.tool_type))
            hints['tool_names'] = [tool.name for tool in parsed_metadata.tools]

        # Workflow hints
        if parsed_metadata.workflow:
            if parsed_metadata.workflow.workflow_type:
                hints['workflow_pattern'] = parsed_metadata.workflow.workflow_type
            if parsed_metadata.workflow.total_steps:
                hints['expected_task_count'] = parsed_metadata.workflow.total_steps

        # Data flow hints
        if parsed_metadata.data_flow:
            hints['input_types'] = parsed_metadata.data_flow.input_types
            hints['output_types'] = parsed_metadata.data_flow.output_types
            hints['transformation_patterns'] = parsed_metadata.data_flow.transformation_patterns

        return hints
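

# ---------------------------------------------------------------------------
# Minimal sketch of a concrete parser (illustrative only, not part of the
# original interface). It assumes a hypothetical platform ("exampletrace")
# whose traces are JSON objects with "platform", "agents", and "tools" keys;
# real parsers will map their platform's actual trace schema instead.
# ---------------------------------------------------------------------------
class _ExampleJSONTraceParser(BaseTraceParser):
    """Toy parser for JSON traces shaped like {"platform": "exampletrace", ...}."""

    def platform_name(self) -> str:
        return "exampletrace"

    def supported_trace_types(self) -> List[str]:
        return ["json"]

    def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool:
        data = self._safe_json_parse(trace_content)
        return isinstance(data, dict) and data.get("platform") == "exampletrace"

    def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata:
        data = self._safe_json_parse(trace_content) or {}
        start_time, end_time = self._extract_timestamps(data)
        metadata = ParsedMetadata(
            platform=self.platform_name(),
            trace_source="inline",  # hypothetical source label
            confidence=0.8 if data else 0.1,
            agents=[AgentInfo(name=name) for name in data.get("agents", [])],
            tools=[ToolInfo(name=name) for name in data.get("tools", [])],
            workflow=WorkflowInfo(
                start_time=start_time,
                end_time=end_time,
                duration_ms=self._calculate_duration(start_time, end_time),
            ),
            raw_platform_data=data or None,
        )
        metadata.extraction_hints = self.generate_extraction_hints(metadata)
        return metadata


if __name__ == "__main__":
    # Quick demonstration with a hand-written sample payload.
    sample = json.dumps({
        "platform": "exampletrace",
        "agents": ["planner", "executor"],
        "tools": ["web_search"],
        "start_time": "2024-01-01T00:00:00Z",
        "end_time": "2024-01-01T00:00:05Z",
    })
    parser = _ExampleJSONTraceParser()
    if parser.can_parse(sample):
        print(json.dumps(parser.parse_trace(sample).to_dict(), indent=2))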