""" Base Trace Parser Interface Defines the common interface and data structures for platform-specific trace parsers. Each parser extracts structured metadata that is guaranteed to be present in traces from that specific platform. """ from abc import ABC, abstractmethod from typing import Dict, List, Any, Optional, Union from dataclasses import dataclass, field from datetime import datetime import json import logging logger = logging.getLogger(__name__) @dataclass class AgentInfo: """Information about an agent found in the trace""" name: str role: Optional[str] = None agent_type: Optional[str] = None # e.g., "llm", "tool", "chain" model: Optional[str] = None parameters: Optional[Dict[str, Any]] = None first_appearance_line: Optional[int] = None @dataclass class ToolInfo: """Information about a tool found in the trace""" name: str tool_type: Optional[str] = None # e.g., "function", "api", "external" description: Optional[str] = None parameters: Optional[Dict[str, Any]] = None usage_count: int = 0 first_appearance_line: Optional[int] = None @dataclass class WorkflowInfo: """Information about the workflow structure""" workflow_type: Optional[str] = None # e.g., "sequential", "parallel", "hierarchical" total_steps: Optional[int] = None project_name: Optional[str] = None run_id: Optional[str] = None start_time: Optional[datetime] = None end_time: Optional[datetime] = None duration_ms: Optional[float] = None @dataclass class DataFlowInfo: """Information about data flows and transformations""" input_types: List[str] = field(default_factory=list) output_types: List[str] = field(default_factory=list) intermediate_data_types: List[str] = field(default_factory=list) transformation_patterns: List[str] = field(default_factory=list) @dataclass class ParsedMetadata: """ Structured metadata extracted from a platform-specific trace. This complements the multi-agent knowledge extractor by providing reliable structural information that can guide the extraction process. """ # Source information platform: str trace_source: str confidence: float # 0.0-1.0 confidence in parsing accuracy # Core structural information agents: List[AgentInfo] = field(default_factory=list) tools: List[ToolInfo] = field(default_factory=list) workflow: Optional[WorkflowInfo] = None data_flow: Optional[DataFlowInfo] = None # Platform-specific raw data (preserved for reference) raw_platform_data: Optional[Dict[str, Any]] = None # Extraction hints for the knowledge extractor extraction_hints: Dict[str, Any] = field(default_factory=dict) # Context document suggestions suggested_context_types: List[str] = field(default_factory=list) def to_dict(self) -> Dict[str, Any]: """Convert to dictionary for serialization""" return { 'platform': self.platform, 'trace_source': self.trace_source, 'confidence': self.confidence, 'agents': [ { 'name': agent.name, 'role': agent.role, 'agent_type': agent.agent_type, 'model': agent.model, 'parameters': agent.parameters, 'first_appearance_line': agent.first_appearance_line } for agent in self.agents ], 'tools': [ { 'name': tool.name, 'tool_type': tool.tool_type, 'description': tool.description, 'parameters': tool.parameters, 'usage_count': tool.usage_count, 'first_appearance_line': tool.first_appearance_line } for tool in self.tools ], 'workflow': { 'workflow_type': self.workflow.workflow_type if self.workflow else None, 'total_steps': self.workflow.total_steps if self.workflow else None, 'project_name': self.workflow.project_name if self.workflow else None, 'run_id': self.workflow.run_id if self.workflow else None, 'start_time': self.workflow.start_time.isoformat() if self.workflow and self.workflow.start_time else None, 'end_time': self.workflow.end_time.isoformat() if self.workflow and self.workflow.end_time else None, 'duration_ms': self.workflow.duration_ms if self.workflow else None } if self.workflow else None, 'data_flow': { 'input_types': self.data_flow.input_types if self.data_flow else [], 'output_types': self.data_flow.output_types if self.data_flow else [], 'intermediate_data_types': self.data_flow.intermediate_data_types if self.data_flow else [], 'transformation_patterns': self.data_flow.transformation_patterns if self.data_flow else [] } if self.data_flow else None, 'extraction_hints': self.extraction_hints, 'suggested_context_types': self.suggested_context_types } class BaseTraceParser(ABC): """ Abstract base class for platform-specific trace parsers. Each parser is responsible for extracting structured metadata that is guaranteed to be present in traces from that specific platform. """ def __init__(self): self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}") @property @abstractmethod def platform_name(self) -> str: """Return the name of the platform this parser handles""" pass @property @abstractmethod def supported_trace_types(self) -> List[str]: """Return list of trace types this parser can handle""" pass @abstractmethod def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool: """ Determine if this parser can handle the given trace. Args: trace_content: Raw trace content trace_metadata: Optional metadata from database Returns: True if this parser can handle the trace """ pass @abstractmethod def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata: """ Parse the trace and extract structured metadata. Args: trace_content: Raw trace content trace_metadata: Optional metadata from database Returns: ParsedMetadata object with extracted information """ pass def _safe_json_parse(self, content: str) -> Optional[Dict[str, Any]]: """Safely parse JSON content, returning None if invalid""" try: return json.loads(content) except (json.JSONDecodeError, TypeError): return None def _extract_timestamps(self, data: Dict[str, Any]) -> tuple[Optional[datetime], Optional[datetime]]: """Extract start and end timestamps from platform data""" start_time = None end_time = None # Common timestamp field names start_fields = ['start_time', 'startTime', 'created_at', 'createdAt', 'timestamp'] end_fields = ['end_time', 'endTime', 'finished_at', 'finishedAt', 'completed_at'] for field in start_fields: if field in data and data[field]: try: if isinstance(data[field], str): start_time = datetime.fromisoformat(data[field].replace('Z', '+00:00')) elif isinstance(data[field], (int, float)): start_time = datetime.fromtimestamp(data[field]) break except (ValueError, TypeError): continue for field in end_fields: if field in data and data[field]: try: if isinstance(data[field], str): end_time = datetime.fromisoformat(data[field].replace('Z', '+00:00')) elif isinstance(data[field], (int, float)): end_time = datetime.fromtimestamp(data[field]) break except (ValueError, TypeError): continue return start_time, end_time def _calculate_duration(self, start_time: Optional[datetime], end_time: Optional[datetime]) -> Optional[float]: """Calculate duration in milliseconds between start and end times""" if start_time and end_time: try: delta = end_time - start_time return delta.total_seconds() * 1000 except (TypeError, ValueError): return None return None def generate_extraction_hints(self, parsed_metadata: ParsedMetadata) -> Dict[str, Any]: """ Generate hints for the multi-agent knowledge extractor based on parsed metadata. This method can be overridden by specific parsers to provide platform-specific hints. """ hints = {} # Agent-related hints if parsed_metadata.agents: hints['expected_agent_count'] = len(parsed_metadata.agents) hints['agent_types'] = list(set(agent.agent_type for agent in parsed_metadata.agents if agent.agent_type)) hints['agent_names'] = [agent.name for agent in parsed_metadata.agents] # Tool-related hints if parsed_metadata.tools: hints['expected_tool_count'] = len(parsed_metadata.tools) hints['tool_types'] = list(set(tool.tool_type for tool in parsed_metadata.tools if tool.tool_type)) hints['tool_names'] = [tool.name for tool in parsed_metadata.tools] # Workflow hints if parsed_metadata.workflow: if parsed_metadata.workflow.workflow_type: hints['workflow_pattern'] = parsed_metadata.workflow.workflow_type if parsed_metadata.workflow.total_steps: hints['expected_task_count'] = parsed_metadata.workflow.total_steps # Data flow hints if parsed_metadata.data_flow: hints['input_types'] = parsed_metadata.data_flow.input_types hints['output_types'] = parsed_metadata.data_flow.output_types hints['transformation_patterns'] = parsed_metadata.data_flow.transformation_patterns return hints