"""
Chunking Service for Agent Monitoring
This service provides centralized chunking logic for trace content preprocessing.
It extracts and unifies the chunking functionality previously scattered across
pipeline.py, stage_processor.py, and knowledge_graph_processor.py.
"""
import logging
from typing import List, Dict, Any, Optional, Tuple
# Import the chunking components
from .text_chunking_strategies import (
TextChunk, BaseSplitter, AgentAwareSemanticSplitter,
JSONSplitter, PromptInteractionSplitter
)
# Import trace analysis for parameter optimization
# Note: Imported locally to avoid circular dependencies
logger = logging.getLogger(__name__)
class ChunkingService:
"""
Centralized service for chunking trace content using various splitting strategies.
This service encapsulates all the chunking logic that was previously duplicated
across multiple files, providing a single interface for creating chunks from
trace content with optimal parameters.
"""
def __init__(self, default_batch_size: int = 3, default_model: str = "gpt-5-mini"):
"""
Initialize the chunking service with default parameters.
Args:
default_batch_size: Default batch size for processing
default_model: Default model to use for LLM operations
"""
self.default_batch_size = default_batch_size
self.default_model = default_model
logger.info(f"ChunkingService initialized with batch_size={default_batch_size}, model={default_model}")
def chunk_trace_content(
self,
content: str,
splitter_type: str = "agent_semantic",
window_size: Optional[int] = None,
overlap_size: Optional[int] = None,
min_chunk_size: Optional[int] = None,
use_recommended_params: bool = True,
trace_analysis: Optional[Dict] = None,
apply_line_splitting: bool = True,
max_line_length: int = 800
) -> List[TextChunk]:
"""
Main interface for chunking trace content.
Args:
content: The trace content to chunk
splitter_type: Type of splitter ("agent_semantic", "json", "prompt_interaction")
window_size: Override window size (if None, will be calculated)
overlap_size: Override overlap size (if None, will be calculated)
use_recommended_params: Whether to use optimized parameters from trace analysis
trace_analysis: Pre-computed trace analysis results (optional)
apply_line_splitting: Whether to apply rule-based line splitting after chunking
max_line_length: Maximum line length for rule-based splitting
Returns:
List of TextChunk objects ready for processing
"""
logger.info(f"Chunking trace content with {splitter_type} splitter")
logger.info(f"Content length: {len(content)} characters")
# Validate splitter type
valid_splitters = ["agent_semantic", "json", "prompt_interaction"]
if splitter_type not in valid_splitters:
raise ValueError(f"Invalid splitter_type '{splitter_type}'. Must be one of: {', '.join(valid_splitters)}")
# Determine parameters to use - unified approach with token-safe defaults
final_window_size, final_overlap_size = self.optimize_parameters(
content,
window_size,
overlap_size,
use_recommended_params,
trace_analysis
)
# Create the appropriate splitter
splitter = self.create_splitter(
splitter_type,
window_size=final_window_size,
overlap_size=final_overlap_size,
min_chunk_size=min_chunk_size
)
# Split the content (without line numbers first)
chunks = splitter.split(content)
# Apply rule-based line splitting after chunking
if apply_line_splitting:
chunks = self._apply_rule_based_line_splitting(chunks, max_line_length)
# Add global line numbers after chunking (if needed)
if self._method_requires_line_numbers():
chunks = self._assign_global_line_numbers(content, chunks)
logger.info(f"Split content into {len(chunks)} chunks using {splitter_type} splitter")
logger.info(f"Parameters used: window_size={final_window_size}, overlap_size={final_overlap_size}")
return chunks
def _apply_rule_based_line_splitting(self, chunks: List[TextChunk], max_line_length: int) -> List[TextChunk]:
    """
    Apply rule-based line splitting to each chunk's content.

    Args:
        chunks: List of TextChunk objects to process
        max_line_length: Maximum characters per line

    Returns:
        List of TextChunk objects with line-split content
    """
    result = []
    for original in chunks:
        # Reflow over-long lines inside this chunk's text.
        reflowed = self._split_lines_by_character_count(original.content, max_line_length)
        updated = TextChunk(
            content=reflowed,
            metadata=original.metadata.copy()
        )
        # Record that (and how) line splitting was applied, for downstream consumers.
        updated.metadata["line_splitting"] = {
            "applied": True,
            "max_line_length": max_line_length,
            "original_lines": len(original.content.split('\n')),
            "processed_lines": len(reflowed.split('\n'))
        }
        result.append(updated)
    logger.info(f"Applied rule-based line splitting to {len(chunks)} chunks (max_line_length={max_line_length})")
    return result
