"""
Parser Factory and Source Detection

Factory for creating appropriate parsers based on trace source and automatically
injecting relevant context documents. This module provides the main integration
point between the parsing system and the knowledge extraction pipeline.
"""

from typing import Dict, List, Any, Optional, Type
import logging

from .base_parser import BaseTraceParser, ParsedMetadata
from .langsmith_parser import LangSmithParser

logger = logging.getLogger(__name__)


# Registry of available parsers
_PARSER_REGISTRY: Dict[str, Type[BaseTraceParser]] = {
    'langsmith': LangSmithParser,
    # Future parsers can be added here:
    # 'langfuse': LangfuseParser,
    # 'opentelemetry': OpenTelemetryParser,
}

# Default context documents for each platform
_DEFAULT_CONTEXT_DOCUMENTS = {
    'langsmith': [
        {
            'document_type': 'schema',
            'title': 'LangSmith Trace Structure',
            'content': '''
LangSmith traces follow a standardized structure:

**Top-level fields:**
- run_id: Unique identifier for the run
- project_name: Name of the LangSmith project
- trace_id: Trace identifier linking related runs
- export_timestamp: When the trace was exported
- total_traces: Number of traces in the run
- traces: Array of individual trace objects

**Trace object fields:**
- run_type: Type of run ("llm", "chain", "tool")
- name: Human-readable name of the run
- inputs: Input data passed to the run
- outputs: Output data produced by the run
- start_time: ISO timestamp when run started
- end_time: ISO timestamp when run completed
- tags: Array of tags for categorization
- parent_run_id: ID of parent run (for nested calls)

**Run Types:**
- "llm": Language model invocations
- "chain": Composite operations with multiple steps
- "tool": External tool or function calls
            ''',
            'tags': ['langsmith', 'schema', 'structure']
        },
        {
            'document_type': 'guidelines',
            'title': 'LangSmith Entity Extraction Guidelines',
            'content': '''
**Entity Extraction Guidelines for LangSmith Traces:**

**Agents (run_type: "llm"):**
- Extract model name from extra.invocation_params.model_name
- Use run name as agent identifier
- Role can be inferred from tags or name patterns
- System prompts found in inputs.messages[0] where role="system"

**Tools (run_type: "tool"):**
- Tool name is in the "name" field
- Parameters are in inputs object
- Usage patterns can be tracked across multiple invocations
- Function signatures often in extra.function.name

**Tasks (run_type: "chain"):**
- Chain name represents the high-level task
- Sub-tasks found in child runs (parent_run_id references)
- Input/output flows show task dependencies
- Execution order determined by start_time

**Relationships:**
- PERFORMS: agent (llm) → task (chain)
- USES: agent (llm) → tool (tool) 
- SUBTASK_OF: child chain → parent chain
- NEXT: sequential chains based on timestamps
- PRODUCES: task → output data structures
            ''',
            'tags': ['langsmith', 'extraction', 'guidelines']
        }
    ],
    'langfuse': [
        {
            'document_type': 'schema',
            'title': 'Langfuse Trace Structure',
            'content': '''
Langfuse traces are organized into sessions and observations:

**Session level:**
- Session ID links related traces
- User information and metadata
- Timeline of interactions

**Trace level:**
- Trace ID for individual conversations
- Input/output pairs
- Model and generation information
- Scores and evaluations

**Observation types:**
- SPAN: Execution spans with timing
- GENERATION: LLM generations with prompts
- EVENT: Discrete events in the flow
            ''',
            'tags': ['langfuse', 'schema', 'structure']
        }
    ],
    'default': [
        {
            'document_type': 'domain_knowledge',
            'title': 'General Agent System Concepts',
            'content': '''
**Common Agent System Patterns:**

**Agents:** Autonomous entities that can:
- Receive input and produce output
- Make decisions based on context
- Use tools to accomplish tasks
- Communicate with other agents

**Tools:** External capabilities that agents can invoke:
- APIs and web services
- Database queries  
- File operations
- Computational functions

**Tasks:** Work units with:
- Clear objectives
- Input requirements
- Expected outputs
- Success criteria

**Workflows:** Orchestration patterns:
- Sequential: Steps execute in order
- Parallel: Steps execute simultaneously
- Conditional: Steps based on conditions
- Hierarchical: Nested task structures
            ''',
            'tags': ['general', 'concepts', 'patterns']
        }
    ]
}


