"""
Chunking Service for Agent Monitoring

This service provides centralized chunking logic for trace content preprocessing.
It extracts and unifies the chunking functionality previously scattered across
pipeline.py, stage_processor.py, and knowledge_graph_processor.py.
"""

import logging
import re
from bisect import bisect_right
from typing import List, Dict, Any, Optional, Tuple

# Import the chunking components
from .text_chunking_strategies import (
    TextChunk,
    BaseSplitter,
    AgentAwareSemanticSplitter,
    JSONSplitter,
    PromptInteractionSplitter
)

# Import trace analysis for parameter optimization
# Note: Imported locally to avoid circular dependencies

logger = logging.getLogger(__name__)


class ChunkingService:
    """
    Centralized service for chunking trace content using various splitting strategies.

    This service encapsulates all the chunking logic that was previously duplicated
    across multiple files, providing a single interface for creating chunks from
    trace content with optimal parameters.
    """

    # Single source of truth for the splitter names accepted by
    # chunk_trace_content() and reported by get_stats().
    SUPPORTED_SPLITTERS: Tuple[str, ...] = ("agent_semantic", "json", "prompt_interaction")

    # Token-safe defaults: 300K chars ~ 75K tokens (safe for 128K context models),
    # with a 2% overlap (6K chars).
    DEFAULT_WINDOW_SIZE: int = 300000
    DEFAULT_OVERLAP_SIZE: int = 6000

    def __init__(self, default_batch_size: int = 3, default_model: str = "gpt-5-mini"):
        """
        Initialize the chunking service with default parameters.

        Args:
            default_batch_size: Default batch size for processing
            default_model: Default model to use for LLM operations
        """
        self.default_batch_size = default_batch_size
        self.default_model = default_model

        logger.info(f"ChunkingService initialized with batch_size={default_batch_size}, model={default_model}")

    def chunk_trace_content(
        self,
        content: str,
        splitter_type: str = "agent_semantic",
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        min_chunk_size: Optional[int] = None,
        use_recommended_params: bool = True,
        trace_analysis: Optional[Dict] = None,
        apply_line_splitting: bool = True,
        max_line_length: int = 800
    ) -> List[TextChunk]:
        """
        Main interface for chunking trace content.

        Args:
            content: The trace content to chunk
            splitter_type: Type of splitter ("agent_semantic", "json", "prompt_interaction")
            window_size: Override window size (if None, will be calculated)
            overlap_size: Override overlap size (if None, will be calculated)
            min_chunk_size: Override minimum chunk size; forwarded to create_splitter(),
                which only uses it for the "agent_semantic" splitter
            use_recommended_params: Whether to use optimized parameters from trace analysis
            trace_analysis: Pre-computed trace analysis results (optional)
            apply_line_splitting: Whether to apply rule-based line splitting after chunking
            max_line_length: Maximum line length for rule-based splitting

        Returns:
            List of TextChunk objects ready for processing

        Raises:
            ValueError: If splitter_type is not supported, or if line splitting is
                requested with a non-positive max_line_length
        """
        logger.info(f"Chunking trace content with {splitter_type} splitter")
        logger.info(f"Content length: {len(content)} characters")

        # Validate splitter type
        if splitter_type not in self.SUPPORTED_SPLITTERS:
            raise ValueError(
                f"Invalid splitter_type '{splitter_type}'. "
                f"Must be one of: {', '.join(self.SUPPORTED_SPLITTERS)}"
            )

        # Fail fast: a non-positive limit can never be satisfied by line splitting,
        # so reject it before doing any expensive analysis or chunking.
        if apply_line_splitting and max_line_length <= 0:
            raise ValueError(f"max_line_length must be a positive integer, got {max_line_length}")

        # Determine parameters to use - unified approach with token-safe defaults
        final_window_size, final_overlap_size = self.optimize_parameters(
            content, window_size, overlap_size, use_recommended_params, trace_analysis
        )

        # Create the appropriate splitter
        splitter = self.create_splitter(
            splitter_type,
            window_size=final_window_size,
            overlap_size=final_overlap_size,
            min_chunk_size=min_chunk_size
        )

        # Split the content (without line numbers first)
        chunks = splitter.split(content)

        # Apply rule-based line splitting after chunking
        if apply_line_splitting:
            chunks = self._apply_rule_based_line_splitting(chunks, max_line_length)

        # Add global line numbers after chunking (if needed)
        if self._method_requires_line_numbers():
            chunks = self._assign_global_line_numbers(content, chunks)

        logger.info(f"Split content into {len(chunks)} chunks using {splitter_type} splitter")
        logger.info(f"Parameters used: window_size={final_window_size}, overlap_size={final_overlap_size}")

        return chunks

    def _apply_rule_based_line_splitting(self, chunks: List[TextChunk], max_line_length: int) -> List[TextChunk]:
        """
        Apply rule-based line splitting to each chunk's content.

        The input chunks are not modified; a new TextChunk is built for each one
        with a "line_splitting" entry added to a copy of its metadata.

        Args:
            chunks: List of TextChunk objects to process
            max_line_length: Maximum characters per line

        Returns:
            List of TextChunk objects with line-split content

        Raises:
            ValueError: If max_line_length is not positive
        """
        processed_chunks = []

        for chunk in chunks:
            # Apply line splitting to chunk content
            split_content = self._split_lines_by_character_count(chunk.content, max_line_length)

            # Create new chunk with split content
            new_chunk = TextChunk(
                content=split_content,
                metadata=chunk.metadata.copy()
            )

            # Update metadata to indicate line splitting was applied
            new_chunk.metadata["line_splitting"] = {
                "applied": True,
                "max_line_length": max_line_length,
                "original_lines": len(chunk.content.split('\n')),
                "processed_lines": len(split_content.split('\n'))
            }

            processed_chunks.append(new_chunk)

        logger.info(f"Applied rule-based line splitting to {len(chunks)} chunks (max_line_length={max_line_length})")
        return processed_chunks

    def _split_lines_by_character_count(self, content: str, max_length: int) -> str:
        """
        Split lines that exceed max_length into multiple lines using simple character counting.

        Lines are cut at fixed character offsets with no regard for word or token
        boundaries. Lines at or below the limit (including empty lines) are kept as-is.

        Args:
            content: Content to process
            max_length: Maximum characters per line (must be positive)

        Returns:
            Content with long lines split

        Raises:
            ValueError: If max_length is not positive
        """
        # A non-positive limit would never shrink the remaining text, so it
        # cannot terminate; reject it explicitly instead of hanging.
        if max_length <= 0:
            raise ValueError(f"max_length must be a positive integer, got {max_length}")

        processed_lines = []

        for line in content.split('\n'):
            if len(line) <= max_length:
                processed_lines.append(line)
            else:
                # Cut the long line into consecutive max_length-sized pieces;
                # the last piece holds whatever remains.
                processed_lines.extend(
                    line[start:start + max_length]
                    for start in range(0, len(line), max_length)
                )

        return '\n'.join(processed_lines)

    def create_splitter(
        self,
        splitter_type: str,
        window_size: int = 350000,
        overlap_size: int = 17500,
        min_chunk_size: Optional[int] = None,
        **kwargs
    ) -> BaseSplitter:
        """
        Create and configure a splitter based on type and parameters.

        Any splitter_type other than "prompt_interaction" or "json" falls through
        to the AgentAwareSemanticSplitter.

        Args:
            splitter_type: Type of splitter to create
            window_size: Size of each window/chunk
            overlap_size: Overlap between consecutive chunks
            min_chunk_size: Minimum chunk size for the agent-aware semantic splitter;
                if None, max(50000, window_size // 8) is used
            **kwargs: Additional parameters for specific splitters:
                "interactions_per_chunk" and "overlap_interactions" for the prompt
                interaction splitter; "confidence_threshold" and
                "preserve_agent_stages" for the agent-aware semantic splitter

        Returns:
            Configured splitter instance
        """
        if splitter_type == "prompt_interaction":
            # For prompt interaction splitter, use interactions-based parameters
            interactions_per_chunk = kwargs.get("interactions_per_chunk", 2)
            overlap_interactions = kwargs.get("overlap_interactions", 1)

            splitter = PromptInteractionSplitter(
                interactions_per_chunk=interactions_per_chunk,
                overlap_interactions=overlap_interactions
            )
            logger.info(f"Created PromptInteractionSplitter with {interactions_per_chunk} interactions per chunk, {overlap_interactions} overlap")

        elif splitter_type == "json":
            # For JSON splitter, use window_size as max_chunk_size
            splitter = JSONSplitter(
                max_chunk_size=window_size
            )
            logger.info(f"Created JSONSplitter with max_chunk_size={window_size}")

        else:
            # Default to AgentAwareSemanticSplitter (agent_semantic)
            # Calculate overlap ratio from overlap_size and window_size
            overlap_ratio = overlap_size / window_size if window_size > 0 else 0.05

            # Use provided min_chunk_size or default calculation.
            # min_chunk_size is a named parameter, so it can never arrive via
            # **kwargs; only the explicit argument needs to be consulted.
            if min_chunk_size is not None:
                final_min_chunk_size = min_chunk_size
            else:
                final_min_chunk_size = max(50000, window_size // 8)

            confidence_threshold = kwargs.get("confidence_threshold", 0.7)
            preserve_agent_stages = kwargs.get("preserve_agent_stages", True)

            splitter = AgentAwareSemanticSplitter(
                min_chunk_size=final_min_chunk_size,
                max_chunk_size=window_size,
                overlap_ratio=overlap_ratio,
                confidence_threshold=confidence_threshold,
                preserve_agent_stages=preserve_agent_stages
            )
            logger.info(f"Created AgentAwareSemanticSplitter with window_size={window_size}, overlap_ratio={overlap_ratio}")

        return splitter

    def optimize_parameters(
        self,
        content: str,
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        use_recommended_params: bool = True,
        trace_analysis: Optional[Dict] = None
    ) -> Tuple[int, int]:
        """
        Calculate optimal window and overlap sizes based on content analysis.

        This method implements the parameter optimization logic from pipeline.py.

        Resolution order for each of the two values: an explicit override wins,
        then the recommendation from trace analysis (when enabled and available),
        then the token-safe default.

        Args:
            content: The trace content to analyze
            window_size: Override window size
            overlap_size: Override overlap size
            use_recommended_params: Whether to use trace analysis for optimization
            trace_analysis: Pre-computed trace analysis results

        Returns:
            Tuple of (window_size, overlap_size)
        """
        # Use provided parameters if given
        if window_size is not None and overlap_size is not None:
            logger.info(f"Using provided parameters: window_size={window_size}, overlap_size={overlap_size}")
            return window_size, overlap_size

        # Token-safe default parameters
        default_window_size, default_overlap_size = self._get_simple_default_params()

        # If using recommended parameters, analyze the trace
        if use_recommended_params:
            # Use provided trace analysis or analyze the content
            if trace_analysis is None:
                logger.info("Analyzing trace to determine optimal parameters...")
                # Import locally to avoid circular dependencies
                from agentgraph.input.trace_management.trace_analysis import analyze_trace_characteristics
                trace_analysis = analyze_trace_characteristics(content)

            # Apply recommended parameters if analysis succeeded
            if trace_analysis is not None:
                recommended_window = trace_analysis.get("recommended_window_size", default_window_size)
                recommended_overlap = trace_analysis.get("recommended_overlap_size", default_overlap_size)

                final_window_size = window_size if window_size is not None else recommended_window
                final_overlap_size = overlap_size if overlap_size is not None else recommended_overlap

                logger.info("Using recommended parameters from trace analysis:")
                logger.info(f" - Window size: {final_window_size:,} characters")
                logger.info(f" - Overlap size: {final_overlap_size:,} characters")
                logger.info(f" - Estimated windows: {trace_analysis.get('estimated_windows', 'unknown')}")

                return final_window_size, final_overlap_size
            else:
                logger.warning("Could not get trace analysis results, using default parameters")

        # Fall back to defaults
        final_window_size = window_size if window_size is not None else default_window_size
        final_overlap_size = overlap_size if overlap_size is not None else default_overlap_size

        logger.info(f"Using default parameters: window_size={final_window_size}, overlap_size={final_overlap_size}")
        return final_window_size, final_overlap_size

    def _get_simple_default_params(self) -> Tuple[int, int]:
        """
        Get default parameters that are token-safe for 128K context models.

        Returns:
            Tuple of (window_size, overlap_size) with token-safe defaults
        """
        # Use token-safe defaults: 300K chars ~ 75K tokens (safe for 128K context),
        # with a 2% overlap = 6K chars
        return self.DEFAULT_WINDOW_SIZE, self.DEFAULT_OVERLAP_SIZE

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the chunking service.

        Returns:
            Dictionary with service statistics
        """
        return {
            "service": "ChunkingService",
            "default_batch_size": self.default_batch_size,
            "default_model": self.default_model,
            # Fresh list so callers cannot mutate the class-level constant
            "supported_splitters": list(self.SUPPORTED_SPLITTERS)
        }

    def fix_long_lines_in_content(self, content: str, max_line_length: int = 800) -> str:
        """
        Apply rule-based line splitting to content independently of chunking.

        This method can be used to fix long lines in trace content without
        going through the full chunking process.

        Args:
            content: Content to process
            max_line_length: Maximum characters per line

        Returns:
            Content with long lines split

        Raises:
            ValueError: If max_line_length is not positive
        """
        logger.info(f"Fixing long lines in content (max_line_length={max_line_length})")

        original_lines = len(content.split('\n'))
        processed_content = self._split_lines_by_character_count(content, max_line_length)
        processed_lines = len(processed_content.split('\n'))

        logger.info(f"Line splitting: {original_lines} → {processed_lines} lines")
        return processed_content

    def _method_requires_line_numbers(self) -> bool:
        """
        Check if the extraction method requires line numbers.

        Uses self.method_name when the instance has that attribute, otherwise
        "production".

        Returns:
            The result of method_requires_line_numbers() for that method name, or
            True when an ImportError is raised while importing or calling it
        """
        try:
            from agentgraph.shared.extraction_factory import method_requires_line_numbers
            return method_requires_line_numbers(getattr(self, "method_name", "production"))
        except ImportError:
            # Fallback: assume production method requires line numbers
            return True

    def _has_line_numbers(self, content: str) -> bool:
        """
        Check if content already has line numbers by looking for markers.

        Only the first five lines (or fewer, if the content is shorter) are
        inspected. Empty lines never count as numbered but still count toward
        the number of lines inspected.

        Args:
            content: Content to check

        Returns:
            True if content already has line numbers, False otherwise
        """
        # NOTE(review): as written, this pattern matches any line that begins
        # with a whitespace character, so indented content may be reported as
        # already numbered. Confirm it matches the marker format produced by
        # TraceLineNumberProcessor.add_line_numbers.
        line_number_pattern = r'^\s'

        # Check first 5 lines
        lines_to_check = content.split('\n')[:5]
        numbered_lines_found = sum(
            1 for line in lines_to_check
            if line and re.match(line_number_pattern, line)
        )

        # If most of the first few lines have line numbers, assume content is already numbered
        return numbered_lines_found >= (len(lines_to_check) * 0.6)  # 60% threshold

    def _assign_global_line_numbers(self, original_content: str, chunks: List[TextChunk]) -> List[TextChunk]:
        """
        Assign global line numbers to chunks based on original positions.

        Each chunk's starting line is looked up from the "window_start_char"
        offset stored in its metadata["window_info"]. The chunk content is then
        numbered from that line, and "global_line_start" / "global_line_end" are
        recorded in the new chunk's window_info. The input chunks and their
        metadata are left untouched.

        Args:
            original_content: The full, un-chunked content the chunks were cut from
            chunks: Chunks whose metadata["window_info"] contains
                "window_start_char" and "window_end_char"

        Returns:
            New list of numbered TextChunk objects, or the input list unchanged
            when the content is detected as already numbered

        Raises:
            KeyError: If a chunk's metadata lacks "window_info" or the expected
                offsets inside it
        """
        # Check if content already has line numbers
        if self._has_line_numbers(original_content):
            logger.info("Content already has line numbers, skipping line number assignment")
            return chunks

        logger.info(f"Assigning global line numbers to {len(chunks)} chunks")

        # Record the character offset at which every line starts. A sorted list
        # of starts plus bisect answers "which line holds offset X" without
        # storing one entry per character of the trace.
        line_starts = []
        char_pos = 0
        for line in original_content.split('\n'):
            line_starts.append(char_pos)
            char_pos += len(line) + 1  # +1 for newline
        content_length = len(original_content)

        # Process each chunk
        processed_chunks = []
        for i, chunk in enumerate(chunks):
            # Copy the nested dict: metadata.copy() alone is shallow and would
            # let the writes below leak into the input chunk's window_info.
            window_info = dict(chunk.metadata["window_info"])
            start_char = window_info["window_start_char"]
            end_char = window_info["window_end_char"]

            # Find the starting line number for this chunk. A newline character
            # belongs to the line it terminates; offsets outside the content
            # fall back to line 1.
            if isinstance(start_char, int) and 0 <= start_char < content_length:
                global_start_line = bisect_right(line_starts, start_char)
            else:
                global_start_line = 1

            # Add line numbers to chunk content starting from global position
            numbered_content = self._add_line_numbers_to_chunk(
                chunk.content, global_start_line
            )

            # Calculate ending line number
            lines_in_chunk = len(chunk.content.split('\n'))
            global_end_line = global_start_line + lines_in_chunk - 1

            # Store global line info in metadata
            window_info["global_line_start"] = global_start_line
            window_info["global_line_end"] = global_end_line

            new_metadata = chunk.metadata.copy()
            new_metadata["window_info"] = window_info

            # Create new chunk with numbered content
            processed_chunks.append(
                TextChunk(content=numbered_content, metadata=new_metadata)
            )

            logger.debug(f"Chunk {i}: chars {start_char}-{end_char} → lines {global_start_line}-{global_end_line}")

        logger.info("Successfully assigned global line numbers to all chunks")
        return processed_chunks

    def _add_line_numbers_to_chunk(self, chunk_content: str, start_line: int) -> str:
        """
        Add line numbers to a single chunk starting from start_line.

        Args:
            chunk_content: The chunk text to number
            start_line: Line number to pass as the starting line

        Returns:
            The first element of the pair returned by
            TraceLineNumberProcessor.add_line_numbers
        """
        # Imported locally, like the other project imports inside methods of this class
        from .trace_line_processor import TraceLineNumberProcessor

        processor = TraceLineNumberProcessor()
        numbered_content, _ = processor.add_line_numbers(chunk_content, start_line=start_line)
        return numbered_content