# Deploy AgentGraph: Complete agent monitoring and knowledge graph system (commit c2ea5ed)
"""
Semantic Analysis Module for Agent-Aware Splitting
This module provides embedding-based semantic analysis to identify
natural breakpoints in agent execution logs based on content similarity.
Uses OpenAI's text-embedding-3-small model for high-quality embeddings.
"""
import re
import os
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
import logging
import numpy as np
# OpenAI client for embeddings
try:
from openai import OpenAI
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
OpenAI = None
# Optional import for sklearn
try:
from sklearn.metrics.pairwise import cosine_similarity
SKLEARN_AVAILABLE = True
except ImportError:
SKLEARN_AVAILABLE = False
cosine_similarity = None
# Optional import for tiktoken (accurate token counting)
try:
import tiktoken
TIKTOKEN_AVAILABLE = True
except ImportError:
TIKTOKEN_AVAILABLE = False
tiktoken = None
logger = logging.getLogger(__name__)
@dataclass
class SemanticBreakpoint:
    """Represents a semantic breakpoint in the content."""
    position: int  # character offset in the original text where the break occurs
    sentence_index: int  # index of the sentence that starts AFTER the break
    similarity_drop: float  # how much similarity fell relative to the previous pair
    confidence: float  # 0..1 confidence that this is a real topic boundary
    context_sentences: List[str]  # sentences surrounding the breakpoint, for inspection
@dataclass
class SemanticSegment:
    """Represents a semantically coherent segment."""
    start_position: int  # character offset where the segment starts
    end_position: int  # character offset where the segment ends
    sentences: List[str]  # the sentences making up this segment
    coherence_score: float  # 0..1 estimate of internal topical coherence
    segment_type: str  # e.g. "unified", "semantic_unit", "final_segment", "single_sentence"
