Spaces:
Running
Running
"""
Chunking Service for Agent Monitoring

This service provides centralized chunking logic for trace content preprocessing.
It extracts and unifies the chunking functionality previously scattered across
pipeline.py, stage_processor.py, and knowledge_graph_processor.py.
"""
| import logging | |
| from typing import List, Dict, Any, Optional, Tuple | |
| # Import the chunking components | |
| from .text_chunking_strategies import ( | |
| TextChunk, BaseSplitter, AgentAwareSemanticSplitter, | |
| JSONSplitter, PromptInteractionSplitter | |
| ) | |
| # Import trace analysis for parameter optimization | |
| # Note: Imported locally to avoid circular dependencies | |
| logger = logging.getLogger(__name__) | |
class ChunkingService:
    """Centralized chunking of trace content via pluggable splitting strategies.

    Encapsulates the chunking logic that was previously duplicated across
    multiple files, offering a single interface for producing chunks from
    trace content with sensible, token-safe parameters.
    """

    def __init__(self, default_batch_size: int = 3, default_model: str = "gpt-5-mini"):
        """Set up service-wide defaults.

        Args:
            default_batch_size: Default batch size for processing.
            default_model: Default model to use for LLM operations.
        """
        self.default_batch_size = default_batch_size  # reported by get_stats()
        self.default_model = default_model  # reported by get_stats()
        logger.info(f"ChunkingService initialized with batch_size={default_batch_size}, model={default_model}")