def detect_trace_source(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Detect the source platform of a trace.
    
    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata
        
    Returns:
        Platform name (e.g., 'langsmith', 'langfuse', 'unknown')
    """
    logger.debug("Detecting trace source")
    
    # Check database metadata first (most reliable)
    if trace_metadata:
        # Check the explicit 'platform' field
        trace_source = trace_metadata.get('platform', '').lower()
        if trace_source in _PARSER_REGISTRY:
            logger.info(f"Source detected from metadata: {trace_source}")
            return trace_source
        
        # Fall back to the 'processing_type' field for platform indicators
        trace_type = trace_metadata.get('processing_type', '').lower()
        for platform in _PARSER_REGISTRY:
            if platform in trace_type:
                logger.info(f"Source detected from trace type: {platform}")
                return platform
    
    # Try each parser to see which can handle the content
    for platform_name, parser_class in _PARSER_REGISTRY.items():
        try:
            parser = parser_class()
            if parser.can_parse(trace_content, trace_metadata):
                logger.info(f"Source detected by parser: {platform_name}")
                return platform_name
        except Exception as e:
            logger.debug(f"Parser {platform_name} failed detection: {e}")
            continue
    
    logger.warning("Could not detect trace source, using 'unknown'")
    return 'unknown'


def create_parser(platform: str) -> Optional[BaseTraceParser]:
    """
    Create a parser for the specified platform.
    
    Args:
        platform: Platform name
        
    Returns:
        Parser instance or None if platform not supported
    """
    if platform in _PARSER_REGISTRY:
        parser_class = _PARSER_REGISTRY[platform]
        parser = parser_class()
        logger.info(f"Created parser for platform: {platform}")
        return parser
    
    logger.warning(f"No parser available for platform: {platform}")
    return None


def get_context_documents_for_source(platform: str, parsed_metadata: Optional[ParsedMetadata] = None) -> List[Dict[str, Any]]:
    """
    Get appropriate context documents for a trace source.
    
    Args:
        platform: Platform name  
        parsed_metadata: Optional parsed metadata for customization
        
    Returns:
        List of context documents
    """
    logger.debug(f"Getting context documents for platform: {platform}")
    
    # Start from the platform's default documents. A shallow list copy is
    # sufficient here because we only append new dicts below; the shared
    # template dicts themselves are never mutated.
    context_docs = _DEFAULT_CONTEXT_DOCUMENTS.get(platform, _DEFAULT_CONTEXT_DOCUMENTS['default']).copy()
    
    # Add platform-specific customizations based on parsed metadata
    if parsed_metadata and parsed_metadata.suggested_context_types:
        for suggested_type in parsed_metadata.suggested_context_types:
            if suggested_type not in [doc['document_type'] for doc in context_docs]:
                # Add suggested context types (placeholder documents)
                context_docs.append({
                    'document_type': suggested_type,
                    'title': f'{platform.title()} {suggested_type.replace("_", " ").title()}',
                    'content': f'Context document for {suggested_type} in {platform} traces.',
                    'tags': [platform, suggested_type, 'auto-generated']
                })
    
    logger.info(f"Providing {len(context_docs)} context documents for {platform}")
    return context_docs


def parse_trace_with_context(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> tuple[Optional[ParsedMetadata], List[Dict[str, Any]]]:
    """
    Parse a trace and return both structured metadata and appropriate context documents.
    
    This is the main entry point for the parsing system, providing a complete
    solution for trace analysis with automatic context injection.
    
    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata
        
    Returns:
        Tuple of (parsed_metadata, context_documents)
    """
    logger.info("Starting trace parsing with context injection")
    
    # Detect source platform
    platform = detect_trace_source(trace_content, trace_metadata)
    
    # Create appropriate parser
    parser = create_parser(platform)
    
    # Parse the trace (if parser available)
    parsed_metadata = None
    if parser:
        try:
            parsed_metadata = parser.parse_trace(trace_content, trace_metadata)
            logger.info(f"Successfully parsed {platform} trace")
        except Exception as e:
            logger.error(f"Failed to parse {platform} trace: {e}")
    else:
        logger.warning(f"No parser available for platform: {platform}")
    
    # Get appropriate context documents
    context_documents = get_context_documents_for_source(platform, parsed_metadata)
    
    logger.info(f"Parsing complete: metadata={'available' if parsed_metadata else 'unavailable'}, context_docs={len(context_documents)}")
    
    return parsed_metadata, context_documents


def register_parser(platform: str, parser_class: Type[BaseTraceParser]) -> None:
    """
    Register a new parser for a platform.
    
    Args:
        platform: Platform name
        parser_class: Parser class to register
    """
    _PARSER_REGISTRY[platform] = parser_class
    logger.info(f"Registered parser for platform: {platform}")


def get_supported_platforms() -> List[str]:
    """Get list of supported platforms."""
    return list(_PARSER_REGISTRY.keys())


def add_context_document_template(platform: str, document: Dict[str, Any]) -> None:
    """
    Add a context document template for a platform.
    
    Args:
        platform: Platform name
        document: Context document dictionary
    """
    if platform not in _DEFAULT_CONTEXT_DOCUMENTS:
        _DEFAULT_CONTEXT_DOCUMENTS[platform] = []
    
    _DEFAULT_CONTEXT_DOCUMENTS[platform].append(document)
    logger.info(f"Added context document template for {platform}: {document.get('title', 'Untitled')}")