def _split_lines_by_character_count(self, content: str, max_length: int) -> str:
"""
Split lines that exceed max_length into multiple lines using simple character counting.
Args:
content: Content to process
max_length: Maximum characters per line
Returns:
Content with long lines split
"""
lines = content.split('\n')
processed_lines = []
for line in lines:
if len(line) <= max_length:
processed_lines.append(line)
else:
# Split long line into multiple lines
while len(line) > max_length:
processed_lines.append(line[:max_length])
line = line[max_length:]
# Add remaining part if any
if line:
processed_lines.append(line)
return '\n'.join(processed_lines)
def create_splitter(
    self,
    splitter_type: str,
    window_size: int = 350000,
    overlap_size: int = 17500,
    min_chunk_size: Optional[int] = None,
    **kwargs
) -> BaseSplitter:
    """
    Create and configure a splitter based on type and parameters.

    Args:
        splitter_type: Type of splitter to create
        window_size: Size of each window/chunk
        overlap_size: Overlap between consecutive chunks
        min_chunk_size: Optional minimum chunk size for the semantic splitter
        **kwargs: Additional parameters for specific splitters

    Returns:
        Configured splitter instance
    """
    if splitter_type == "prompt_interaction":
        # This splitter counts interactions rather than characters.
        per_chunk = kwargs.get("interactions_per_chunk", 2)
        overlapped = kwargs.get("overlap_interactions", 1)
        splitter = PromptInteractionSplitter(
            interactions_per_chunk=per_chunk,
            overlap_interactions=overlapped
        )
        logger.info(f"Created PromptInteractionSplitter with {per_chunk} interactions per chunk, {overlapped} overlap")
        return splitter

    if splitter_type == "json":
        # The JSON splitter only needs a maximum chunk size.
        splitter = JSONSplitter(
            max_chunk_size=window_size
        )
        logger.info(f"Created JSONSplitter with max_chunk_size={window_size}")
        return splitter

    # Default: AgentAwareSemanticSplitter ("agent_semantic").
    # Derive the overlap ratio from the absolute sizes (guard against window_size == 0).
    overlap_ratio = overlap_size / window_size if window_size > 0 else 0.05
    # Explicit argument wins; otherwise kwargs, then the derived default.
    if min_chunk_size is not None:
        effective_min = min_chunk_size
    else:
        effective_min = kwargs.get("min_chunk_size", max(50000, window_size // 8))
    splitter = AgentAwareSemanticSplitter(
        min_chunk_size=effective_min,
        max_chunk_size=window_size,
        overlap_ratio=overlap_ratio,
        confidence_threshold=kwargs.get("confidence_threshold", 0.7),
        preserve_agent_stages=kwargs.get("preserve_agent_stages", True)
    )
    logger.info(f"Created AgentAwareSemanticSplitter with window_size={window_size}, overlap_ratio={overlap_ratio}")
    return splitter
def optimize_parameters(
self,
content: str,
window_size: Optional[int] = None,
overlap_size: Optional[int] = None,
use_recommended_params: bool = True,
trace_analysis: Optional[Dict] = None
) -> Tuple[int, int]:
"""
Calculate optimal window and overlap sizes based on content analysis.
This method implements the parameter optimization logic from pipeline.py.
Args:
content: The trace content to analyze
window_size: Override window size
overlap_size: Override overlap size
use_recommended_params: Whether to use trace analysis for optimization
trace_analysis: Pre-computed trace analysis results
Returns:
Tuple of (window_size, overlap_size)
"""
# Use provided parameters if given
if window_size is not None and overlap_size is not None:
logger.info(f"Using provided parameters: window_size={window_size}, overlap_size={overlap_size}")
return window_size, overlap_size
# Token-safe default parameters
default_window_size = 300000 # Token-safe default (75K tokens)
default_overlap_size = 6000 # 2% overlap for efficiency
# If using recommended parameters, analyze the trace
if use_recommended_params:
# Use provided trace analysis or analyze the content
if trace_analysis is None:
logger.info("Analyzing trace to determine optimal parameters...")
# Import locally to avoid circular dependencies
from agentgraph.input.trace_management.trace_analysis import analyze_trace_characteristics
trace_analysis = analyze_trace_characteristics(content)
# Apply recommended parameters if analysis succeeded
if trace_analysis is not None:
recommended_window = trace_analysis.get("recommended_window_size", default_window_size)
recommended_overlap = trace_analysis.get("recommended_overlap_size", default_overlap_size)
final_window_size = window_size if window_size is not None else recommended_window
final_overlap_size = overlap_size if overlap_size is not None else recommended_overlap
logger.info(f"Using recommended parameters from trace analysis:")
logger.info(f" - Window size: {final_window_size:,} characters")
logger.info(f" - Overlap size: {final_overlap_size:,} characters")
logger.info(f" - Estimated windows: {trace_analysis.get('estimated_windows', 'unknown')}")
return final_window_size, final_overlap_size
else:
logger.warning("Could not get trace analysis results, using default parameters")
# Fall back to defaults
final_window_size = window_size if window_size is not None else default_window_size
final_overlap_size = overlap_size if overlap_size is not None else default_overlap_size
logger.info(f"Using default parameters: window_size={final_window_size}, overlap_size={final_overlap_size}")
return final_window_size, final_overlap_size
def _get_simple_default_params(self) -> Tuple[int, int]:
"""
Get default parameters that are token-safe for 128K context models.
Returns:
Tuple of (window_size, overlap_size) with token-safe defaults
"""
# Use token-safe defaults: 300K chars ≈ 75K tokens (safe for 128K context)
window_size = 300000 # Token-safe max chunk size
overlap_size = int(window_size * 0.02) # 2% overlap = 6K chars
return window_size, overlap_size
def get_stats(self) -> Dict[str, Any]:
"""
Get statistics about the chunking service.
Returns:
Dictionary with service statistics
"""
return {
"service": "ChunkingService",
"default_batch_size": self.default_batch_size,
"default_model": self.default_model,
"supported_splitters": ["agent_semantic", "json", "prompt_interaction"]
}
def fix_long_lines_in_content(self, content: str, max_line_length: int = 800) -> str:
"""
Apply rule-based line splitting to content independently of chunking.
This method can be used to fix long lines in trace content without
going through the full chunking process.
Args:
content: Content to process
max_line_length: Maximum characters per line
Returns:
Content with long lines split
"""
logger.info(f"Fixing long lines in content (max_line_length={max_line_length})")
original_lines = len(content.split('\n'))
processed_content = self._split_lines_by_character_count(content, max_line_length)
processed_lines = len(processed_content.split('\n'))
logger.info(f"Line splitting: {original_lines}{processed_lines} lines")
return processed_content
def _method_requires_line_numbers(self) -> bool:
"""Check if the extraction method requires line numbers"""
try:
from agentgraph.shared.extraction_factory import method_requires_line_numbers
return method_requires_line_numbers(self.method_name if hasattr(self, 'method_name') else "production")
except ImportError:
# Fallback: assume production method requires line numbers
return True
def _has_line_numbers(self, content: str) -> bool:
"""
Check if content already has line numbers by looking for <L#> markers.
Args:
content: Content to check
Returns:
True if content already has line numbers, False otherwise
"""
lines = content.split('\n')
# Check first few lines for <L#> pattern
line_number_pattern = r'^<L\d+>\s'
import re
lines_to_check = min(5, len(lines)) # Check first 5 lines
numbered_lines_found = 0
for i in range(lines_to_check):
if lines[i] and re.match(line_number_pattern, lines[i]):
numbered_lines_found += 1
# If most of the first few lines have line numbers, assume content is already numbered
return numbered_lines_found >= (lines_to_check * 0.6) # 60% threshold
def _assign_global_line_numbers(self, original_content: str, chunks: List[TextChunk]) -> List[TextChunk]:
    """
    Assign global line numbers to chunks based on original positions.

    Replaces the previous per-character dict (one entry for EVERY character of
    the trace — enormous for multi-hundred-KB traces) with a sorted list of
    line-start offsets searched via bisect, which is O(#lines) memory and
    O(log #lines) per lookup while producing identical line numbers.

    Args:
        original_content: The full, un-chunked trace text
        chunks: Chunks whose metadata["window_info"] carries
            window_start_char / window_end_char offsets into original_content

    Returns:
        New TextChunk objects with numbered content and
        global_line_start / global_line_end recorded in window_info
    """
    import bisect

    # Check if content already has line numbers
    if self._has_line_numbers(original_content):
        logger.info("Content already has line numbers, skipping line number assignment")
        return chunks
    logger.info(f"Assigning global line numbers to {len(chunks)} chunks")

    # line_starts[i] is the character offset at which 1-indexed line i+1 begins.
    line_starts = [0]
    offset = 0
    for line in original_content.split('\n')[:-1]:
        offset += len(line) + 1  # +1 for the newline terminating this line
        line_starts.append(offset)
    content_length = len(original_content)

    processed_chunks = []
    for i, chunk in enumerate(chunks):
        start_char = chunk.metadata["window_info"]["window_start_char"]
        end_char = chunk.metadata["window_info"]["window_end_char"]
        # Same fallback as the old per-character mapping: offsets outside the
        # content resolve to line 1.
        if 0 <= start_char < content_length:
            global_start_line = bisect.bisect_right(line_starts, start_char)
        else:
            global_start_line = 1
        # Add line numbers to chunk content starting from the global position.
        numbered_content = self._add_line_numbers_to_chunk(
            chunk.content,
            global_start_line
        )
        # NOTE(review): metadata.copy() is shallow, so "window_info" is shared
        # with the input chunk and the mutations below are visible there too —
        # preserved from the original implementation; confirm this is intended.
        new_chunk = TextChunk(
            content=numbered_content,
            metadata=chunk.metadata.copy()
        )
        new_chunk.metadata["window_info"]["global_line_start"] = global_start_line
        # Ending line number follows from the chunk's own line count.
        lines_in_chunk = len(chunk.content.split('\n'))
        global_end_line = global_start_line + lines_in_chunk - 1
        new_chunk.metadata["window_info"]["global_line_end"] = global_end_line
        processed_chunks.append(new_chunk)
        logger.debug(f"Chunk {i}: chars {start_char}-{end_char} → lines {global_start_line}-{global_end_line}")
    logger.info(f"Successfully assigned global line numbers to all chunks")
    return processed_chunks
def _add_line_numbers_to_chunk(self, chunk_content: str, start_line: int) -> str:
    """Add line numbers to a single chunk starting from start_line"""
    # Imported lazily, matching the file's pattern for avoiding import cycles.
    from .trace_line_processor import TraceLineNumberProcessor

    numbered, _ = TraceLineNumberProcessor().add_line_numbers(chunk_content, start_line=start_line)
    return numbered