Spaces:
Running
Running
| """ | |
| Boundary Detection Module for Agent-Aware Semantic Splitting | |
| This module identifies semantic boundaries in agent execution logs | |
| to enable intelligent chunking that preserves agent interaction integrity. | |
| """ | |
import logging
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from .log_type_detector import LogType, LogTypeDetector
class BoundaryType(Enum):
    """Types of semantic boundaries in agent logs."""

    # CrewAI-style crew/task lifecycle markers
    CREW_START = "crew_start"
    CREW_END = "crew_end"
    TASK_START = "task_start"
    TASK_END = "task_end"
    # Agent and tool activity within a task
    AGENT_ASSIGNMENT = "agent_assignment"
    TOOL_CYCLE_START = "tool_cycle_start"
    TOOL_CYCLE_END = "tool_cycle_end"
    # Reasoning and output markers
    THINKING_START = "thinking_start"
    THINKING_END = "thinking_end"
    FINAL_ANSWER = "final_answer"
    HUMAN_FEEDBACK = "human_feedback"
    # Structural markers: raw JSON payloads and Langfuse traces/observations
    JSON_OBJECT_START = "json_object_start"
    JSON_OBJECT_END = "json_object_end"
    TRACE_START = "trace_start"
    TRACE_END = "trace_end"
    OBSERVATION_START = "observation_start"
    OBSERVATION_END = "observation_end"
    # Generic fallback for section breaks with no framework-specific meaning
    SEMANTIC_BREAK = "semantic_break"
@dataclass
class AgentBoundary:
    """A single detected semantic boundary in agent log content.

    Instances are created with keyword arguments throughout this module
    (see the detectors below), so the class must be a dataclass; the
    decorator was missing, which made ``AgentBoundary(position=..., ...)``
    raise ``TypeError``. ``Dict[str, any]`` (the builtin) is also fixed
    to ``Dict[str, Any]``.
    """

    position: int  # character offset of the boundary within the content
    boundary_type: BoundaryType  # which kind of boundary was detected
    pattern_matched: str  # regex (or literal token) that produced the match
    confidence_score: float  # detector-assigned confidence in [0.0, 1.0]
    context_before: str  # snippet of content immediately before the match
    context_after: str  # snippet of content immediately after the match
    metadata: Dict[str, Any]  # detector-specific extras (framework, matched text, ...)
@dataclass
class BoundaryConfidence:
    """Multi-factor confidence scoring for a detected boundary.

    Previously a plain class whose "fields" were shared class attributes;
    made a dataclass (consistent with this module's other value objects)
    so every instance carries its own scores and supports keyword
    construction such as ``BoundaryConfidence(pattern_confidence=0.9)``.
    """

    pattern_confidence: float = 0.0  # how specific the matching pattern was
    semantic_confidence: float = 0.0  # placeholder for embedding-based score
    context_confidence: float = 0.0  # quality of the surrounding context
    combined_score: float = 0.0  # weighted aggregate of the above

    def is_valid_boundary(self, threshold: float = 0.7) -> bool:
        """Return True if the combined score meets ``threshold``."""
        return self.combined_score >= threshold
