Spaces:
Running
Running
| """ | |
| Semantic Analysis Module for Agent-Aware Splitting | |
| This module provides embedding-based semantic analysis to identify | |
| natural breakpoints in agent execution logs based on content similarity. | |
| Uses OpenAI's text-embedding-3-small model for high-quality embeddings. | |
| """ | |
| import re | |
| import os | |
| from typing import List, Dict, Tuple, Optional | |
| from dataclasses import dataclass | |
| import logging | |
| import numpy as np | |
| # OpenAI client for embeddings | |
| try: | |
| from openai import OpenAI | |
| OPENAI_AVAILABLE = True | |
| except ImportError: | |
| OPENAI_AVAILABLE = False | |
| OpenAI = None | |
| # Optional import for sklearn | |
| try: | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| SKLEARN_AVAILABLE = True | |
| except ImportError: | |
| SKLEARN_AVAILABLE = False | |
| cosine_similarity = None | |
| # Optional import for tiktoken (accurate token counting) | |
| try: | |
| import tiktoken | |
| TIKTOKEN_AVAILABLE = True | |
| except ImportError: | |
| TIKTOKEN_AVAILABLE = False | |
| tiktoken = None | |
| logger = logging.getLogger(__name__) | |
@dataclass
class SemanticBreakpoint:
    """Represents a semantic breakpoint in the content.

    Instances are created with keyword arguments throughout this module, so
    the @dataclass decorator (which generates __init__) is required.
    """

    position: int               # character offset of the break in the original text
    sentence_index: int         # index of the first sentence after the break
    similarity_drop: float      # how far similarity fell across the boundary
    confidence: float           # 0..1 confidence that this is a real topic shift
    context_sentences: List[str]  # sentences surrounding the breakpoint
@dataclass
class SemanticSegment:
    """Represents a semantically coherent segment.

    Instances are created with keyword arguments throughout this module, so
    the @dataclass decorator (which generates __init__) is required.
    """

    start_position: int    # character offset where the segment starts
    end_position: int      # character offset where the segment ends
    sentences: List[str]   # sentences contained in this segment
    coherence_score: float  # 0..1 internal coherence estimate
    segment_type: str      # e.g. "semantic_unit", "unified", "final_segment"
| class SemanticAnalyzer: | |
| """ | |
| Analyzes text content using OpenAI embeddings to identify semantic boundaries. | |
| Uses sentence-level embeddings to detect where content topics shift, | |
| indicating natural breakpoints for intelligent chunking. | |
| """ | |
    def __init__(self,
                 model_name: str = "text-embedding-3-small",
                 similarity_threshold: float = 0.5,
                 window_size: int = 3,
                 api_key: Optional[str] = None):
        """
        Initialize the SemanticAnalyzer with OpenAI embeddings and adaptive threshold detection.

        Args:
            model_name: Name of the OpenAI embedding model to use
            similarity_threshold: Base similarity threshold (used as fallback
                by _calculate_adaptive_threshold when no similarities exist)
            window_size: Context window size for breakpoint analysis
            api_key: OpenAI API key (uses OPENAI_API_KEY env var if not provided)
        """
        self.model_name = model_name
        self.similarity_threshold = similarity_threshold
        self.window_size = window_size
        # Import log type detector for agent-aware analysis
        # (project-local; imported lazily here rather than at module level)
        from .log_type_detector import LogTypeDetector
        self.log_type_detector = LogTypeDetector()
        # Set up tokenizer for accurate token counting if available.
        # NOTE(review): tiktoken is re-imported locally even though the module
        # already probes it at top level (TIKTOKEN_AVAILABLE); the local
        # try/except is the authoritative guard for self.tokenizer.
        self.tokenizer = None
        try:
            import tiktoken
            # For text-embedding-3-small, use cl100k_base encoding (same as GPT-4)
            self.tokenizer = tiktoken.get_encoding("cl100k_base")
            logger.debug("Initialized tiktoken for accurate token counting")
        except ImportError:
            logger.warning("tiktoken not available, using approximate token counting")
        # Initialize OpenAI client; on any failure self.openai_client is left
        # as None and the analyzer degrades to text-based fallbacks.
        api_key = api_key or os.getenv('OPENAI_API_KEY')
        if not api_key:
            logger.warning("No OpenAI API key provided. Embeddings will not be available.")
            self.openai_client = None
        else:
            try:
                from openai import OpenAI
                self.openai_client = OpenAI(api_key=api_key)
                logger.debug(f"Initialized OpenAI client for {model_name}")
            except ImportError:
                logger.error("OpenAI package not installed. Run: pip install openai")
                self.openai_client = None
            except Exception as e:
                logger.error(f"Failed to initialize OpenAI client: {e}")
                self.openai_client = None
