""" Text Splitter Module for Agent Monitoring This module provides different text splitting strategies for the sliding window monitor. Each splitter returns TextChunk objects with content and metadata. """ import json import logging from abc import ABC, abstractmethod from dataclasses import dataclass from datetime import datetime from typing import Dict, Any, List, Tuple, Optional from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_community.document_loaders import JSONLoader # Import agent-aware components from ..content_analysis import LogType, LogTypeDetector, DetectionResult from ..content_analysis import BoundaryDetector, AgentBoundary, BoundaryType, BoundaryConfidence, SemanticAnalyzer, SemanticBreakpoint logger = logging.getLogger(__name__) @dataclass class TextChunk: """ Container for a text chunk with its associated metadata. Attributes: content: The actual text content of the chunk metadata: Dictionary containing chunk metadata like position, size, etc. """ content: str metadata: Dict[str, Any] class BaseSplitter(ABC): """ Abstract base class for text splitters. All splitters must implement the split method that takes content and returns a list of TextChunk objects with appropriate metadata. """ @abstractmethod def split(self, content: str) -> List[TextChunk]: """ Split the input content into chunks. Args: content: The text content to split Returns: List of TextChunk objects with content and metadata """ pass class CharacterSplitter(BaseSplitter): """ Character-based text splitter using LangChain's RecursiveCharacterTextSplitter. This is the default splitter that matches the current sliding window behavior. Uses aggressive default parameters optimized for large traces. """ def __init__(self, chunk_size: int = 300000, overlap_size: int = 6000): """ Initialize the character splitter. Args: chunk_size: Size of each chunk in characters (default: 600K - optimized for 1M token context) overlap_size: Overlap between consecutive chunks (default: 30K - 5% overlap) """ self.chunk_size = chunk_size self.overlap_size = overlap_size # Validate parameters if overlap_size >= chunk_size: raise ValueError(f"Overlap size ({overlap_size}) must be less than chunk size ({chunk_size})") # Create the LangChain text splitter self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=self.chunk_size, chunk_overlap=self.overlap_size ) logger.info(f"CharacterSplitter initialized with chunk_size={chunk_size}, overlap_size={overlap_size}") def split(self, content: str) -> List[TextChunk]: """ Split content into overlapping character-based chunks. Args: content: The text content to split Returns: List of TextChunk objects with content and metadata """ logger.info(f"Splitting content into character-based chunks (chunk_size={self.chunk_size}, overlap={self.overlap_size})") # Use LangChain's text splitter text_chunks = self.text_splitter.split_text(content) # Convert to TextChunk objects with metadata chunks = [] current_search_pos = 0 for i, chunk_content in enumerate(text_chunks): # Find the actual position of this chunk in the original content # We search from current_search_pos to avoid finding the same content multiple times chunk_start = content.find(chunk_content, current_search_pos) if chunk_start == -1: # If we can't find the chunk content (which shouldn't happen with LangChain), # fall back to theoretical calculation logger.warning(f"Could not find chunk {i} content in original text, using theoretical position") if i == 0: chunk_start = 0 else: chunk_start = chunks[-1].metadata["window_info"]["window_end_char"] chunk_end = chunk_start + len(chunk_content) # Calculate overlap with previous chunk overlap_with_previous = 0 if i > 0: prev_end = chunks[-1].metadata["window_info"]["window_end_char"] if chunk_start < prev_end: overlap_with_previous = prev_end - chunk_start # Create metadata metadata = { "window_info": { "window_index": i, "window_total": len(text_chunks), "window_start_char": chunk_start, "window_end_char": chunk_end, "chunk_size": len(chunk_content), "window_size": self.chunk_size, "overlap_size": overlap_with_previous, "splitter_type": "character", "processed_at": datetime.now().isoformat(), "overlap_with_previous": overlap_with_previous > 0 and i > 0, "line_mapping_available": True # Indicate that line mapping can be created } } chunks.append(TextChunk(content=chunk_content, metadata=metadata)) # Update search position for next chunk (start from current chunk's beginning + some offset # to handle potential overlaps correctly) current_search_pos = max(chunk_start + 1, chunk_start + len(chunk_content) // 2) logger.info(f"Split content into {len(chunks)} character-based chunks") return chunks class JSONSplitter(BaseSplitter): """ JSON-based text splitter that treats JSON objects as logical chunks. This splitter attempts to preserve JSON structure while respecting maximum chunk size constraints. """ def __init__(self, schema: Dict[str, Any] = None, max_chunk_size: int = 300000): """ Initialize the JSON splitter. Args: schema: Optional JSON schema to guide splitting max_chunk_size: Maximum size for each chunk in characters """ self.schema = schema self.max_chunk_size = max_chunk_size logger.info(f"JSONSplitter initialized with max_chunk_size={max_chunk_size}") def split(self, content: str) -> List[TextChunk]: """ Split content into JSON-based chunks. Args: content: The JSON content to split Returns: List of TextChunk objects with content and metadata """ logger.info("Splitting content into JSON-based chunks") try: json_data = json.loads(content) except json.JSONDecodeError as e: logger.error(f"Invalid JSON content: {e}") # Create a single chunk with the raw content and mark it as invalid JSON chunk = self._create_chunk(content, 0, 1) chunk.metadata["window_info"]["splitter_type"] = "json_invalid" chunk.metadata["window_info"]["error"] = f"Invalid JSON: {str(e)}" chunk.metadata["window_info"]["handling"] = "raw_content_preserved" return [chunk] chunks = [] if isinstance(json_data, list): # Handle JSON arrays - each element becomes a chunk chunks = self._split_json_array(json_data) elif isinstance(json_data, dict): # Handle JSON objects - split based on size or schema chunks = self._split_json_object(json_data) else: # Handle primitive JSON values chunks = [self._create_chunk(json.dumps(json_data), 0, 1)] logger.info(f"Split content into {len(chunks)} JSON-based chunks") return chunks def _split_json_array(self, json_array: List[Any]) -> List[TextChunk]: """Split a JSON array into chunks.""" chunks = [] current_chunk = [] current_size = 2 # Start with array brackets [] for i, item in enumerate(json_array): item_json = json.dumps(item) item_size = len(item_json) # Check if adding this item would exceed the max chunk size if current_size + item_size + 1 > self.max_chunk_size and current_chunk: # +1 for comma # Create chunk from current items chunk_content = json.dumps(current_chunk) chunks.append(self._create_chunk(chunk_content, len(chunks), None)) # Start new chunk current_chunk = [item] current_size = 2 + item_size # [] + item else: current_chunk.append(item) current_size += item_size + (1 if current_chunk else 0) # +1 for comma if not first # Add remaining items as final chunk if current_chunk: chunk_content = json.dumps(current_chunk) chunks.append(self._create_chunk(chunk_content, len(chunks), None)) # Update total count in all chunks for chunk in chunks: chunk.metadata["window_info"]["window_total"] = len(chunks) return chunks def _split_json_object(self, json_object: Dict[str, Any]) -> List[TextChunk]: """Split a JSON object into chunks.""" # For now, treat the entire object as one chunk if it fits object_json = json.dumps(json_object) if len(object_json) <= self.max_chunk_size: return [self._create_chunk(object_json, 0, 1)] # If object is too large, split by top-level keys chunks = [] current_chunk = {} current_size = 2 # Start with object braces {} for key, value in json_object.items(): key_value_json = json.dumps({key: value}) key_value_size = len(key_value_json) - 2 # Subtract the {} from individual measurement # Check if adding this key-value pair would exceed max size if current_size + key_value_size + 1 > self.max_chunk_size and current_chunk: # +1 for comma # Create chunk from current key-value pairs chunk_content = json.dumps(current_chunk) chunks.append(self._create_chunk(chunk_content, len(chunks), None)) # Start new chunk current_chunk = {key: value} current_size = 2 + key_value_size # {} + key-value else: current_chunk[key] = value current_size += key_value_size + (1 if len(current_chunk) > 1 else 0) # +1 for comma if not first # Add remaining key-value pairs as final chunk if current_chunk: chunk_content = json.dumps(current_chunk) chunks.append(self._create_chunk(chunk_content, len(chunks), None)) # Update total count in all chunks for chunk in chunks: chunk.metadata["window_info"]["window_total"] = len(chunks) return chunks def _create_chunk(self, content: str, index: int, total: int) -> TextChunk: """Create a TextChunk with appropriate metadata.""" metadata = { "window_info": { "window_index": index, "window_total": total, "window_start_char": 0, # JSON chunks don't have meaningful character positions "window_end_char": len(content), "window_size": len(content), "overlap_size": 0, # JSON chunks typically don't overlap "splitter_type": "json", "processed_at": datetime.now().isoformat(), "schema_used": self.schema is not None } } if self.schema: metadata["window_info"]["schema"] = self.schema return TextChunk(content=content, metadata=metadata) class AgentAwareSemanticSplitter(BaseSplitter): """ Advanced splitter that uses agent-aware semantic analysis to create intelligent chunks that preserve agent interaction boundaries. This splitter combines: - Log type detection to identify the format - Boundary detection to find agent interaction points - Semantic analysis to identify topic shifts - Intelligent chunking that respects both size and semantic constraints """ def __init__(self, min_chunk_size: int = 100000, # 100K chars ≈ 25K tokens max_chunk_size: int = 300000, # 300K chars ≈ 75K tokens (token-safe) overlap_ratio: float = 0.02, # 2% overlap for efficiency confidence_threshold: float = 0.7, embedding_model: str = "text-embedding-3-small", preserve_agent_stages: bool = True, openai_api_key: Optional[str] = None): """ Initialize the agent-aware semantic splitter. Args: min_chunk_size: Minimum chunk size in characters (default: 200K ≈ 50K tokens) max_chunk_size: Maximum chunk size in characters (default: 800K ≈ 200K tokens) overlap_ratio: Ratio of overlap between chunks (default: 0.02 for cost efficiency) confidence_threshold: Minimum confidence for boundary detection embedding_model: Name of OpenAI embedding model for semantic analysis (default: text-embedding-3-small) preserve_agent_stages: Whether to preserve complete agent interaction stages openai_api_key: OpenAI API key (if not provided, will use OPENAI_API_KEY environment variable) Note: Optimized for 1M token context windows to minimize API costs """ self.min_chunk_size = min_chunk_size self.max_chunk_size = max_chunk_size self.overlap_ratio = overlap_ratio self.confidence_threshold = confidence_threshold self.preserve_agent_stages = preserve_agent_stages # Initialize component modules self.log_detector = LogTypeDetector() self.boundary_detector = BoundaryDetector(self.log_detector) try: self.semantic_analyzer = SemanticAnalyzer( model_name=embedding_model, similarity_threshold=0.5, api_key=openai_api_key ) except Exception as e: print(f"Warning: Failed to initialize semantic analyzer: {e}") self.semantic_analyzer = None # Statistics for monitoring self.stats = { "chunks_created": 0, "boundaries_detected": 0, "semantic_breaks_found": 0, "avg_chunk_size": 0, "stage_preservation_rate": 0.0 } def split(self, content: str) -> List[TextChunk]: """ Split content using agent-aware semantic analysis. Args: content: The content to split Returns: List of TextChunk objects with metadata """ if not content.strip(): return [] print(f"Starting agent-aware semantic splitting...") print(f"Content length: {len(content):,} characters") # Step 1: Detect log type detection_result = self.log_detector.detect_log_type(content) log_type = detection_result.log_type print(f"Detected log type: {log_type.value} (confidence: {detection_result.confidence:.2f})") # Step 2: Detect agent boundaries agent_boundaries = self.boundary_detector.detect_boundaries(content, log_type) print(f"Found {len(agent_boundaries)} agent boundaries") self.stats["boundaries_detected"] = len(agent_boundaries) # Step 3: Enhance boundaries with semantic analysis if self.semantic_analyzer: enhanced_boundaries = self._enhance_boundaries_with_semantics( agent_boundaries, content ) print(f"Enhanced to {len(enhanced_boundaries)} total boundaries") else: enhanced_boundaries = agent_boundaries print("Semantic analysis unavailable, using pattern-based boundaries only") # Step 4: Create chunks respecting boundaries and size constraints chunks = self._create_intelligent_chunks( content, enhanced_boundaries, log_type, detection_result.characteristics ) print(f"Created {len(chunks)} chunks") self.stats["chunks_created"] = len(chunks) self.stats["avg_chunk_size"] = sum(len(chunk.content) for chunk in chunks) / len(chunks) if chunks else 0 return chunks def _enhance_boundaries_with_semantics(self, agent_boundaries: List[AgentBoundary], content: str) -> List[AgentBoundary]: """ Enhance agent boundaries with semantic analysis. Args: agent_boundaries: List of detected agent boundaries content: Full content for analysis Returns: Enhanced list of boundaries including semantic breakpoints """ if not self.semantic_analyzer: return agent_boundaries # Analyze semantic structure semantic_analysis = self.semantic_analyzer.analyze_semantic_structure(content) semantic_breakpoints = semantic_analysis["breakpoints"] self.stats["semantic_breaks_found"] = len(semantic_breakpoints) # Convert semantic breakpoints to agent boundaries semantic_boundaries = [] for breakpoint in semantic_breakpoints: # Only add if not too close to existing agent boundaries is_near_agent_boundary = any( abs(breakpoint.position - ab.position) < 50 for ab in agent_boundaries ) if not is_near_agent_boundary and breakpoint.confidence > 0.6: semantic_boundary = AgentBoundary( position=breakpoint.position, boundary_type=BoundaryType.SEMANTIC_BREAK, pattern_matched="semantic_similarity_drop", confidence_score=breakpoint.confidence, context_before="", # Will be filled by boundary detector context_after="", # Will be filled by boundary detector metadata={ "type": "semantic", "similarity_drop": breakpoint.similarity_drop, "sentence_index": breakpoint.sentence_index } ) semantic_boundaries.append(semantic_boundary) # Combine and sort all boundaries all_boundaries = agent_boundaries + semantic_boundaries all_boundaries.sort(key=lambda b: b.position) # Remove boundaries that are too close to each other return self._deduplicate_boundaries(all_boundaries) def _deduplicate_boundaries(self, boundaries: List[AgentBoundary]) -> List[AgentBoundary]: """Remove boundaries that are too close to each other.""" if not boundaries: return [] deduplicated = [boundaries[0]] min_distance = 25 # Minimum distance between boundaries for boundary in boundaries[1:]: last_boundary = deduplicated[-1] if boundary.position - last_boundary.position >= min_distance: deduplicated.append(boundary) elif boundary.confidence_score > last_boundary.confidence_score: # Replace if new boundary has higher confidence deduplicated[-1] = boundary return deduplicated def _create_intelligent_chunks(self, content: str, boundaries: List[AgentBoundary], log_type: LogType, characteristics: Dict[str, any]) -> List[TextChunk]: """ Create chunks using intelligent boundary selection. Args: content: Content to chunk boundaries: List of detected boundaries log_type: Detected log type characteristics: Content characteristics Returns: List of TextChunk objects """ if not boundaries: # Fallback to simple size-based chunking return self._create_size_based_chunks(content, log_type, characteristics) chunks = [] current_position = 0 overlap_size = int(self.max_chunk_size * self.overlap_ratio) while current_position < len(content): # Find optimal chunk end position chunk_end, used_boundary = self._find_optimal_chunk_end( content, current_position, boundaries ) # Extract chunk content chunk_content = content[current_position:chunk_end] # Calculate next starting position with overlap if chunk_end < len(content): # Calculate overlap start position (going backward from chunk_end) desired_overlap_start = max(0, chunk_end - overlap_size) # Find a good sentence boundary for the overlap to avoid cutting mid-sentence next_start = self._find_overlap_boundary(content, desired_overlap_start, chunk_end) # FIX: Ensure next_start is not equal to chunk_end (which breaks the loop) # If _find_overlap_boundary returned chunk_end, use desired_overlap_start instead if next_start >= chunk_end: next_start = desired_overlap_start # Ensure we don't go backwards (next_start should be > current_position for meaningful progress) if next_start <= current_position: next_start = min(chunk_end - 1, current_position + max(1, len(chunk_content) // 2)) else: next_start = chunk_end # Create chunk metadata actual_overlap_size = chunk_end - next_start if chunk_end > next_start else 0 window_info = { "window_index": len(chunks), "window_start_char": current_position, "window_end_char": chunk_end, "chunk_size": len(chunk_content), "window_size": self.max_chunk_size, "overlap_size": actual_overlap_size, "splitter_type": "agent_semantic", "log_type": log_type.value, "boundary_used": used_boundary.boundary_type.value if used_boundary else "size_limit", "boundary_confidence": used_boundary.confidence_score if used_boundary else 0.0, "contains_agent_markers": characteristics.get("agent_markers", 0) > 0, "contains_tool_patterns": characteristics.get("tool_usage_patterns", 0) > 0, "overlap_with_previous": actual_overlap_size > 0 and len(chunks) > 0 } # Create TextChunk chunk = TextChunk( content=chunk_content, metadata={ "window_info": window_info, "creation_time": datetime.now().isoformat(), "quality_score": self._calculate_chunk_quality_score(chunk_content, used_boundary) } ) chunks.append(chunk) # Update position for next iteration current_position = next_start # Improved end-of-content handling to prevent tiny chunks # If we're very close to the end (less than 10% of min_chunk_size remaining), # or if we haven't made meaningful progress, break the loop remaining_content = len(content) - current_position min_meaningful_chunk = max(100, self.min_chunk_size // 10) # At least 100 chars or 10% of min_chunk_size if remaining_content <= min_meaningful_chunk or current_position >= chunk_end: # If there's still some content left but it's small, merge it with the last chunk if remaining_content > 0 and current_position < len(content): remaining_text = content[current_position:len(content)] if chunks: # Merge with the last chunk last_chunk = chunks[-1] last_chunk.content += remaining_text # Update the metadata last_chunk.metadata["window_info"]["window_end_char"] = len(content) last_chunk.metadata["window_info"]["chunk_size"] = len(last_chunk.content) last_chunk.metadata["window_info"]["merged_final_segment"] = True last_chunk.metadata["window_info"]["merged_segment_size"] = len(remaining_text) break return chunks def _find_optimal_chunk_end(self, content: str, start_pos: int, boundaries: List[AgentBoundary]) -> Tuple[int, Optional[AgentBoundary]]: """ Find the optimal end position for a chunk. Args: content: Full content start_pos: Starting position for chunk boundaries: Available boundaries Returns: Tuple of (end_position, boundary_used) """ max_end = min(len(content), start_pos + self.max_chunk_size) min_end = min(len(content), start_pos + self.min_chunk_size) # Find boundaries within the acceptable range candidate_boundaries = [ b for b in boundaries if min_end <= b.position <= max_end and b.position > start_pos ] if not candidate_boundaries: # No good boundaries, use max size return max_end, None # Choose best boundary based on confidence and position best_boundary = self._select_best_boundary( candidate_boundaries, start_pos, max_end ) return best_boundary.position, best_boundary def _select_best_boundary(self, boundaries: List[AgentBoundary], start_pos: int, max_end: int) -> AgentBoundary: """ Select the best boundary from candidates. Args: boundaries: Candidate boundaries start_pos: Chunk start position max_end: Maximum end position Returns: Best boundary to use """ def boundary_score(boundary: AgentBoundary) -> float: # Base score from confidence score = boundary.confidence_score # Prefer agent-specific boundaries over semantic ones if boundary.boundary_type in [ BoundaryType.TASK_END, BoundaryType.CREW_END, BoundaryType.FINAL_ANSWER, BoundaryType.TOOL_CYCLE_END ]: score += 0.3 elif boundary.boundary_type == BoundaryType.SEMANTIC_BREAK: score += 0.1 # Prefer boundaries closer to ideal size ideal_size = (self.min_chunk_size + self.max_chunk_size) // 2 chunk_size = boundary.position - start_pos size_score = 1.0 - abs(chunk_size - ideal_size) / ideal_size score += size_score * 0.2 return score return max(boundaries, key=boundary_score) def _find_overlap_boundary(self, content: str, start: int, end: int) -> int: """ Find a good boundary for overlap between chunks. Args: content: Full content start: Start position to search from end: End position to search to Returns: Position of the boundary """ # Look for sentence boundaries within the overlap region search_text = content[start:end] # Try to find sentence endings for delimiter in ['. ', '! ', '? ', '; ']: pos = search_text.rfind(delimiter) if pos != -1: return start + pos + len(delimiter) # Fall back to word boundary pos = search_text.rfind(' ') if pos != -1: return start + pos + 1 # If no good boundary found, use the end position return end def _create_size_based_chunks(self, content: str, log_type: LogType, characteristics: Dict[str, any]) -> List[TextChunk]: """Fallback to size-based chunking when no boundaries are found.""" chunks = [] current_pos = 0 overlap_size = int(self.max_chunk_size * self.overlap_ratio) while current_pos < len(content): end_pos = min(len(content), current_pos + self.max_chunk_size) chunk_content = content[current_pos:end_pos] # Calculate next starting position with overlap if end_pos < len(content): # Create overlap by going back from end_pos desired_overlap_start = max(0, end_pos - overlap_size) next_pos = self._find_overlap_boundary(content, desired_overlap_start, end_pos) # FIX: Ensure next_pos is not equal to end_pos (which breaks the loop) # If _find_overlap_boundary returned end_pos, use desired_overlap_start instead if next_pos >= end_pos: next_pos = desired_overlap_start # Ensure progress is made (avoid infinite loops) if next_pos <= current_pos: next_pos = min(end_pos - 1, current_pos + max(1, len(chunk_content) // 2)) else: next_pos = end_pos # Calculate actual overlap for metadata actual_overlap_size = end_pos - next_pos if end_pos > next_pos else 0 window_info = { "window_index": len(chunks), "window_start_char": current_pos, "window_end_char": end_pos, "chunk_size": len(chunk_content), "window_size": self.max_chunk_size, "overlap_size": actual_overlap_size, "splitter_type": "agent_semantic_fallback", "log_type": log_type.value, "boundary_used": "size_limit", "boundary_confidence": 0.0, "overlap_with_previous": actual_overlap_size > 0 and len(chunks) > 0 } chunk = TextChunk( content=chunk_content, metadata={ "window_info": window_info, "creation_time": datetime.now().isoformat(), "quality_score": 0.5 # Lower quality for size-based chunks } ) chunks.append(chunk) current_pos = next_pos # Improved end-of-content handling to prevent tiny chunks (same as intelligent chunks) remaining_content = len(content) - current_pos min_meaningful_chunk = max(100, self.min_chunk_size // 10) # At least 100 chars or 10% of min_chunk_size if remaining_content <= min_meaningful_chunk: # If there's still some content left but it's small, merge it with the last chunk if remaining_content > 0 and current_pos < len(content): remaining_text = content[current_pos:len(content)] if chunks: # Merge with the last chunk last_chunk = chunks[-1] last_chunk.content += remaining_text # Update the metadata last_chunk.metadata["window_info"]["window_end_char"] = len(content) last_chunk.metadata["window_info"]["chunk_size"] = len(last_chunk.content) last_chunk.metadata["window_info"]["merged_final_segment"] = True last_chunk.metadata["window_info"]["merged_segment_size"] = len(remaining_text) break return chunks def _calculate_chunk_quality_score(self, chunk_content: str, boundary_used: Optional[AgentBoundary]) -> float: """ Calculate a quality score for the chunk. Args: chunk_content: Content of the chunk boundary_used: Boundary that ended the chunk Returns: Quality score between 0 and 1 """ score = 0.5 # Base score # Bonus for using high-confidence boundaries if boundary_used and boundary_used.confidence_score > 0.8: score += 0.3 elif boundary_used and boundary_used.confidence_score > 0.6: score += 0.2 # Bonus for preserving agent stages if self._has_complete_agent_stages(chunk_content): score += 0.2 # Bonus for good size (not too small or too large) size_ratio = len(chunk_content) / self.max_chunk_size if 0.3 <= size_ratio <= 0.9: score += 0.1 return min(score, 1.0) def _has_complete_agent_stages(self, content: str) -> bool: """Check if chunk contains complete agent interaction stages.""" # Simple heuristic: look for start and end markers has_start = any(pattern in content for pattern in [ "Agent:", "Task:", "Crew:", "🚀", "📋", "🤖" ]) has_end = any(pattern in content for pattern in [ "Final Answer:", "Completed", "✅", "Final Result:" ]) return has_start and has_end def get_stats(self) -> Dict[str, any]: """Get splitting statistics.""" return self.stats.copy() class PromptInteractionSplitter(BaseSplitter): """ Splitter that treats every two prompt interactions as one chunk, with one prompt interaction overlap between consecutive chunks. This splitter is designed for log files or traces that contain multiple prompt-response interactions, where each interaction represents a complete conversation turn with an AI assistant. """ def __init__(self, interactions_per_chunk: int = 2, overlap_interactions: int = 1): """ Initialize the prompt interaction splitter. Args: interactions_per_chunk: Number of prompt interactions per chunk (default: 2) overlap_interactions: Number of interactions to overlap between chunks (default: 1) """ self.interactions_per_chunk = interactions_per_chunk self.overlap_interactions = overlap_interactions if self.overlap_interactions >= self.interactions_per_chunk: raise ValueError(f"Overlap interactions ({overlap_interactions}) must be less than interactions per chunk ({interactions_per_chunk})") logger.info(f"PromptInteractionSplitter initialized with {interactions_per_chunk} interactions per chunk, {overlap_interactions} overlap") def split(self, content: str) -> List[TextChunk]: """ Split content into chunks based on prompt interactions. Args: content: The content to split (can be a single JSON log, multiple JSON logs, or mixed content) Returns: List of TextChunk objects with content and metadata """ logger.info("Starting prompt interaction splitting") # Step 1: Identify individual prompt interactions interactions = self._identify_prompt_interactions(content) if len(interactions) == 0: logger.warning("No prompt interactions found in content") return [] logger.info(f"Found {len(interactions)} prompt interactions") # Step 2: Create chunks with specified grouping and overlap chunks = self._create_interaction_chunks(interactions) logger.info(f"Created {len(chunks)} chunks from {len(interactions)} interactions") return chunks def _identify_prompt_interactions(self, content: str) -> List[Dict[str, Any]]: """ Identify individual prompt interactions in the content. This method handles different content formats: 1. Single JSON object (one interaction) 2. Multiple JSON objects separated by newlines 3. Mixed content with JSON objects embedded Args: content: Raw content to analyze Returns: List of interaction dictionaries with metadata """ interactions = [] # Try to parse as single JSON first try: json_data = json.loads(content.strip()) if self._is_prompt_interaction(json_data): interaction = { "data": json_data, "start_pos": 0, "end_pos": len(content), "raw_content": content } interactions.append(interaction) return interactions except json.JSONDecodeError: pass # Try to find multiple JSON objects interactions.extend(self._find_json_interactions(content)) # If no JSON interactions found, try to identify text-based interactions if not interactions: interactions.extend(self._find_text_interactions(content)) return interactions def _is_prompt_interaction(self, data: Dict[str, Any]) -> bool: """ Check if a JSON object represents a prompt interaction. Args: data: JSON object to check Returns: True if it looks like a prompt interaction """ # Check for common prompt interaction fields prompt_indicators = [ "messages", "prompt", "user", "assistant", "content", "prompt_tokens", "completion_tokens", "model_name", "generated_query", "ai_message" ] return any(key in data for key in prompt_indicators) def _find_json_interactions(self, content: str) -> List[Dict[str, Any]]: """ Find JSON objects in content that represent prompt interactions. Args: content: Content to search Returns: List of interaction dictionaries """ interactions = [] lines = content.split('\n') current_json = "" start_line = 0 start_pos = 0 current_pos = 0 for line_idx, line in enumerate(lines): line_start_pos = current_pos current_pos += len(line) + 1 # +1 for newline if line.strip().startswith('{'): # Start of potential JSON object if current_json: # We were already building a JSON, this might be a new one # Try to parse the current one first try: json_data = json.loads(current_json) if self._is_prompt_interaction(json_data): interaction = { "data": json_data, "start_pos": start_pos, "end_pos": line_start_pos, "raw_content": current_json } interactions.append(interaction) except json.JSONDecodeError: pass # Start new JSON current_json = line start_line = line_idx start_pos = line_start_pos elif current_json: # Continue building current JSON current_json += '\n' + line # Try to parse to see if it's complete try: json_data = json.loads(current_json) if self._is_prompt_interaction(json_data): interaction = { "data": json_data, "start_pos": start_pos, "end_pos": current_pos, "raw_content": current_json } interactions.append(interaction) current_json = "" except json.JSONDecodeError: # Not complete yet, continue continue # Handle any remaining JSON if current_json: try: json_data = json.loads(current_json) if self._is_prompt_interaction(json_data): interaction = { "data": json_data, "start_pos": start_pos, "end_pos": len(content), "raw_content": current_json } interactions.append(interaction) except json.JSONDecodeError: pass return interactions def _find_text_interactions(self, content: str) -> List[Dict[str, Any]]: """ Find text-based prompt interactions using patterns. This is a fallback method for non-JSON content that might contain conversational patterns. Args: content: Content to search Returns: List of interaction dictionaries """ interactions = [] # Look for common conversation patterns patterns = [ r'(?:User|Human|Question):\s*(.+?)(?=(?:Assistant|AI|Answer|Response):|$)', r'(?:Assistant|AI|Answer|Response):\s*(.+?)(?=(?:User|Human|Question):|$)', r'>>(.+?)(?=>>|$)', # Pattern from the log file ] import re for pattern in patterns: matches = re.finditer(pattern, content, re.DOTALL | re.IGNORECASE) for match in matches: interaction = { "data": {"content": match.group(1).strip()}, "start_pos": match.start(), "end_pos": match.end(), "raw_content": match.group(0) } interactions.append(interaction) # Sort by position interactions.sort(key=lambda x: x["start_pos"]) return interactions def _create_interaction_chunks(self, interactions: List[Dict[str, Any]]) -> List[TextChunk]: """ Create chunks from interactions with specified grouping and overlap. Args: interactions: List of identified interactions Returns: List of TextChunk objects """ chunks = [] if len(interactions) == 0: return chunks # Calculate step size (how many interactions to advance for each chunk) step_size = self.interactions_per_chunk - self.overlap_interactions i = 0 while i < len(interactions): # Determine end index for this chunk end_idx = min(i + self.interactions_per_chunk, len(interactions)) # Get interactions for this chunk chunk_interactions = interactions[i:end_idx] # Combine the interactions into chunk content chunk_content = self._combine_interactions(chunk_interactions) # Calculate positions start_pos = chunk_interactions[0]["start_pos"] end_pos = chunk_interactions[-1]["end_pos"] # Calculate overlap information overlap_interactions_count = 0 if i > 0: # Check how many interactions overlap with previous chunk prev_chunk_start = max(0, i - step_size) overlap_interactions_count = max(0, min(self.overlap_interactions, i - prev_chunk_start)) # Create metadata metadata = { "window_info": { "window_index": len(chunks), "window_total": None, # Will be updated after all chunks are created "window_start_char": start_pos, "window_end_char": end_pos, "chunk_size": len(chunk_content), "interactions_count": len(chunk_interactions), "interactions_per_chunk": self.interactions_per_chunk, "overlap_interactions": overlap_interactions_count, "splitter_type": "prompt_interaction", "processed_at": datetime.now().isoformat(), "overlap_with_previous": overlap_interactions_count > 0, "interaction_indices": list(range(i, end_idx)) } } # Create TextChunk chunk = TextChunk( content=chunk_content, metadata=metadata ) chunks.append(chunk) # Move to next chunk position i += step_size # Break if we've processed all interactions if end_idx >= len(interactions): break # Update total count in all chunks for chunk in chunks: chunk.metadata["window_info"]["window_total"] = len(chunks) return chunks def _combine_interactions(self, interactions: List[Dict[str, Any]]) -> str: """ Combine multiple interactions into a single chunk content. Args: interactions: List of interactions to combine Returns: Combined content string """ combined_parts = [] for i, interaction in enumerate(interactions): # Add separator between interactions (except for first) if i > 0: combined_parts.append("\n" + "="*50 + f" INTERACTION {i+1} " + "="*50 + "\n") # Add the interaction content if "raw_content" in interaction: combined_parts.append(interaction["raw_content"]) else: # Fallback to JSON representation combined_parts.append(json.dumps(interaction["data"], indent=2)) return "\n".join(combined_parts) # Dictionary of available splitters for easy access AVAILABLE_SPLITTERS = { "agent_semantic": AgentAwareSemanticSplitter, # Default intelligent splitter "json": JSONSplitter, "prompt_interaction": PromptInteractionSplitter # New prompt interaction splitter }