class BaseBoundaryDetector(ABC):
    """Abstract base class for boundary detectors.

    ``abstractmethod`` was imported at module level but never applied, so
    this "abstract" base (and any incomplete subclass) could be
    instantiated silently; the decorators below enforce the interface.
    """

    @abstractmethod
    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect boundaries in the given content."""

    @abstractmethod
    def get_priority(self) -> int:
        """Return the priority of this detector (lower = higher priority)."""
class FrameworkSpecificDetector(BaseBoundaryDetector):
    """Detector for framework-specific patterns (CrewAI, Langfuse, etc.).

    NOTE(review): several patterns contain multi-byte characters that look
    mojibake-encoded (e.g. 'โญ', '๐'); presumably these were emoji
    markers ('⭐', '🚀', ...) in the original console output — confirm the
    source encoding before editing any of these literals.
    """
    def __init__(self):
        # CrewAI console-output markers, keyed by the boundary they signal.
        # Each boundary type has alternative regexes for different CrewAI
        # output styles (box-drawing tree output vs. markdown-ish headers).
        self.crewai_patterns = {
            BoundaryType.CREW_START: [
                r'โญ.*Crew Execution Started.*โฎ',
                r'๐ Crew: .*'
            ],
            BoundaryType.CREW_END: [
                r'โญ.*Crew Completion.*โฎ',
                r'Crew Execution Completed'
            ],
            BoundaryType.TASK_START: [
                r'โโโ ๐ Task: [a-f0-9-]+',
                r'Status: Executing Task\.\.\.'
            ],
            BoundaryType.TASK_END: [
                r'Status: โ Completed',
                r'โญ.*Task Completion.*โฎ'
            ],
            BoundaryType.AGENT_ASSIGNMENT: [
                r'# Agent: .*',
                r'โโโ ๐ค Agent: .*'
            ],
            BoundaryType.TOOL_CYCLE_START: [
                r'## Using tool: .*',
                r'โโโ ๐ง Using .*'
            ],
            BoundaryType.TOOL_CYCLE_END: [
                r'## Tool Output:',
                r'โโโ ๐ง Used .*'
            ],
            BoundaryType.THINKING_START: [
                r'โโโ ๐ง Thinking\.\.\.',
                r'## Thinking\.\.\.'
            ],
            BoundaryType.FINAL_ANSWER: [
                r'## Final Answer:',
                r'## Final Result:'
            ],
            BoundaryType.HUMAN_FEEDBACK: [
                r'## HUMAN FEEDBACK:',
                r'=====\n## HUMAN FEEDBACK:'
            ]
        }
        # Langfuse JSON-export markers: trace ids, observation records,
        # and raw top-level JSON delimiters.
        self.langfuse_patterns = {
            BoundaryType.TRACE_START: [
                r'"data": \{\s*"id": "[a-f0-9-]+"',
                r'"trace_id": "[a-f0-9-]+"'
            ],
            BoundaryType.OBSERVATION_START: [
                r'"observations": \[',
                r'"type": "(SPAN|GENERATION)"'
            ],
            BoundaryType.JSON_OBJECT_START: [
                r'^\s*\{',
                r'^\s*\['
            ],
            BoundaryType.JSON_OBJECT_END: [
                r'\}\s*$',
                r'\]\s*$'
            ]
        }
    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect framework-specific boundaries.

        Runs both the CrewAI and Langfuse pattern tables over the whole
        content and returns all matches sorted by character position.
        """
        boundaries = []
        # Detect CrewAI boundaries
        boundaries.extend(self._detect_pattern_boundaries(
            content, self.crewai_patterns, "CrewAI"
        ))
        # Detect Langfuse boundaries
        boundaries.extend(self._detect_pattern_boundaries(
            content, self.langfuse_patterns, "Langfuse"
        ))
        return sorted(boundaries, key=lambda b: b.position)
    def _detect_pattern_boundaries(self, content: str, patterns: Dict, framework: str) -> List[AgentBoundary]:
        """Build an AgentBoundary for every regex match in ``patterns``.

        Each boundary captures up to 100 characters of context on either
        side of the matched text and a pattern-specificity confidence.
        """
        boundaries = []
        for boundary_type, pattern_list in patterns.items():
            for pattern in pattern_list:
                for match in re.finditer(pattern, content, re.MULTILINE):
                    start_pos = match.start()
                    # Get context around the boundary
                    context_size = 100
                    context_start = max(0, start_pos - context_size)
                    context_end = min(len(content), start_pos + len(match.group()) + context_size)
                    context_before = content[context_start:start_pos]
                    context_after = content[start_pos + len(match.group()):context_end]
                    # Calculate confidence based on pattern specificity
                    confidence = self._calculate_pattern_confidence(pattern, match.group())
                    boundary = AgentBoundary(
                        position=start_pos,
                        boundary_type=boundary_type,
                        pattern_matched=pattern,
                        confidence_score=confidence,
                        context_before=context_before,
                        context_after=context_after,
                        metadata={
                            "framework": framework,
                            "matched_text": match.group(),
                            "pattern_type": "regex"
                        }
                    )
                    boundaries.append(boundary)
        return boundaries
    def _calculate_pattern_confidence(self, pattern: str, matched_text: str) -> float:
        """Heuristic confidence for a pattern match, capped at 1.0.

        Starts at 0.7 and adds bonuses for longer (more specific)
        patterns, special-character markers, and UUID-like matched text.
        """
        # Base confidence from pattern specificity
        base_confidence = 0.7
        # Increase confidence for longer, more specific patterns
        if len(pattern) > 30:
            base_confidence += 0.1
        if len(pattern) > 50:
            base_confidence += 0.1
        # Increase confidence for exact character matches (emojis, special chars)
        if re.search(r'[๐๐๐ค๐ง๐ง โ โญโฎ]', pattern):
            base_confidence += 0.15
        # Increase confidence for UUID patterns
        # NOTE(review): this class-repeat matches ANY 36-char run of hex
        # digits/dashes, not strictly a UUID shape — confirm intent.
        if re.search(r'[a-f0-9-]{36}', matched_text):
            base_confidence += 0.1
        return min(base_confidence, 1.0)
    def get_priority(self) -> int:
        """Framework-specific has highest priority."""
        return 1
