Spaces:
Running
Running
| """ | |
| Text Splitter Module for Agent Monitoring | |
| This module provides different text splitting strategies for the sliding window monitor. | |
| Each splitter returns TextChunk objects with content and metadata. | |
| """ | |
| import json | |
| import logging | |
| from abc import ABC, abstractmethod | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from typing import Dict, Any, List, Tuple, Optional | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from langchain_community.document_loaders import JSONLoader | |
| # Import agent-aware components | |
| from ..content_analysis import LogType, LogTypeDetector, DetectionResult | |
| from ..content_analysis import BoundaryDetector, AgentBoundary, BoundaryType, BoundaryConfidence, SemanticAnalyzer, SemanticBreakpoint | |
| logger = logging.getLogger(__name__) | |
@dataclass
class TextChunk:
    """
    Container for a text chunk with its associated metadata.

    FIX: decorated with @dataclass (imported above but previously unused) so
    that the keyword constructor used throughout this module —
    TextChunk(content=..., metadata=...) — actually works; without the
    decorator the class has no generated __init__ and every splitter would
    raise TypeError when building chunks.

    Attributes:
        content: The actual text content of the chunk
        metadata: Dictionary containing chunk metadata like position, size, etc.
    """
    # The actual text content of the chunk.
    content: str
    # Chunk metadata; by convention holds a "window_info" dict with
    # position/size/splitter details (see the splitters below).
    metadata: Dict[str, Any]
class BaseSplitter(ABC):
    """
    Abstract base class for text splitters.

    All splitters must implement the split method that takes content
    and returns a list of TextChunk objects with appropriate metadata.
    """

    # FIX: marked abstract (abstractmethod was imported but never applied).
    # Without the decorator BaseSplitter was directly instantiable and
    # subclasses were not forced to provide an implementation.
    @abstractmethod
    def split(self, content: str) -> List[TextChunk]:
        """
        Split the input content into chunks.

        Args:
            content: The text content to split

        Returns:
            List of TextChunk objects with content and metadata
        """
        pass
