""" Boundary Detection Module for Agent-Aware Semantic Splitting This module identifies semantic boundaries in agent execution logs to enable intelligent chunking that preserves agent interaction integrity. """ import re from enum import Enum from typing import Dict, List, Optional, Tuple from dataclasses import dataclass from abc import ABC, abstractmethod from .log_type_detector import LogType, LogTypeDetector class BoundaryType(Enum): """Types of semantic boundaries in agent logs.""" CREW_START = "crew_start" CREW_END = "crew_end" TASK_START = "task_start" TASK_END = "task_end" AGENT_ASSIGNMENT = "agent_assignment" TOOL_CYCLE_START = "tool_cycle_start" TOOL_CYCLE_END = "tool_cycle_end" THINKING_START = "thinking_start" THINKING_END = "thinking_end" FINAL_ANSWER = "final_answer" HUMAN_FEEDBACK = "human_feedback" JSON_OBJECT_START = "json_object_start" JSON_OBJECT_END = "json_object_end" TRACE_START = "trace_start" TRACE_END = "trace_end" OBSERVATION_START = "observation_start" OBSERVATION_END = "observation_end" SEMANTIC_BREAK = "semantic_break" @dataclass class AgentBoundary: """Represents a detected boundary in agent logs.""" position: int boundary_type: BoundaryType pattern_matched: str confidence_score: float context_before: str context_after: str metadata: Dict[str, any] @dataclass class BoundaryConfidence: """Confidence scoring for boundary detection.""" pattern_confidence: float = 0.0 semantic_confidence: float = 0.0 context_confidence: float = 0.0 combined_score: float = 0.0 def is_valid_boundary(self, threshold: float = 0.7) -> bool: """Check if boundary meets confidence threshold.""" return self.combined_score >= threshold class BaseBoundaryDetector(ABC): """Abstract base class for boundary detectors.""" @abstractmethod def detect_boundaries(self, content: str) -> List[AgentBoundary]: """Detect boundaries in the given content.""" pass @abstractmethod def get_priority(self) -> int: """Get the priority of this detector (lower = higher priority).""" pass 
class FrameworkSpecificDetector(BaseBoundaryDetector):
    """Detector for framework-specific patterns (CrewAI, Langfuse, etc.)."""

    def __init__(self):
        # CrewAI console output: box-drawing banners plus emoji status markers.
        # Each boundary type maps to the regexes that recognize it.
        self.crewai_patterns = {
            BoundaryType.CREW_START: [
                r'╭.*Crew Execution Started.*╮',
                r'🚀 Crew: .*'
            ],
            BoundaryType.CREW_END: [
                r'╭.*Crew Completion.*╮',
                r'Crew Execution Completed'
            ],
            BoundaryType.TASK_START: [
                r'└── 📋 Task: [a-f0-9-]+',
                r'Status: Executing Task\.\.\.'
            ],
            BoundaryType.TASK_END: [
                r'Status: ✅ Completed',
                r'╭.*Task Completion.*╮'
            ],
            BoundaryType.AGENT_ASSIGNMENT: [
                r'# Agent: .*',
                r'└── 🤖 Agent: .*'
            ],
            BoundaryType.TOOL_CYCLE_START: [
                r'## Using tool: .*',
                r'└── 🔧 Using .*'
            ],
            BoundaryType.TOOL_CYCLE_END: [
                r'## Tool Output:',
                r'└── 🔧 Used .*'
            ],
            BoundaryType.THINKING_START: [
                r'└── 🧠 Thinking\.\.\.',
                r'## Thinking\.\.\.'
            ],
            BoundaryType.FINAL_ANSWER: [
                r'## Final Answer:',
                r'## Final Result:'
            ],
            BoundaryType.HUMAN_FEEDBACK: [
                r'## HUMAN FEEDBACK:',
                r'=====\n## HUMAN FEEDBACK:'
            ]
        }
        # Langfuse JSON trace exports: boundaries are keyed off JSON fields
        # and top-level bracket/brace structure.
        self.langfuse_patterns = {
            BoundaryType.TRACE_START: [
                r'"data": \{\s*"id": "[a-f0-9-]+"',
                r'"trace_id": "[a-f0-9-]+"'
            ],
            BoundaryType.OBSERVATION_START: [
                r'"observations": \[',
                r'"type": "(SPAN|GENERATION)"'
            ],
            BoundaryType.JSON_OBJECT_START: [
                r'^\s*\{',
                r'^\s*\['
            ],
            BoundaryType.JSON_OBJECT_END: [
                r'\}\s*$',
                r'\]\s*$'
            ]
        }

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect framework-specific boundaries.

        Runs both the CrewAI and Langfuse pattern sets over *content* and
        returns every match, sorted by character position.
        """
        boundaries = []

        # Detect CrewAI boundaries
        boundaries.extend(self._detect_pattern_boundaries(
            content, self.crewai_patterns, "CrewAI"
        ))

        # Detect Langfuse boundaries
        boundaries.extend(self._detect_pattern_boundaries(
            content, self.langfuse_patterns, "Langfuse"
        ))

        return sorted(boundaries, key=lambda b: b.position)

    def _detect_pattern_boundaries(self, content: str, patterns: Dict,
                                   framework: str) -> List[AgentBoundary]:
        """Detect boundaries using pattern matching.

        Args:
            content: Text to scan.
            patterns: Mapping of BoundaryType -> list of regex strings.
            framework: Label recorded in each boundary's metadata.

        Returns:
            Unsorted list of AgentBoundary objects, one per regex match.
        """
        boundaries = []

        for boundary_type, pattern_list in patterns.items():
            for pattern in pattern_list:
                # MULTILINE so ^/$ anchors match per-line, not per-content.
                for match in re.finditer(pattern, content, re.MULTILINE):
                    start_pos = match.start()
                    matched_text = match.group()

                    # Capture up to 100 characters of context on each side.
                    context_size = 100
                    context_start = max(0, start_pos - context_size)
                    context_end = min(len(content),
                                      start_pos + len(matched_text) + context_size)
                    context_before = content[context_start:start_pos]
                    context_after = content[start_pos + len(matched_text):context_end]

                    # Calculate confidence based on pattern specificity
                    confidence = self._calculate_pattern_confidence(pattern, matched_text)

                    boundaries.append(AgentBoundary(
                        position=start_pos,
                        boundary_type=boundary_type,
                        pattern_matched=pattern,
                        confidence_score=confidence,
                        context_before=context_before,
                        context_after=context_after,
                        metadata={
                            "framework": framework,
                            "matched_text": matched_text,
                            "pattern_type": "regex"
                        }
                    ))

        return boundaries

    def _calculate_pattern_confidence(self, pattern: str, matched_text: str) -> float:
        """Calculate confidence score for a pattern match.

        Starts from a 0.7 base and rewards longer (more specific) patterns,
        patterns containing framework-specific emoji/box characters, and
        matches that embed a UUID. Result is capped at 1.0.
        """
        base_confidence = 0.7

        # Longer patterns are usually more specific.
        if len(pattern) > 30:
            base_confidence += 0.1
        if len(pattern) > 50:
            base_confidence += 0.1

        # Exact special characters (emojis, box-drawing) are strong signals.
        if re.search(r'[🚀📋🤖🔧🧠✅╭╮]', pattern):
            base_confidence += 0.15

        # A UUID in the matched text suggests a concrete entity reference.
        if re.search(r'[a-f0-9-]{36}', matched_text):
            base_confidence += 0.1

        return min(base_confidence, 1.0)

    def get_priority(self) -> int:
        """Framework-specific has highest priority."""
        return 1


class GenericAgentPatternDetector(BaseBoundaryDetector):
    """Detector for generic agent patterns across frameworks."""

    def __init__(self):
        # Framework-agnostic "Label: ..." markers common to ReAct-style logs.
        self.generic_patterns = {
            BoundaryType.AGENT_ASSIGNMENT: [
                r'Agent: .*',
                r'Role: .*',
                r'Assistant: .*'
            ],
            BoundaryType.TOOL_CYCLE_START: [
                r'Tool: .*',
                r'Action: .*',
                r'Function: .*'
            ],
            BoundaryType.TOOL_CYCLE_END: [
                r'Result: .*',
                r'Output: .*',
                r'Response: .*'
            ],
            BoundaryType.THINKING_START: [
                r'Thought: .*',
                r'Thinking: .*',
                r'Reasoning: .*'
            ],
            BoundaryType.FINAL_ANSWER: [
                r'Answer: .*',
                r'Conclusion: .*',
                r'Final: .*'
            ]
        }

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect generic agent pattern boundaries."""
        return self._detect_pattern_boundaries(content, self.generic_patterns, "Generic")

    def _detect_pattern_boundaries(self, content: str, patterns: Dict,
                                   framework: str) -> List[AgentBoundary]:
        """Detect boundaries using generic patterns.

        Same shape as the framework-specific scan, but with a smaller context
        window (50 chars) and a fixed, lower confidence, since generic labels
        are more prone to false positives.
        """
        boundaries = []

        for boundary_type, pattern_list in patterns.items():
            for pattern in pattern_list:
                for match in re.finditer(pattern, content, re.MULTILINE):
                    start_pos = match.start()
                    matched_text = match.group()

                    context_size = 50
                    context_start = max(0, start_pos - context_size)
                    context_end = min(len(content),
                                      start_pos + len(matched_text) + context_size)
                    context_before = content[context_start:start_pos]
                    context_after = content[start_pos + len(matched_text):context_end]

                    confidence = 0.6  # Lower confidence for generic patterns

                    boundaries.append(AgentBoundary(
                        position=start_pos,
                        boundary_type=boundary_type,
                        pattern_matched=pattern,
                        confidence_score=confidence,
                        context_before=context_before,
                        context_after=context_after,
                        metadata={
                            "framework": framework,
                            "matched_text": matched_text,
                            "pattern_type": "generic"
                        }
                    ))

        return boundaries

    def get_priority(self) -> int:
        """Generic patterns have medium priority."""
        return 2