| def chunk_trace_content( | |
| self, | |
| content: str, | |
| splitter_type: str = "agent_semantic", | |
| window_size: Optional[int] = None, | |
| overlap_size: Optional[int] = None, | |
| min_chunk_size: Optional[int] = None, | |
| use_recommended_params: bool = True, | |
| trace_analysis: Optional[Dict] = None, | |
| apply_line_splitting: bool = True, | |
| max_line_length: int = 800 | |
| ) -> List[TextChunk]: | |
| """ | |
| Main interface for chunking trace content. | |
| Args: | |
| content: The trace content to chunk | |
| splitter_type: Type of splitter ("agent_semantic", "json", "prompt_interaction") | |
| window_size: Override window size (if None, will be calculated) | |
| overlap_size: Override overlap size (if None, will be calculated) | |
| use_recommended_params: Whether to use optimized parameters from trace analysis | |
| trace_analysis: Pre-computed trace analysis results (optional) | |
| apply_line_splitting: Whether to apply rule-based line splitting after chunking | |
| max_line_length: Maximum line length for rule-based splitting | |
| Returns: | |
| List of TextChunk objects ready for processing | |
| """ | |
| logger.info(f"Chunking trace content with {splitter_type} splitter") | |
| logger.info(f"Content length: {len(content)} characters") | |
| # Validate splitter type | |
| valid_splitters = ["agent_semantic", "json", "prompt_interaction"] | |
| if splitter_type not in valid_splitters: | |
| raise ValueError(f"Invalid splitter_type '{splitter_type}'. Must be one of: {', '.join(valid_splitters)}") | |
| # Determine parameters to use - unified approach with token-safe defaults | |
| final_window_size, final_overlap_size = self.optimize_parameters( | |
| content, | |
| window_size, | |
| overlap_size, | |
| use_recommended_params, | |
| trace_analysis | |
| ) | |
| # Create the appropriate splitter | |
| splitter = self.create_splitter( | |
| splitter_type, | |
| window_size=final_window_size, | |
| overlap_size=final_overlap_size, | |
| min_chunk_size=min_chunk_size | |
| ) | |
| # Split the content (without line numbers first) | |
| chunks = splitter.split(content) | |
| # Apply rule-based line splitting after chunking | |
| if apply_line_splitting: | |
| chunks = self._apply_rule_based_line_splitting(chunks, max_line_length) | |
| # Add global line numbers after chunking (if needed) | |
| if self._method_requires_line_numbers(): | |
| chunks = self._assign_global_line_numbers(content, chunks) | |
| logger.info(f"Split content into {len(chunks)} chunks using {splitter_type} splitter") | |
| logger.info(f"Parameters used: window_size={final_window_size}, overlap_size={final_overlap_size}") | |
| return chunks | |
| def _apply_rule_based_line_splitting(self, chunks: List[TextChunk], max_line_length: int) -> List[TextChunk]: | |
| """ | |
| Apply rule-based line splitting to each chunk's content. | |
| Args: | |
| chunks: List of TextChunk objects to process | |
| max_line_length: Maximum characters per line | |
| Returns: | |
| List of TextChunk objects with line-split content | |
| """ | |
| processed_chunks = [] | |
| for chunk in chunks: | |
| # Apply line splitting to chunk content | |
| split_content = self._split_lines_by_character_count(chunk.content, max_line_length) | |
| # Create new chunk with split content | |
| new_chunk = TextChunk( | |
| content=split_content, | |
| metadata=chunk.metadata.copy() | |
| ) | |
| # Update metadata to indicate line splitting was applied | |
| new_chunk.metadata["line_splitting"] = { | |
| "applied": True, | |
| "max_line_length": max_line_length, | |
| "original_lines": len(chunk.content.split('\n')), | |
| "processed_lines": len(split_content.split('\n')) | |
| } | |
| processed_chunks.append(new_chunk) | |
| logger.info(f"Applied rule-based line splitting to {len(chunks)} chunks (max_line_length={max_line_length})") | |
| return processed_chunks | |
| def _split_lines_by_character_count(self, content: str, max_length: int) -> str: | |
| """ | |
| Split lines that exceed max_length into multiple lines using simple character counting. | |
| Args: | |
| content: Content to process | |
| max_length: Maximum characters per line | |
| Returns: | |
| Content with long lines split | |
| """ | |
| lines = content.split('\n') | |
| processed_lines = [] | |
| for line in lines: | |
| if len(line) <= max_length: | |
| processed_lines.append(line) | |
| else: | |
| # Split long line into multiple lines | |
| while len(line) > max_length: | |
| processed_lines.append(line[:max_length]) | |
| line = line[max_length:] | |
| # Add remaining part if any | |
| if line: | |
| processed_lines.append(line) | |
| return '\n'.join(processed_lines) | |
| def create_splitter( | |
| self, | |
| splitter_type: str, | |
| window_size: int = 350000, | |
| overlap_size: int = 17500, | |
| min_chunk_size: Optional[int] = None, | |
| **kwargs | |
| ) -> BaseSplitter: | |
| """ | |
| Create and configure a splitter based on type and parameters. | |
| Args: | |
| splitter_type: Type of splitter to create | |
| window_size: Size of each window/chunk | |
| overlap_size: Overlap between consecutive chunks | |
| **kwargs: Additional parameters for specific splitters | |
| Returns: | |
| Configured splitter instance | |
| """ | |
| if splitter_type == "prompt_interaction": | |
| # For prompt interaction splitter, use interactions-based parameters | |
| interactions_per_chunk = kwargs.get("interactions_per_chunk", 2) | |
| overlap_interactions = kwargs.get("overlap_interactions", 1) | |
| splitter = PromptInteractionSplitter( | |
| interactions_per_chunk=interactions_per_chunk, | |
| overlap_interactions=overlap_interactions | |
| ) | |
| logger.info(f"Created PromptInteractionSplitter with {interactions_per_chunk} interactions per chunk, {overlap_interactions} overlap") | |
| elif splitter_type == "json": | |
| # For JSON splitter, use window_size as max_chunk_size | |
| splitter = JSONSplitter( | |
| max_chunk_size=window_size | |
| ) | |
| logger.info(f"Created JSONSplitter with max_chunk_size={window_size}") | |
| else: | |
| # Default to AgentAwareSemanticSplitter (agent_semantic) | |
| # Calculate overlap ratio from overlap_size and window_size | |
| overlap_ratio = overlap_size / window_size if window_size > 0 else 0.05 | |
| # Get additional parameters with defaults | |
| # Use provided min_chunk_size or default calculation | |
| default_min_chunk_size = max(50000, window_size // 8) | |
| final_min_chunk_size = min_chunk_size if min_chunk_size is not None else kwargs.get("min_chunk_size", default_min_chunk_size) | |
| confidence_threshold = kwargs.get("confidence_threshold", 0.7) | |
| preserve_agent_stages = kwargs.get("preserve_agent_stages", True) | |
| splitter = AgentAwareSemanticSplitter( | |
| min_chunk_size=final_min_chunk_size, | |
| max_chunk_size=window_size, | |
| overlap_ratio=overlap_ratio, | |
| confidence_threshold=confidence_threshold, | |
| preserve_agent_stages=preserve_agent_stages | |
| ) | |
| logger.info(f"Created AgentAwareSemanticSplitter with window_size={window_size}, overlap_ratio={overlap_ratio}") | |
| return splitter | |
| def optimize_parameters( | |
| self, | |
| content: str, | |
| window_size: Optional[int] = None, | |
| overlap_size: Optional[int] = None, | |
| use_recommended_params: bool = True, | |
| trace_analysis: Optional[Dict] = None | |
| ) -> Tuple[int, int]: | |
| """ | |
| Calculate optimal window and overlap sizes based on content analysis. | |
| This method implements the parameter optimization logic from pipeline.py. | |
| Args: | |
| content: The trace content to analyze | |
| window_size: Override window size | |
| overlap_size: Override overlap size | |
| use_recommended_params: Whether to use trace analysis for optimization | |
| trace_analysis: Pre-computed trace analysis results | |
| Returns: | |
| Tuple of (window_size, overlap_size) | |
| """ | |
| # Use provided parameters if given | |
| if window_size is not None and overlap_size is not None: | |
| logger.info(f"Using provided parameters: window_size={window_size}, overlap_size={overlap_size}") | |
| return window_size, overlap_size | |
| # Token-safe default parameters | |
| default_window_size = 300000 # Token-safe default (75K tokens) | |
| default_overlap_size = 6000 # 2% overlap for efficiency | |
| # If using recommended parameters, analyze the trace | |
| if use_recommended_params: | |
| # Use provided trace analysis or analyze the content | |
| if trace_analysis is None: | |
| logger.info("Analyzing trace to determine optimal parameters...") | |
| # Import locally to avoid circular dependencies | |
| from agentgraph.input.trace_management.trace_analysis import analyze_trace_characteristics | |
| trace_analysis = analyze_trace_characteristics(content) | |
| # Apply recommended parameters if analysis succeeded | |
| if trace_analysis is not None: | |
| recommended_window = trace_analysis.get("recommended_window_size", default_window_size) | |
| recommended_overlap = trace_analysis.get("recommended_overlap_size", default_overlap_size) | |
| final_window_size = window_size if window_size is not None else recommended_window | |
| final_overlap_size = overlap_size if overlap_size is not None else recommended_overlap | |
| logger.info(f"Using recommended parameters from trace analysis:") | |
| logger.info(f" - Window size: {final_window_size:,} characters") | |
| logger.info(f" - Overlap size: {final_overlap_size:,} characters") | |
| logger.info(f" - Estimated windows: {trace_analysis.get('estimated_windows', 'unknown')}") | |
| return final_window_size, final_overlap_size | |
| else: | |
| logger.warning("Could not get trace analysis results, using default parameters") | |
| # Fall back to defaults | |
| final_window_size = window_size if window_size is not None else default_window_size | |
| final_overlap_size = overlap_size if overlap_size is not None else default_overlap_size | |
| logger.info(f"Using default parameters: window_size={final_window_size}, overlap_size={final_overlap_size}") | |
| return final_window_size, final_overlap_size | |
| def _get_simple_default_params(self) -> Tuple[int, int]: | |
| """ | |
| Get default parameters that are token-safe for 128K context models. | |
| Returns: | |
| Tuple of (window_size, overlap_size) with token-safe defaults | |
| """ | |
| # Use token-safe defaults: 300K chars ≈ 75K tokens (safe for 128K context) | |
| window_size = 300000 # Token-safe max chunk size | |
| overlap_size = int(window_size * 0.02) # 2% overlap = 6K chars | |
| return window_size, overlap_size | |
| def get_stats(self) -> Dict[str, Any]: | |
| """ | |
| Get statistics about the chunking service. | |
| Returns: | |
| Dictionary with service statistics | |
| """ | |
| return { | |
| "service": "ChunkingService", | |
| "default_batch_size": self.default_batch_size, | |
| "default_model": self.default_model, | |
| "supported_splitters": ["agent_semantic", "json", "prompt_interaction"] | |
| } | |
| def fix_long_lines_in_content(self, content: str, max_line_length: int = 800) -> str: | |
| """ | |
| Apply rule-based line splitting to content independently of chunking. | |
| This method can be used to fix long lines in trace content without | |
| going through the full chunking process. | |
| Args: | |
| content: Content to process | |
| max_line_length: Maximum characters per line | |
| Returns: | |
| Content with long lines split | |
| """ | |
| logger.info(f"Fixing long lines in content (max_line_length={max_line_length})") | |
| original_lines = len(content.split('\n')) | |
| processed_content = self._split_lines_by_character_count(content, max_line_length) | |
| processed_lines = len(processed_content.split('\n')) | |
| logger.info(f"Line splitting: {original_lines} → {processed_lines} lines") | |
| return processed_content | |
| def _method_requires_line_numbers(self) -> bool: | |
| """Check if the extraction method requires line numbers""" | |
| try: | |
| from agentgraph.shared.extraction_factory import method_requires_line_numbers | |
| return method_requires_line_numbers(self.method_name if hasattr(self, 'method_name') else "production") | |
| except ImportError: | |
| # Fallback: assume production method requires line numbers | |
| return True | |
| def _has_line_numbers(self, content: str) -> bool: | |
| """ | |
| Check if content already has line numbers by looking for <L#> markers. | |
| Args: | |
| content: Content to check | |
| Returns: | |
| True if content already has line numbers, False otherwise | |
| """ | |
| lines = content.split('\n') | |
| # Check first few lines for <L#> pattern | |
| line_number_pattern = r'^<L\d+>\s' | |
| import re | |
| lines_to_check = min(5, len(lines)) # Check first 5 lines | |
| numbered_lines_found = 0 | |
| for i in range(lines_to_check): | |
| if lines[i] and re.match(line_number_pattern, lines[i]): | |
| numbered_lines_found += 1 | |
| # If most of the first few lines have line numbers, assume content is already numbered | |
| return numbered_lines_found >= (lines_to_check * 0.6) # 60% threshold | |
| def _assign_global_line_numbers(self, original_content: str, chunks: List[TextChunk]) -> List[TextChunk]: | |
| """Assign global line numbers to chunks based on original positions""" | |
| # Check if content already has line numbers | |
| if self._has_line_numbers(original_content): | |
| logger.info("Content already has line numbers, skipping line number assignment") | |
| return chunks | |
| logger.info(f"Assigning global line numbers to {len(chunks)} chunks") | |
| # Create original content line mapping | |
| original_lines = original_content.split('\n') | |
| char_to_line_map = {} | |
| char_pos = 0 | |
| for line_num, line in enumerate(original_lines, 1): | |
| # Map every character in this line to this line number | |
| for i in range(len(line) + 1): # +1 for newline | |
| if char_pos + i < len(original_content): | |
| char_to_line_map[char_pos + i] = line_num | |
| char_pos += len(line) + 1 # +1 for newline | |
| # Process each chunk | |
| processed_chunks = [] | |
| for i, chunk in enumerate(chunks): | |
| start_char = chunk.metadata["window_info"]["window_start_char"] | |
| end_char = chunk.metadata["window_info"]["window_end_char"] | |
| # Find the starting line number for this chunk | |
| global_start_line = char_to_line_map.get(start_char, 1) | |
| # Add line numbers to chunk content starting from global position | |
| numbered_content = self._add_line_numbers_to_chunk( | |
| chunk.content, | |
| global_start_line | |
| ) | |
| # Create new chunk with numbered content | |
| new_chunk = TextChunk( | |
| content=numbered_content, | |
| metadata=chunk.metadata.copy() | |
| ) | |
| # Store global line info in metadata | |
| new_chunk.metadata["window_info"]["global_line_start"] = global_start_line | |
| # Calculate ending line number | |
| lines_in_chunk = len(chunk.content.split('\n')) | |
| global_end_line = global_start_line + lines_in_chunk - 1 | |
| new_chunk.metadata["window_info"]["global_line_end"] = global_end_line | |
| processed_chunks.append(new_chunk) | |
| logger.debug(f"Chunk {i}: chars {start_char}-{end_char} → lines {global_start_line}-{global_end_line}") | |
| logger.info(f"Successfully assigned global line numbers to all chunks") | |
| return processed_chunks | |
| def _add_line_numbers_to_chunk(self, chunk_content: str, start_line: int) -> str: | |
| """Add line numbers to a single chunk starting from start_line""" | |
| from .trace_line_processor import TraceLineNumberProcessor | |
| processor = TraceLineNumberProcessor() | |
| numbered_content, _ = processor.add_line_numbers(chunk_content, start_line=start_line) | |
| return numbered_content | |