File size: 19,529 Bytes
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7bc750c
c2ea5ed
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
7bc750c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
"""
Chunking Service for Agent Monitoring

This service provides centralized chunking logic for trace content preprocessing.
It extracts and unifies the chunking functionality previously scattered across
pipeline.py, stage_processor.py, and knowledge_graph_processor.py.
"""

import logging
from typing import List, Dict, Any, Optional, Tuple

# Import the chunking components
from .text_chunking_strategies import (
    TextChunk, BaseSplitter, AgentAwareSemanticSplitter, 
    JSONSplitter, PromptInteractionSplitter
)

# Import trace analysis for parameter optimization
# Note: Imported locally to avoid circular dependencies

logger = logging.getLogger(__name__)


class ChunkingService:
    """
    Centralized service for chunking trace content using various splitting strategies.

    This service encapsulates all the chunking logic that was previously duplicated
    across multiple files (pipeline.py, stage_processor.py,
    knowledge_graph_processor.py), providing a single interface for creating
    chunks from trace content with optimal parameters.
    """

    def __init__(self, default_batch_size: int = 3, default_model: str = "gpt-5-mini"):
        """
        Initialize the chunking service with default parameters.

        Args:
            default_batch_size: Default batch size for processing
            default_model: Default model to use for LLM operations
        """
        self.default_batch_size = default_batch_size
        self.default_model = default_model

        logger.info(f"ChunkingService initialized with batch_size={default_batch_size}, model={default_model}")

    def chunk_trace_content(
        self,
        content: str,
        splitter_type: str = "agent_semantic",
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        min_chunk_size: Optional[int] = None,
        use_recommended_params: bool = True,
        trace_analysis: Optional[Dict] = None,
        apply_line_splitting: bool = True,
        max_line_length: int = 800
    ) -> List[TextChunk]:
        """
        Main interface for chunking trace content.

        Args:
            content: The trace content to chunk
            splitter_type: Type of splitter ("agent_semantic", "json", "prompt_interaction")
            window_size: Override window size (if None, will be calculated)
            overlap_size: Override overlap size (if None, will be calculated)
            min_chunk_size: Override minimum chunk size forwarded to the splitter
                (if None, the splitter derives its own default)
            use_recommended_params: Whether to use optimized parameters from trace analysis
            trace_analysis: Pre-computed trace analysis results (optional)
            apply_line_splitting: Whether to apply rule-based line splitting after chunking
            max_line_length: Maximum line length for rule-based splitting

        Returns:
            List of TextChunk objects ready for processing

        Raises:
            ValueError: If splitter_type is not one of the supported types.
        """
        logger.info(f"Chunking trace content with {splitter_type} splitter")
        logger.info(f"Content length: {len(content)} characters")

        # Validate splitter type up front so callers get a clear error.
        valid_splitters = ["agent_semantic", "json", "prompt_interaction"]
        if splitter_type not in valid_splitters:
            raise ValueError(f"Invalid splitter_type '{splitter_type}'. Must be one of: {', '.join(valid_splitters)}")

        # Determine parameters to use - unified approach with token-safe defaults
        final_window_size, final_overlap_size = self.optimize_parameters(
            content,
            window_size,
            overlap_size,
            use_recommended_params,
            trace_analysis
        )

        # Create the appropriate splitter
        splitter = self.create_splitter(
            splitter_type,
            window_size=final_window_size,
            overlap_size=final_overlap_size,
            min_chunk_size=min_chunk_size
        )

        # Split the content (without line numbers first)
        chunks = splitter.split(content)

        # Apply rule-based line splitting after chunking
        if apply_line_splitting:
            chunks = self._apply_rule_based_line_splitting(chunks, max_line_length)

        # Add global line numbers after chunking (if needed)
        if self._method_requires_line_numbers():
            chunks = self._assign_global_line_numbers(content, chunks)

        logger.info(f"Split content into {len(chunks)} chunks using {splitter_type} splitter")
        logger.info(f"Parameters used: window_size={final_window_size}, overlap_size={final_overlap_size}")

        return chunks

    def _apply_rule_based_line_splitting(self, chunks: List[TextChunk], max_line_length: int) -> List[TextChunk]:
        """
        Apply rule-based line splitting to each chunk's content.

        Args:
            chunks: List of TextChunk objects to process
            max_line_length: Maximum characters per line

        Returns:
            List of TextChunk objects with line-split content
        """
        processed_chunks = []

        for chunk in chunks:
            # Apply line splitting to chunk content
            split_content = self._split_lines_by_character_count(chunk.content, max_line_length)

            # Create new chunk with split content
            new_chunk = TextChunk(
                content=split_content,
                metadata=chunk.metadata.copy()
            )

            # Record that (and how) line splitting was applied, for downstream
            # consumers that need to reason about line counts.
            new_chunk.metadata["line_splitting"] = {
                "applied": True,
                "max_line_length": max_line_length,
                "original_lines": len(chunk.content.split('\n')),
                "processed_lines": len(split_content.split('\n'))
            }

            processed_chunks.append(new_chunk)

        logger.info(f"Applied rule-based line splitting to {len(chunks)} chunks (max_line_length={max_line_length})")
        return processed_chunks

    def _split_lines_by_character_count(self, content: str, max_length: int) -> str:
        """
        Split lines that exceed max_length into multiple lines using simple character counting.

        Args:
            content: Content to process
            max_length: Maximum characters per line (must be >= 1)

        Returns:
            Content with long lines split

        Raises:
            ValueError: If max_length is less than 1 (would otherwise loop forever).
        """
        if max_length < 1:
            raise ValueError(f"max_length must be >= 1, got {max_length}")

        processed_lines = []

        for line in content.split('\n'):
            if len(line) <= max_length:
                processed_lines.append(line)
            else:
                # Chop the long line into consecutive max_length-sized pieces;
                # the final piece carries any remainder.
                processed_lines.extend(
                    line[i:i + max_length] for i in range(0, len(line), max_length)
                )

        return '\n'.join(processed_lines)

    def create_splitter(
        self,
        splitter_type: str,
        window_size: int = 350000,
        overlap_size: int = 17500,
        min_chunk_size: Optional[int] = None,
        **kwargs
    ) -> BaseSplitter:
        """
        Create and configure a splitter based on type and parameters.

        Args:
            splitter_type: Type of splitter to create
            window_size: Size of each window/chunk
            overlap_size: Overlap between consecutive chunks
            min_chunk_size: Minimum chunk size for the agent_semantic splitter
                (if None, derived from window_size)
            **kwargs: Additional parameters for specific splitters

        Returns:
            Configured splitter instance
        """
        if splitter_type == "prompt_interaction":
            # For prompt interaction splitter, use interactions-based parameters
            interactions_per_chunk = kwargs.get("interactions_per_chunk", 2)
            overlap_interactions = kwargs.get("overlap_interactions", 1)

            splitter = PromptInteractionSplitter(
                interactions_per_chunk=interactions_per_chunk,
                overlap_interactions=overlap_interactions
            )
            logger.info(f"Created PromptInteractionSplitter with {interactions_per_chunk} interactions per chunk, {overlap_interactions} overlap")

        elif splitter_type == "json":
            # For JSON splitter, use window_size as max_chunk_size
            splitter = JSONSplitter(
                max_chunk_size=window_size
            )
            logger.info(f"Created JSONSplitter with max_chunk_size={window_size}")

        else:
            # Default to AgentAwareSemanticSplitter (agent_semantic)
            # Calculate overlap ratio from overlap_size and window_size
            overlap_ratio = overlap_size / window_size if window_size > 0 else 0.05

            # Use provided min_chunk_size, then kwargs, then a default derived
            # from the window size (at least 50K chars, at most window/8).
            default_min_chunk_size = max(50000, window_size // 8)
            final_min_chunk_size = min_chunk_size if min_chunk_size is not None else kwargs.get("min_chunk_size", default_min_chunk_size)
            confidence_threshold = kwargs.get("confidence_threshold", 0.7)
            preserve_agent_stages = kwargs.get("preserve_agent_stages", True)

            splitter = AgentAwareSemanticSplitter(
                min_chunk_size=final_min_chunk_size,
                max_chunk_size=window_size,
                overlap_ratio=overlap_ratio,
                confidence_threshold=confidence_threshold,
                preserve_agent_stages=preserve_agent_stages
            )
            logger.info(f"Created AgentAwareSemanticSplitter with window_size={window_size}, overlap_ratio={overlap_ratio}")

        return splitter

    def optimize_parameters(
        self,
        content: str,
        window_size: Optional[int] = None,
        overlap_size: Optional[int] = None,
        use_recommended_params: bool = True,
        trace_analysis: Optional[Dict] = None
    ) -> Tuple[int, int]:
        """
        Calculate optimal window and overlap sizes based on content analysis.

        This method implements the parameter optimization logic from pipeline.py.

        Args:
            content: The trace content to analyze
            window_size: Override window size
            overlap_size: Override overlap size
            use_recommended_params: Whether to use trace analysis for optimization
            trace_analysis: Pre-computed trace analysis results

        Returns:
            Tuple of (window_size, overlap_size)
        """
        # Explicit caller overrides win outright.
        if window_size is not None and overlap_size is not None:
            logger.info(f"Using provided parameters: window_size={window_size}, overlap_size={overlap_size}")
            return window_size, overlap_size

        # Token-safe fallback parameters; single source of truth is
        # _get_simple_default_params() (300K chars ~ 75K tokens, 2% overlap).
        default_window_size, default_overlap_size = self._get_simple_default_params()

        # If using recommended parameters, analyze the trace
        if use_recommended_params:
            # Use provided trace analysis or analyze the content
            if trace_analysis is None:
                logger.info("Analyzing trace to determine optimal parameters...")
                # Import locally to avoid circular dependencies
                from agentgraph.input.trace_management.trace_analysis import analyze_trace_characteristics
                trace_analysis = analyze_trace_characteristics(content)

            # Apply recommended parameters if analysis succeeded
            if trace_analysis is not None:
                recommended_window = trace_analysis.get("recommended_window_size", default_window_size)
                recommended_overlap = trace_analysis.get("recommended_overlap_size", default_overlap_size)

                # Per-parameter override: a caller may fix one value and let
                # the analysis supply the other.
                final_window_size = window_size if window_size is not None else recommended_window
                final_overlap_size = overlap_size if overlap_size is not None else recommended_overlap

                logger.info(f"Using recommended parameters from trace analysis:")
                logger.info(f"  - Window size: {final_window_size:,} characters")
                logger.info(f"  - Overlap size: {final_overlap_size:,} characters")
                logger.info(f"  - Estimated windows: {trace_analysis.get('estimated_windows', 'unknown')}")

                return final_window_size, final_overlap_size
            else:
                logger.warning("Could not get trace analysis results, using default parameters")

        # Fall back to defaults
        final_window_size = window_size if window_size is not None else default_window_size
        final_overlap_size = overlap_size if overlap_size is not None else default_overlap_size

        logger.info(f"Using default parameters: window_size={final_window_size}, overlap_size={final_overlap_size}")
        return final_window_size, final_overlap_size

    def _get_simple_default_params(self) -> Tuple[int, int]:
        """
        Get default parameters that are token-safe for 128K context models.

        Returns:
            Tuple of (window_size, overlap_size) with token-safe defaults
        """
        # Use token-safe defaults: 300K chars ≈ 75K tokens (safe for 128K context)
        window_size = 300000  # Token-safe max chunk size
        overlap_size = int(window_size * 0.02)  # 2% overlap = 6K chars

        return window_size, overlap_size

    def get_stats(self) -> Dict[str, Any]:
        """
        Get statistics about the chunking service.

        Returns:
            Dictionary with service statistics
        """
        return {
            "service": "ChunkingService",
            "default_batch_size": self.default_batch_size,
            "default_model": self.default_model,
            "supported_splitters": ["agent_semantic", "json", "prompt_interaction"]
        }

    def fix_long_lines_in_content(self, content: str, max_line_length: int = 800) -> str:
        """
        Apply rule-based line splitting to content independently of chunking.

        This method can be used to fix long lines in trace content without
        going through the full chunking process.

        Args:
            content: Content to process
            max_line_length: Maximum characters per line

        Returns:
            Content with long lines split
        """
        logger.info(f"Fixing long lines in content (max_line_length={max_line_length})")

        original_lines = len(content.split('\n'))
        processed_content = self._split_lines_by_character_count(content, max_line_length)
        processed_lines = len(processed_content.split('\n'))

        logger.info(f"Line splitting: {original_lines} -> {processed_lines} lines")

        return processed_content

    def _method_requires_line_numbers(self) -> bool:
        """Check if the extraction method requires line numbers.

        Consults the extraction factory when available; a missing factory
        falls back to True (the production method requires line numbers).
        """
        try:
            from agentgraph.shared.extraction_factory import method_requires_line_numbers
            # method_name is optional instance state; "production" is the default.
            return method_requires_line_numbers(getattr(self, "method_name", "production"))
        except ImportError:
            # Fallback: assume production method requires line numbers
            return True

    def _has_line_numbers(self, content: str) -> bool:
        """
        Check if content already has line numbers by looking for <L#> markers.

        Args:
            content: Content to check

        Returns:
            True if content already has line numbers, False otherwise
        """
        import re

        lines = content.split('\n')
        lines_to_check = min(5, len(lines))  # Inspect at most the first 5 lines
        line_number_pattern = re.compile(r'^<L\d+>\s')

        numbered_lines_found = sum(
            1 for line in lines[:lines_to_check]
            if line and line_number_pattern.match(line)
        )

        # If most of the first few lines carry <L#> markers, assume the content
        # is already numbered (60% threshold).
        return numbered_lines_found >= (lines_to_check * 0.6)

    def _assign_global_line_numbers(self, original_content: str, chunks: List[TextChunk]) -> List[TextChunk]:
        """Assign global line numbers to chunks based on original positions.

        Each chunk's window_start_char is translated into a 1-based line number
        within the original content, and the chunk content is re-emitted with
        <L#> markers starting from that line.
        """
        import bisect

        # Check if content already has line numbers
        if self._has_line_numbers(original_content):
            logger.info("Content already has line numbers, skipping line number assignment")
            return chunks

        logger.info(f"Assigning global line numbers to {len(chunks)} chunks")

        # Precompute the starting character offset of every line. Mapping a
        # character position to its line number is then a binary search over
        # these offsets — O(lines) memory and O(log lines) per chunk, instead
        # of materializing a dict entry for every character of the content.
        line_starts = [0]
        for line in original_content.split('\n')[:-1]:
            line_starts.append(line_starts[-1] + len(line) + 1)  # +1 for '\n'

        # Process each chunk
        processed_chunks = []
        for i, chunk in enumerate(chunks):
            window_info = chunk.metadata["window_info"]
            start_char = window_info["window_start_char"]
            end_char = window_info["window_end_char"]

            # bisect_right returns the number of line starts at or before
            # start_char, which is exactly the 1-based line number.
            global_start_line = bisect.bisect_right(line_starts, start_char)

            # Add line numbers to chunk content starting from global position
            numbered_content = self._add_line_numbers_to_chunk(
                chunk.content,
                global_start_line
            )

            # metadata.copy() is shallow — also copy the nested window_info
            # dict so we don't mutate the original chunk's metadata when
            # recording the global line range below.
            new_metadata = chunk.metadata.copy()
            new_metadata["window_info"] = dict(window_info)

            lines_in_chunk = chunk.content.count('\n') + 1
            global_end_line = global_start_line + lines_in_chunk - 1
            new_metadata["window_info"]["global_line_start"] = global_start_line
            new_metadata["window_info"]["global_line_end"] = global_end_line

            processed_chunks.append(TextChunk(
                content=numbered_content,
                metadata=new_metadata
            ))

            logger.debug(f"Chunk {i}: chars {start_char}-{end_char} → lines {global_start_line}-{global_end_line}")

        logger.info(f"Successfully assigned global line numbers to all chunks")
        return processed_chunks

    def _add_line_numbers_to_chunk(self, chunk_content: str, start_line: int) -> str:
        """Add line numbers to a single chunk starting from start_line."""
        from .trace_line_processor import TraceLineNumberProcessor
        processor = TraceLineNumberProcessor()
        numbered_content, _ = processor.add_line_numbers(chunk_content, start_line=start_line)
        return numbered_content