class CharacterSplitter(BaseSplitter):
    """
    Character-based text splitter using LangChain's RecursiveCharacterTextSplitter.

    This is the default splitter that matches the current sliding window behavior.
    Uses aggressive default parameters optimized for large traces.
    """

    def __init__(self, chunk_size: int = 300000, overlap_size: int = 6000):
        """
        Initialize the character splitter.

        Args:
            chunk_size: Size of each chunk in characters (default: 300K)
            overlap_size: Overlap between consecutive chunks (default: 6K - 2% overlap)

        Raises:
            ValueError: If overlap_size is not strictly smaller than chunk_size.
        """
        self.chunk_size = chunk_size
        self.overlap_size = overlap_size
        # Validate parameters: LangChain requires overlap < chunk size, and an
        # overlap >= chunk size would never make forward progress anyway.
        if overlap_size >= chunk_size:
            raise ValueError(f"Overlap size ({overlap_size}) must be less than chunk size ({chunk_size})")
        # Create the LangChain text splitter that performs the actual splitting.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.overlap_size
        )
        logger.info(f"CharacterSplitter initialized with chunk_size={chunk_size}, overlap_size={overlap_size}")

    def split(self, content: str) -> List[TextChunk]:
        """
        Split content into overlapping character-based chunks.

        Args:
            content: The text content to split

        Returns:
            List of TextChunk objects with content and metadata; each chunk's
            metadata["window_info"] records its character span in the original
            content and the measured overlap with the previous chunk.
        """
        logger.info(f"Splitting content into character-based chunks (chunk_size={self.chunk_size}, overlap={self.overlap_size})")
        # Use LangChain's text splitter (returns plain strings, no positions).
        text_chunks = self.text_splitter.split_text(content)
        # Convert to TextChunk objects with metadata, recovering each chunk's
        # character offsets by searching for its text in the original content.
        chunks = []
        current_search_pos = 0
        for i, chunk_content in enumerate(text_chunks):
            # Find the actual position of this chunk in the original content.
            # We search from current_search_pos to avoid finding the same content multiple times.
            chunk_start = content.find(chunk_content, current_search_pos)
            if chunk_start == -1:
                # If we can't find the chunk content (which shouldn't happen with LangChain),
                # fall back to theoretical calculation
                logger.warning(f"Could not find chunk {i} content in original text, using theoretical position")
                if i == 0:
                    chunk_start = 0
                else:
                    chunk_start = chunks[-1].metadata["window_info"]["window_end_char"]
            chunk_end = chunk_start + len(chunk_content)
            # Calculate overlap with previous chunk (actual, not configured).
            overlap_with_previous = 0
            if i > 0:
                prev_end = chunks[-1].metadata["window_info"]["window_end_char"]
                if chunk_start < prev_end:
                    overlap_with_previous = prev_end - chunk_start
            # Create metadata describing this window's span and context.
            metadata = {
                "window_info": {
                    "window_index": i,
                    "window_total": len(text_chunks),
                    "window_start_char": chunk_start,
                    "window_end_char": chunk_end,
                    "chunk_size": len(chunk_content),
                    "window_size": self.chunk_size,
                    "overlap_size": overlap_with_previous,
                    "splitter_type": "character",
                    "processed_at": datetime.now().isoformat(),
                    "overlap_with_previous": overlap_with_previous > 0 and i > 0,
                    "line_mapping_available": True  # Indicate that line mapping can be created
                }
            }
            chunks.append(TextChunk(content=chunk_content, metadata=metadata))
            # Update search position for next chunk (start from current chunk's beginning + some offset
            # to handle potential overlaps correctly)
            current_search_pos = max(chunk_start + 1, chunk_start + len(chunk_content) // 2)
        logger.info(f"Split content into {len(chunks)} character-based chunks")
        return chunks
class JSONSplitter(BaseSplitter):
    """
    JSON-based text splitter that treats JSON objects as logical chunks.

    This splitter attempts to preserve JSON structure while respecting
    maximum chunk size constraints. Arrays are split between whole
    elements; objects are split between whole top-level key/value pairs.
    """

    def __init__(self, schema: Optional[Dict[str, Any]] = None, max_chunk_size: int = 300000):
        """
        Initialize the JSON splitter.

        Args:
            schema: Optional JSON schema to guide splitting (recorded in
                chunk metadata; not otherwise interpreted here)
            max_chunk_size: Maximum size for each chunk in characters
        """
        self.schema = schema
        self.max_chunk_size = max_chunk_size
        logger.info(f"JSONSplitter initialized with max_chunk_size={max_chunk_size}")

    def split(self, content: str) -> List[TextChunk]:
        """
        Split content into JSON-based chunks.

        Invalid JSON is not dropped: it is preserved as a single chunk whose
        metadata marks the parse error.

        Args:
            content: The JSON content to split

        Returns:
            List of TextChunk objects with content and metadata
        """
        logger.info("Splitting content into JSON-based chunks")
        try:
            json_data = json.loads(content)
        except json.JSONDecodeError as e:
            logger.error(f"Invalid JSON content: {e}")
            # Create a single chunk with the raw content and mark it as invalid JSON
            chunk = self._create_chunk(content, 0, 1)
            chunk.metadata["window_info"]["splitter_type"] = "json_invalid"
            chunk.metadata["window_info"]["error"] = f"Invalid JSON: {str(e)}"
            chunk.metadata["window_info"]["handling"] = "raw_content_preserved"
            return [chunk]
        chunks = []
        if isinstance(json_data, list):
            # Handle JSON arrays - each group of elements becomes a chunk
            chunks = self._split_json_array(json_data)
        elif isinstance(json_data, dict):
            # Handle JSON objects - split based on size
            chunks = self._split_json_object(json_data)
        else:
            # Handle primitive JSON values (string, number, bool, null)
            chunks = [self._create_chunk(json.dumps(json_data), 0, 1)]
        logger.info(f"Split content into {len(chunks)} JSON-based chunks")
        return chunks

    def _split_json_array(self, json_array: List[Any]) -> List[TextChunk]:
        """Split a JSON array into chunks of whole elements bounded by max_chunk_size."""
        chunks = []
        current_chunk = []
        current_size = 2  # Start with array brackets []
        for item in json_array:
            item_json = json.dumps(item)
            item_size = len(item_json)
            # Check if adding this item would exceed the max chunk size (+1 for comma)
            if current_size + item_size + 1 > self.max_chunk_size and current_chunk:
                # Flush the accumulated items as one chunk
                chunk_content = json.dumps(current_chunk)
                chunks.append(self._create_chunk(chunk_content, len(chunks), None))
                # Start a new chunk containing the current item
                current_chunk = [item]
                current_size = 2 + item_size  # [] + item
            else:
                current_chunk.append(item)
                # FIX: only count a separating comma when this is not the first
                # item of the chunk. The previous check (`if current_chunk`) ran
                # after the append, so it always added 1 — overestimating the
                # size of every chunk by one byte. Now mirrors _split_json_object.
                current_size += item_size + (1 if len(current_chunk) > 1 else 0)
        # Add remaining items as final chunk
        if current_chunk:
            chunk_content = json.dumps(current_chunk)
            chunks.append(self._create_chunk(chunk_content, len(chunks), None))
        # Update total count in all chunks (unknown until splitting finishes)
        for chunk in chunks:
            chunk.metadata["window_info"]["window_total"] = len(chunks)
        return chunks

    def _split_json_object(self, json_object: Dict[str, Any]) -> List[TextChunk]:
        """Split a JSON object into chunks of whole top-level key/value pairs."""
        # Treat the entire object as one chunk if it fits
        object_json = json.dumps(json_object)
        if len(object_json) <= self.max_chunk_size:
            return [self._create_chunk(object_json, 0, 1)]
        # If object is too large, split by top-level keys
        chunks = []
        current_chunk = {}
        current_size = 2  # Start with object braces {}
        for key, value in json_object.items():
            key_value_json = json.dumps({key: value})
            key_value_size = len(key_value_json) - 2  # Subtract the {} from individual measurement
            # Check if adding this key-value pair would exceed max size (+1 for comma)
            if current_size + key_value_size + 1 > self.max_chunk_size and current_chunk:
                # Create chunk from current key-value pairs
                chunk_content = json.dumps(current_chunk)
                chunks.append(self._create_chunk(chunk_content, len(chunks), None))
                # Start new chunk
                current_chunk = {key: value}
                current_size = 2 + key_value_size  # {} + key-value
            else:
                current_chunk[key] = value
                current_size += key_value_size + (1 if len(current_chunk) > 1 else 0)  # +1 for comma if not first
        # Add remaining key-value pairs as final chunk
        if current_chunk:
            chunk_content = json.dumps(current_chunk)
            chunks.append(self._create_chunk(chunk_content, len(chunks), None))
        # Update total count in all chunks (unknown until splitting finishes)
        for chunk in chunks:
            chunk.metadata["window_info"]["window_total"] = len(chunks)
        return chunks

    def _create_chunk(self, content: str, index: int, total: Optional[int]) -> TextChunk:
        """
        Create a TextChunk with appropriate metadata.

        Args:
            content: Serialized JSON text for this chunk
            index: Zero-based chunk index
            total: Total chunk count, or None when not yet known (filled in
                afterwards by the _split_json_* callers)
        """
        metadata = {
            "window_info": {
                "window_index": index,
                "window_total": total,
                "window_start_char": 0,  # JSON chunks don't have meaningful character positions
                "window_end_char": len(content),
                "window_size": len(content),
                "overlap_size": 0,  # JSON chunks typically don't overlap
                "splitter_type": "json",
                "processed_at": datetime.now().isoformat(),
                "schema_used": self.schema is not None
            }
        }
        if self.schema:
            metadata["window_info"]["schema"] = self.schema
        return TextChunk(content=content, metadata=metadata)
| class AgentAwareSemanticSplitter(BaseSplitter): | |
| """ | |
| Advanced splitter that uses agent-aware semantic analysis to create | |
| intelligent chunks that preserve agent interaction boundaries. | |
| This splitter combines: | |
| - Log type detection to identify the format | |
| - Boundary detection to find agent interaction points | |
| - Semantic analysis to identify topic shifts | |
| - Intelligent chunking that respects both size and semantic constraints | |
| """ | |
| def __init__(self, | |
| min_chunk_size: int = 100000, # 100K chars ≈ 25K tokens | |
| max_chunk_size: int = 300000, # 300K chars ≈ 75K tokens (token-safe) | |
| overlap_ratio: float = 0.02, # 2% overlap for efficiency | |
| confidence_threshold: float = 0.7, | |
| embedding_model: str = "text-embedding-3-small", | |
| preserve_agent_stages: bool = True, | |
| openai_api_key: Optional[str] = None): | |
| """ | |
| Initialize the agent-aware semantic splitter. | |
| Args: | |
| min_chunk_size: Minimum chunk size in characters (default: 200K ≈ 50K tokens) | |
| max_chunk_size: Maximum chunk size in characters (default: 800K ≈ 200K tokens) | |
| overlap_ratio: Ratio of overlap between chunks (default: 0.02 for cost efficiency) | |
| confidence_threshold: Minimum confidence for boundary detection | |
| embedding_model: Name of OpenAI embedding model for semantic analysis (default: text-embedding-3-small) | |
| preserve_agent_stages: Whether to preserve complete agent interaction stages | |
| openai_api_key: OpenAI API key (if not provided, will use OPENAI_API_KEY environment variable) | |
| Note: Optimized for 1M token context windows to minimize API costs | |
| """ | |
| self.min_chunk_size = min_chunk_size | |
| self.max_chunk_size = max_chunk_size | |
| self.overlap_ratio = overlap_ratio | |
| self.confidence_threshold = confidence_threshold | |
| self.preserve_agent_stages = preserve_agent_stages | |
| # Initialize component modules | |
| self.log_detector = LogTypeDetector() | |
| self.boundary_detector = BoundaryDetector(self.log_detector) | |
| try: | |
| self.semantic_analyzer = SemanticAnalyzer( | |
| model_name=embedding_model, | |
| similarity_threshold=0.5, | |
| api_key=openai_api_key | |
| ) | |
| except Exception as e: | |
| print(f"Warning: Failed to initialize semantic analyzer: {e}") | |
| self.semantic_analyzer = None | |
| # Statistics for monitoring | |
| self.stats = { | |
| "chunks_created": 0, | |
| "boundaries_detected": 0, | |
| "semantic_breaks_found": 0, | |
| "avg_chunk_size": 0, | |
| "stage_preservation_rate": 0.0 | |
| } | |
| def split(self, content: str) -> List[TextChunk]: | |
| """ | |
| Split content using agent-aware semantic analysis. | |
| Args: | |
| content: The content to split | |
| Returns: | |
| List of TextChunk objects with metadata | |
| """ | |
| if not content.strip(): | |
| return [] | |
| print(f"Starting agent-aware semantic splitting...") | |
| print(f"Content length: {len(content):,} characters") | |
| # Step 1: Detect log type | |
| detection_result = self.log_detector.detect_log_type(content) | |
| log_type = detection_result.log_type | |
| print(f"Detected log type: {log_type.value} (confidence: {detection_result.confidence:.2f})") | |
| # Step 2: Detect agent boundaries | |
| agent_boundaries = self.boundary_detector.detect_boundaries(content, log_type) | |
| print(f"Found {len(agent_boundaries)} agent boundaries") | |
| self.stats["boundaries_detected"] = len(agent_boundaries) | |
| # Step 3: Enhance boundaries with semantic analysis | |
| if self.semantic_analyzer: | |
| enhanced_boundaries = self._enhance_boundaries_with_semantics( | |
| agent_boundaries, content | |
| ) | |
| print(f"Enhanced to {len(enhanced_boundaries)} total boundaries") | |
| else: | |
| enhanced_boundaries = agent_boundaries | |
| print("Semantic analysis unavailable, using pattern-based boundaries only") | |
| # Step 4: Create chunks respecting boundaries and size constraints | |
| chunks = self._create_intelligent_chunks( | |
| content, enhanced_boundaries, log_type, detection_result.characteristics | |
| ) | |
| print(f"Created {len(chunks)} chunks") | |
| self.stats["chunks_created"] = len(chunks) | |
| self.stats["avg_chunk_size"] = sum(len(chunk.content) for chunk in chunks) / len(chunks) if chunks else 0 | |
| return chunks | |
| def _enhance_boundaries_with_semantics(self, | |
| agent_boundaries: List[AgentBoundary], | |
| content: str) -> List[AgentBoundary]: | |
| """ | |
| Enhance agent boundaries with semantic analysis. | |
| Args: | |
| agent_boundaries: List of detected agent boundaries | |
| content: Full content for analysis | |
| Returns: | |
| Enhanced list of boundaries including semantic breakpoints | |
| """ | |
| if not self.semantic_analyzer: | |
| return agent_boundaries | |
| # Analyze semantic structure | |
| semantic_analysis = self.semantic_analyzer.analyze_semantic_structure(content) | |
| semantic_breakpoints = semantic_analysis["breakpoints"] | |
| self.stats["semantic_breaks_found"] = len(semantic_breakpoints) | |
| # Convert semantic breakpoints to agent boundaries | |
| semantic_boundaries = [] | |
| for breakpoint in semantic_breakpoints: | |
| # Only add if not too close to existing agent boundaries | |
| is_near_agent_boundary = any( | |
| abs(breakpoint.position - ab.position) < 50 | |
| for ab in agent_boundaries | |
| ) | |
| if not is_near_agent_boundary and breakpoint.confidence > 0.6: | |
| semantic_boundary = AgentBoundary( | |
| position=breakpoint.position, | |
| boundary_type=BoundaryType.SEMANTIC_BREAK, | |
| pattern_matched="semantic_similarity_drop", | |
| confidence_score=breakpoint.confidence, | |
| context_before="", # Will be filled by boundary detector | |
| context_after="", # Will be filled by boundary detector | |
| metadata={ | |
| "type": "semantic", | |
| "similarity_drop": breakpoint.similarity_drop, | |
| "sentence_index": breakpoint.sentence_index | |
| } | |
| ) | |
| semantic_boundaries.append(semantic_boundary) | |
| # Combine and sort all boundaries | |
| all_boundaries = agent_boundaries + semantic_boundaries | |
| all_boundaries.sort(key=lambda b: b.position) | |
| # Remove boundaries that are too close to each other | |
| return self._deduplicate_boundaries(all_boundaries) | |
| def _deduplicate_boundaries(self, boundaries: List[AgentBoundary]) -> List[AgentBoundary]: | |
| """Remove boundaries that are too close to each other.""" | |
| if not boundaries: | |
| return [] | |
| deduplicated = [boundaries[0]] | |
| min_distance = 25 # Minimum distance between boundaries | |
| for boundary in boundaries[1:]: | |
| last_boundary = deduplicated[-1] | |
| if boundary.position - last_boundary.position >= min_distance: | |
| deduplicated.append(boundary) | |
| elif boundary.confidence_score > last_boundary.confidence_score: | |
| # Replace if new boundary has higher confidence | |
| deduplicated[-1] = boundary | |
| return deduplicated | |
| def _create_intelligent_chunks(self, | |
| content: str, | |
| boundaries: List[AgentBoundary], | |
| log_type: LogType, | |
| characteristics: Dict[str, any]) -> List[TextChunk]: | |
| """ | |
| Create chunks using intelligent boundary selection. | |
| Args: | |
| content: Content to chunk | |
| boundaries: List of detected boundaries | |
| log_type: Detected log type | |
| characteristics: Content characteristics | |
| Returns: | |
| List of TextChunk objects | |
| """ | |
| if not boundaries: | |
| # Fallback to simple size-based chunking | |
| return self._create_size_based_chunks(content, log_type, characteristics) | |
| chunks = [] | |
| current_position = 0 | |
| overlap_size = int(self.max_chunk_size * self.overlap_ratio) | |
| while current_position < len(content): | |
| # Find optimal chunk end position | |
| chunk_end, used_boundary = self._find_optimal_chunk_end( | |
| content, current_position, boundaries | |
| ) | |
| # Extract chunk content | |
| chunk_content = content[current_position:chunk_end] | |
| # Calculate next starting position with overlap | |
| if chunk_end < len(content): | |
| # Calculate overlap start position (going backward from chunk_end) | |
| desired_overlap_start = max(0, chunk_end - overlap_size) | |
| # Find a good sentence boundary for the overlap to avoid cutting mid-sentence | |
| next_start = self._find_overlap_boundary(content, desired_overlap_start, chunk_end) | |
| # FIX: Ensure next_start is not equal to chunk_end (which breaks the loop) | |
| # If _find_overlap_boundary returned chunk_end, use desired_overlap_start instead | |
| if next_start >= chunk_end: | |
| next_start = desired_overlap_start | |
| # Ensure we don't go backwards (next_start should be > current_position for meaningful progress) | |
| if next_start <= current_position: | |
| next_start = min(chunk_end - 1, current_position + max(1, len(chunk_content) // 2)) | |
| else: | |
| next_start = chunk_end | |
| # Create chunk metadata | |
| actual_overlap_size = chunk_end - next_start if chunk_end > next_start else 0 | |
| window_info = { | |
| "window_index": len(chunks), | |
| "window_start_char": current_position, | |
| "window_end_char": chunk_end, | |
| "chunk_size": len(chunk_content), | |
| "window_size": self.max_chunk_size, | |
| "overlap_size": actual_overlap_size, | |
| "splitter_type": "agent_semantic", | |
| "log_type": log_type.value, | |
| "boundary_used": used_boundary.boundary_type.value if used_boundary else "size_limit", | |
| "boundary_confidence": used_boundary.confidence_score if used_boundary else 0.0, | |
| "contains_agent_markers": characteristics.get("agent_markers", 0) > 0, | |
| "contains_tool_patterns": characteristics.get("tool_usage_patterns", 0) > 0, | |
| "overlap_with_previous": actual_overlap_size > 0 and len(chunks) > 0 | |
| } | |
| # Create TextChunk | |
| chunk = TextChunk( | |
| content=chunk_content, | |
| metadata={ | |
| "window_info": window_info, | |
| "creation_time": datetime.now().isoformat(), | |
| "quality_score": self._calculate_chunk_quality_score(chunk_content, used_boundary) | |
| } | |
| ) | |
| chunks.append(chunk) | |
| # Update position for next iteration | |
| current_position = next_start | |
| # Improved end-of-content handling to prevent tiny chunks | |
| # If we're very close to the end (less than 10% of min_chunk_size remaining), | |
| # or if we haven't made meaningful progress, break the loop | |
| remaining_content = len(content) - current_position | |
| min_meaningful_chunk = max(100, self.min_chunk_size // 10) # At least 100 chars or 10% of min_chunk_size | |
| if remaining_content <= min_meaningful_chunk or current_position >= chunk_end: | |
| # If there's still some content left but it's small, merge it with the last chunk | |
| if remaining_content > 0 and current_position < len(content): | |
| remaining_text = content[current_position:len(content)] | |
| if chunks: | |
| # Merge with the last chunk | |
| last_chunk = chunks[-1] | |
| last_chunk.content += remaining_text | |
| # Update the metadata | |
| last_chunk.metadata["window_info"]["window_end_char"] = len(content) | |
| last_chunk.metadata["window_info"]["chunk_size"] = len(last_chunk.content) | |
| last_chunk.metadata["window_info"]["merged_final_segment"] = True | |
| last_chunk.metadata["window_info"]["merged_segment_size"] = len(remaining_text) | |
| break | |
| return chunks | |
| def _find_optimal_chunk_end(self, | |
| content: str, | |
| start_pos: int, | |
| boundaries: List[AgentBoundary]) -> Tuple[int, Optional[AgentBoundary]]: | |
| """ | |
| Find the optimal end position for a chunk. | |
| Args: | |
| content: Full content | |
| start_pos: Starting position for chunk | |
| boundaries: Available boundaries | |
| Returns: | |
| Tuple of (end_position, boundary_used) | |
| """ | |
| max_end = min(len(content), start_pos + self.max_chunk_size) | |
| min_end = min(len(content), start_pos + self.min_chunk_size) | |
| # Find boundaries within the acceptable range | |
| candidate_boundaries = [ | |
| b for b in boundaries | |
| if min_end <= b.position <= max_end and b.position > start_pos | |
| ] | |
| if not candidate_boundaries: | |
| # No good boundaries, use max size | |
| return max_end, None | |
| # Choose best boundary based on confidence and position | |
| best_boundary = self._select_best_boundary( | |
| candidate_boundaries, start_pos, max_end | |
| ) | |
| return best_boundary.position, best_boundary | |
| def _select_best_boundary(self, | |
| boundaries: List[AgentBoundary], | |
| start_pos: int, | |
| max_end: int) -> AgentBoundary: | |
| """ | |
| Select the best boundary from candidates. | |
| Args: | |
| boundaries: Candidate boundaries | |
| start_pos: Chunk start position | |
| max_end: Maximum end position | |
| Returns: | |
| Best boundary to use | |
| """ | |
| def boundary_score(boundary: AgentBoundary) -> float: | |
| # Base score from confidence | |
| score = boundary.confidence_score | |
| # Prefer agent-specific boundaries over semantic ones | |
| if boundary.boundary_type in [ | |
| BoundaryType.TASK_END, BoundaryType.CREW_END, | |
| BoundaryType.FINAL_ANSWER, BoundaryType.TOOL_CYCLE_END | |
| ]: | |
| score += 0.3 | |
| elif boundary.boundary_type == BoundaryType.SEMANTIC_BREAK: | |
| score += 0.1 | |
| # Prefer boundaries closer to ideal size | |
| ideal_size = (self.min_chunk_size + self.max_chunk_size) // 2 | |
| chunk_size = boundary.position - start_pos | |
| size_score = 1.0 - abs(chunk_size - ideal_size) / ideal_size | |
| score += size_score * 0.2 | |
| return score | |
| return max(boundaries, key=boundary_score) | |
| def _find_overlap_boundary(self, content: str, start: int, end: int) -> int: | |
| """ | |
| Find a good boundary for overlap between chunks. | |
| Args: | |
| content: Full content | |
| start: Start position to search from | |
| end: End position to search to | |
| Returns: | |
| Position of the boundary | |
| """ | |
| # Look for sentence boundaries within the overlap region | |
| search_text = content[start:end] | |
| # Try to find sentence endings | |
| for delimiter in ['. ', '! ', '? ', '; ']: | |
| pos = search_text.rfind(delimiter) | |
| if pos != -1: | |
| return start + pos + len(delimiter) | |
| # Fall back to word boundary | |
| pos = search_text.rfind(' ') | |
| if pos != -1: | |
| return start + pos + 1 | |
| # If no good boundary found, use the end position | |
| return end | |
| def _create_size_based_chunks(self, | |
| content: str, | |
| log_type: LogType, | |
| characteristics: Dict[str, any]) -> List[TextChunk]: | |
| """Fallback to size-based chunking when no boundaries are found.""" | |
| chunks = [] | |
| current_pos = 0 | |
| overlap_size = int(self.max_chunk_size * self.overlap_ratio) | |
| while current_pos < len(content): | |
| end_pos = min(len(content), current_pos + self.max_chunk_size) | |
| chunk_content = content[current_pos:end_pos] | |
| # Calculate next starting position with overlap | |
| if end_pos < len(content): | |
| # Create overlap by going back from end_pos | |
| desired_overlap_start = max(0, end_pos - overlap_size) | |
| next_pos = self._find_overlap_boundary(content, desired_overlap_start, end_pos) | |
| # FIX: Ensure next_pos is not equal to end_pos (which breaks the loop) | |
| # If _find_overlap_boundary returned end_pos, use desired_overlap_start instead | |
| if next_pos >= end_pos: | |
| next_pos = desired_overlap_start | |
| # Ensure progress is made (avoid infinite loops) | |
| if next_pos <= current_pos: | |
| next_pos = min(end_pos - 1, current_pos + max(1, len(chunk_content) // 2)) | |
| else: | |
| next_pos = end_pos | |
| # Calculate actual overlap for metadata | |
| actual_overlap_size = end_pos - next_pos if end_pos > next_pos else 0 | |
| window_info = { | |
| "window_index": len(chunks), | |
| "window_start_char": current_pos, | |
| "window_end_char": end_pos, | |
| "chunk_size": len(chunk_content), | |
| "window_size": self.max_chunk_size, | |
| "overlap_size": actual_overlap_size, | |
| "splitter_type": "agent_semantic_fallback", | |
| "log_type": log_type.value, | |
| "boundary_used": "size_limit", | |
| "boundary_confidence": 0.0, | |
| "overlap_with_previous": actual_overlap_size > 0 and len(chunks) > 0 | |
| } | |
| chunk = TextChunk( | |
| content=chunk_content, | |
| metadata={ | |
| "window_info": window_info, | |
| "creation_time": datetime.now().isoformat(), | |
| "quality_score": 0.5 # Lower quality for size-based chunks | |
| } | |
| ) | |
| chunks.append(chunk) | |
| current_pos = next_pos | |
| # Improved end-of-content handling to prevent tiny chunks (same as intelligent chunks) | |
| remaining_content = len(content) - current_pos | |
| min_meaningful_chunk = max(100, self.min_chunk_size // 10) # At least 100 chars or 10% of min_chunk_size | |
| if remaining_content <= min_meaningful_chunk: | |
| # If there's still some content left but it's small, merge it with the last chunk | |
| if remaining_content > 0 and current_pos < len(content): | |
| remaining_text = content[current_pos:len(content)] | |
| if chunks: | |
| # Merge with the last chunk | |
| last_chunk = chunks[-1] | |
| last_chunk.content += remaining_text | |
| # Update the metadata | |
| last_chunk.metadata["window_info"]["window_end_char"] = len(content) | |
| last_chunk.metadata["window_info"]["chunk_size"] = len(last_chunk.content) | |
| last_chunk.metadata["window_info"]["merged_final_segment"] = True | |
| last_chunk.metadata["window_info"]["merged_segment_size"] = len(remaining_text) | |
| break | |
| return chunks | |
| def _calculate_chunk_quality_score(self, | |
| chunk_content: str, | |
| boundary_used: Optional[AgentBoundary]) -> float: | |
| """ | |
| Calculate a quality score for the chunk. | |
| Args: | |
| chunk_content: Content of the chunk | |
| boundary_used: Boundary that ended the chunk | |
| Returns: | |
| Quality score between 0 and 1 | |
| """ | |
| score = 0.5 # Base score | |
| # Bonus for using high-confidence boundaries | |
| if boundary_used and boundary_used.confidence_score > 0.8: | |
| score += 0.3 | |
| elif boundary_used and boundary_used.confidence_score > 0.6: | |
| score += 0.2 | |
| # Bonus for preserving agent stages | |
| if self._has_complete_agent_stages(chunk_content): | |
| score += 0.2 | |
| # Bonus for good size (not too small or too large) | |
| size_ratio = len(chunk_content) / self.max_chunk_size | |
| if 0.3 <= size_ratio <= 0.9: | |
| score += 0.1 | |
| return min(score, 1.0) | |
| def _has_complete_agent_stages(self, content: str) -> bool: | |
| """Check if chunk contains complete agent interaction stages.""" | |
| # Simple heuristic: look for start and end markers | |
| has_start = any(pattern in content for pattern in [ | |
| "Agent:", "Task:", "Crew:", "🚀", "📋", "🤖" | |
| ]) | |
| has_end = any(pattern in content for pattern in [ | |
| "Final Answer:", "Completed", "✅", "Final Result:" | |
| ]) | |
| return has_start and has_end | |
| def get_stats(self) -> Dict[str, any]: | |
| """Get splitting statistics.""" | |
| return self.stats.copy() | |
class PromptInteractionSplitter(BaseSplitter):
    """
    Splitter that groups prompt interactions into fixed-size chunks with a
    configurable number of interactions overlapping between consecutive
    chunks (default: two interactions per chunk, one interaction overlap).

    This splitter is designed for log files or traces that contain multiple
    prompt-response interactions, where each interaction represents a
    complete conversation turn with an AI assistant.
    """

    # Keys whose presence in a JSON object marks it as a prompt interaction.
    _PROMPT_INDICATORS = (
        "messages", "prompt", "user", "assistant", "content",
        "prompt_tokens", "completion_tokens", "model_name",
        "generated_query", "ai_message",
    )

    def __init__(self,
                 interactions_per_chunk: int = 2,
                 overlap_interactions: int = 1):
        """
        Initialize the prompt interaction splitter.

        Args:
            interactions_per_chunk: Number of prompt interactions per chunk (default: 2)
            overlap_interactions: Number of interactions to overlap between chunks (default: 1)

        Raises:
            ValueError: If overlap_interactions >= interactions_per_chunk
                (the chunk step size would be zero or negative and the
                splitter could never advance).
        """
        self.interactions_per_chunk = interactions_per_chunk
        self.overlap_interactions = overlap_interactions
        if self.overlap_interactions >= self.interactions_per_chunk:
            raise ValueError(f"Overlap interactions ({overlap_interactions}) must be less than interactions per chunk ({interactions_per_chunk})")
        logger.info(f"PromptInteractionSplitter initialized with {interactions_per_chunk} interactions per chunk, {overlap_interactions} overlap")

    def split(self, content: str) -> List[TextChunk]:
        """
        Split content into chunks based on prompt interactions.

        Args:
            content: The content to split (can be a single JSON log, multiple
                JSON logs, or mixed content)

        Returns:
            List of TextChunk objects with content and metadata; empty list
            when no prompt interactions could be identified.
        """
        logger.info("Starting prompt interaction splitting")
        # Step 1: Identify individual prompt interactions
        interactions = self._identify_prompt_interactions(content)
        if not interactions:
            logger.warning("No prompt interactions found in content")
            return []
        logger.info(f"Found {len(interactions)} prompt interactions")
        # Step 2: Group the interactions with the configured size/overlap
        chunks = self._create_interaction_chunks(interactions)
        logger.info(f"Created {len(chunks)} chunks from {len(interactions)} interactions")
        return chunks

    def _identify_prompt_interactions(self, content: str) -> List[Dict[str, Any]]:
        """
        Identify individual prompt interactions in the content.

        Handles three content formats, tried in order:
        1. Single JSON object (one interaction)
        2. Multiple JSON objects separated by newlines
        3. Plain-text conversational patterns (fallback)

        Args:
            content: Raw content to analyze

        Returns:
            List of interaction dictionaries, each with "data", "start_pos",
            "end_pos" and "raw_content" keys.
        """
        interactions: List[Dict[str, Any]] = []
        # Fast path: the whole content is a single JSON document.
        try:
            json_data = json.loads(content.strip())
            if self._is_prompt_interaction(json_data):
                interactions.append({
                    "data": json_data,
                    "start_pos": 0,
                    "end_pos": len(content),
                    "raw_content": content,
                })
                return interactions
        except json.JSONDecodeError:
            pass
        # Scan for multiple JSON objects embedded in the content.
        interactions.extend(self._find_json_interactions(content))
        # Fallback: look for text-based conversational patterns.
        if not interactions:
            interactions.extend(self._find_text_interactions(content))
        return interactions

    def _is_prompt_interaction(self, data: Any) -> bool:
        """
        Check if a parsed JSON value represents a prompt interaction.

        Args:
            data: Parsed JSON value to check

        Returns:
            True if it is a JSON object carrying at least one known
            prompt-interaction field.
        """
        # Guard: valid JSON can also be a number/string/list/bool/null;
        # without this check `key in data` would raise TypeError on
        # scalars (e.g. json.loads("42")) and do accidental substring /
        # element matching on strings and lists.
        if not isinstance(data, dict):
            return False
        return any(key in data for key in self._PROMPT_INDICATORS)

    def _find_json_interactions(self, content: str) -> List[Dict[str, Any]]:
        """
        Find JSON objects in content that represent prompt interactions.

        Scans line by line: a line starting with '{' begins a candidate
        object, and continuation lines are accumulated until the candidate
        parses as valid JSON.

        Args:
            content: Content to search

        Returns:
            List of interaction dictionaries
        """
        interactions: List[Dict[str, Any]] = []

        def try_record(raw: str, start: int, end: int) -> bool:
            """Parse raw text; append it as an interaction if it qualifies.

            Returns True when the text is valid JSON (whether or not it
            qualified as an interaction), False when it does not parse.
            """
            try:
                data = json.loads(raw)
            except json.JSONDecodeError:
                return False
            if self._is_prompt_interaction(data):
                interactions.append({
                    "data": data,
                    "start_pos": start,
                    "end_pos": end,
                    "raw_content": raw,
                })
            return True

        current_json = ""
        start_pos = 0
        current_pos = 0
        for line in content.split('\n'):
            line_start_pos = current_pos
            current_pos += len(line) + 1  # +1 for the newline split() removed
            if line.strip().startswith('{'):
                if current_json:
                    # A new object starts before the previous candidate was
                    # consumed: salvage the previous one if it parses.
                    try_record(current_json, start_pos, line_start_pos)
                current_json = line
                start_pos = line_start_pos
            elif current_json:
                current_json += '\n' + line
                # Fix: drop the candidate as soon as it parses, even when it
                # is rejected as a non-interaction.  Previously a complete
                # but rejected object kept accumulating lines, turning all
                # following content into unparseable JSON.
                if try_record(current_json, start_pos, current_pos):
                    current_json = ""
        # Handle any candidate still pending at end of content.
        if current_json:
            try_record(current_json, start_pos, len(content))
        return interactions

    def _find_text_interactions(self, content: str) -> List[Dict[str, Any]]:
        """
        Find text-based prompt interactions using regex patterns.

        This is a fallback method for non-JSON content that might contain
        conversational patterns.

        Args:
            content: Content to search

        Returns:
            List of interaction dictionaries, sorted by start position
        """
        import re

        interactions: List[Dict[str, Any]] = []
        # Common conversation patterns; each captures one dialogue turn.
        patterns = [
            r'(?:User|Human|Question):\s*(.+?)(?=(?:Assistant|AI|Answer|Response):|$)',
            r'(?:Assistant|AI|Answer|Response):\s*(.+?)(?=(?:User|Human|Question):|$)',
            r'>>(.+?)(?=>>|$)',  # Pattern from the log file
        ]
        for pattern in patterns:
            for match in re.finditer(pattern, content, re.DOTALL | re.IGNORECASE):
                interactions.append({
                    "data": {"content": match.group(1).strip()},
                    "start_pos": match.start(),
                    "end_pos": match.end(),
                    "raw_content": match.group(0),
                })
        # Matches from different patterns interleave; restore document order.
        interactions.sort(key=lambda x: x["start_pos"])
        return interactions

    def _create_interaction_chunks(self, interactions: List[Dict[str, Any]]) -> List[TextChunk]:
        """
        Create chunks from interactions with the configured grouping/overlap.

        Args:
            interactions: List of identified interactions

        Returns:
            List of TextChunk objects
        """
        chunks: List[TextChunk] = []
        if not interactions:
            return chunks
        # Interactions to advance between chunks; __init__ guarantees this
        # is >= 1, so the loop always terminates.
        step_size = self.interactions_per_chunk - self.overlap_interactions
        i = 0
        while i < len(interactions):
            end_idx = min(i + self.interactions_per_chunk, len(interactions))
            chunk_interactions = interactions[i:end_idx]
            chunk_content = self._combine_interactions(chunk_interactions)
            start_pos = chunk_interactions[0]["start_pos"]
            end_pos = chunk_interactions[-1]["end_pos"]
            # Interactions shared with the previous chunk: the previous chunk
            # extends overlap_interactions past i, clipped to this chunk's
            # actual size.  (The old formula, effectively min(overlap, step),
            # under-reported the overlap whenever overlap > step.)
            overlap_interactions_count = 0
            if i > 0:
                overlap_interactions_count = min(self.overlap_interactions, end_idx - i)
            metadata = {
                "window_info": {
                    "window_index": len(chunks),
                    "window_total": None,  # Filled in once all chunks exist
                    "window_start_char": start_pos,
                    "window_end_char": end_pos,
                    "chunk_size": len(chunk_content),
                    "interactions_count": len(chunk_interactions),
                    "interactions_per_chunk": self.interactions_per_chunk,
                    "overlap_interactions": overlap_interactions_count,
                    "splitter_type": "prompt_interaction",
                    "processed_at": datetime.now().isoformat(),
                    "overlap_with_previous": overlap_interactions_count > 0,
                    "interaction_indices": list(range(i, end_idx))
                }
            }
            chunks.append(TextChunk(content=chunk_content, metadata=metadata))
            # Stop once the final interaction has been emitted.
            if end_idx >= len(interactions):
                break
            i += step_size
        # Back-fill the total now that the chunk count is known.
        for chunk in chunks:
            chunk.metadata["window_info"]["window_total"] = len(chunks)
        return chunks

    def _combine_interactions(self, interactions: List[Dict[str, Any]]) -> str:
        """
        Combine multiple interactions into a single chunk content string.

        Args:
            interactions: List of interactions to combine

        Returns:
            Combined content string with a banner separator between
            consecutive interactions.
        """
        combined_parts = []
        for i, interaction in enumerate(interactions):
            if i > 0:
                # Visible banner so downstream monitors can tell turns apart.
                combined_parts.append("\n" + "="*50 + f" INTERACTION {i+1} " + "="*50 + "\n")
            # Prefer the original raw text; fall back to re-serialized JSON.
            if "raw_content" in interaction:
                combined_parts.append(interaction["raw_content"])
            else:
                combined_parts.append(json.dumps(interaction["data"], indent=2))
        return "\n".join(combined_parts)
# Registry of available splitters for easy access.
# Maps splitter-name strings to splitter CLASSES (not instances): callers
# look up a class here and construct it with their own parameters.
AVAILABLE_SPLITTERS = {
    "agent_semantic": AgentAwareSemanticSplitter,  # Default intelligent splitter
    "json": JSONSplitter,  # JSON-structure-based splitter
    "prompt_interaction": PromptInteractionSplitter  # Groups prompt/response turns
}