# core/chunking.py
import logging
import re
from typing import List

import numpy as np
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

logger = logging.getLogger(__name__)


def _split_into_sentences(text: str) -> List[str]:
    """
    Improved sentence splitting that handles common edge cases.
    """
    # Common abbreviations whose trailing period should not end a sentence
    abbreviations = [
        'Dr', 'Mr', 'Mrs', 'Ms', 'Prof', 'Sr', 'Jr', 'vs', 'etc', 'Inc', 'Ltd', 'Corp',
        'U.S', 'U.K', 'U.N', 'E.U', 'NASA', 'FBI', 'CIA', 'GDP', 'CEO', 'CFO', 'CTO'
    ]

    # Temporarily mask the period that follows each abbreviation so it cannot
    # end a sentence; the abbreviation itself stays in place, which preserves
    # its original casing when the period is restored below
    placeholder = "__ABBR_DOT__"
    protected_text = text
    for abbr in abbreviations:
        protected_text = re.sub(
            rf'\b({re.escape(abbr)})\.', rf'\g<1>{placeholder}',
            protected_text, flags=re.IGNORECASE
        )

    # Split after sentence-ending punctuation followed by whitespace; the
    # lookbehind keeps the punctuation attached to each sentence instead of
    # discarding it, so rejoined chunks read like the original text
    sentences = re.split(r'(?<=[.!?])\s+', protected_text)

    # Restore the masked periods and drop empty fragments
    cleaned_sentences = []
    for sentence in sentences:
        if sentence.strip():
            cleaned_sentences.append(sentence.replace(placeholder, '.').strip())
    return cleaned_sentences
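
# Illustrative sketch of the behavior above (assumed input, not a test fixture):
#   _split_into_sentences("Dr. Smith arrived. He sat down!")
# is expected to return ["Dr. Smith arrived.", "He sat down!"] -- the masked
# period after "Dr." does not open a split, and terminal punctuation survives.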


def _calculate_rolling_similarity(embeddings: np.ndarray, window_size: int = 3) -> List[float]:
    """
    Calculate rolling average similarity to smooth out noise and capture broader semantic shifts.
    """
    # Cosine similarity between each sentence and its predecessor
    similarities = []
    for i in range(1, len(embeddings)):
        current_sim = cosine_similarity(
            embeddings[i].reshape(1, -1),
            embeddings[i - 1].reshape(1, -1)
        )[0, 0]
        similarities.append(float(current_sim))

    # Too few points to smooth meaningfully; return the raw values
    if len(similarities) <= window_size:
        return similarities

    # Centered rolling average, with the window clamped at both ends
    smoothed = []
    for i in range(len(similarities)):
        start_idx = max(0, i - window_size // 2)
        end_idx = min(len(similarities), i + window_size // 2 + 1)
        smoothed.append(float(np.mean(similarities[start_idx:end_idx])))
    return smoothed
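
# Sketch of the smoothing effect on assumed numbers, with window_size=3:
#   raw       = [0.90, 0.20, 0.90, 0.90, 0.90]
#   smoothed ~= [0.55, 0.67, 0.67, 0.90, 0.90]
# A single noisy dip is damped rather than forcing a boundary on its own.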


def _adaptive_threshold(similarities: List[float], base_threshold: float = 0.55) -> float:
    """
    Dynamically adjust the threshold based on the distribution of similarities in the text.
    """
    if not similarities:
        return base_threshold

    mean_sim = np.mean(similarities)
    std_sim = np.std(similarities)

    # If similarities are generally high, raise the threshold so only a genuine
    # drop triggers a split; a large spread pulls the estimate back toward the
    # base threshold, which keeps noisy text from over-splitting
    adjusted_threshold = max(base_threshold, mean_sim - (0.5 * std_sim))
    return float(min(adjusted_threshold, 0.8))  # cap at 0.8 to avoid over-splitting
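
# Worked example on assumed numbers: base_threshold=0.55, mean=0.80, std=0.10
#   -> max(0.55, 0.80 - 0.05) = 0.75, then min(0.75, 0.8) = 0.75
# so uniformly high-similarity prose needs a sharper drop before it splits.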


def semantic_chunker(
    text: str,
    model: SentenceTransformer,
    similarity_threshold: float = 0.55,
    min_chunk_size: int = 50,
    max_chunk_size: int = 1000,
    adaptive_threshold_enabled: bool = True
) -> List[str]:
    """
    Enhanced semantic chunking with improved sentence splitting, adaptive
    thresholding, and chunk size controls.

    Args:
        text: Input text to chunk
        model: SentenceTransformer model for embeddings
        similarity_threshold: Base similarity threshold for semantic breaks
        min_chunk_size: Minimum characters per chunk
        max_chunk_size: Maximum characters per chunk
        adaptive_threshold_enabled: Whether to use adaptive thresholding

    Returns:
        List of text chunks
    """
    logger.info("Starting enhanced semantic chunking...")

    if not text or not text.strip():
        logger.warning("Empty or whitespace-only text provided")
        return []

    # Improved sentence splitting
    sentences = _split_into_sentences(text)
    if len(sentences) <= 1:
        logger.info("Text contains only one sentence, returning as single chunk")
        return [text.strip()]
    logger.info(f"Split text into {len(sentences)} sentences")

    try:
        embeddings = model.encode(sentences, convert_to_numpy=True, show_progress_bar=False)
        logger.info("Generated sentence embeddings")
    except Exception as e:
        logger.error(f"Failed to generate embeddings: {e}")
        # Fall back to returning the whole text as a single chunk
        return [text]

    # Calculate smoothed similarities between consecutive sentences
    similarities = _calculate_rolling_similarity(embeddings)
    if not similarities:
        return [text.strip()]

    # Adaptive threshold adjustment
    if adaptive_threshold_enabled:
        threshold = _adaptive_threshold(similarities, similarity_threshold)
        logger.info(f"Adjusted threshold from {similarity_threshold:.3f} to {threshold:.3f}")
    else:
        threshold = similarity_threshold
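
    # Break-rule sketch on assumed numbers: with threshold 0.75, a similarity
    # of 0.60 between consecutive sentences starts a new chunk, while 0.80
    # keeps extending the current one (unless max_chunk_size forces a break).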

    # Build chunks, breaking on semantic drops and on the size cap
    chunks = []
    current_chunk_sentences = [sentences[0]]
    current_chunk_length = len(sentences[0])

    for i, similarity in enumerate(similarities):
        sentence_idx = i + 1  # similarities[i] compares sentences[i+1] with sentences[i]
        sentence = sentences[sentence_idx]
        sentence_length = len(sentence)

        # Decide whether to close the current chunk before this sentence
        should_break = False
        if similarity < threshold:
            # Semantic break: similarity dropped below the threshold
            should_break = True
        elif current_chunk_length + sentence_length > max_chunk_size:
            # Size break: adding this sentence would exceed max_chunk_size
            should_break = True

        if should_break and current_chunk_sentences:
            chunk_text = " ".join(current_chunk_sentences)
            if len(chunk_text) >= min_chunk_size or not chunks:
                chunks.append(chunk_text)
                current_chunk_sentences = []
                current_chunk_length = 0
            # else: the pending chunk is below min_chunk_size, so keep its
            # sentences and let them merge with the next sentence instead of
            # silently dropping them

        # Add the current sentence to the (possibly fresh) chunk
        current_chunk_sentences.append(sentence)
        current_chunk_length += sentence_length + 1  # +1 for the joining space

    # Handle the final chunk
    if current_chunk_sentences:
        final_chunk = " ".join(current_chunk_sentences)
        # If the final chunk is too small, merge it into the previous chunk
        if len(final_chunk) < min_chunk_size and chunks:
            chunks[-1] = chunks[-1] + " " + final_chunk
        else:
            chunks.append(final_chunk)

    # Post-processing: split any chunk that still exceeds max_chunk_size at
    # sentence boundaries (a single sentence longer than the cap is kept whole)
    final_chunks = []
    for chunk in chunks:
        if len(chunk) <= max_chunk_size:
            final_chunks.append(chunk)
        else:
            chunk_sentences = _split_into_sentences(chunk)
            temp_chunk = ""
            for sent in chunk_sentences:
                if len(temp_chunk) + len(sent) <= max_chunk_size:
                    temp_chunk += (" " + sent) if temp_chunk else sent
                else:
                    if temp_chunk:
                        final_chunks.append(temp_chunk)
                    temp_chunk = sent
            if temp_chunk:
                final_chunks.append(temp_chunk)

    logger.info(f"Enhanced semantic chunking resulted in {len(final_chunks)} chunks")

    # Log chunk statistics for debugging
    if final_chunks:
        chunk_lengths = [len(chunk) for chunk in final_chunks]
        logger.debug(f"Chunk length stats - Min: {min(chunk_lengths)}, "
                     f"Max: {max(chunk_lengths)}, Mean: {np.mean(chunk_lengths):.1f}")
    return final_chunks
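

# Minimal usage sketch. The model name below is an assumption for illustration,
# not something this module mandates; any SentenceTransformer instance works.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    model = SentenceTransformer("all-MiniLM-L6-v2")  # assumed example model
    sample = (
        "The solar system has eight planets. Jupiter is by far the largest. "
        "Saturn is famous for its rings. In unrelated news, quarterly GDP rose "
        "by 2 percent. The CEO said the results exceeded expectations."
    )
    for i, chunk in enumerate(semantic_chunker(sample, model, min_chunk_size=20)):
        print(f"[{i}] {chunk}")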