| def analyze_semantic_structure(self, content: str) -> Dict[str, any]: | |
| """ | |
| Analyze the semantic structure of content to identify natural breakpoints. | |
| Enhanced with agent trace type awareness and adaptive threshold selection. | |
| Args: | |
| content: The text content to analyze | |
| Returns: | |
| Dictionary containing analysis results with segments, breakpoints, and method used | |
| """ | |
| if not content or not content.strip(): | |
| return { | |
| "segments": [], | |
| "breakpoints": [], | |
| "coherence_score": 0.0, | |
| "method": "none_empty_content", | |
| "total_sentences": 0 | |
| } | |
| # Detect agent trace type for adaptive processing | |
| agent_trace_type = None | |
| if hasattr(self, 'log_type_detector'): | |
| try: | |
| detection_result = self.log_type_detector.detect_log_type(content) | |
| agent_trace_type = detection_result.log_type | |
| logger.info(f"Detected agent trace type: {agent_trace_type.value} (confidence: {detection_result.confidence:.2f})") | |
| except Exception as e: | |
| logger.warning(f"Failed to detect agent trace type: {e}") | |
| sentences = self._split_into_sentences(content) | |
| if len(sentences) < 2: | |
| return { | |
| "segments": [SemanticSegment( | |
| start_position=0, | |
| end_position=len(content), | |
| sentences=sentences, | |
| coherence_score=1.0, | |
| segment_type="single_sentence" | |
| )], | |
| "breakpoints": [], | |
| "coherence_score": 1.0, | |
| "method": "single_sentence", | |
| "total_sentences": len(sentences), | |
| "agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown" | |
| } | |
| # Try OpenAI embeddings first | |
| if self.openai_client: | |
| try: | |
| embeddings = self._calculate_embeddings(sentences) | |
| if embeddings is not None: | |
| breakpoints = self._find_semantic_breakpoints(sentences, embeddings, content) | |
| segments = self._create_semantic_segments(sentences, breakpoints) | |
| coherence_score = self._calculate_overall_coherence(embeddings) | |
| return { | |
| "segments": segments, | |
| "breakpoints": breakpoints, | |
| "coherence_score": coherence_score, | |
| "method": "openai_embedding_based_adaptive", | |
| "total_sentences": len(sentences), | |
| "agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown", | |
| "threshold_method": getattr(self, '_last_threshold_method', 'adaptive_statistical') | |
| } | |
| except Exception as e: | |
| logger.warning(f"OpenAI embedding analysis failed: {e}") | |
| logger.info("Falling back to text-based analysis") | |
| # Fallback to simple text-based analysis | |
| try: | |
| breakpoints = self._find_simple_breakpoints(sentences) | |
| segments = self._create_simple_segments(sentences, breakpoints) | |
| return { | |
| "segments": segments, | |
| "breakpoints": breakpoints, | |
| "coherence_score": 0.5, # Default for text-based | |
| "method": "text_based_fallback", | |
| "total_sentences": len(sentences), | |
| "agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown" | |
| } | |
| except Exception as e: | |
| logger.error(f"All analysis methods failed: {e}") | |
| return None | |
| def _split_into_sentences(self, content: str) -> List[str]: | |
| """ | |
| Split content into sentences with enhanced handling for JSON schemas. | |
| Args: | |
| content: Text content to split | |
| Returns: | |
| List of sentences | |
| """ | |
| if not content.strip(): | |
| return [] | |
| # Detect content type for special handling | |
| content_type = None | |
| if hasattr(self, 'log_type_detector'): | |
| try: | |
| detection_result = self.log_type_detector.detect_log_type(content) | |
| content_type = detection_result.log_type | |
| except Exception: | |
| pass | |
| # Special preprocessing for complex JSON schemas | |
| if content_type and content_type.value == "complex_json_schema": | |
| content = self._preprocess_complex_json_schema(content) | |
| # Enhanced sentence splitting patterns | |
| # Add JSON object boundaries as sentence breaks for very long JSON | |
| sentence_patterns = [ | |
| r'(?<=[.!?])\s+(?=[A-Z])', # Standard sentence boundaries | |
| r'}\s*,\s*\{', # JSON object boundaries (simplified) | |
| r']\s*,\s*\[', # JSON array boundaries (simplified) | |
| r'}\s*\n\s*\{', # JSON blocks on new lines (simplified) | |
| r'}\s*\n\s*[A-Za-z]', # JSON to text transitions (simplified) | |
| r'[a-z]\s*\n\s*\{', # Text to JSON transitions (simplified) | |
| ] | |
| # Apply sentence splitting | |
| sentences = [content] # Start with full content | |
| for pattern in sentence_patterns: | |
| new_sentences = [] | |
| for sentence in sentences: | |
| # Split on pattern but keep delimiter context | |
| parts = re.split(f'({pattern})', sentence) | |
| current_sentence = "" | |
| for i, part in enumerate(parts): | |
| if re.match(pattern, part): | |
| # This is a delimiter - finish current sentence | |
| if current_sentence.strip(): | |
| new_sentences.append(current_sentence.strip()) | |
| current_sentence = "" | |
| else: | |
| current_sentence += part | |
| # Add final sentence if exists | |
| if current_sentence.strip(): | |
| new_sentences.append(current_sentence.strip()) | |
| sentences = new_sentences | |
| # Filter out empty sentences and very short ones | |
| sentences = [s for s in sentences if len(s.strip()) > 10] | |
| # Additional safety: split extremely long sentences (>5000 chars) | |
| final_sentences = [] | |
| for sentence in sentences: | |
| if len(sentence) > 5000: | |
| # Force split long content at logical boundaries | |
| chunks = self._split_long_content(sentence) | |
| final_sentences.extend(chunks) | |
| else: | |
| final_sentences.append(sentence) | |
| return final_sentences | |
| def _preprocess_complex_json_schema(self, content: str) -> str: | |
| """ | |
| Preprocess complex JSON schemas to add better sentence boundaries. | |
| Args: | |
| content: Content containing complex JSON schemas | |
| Returns: | |
| Preprocessed content with better boundaries | |
| """ | |
| # Add line breaks after major JSON schema sections | |
| schema_boundaries = [ | |
| (r'"business_rules":\s*\[', r'"business_rules": [\n'), | |
| (r'"technical_rules":\s*\[', r'"technical_rules": [\n'), | |
| (r'"list_of_tables":\s*\[', r'"list_of_tables": [\n'), | |
| (r'}\s*,\s*\{', r'},\n{'), # Separate table definitions | |
| (r']\s*}\s*,\s*\{', r']\n},\n{'), # End of table, start of next | |
| ] | |
| processed_content = content | |
| for pattern, replacement in schema_boundaries: | |
| processed_content = re.sub(pattern, replacement, processed_content) | |
| return processed_content | |
| def _split_long_content(self, content: str) -> List[str]: | |
| """ | |
| Force split extremely long content at logical boundaries. | |
| Args: | |
| content: Long content string | |
| Returns: | |
| List of smaller chunks | |
| """ | |
| if len(content) <= 5000: | |
| return [content] | |
| chunks = [] | |
| remaining = content | |
| while len(remaining) > 5000: | |
| # Find a good break point in the first 4000 characters | |
| break_point = 4000 | |
| # Look for JSON boundaries | |
| json_breaks = [ | |
| remaining.rfind('},', 0, break_point), | |
| remaining.rfind('],', 0, break_point), | |
| remaining.rfind('}\n', 0, break_point), | |
| remaining.rfind('.\n', 0, break_point), | |
| ] | |
| # Use the best available break point | |
| valid_breaks = [bp for bp in json_breaks if bp > 1000] # At least 1000 chars | |
| if valid_breaks: | |
| break_point = max(valid_breaks) + 2 # Include the delimiter | |
| chunk = remaining[:break_point].strip() | |
| if chunk: | |
| chunks.append(chunk) | |
| remaining = remaining[break_point:].strip() | |
| # Add final remaining content | |
| if remaining.strip(): | |
| chunks.append(remaining.strip()) | |
| return chunks | |
| def _calculate_embeddings(self, sentences: List[str]): | |
| """ | |
| Calculate embeddings for sentences using OpenAI API. | |
| Args: | |
| sentences: List of sentences | |
| Returns: | |
| Embeddings array or None if not available | |
| """ | |
| if not self.openai_client: | |
| return None | |
| try: | |
| # Process sentences in smart batches to respect token limits | |
| # text-embedding-3-small has a max context length of 8192 tokens | |
| max_tokens_per_batch = 7000 # Leave some margin for safety | |
| all_embeddings = [] | |
| current_batch = [] | |
| current_batch_tokens = 0 | |
| for sentence in sentences: | |
| # Accurate token counting using tiktoken if available | |
| if self.tokenizer: | |
| try: | |
| estimated_tokens = len(self.tokenizer.encode(sentence)) | |
| except Exception as e: | |
| logger.warning(f"Token counting failed, using character estimate: {e}") | |
| estimated_tokens = len(sentence) // 4 + 10 | |
| else: | |
| # Fallback: rough token estimation (~4 characters per token for English text) | |
| estimated_tokens = len(sentence) // 4 + 10 | |
| # If adding this sentence would exceed the token limit, process current batch | |
| if current_batch and (current_batch_tokens + estimated_tokens > max_tokens_per_batch): | |
| # Process current batch | |
| response = self.openai_client.embeddings.create( | |
| model=self.model_name, | |
| input=current_batch, | |
| encoding_format="float" | |
| ) | |
| # Extract embeddings from response | |
| batch_embeddings = [data.embedding for data in response.data] | |
| all_embeddings.extend(batch_embeddings) | |
| # Start new batch | |
| current_batch = [sentence] | |
| current_batch_tokens = estimated_tokens | |
| else: | |
| # Add sentence to current batch | |
| current_batch.append(sentence) | |
| current_batch_tokens += estimated_tokens | |
| # Safety check: if a single sentence is too long, truncate it | |
| if estimated_tokens > max_tokens_per_batch: | |
| logger.warning(f"Sentence too long ({estimated_tokens} tokens), truncating...") | |
| # Truncate the sentence to fit within token limit | |
| if self.tokenizer: | |
| # More accurate truncation using tokenizer | |
| tokens = self.tokenizer.encode(sentence) | |
| truncated_tokens = tokens[:max_tokens_per_batch - 50] # Leave margin | |
| truncated_sentence = self.tokenizer.decode(truncated_tokens) + "..." | |
| current_batch[-1] = truncated_sentence | |
| current_batch_tokens = len(truncated_tokens) + 10 # +10 for "..." and margin | |
| else: | |
| # Fallback character-based truncation | |
| max_chars = max_tokens_per_batch * 4 - 100 | |
| current_batch[-1] = sentence[:max_chars] + "..." | |
| current_batch_tokens = max_tokens_per_batch - 100 | |
| # Process final batch if it has content | |
| if current_batch: | |
| response = self.openai_client.embeddings.create( | |
| model=self.model_name, | |
| input=current_batch, | |
| encoding_format="float" | |
| ) | |
| # Extract embeddings from response | |
| batch_embeddings = [data.embedding for data in response.data] | |
| all_embeddings.extend(batch_embeddings) | |
| # Convert to numpy array | |
| embeddings = np.array(all_embeddings) | |
| logger.debug(f"Generated embeddings for {len(sentences)} sentences using {self.model_name} with smart batching") | |
| return embeddings | |
| except Exception as e: | |
| logger.error(f"Failed to calculate OpenAI embeddings: {e}") | |
| return None | |
| def _find_semantic_breakpoints(self, sentences: List[str], embeddings, content: str = None) -> List[SemanticBreakpoint]: | |
| """ | |
| Find semantic breakpoints using embedding similarity with adaptive statistical methods. | |
| Inspired by LangChain's SemanticChunker with multiple threshold detection methods. | |
| Args: | |
| sentences: List of sentences | |
| embeddings: Sentence embeddings (can be None) | |
| content: Original content for agent trace type detection | |
| Returns: | |
| List of semantic breakpoints | |
| """ | |
| if embeddings is None or not SKLEARN_AVAILABLE or len(embeddings) < 2: | |
| return [] | |
| breakpoints = [] | |
| # Calculate pairwise similarities between consecutive sentences | |
| similarities = [] | |
| for i in range(len(embeddings) - 1): | |
| similarity = cosine_similarity( | |
| embeddings[i:i+1], | |
| embeddings[i+1:i+2] | |
| )[0][0] | |
| similarities.append(similarity) | |
| if not similarities: | |
| return breakpoints | |
| # Use adaptive threshold based on statistical analysis (LangChain style) | |
| threshold = self._calculate_adaptive_threshold(similarities, content) | |
| logger.debug(f"Using adaptive threshold: {threshold:.3f} for {len(similarities)} similarities") | |
| # Find points where similarity drops significantly | |
| for i, similarity in enumerate(similarities): | |
| if similarity < threshold: | |
| # Calculate confidence based on how much similarity dropped | |
| if i > 0: | |
| prev_similarity = similarities[i-1] | |
| similarity_drop = prev_similarity - similarity | |
| else: | |
| similarity_drop = 1.0 - similarity | |
| # Enhanced confidence calculation using statistical context | |
| confidence = self._calculate_enhanced_confidence( | |
| similarity, similarities, i, threshold | |
| ) | |
| # Get context sentences | |
| context_start = max(0, i - self.window_size) | |
| context_end = min(len(sentences), i + self.window_size + 2) | |
| context_sentences = sentences[context_start:context_end] | |
| # Calculate position in original text | |
| position = self._calculate_sentence_position(sentences, i + 1) | |
| breakpoint = SemanticBreakpoint( | |
| position=position, | |
| sentence_index=i + 1, | |
| similarity_drop=similarity_drop, | |
| confidence=confidence, | |
| context_sentences=context_sentences | |
| ) | |
| breakpoints.append(breakpoint) | |
| return breakpoints | |
| def _calculate_adaptive_threshold(self, similarities: List[float], content: str = None) -> float: | |
| """ | |
| Calculate adaptive threshold using multiple statistical methods inspired by LangChain. | |
| Enhanced with agent trace type awareness. | |
| Args: | |
| similarities: List of similarity scores | |
| content: Original content for agent trace type detection | |
| Returns: | |
| Adaptive threshold value | |
| """ | |
| if not similarities: | |
| return self.similarity_threshold | |
| similarities_array = np.array(similarities) | |
| # Method 1: Percentile-based (LangChain style) | |
| # Use 5th percentile for breakpoints (bottom 5% of similarities) | |
| percentile_threshold = np.percentile(similarities_array, 5) | |
| # Method 2: Standard deviation | |
| mean_sim = np.mean(similarities_array) | |
| std_sim = np.std(similarities_array) | |
| std_threshold = mean_sim - (1.5 * std_sim) # 1.5 std devs below mean | |
| # Method 3: Interquartile range | |
| q1, q3 = np.percentile(similarities_array, [25, 75]) | |
| iqr = q3 - q1 | |
| iqr_threshold = q1 - (1.5 * iqr) # Similar to outlier detection | |
| # Method 4: Gradient-based (for highly correlated content) | |
| gradient_threshold = self._calculate_gradient_threshold(similarities_array) | |
| # Choose the most appropriate threshold based on data characteristics and agent trace type | |
| adaptive_threshold = self._select_best_threshold( | |
| similarities_array, | |
| percentile_threshold, | |
| std_threshold, | |
| iqr_threshold, | |
| gradient_threshold, | |
| content # Pass content for agent trace type detection | |
| ) | |
| # Ensure threshold is within reasonable bounds | |
| min_threshold = 0.1 # Don't split everything | |
| max_threshold = 0.9 # Don't split nothing | |
| return max(min_threshold, min(max_threshold, adaptive_threshold)) | |
| def _calculate_gradient_threshold(self, similarities) -> float: | |
| """ | |
| Calculate threshold using gradient analysis for highly semantic content. | |
| Similar to LangChain's gradient method. | |
| """ | |
| if len(similarities) < 3: | |
| return np.mean(similarities) - np.std(similarities) | |
| # Calculate gradient (rate of change) | |
| gradients = np.gradient(similarities) | |
| # Apply anomaly detection on gradients to widen distribution | |
| gradient_mean = np.mean(gradients) | |
| gradient_std = np.std(gradients) | |
| # Find significant negative gradients (sharp drops in similarity) | |
| significant_drops = gradients < (gradient_mean - 2 * gradient_std) | |
| if np.any(significant_drops): | |
| # Use the 10th percentile of similarities where we have significant drops | |
| drop_similarities = similarities[significant_drops] | |
| return np.percentile(drop_similarities, 10) | |
| else: | |
| # Fallback to standard method | |
| return np.mean(similarities) - np.std(similarities) | |
| def _select_best_threshold(self, | |
| similarities, | |
| percentile_threshold: float, | |
| std_threshold: float, | |
| iqr_threshold: float, | |
| gradient_threshold: float, | |
| content: str = None) -> float: | |
| """ | |
| Select the best threshold based on data characteristics and agent trace type. | |
| Enhanced to be agent-aware based on log type detection. | |
| """ | |
| # Analyze data characteristics | |
| variance = np.var(similarities) | |
| mean_similarity = np.mean(similarities) | |
| skewness = self._calculate_skewness(similarities) | |
| # Detect agent trace type if content is provided | |
| agent_trace_type = None | |
| if content and hasattr(self, 'log_type_detector'): | |
| try: | |
| detection_result = self.log_type_detector.detect_log_type(content) | |
| agent_trace_type = detection_result.log_type | |
| logger.debug(f"Detected agent trace type: {agent_trace_type.value} (confidence: {detection_result.confidence:.2f})") | |
| except Exception as e: | |
| logger.warning(f"Failed to detect agent trace type: {e}") | |
| # Agent trace type specific threshold selection | |
| if agent_trace_type: | |
| from .log_type_detector import LogType | |
| if agent_trace_type == LogType.CREWAI_EXECUTION: | |
| # CrewAI logs have clear hierarchical structure - use gradient method | |
| # to preserve task/agent boundaries | |
| logger.debug("CrewAI execution detected - using gradient threshold for task boundaries") | |
| self._last_threshold_method = "gradient_crewai_aware" | |
| return gradient_threshold | |
| elif agent_trace_type == LogType.LANGFUSE_TRACE: | |
| # Langfuse traces are highly structured - use percentile method | |
| # to identify significant breaks in observation chains | |
| logger.debug("Langfuse trace detected - using percentile threshold for observation breaks") | |
| self._last_threshold_method = "percentile_langfuse_aware" | |
| return percentile_threshold | |
| elif agent_trace_type == LogType.MIXED_JSON_NARRATIVE: | |
| # Mixed content needs robust method - use IQR to handle variety | |
| logger.debug("Mixed JSON/narrative detected - using IQR threshold for robust handling") | |
| self._last_threshold_method = "iqr_mixed_content_aware" | |
| return iqr_threshold | |
| elif agent_trace_type == LogType.COMPLEX_JSON_SCHEMA: | |
| # Complex schemas need special handling - use percentile for major breaks | |
| logger.debug("Complex JSON schema detected - using percentile threshold for schema boundaries") | |
| self._last_threshold_method = "percentile_complex_schema_aware" | |
| return percentile_threshold | |
| elif agent_trace_type == LogType.STRUCTURED_JSON: | |
| # JSON needs to preserve object boundaries - use std deviation | |
| logger.debug("Structured JSON detected - using standard deviation threshold") | |
| self._last_threshold_method = "std_json_aware" | |
| return std_threshold | |
| elif agent_trace_type == LogType.NATURAL_LANGUAGE: | |
| # Natural language benefits from semantic analysis - use percentile | |
| logger.debug("Natural language detected - using percentile threshold for semantic breaks") | |
| self._last_threshold_method = "percentile_natural_language_aware" | |
| return percentile_threshold | |
| elif agent_trace_type == LogType.UNKNOWN: | |
| # Unknown format - fall back to data-driven selection | |
| logger.debug("Unknown format detected - using data-driven threshold selection") | |
| # Data-driven threshold selection (original logic as fallback) | |
| # High variance suggests diverse content - use percentile method | |
| if variance > 0.05: | |
| logger.debug("High variance detected, using percentile threshold") | |
| self._last_threshold_method = "percentile_high_variance" | |
| return percentile_threshold | |
| # High mean similarity suggests coherent content - use gradient method | |
| elif mean_similarity > 0.8: | |
| logger.debug("Highly coherent content detected, using gradient threshold") | |
| self._last_threshold_method = "gradient_high_coherence" | |
| return gradient_threshold | |
| # Skewed distribution - use IQR method for robustness | |
| elif abs(skewness) > 1.0: | |
| logger.debug("Skewed distribution detected, using IQR threshold") | |
| self._last_threshold_method = "iqr_skewed_distribution" | |
| return iqr_threshold | |
| # Default to standard deviation method | |
| else: | |
| logger.debug("Using standard deviation threshold") | |
| self._last_threshold_method = "std_default" | |
| return std_threshold | |
| def _calculate_skewness(self, data) -> float: | |
| """Calculate skewness of the data distribution.""" | |
| if len(data) < 3: | |
| return 0.0 | |
| mean = np.mean(data) | |
| std = np.std(data) | |
| if std == 0: | |
| return 0.0 | |
| # Pearson's moment coefficient of skewness | |
| skewness = np.mean(((data - mean) / std) ** 3) | |
| return skewness | |
| def _calculate_enhanced_confidence(self, | |
| similarity: float, | |
| all_similarities: List[float], | |
| index: int, | |
| threshold: float) -> float: | |
| """ | |
| Calculate enhanced confidence score using statistical context. | |
| """ | |
| # Base confidence from how far below threshold | |
| base_confidence = max(0, (threshold - similarity) / threshold) | |
| # Bonus for consistency with nearby similarities | |
| window_start = max(0, index - 2) | |
| window_end = min(len(all_similarities), index + 3) | |
| local_similarities = all_similarities[window_start:window_end] | |
| # If local similarities are consistently low, increase confidence | |
| local_mean = np.mean(local_similarities) | |
| if local_mean < threshold: | |
| base_confidence += 0.2 | |
| # Bonus for being significantly different from neighbors | |
| if index > 0 and index < len(all_similarities) - 1: | |
| prev_sim = all_similarities[index - 1] | |
| next_sim = all_similarities[index + 1] | |
| # If this is a valley (lower than both neighbors), increase confidence | |
| if similarity < prev_sim and similarity < next_sim: | |
| base_confidence += 0.3 | |
| return min(1.0, base_confidence) | |
| def _find_simple_breakpoints(self, sentences: List[str]) -> List[SemanticBreakpoint]: | |
| """ | |
| Find breakpoints using simple text analysis (fallback method). | |
| Args: | |
| sentences: List of sentences | |
| Returns: | |
| List of semantic breakpoints | |
| """ | |
| breakpoints = [] | |
| # Look for topic shifts based on keyword changes | |
| for i in range(1, len(sentences)): | |
| current_keywords = self._extract_keywords(sentences[i]) | |
| prev_keywords = self._extract_keywords(sentences[i-1]) | |
| # Calculate keyword overlap | |
| if current_keywords and prev_keywords: | |
| overlap = len(current_keywords & prev_keywords) / len(current_keywords | prev_keywords) | |
| if overlap < 0.3: # Low overlap suggests topic shift | |
| position = self._calculate_sentence_position(sentences, i) | |
| breakpoint = SemanticBreakpoint( | |
| position=position, | |
| sentence_index=i, | |
| similarity_drop=1.0 - overlap, | |
| confidence=0.6, # Lower confidence for simple method | |
| context_sentences=sentences[max(0, i-2):min(len(sentences), i+3)] | |
| ) | |
| breakpoints.append(breakpoint) | |
| return breakpoints | |
| def _extract_keywords(self, sentence: str) -> set: | |
| """Extract keywords from a sentence.""" | |
| # Simple keyword extraction | |
| words = re.findall(r'\b[a-zA-Z]{3,}\b', sentence.lower()) | |
| # Filter out common stop words | |
| stop_words = {'the', 'and', 'are', 'for', 'with', 'this', 'that', 'from', 'they', 'been', 'have', 'will', 'would', 'could', 'should'} | |
| keywords = {word for word in words if word not in stop_words} | |
| return keywords | |
| def _calculate_sentence_position(self, sentences: List[str], sentence_index: int) -> int: | |
| """Calculate the character position where a sentence starts.""" | |
| position = 0 | |
| for i in range(sentence_index): | |
| position += len(sentences[i]) + 1 # +1 for space/newline | |
| return position | |
| def _create_semantic_segments(self, sentences: List[str], breakpoints: List[SemanticBreakpoint]) -> List[SemanticSegment]: | |
| """ | |
| Create semantic segments based on breakpoints. | |
| Args: | |
| sentences: List of sentences | |
| breakpoints: List of breakpoints | |
| Returns: | |
| List of semantic segments | |
| """ | |
| if not breakpoints: | |
| # Single segment containing all sentences | |
| return [SemanticSegment( | |
| start_position=0, | |
| end_position=sum(len(s) for s in sentences), | |
| sentences=sentences, | |
| coherence_score=0.8, | |
| segment_type="unified" | |
| )] | |
| segments = [] | |
| last_position = 0 | |
| last_sentence_index = 0 | |
| for breakpoint in breakpoints: | |
| # Create segment up to this breakpoint | |
| segment_sentences = sentences[last_sentence_index:breakpoint.sentence_index] | |
| if segment_sentences: | |
| segments.append(SemanticSegment( | |
| start_position=last_position, | |
| end_position=breakpoint.position, | |
| sentences=segment_sentences, | |
| coherence_score=1.0 - breakpoint.similarity_drop, | |
| segment_type="semantic_unit" | |
| )) | |
| last_position = breakpoint.position | |
| last_sentence_index = breakpoint.sentence_index | |
| # Add final segment | |
| if last_sentence_index < len(sentences): | |
| final_sentences = sentences[last_sentence_index:] | |
| segments.append(SemanticSegment( | |
| start_position=last_position, | |
| end_position=sum(len(s) for s in sentences), | |
| sentences=final_sentences, | |
| coherence_score=0.8, | |
| segment_type="final_segment" | |
| )) | |
| return segments | |
    def _create_simple_segments(self, sentences: List[str], breakpoints: List[SemanticBreakpoint]) -> List[SemanticSegment]:
        """Create segments using simple method (fallback).

        Delegates to _create_semantic_segments: segment construction is the
        same regardless of how the breakpoints were found.
        """
        return self._create_semantic_segments(sentences, breakpoints)