class SemanticAnalyzer:
"""
Analyzes text content using OpenAI embeddings to identify semantic boundaries.
Uses sentence-level embeddings to detect where content topics shift,
indicating natural breakpoints for intelligent chunking.
"""
def __init__(self,
model_name: str = "text-embedding-3-small",
similarity_threshold: float = 0.5,
window_size: int = 3,
api_key: Optional[str] = None):
"""
Initialize the SemanticAnalyzer with OpenAI embeddings and adaptive threshold detection.
Args:
model_name: Name of the OpenAI embedding model to use
similarity_threshold: Base similarity threshold (used as fallback)
window_size: Context window size for breakpoint analysis
api_key: OpenAI API key (uses environment variable if not provided)
"""
self.model_name = model_name
self.similarity_threshold = similarity_threshold
self.window_size = window_size
# Import log type detector for agent-aware analysis
from .log_type_detector import LogTypeDetector
self.log_type_detector = LogTypeDetector()
# Set up tokenizer for accurate token counting if available
self.tokenizer = None
try:
import tiktoken
# For text-embedding-3-small, use cl100k_base encoding (same as GPT-4)
self.tokenizer = tiktoken.get_encoding("cl100k_base")
logger.debug("Initialized tiktoken for accurate token counting")
except ImportError:
logger.warning("tiktoken not available, using approximate token counting")
# Initialize OpenAI client
api_key = api_key or os.getenv('OPENAI_API_KEY')
if not api_key:
logger.warning("No OpenAI API key provided. Embeddings will not be available.")
self.openai_client = None
else:
try:
from openai import OpenAI
self.openai_client = OpenAI(api_key=api_key)
logger.debug(f"Initialized OpenAI client for {model_name}")
except ImportError:
logger.error("OpenAI package not installed. Run: pip install openai")
self.openai_client = None
except Exception as e:
logger.error(f"Failed to initialize OpenAI client: {e}")
self.openai_client = None
def analyze_semantic_structure(self, content: str) -> Dict[str, any]:
"""
Analyze the semantic structure of content to identify natural breakpoints.
Enhanced with agent trace type awareness and adaptive threshold selection.
Args:
content: The text content to analyze
Returns:
Dictionary containing analysis results with segments, breakpoints, and method used
"""
if not content or not content.strip():
return {
"segments": [],
"breakpoints": [],
"coherence_score": 0.0,
"method": "none_empty_content",
"total_sentences": 0
}
# Detect agent trace type for adaptive processing
agent_trace_type = None
if hasattr(self, 'log_type_detector'):
try:
detection_result = self.log_type_detector.detect_log_type(content)
agent_trace_type = detection_result.log_type
logger.info(f"Detected agent trace type: {agent_trace_type.value} (confidence: {detection_result.confidence:.2f})")
except Exception as e:
logger.warning(f"Failed to detect agent trace type: {e}")
sentences = self._split_into_sentences(content)
if len(sentences) < 2:
return {
"segments": [SemanticSegment(
start_position=0,
end_position=len(content),
sentences=sentences,
coherence_score=1.0,
segment_type="single_sentence"
)],
"breakpoints": [],
"coherence_score": 1.0,
"method": "single_sentence",
"total_sentences": len(sentences),
"agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown"
}
# Try OpenAI embeddings first
if self.openai_client:
try:
embeddings = self._calculate_embeddings(sentences)
if embeddings is not None:
breakpoints = self._find_semantic_breakpoints(sentences, embeddings, content)
segments = self._create_semantic_segments(sentences, breakpoints)
coherence_score = self._calculate_overall_coherence(embeddings)
return {
"segments": segments,
"breakpoints": breakpoints,
"coherence_score": coherence_score,
"method": "openai_embedding_based_adaptive",
"total_sentences": len(sentences),
"agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown",
"threshold_method": getattr(self, '_last_threshold_method', 'adaptive_statistical')
}
except Exception as e:
logger.warning(f"OpenAI embedding analysis failed: {e}")
logger.info("Falling back to text-based analysis")
# Fallback to simple text-based analysis
try:
breakpoints = self._find_simple_breakpoints(sentences)
segments = self._create_simple_segments(sentences, breakpoints)
return {
"segments": segments,
"breakpoints": breakpoints,
"coherence_score": 0.5, # Default for text-based
"method": "text_based_fallback",
"total_sentences": len(sentences),
"agent_trace_type": agent_trace_type.value if agent_trace_type else "unknown"
}
except Exception as e:
logger.error(f"All analysis methods failed: {e}")
return None
def _split_into_sentences(self, content: str) -> List[str]:
"""
Split content into sentences with enhanced handling for JSON schemas.
Args:
content: Text content to split
Returns:
List of sentences
"""
if not content.strip():
return []
# Detect content type for special handling
content_type = None
if hasattr(self, 'log_type_detector'):
try:
detection_result = self.log_type_detector.detect_log_type(content)
content_type = detection_result.log_type
except Exception:
pass
# Special preprocessing for complex JSON schemas
if content_type and content_type.value == "complex_json_schema":
content = self._preprocess_complex_json_schema(content)
# Enhanced sentence splitting patterns
# Add JSON object boundaries as sentence breaks for very long JSON
sentence_patterns = [
r'(?<=[.!?])\s+(?=[A-Z])', # Standard sentence boundaries
r'}\s*,\s*\{', # JSON object boundaries (simplified)
r']\s*,\s*\[', # JSON array boundaries (simplified)
r'}\s*\n\s*\{', # JSON blocks on new lines (simplified)
r'}\s*\n\s*[A-Za-z]', # JSON to text transitions (simplified)
r'[a-z]\s*\n\s*\{', # Text to JSON transitions (simplified)
]
# Apply sentence splitting
sentences = [content] # Start with full content
for pattern in sentence_patterns:
new_sentences = []
for sentence in sentences:
# Split on pattern but keep delimiter context
parts = re.split(f'({pattern})', sentence)
current_sentence = ""
for i, part in enumerate(parts):
if re.match(pattern, part):
# This is a delimiter - finish current sentence
if current_sentence.strip():
new_sentences.append(current_sentence.strip())
current_sentence = ""
else:
current_sentence += part
# Add final sentence if exists
if current_sentence.strip():
new_sentences.append(current_sentence.strip())
sentences = new_sentences
# Filter out empty sentences and very short ones
sentences = [s for s in sentences if len(s.strip()) > 10]
# Additional safety: split extremely long sentences (>5000 chars)
final_sentences = []
for sentence in sentences:
if len(sentence) > 5000:
# Force split long content at logical boundaries
chunks = self._split_long_content(sentence)
final_sentences.extend(chunks)
else:
final_sentences.append(sentence)
return final_sentences
def _preprocess_complex_json_schema(self, content: str) -> str:
"""
Preprocess complex JSON schemas to add better sentence boundaries.
Args:
content: Content containing complex JSON schemas
Returns:
Preprocessed content with better boundaries
"""
# Add line breaks after major JSON schema sections
schema_boundaries = [
(r'"business_rules":\s*\[', r'"business_rules": [\n'),
(r'"technical_rules":\s*\[', r'"technical_rules": [\n'),
(r'"list_of_tables":\s*\[', r'"list_of_tables": [\n'),
(r'}\s*,\s*\{', r'},\n{'), # Separate table definitions
(r']\s*}\s*,\s*\{', r']\n},\n{'), # End of table, start of next
]
processed_content = content
for pattern, replacement in schema_boundaries:
processed_content = re.sub(pattern, replacement, processed_content)
return processed_content
def _split_long_content(self, content: str) -> List[str]:
"""
Force split extremely long content at logical boundaries.
Args:
content: Long content string
Returns:
List of smaller chunks
"""
if len(content) <= 5000:
return [content]
chunks = []
remaining = content
while len(remaining) > 5000:
# Find a good break point in the first 4000 characters
break_point = 4000
# Look for JSON boundaries
json_breaks = [
remaining.rfind('},', 0, break_point),
remaining.rfind('],', 0, break_point),
remaining.rfind('}\n', 0, break_point),
remaining.rfind('.\n', 0, break_point),
]
# Use the best available break point
valid_breaks = [bp for bp in json_breaks if bp > 1000] # At least 1000 chars
if valid_breaks:
break_point = max(valid_breaks) + 2 # Include the delimiter
chunk = remaining[:break_point].strip()
if chunk:
chunks.append(chunk)
remaining = remaining[break_point:].strip()
# Add final remaining content
if remaining.strip():
chunks.append(remaining.strip())
return chunks
def _calculate_embeddings(self, sentences: List[str]):
"""
Calculate embeddings for sentences using OpenAI API.
Args:
sentences: List of sentences
Returns:
Embeddings array or None if not available
"""
if not self.openai_client:
return None
try:
# Process sentences in smart batches to respect token limits
# text-embedding-3-small has a max context length of 8192 tokens
max_tokens_per_batch = 7000 # Leave some margin for safety
all_embeddings = []
current_batch = []
current_batch_tokens = 0
for sentence in sentences:
# Accurate token counting using tiktoken if available
if self.tokenizer:
try:
estimated_tokens = len(self.tokenizer.encode(sentence))
except Exception as e:
logger.warning(f"Token counting failed, using character estimate: {e}")
estimated_tokens = len(sentence) // 4 + 10
else:
# Fallback: rough token estimation (~4 characters per token for English text)
estimated_tokens = len(sentence) // 4 + 10
# If adding this sentence would exceed the token limit, process current batch
if current_batch and (current_batch_tokens + estimated_tokens > max_tokens_per_batch):
# Process current batch
response = self.openai_client.embeddings.create(
model=self.model_name,
input=current_batch,
encoding_format="float"
)
# Extract embeddings from response
batch_embeddings = [data.embedding for data in response.data]
all_embeddings.extend(batch_embeddings)
# Start new batch
current_batch = [sentence]
current_batch_tokens = estimated_tokens
else:
# Add sentence to current batch
current_batch.append(sentence)
current_batch_tokens += estimated_tokens
# Safety check: if a single sentence is too long, truncate it
if estimated_tokens > max_tokens_per_batch:
logger.warning(f"Sentence too long ({estimated_tokens} tokens), truncating...")
# Truncate the sentence to fit within token limit
if self.tokenizer:
# More accurate truncation using tokenizer
tokens = self.tokenizer.encode(sentence)
truncated_tokens = tokens[:max_tokens_per_batch - 50] # Leave margin
truncated_sentence = self.tokenizer.decode(truncated_tokens) + "..."
current_batch[-1] = truncated_sentence
current_batch_tokens = len(truncated_tokens) + 10 # +10 for "..." and margin
else:
# Fallback character-based truncation
max_chars = max_tokens_per_batch * 4 - 100
current_batch[-1] = sentence[:max_chars] + "..."
current_batch_tokens = max_tokens_per_batch - 100
# Process final batch if it has content
if current_batch:
response = self.openai_client.embeddings.create(
model=self.model_name,
input=current_batch,
encoding_format="float"
)
# Extract embeddings from response
batch_embeddings = [data.embedding for data in response.data]
all_embeddings.extend(batch_embeddings)
# Convert to numpy array
embeddings = np.array(all_embeddings)
logger.debug(f"Generated embeddings for {len(sentences)} sentences using {self.model_name} with smart batching")
return embeddings
except Exception as e:
logger.error(f"Failed to calculate OpenAI embeddings: {e}")
return None
def _find_semantic_breakpoints(self, sentences: List[str], embeddings, content: str = None) -> List[SemanticBreakpoint]:
"""
Find semantic breakpoints using embedding similarity with adaptive statistical methods.
Inspired by LangChain's SemanticChunker with multiple threshold detection methods.
Args:
sentences: List of sentences
embeddings: Sentence embeddings (can be None)
content: Original content for agent trace type detection
Returns:
List of semantic breakpoints
"""
if embeddings is None or not SKLEARN_AVAILABLE or len(embeddings) < 2:
return []
breakpoints = []
# Calculate pairwise similarities between consecutive sentences
similarities = []
for i in range(len(embeddings) - 1):
similarity = cosine_similarity(
embeddings[i:i+1],
embeddings[i+1:i+2]
)[0][0]
similarities.append(similarity)
if not similarities:
return breakpoints
# Use adaptive threshold based on statistical analysis (LangChain style)
threshold = self._calculate_adaptive_threshold(similarities, content)
logger.debug(f"Using adaptive threshold: {threshold:.3f} for {len(similarities)} similarities")
# Find points where similarity drops significantly
for i, similarity in enumerate(similarities):
if similarity < threshold:
# Calculate confidence based on how much similarity dropped
if i > 0:
prev_similarity = similarities[i-1]
similarity_drop = prev_similarity - similarity
else:
similarity_drop = 1.0 - similarity
# Enhanced confidence calculation using statistical context
confidence = self._calculate_enhanced_confidence(
similarity, similarities, i, threshold
)
# Get context sentences
context_start = max(0, i - self.window_size)
context_end = min(len(sentences), i + self.window_size + 2)
context_sentences = sentences[context_start:context_end]
# Calculate position in original text
position = self._calculate_sentence_position(sentences, i + 1)
breakpoint = SemanticBreakpoint(
position=position,
sentence_index=i + 1,
similarity_drop=similarity_drop,
confidence=confidence,
context_sentences=context_sentences
)
breakpoints.append(breakpoint)
return breakpoints
def _calculate_adaptive_threshold(self, similarities: List[float], content: str = None) -> float:
"""
Calculate adaptive threshold using multiple statistical methods inspired by LangChain.
Enhanced with agent trace type awareness.
Args:
similarities: List of similarity scores
content: Original content for agent trace type detection
Returns:
Adaptive threshold value
"""
if not similarities:
return self.similarity_threshold
similarities_array = np.array(similarities)
# Method 1: Percentile-based (LangChain style)
# Use 5th percentile for breakpoints (bottom 5% of similarities)
percentile_threshold = np.percentile(similarities_array, 5)
# Method 2: Standard deviation
mean_sim = np.mean(similarities_array)
std_sim = np.std(similarities_array)
std_threshold = mean_sim - (1.5 * std_sim) # 1.5 std devs below mean
# Method 3: Interquartile range
q1, q3 = np.percentile(similarities_array, [25, 75])
iqr = q3 - q1
iqr_threshold = q1 - (1.5 * iqr) # Similar to outlier detection
# Method 4: Gradient-based (for highly correlated content)
gradient_threshold = self._calculate_gradient_threshold(similarities_array)
# Choose the most appropriate threshold based on data characteristics and agent trace type
adaptive_threshold = self._select_best_threshold(
similarities_array,
percentile_threshold,
std_threshold,
iqr_threshold,
gradient_threshold,
content # Pass content for agent trace type detection
)
# Ensure threshold is within reasonable bounds
min_threshold = 0.1 # Don't split everything
max_threshold = 0.9 # Don't split nothing
return max(min_threshold, min(max_threshold, adaptive_threshold))
def _calculate_gradient_threshold(self, similarities) -> float:
"""
Calculate threshold using gradient analysis for highly semantic content.
Similar to LangChain's gradient method.
"""
if len(similarities) < 3:
return np.mean(similarities) - np.std(similarities)
# Calculate gradient (rate of change)
gradients = np.gradient(similarities)
# Apply anomaly detection on gradients to widen distribution
gradient_mean = np.mean(gradients)
gradient_std = np.std(gradients)
# Find significant negative gradients (sharp drops in similarity)
significant_drops = gradients < (gradient_mean - 2 * gradient_std)
if np.any(significant_drops):
# Use the 10th percentile of similarities where we have significant drops
drop_similarities = similarities[significant_drops]
return np.percentile(drop_similarities, 10)
else:
# Fallback to standard method
return np.mean(similarities) - np.std(similarities)
def _select_best_threshold(self,
similarities,
percentile_threshold: float,
std_threshold: float,
iqr_threshold: float,
gradient_threshold: float,
content: str = None) -> float:
"""
Select the best threshold based on data characteristics and agent trace type.
Enhanced to be agent-aware based on log type detection.
"""
# Analyze data characteristics
variance = np.var(similarities)
mean_similarity = np.mean(similarities)
skewness = self._calculate_skewness(similarities)
# Detect agent trace type if content is provided
agent_trace_type = None
if content and hasattr(self, 'log_type_detector'):
try:
detection_result = self.log_type_detector.detect_log_type(content)
agent_trace_type = detection_result.log_type
logger.debug(f"Detected agent trace type: {agent_trace_type.value} (confidence: {detection_result.confidence:.2f})")
except Exception as e:
logger.warning(f"Failed to detect agent trace type: {e}")
# Agent trace type specific threshold selection
if agent_trace_type:
from .log_type_detector import LogType
if agent_trace_type == LogType.CREWAI_EXECUTION:
# CrewAI logs have clear hierarchical structure - use gradient method
# to preserve task/agent boundaries
logger.debug("CrewAI execution detected - using gradient threshold for task boundaries")
self._last_threshold_method = "gradient_crewai_aware"
return gradient_threshold
elif agent_trace_type == LogType.LANGFUSE_TRACE:
# Langfuse traces are highly structured - use percentile method
# to identify significant breaks in observation chains
logger.debug("Langfuse trace detected - using percentile threshold for observation breaks")
self._last_threshold_method = "percentile_langfuse_aware"
return percentile_threshold
elif agent_trace_type == LogType.MIXED_JSON_NARRATIVE:
# Mixed content needs robust method - use IQR to handle variety
logger.debug("Mixed JSON/narrative detected - using IQR threshold for robust handling")
self._last_threshold_method = "iqr_mixed_content_aware"
return iqr_threshold
elif agent_trace_type == LogType.COMPLEX_JSON_SCHEMA:
# Complex schemas need special handling - use percentile for major breaks
logger.debug("Complex JSON schema detected - using percentile threshold for schema boundaries")
self._last_threshold_method = "percentile_complex_schema_aware"
return percentile_threshold
elif agent_trace_type == LogType.STRUCTURED_JSON:
# JSON needs to preserve object boundaries - use std deviation
logger.debug("Structured JSON detected - using standard deviation threshold")
self._last_threshold_method = "std_json_aware"
return std_threshold
elif agent_trace_type == LogType.NATURAL_LANGUAGE:
# Natural language benefits from semantic analysis - use percentile
logger.debug("Natural language detected - using percentile threshold for semantic breaks")
self._last_threshold_method = "percentile_natural_language_aware"
return percentile_threshold
elif agent_trace_type == LogType.UNKNOWN:
# Unknown format - fall back to data-driven selection
logger.debug("Unknown format detected - using data-driven threshold selection")
# Data-driven threshold selection (original logic as fallback)
# High variance suggests diverse content - use percentile method
if variance > 0.05:
logger.debug("High variance detected, using percentile threshold")
self._last_threshold_method = "percentile_high_variance"
return percentile_threshold
# High mean similarity suggests coherent content - use gradient method
elif mean_similarity > 0.8:
logger.debug("Highly coherent content detected, using gradient threshold")
self._last_threshold_method = "gradient_high_coherence"
return gradient_threshold
# Skewed distribution - use IQR method for robustness
elif abs(skewness) > 1.0:
logger.debug("Skewed distribution detected, using IQR threshold")
self._last_threshold_method = "iqr_skewed_distribution"
return iqr_threshold
# Default to standard deviation method
else:
logger.debug("Using standard deviation threshold")
self._last_threshold_method = "std_default"
return std_threshold
def _calculate_skewness(self, data) -> float:
"""Calculate skewness of the data distribution."""
if len(data) < 3:
return 0.0
mean = np.mean(data)
std = np.std(data)
if std == 0:
return 0.0
# Pearson's moment coefficient of skewness
skewness = np.mean(((data - mean) / std) ** 3)
return skewness
def _calculate_enhanced_confidence(self,
similarity: float,
all_similarities: List[float],
index: int,
threshold: float) -> float:
"""
Calculate enhanced confidence score using statistical context.
"""
# Base confidence from how far below threshold
base_confidence = max(0, (threshold - similarity) / threshold)
# Bonus for consistency with nearby similarities
window_start = max(0, index - 2)
window_end = min(len(all_similarities), index + 3)
local_similarities = all_similarities[window_start:window_end]
# If local similarities are consistently low, increase confidence
local_mean = np.mean(local_similarities)
if local_mean < threshold:
base_confidence += 0.2
# Bonus for being significantly different from neighbors
if index > 0 and index < len(all_similarities) - 1:
prev_sim = all_similarities[index - 1]
next_sim = all_similarities[index + 1]
# If this is a valley (lower than both neighbors), increase confidence
if similarity < prev_sim and similarity < next_sim:
base_confidence += 0.3
return min(1.0, base_confidence)
def _find_simple_breakpoints(self, sentences: List[str]) -> List[SemanticBreakpoint]:
"""
Find breakpoints using simple text analysis (fallback method).
Args:
sentences: List of sentences
Returns:
List of semantic breakpoints
"""
breakpoints = []
# Look for topic shifts based on keyword changes
for i in range(1, len(sentences)):
current_keywords = self._extract_keywords(sentences[i])
prev_keywords = self._extract_keywords(sentences[i-1])
# Calculate keyword overlap
if current_keywords and prev_keywords:
overlap = len(current_keywords & prev_keywords) / len(current_keywords | prev_keywords)
if overlap < 0.3: # Low overlap suggests topic shift
position = self._calculate_sentence_position(sentences, i)
breakpoint = SemanticBreakpoint(
position=position,
sentence_index=i,
similarity_drop=1.0 - overlap,
confidence=0.6, # Lower confidence for simple method
context_sentences=sentences[max(0, i-2):min(len(sentences), i+3)]
)
breakpoints.append(breakpoint)
return breakpoints
def _extract_keywords(self, sentence: str) -> set:
"""Extract keywords from a sentence."""
# Simple keyword extraction
words = re.findall(r'\b[a-zA-Z]{3,}\b', sentence.lower())
# Filter out common stop words
stop_words = {'the', 'and', 'are', 'for', 'with', 'this', 'that', 'from', 'they', 'been', 'have', 'will', 'would', 'could', 'should'}
keywords = {word for word in words if word not in stop_words}
return keywords
def _calculate_sentence_position(self, sentences: List[str], sentence_index: int) -> int:
"""Calculate the character position where a sentence starts."""
position = 0
for i in range(sentence_index):
position += len(sentences[i]) + 1 # +1 for space/newline
return position
def _create_semantic_segments(self, sentences: List[str], breakpoints: List[SemanticBreakpoint]) -> List[SemanticSegment]:
"""
Create semantic segments based on breakpoints.
Args:
sentences: List of sentences
breakpoints: List of breakpoints
Returns:
List of semantic segments
"""
if not breakpoints:
# Single segment containing all sentences
return [SemanticSegment(
start_position=0,
end_position=sum(len(s) for s in sentences),
sentences=sentences,
coherence_score=0.8,
segment_type="unified"
)]
segments = []
last_position = 0
last_sentence_index = 0
for breakpoint in breakpoints:
# Create segment up to this breakpoint
segment_sentences = sentences[last_sentence_index:breakpoint.sentence_index]
if segment_sentences:
segments.append(SemanticSegment(
start_position=last_position,
end_position=breakpoint.position,
sentences=segment_sentences,
coherence_score=1.0 - breakpoint.similarity_drop,
segment_type="semantic_unit"
))
last_position = breakpoint.position
last_sentence_index = breakpoint.sentence_index
# Add final segment
if last_sentence_index < len(sentences):
final_sentences = sentences[last_sentence_index:]
segments.append(SemanticSegment(
start_position=last_position,
end_position=sum(len(s) for s in sentences),
sentences=final_sentences,
coherence_score=0.8,
segment_type="final_segment"
))
return segments
    def _create_simple_segments(self, sentences: List[str], breakpoints: List[SemanticBreakpoint]) -> List[SemanticSegment]:
        """Create segments using simple method (fallback).

        Delegates to _create_semantic_segments, whose logic does not depend on
        how the breakpoints were produced.
        """
        return self._create_semantic_segments(sentences, breakpoints)
