# Deploy AgentGraph: Complete agent monitoring and knowledge graph system
# (upload residue removed; original commit: c2ea5ed)
"""
Boundary Detection Module for Agent-Aware Semantic Splitting
This module identifies semantic boundaries in agent execution logs
to enable intelligent chunking that preserves agent interaction integrity.
"""
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from .log_type_detector import LogType, LogTypeDetector
class BoundaryType(Enum):
"""Types of semantic boundaries in agent logs."""
CREW_START = "crew_start"
CREW_END = "crew_end"
TASK_START = "task_start"
TASK_END = "task_end"
AGENT_ASSIGNMENT = "agent_assignment"
TOOL_CYCLE_START = "tool_cycle_start"
TOOL_CYCLE_END = "tool_cycle_end"
THINKING_START = "thinking_start"
THINKING_END = "thinking_end"
FINAL_ANSWER = "final_answer"
HUMAN_FEEDBACK = "human_feedback"
JSON_OBJECT_START = "json_object_start"
JSON_OBJECT_END = "json_object_end"
TRACE_START = "trace_start"
TRACE_END = "trace_end"
OBSERVATION_START = "observation_start"
OBSERVATION_END = "observation_end"
SEMANTIC_BREAK = "semantic_break"
@dataclass
class AgentBoundary:
"""Represents a detected boundary in agent logs."""
position: int
boundary_type: BoundaryType
pattern_matched: str
confidence_score: float
context_before: str
context_after: str
metadata: Dict[str, any]
@dataclass
class BoundaryConfidence:
"""Confidence scoring for boundary detection."""
pattern_confidence: float = 0.0
semantic_confidence: float = 0.0
context_confidence: float = 0.0
combined_score: float = 0.0
def is_valid_boundary(self, threshold: float = 0.7) -> bool:
"""Check if boundary meets confidence threshold."""
return self.combined_score >= threshold
class BaseBoundaryDetector(ABC):
"""Abstract base class for boundary detectors."""
@abstractmethod
def detect_boundaries(self, content: str) -> List[AgentBoundary]:
"""Detect boundaries in the given content."""
pass
@abstractmethod
def get_priority(self) -> int:
"""Get the priority of this detector (lower = higher priority)."""
pass
class FrameworkSpecificDetector(BaseBoundaryDetector):
"""Detector for framework-specific patterns (CrewAI, Langfuse, etc.)."""
def __init__(self):
self.crewai_patterns = {
BoundaryType.CREW_START: [
r'โ•ญ.*Crew Execution Started.*โ•ฎ',
r'๐Ÿš€ Crew: .*'
],
BoundaryType.CREW_END: [
r'โ•ญ.*Crew Completion.*โ•ฎ',
r'Crew Execution Completed'
],
BoundaryType.TASK_START: [
r'โ””โ”€โ”€ ๐Ÿ“‹ Task: [a-f0-9-]+',
r'Status: Executing Task\.\.\.'
],
BoundaryType.TASK_END: [
r'Status: โœ… Completed',
r'โ•ญ.*Task Completion.*โ•ฎ'
],
BoundaryType.AGENT_ASSIGNMENT: [
r'# Agent: .*',
r'โ””โ”€โ”€ ๐Ÿค– Agent: .*'
],
BoundaryType.TOOL_CYCLE_START: [
r'## Using tool: .*',
r'โ””โ”€โ”€ ๐Ÿ”ง Using .*'
],
BoundaryType.TOOL_CYCLE_END: [
r'## Tool Output:',
r'โ””โ”€โ”€ ๐Ÿ”ง Used .*'
],
BoundaryType.THINKING_START: [
r'โ””โ”€โ”€ ๐Ÿง  Thinking\.\.\.',
r'## Thinking\.\.\.'
],
BoundaryType.FINAL_ANSWER: [
r'## Final Answer:',
r'## Final Result:'
],
BoundaryType.HUMAN_FEEDBACK: [
r'## HUMAN FEEDBACK:',
r'=====\n## HUMAN FEEDBACK:'
]
}
self.langfuse_patterns = {
BoundaryType.TRACE_START: [
r'"data": \{\s*"id": "[a-f0-9-]+"',
r'"trace_id": "[a-f0-9-]+"'
],
BoundaryType.OBSERVATION_START: [
r'"observations": \[',
r'"type": "(SPAN|GENERATION)"'
],
BoundaryType.JSON_OBJECT_START: [
r'^\s*\{',
r'^\s*\['
],
BoundaryType.JSON_OBJECT_END: [
r'\}\s*$',
r'\]\s*$'
]
}
def detect_boundaries(self, content: str) -> List[AgentBoundary]:
"""Detect framework-specific boundaries."""
boundaries = []
# Detect CrewAI boundaries
boundaries.extend(self._detect_pattern_boundaries(
content, self.crewai_patterns, "CrewAI"
))
# Detect Langfuse boundaries
boundaries.extend(self._detect_pattern_boundaries(
content, self.langfuse_patterns, "Langfuse"
))
return sorted(boundaries, key=lambda b: b.position)
def _detect_pattern_boundaries(self, content: str, patterns: Dict, framework: str) -> List[AgentBoundary]:
"""Detect boundaries using pattern matching."""
boundaries = []
for boundary_type, pattern_list in patterns.items():
for pattern in pattern_list:
for match in re.finditer(pattern, content, re.MULTILINE):
start_pos = match.start()
# Get context around the boundary
context_size = 100
context_start = max(0, start_pos - context_size)
context_end = min(len(content), start_pos + len(match.group()) + context_size)
context_before = content[context_start:start_pos]
context_after = content[start_pos + len(match.group()):context_end]
# Calculate confidence based on pattern specificity
confidence = self._calculate_pattern_confidence(pattern, match.group())
boundary = AgentBoundary(
position=start_pos,
boundary_type=boundary_type,
pattern_matched=pattern,
confidence_score=confidence,
context_before=context_before,
context_after=context_after,
metadata={
"framework": framework,
"matched_text": match.group(),
"pattern_type": "regex"
}
)
boundaries.append(boundary)
return boundaries
def _calculate_pattern_confidence(self, pattern: str, matched_text: str) -> float:
"""Calculate confidence score for pattern match."""
# Base confidence from pattern specificity
base_confidence = 0.7
# Increase confidence for longer, more specific patterns
if len(pattern) > 30:
base_confidence += 0.1
if len(pattern) > 50:
base_confidence += 0.1
# Increase confidence for exact character matches (emojis, special chars)
if re.search(r'[๐Ÿš€๐Ÿ“‹๐Ÿค–๐Ÿ”ง๐Ÿง โœ…โ•ญโ•ฎ]', pattern):
base_confidence += 0.15
# Increase confidence for UUID patterns
if re.search(r'[a-f0-9-]{36}', matched_text):
base_confidence += 0.1
return min(base_confidence, 1.0)
def get_priority(self) -> int:
"""Framework-specific has highest priority."""
return 1
class GenericAgentPatternDetector(BaseBoundaryDetector):
    """Detector for generic agent patterns across frameworks.

    Matches framework-agnostic "Label: value" lines (Agent:, Tool:,
    Thought:, Answer:, ...) and reports each hit at a fixed, lower
    confidence than the framework-specific detector.
    """

    def __init__(self):
        # Framework-agnostic line markers, keyed by boundary type.
        self.generic_patterns = {
            BoundaryType.AGENT_ASSIGNMENT: [
                r'Agent: .*',
                r'Role: .*',
                r'Assistant: .*'
            ],
            BoundaryType.TOOL_CYCLE_START: [
                r'Tool: .*',
                r'Action: .*',
                r'Function: .*'
            ],
            BoundaryType.TOOL_CYCLE_END: [
                r'Result: .*',
                r'Output: .*',
                r'Response: .*'
            ],
            BoundaryType.THINKING_START: [
                r'Thought: .*',
                r'Thinking: .*',
                r'Reasoning: .*'
            ],
            BoundaryType.FINAL_ANSWER: [
                r'Answer: .*',
                r'Conclusion: .*',
                r'Final: .*'
            ]
        }

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect generic agent pattern boundaries."""
        return self._detect_pattern_boundaries(content, self.generic_patterns, "Generic")

    def _detect_pattern_boundaries(self, content: str, patterns: Dict, framework: str) -> List[AgentBoundary]:
        """Detect boundaries using generic patterns.

        Args:
            content: Raw log text to scan.
            patterns: Mapping of BoundaryType -> list of regex strings.
            framework: Label stored in each boundary's metadata.

        Returns:
            One AgentBoundary per regex match, each with up to 50
            characters of surrounding context.
        """
        window = 50  # characters of context captured on each side
        found = []
        for btype, regexes in patterns.items():
            for regex in regexes:
                for hit in re.finditer(regex, content, re.MULTILINE):
                    at = hit.start()
                    matched = hit.group()
                    after = at + len(matched)
                    found.append(AgentBoundary(
                        position=at,
                        boundary_type=btype,
                        pattern_matched=regex,
                        confidence_score=0.6,  # generic patterns score lower
                        context_before=content[max(0, at - window):at],
                        context_after=content[after:min(len(content), after + window)],
                        metadata={
                            "framework": framework,
                            "matched_text": matched,
                            "pattern_type": "generic"
                        }
                    ))
        return found

    def get_priority(self) -> int:
        """Generic patterns have medium priority."""
        return 2
class StructuralDetector(BaseBoundaryDetector):
    """Detector for structural boundaries (JSON, sections, etc.).

    Emits boundaries for top-level JSON object braces (depth tracking)
    and for markdown-style ``#`` section headers.
    """

    def detect_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect structural boundaries.

        Args:
            content: Raw log text to scan.

        Returns:
            JSON-object and section-header boundaries sorted by position.
        """
        boundaries = []
        # Detect JSON object boundaries
        boundaries.extend(self._detect_json_boundaries(content))
        # Detect section headers
        boundaries.extend(self._detect_section_boundaries(content))
        return sorted(boundaries, key=lambda b: b.position)

    def _detect_json_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect JSON object start/end boundaries.

        Scans character-by-character, tracking '{' nesting depth, and emits
        a boundary only when a top-level object opens or closes; an
        unmatched '}' is ignored. Fix: removed the unused ``bracket_stack``
        local that was declared but never read or written.

        NOTE(review): braces inside JSON string literals are not
        distinguished, and top-level arrays are not tracked.
        """
        boundaries = []
        brace_stack = []  # positions of currently-open '{' characters
        for i, char in enumerate(content):
            if char == '{':
                brace_stack.append(i)
                if len(brace_stack) == 1:  # Start of top-level object
                    boundary = AgentBoundary(
                        position=i,
                        boundary_type=BoundaryType.JSON_OBJECT_START,
                        pattern_matched="{",
                        confidence_score=0.8,
                        context_before=content[max(0, i-20):i],
                        context_after=content[i+1:min(len(content), i+21)],
                        metadata={"structure_type": "json_object"}
                    )
                    boundaries.append(boundary)
            elif char == '}':
                if brace_stack:
                    brace_stack.pop()
                    if len(brace_stack) == 0:  # End of top-level object
                        boundary = AgentBoundary(
                            position=i,
                            boundary_type=BoundaryType.JSON_OBJECT_END,
                            pattern_matched="}",
                            confidence_score=0.8,
                            context_before=content[max(0, i-20):i],
                            context_after=content[i+1:min(len(content), i+21)],
                            metadata={"structure_type": "json_object"}
                        )
                        boundaries.append(boundary)
        return boundaries

    def _detect_section_boundaries(self, content: str) -> List[AgentBoundary]:
        """Detect section header boundaries (markdown ``#`` headers)."""
        boundaries = []
        # Markdown-style headers
        header_pattern = r'^#+\s+.*$'
        for match in re.finditer(header_pattern, content, re.MULTILINE):
            boundary = AgentBoundary(
                position=match.start(),
                boundary_type=BoundaryType.SEMANTIC_BREAK,
                pattern_matched=header_pattern,
                confidence_score=0.7,
                context_before=content[max(0, match.start()-30):match.start()],
                context_after=content[match.end():min(len(content), match.end()+30)],
                metadata={"structure_type": "section_header", "header_text": match.group()}
            )
            boundaries.append(boundary)
        return boundaries

    def get_priority(self) -> int:
        """Structural detection has lower priority."""
        return 3
class BoundaryDetector:
"""Main boundary detection coordinator."""
def __init__(self, log_type_detector: Optional[LogTypeDetector] = None):
"""Initialize with optional log type detector."""
self.log_type_detector = log_type_detector or LogTypeDetector()
# Initialize detectors in priority order
self.detectors = [
FrameworkSpecificDetector(),
GenericAgentPatternDetector(),
StructuralDetector()
]
# Sort by priority
self.detectors.sort(key=lambda d: d.get_priority())
def detect_boundaries(self, content: str, log_type: Optional[LogType] = None) -> List[AgentBoundary]:
"""
Detect all boundaries in content using multi-layer approach.
Args:
content: The content to analyze
log_type: Optional pre-detected log type
Returns:
List of detected boundaries sorted by position
"""
if not log_type:
detection_result = self.log_type_detector.detect_log_type(content)
log_type = detection_result.log_type
all_boundaries = []
# Run all detectors
for detector in self.detectors:
try:
boundaries = detector.detect_boundaries(content)
all_boundaries.extend(boundaries)
except Exception as e:
# Log error but continue with other detectors
print(f"Warning: Detector {detector.__class__.__name__} failed: {e}")
# Remove duplicate boundaries (same position, similar type)
deduplicated = self._deduplicate_boundaries(all_boundaries)
# Sort by position
return sorted(deduplicated, key=lambda b: b.position)
def _deduplicate_boundaries(self, boundaries: List[AgentBoundary]) -> List[AgentBoundary]:
"""Remove duplicate boundaries that are too close to each other."""
if not boundaries:
return []
# Sort by position first
sorted_boundaries = sorted(boundaries, key=lambda b: b.position)
deduplicated = [sorted_boundaries[0]]
for boundary in sorted_boundaries[1:]:
# Check if this boundary is too close to the last one
last_boundary = deduplicated[-1]
position_diff = boundary.position - last_boundary.position
# If boundaries are very close (within 10 characters), keep the higher confidence one
if position_diff < 10:
if boundary.confidence_score > last_boundary.confidence_score:
deduplicated[-1] = boundary
else:
deduplicated.append(boundary)
return deduplicated
def calculate_boundary_confidence(self, boundary: AgentBoundary, content: str) -> BoundaryConfidence:
"""
Calculate comprehensive confidence score for a boundary.
Args:
boundary: The boundary to analyze
content: The full content for context analysis
Returns:
BoundaryConfidence with detailed scoring
"""
confidence = BoundaryConfidence()
# Pattern confidence (from initial detection)
confidence.pattern_confidence = boundary.confidence_score
# Context confidence (analyze surrounding content)
confidence.context_confidence = self._analyze_context_confidence(boundary, content)
# Semantic confidence (placeholder for embedding-based analysis)
confidence.semantic_confidence = 0.7 # Will be enhanced with semantic analyzer
# Combined score (weighted average)
weights = {"pattern": 0.4, "context": 0.3, "semantic": 0.3}
confidence.combined_score = (
confidence.pattern_confidence * weights["pattern"] +
confidence.context_confidence * weights["context"] +
confidence.semantic_confidence * weights["semantic"]
)
return confidence
def _analyze_context_confidence(self, boundary: AgentBoundary, content: str) -> float:
"""Analyze context around boundary to determine confidence."""
# Check for consistent formatting around boundary
context_window = boundary.context_before + boundary.context_after
confidence = 0.5 # Base confidence
# Check for consistent indentation/formatting
if re.search(r'\n\s*\n', context_window): # Blank lines suggest section breaks
confidence += 0.2
# Check for consistent agent markers in context
agent_markers = len(re.findall(r'(Agent:|Task:|Tool:|๐Ÿš€|๐Ÿค–|๐Ÿ“‹|๐Ÿ”ง)', context_window))
if agent_markers > 0:
confidence += 0.1
# Check for timestamp patterns
if re.search(r'\d{2}:\d{2}:\d{2}', context_window):
confidence += 0.1
return min(confidence, 1.0)