"""
Semantic Analysis Module for Agent-Aware Splitting

This module provides embedding-based semantic analysis to identify natural
breakpoints in agent execution logs based on content similarity. Uses
OpenAI's text-embedding-3-small model for high-quality embeddings.
"""

import re
import os
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
import logging
import numpy as np

# OpenAI client for embeddings (optional dependency)
try:
    from openai import OpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    OpenAI = None

# Optional import for sklearn (cosine similarity)
try:
    from sklearn.metrics.pairwise import cosine_similarity
    SKLEARN_AVAILABLE = True
except ImportError:
    SKLEARN_AVAILABLE = False
    cosine_similarity = None

# Optional import for tiktoken (accurate token counting)
try:
    import tiktoken
    TIKTOKEN_AVAILABLE = True
except ImportError:
    TIKTOKEN_AVAILABLE = False
    tiktoken = None

logger = logging.getLogger(__name__)


@dataclass
class SemanticBreakpoint:
    """Represents a semantic breakpoint in the content."""
    position: int                  # character offset of the breakpoint in the original text
    sentence_index: int            # index of the sentence that starts AFTER the break
    similarity_drop: float         # how much cosine similarity fell at this point
    confidence: float              # 0..1 confidence that this is a real topic shift
    context_sentences: List[str]   # sentences surrounding the break, for inspection


@dataclass
class SemanticSegment:
    """Represents a semantically coherent segment."""
    start_position: int    # character offset where the segment begins
    end_position: int      # character offset where the segment ends
    sentences: List[str]   # sentences belonging to this segment
    coherence_score: float # 0..1 internal coherence estimate
    segment_type: str      # label describing how the segment was produced