class StructuralDetector(BaseBoundaryDetector):
    """Detector for structural boundaries (JSON, sections, etc.)."""

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect structural boundaries."""
        boundaries = []

        # Detect JSON object boundaries
        boundaries.extend(self._detect_json_boundaries(content))

        # Detect section headers
        boundaries.extend(self._detect_section_boundaries(content))

        return sorted(boundaries, key=lambda b: b.position)

    def _detect_json_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect JSON object start/end boundaries.

        Tracks brace nesting with a stack and emits a boundary only when a
        top-level `{` opens or the matching top-level `}` closes. Only braces
        are tracked; square brackets are ignored. (Fixed: removed an unused
        `bracket_stack` local that was never populated.)

        NOTE(review): this counts braces inside string literals too, so
        brace characters embedded in JSON string values can skew nesting —
        acceptable for boundary *hints*, not a full JSON parse.
        """
        boundaries = []
        brace_stack = []

        for i, char in enumerate(content):
            if char == '{':
                brace_stack.append(i)
                if len(brace_stack) == 1:
                    # Start of top-level object
                    boundaries.append(AgentBoundary(
                        position=i,
                        boundary_type=BoundaryType.JSON_OBJECT_START,
                        pattern_matched="{",
                        confidence_score=0.8,
                        context_before=content[max(0, i-20):i],
                        context_after=content[i+1:min(len(content), i+21)],
                        metadata={"structure_type": "json_object"}
                    ))
            elif char == '}':
                if brace_stack:
                    brace_stack.pop()
                    if len(brace_stack) == 0:
                        # End of top-level object
                        boundaries.append(AgentBoundary(
                            position=i,
                            boundary_type=BoundaryType.JSON_OBJECT_END,
                            pattern_matched="}",
                            confidence_score=0.8,
                            context_before=content[max(0, i-20):i],
                            context_after=content[i+1:min(len(content), i+21)],
                            metadata={"structure_type": "json_object"}
                        ))

        return boundaries

    def _detect_section_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect section header boundaries (Markdown-style `#` headers)."""
        boundaries = []

        # Markdown-style headers
        header_pattern = r'^#+\s+.*$'
        for match in re.finditer(header_pattern, content, re.MULTILINE):
            boundaries.append(AgentBoundary(
                position=match.start(),
                boundary_type=BoundaryType.SEMANTIC_BREAK,
                pattern_matched=header_pattern,
                confidence_score=0.7,
                context_before=content[max(0, match.start()-30):match.start()],
                context_after=content[match.end():min(len(content), match.end()+30)],
                metadata={"structure_type": "section_header",
                          "header_text": match.group()}
            ))

        return boundaries

    def get_priority(self) -> int:
        """Structural detection has lower priority."""
        return 3


class BoundaryDetector:
    """Main boundary detection coordinator.

    Runs every registered detector over the content, merges their results,
    de-duplicates near-coincident boundaries, and scores confidence.
    """

    def __init__(self, log_type_detector: Optional[LogTypeDetector] = None):
        """Initialize with optional log type detector."""
        self.log_type_detector = log_type_detector or LogTypeDetector()

        # Initialize detectors in priority order
        self.detectors = [
            FrameworkSpecificDetector(),
            GenericAgentPatternDetector(),
            StructuralDetector()
        ]

        # Defensive re-sort: keeps the list correct even if the literal order
        # above ever drifts from the detectors' declared priorities.
        self.detectors.sort(key=lambda d: d.get_priority())

    def detect_boundaries(self, content: str,
                          log_type: Optional[LogType] = None) -> List[AgentBoundary]:
        """
        Detect all boundaries in content using multi-layer approach.

        Args:
            content: The content to analyze
            log_type: Optional pre-detected log type

        Returns:
            List of detected boundaries sorted by position
        """
        # Fixed: use an explicit `is None` sentinel check instead of
        # truthiness, so a hypothetical falsy LogType would not re-trigger
        # detection.
        if log_type is None:
            # NOTE(review): log_type is detected but not consumed below;
            # presumably reserved for type-specific routing — confirm.
            detection_result = self.log_type_detector.detect_log_type(content)
            log_type = detection_result.log_type

        all_boundaries = []

        # Run all detectors; a failing detector must not abort the others.
        for detector in self.detectors:
            try:
                all_boundaries.extend(detector.detect_boundaries(content))
            except Exception as e:
                # Log error but continue with other detectors
                print(f"Warning: Detector {detector.__class__.__name__} failed: {e}")

        # Remove duplicate boundaries (same position, similar type)
        deduplicated = self._deduplicate_boundaries(all_boundaries)

        # Sort by position
        return sorted(deduplicated, key=lambda b: b.position)

    def _deduplicate_boundaries(self, boundaries: List[AgentBoundary]) -> List[AgentBoundary]:
        """Remove duplicate boundaries that are too close to each other.

        Boundaries within 10 characters of the previously kept one collapse
        to whichever has the higher confidence score.
        """
        if not boundaries:
            return []

        # Sort by position first
        sorted_boundaries = sorted(boundaries, key=lambda b: b.position)
        deduplicated = [sorted_boundaries[0]]

        for boundary in sorted_boundaries[1:]:
            last_boundary = deduplicated[-1]
            position_diff = boundary.position - last_boundary.position

            # If boundaries are very close (within 10 characters), keep the
            # higher confidence one; ties favor the earlier boundary.
            if position_diff < 10:
                if boundary.confidence_score > last_boundary.confidence_score:
                    deduplicated[-1] = boundary
            else:
                deduplicated.append(boundary)

        return deduplicated

    def calculate_boundary_confidence(self, boundary: AgentBoundary,
                                      content: str) -> BoundaryConfidence:
        """
        Calculate comprehensive confidence score for a boundary.

        Args:
            boundary: The boundary to analyze
            content: The full content for context analysis

        Returns:
            BoundaryConfidence with detailed scoring
        """
        confidence = BoundaryConfidence()

        # Pattern confidence (from initial detection)
        confidence.pattern_confidence = boundary.confidence_score

        # Context confidence (analyze surrounding content)
        confidence.context_confidence = self._analyze_context_confidence(boundary, content)

        # Semantic confidence (placeholder for embedding-based analysis)
        confidence.semantic_confidence = 0.7  # Will be enhanced with semantic analyzer

        # Combined score (weighted average)
        weights = {"pattern": 0.4, "context": 0.3, "semantic": 0.3}
        confidence.combined_score = (
            confidence.pattern_confidence * weights["pattern"] +
            confidence.context_confidence * weights["context"] +
            confidence.semantic_confidence * weights["semantic"]
        )

        return confidence

    def _analyze_context_confidence(self, boundary: AgentBoundary, content: str) -> float:
        """Analyze context around a boundary to determine confidence.

        Starts at 0.5 and adds bonuses for blank-line section breaks, agent
        markers, and timestamps in the boundary's stored context. `content`
        is accepted for interface symmetry but not consulted here.
        """
        # Check for consistent formatting around boundary
        context_window = boundary.context_before + boundary.context_after

        confidence = 0.5  # Base confidence

        # Blank lines suggest section breaks
        if re.search(r'\n\s*\n', context_window):
            confidence += 0.2

        # Check for consistent agent markers in context
        agent_markers = len(re.findall(r'(Agent:|Task:|Tool:|🚀|🤖|📋|🔧)', context_window))
        if agent_markers > 0:
            confidence += 0.1

        # Check for timestamp patterns
        if re.search(r'\d{2}:\d{2}:\d{2}', context_window):
            confidence += 0.1

        return min(confidence, 1.0)