def _calculate_overall_coherence(self, embeddings) -> float:
"""
Calculate overall coherence score for the content.
Args:
embeddings: Sentence embeddings (can be None)
Returns:
Coherence score between 0 and 1
"""
if embeddings is None or len(embeddings) < 2:
return 0.7 # Default coherence score when embeddings unavailable
# Calculate average pairwise similarity
similarities = []
for i in range(len(embeddings) - 1):
similarity = cosine_similarity(
embeddings[i:i+1],
embeddings[i+1:i+2]
)[0][0]
similarities.append(similarity)
return float(np.mean(similarities))
def recommend_chunk_boundaries(self, content: str, target_chunk_size: int) -> List[int]:
"""
Recommend chunk boundaries based on semantic analysis.
Args:
content: The content to analyze
target_chunk_size: Target size for chunks
Returns:
List of recommended boundary positions
"""
analysis = self.analyze_semantic_structure(content)
breakpoints = analysis["breakpoints"]
if not breakpoints:
# No semantic breakpoints, fall back to size-based chunking
boundaries = []
current_pos = 0
while current_pos + target_chunk_size < len(content):
boundaries.append(current_pos + target_chunk_size)
current_pos += target_chunk_size
return boundaries
# Combine semantic breakpoints with size constraints
recommended_boundaries = []
last_boundary = 0
for breakpoint in breakpoints:
chunk_size = breakpoint.position - last_boundary
# If chunk would be too large, add intermediate boundary
if chunk_size > target_chunk_size * 1.5:
# Find a good intermediate point
intermediate_pos = last_boundary + target_chunk_size
recommended_boundaries.append(intermediate_pos)
last_boundary = intermediate_pos
# Add semantic boundary if it creates reasonable chunk size
if breakpoint.position - last_boundary > target_chunk_size * 0.3:
recommended_boundaries.append(breakpoint.position)
last_boundary = breakpoint.position
return recommended_boundaries
def enhance_boundary_confidence(self, boundary_position: int, content: str) -> float:
"""
Enhance boundary confidence using semantic analysis.
Args:
boundary_position: Position of the boundary to analyze
content: Full content for context
Returns:
Enhanced confidence score
"""
if not self.openai_client:
return 0.5 # Default confidence without embeddings
# Extract sentences around the boundary
context_size = 200
context_start = max(0, boundary_position - context_size)
context_end = min(len(content), boundary_position + context_size)
context = content[context_start:context_end]
sentences = self._split_into_sentences(context)
if len(sentences) < 3:
return 0.5
# Find which sentence the boundary falls in
boundary_in_context = boundary_position - context_start
target_sentence_idx = 0
current_pos = 0
for i, sentence in enumerate(sentences):
if current_pos + len(sentence) >= boundary_in_context:
target_sentence_idx = i
break
current_pos += len(sentence) + 1
# Analyze semantic similarity around this position
if target_sentence_idx > 0 and target_sentence_idx < len(sentences) - 1:
before_sentences = sentences[max(0, target_sentence_idx-1):target_sentence_idx+1]
after_sentences = sentences[target_sentence_idx:target_sentence_idx+2]
before_embeddings = self._calculate_embeddings(before_sentences)
after_embeddings = self._calculate_embeddings(after_sentences)
if before_embeddings is not None and after_embeddings is not None and len(before_embeddings) > 0 and len(after_embeddings) > 0:
similarity = cosine_similarity(
before_embeddings[-1:],
after_embeddings[0:1]
)[0][0]
# Lower similarity means higher confidence for boundary
confidence = 1.0 - similarity
return max(0.1, min(1.0, confidence))
return 0.5