"""
Parser Factory and Source Detection
Factory for creating appropriate parsers based on trace source and automatically
injecting relevant context documents. This module provides the main integration
point between the parsing system and the knowledge extraction pipeline.
"""
from typing import Dict, List, Any, Optional, Type
import logging
import json
import re
from .base_parser import BaseTraceParser, ParsedMetadata
from .langsmith_parser import LangSmithParser
logger = logging.getLogger(__name__)
# Registry of available parsers
_PARSER_REGISTRY: Dict[str, Type[BaseTraceParser]] = {
    'langsmith': LangSmithParser,
    # Future parsers can be added here:
    # 'langfuse': LangfuseParser,
    # 'opentelemetry': OpenTelemetryParser,
}

# Default context documents for each platform
_DEFAULT_CONTEXT_DOCUMENTS = {
    'langsmith': [
        {
            'document_type': 'schema',
            'title': 'LangSmith Trace Structure',
            'content': '''
LangSmith traces follow a standardized structure:

**Top-level fields:**
- run_id: Unique identifier for the run
- project_name: Name of the LangSmith project
- trace_id: Trace identifier linking related runs
- export_timestamp: When the trace was exported
- total_traces: Number of traces in the run
- traces: Array of individual trace objects

**Trace object fields:**
- run_type: Type of run ("llm", "chain", "tool")
- name: Human-readable name of the run
- inputs: Input data passed to the run
- outputs: Output data produced by the run
- start_time: ISO timestamp when run started
- end_time: ISO timestamp when run completed
- tags: Array of tags for categorization
- parent_run_id: ID of parent run (for nested calls)

**Run Types:**
- "llm": Language model invocations
- "chain": Composite operations with multiple steps
- "tool": External tool or function calls
''',
            'tags': ['langsmith', 'schema', 'structure']
        },
        {
            'document_type': 'guidelines',
            'title': 'LangSmith Entity Extraction Guidelines',
            'content': '''
**Entity Extraction Guidelines for LangSmith Traces:**

**Agents (run_type: "llm"):**
- Extract model name from extra.invocation_params.model_name
- Use run name as agent identifier
- Role can be inferred from tags or name patterns
- System prompts found in inputs.messages[0] where role="system"

**Tools (run_type: "tool"):**
- Tool name is in the "name" field
- Parameters are in the inputs object
- Usage patterns can be tracked across multiple invocations
- Function signatures often in extra.function.name

**Tasks (run_type: "chain"):**
- Chain name represents the high-level task
- Sub-tasks found in child runs (parent_run_id references)
- Input/output flows show task dependencies
- Execution order determined by start_time

**Relationships:**
- PERFORMS: agent (llm) → task (chain)
- USES: agent (llm) → tool (tool)
- SUBTASK_OF: child chain → parent chain
- NEXT: sequential chains based on timestamps
- PRODUCES: task → output data structures
''',
            'tags': ['langsmith', 'extraction', 'guidelines']
        }
    ],
    'langfuse': [
        {
            'document_type': 'schema',
            'title': 'Langfuse Trace Structure',
            'content': '''
Langfuse traces are organized into sessions and observations:

**Session level:**
- Session ID links related traces
- User information and metadata
- Timeline of interactions

**Trace level:**
- Trace ID for individual conversations
- Input/output pairs
- Model and generation information
- Scores and evaluations

**Observation types:**
- SPAN: Execution spans with timing
- GENERATION: LLM generations with prompts
- EVENT: Discrete events in the flow
''',
            'tags': ['langfuse', 'schema', 'structure']
        }
    ],
    'default': [
        {
            'document_type': 'domain_knowledge',
            'title': 'General Agent System Concepts',
            'content': '''
**Common Agent System Patterns:**

**Agents:** Autonomous entities that can:
- Receive input and produce output
- Make decisions based on context
- Use tools to accomplish tasks
- Communicate with other agents

**Tools:** External capabilities that agents can invoke:
- APIs and web services
- Database queries
- File operations
- Computational functions

**Tasks:** Work units with:
- Clear objectives
- Input requirements
- Expected outputs
- Success criteria

**Workflows:** Orchestration patterns:
- Sequential: Steps execute in order
- Parallel: Steps execute simultaneously
- Conditional: Steps based on conditions
- Hierarchical: Nested task structures
''',
            'tags': ['general', 'concepts', 'patterns']
        }
    ]
}

def detect_trace_source(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> str:
    """
    Detect the source platform of a trace.

    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata

    Returns:
        Platform name (e.g., 'langsmith', 'langfuse', 'unknown')
    """
    logger.debug("Detecting trace source")

    # Check database metadata first (most reliable)
    if trace_metadata:
        # Check the explicit 'platform' field
        trace_source = trace_metadata.get('platform', '').lower()
        if trace_source in _PARSER_REGISTRY:
            logger.info(f"Source detected from metadata: {trace_source}")
            return trace_source

        # Check the 'processing_type' field for platform indicators
        trace_type = trace_metadata.get('processing_type', '').lower()
        for platform in _PARSER_REGISTRY:
            if platform in trace_type:
                logger.info(f"Source detected from trace type: {platform}")
                return platform

    # Fall back to content sniffing: try each parser to see which can handle the content
    for platform_name, parser_class in _PARSER_REGISTRY.items():
        try:
            parser = parser_class()
            if parser.can_parse(trace_content, trace_metadata):
                logger.info(f"Source detected by parser: {platform_name}")
                return platform_name
        except Exception as e:
            logger.debug(f"Parser {platform_name} failed detection: {e}")
            continue

    logger.warning("Could not detect trace source, using 'unknown'")
    return 'unknown'

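The detection precedence above (explicit `platform` metadata, then a platform substring in `processing_type`, then per-parser content sniffing, then `'unknown'`) can be exercised with a standalone sketch. `DummyLangSmithParser` and the one-entry registry below are hypothetical stand-ins for the real `BaseTraceParser` subclasses, used only because the module's relative imports are not available here:

```python
from typing import Any, Dict, Optional


class DummyLangSmithParser:
    """Stand-in parser: claims content that looks like a LangSmith export."""

    def can_parse(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> bool:
        return '"run_id"' in content


_REGISTRY = {'langsmith': DummyLangSmithParser}


def detect(content: str, metadata: Optional[Dict[str, Any]] = None) -> str:
    # 1. Trust explicit metadata first.
    if metadata:
        platform = metadata.get('platform', '').lower()
        if platform in _REGISTRY:
            return platform
        processing_type = metadata.get('processing_type', '').lower()
        for platform in _REGISTRY:
            if platform in processing_type:
                return platform
    # 2. Fall back to content sniffing via each parser's can_parse().
    for name, parser_class in _REGISTRY.items():
        if parser_class().can_parse(content, metadata):
            return name
    # 3. Give up gracefully.
    return 'unknown'


print(detect('{"run_id": "abc"}'))              # langsmith (content sniff)
print(detect('{}', {'platform': 'langsmith'}))  # langsmith (metadata)
print(detect('plain text'))                     # unknown
```

Note the ordering matters: metadata checks are cheap and reliable, so content sniffing (which instantiates every registered parser) only runs as a last resort.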
def create_parser(platform: str) -> Optional[BaseTraceParser]:
    """
    Create a parser for the specified platform.

    Args:
        platform: Platform name

    Returns:
        Parser instance or None if platform not supported
    """
    if platform in _PARSER_REGISTRY:
        parser_class = _PARSER_REGISTRY[platform]
        parser = parser_class()
        logger.info(f"Created parser for platform: {platform}")
        return parser

    logger.warning(f"No parser available for platform: {platform}")
    return None

def get_context_documents_for_source(platform: str, parsed_metadata: Optional[ParsedMetadata] = None) -> List[Dict[str, Any]]:
    """
    Get appropriate context documents for a trace source.

    Args:
        platform: Platform name
        parsed_metadata: Optional parsed metadata for customization

    Returns:
        List of context documents
    """
    logger.debug(f"Getting context documents for platform: {platform}")

    # Shallow-copy the default list so appending below does not mutate the
    # module-level templates (the document dicts themselves remain shared).
    context_docs = _DEFAULT_CONTEXT_DOCUMENTS.get(platform, _DEFAULT_CONTEXT_DOCUMENTS['default']).copy()

    # Add platform-specific customizations based on parsed metadata
    if parsed_metadata and parsed_metadata.suggested_context_types:
        existing_types = {doc['document_type'] for doc in context_docs}
        for suggested_type in parsed_metadata.suggested_context_types:
            if suggested_type not in existing_types:
                # Add suggested context types (placeholder documents)
                context_docs.append({
                    'document_type': suggested_type,
                    'title': f'{platform.title()} {suggested_type.replace("_", " ").title()}',
                    'content': f'Context document for {suggested_type} in {platform} traces.',
                    'tags': [platform, suggested_type, 'auto-generated']
                })
                existing_types.add(suggested_type)

    logger.info(f"Providing {len(context_docs)} context documents for {platform}")
    return context_docs

def parse_trace_with_context(trace_content: str, trace_metadata: Optional[Dict[str, Any]] = None) -> tuple[Optional[ParsedMetadata], List[Dict[str, Any]]]:
    """
    Parse a trace and return both structured metadata and appropriate context documents.

    This is the main entry point for the parsing system, providing a complete
    solution for trace analysis with automatic context injection.

    Args:
        trace_content: Raw trace content
        trace_metadata: Optional database metadata

    Returns:
        Tuple of (parsed_metadata, context_documents)
    """
    logger.info("Starting trace parsing with context injection")

    # Detect source platform
    platform = detect_trace_source(trace_content, trace_metadata)

    # Create appropriate parser
    parser = create_parser(platform)

    # Parse the trace (if a parser is available)
    parsed_metadata = None
    if parser:
        try:
            parsed_metadata = parser.parse_trace(trace_content, trace_metadata)
            logger.info(f"Successfully parsed {platform} trace")
        except Exception as e:
            logger.error(f"Failed to parse {platform} trace: {e}")
    else:
        logger.warning(f"No parser available for platform: {platform}")

    # Get appropriate context documents
    context_documents = get_context_documents_for_source(platform, parsed_metadata)

    logger.info(f"Parsing complete: metadata={'available' if parsed_metadata else 'unavailable'}, context_docs={len(context_documents)}")
    return parsed_metadata, context_documents

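The degradation behavior here (a parser failure yields `None` metadata, but context documents still flow downstream) can be sketched in isolation. `FailingParser` and `_FALLBACK_DOCS` are illustrative stand-ins, not the module's real types:

```python
from typing import Any, Dict, List, Optional, Tuple


class FailingParser:
    """Stand-in parser whose parse_trace always raises."""

    def parse_trace(self, content: str, metadata: Optional[Dict[str, Any]] = None):
        raise ValueError("malformed trace")


_FALLBACK_DOCS: List[Dict[str, Any]] = [
    {'document_type': 'domain_knowledge', 'title': 'General Agent System Concepts'}
]


def parse_with_context(parser, content: str) -> Tuple[Optional[dict], List[Dict[str, Any]]]:
    parsed = None
    if parser is not None:
        try:
            parsed = parser.parse_trace(content)
        except Exception:
            # Parsing failure is non-fatal: context documents are returned regardless.
            parsed = None
    return parsed, _FALLBACK_DOCS


parsed, docs = parse_with_context(FailingParser(), '{}')
print(parsed)     # None
print(len(docs))  # 1
```

The design choice worth noting: the function never raises on parse failure, so callers downstream can always count on receiving context documents even when structured metadata is unavailable.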
def register_parser(platform: str, parser_class: Type[BaseTraceParser]) -> None:
    """
    Register a new parser for a platform.

    Args:
        platform: Platform name
        parser_class: Parser class to register
    """
    _PARSER_REGISTRY[platform] = parser_class
    logger.info(f"Registered parser for platform: {platform}")


def get_supported_platforms() -> List[str]:
    """Get list of supported platforms."""
    return list(_PARSER_REGISTRY.keys())

def add_context_document_template(platform: str, document: Dict[str, Any]) -> None:
    """
    Add a context document template for a platform.

    Args:
        platform: Platform name
        document: Context document dictionary
    """
    _DEFAULT_CONTEXT_DOCUMENTS.setdefault(platform, []).append(document)
    logger.info(f"Added context document template for {platform}: {document.get('title', 'Untitled')}")
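Putting the registration helpers together, adding a new platform is a two-step affair: register the parser class, then optionally seed context-document templates. The sketch below mirrors that flow with throwaway registries; `StubOTelParser` is a hypothetical placeholder, not a shipped parser:

```python
from typing import Any, Dict, List, Type

# Throwaway registries mirroring _PARSER_REGISTRY / _DEFAULT_CONTEXT_DOCUMENTS.
registry: Dict[str, Type] = {}
templates: Dict[str, List[Dict[str, Any]]] = {}


class StubOTelParser:
    """Hypothetical placeholder for an OpenTelemetry parser class."""


def register(platform: str, parser_class: Type) -> None:
    registry[platform] = parser_class


def add_template(platform: str, document: Dict[str, Any]) -> None:
    templates.setdefault(platform, []).append(document)


register('opentelemetry', StubOTelParser)
add_template('opentelemetry', {
    'document_type': 'schema',
    'title': 'OTel Span Structure',
    'content': 'Spans carry trace_id, span_id, attributes, and events.',
    'tags': ['opentelemetry', 'schema'],
})

print(sorted(registry))                 # ['opentelemetry']
print(len(templates['opentelemetry']))  # 1
```

Once registered this way, the platform participates in source detection and context injection with no further changes to the factory code.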