File size: 10,763 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
"""
Base Trace Parser Interface

Defines the common interface and data structures for platform-specific trace parsers.
Each parser extracts structured metadata that is guaranteed to be present in traces
from that specific platform.
"""

from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from dataclasses import dataclass, field
from datetime import datetime
import json
import logging

logger = logging.getLogger(__name__)


@dataclass
class AgentInfo:
    """Information about an agent found in the trace"""
    name: str
    role: Optional[str] = None
    agent_type: Optional[str] = None  # e.g., "llm", "tool", "chain"
    model: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    first_appearance_line: Optional[int] = None


@dataclass 
class ToolInfo:
    """Information about a tool found in the trace"""
    name: str
    tool_type: Optional[str] = None  # e.g., "function", "api", "external"
    description: Optional[str] = None
    parameters: Optional[Dict[str, Any]] = None
    usage_count: int = 0
    first_appearance_line: Optional[int] = None


@dataclass
class WorkflowInfo:
    """Information about the workflow structure"""
    workflow_type: Optional[str] = None  # e.g., "sequential", "parallel", "hierarchical"
    total_steps: Optional[int] = None
    project_name: Optional[str] = None
    run_id: Optional[str] = None
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    duration_ms: Optional[float] = None


@dataclass
class DataFlowInfo:
    """Information about data flows and transformations"""
    input_types: List[str] = field(default_factory=list)
    output_types: List[str] = field(default_factory=list)
    intermediate_data_types: List[str] = field(default_factory=list)
    transformation_patterns: List[str] = field(default_factory=list)