class SemanticAnalyzer:
    """
    Analyzes text content using OpenAI embeddings to identify semantic boundaries.

    Uses sentence-level embeddings to detect where content topics shift,
    indicating natural breakpoints for intelligent chunking.
    """

    def __init__(self, model_name: str = "text-embedding-3-small",
                 similarity_threshold: float = 0.5,
                 window_size: int = 3,
                 api_key: Optional[str] = None):
        """
        Initialize the SemanticAnalyzer with OpenAI embeddings and adaptive
        threshold detection.

        Args:
            model_name: Name of the OpenAI embedding model to use
            similarity_threshold: Base similarity threshold (used as fallback)
            window_size: Context window size for breakpoint analysis
            api_key: OpenAI API key (uses environment variable if not provided)
        """
        self.model_name = model_name
        self.similarity_threshold = similarity_threshold
        self.window_size = window_size

        # Import log type detector for agent-aware analysis (local import to
        # avoid a hard module-level dependency cycle).
        from .log_type_detector import LogTypeDetector
        self.log_type_detector = LogTypeDetector()

        # Set up tokenizer for accurate token counting if available.
        # Reuses the module-level guarded import instead of re-importing.
        self.tokenizer = None
        if TIKTOKEN_AVAILABLE:
            # For text-embedding-3-small, use cl100k_base encoding (same as GPT-4)
            self.tokenizer = tiktoken.get_encoding("cl100k_base")
            logger.debug("Initialized tiktoken for accurate token counting")
        else:
            logger.warning("tiktoken not available, using approximate token counting")

        # Initialize OpenAI client
        api_key = api_key or os.getenv('OPENAI_API_KEY')
        if not api_key:
            logger.warning("No OpenAI API key provided. Embeddings will not be available.")
            self.openai_client = None
        elif not OPENAI_AVAILABLE:
            logger.error("OpenAI package not installed. Run: pip install openai")
            self.openai_client = None
        else:
            try:
                self.openai_client = OpenAI(api_key=api_key)
                logger.debug(f"Initialized OpenAI client for {model_name}")
            except Exception as e:
                logger.error(f"Failed to initialize OpenAI client: {e}")
                self.openai_client = None

    def analyze_semantic_structure(self, content: str) -> Dict[str, Any]:
        """
        Analyze the semantic structure of content to identify natural breakpoints.
        Enhanced with agent trace type awareness and adaptive threshold selection.

        Args:
            content: The text content to analyze

        Returns:
            Dictionary containing analysis results with segments, breakpoints,
            and method used.  Returns None only if every analysis method
            (including the text-based fallback) raised an exception.
        """
        if not content or not content.strip():
            return {
                "segments": [],
                "breakpoints": [],
                "coherence_score": 0.0,
                "method": "none_empty_content",
                "total_sentences": 0
            }

        # Detect agent trace type for adaptive processing (best-effort)
        agent_trace_type = None
        if hasattr(self, 'log_type_detector'):
            try:
                detection_result = self.log_type_detector.detect_log_type(content)
                agent_trace_type = detection_result.log_type
                logger.info(f"Detected agent trace type: {agent_trace_type.value} (confidence: {detection_result.confidence:.2f})")
            except Exception as e:
                logger.warning(f"Failed to detect agent trace type: {e}")

        sentences = self._split_into_sentences(content)
        if len(sentences) < 2:
            # Too little structure to analyze: return a single trivial segment.
            return {
                "segments": [SemanticSegment(
                    start_position=0,
                    end_position=len(content),
                    sentences=sentences,
                    coherence_score=1.0,
                    segment_type="single_sentence"
                )],
                "breakpoints": [],
                "coherence_score": 1.0,
                "method": "single_sentence",
                "total_sentences": len(sentences),
                "agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown"
            }

        # Try OpenAI embeddings first
        if self.openai_client:
            try:
                embeddings = self._calculate_embeddings(sentences)
                if embeddings is not None:
                    breakpoints = self._find_semantic_breakpoints(sentences, embeddings, content)
                    segments = self._create_semantic_segments(sentences, breakpoints)
                    coherence_score = self._calculate_overall_coherence(embeddings)
                    return {
                        "segments": segments,
                        "breakpoints": breakpoints,
                        "coherence_score": coherence_score,
                        "method": "openai_embedding_based_adaptive",
                        "total_sentences": len(sentences),
                        "agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown",
                        "threshold_method": getattr(self, '_last_threshold_method', 'adaptive_statistical')
                    }
            except Exception as e:
                logger.warning(f"OpenAI embedding analysis failed: {e}")
                logger.info("Falling back to text-based analysis")

        # Fallback to simple text-based analysis
        try:
            breakpoints = self._find_simple_breakpoints(sentences)
            segments = self._create_simple_segments(sentences, breakpoints)
            return {
                "segments": segments,
                "breakpoints": breakpoints,
                "coherence_score": 0.5,  # Default for text-based
                "method": "text_based_fallback",
                "total_sentences": len(sentences),
                "agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown"
            }
        except Exception as e:
            logger.error(f"All analysis methods failed: {e}")
            # NOTE(review): returning None contradicts the dict return type;
            # kept for backward compatibility with existing callers.
            return None

    def _split_into_sentences(self, content: str) -> List[str]:
        """
        Split content into sentences with enhanced handling for JSON schemas.

        Args:
            content: Text content to split

        Returns:
            List of sentences (fragments shorter than 11 characters are dropped;
            fragments longer than 5000 characters are force-split)
        """
        if not content.strip():
            return []

        # Detect content type for special handling (best-effort)
        content_type = None
        if hasattr(self, 'log_type_detector'):
            try:
                detection_result = self.log_type_detector.detect_log_type(content)
                content_type = detection_result.log_type
            except Exception:
                pass

        # Special preprocessing for complex JSON schemas
        if content_type and content_type.value == "complex_json_schema":
            content = self._preprocess_complex_json_schema(content)

        # Enhanced sentence splitting patterns.
        # JSON object boundaries are added as sentence breaks for very long JSON.
        # NOTE: the transition patterns consume one adjacent letter/brace, which
        # is discarded with the delimiter (pre-existing, deliberately lossy).
        sentence_patterns = [
            r'(?<=[.!?])\s+(?=[A-Z])',  # Standard sentence boundaries
            r'}\s*,\s*\{',              # JSON object boundaries (simplified)
            r']\s*,\s*\[',              # JSON array boundaries (simplified)
            r'}\s*\n\s*\{',             # JSON blocks on new lines (simplified)
            r'}\s*\n\s*[A-Za-z]',       # JSON to text transitions (simplified)
            r'[a-z]\s*\n\s*\{',         # Text to JSON transitions (simplified)
        ]

        # Apply the patterns iteratively, starting from the full content.
        sentences = [content]
        for pattern in sentence_patterns:
            new_sentences = []
            for sentence in sentences:
                # re.split with a single capturing group yields content at even
                # indices and the matched delimiter at odd indices.  Delimiters
                # must be identified by index parity: re-matching the captured
                # fragment would fail for lookaround-only patterns (the standard
                # sentence boundary never matches its own captured whitespace).
                parts = re.split(f'({pattern})', sentence)
                current_sentence = ""
                for i, part in enumerate(parts):
                    if i % 2 == 1:
                        # Delimiter: close off the sentence accumulated so far.
                        if current_sentence.strip():
                            new_sentences.append(current_sentence.strip())
                        current_sentence = ""
                    else:
                        current_sentence += part
                # Add final sentence if it exists
                if current_sentence.strip():
                    new_sentences.append(current_sentence.strip())
            sentences = new_sentences

        # Filter out empty sentences and very short ones
        sentences = [s for s in sentences if len(s.strip()) > 10]

        # Additional safety: split extremely long sentences (>5000 chars)
        final_sentences = []
        for sentence in sentences:
            if len(sentence) > 5000:
                # Force split long content at logical boundaries
                final_sentences.extend(self._split_long_content(sentence))
            else:
                final_sentences.append(sentence)

        return final_sentences

    def _preprocess_complex_json_schema(self, content: str) -> str:
        """
        Preprocess complex JSON schemas to add better sentence boundaries.

        Args:
            content: Content containing complex JSON schemas

        Returns:
            Preprocessed content with better boundaries
        """
        # Add line breaks after major JSON schema sections
        schema_boundaries = [
            (r'"business_rules":\s*\[', r'"business_rules": [\n'),
            (r'"technical_rules":\s*\[', r'"technical_rules": [\n'),
            (r'"list_of_tables":\s*\[', r'"list_of_tables": [\n'),
            (r'}\s*,\s*\{', r'},\n{'),        # Separate table definitions
            (r']\s*}\s*,\s*\{', r']\n},\n{'), # End of table, start of next
        ]

        processed_content = content
        for pattern, replacement in schema_boundaries:
            processed_content = re.sub(pattern, replacement, processed_content)
        return processed_content

    def _split_long_content(self, content: str) -> List[str]:
        """
        Force split extremely long content at logical boundaries.

        Args:
            content: Long content string

        Returns:
            List of smaller chunks (each at most ~5000 characters)
        """
        if len(content) <= 5000:
            return [content]

        chunks = []
        remaining = content
        while len(remaining) > 5000:
            # Find a good break point within the first 4000 characters
            break_point = 4000

            # Prefer JSON / sentence boundaries over a hard cut
            json_breaks = [
                remaining.rfind('},', 0, break_point),
                remaining.rfind('],', 0, break_point),
                remaining.rfind('}\n', 0, break_point),
                remaining.rfind('.\n', 0, break_point),
            ]
            # Keep only break points that leave a reasonably sized chunk
            valid_breaks = [bp for bp in json_breaks if bp > 1000]
            if valid_breaks:
                break_point = max(valid_breaks) + 2  # Include the delimiter

            chunk = remaining[:break_point].strip()
            if chunk:
                chunks.append(chunk)
            remaining = remaining[break_point:].strip()

        # Add final remaining content
        if remaining.strip():
            chunks.append(remaining.strip())
        return chunks

    def _estimate_tokens(self, sentence: str) -> int:
        """
        Estimate the token count of a sentence.

        Uses tiktoken when available; otherwise falls back to a rough
        ~4-characters-per-token heuristic (plus a small margin).
        """
        if self.tokenizer:
            try:
                return len(self.tokenizer.encode(sentence))
            except Exception as e:
                logger.warning(f"Token counting failed, using character estimate: {e}")
        return len(sentence) // 4 + 10

    def _embed_batch(self, batch: List[str]) -> List[List[float]]:
        """Request embeddings for one batch of sentences from the OpenAI API."""
        response = self.openai_client.embeddings.create(
            model=self.model_name,
            input=batch,
            encoding_format="float"
        )
        return [data.embedding for data in response.data]

    def _calculate_embeddings(self, sentences: List[str]):
        """
        Calculate embeddings for sentences using the OpenAI API.

        Sentences are grouped into batches that stay under the model's token
        limit; over-long single sentences are truncated.

        Args:
            sentences: List of sentences

        Returns:
            numpy array of embeddings, or None if the client is unavailable
            or the API call failed
        """
        if not self.openai_client:
            return None

        try:
            # text-embedding-3-small has a max context length of 8192 tokens;
            # leave some margin for safety.
            max_tokens_per_batch = 7000
            all_embeddings = []
            current_batch = []
            current_batch_tokens = 0

            for sentence in sentences:
                estimated_tokens = self._estimate_tokens(sentence)

                if current_batch and (current_batch_tokens + estimated_tokens > max_tokens_per_batch):
                    # Adding this sentence would exceed the limit: flush the
                    # current batch and start a new one with this sentence.
                    all_embeddings.extend(self._embed_batch(current_batch))
                    current_batch = [sentence]
                    current_batch_tokens = estimated_tokens
                else:
                    current_batch.append(sentence)
                    current_batch_tokens += estimated_tokens

                # Safety check: a single over-long sentence is truncated in place.
                if estimated_tokens > max_tokens_per_batch:
                    logger.warning(f"Sentence too long ({estimated_tokens} tokens), truncating...")
                    if self.tokenizer:
                        # Accurate truncation using the tokenizer
                        tokens = self.tokenizer.encode(sentence)
                        truncated_tokens = tokens[:max_tokens_per_batch - 50]  # Leave margin
                        current_batch[-1] = self.tokenizer.decode(truncated_tokens) + "..."
                        current_batch_tokens = len(truncated_tokens) + 10  # +10 for "..." and margin
                    else:
                        # Fallback character-based truncation
                        max_chars = max_tokens_per_batch * 4 - 100
                        current_batch[-1] = sentence[:max_chars] + "..."
                        current_batch_tokens = max_tokens_per_batch - 100

            # Process the final batch if it has content
            if current_batch:
                all_embeddings.extend(self._embed_batch(current_batch))

            embeddings = np.array(all_embeddings)
            logger.debug(f"Generated embeddings for {len(sentences)} sentences using {self.model_name} with smart batching")
            return embeddings

        except Exception as e:
            logger.error(f"Failed to calculate OpenAI embeddings: {e}")
            return None

    def _find_semantic_breakpoints(self, sentences: List[str], embeddings,
                                   content: str = None) -> List[SemanticBreakpoint]:
        """
        Find semantic breakpoints using embedding similarity with adaptive
        statistical methods.  Inspired by LangChain's SemanticChunker with
        multiple threshold detection methods.

        Args:
            sentences: List of sentences
            embeddings: Sentence embeddings (can be None)
            content: Original content for agent trace type detection

        Returns:
            List of semantic breakpoints
        """
        if embeddings is None or not SKLEARN_AVAILABLE or len(embeddings) < 2:
            return []

        breakpoints = []

        # Pairwise cosine similarities between consecutive sentences
        similarities = []
        for i in range(len(embeddings) - 1):
            similarity = cosine_similarity(
                embeddings[i:i + 1], embeddings[i + 1:i + 2]
            )[0][0]
            similarities.append(similarity)

        if not similarities:
            return breakpoints

        # Adaptive threshold based on statistical analysis (LangChain style)
        threshold = self._calculate_adaptive_threshold(similarities, content)
        logger.debug(f"Using adaptive threshold: {threshold:.3f} for {len(similarities)} similarities")

        # Find points where similarity drops significantly
        for i, similarity in enumerate(similarities):
            if similarity < threshold:
                # How much similarity dropped relative to the previous pair
                if i > 0:
                    similarity_drop = similarities[i - 1] - similarity
                else:
                    similarity_drop = 1.0 - similarity

                # Enhanced confidence calculation using statistical context
                confidence = self._calculate_enhanced_confidence(
                    similarity, similarities, i, threshold
                )

                # Context sentences around the break
                context_start = max(0, i - self.window_size)
                context_end = min(len(sentences), i + self.window_size + 2)
                context_sentences = sentences[context_start:context_end]

                # Character position of the sentence following the break
                position = self._calculate_sentence_position(sentences, i + 1)

                breakpoints.append(SemanticBreakpoint(
                    position=position,
                    sentence_index=i + 1,
                    similarity_drop=similarity_drop,
                    confidence=confidence,
                    context_sentences=context_sentences
                ))

        return breakpoints

    def _calculate_adaptive_threshold(self, similarities: List[float],
                                      content: str = None) -> float:
        """
        Calculate adaptive threshold using multiple statistical methods
        inspired by LangChain.  Enhanced with agent trace type awareness.

        Args:
            similarities: List of similarity scores
            content: Original content for agent trace type detection

        Returns:
            Adaptive threshold value, clamped to [0.1, 0.9]
        """
        if not similarities:
            return self.similarity_threshold

        similarities_array = np.array(similarities)

        # Method 1: Percentile-based (LangChain style) — bottom 5% of similarities
        percentile_threshold = np.percentile(similarities_array, 5)

        # Method 2: Standard deviation — 1.5 std devs below the mean
        mean_sim = np.mean(similarities_array)
        std_sim = np.std(similarities_array)
        std_threshold = mean_sim - (1.5 * std_sim)

        # Method 3: Interquartile range — similar to outlier detection
        q1, q3 = np.percentile(similarities_array, [25, 75])
        iqr = q3 - q1
        iqr_threshold = q1 - (1.5 * iqr)

        # Method 4: Gradient-based (for highly correlated content)
        gradient_threshold = self._calculate_gradient_threshold(similarities_array)

        # Choose the most appropriate threshold based on data characteristics
        # and agent trace type
        adaptive_threshold = self._select_best_threshold(
            similarities_array,
            percentile_threshold,
            std_threshold,
            iqr_threshold,
            gradient_threshold,
            content  # Pass content for agent trace type detection
        )

        # Ensure threshold is within reasonable bounds
        min_threshold = 0.1  # Don't split everything
        max_threshold = 0.9  # Don't split nothing
        return max(min_threshold, min(max_threshold, adaptive_threshold))

    def _calculate_gradient_threshold(self, similarities) -> float:
        """
        Calculate threshold using gradient analysis for highly semantic content.
        Similar to LangChain's gradient method.
        """
        if len(similarities) < 3:
            return np.mean(similarities) - np.std(similarities)

        # Rate of change of the similarity series
        gradients = np.gradient(similarities)
        gradient_mean = np.mean(gradients)
        gradient_std = np.std(gradients)

        # Significant negative gradients = sharp drops in similarity
        significant_drops = gradients < (gradient_mean - 2 * gradient_std)

        if np.any(significant_drops):
            # 10th percentile of similarities at the significant drops
            drop_similarities = similarities[significant_drops]
            return np.percentile(drop_similarities, 10)
        # Fallback to standard method
        return np.mean(similarities) - np.std(similarities)

    def _select_best_threshold(self, similarities, percentile_threshold: float,
                               std_threshold: float, iqr_threshold: float,
                               gradient_threshold: float,
                               content: str = None) -> float:
        """
        Select the best threshold based on data characteristics and agent trace
        type.  Enhanced to be agent-aware based on log type detection.
        """
        # Analyze data characteristics
        variance = np.var(similarities)
        mean_similarity = np.mean(similarities)
        skewness = self._calculate_skewness(similarities)

        # Detect agent trace type if content is provided (best-effort)
        agent_trace_type = None
        if content and hasattr(self, 'log_type_detector'):
            try:
                detection_result = self.log_type_detector.detect_log_type(content)
                agent_trace_type = detection_result.log_type
                logger.debug(f"Detected agent trace type: {agent_trace_type.value} (confidence: {detection_result.confidence:.2f})")
            except Exception as e:
                logger.warning(f"Failed to detect agent trace type: {e}")

        # Agent trace type specific threshold selection
        if agent_trace_type:
            from .log_type_detector import LogType

            if agent_trace_type == LogType.CREWAI_EXECUTION:
                # CrewAI logs have clear hierarchical structure - use gradient
                # method to preserve task/agent boundaries
                logger.debug("CrewAI execution detected - using gradient threshold for task boundaries")
                self._last_threshold_method = "gradient_crewai_aware"
                return gradient_threshold
            elif agent_trace_type == LogType.LANGFUSE_TRACE:
                # Langfuse traces are highly structured - use percentile method
                # to identify significant breaks in observation chains
                logger.debug("Langfuse trace detected - using percentile threshold for observation breaks")
                self._last_threshold_method = "percentile_langfuse_aware"
                return percentile_threshold
            elif agent_trace_type == LogType.MIXED_JSON_NARRATIVE:
                # Mixed content needs robust method - use IQR to handle variety
                logger.debug("Mixed JSON/narrative detected - using IQR threshold for robust handling")
                self._last_threshold_method = "iqr_mixed_content_aware"
                return iqr_threshold
            elif agent_trace_type == LogType.COMPLEX_JSON_SCHEMA:
                # Complex schemas need special handling - use percentile for major breaks
                logger.debug("Complex JSON schema detected - using percentile threshold for schema boundaries")
                self._last_threshold_method = "percentile_complex_schema_aware"
                return percentile_threshold
            elif agent_trace_type == LogType.STRUCTURED_JSON:
                # JSON needs to preserve object boundaries - use std deviation
                logger.debug("Structured JSON detected - using standard deviation threshold")
                self._last_threshold_method = "std_json_aware"
                return std_threshold
            elif agent_trace_type == LogType.NATURAL_LANGUAGE:
                # Natural language benefits from semantic analysis - use percentile
                logger.debug("Natural language detected - using percentile threshold for semantic breaks")
                self._last_threshold_method = "percentile_natural_language_aware"
                return percentile_threshold
            elif agent_trace_type == LogType.UNKNOWN:
                # Unknown format - fall through to data-driven selection
                logger.debug("Unknown format detected - using data-driven threshold selection")

        # Data-driven threshold selection (original logic as fallback)
        if variance > 0.05:
            # High variance suggests diverse content - use percentile method
            logger.debug("High variance detected, using percentile threshold")
            self._last_threshold_method = "percentile_high_variance"
            return percentile_threshold
        elif mean_similarity > 0.8:
            # High mean similarity suggests coherent content - use gradient method
            logger.debug("Highly coherent content detected, using gradient threshold")
            self._last_threshold_method = "gradient_high_coherence"
            return gradient_threshold
        elif abs(skewness) > 1.0:
            # Skewed distribution - use IQR method for robustness
            logger.debug("Skewed distribution detected, using IQR threshold")
            self._last_threshold_method = "iqr_skewed_distribution"
            return iqr_threshold
        else:
            # Default to standard deviation method
            logger.debug("Using standard deviation threshold")
            self._last_threshold_method = "std_default"
            return std_threshold

    def _calculate_skewness(self, data) -> float:
        """Calculate skewness of the data distribution (expects a numpy array)."""
        if len(data) < 3:
            return 0.0

        mean = np.mean(data)
        std = np.std(data)
        if std == 0:
            return 0.0

        # Pearson's moment coefficient of skewness
        return np.mean(((data - mean) / std) ** 3)

    def _calculate_enhanced_confidence(self, similarity: float,
                                       all_similarities: List[float],
                                       index: int, threshold: float) -> float:
        """
        Calculate enhanced confidence score using statistical context.

        Combines distance below the threshold with bonuses for consistently
        low neighborhoods and for "valley" shapes (lower than both neighbors).
        """
        # Base confidence: how far below the threshold this similarity is
        base_confidence = max(0, (threshold - similarity) / threshold)

        # Bonus for consistency with nearby similarities
        window_start = max(0, index - 2)
        window_end = min(len(all_similarities), index + 3)
        local_similarities = all_similarities[window_start:window_end]
        if np.mean(local_similarities) < threshold:
            base_confidence += 0.2

        # Bonus for being significantly different from neighbors (a valley)
        if 0 < index < len(all_similarities) - 1:
            prev_sim = all_similarities[index - 1]
            next_sim = all_similarities[index + 1]
            if similarity < prev_sim and similarity < next_sim:
                base_confidence += 0.3

        return min(1.0, base_confidence)

    def _find_simple_breakpoints(self, sentences: List[str]) -> List[SemanticBreakpoint]:
        """
        Find breakpoints using simple text analysis (fallback method).

        Args:
            sentences: List of sentences

        Returns:
            List of semantic breakpoints
        """
        breakpoints = []

        # Look for topic shifts based on keyword changes
        for i in range(1, len(sentences)):
            current_keywords = self._extract_keywords(sentences[i])
            prev_keywords = self._extract_keywords(sentences[i - 1])

            if current_keywords and prev_keywords:
                # Jaccard overlap of the keyword sets
                overlap = len(current_keywords & prev_keywords) / len(current_keywords | prev_keywords)
                if overlap < 0.3:  # Low overlap suggests a topic shift
                    position = self._calculate_sentence_position(sentences, i)
                    breakpoints.append(SemanticBreakpoint(
                        position=position,
                        sentence_index=i,
                        similarity_drop=1.0 - overlap,
                        confidence=0.6,  # Lower confidence for simple method
                        context_sentences=sentences[max(0, i - 2):min(len(sentences), i + 3)]
                    ))

        return breakpoints

    def _extract_keywords(self, sentence: str) -> set:
        """Extract keywords (lowercased words of 3+ letters, minus stop words)."""
        words = re.findall(r'\b[a-zA-Z]{3,}\b', sentence.lower())
        stop_words = {'the', 'and', 'are', 'for', 'with', 'this', 'that',
                      'from', 'they', 'been', 'have', 'will', 'would',
                      'could', 'should'}
        return {word for word in words if word not in stop_words}

    def _calculate_sentence_position(self, sentences: List[str], sentence_index: int) -> int:
        """
        Calculate the approximate character position where a sentence starts.

        NOTE(review): assumes exactly one separator character between sentences,
        so positions may drift from offsets in the original content.
        """
        position = 0
        for i in range(sentence_index):
            position += len(sentences[i]) + 1  # +1 for space/newline
        return position

    def _create_semantic_segments(self, sentences: List[str],
                                  breakpoints: List[SemanticBreakpoint]) -> List[SemanticSegment]:
        """
        Create semantic segments based on breakpoints.

        Args:
            sentences: List of sentences
            breakpoints: List of breakpoints

        Returns:
            List of semantic segments
        """
        if not breakpoints:
            # Single segment containing all sentences
            return [SemanticSegment(
                start_position=0,
                end_position=sum(len(s) for s in sentences),
                sentences=sentences,
                coherence_score=0.8,
                segment_type="unified"
            )]

        segments = []
        last_position = 0
        last_sentence_index = 0

        for breakpoint in breakpoints:
            # Segment covering the sentences up to this breakpoint
            segment_sentences = sentences[last_sentence_index:breakpoint.sentence_index]
            if segment_sentences:
                segments.append(SemanticSegment(
                    start_position=last_position,
                    end_position=breakpoint.position,
                    sentences=segment_sentences,
                    coherence_score=1.0 - breakpoint.similarity_drop,
                    segment_type="semantic_unit"
                ))
            last_position = breakpoint.position
            last_sentence_index = breakpoint.sentence_index

        # Final segment after the last breakpoint
        if last_sentence_index < len(sentences):
            segments.append(SemanticSegment(
                start_position=last_position,
                end_position=sum(len(s) for s in sentences),
                sentences=sentences[last_sentence_index:],
                coherence_score=0.8,
                segment_type="final_segment"
            ))

        return segments

    def _create_simple_segments(self, sentences: List[str],
                                breakpoints: List[SemanticBreakpoint]) -> List[SemanticSegment]:
        """Create segments using the simple (fallback) method."""
        return self._create_semantic_segments(sentences, breakpoints)

    def _calculate_overall_coherence(self, embeddings) -> float:
        """
        Calculate overall coherence score for the content.

        Args:
            embeddings: Sentence embeddings (can be None)

        Returns:
            Coherence score between 0 and 1 (mean consecutive-pair similarity)
        """
        if embeddings is None or len(embeddings) < 2:
            return 0.7  # Default coherence score when embeddings unavailable

        similarities = []
        for i in range(len(embeddings) - 1):
            similarity = cosine_similarity(
                embeddings[i:i + 1], embeddings[i + 1:i + 2]
            )[0][0]
            similarities.append(similarity)

        return float(np.mean(similarities))

    def recommend_chunk_boundaries(self, content: str, target_chunk_size: int) -> List[int]:
        """
        Recommend chunk boundaries based on semantic analysis.

        Args:
            content: The content to analyze
            target_chunk_size: Target size for chunks

        Returns:
            List of recommended boundary positions
        """
        # NOTE(review): crashes if analyze_semantic_structure returned None
        # (all methods failed) — pre-existing behavior.
        analysis = self.analyze_semantic_structure(content)
        breakpoints = analysis["breakpoints"]

        if not breakpoints:
            # No semantic breakpoints: fall back to size-based chunking
            boundaries = []
            current_pos = 0
            while current_pos + target_chunk_size < len(content):
                boundaries.append(current_pos + target_chunk_size)
                current_pos += target_chunk_size
            return boundaries

        # Combine semantic breakpoints with size constraints
        recommended_boundaries = []
        last_boundary = 0

        for breakpoint in breakpoints:
            chunk_size = breakpoint.position - last_boundary

            # If the chunk would be too large, add an intermediate boundary
            if chunk_size > target_chunk_size * 1.5:
                intermediate_pos = last_boundary + target_chunk_size
                recommended_boundaries.append(intermediate_pos)
                last_boundary = intermediate_pos

            # Add the semantic boundary if it yields a reasonable chunk size
            if breakpoint.position - last_boundary > target_chunk_size * 0.3:
                recommended_boundaries.append(breakpoint.position)
                last_boundary = breakpoint.position

        return recommended_boundaries

    def enhance_boundary_confidence(self, boundary_position: int, content: str) -> float:
        """
        Enhance boundary confidence using semantic analysis.

        Args:
            boundary_position: Position of the boundary to analyze
            content: Full content for context

        Returns:
            Enhanced confidence score (0.5 default when embeddings unavailable)
        """
        if not self.openai_client:
            return 0.5  # Default confidence without embeddings

        # Extract a window of sentences around the boundary
        context_size = 200
        context_start = max(0, boundary_position - context_size)
        context_end = min(len(content), boundary_position + context_size)
        context = content[context_start:context_end]

        sentences = self._split_into_sentences(context)
        if len(sentences) < 3:
            return 0.5

        # Locate the sentence the boundary falls in
        boundary_in_context = boundary_position - context_start
        target_sentence_idx = 0
        current_pos = 0
        for i, sentence in enumerate(sentences):
            if current_pos + len(sentence) >= boundary_in_context:
                target_sentence_idx = i
                break
            current_pos += len(sentence) + 1

        # Compare embeddings just before vs. just after this position
        if 0 < target_sentence_idx < len(sentences) - 1:
            before_sentences = sentences[max(0, target_sentence_idx - 1):target_sentence_idx + 1]
            after_sentences = sentences[target_sentence_idx:target_sentence_idx + 2]

            before_embeddings = self._calculate_embeddings(before_sentences)
            after_embeddings = self._calculate_embeddings(after_sentences)

            if (before_embeddings is not None and after_embeddings is not None
                    and len(before_embeddings) > 0 and len(after_embeddings) > 0):
                similarity = cosine_similarity(
                    before_embeddings[-1:], after_embeddings[0:1]
                )[0][0]
                # Lower similarity means higher confidence for a boundary
                confidence = 1.0 - similarity
                return max(0.1, min(1.0, confidence))

        return 0.5