| def _calculate_overall_coherence(self, embeddings) -> float: | |
| """ | |
| Calculate overall coherence score for the content. | |
| Args: | |
| embeddings: Sentence embeddings (can be None) | |
| Returns: | |
| Coherence score between 0 and 1 | |
| """ | |
| if embeddings is None or len(embeddings) < 2: | |
| return 0.7 # Default coherence score when embeddings unavailable | |
| # Calculate average pairwise similarity | |
| similarities = [] | |
| for i in range(len(embeddings) - 1): | |
| similarity = cosine_similarity( | |
| embeddings[i:i+1], | |
| embeddings[i+1:i+2] | |
| )[0][0] | |
| similarities.append(similarity) | |
| return float(np.mean(similarities)) | |
| def recommend_chunk_boundaries(self, content: str, target_chunk_size: int) -> List[int]: | |
| """ | |
| Recommend chunk boundaries based on semantic analysis. | |
| Args: | |
| content: The content to analyze | |
| target_chunk_size: Target size for chunks | |
| Returns: | |
| List of recommended boundary positions | |
| """ | |
| analysis = self.analyze_semantic_structure(content) | |
| breakpoints = analysis["breakpoints"] | |
| if not breakpoints: | |
| # No semantic breakpoints, fall back to size-based chunking | |
| boundaries = [] | |
| current_pos = 0 | |
| while current_pos + target_chunk_size < len(content): | |
| boundaries.append(current_pos + target_chunk_size) | |
| current_pos += target_chunk_size | |
| return boundaries | |
| # Combine semantic breakpoints with size constraints | |
| recommended_boundaries = [] | |
| last_boundary = 0 | |
| for breakpoint in breakpoints: | |
| chunk_size = breakpoint.position - last_boundary | |
| # If chunk would be too large, add intermediate boundary | |
| if chunk_size > target_chunk_size * 1.5: | |
| # Find a good intermediate point | |
| intermediate_pos = last_boundary + target_chunk_size | |
| recommended_boundaries.append(intermediate_pos) | |
| last_boundary = intermediate_pos | |
| # Add semantic boundary if it creates reasonable chunk size | |
| if breakpoint.position - last_boundary > target_chunk_size * 0.3: | |
| recommended_boundaries.append(breakpoint.position) | |
| last_boundary = breakpoint.position | |
| return recommended_boundaries | |
| def enhance_boundary_confidence(self, boundary_position: int, content: str) -> float: | |
| """ | |
| Enhance boundary confidence using semantic analysis. | |
| Args: | |
| boundary_position: Position of the boundary to analyze | |
| content: Full content for context | |
| Returns: | |
| Enhanced confidence score | |
| """ | |
| if not self.openai_client: | |
| return 0.5 # Default confidence without embeddings | |
| # Extract sentences around the boundary | |
| context_size = 200 | |
| context_start = max(0, boundary_position - context_size) | |
| context_end = min(len(content), boundary_position + context_size) | |
| context = content[context_start:context_end] | |
| sentences = self._split_into_sentences(context) | |
| if len(sentences) < 3: | |
| return 0.5 | |
| # Find which sentence the boundary falls in | |
| boundary_in_context = boundary_position - context_start | |
| target_sentence_idx = 0 | |
| current_pos = 0 | |
| for i, sentence in enumerate(sentences): | |
| if current_pos + len(sentence) >= boundary_in_context: | |
| target_sentence_idx = i | |
| break | |
| current_pos += len(sentence) + 1 | |
| # Analyze semantic similarity around this position | |
| if target_sentence_idx > 0 and target_sentence_idx < len(sentences) - 1: | |
| before_sentences = sentences[max(0, target_sentence_idx-1):target_sentence_idx+1] | |
| after_sentences = sentences[target_sentence_idx:target_sentence_idx+2] | |
| before_embeddings = self._calculate_embeddings(before_sentences) | |
| after_embeddings = self._calculate_embeddings(after_sentences) | |
| if before_embeddings is not None and after_embeddings is not None and len(before_embeddings) > 0 and len(after_embeddings) > 0: | |
| similarity = cosine_similarity( | |
| before_embeddings[-1:], | |
| after_embeddings[0:1] | |
| )[0][0] | |
| # Lower similarity means higher confidence for boundary | |
| confidence = 1.0 - similarity | |
| return max(0.1, min(1.0, confidence)) | |
| return 0.5 |