class GenericAgentPatternDetector(BaseBoundaryDetector):
    """Detector for generic agent patterns across frameworks.

    Recognizes framework-agnostic ``Label: value`` line markers (Agent,
    Tool, Thought, Answer, ...) and emits a boundary for each occurrence
    with a fixed, deliberately modest confidence of 0.6.
    """

    def __init__(self):
        # Generic line markers keyed by the boundary type they imply.
        self.generic_patterns = {
            BoundaryType.AGENT_ASSIGNMENT: [
                r'Agent: .*',
                r'Role: .*',
                r'Assistant: .*'
            ],
            BoundaryType.TOOL_CYCLE_START: [
                r'Tool: .*',
                r'Action: .*',
                r'Function: .*'
            ],
            BoundaryType.TOOL_CYCLE_END: [
                r'Result: .*',
                r'Output: .*',
                r'Response: .*'
            ],
            BoundaryType.THINKING_START: [
                r'Thought: .*',
                r'Thinking: .*',
                r'Reasoning: .*'
            ],
            BoundaryType.FINAL_ANSWER: [
                r'Answer: .*',
                r'Conclusion: .*',
                r'Final: .*'
            ]
        }

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect generic agent pattern boundaries."""
        return self._detect_pattern_boundaries(content, self.generic_patterns, "Generic")

    def _detect_pattern_boundaries(self, content: str, patterns: Dict, framework: str) -> List[AgentBoundary]:
        """Scan ``content`` with every pattern and build one boundary per match.

        Captures up to 50 characters of context on each side of the match;
        results keep pattern-table order (not sorted by position).
        """
        window = 50  # characters of context captured on each side
        found: List[AgentBoundary] = []
        for btype, regexes in patterns.items():
            for regex in regexes:
                for hit in re.finditer(regex, content, re.MULTILINE):
                    start, end = hit.start(), hit.end()
                    found.append(AgentBoundary(
                        position=start,
                        boundary_type=btype,
                        pattern_matched=regex,
                        confidence_score=0.6,  # generic markers are weaker evidence
                        context_before=content[max(0, start - window):start],
                        context_after=content[end:min(len(content), end + window)],
                        metadata={
                            "framework": framework,
                            "matched_text": hit.group(),
                            "pattern_type": "generic"
                        }
                    ))
        return found

    def get_priority(self) -> int:
        """Generic patterns have medium priority."""
        return 2
class StructuralDetector(BaseBoundaryDetector):
    """Detector for structural boundaries (JSON objects, section headers)."""

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect structural boundaries, sorted by position."""
        boundaries = []
        # Top-level JSON object delimiters
        boundaries.extend(self._detect_json_boundaries(content))
        # Markdown-style section headers
        boundaries.extend(self._detect_section_boundaries(content))
        return sorted(boundaries, key=lambda b: b.position)

    def _detect_json_boundaries(self, content: str) -> List[AgentBoundary]:
        """Mark the start and end of every top-level ``{...}`` object.

        Tracks brace depth with a counter; only the 0->1 and 1->0 depth
        transitions produce boundaries, so nested objects are ignored.
        (A ``bracket_stack`` for ``[``/``]`` was declared but never used
        and has been removed — arrays were never actually tracked.)

        NOTE(review): braces inside JSON string literals are not skipped,
        so content with embedded ``{``/``}`` in strings can yield spurious
        boundaries — confirm this is acceptable for the target log formats.
        """
        boundaries = []
        depth = 0
        for i, char in enumerate(content):
            if char == '{':
                depth += 1
                if depth == 1:  # start of a top-level object
                    boundaries.append(self._make_json_boundary(
                        content, i, BoundaryType.JSON_OBJECT_START, "{"
                    ))
            elif char == '}':
                if depth:
                    depth -= 1
                    if depth == 0:  # end of that top-level object
                        boundaries.append(self._make_json_boundary(
                            content, i, BoundaryType.JSON_OBJECT_END, "}"
                        ))
        return boundaries

    def _make_json_boundary(self, content: str, i: int, btype: BoundaryType, token: str) -> AgentBoundary:
        """Build a JSON-structure boundary at offset ``i`` with 20-char context."""
        return AgentBoundary(
            position=i,
            boundary_type=btype,
            pattern_matched=token,
            confidence_score=0.8,
            context_before=content[max(0, i - 20):i],
            context_after=content[i + 1:min(len(content), i + 21)],
            metadata={"structure_type": "json_object"}
        )

    def _detect_section_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect markdown-style header lines (``# ...``) as semantic breaks."""
        boundaries = []
        header_pattern = r'^#+\s+.*$'
        for match in re.finditer(header_pattern, content, re.MULTILINE):
            boundary = AgentBoundary(
                position=match.start(),
                boundary_type=BoundaryType.SEMANTIC_BREAK,
                pattern_matched=header_pattern,
                confidence_score=0.7,
                context_before=content[max(0, match.start() - 30):match.start()],
                context_after=content[match.end():min(len(content), match.end() + 30)],
                metadata={"structure_type": "section_header", "header_text": match.group()}
            )
            boundaries.append(boundary)
        return boundaries

    def get_priority(self) -> int:
        """Structural detection has lower priority."""
        return 3
class BoundaryDetector:
    """Main boundary detection coordinator.

    Runs the framework-specific, generic, and structural detectors in
    priority order, merges their results, de-duplicates boundaries that
    land within a few characters of each other, and can score a boundary's
    confidence from pattern, context, and (placeholder) semantic signals.
    """

    def __init__(self, log_type_detector: Optional[LogTypeDetector] = None):
        """Initialize with an optional log type detector (default created)."""
        self.log_type_detector = log_type_detector or LogTypeDetector()
        # Initialize detectors, then sort so lower get_priority() runs first.
        self.detectors = [
            FrameworkSpecificDetector(),
            GenericAgentPatternDetector(),
            StructuralDetector()
        ]
        self.detectors.sort(key=lambda d: d.get_priority())

    def detect_boundaries(self, content: str, log_type: Optional[LogType] = None) -> List[AgentBoundary]:
        """
        Detect all boundaries in content using multi-layer approach.

        Args:
            content: The content to analyze
            log_type: Optional pre-detected log type

        Returns:
            List of detected boundaries sorted by position
        """
        if not log_type:
            detection_result = self.log_type_detector.detect_log_type(content)
            log_type = detection_result.log_type
        # NOTE(review): log_type is detected but never used below —
        # presumably intended to select detectors per log type; confirm
        # or drop the detection call.
        all_boundaries = []
        # Run all detectors; one failure must not abort the others.
        for detector in self.detectors:
            try:
                boundaries = detector.detect_boundaries(content)
                all_boundaries.extend(boundaries)
            except Exception as e:
                # Was a bare print(); routed through logging so callers
                # can control/suppress the warning channel.
                logging.getLogger(__name__).warning(
                    "Detector %s failed: %s", detector.__class__.__name__, e
                )
        # Remove duplicate boundaries (same position, similar type)
        deduplicated = self._deduplicate_boundaries(all_boundaries)
        # Sort by position
        return sorted(deduplicated, key=lambda b: b.position)

    def _deduplicate_boundaries(self, boundaries: List[AgentBoundary]) -> List[AgentBoundary]:
        """Collapse boundaries within 10 characters, keeping the higher-confidence one."""
        if not boundaries:
            return []
        # Sort by position first
        sorted_boundaries = sorted(boundaries, key=lambda b: b.position)
        deduplicated = [sorted_boundaries[0]]
        for boundary in sorted_boundaries[1:]:
            # Check if this boundary is too close to the last kept one
            last_boundary = deduplicated[-1]
            position_diff = boundary.position - last_boundary.position
            # If boundaries are very close (within 10 characters), keep the higher confidence one
            if position_diff < 10:
                if boundary.confidence_score > last_boundary.confidence_score:
                    deduplicated[-1] = boundary
            else:
                deduplicated.append(boundary)
        return deduplicated

    def calculate_boundary_confidence(self, boundary: AgentBoundary, content: str) -> BoundaryConfidence:
        """
        Calculate comprehensive confidence score for a boundary.

        Args:
            boundary: The boundary to analyze
            content: The full content for context analysis

        Returns:
            BoundaryConfidence with detailed scoring
        """
        confidence = BoundaryConfidence()
        # Pattern confidence (from initial detection)
        confidence.pattern_confidence = boundary.confidence_score
        # Context confidence (analyze surrounding content)
        confidence.context_confidence = self._analyze_context_confidence(boundary, content)
        # Semantic confidence (placeholder for embedding-based analysis)
        confidence.semantic_confidence = 0.7  # Will be enhanced with semantic analyzer
        # Combined score (weighted average)
        weights = {"pattern": 0.4, "context": 0.3, "semantic": 0.3}
        confidence.combined_score = (
            confidence.pattern_confidence * weights["pattern"] +
            confidence.context_confidence * weights["context"] +
            confidence.semantic_confidence * weights["semantic"]
        )
        return confidence

    def _analyze_context_confidence(self, boundary: AgentBoundary, content: str) -> float:
        """Score the boundary's surrounding text; base 0.5, capped at 1.0.

        Bonuses: blank lines (+0.2), agent markers (+0.1), timestamps (+0.1).
        Note: only the boundary's stored context is inspected; ``content``
        is currently unused here.
        """
        context_window = boundary.context_before + boundary.context_after
        confidence = 0.5  # Base confidence
        # Blank lines suggest section breaks
        if re.search(r'\n\s*\n', context_window):
            confidence += 0.2
        # Check for agent markers in context
        # NOTE(review): the non-ASCII alternatives look mojibake-encoded
        # (presumably emoji) — confirm source encoding before editing.
        agent_markers = len(re.findall(r'(Agent:|Task:|Tool:|๐|๐ค|๐|๐ง)', context_window))
        if agent_markers > 0:
            confidence += 0.1
        # Check for HH:MM:SS timestamp patterns
        if re.search(r'\d{2}:\d{2}:\d{2}', context_window):
            confidence += 0.1
        return min(confidence, 1.0)