# DEPENDENCIES
from typing import List
from typing import Optional

from config.models import DocumentChunk
from config.logging_config import get_logger
from chunking.token_counter import TokenCounter

# Setup Logging
logger = get_logger(__name__)


class OverlapManager:
    """
    Manages overlapping regions between chunks: ensures smooth context transitions and optimal retrieval
    """
    def __init__(self, overlap_tokens: int = 50):
        """
        Initialize overlap manager

        Arguments:
        ----------
        overlap_tokens { int } : Target overlap in tokens
        """
        self.overlap_tokens = overlap_tokens
        self.token_counter = TokenCounter()
        self.logger = logger

    def add_overlap(self, chunks: List[DocumentChunk], overlap_tokens: Optional[int] = None) -> List[DocumentChunk]:
        """
        Add overlap to existing chunks

        Arguments:
        ----------
        chunks { list } : List of chunks without overlap
        overlap_tokens { int } : Override default overlap

        Returns:
        --------
        { list } : List of chunks with overlap
        """
        if not chunks or len(chunks) < 2:
            return chunks

        # Fall back to the default when no (or a zero) override is given
        overlap = overlap_tokens or self.overlap_tokens
        overlapped_chunks = list()

        for i, chunk in enumerate(chunks):
            if i == 0:
                # First chunk: no prefix, add suffix from the next chunk
                new_text = chunk.text
                if i + 1 < len(chunks):
                    suffix = self._get_overlap_text(text=chunks[i + 1].text,
                                                    overlap_tokens=overlap,
                                                    from_start=True,
                                                    )
                    new_text = new_text + " " + suffix
            elif i == len(chunks) - 1:
                # Last chunk: add prefix from the previous chunk, no suffix
                prefix = self._get_overlap_text(text=chunks[i - 1].text,
                                                overlap_tokens=overlap,
                                                from_start=False,
                                                )
                new_text = prefix + " " + chunk.text
            else:
                # Middle chunk: add both prefix and suffix
                prefix = self._get_overlap_text(text=chunks[i - 1].text,
                                                overlap_tokens=overlap,
                                                from_start=False,
                                                )
                suffix = self._get_overlap_text(text=chunks[i + 1].text,
                                                overlap_tokens=overlap,
                                                from_start=True,
                                                )
                new_text = prefix + " " + chunk.text + " " + suffix

            # Create a new chunk with the overlapped text and a recomputed token count
            overlapped_chunk = DocumentChunk(chunk_id=chunk.chunk_id,
                                             document_id=chunk.document_id,
                                             text=new_text,
                                             chunk_index=chunk.chunk_index,
                                             start_char=chunk.start_char,
                                             end_char=chunk.end_char,
                                             page_number=chunk.page_number,
                                             section_title=chunk.section_title,
                                             token_count=self.token_counter.count_tokens(new_text),
                                             metadata=chunk.metadata,
                                             )
            overlapped_chunks.append(overlapped_chunk)

        self.logger.debug(f"Added overlap to {len(chunks)} chunks")
        return overlapped_chunks
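
    # Illustrative example of add_overlap (assuming, for simplicity, one token per
    # whitespace-separated word and overlap_tokens = 2):
    #   input chunks : "A B C" | "D E F" | "G H I"
    #   output       : "A B C D E" | "B C D E F G H" | "E F G H I"
    # The first chunk gains only a suffix, the last only a prefix, and middle
    # chunks gain both.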

    def _get_overlap_text(self, text: str, overlap_tokens: int, from_start: bool) -> str:
        """
        Extract overlap text from the beginning or end of a text

        Arguments:
        ----------
        text { str } : Source text
        overlap_tokens { int } : Number of tokens to extract
        from_start { bool } : True for start, False for end

        Returns:
        --------
        { str } : Overlap text
        """
        total_tokens = self.token_counter.count_tokens(text)

        if total_tokens <= overlap_tokens:
            return text

        if from_start:
            # Get the first N tokens; suffix="" suppresses any truncation marker
            return self.token_counter.truncate_to_tokens(text=text,
                                                         max_tokens=overlap_tokens,
                                                         suffix="",
                                                         )
        else:
            # Get the last N tokens: find_token_boundaries reports where the first
            # N tokens end, and that character length is used here as a proxy for
            # the character length of the last N tokens
            char_pos, overlap_text = self.token_counter.find_token_boundaries(text=text,
                                                                              target_tokens=overlap_tokens,
                                                                              )
            if char_pos < len(text):
                return text[-char_pos:] if char_pos > 0 else text
            return overlap_text

    def remove_overlap(self, chunks: List[DocumentChunk]) -> List[DocumentChunk]:
        """
        Remove overlap from chunks (keep core content only)

        Arguments:
        ----------
        chunks { list } : List of chunks with overlap

        Returns:
        --------
        { list } : List of chunks without overlap
        """
        if not chunks or len(chunks) < 2:
            return chunks

        core_chunks = list()

        for i, chunk in enumerate(chunks):
            if i == 0:
                # First chunk: remove the suffix shared with the next chunk
                core_text = self._remove_suffix_overlap(text=chunk.text,
                                                        next_text=chunks[i + 1].text if i + 1 < len(chunks) else "",
                                                        )
            elif i == len(chunks) - 1:
                # Last chunk: remove the prefix shared with the previous chunk
                core_text = self._remove_prefix_overlap(text=chunk.text,
                                                        previous_text=chunks[i - 1].text,
                                                        )
            else:
                # Middle chunk: remove both
                temp_text = self._remove_prefix_overlap(text=chunk.text,
                                                        previous_text=chunks[i - 1].text,
                                                        )
                core_text = self._remove_suffix_overlap(text=temp_text,
                                                        next_text=chunks[i + 1].text,
                                                        )

            core_chunk = DocumentChunk(chunk_id=chunk.chunk_id,
                                       document_id=chunk.document_id,
                                       text=core_text,
                                       chunk_index=chunk.chunk_index,
                                       start_char=chunk.start_char,
                                       end_char=chunk.end_char,
                                       page_number=chunk.page_number,
                                       section_title=chunk.section_title,
                                       token_count=self.token_counter.count_tokens(core_text),
                                       metadata=chunk.metadata,
                                       )
            core_chunks.append(core_chunk)

        return core_chunks

    def _remove_prefix_overlap(self, text: str, previous_text: str) -> str:
        """
        Remove overlap with the previous chunk
        """
        if not text or not previous_text:
            return text

        words = text.split()
        prev_words = previous_text.split()

        # Find the longest prefix of this text that matches a suffix of the previous text
        max_overlap = 0
        for overlap_size in range(1, min(len(words), len(prev_words)) + 1):
            if words[:overlap_size] == prev_words[-overlap_size:]:
                max_overlap = overlap_size

        if max_overlap > 0:
            return " ".join(words[max_overlap:])
        return text

    def _remove_suffix_overlap(self, text: str, next_text: str) -> str:
        """
        Remove overlap with the next chunk
        """
        if not text or not next_text:
            return text

        words = text.split()
        next_words = next_text.split()

        # Find the longest suffix of this text that matches a prefix of the next text
        # (mirrors _remove_prefix_overlap; comparing whole slices keeps word order intact)
        max_overlap = 0
        for overlap_size in range(1, min(len(words), len(next_words)) + 1):
            if words[-overlap_size:] == next_words[:overlap_size]:
                max_overlap = overlap_size

        if max_overlap > 0:
            return " ".join(words[:-max_overlap])
        return text

    def calculate_overlap_percentage(self, chunks: List[DocumentChunk]) -> float:
        """
        Calculate the average overlap percentage across adjacent chunk pairs

        Arguments:
        ----------
        chunks { list } : List of chunks

        Returns:
        --------
        { float } : Average overlap percentage
        """
        if len(chunks) < 2:
            return 0.0

        overlaps = list()

        for i in range(len(chunks) - 1):
            overlap = self._measure_overlap(chunks[i].text, chunks[i + 1].text)
            overlaps.append(overlap)

        return sum(overlaps) / len(overlaps) if overlaps else 0.0

    def _measure_overlap(self, text1: str, text2: str) -> float:
        """
        Measure word-level overlap between two texts

        Arguments:
        ----------
        text1 { str } : First text
        text2 { str } : Second text

        Returns:
        --------
        { float } : Overlap percentage (0-100)
        """
        words1 = set(text1.lower().split())
        words2 = set(text2.lower().split())

        if not words1 or not words2:
            return 0.0

        common = words1 & words2
        overlap_pct = (len(common) / min(len(words1), len(words2))) * 100

        return overlap_pct
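
    # Worked example for _measure_overlap (hypothetical strings):
    #   text1 = "alpha beta gamma", text2 = "beta gamma delta"
    #   words1 = {alpha, beta, gamma}, words2 = {beta, gamma, delta}
    #   common = {beta, gamma}  ->  2 / min(3, 3) * 100 = 66.7
    # Being set-based, this measure ignores word order and duplicates, so it can
    # read higher than the positional overlap counted by _count_overlap_tokens.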

    def optimize_overlaps(self, chunks: List[DocumentChunk], target_overlap: int, tolerance: int = 10) -> List[DocumentChunk]:
        """
        Optimize overlap sizes towards a target

        Arguments:
        ----------
        chunks { list } : List of chunks
        target_overlap { int } : Target overlap in tokens
        tolerance { int } : Acceptable deviation in tokens

        Returns:
        --------
        { list } : Optimized chunks
        """
        if len(chunks) < 2:
            return chunks

        # Validate that target_overlap is reasonable
        if target_overlap <= 0:
            self.logger.warning("Target overlap must be positive, using default")
            target_overlap = self.overlap_tokens

        optimized = list()

        for i in range(len(chunks)):
            chunk = chunks[i]

            # Check the current overlap with the next chunk
            if i < len(chunks) - 1:
                current_overlap = self._count_overlap_tokens(text1=chunk.text,
                                                             text2=chunks[i + 1].text,
                                                             )

                # Adjust only if outside tolerance
                if abs(current_overlap - target_overlap) > tolerance:
                    if current_overlap < target_overlap:
                        # Need more overlap: borrow the missing tokens from the
                        # start of the next chunk (an approximation, since those
                        # tokens follow the part that already overlaps)
                        additional = self._get_overlap_text(text=chunks[i + 1].text,
                                                            overlap_tokens=target_overlap - current_overlap,
                                                            from_start=True,
                                                            )
                        new_text = chunk.text + " " + additional
                    else:
                        # Need less overlap: trim the excess tokens from the end
                        # (suffix="" keeps the trim from adding a truncation marker)
                        new_text = self.token_counter.truncate_to_tokens(text=chunk.text,
                                                                         max_tokens=self.token_counter.count_tokens(chunk.text) - (current_overlap - target_overlap),
                                                                         suffix="",
                                                                         )

                    chunk = DocumentChunk(chunk_id=chunk.chunk_id,
                                          document_id=chunk.document_id,
                                          text=new_text,
                                          chunk_index=chunk.chunk_index,
                                          start_char=chunk.start_char,
                                          end_char=chunk.end_char,
                                          page_number=chunk.page_number,
                                          section_title=chunk.section_title,
                                          token_count=self.token_counter.count_tokens(new_text),
                                          metadata=chunk.metadata,
                                          )

            optimized.append(chunk)

        return optimized

    def _count_overlap_tokens(self, text1: str, text2: str) -> int:
        """
        Count overlapping tokens between two texts
        """
        # Find the longest word suffix of text1 that matches a word prefix of text2
        words1 = text1.split()
        words2 = text2.split()

        max_overlap = 0
        for i in range(1, min(len(words1), len(words2)) + 1):
            if words1[-i:] == words2[:i]:
                overlap_text = " ".join(words1[-i:])
                max_overlap = self.token_counter.count_tokens(overlap_text)

        return max_overlap

    def get_overlap_statistics(self, chunks: List[DocumentChunk]) -> dict:
        """
        Get statistics about overlaps

        Arguments:
        ----------
        chunks { list } : List of chunks

        Returns:
        --------
        { dict } : Statistics dictionary
        """
        if len(chunks) < 2:
            return {"num_chunks": len(chunks),
                    "num_overlaps": 0,
                    "avg_overlap_tokens": 0,
                    "avg_overlap_percentage": 0,
                    }

        overlap_tokens = list()
        overlap_percentages = list()

        for i in range(len(chunks) - 1):
            tokens = self._count_overlap_tokens(chunks[i].text, chunks[i + 1].text)
            pct = self._measure_overlap(chunks[i].text, chunks[i + 1].text)
            overlap_tokens.append(tokens)
            overlap_percentages.append(pct)

        return {"num_chunks": len(chunks),
                "num_overlaps": len(overlap_tokens),
                "avg_overlap_tokens": sum(overlap_tokens) / len(overlap_tokens) if overlap_tokens else 0,
                "min_overlap_tokens": min(overlap_tokens) if overlap_tokens else 0,
                "max_overlap_tokens": max(overlap_tokens) if overlap_tokens else 0,
                "avg_overlap_percentage": sum(overlap_percentages) / len(overlap_percentages) if overlap_percentages else 0,
                }
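

# Minimal usage sketch (illustrative only): it assumes DocumentChunk accepts the
# keyword arguments used above and that TokenCounter needs no extra configuration.
if __name__ == "__main__":
    manager = OverlapManager(overlap_tokens=20)

    texts = ["First part of the document.",
             "Second part of the document.",
             "Third part of the document.",
             ]
    chunks = [DocumentChunk(chunk_id=f"chunk-{i}",
                            document_id="doc-1",
                            text=text,
                            chunk_index=i,
                            start_char=0,
                            end_char=len(text),
                            page_number=1,
                            section_title="Body",
                            token_count=manager.token_counter.count_tokens(text),
                            metadata={},
                            )
              for i, text in enumerate(texts)]

    overlapped = manager.add_overlap(chunks)
    print(manager.get_overlap_statistics(overlapped))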