@dataclass
class ParsedMetadata:
    """
    Structured metadata extracted from a platform-specific trace.
    
    This complements the multi-agent knowledge extractor by providing
    reliable structural information that can guide the extraction process.
    """
    # Source information
    platform: str
    trace_source: str
    confidence: float  # 0.0-1.0 confidence in parsing accuracy
    
    # Core structural information
    agents: List[AgentInfo] = field(default_factory=list)
    tools: List[ToolInfo] = field(default_factory=list)
    workflow: Optional[WorkflowInfo] = None
    data_flow: Optional[DataFlowInfo] = None
    
    # Platform-specific raw data (preserved for reference)
    raw_platform_data: Optional[Dict[str, Any]] = None
    
    # Extraction hints for the knowledge extractor
    extraction_hints: Dict[str, Any] = field(default_factory=dict)
    
    # Context document suggestions
    suggested_context_types: List[str] = field(default_factory=list)
    
    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for serialization"""
        return {
            'platform': self.platform,
            'trace_source': self.trace_source,
            'confidence': self.confidence,
            'agents': [
                {
                    'name': agent.name,
                    'role': agent.role,
                    'agent_type': agent.agent_type,
                    'model': agent.model,
                    'parameters': agent.parameters,
                    'first_appearance_line': agent.first_appearance_line
                }
                for agent in self.agents
            ],
            'tools': [
                {
                    'name': tool.name,
                    'tool_type': tool.tool_type,
                    'description': tool.description,
                    'parameters': tool.parameters,
                    'usage_count': tool.usage_count,
                    'first_appearance_line': tool.first_appearance_line
                }
                for tool in self.tools
            ],
            'workflow': {
                'workflow_type': self.workflow.workflow_type if self.workflow else None,
                'total_steps': self.workflow.total_steps if self.workflow else None,
                'project_name': self.workflow.project_name if self.workflow else None,
                'run_id': self.workflow.run_id if self.workflow else None,
                'start_time': self.workflow.start_time.isoformat() if self.workflow and self.workflow.start_time else None,
                'end_time': self.workflow.end_time.isoformat() if self.workflow and self.workflow.end_time else None,
                'duration_ms': self.workflow.duration_ms if self.workflow else None
            } if self.workflow else None,
            'data_flow': {
                'input_types': self.data_flow.input_types if self.data_flow else [],
                'output_types': self.data_flow.output_types if self.data_flow else [],
                'intermediate_data_types': self.data_flow.intermediate_data_types if self.data_flow else [],
                'transformation_patterns': self.data_flow.transformation_patterns if self.data_flow else []
            } if self.data_flow else None,
            'extraction_hints': self.extraction_hints,
            'suggested_context_types': self.suggested_context_types
        }


class BaseTraceParser(ABC):
    """
    Abstract base class for platform-specific trace parsers.
    
    Each parser is responsible for extracting structured metadata that is
    guaranteed to be present in traces from that specific platform.
    """
    
    def __init__(self):
        self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
    
    @property
    @abstractmethod
    def platform_name(self) -> str:
        """Return the name of the platform this parser handles"""
        pass
    
    @property
    @abstractmethod
    def supported_trace_types(self) -> List[str]:
        """Return list of trace types this parser can handle"""
        pass
    
    @abstractmethod
    def can_parse(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> bool:
        """
        Determine if this parser can handle the given trace.
        
        Args:
            trace_content: Raw trace content
            trace_metadata: Optional metadata from database
            
        Returns:
            True if this parser can handle the trace
        """
        pass
    
    @abstractmethod
    def parse_trace(self, trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> ParsedMetadata:
        """
        Parse the trace and extract structured metadata.
        
        Args:
            trace_content: Raw trace content
            trace_metadata: Optional metadata from database
            
        Returns:
            ParsedMetadata object with extracted information
        """
        pass
    
    def _safe_json_parse(self, content: str) -> Optional[Dict[str, Any]]:
        """Safely parse JSON content, returning None if invalid"""
        try:
            return json.loads(content)
        except (json.JSONDecodeError, TypeError):
            return None
    
    def _extract_timestamps(self, data: Dict[str, Any]) -> tuple[Optional[datetime], Optional[datetime]]:
        """Extract start and end timestamps from platform data"""
        start_time = None
        end_time = None
        
        # Common timestamp field names
        start_fields = ['start_time', 'startTime', 'created_at', 'createdAt', 'timestamp']
        end_fields = ['end_time', 'endTime', 'finished_at', 'finishedAt', 'completed_at']
        
        for field in start_fields:
            if field in data and data[field]:
                try:
                    if isinstance(data[field], str):
                        start_time = datetime.fromisoformat(data[field].replace('Z', '+00:00'))
                    elif isinstance(data[field], (int, float)):
                        start_time = datetime.fromtimestamp(data[field])
                    break
                except (ValueError, TypeError):
                    continue
        
        for field in end_fields:
            if field in data and data[field]:
                try:
                    if isinstance(data[field], str):
                        end_time = datetime.fromisoformat(data[field].replace('Z', '+00:00'))
                    elif isinstance(data[field], (int, float)):
                        end_time = datetime.fromtimestamp(data[field])
                    break
                except (ValueError, TypeError):
                    continue
        
        return start_time, end_time
    
    def _calculate_duration(self, start_time: Optional[datetime], end_time: Optional[datetime]) -> Optional[float]:
        """Calculate duration in milliseconds between start and end times"""
        if start_time and end_time:
            try:
                delta = end_time - start_time
                return delta.total_seconds() * 1000
            except (TypeError, ValueError):
                return None
        return None
    
    def generate_extraction_hints(self, parsed_metadata: ParsedMetadata) -> Dict[str, Any]:
        """
        Generate hints for the multi-agent knowledge extractor based on parsed metadata.
        
        This method can be overridden by specific parsers to provide platform-specific hints.
        """
        hints = {}
        
        # Agent-related hints
        if parsed_metadata.agents:
            hints['expected_agent_count'] = len(parsed_metadata.agents)
            hints['agent_types'] = list(set(agent.agent_type for agent in parsed_metadata.agents if agent.agent_type))
            hints['agent_names'] = [agent.name for agent in parsed_metadata.agents]
        
        # Tool-related hints
        if parsed_metadata.tools:
            hints['expected_tool_count'] = len(parsed_metadata.tools)
            hints['tool_types'] = list(set(tool.tool_type for tool in parsed_metadata.tools if tool.tool_type))
            hints['tool_names'] = [tool.name for tool in parsed_metadata.tools]
        
        # Workflow hints
        if parsed_metadata.workflow:
            if parsed_metadata.workflow.workflow_type:
                hints['workflow_pattern'] = parsed_metadata.workflow.workflow_type
            if parsed_metadata.workflow.total_steps:
                hints['expected_task_count'] = parsed_metadata.workflow.total_steps
        
        # Data flow hints
        if parsed_metadata.data_flow:
            hints['input_types'] = parsed_metadata.data_flow.input_types
            hints['output_types'] = parsed_metadata.data_flow.output_types
            hints['transformation_patterns'] = parsed_metadata.data_flow.transformation_patterns
        
        return hints