"""
Text chunking module for Norwegian RAG chatbot.
Splits documents into manageable chunks for embedding and retrieval.
"""
| |
|
import re
import unicodedata
from typing import List, Optional, Tuple

from ..api.config import CHUNK_SIZE, CHUNK_OVERLAP
| |
|
class TextChunker:
    """
    Splits documents into manageable chunks for embedding and retrieval.

    Supports different chunking strategies optimized for Norwegian text:
    'fixed' (character windows), 'paragraph' (blank-line separated), and
    'sentence' (punctuation-boundary separated).
    """

    @staticmethod
    def chunk_text(
        text: str,
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = CHUNK_OVERLAP,
        strategy: str = "paragraph"
    ) -> List[str]:
        """
        Split text into chunks using the specified strategy.

        Args:
            text: Text to split into chunks.
            chunk_size: Maximum size of each chunk, in characters.
            chunk_overlap: Overlap between consecutive chunks, in characters.
            strategy: Chunking strategy ('fixed', 'paragraph', or 'sentence').

        Returns:
            List of text chunks (empty list for empty input).

        Raises:
            ValueError: If `strategy` is not one of the supported names.
        """
        if not text:
            return []

        # Dispatch table instead of an if/elif chain; keeps the set of
        # supported strategies in one place.
        strategies = {
            "fixed": TextChunker.fixed_size_chunks,
            "paragraph": TextChunker.paragraph_chunks,
            "sentence": TextChunker.sentence_chunks,
        }
        try:
            chunker = strategies[strategy]
        except KeyError:
            raise ValueError(f"Unknown chunking strategy: {strategy}") from None
        return chunker(text, chunk_size, chunk_overlap)

    @staticmethod
    def fixed_size_chunks(
        text: str,
        chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = CHUNK_OVERLAP
    ) -> List[str]:
        """
        Split text into fixed-size character chunks with overlap.

        Window ends are pulled back to the last space inside the window
        (when one exists) so words are not split mid-token.

        Args:
            text: Text to split into chunks.
            chunk_size: Maximum size of each chunk, in characters.
            chunk_overlap: Overlap between consecutive chunks, in characters.

        Returns:
            List of non-empty, stripped text chunks.
        """
        if not text:
            return []

        chunks: List[str] = []
        start = 0
        text_length = len(text)

        while start < text_length:
            end = min(start + chunk_size, text_length)

            # Avoid splitting mid-word: back the window up to the last space,
            # except for the very first chunk and at the end of the text.
            if start > 0 and end < text_length:
                last_whitespace = text.rfind(' ', start, end)
                if last_whitespace != -1:
                    end = last_whitespace + 1

            chunk = text[start:end].strip()
            # BUGFIX: the original appended the stripped slice unconditionally,
            # which could emit empty strings for all-whitespace windows.
            if chunk:
                chunks.append(chunk)

            if end >= text_length:
                break

            # BUGFIX: guarantee forward progress. In the original,
            # `start = end - chunk_overlap` could fail to advance (and loop
            # forever) when chunk_overlap >= the effective window size, e.g.
            # after the whitespace back-off shrank the window.
            next_start = end - chunk_overlap
            start = next_start if next_start > start else end

        return chunks

    @staticmethod
    def _assemble_chunks(
        units: List[str],
        separator: str,
        max_chunk_size: int,
        chunk_overlap: int
    ) -> List[str]:
        """
        Greedily pack text units (paragraphs or sentences) into chunks.

        Shared implementation behind `paragraph_chunks` and `sentence_chunks`,
        which previously duplicated this logic. When a chunk fills up, its
        trailing units (up to `chunk_overlap` characters) are carried forward
        into the next chunk for context. A single unit longer than
        `max_chunk_size` falls back to fixed-size splitting.

        NOTE: size accounting counts unit characters only, not the joining
        separators — preserved from the original implementation.
        """
        chunks: List[str] = []
        current: List[str] = []
        current_size = 0

        for unit in units:
            unit_size = len(unit)

            # Flush the current chunk if this unit would overflow it.
            if current_size + unit_size > max_chunk_size and current:
                chunks.append(separator.join(current))

                # Collect trailing units that fit in the overlap budget.
                overlap_units: List[str] = []
                overlap_size = 0
                for prev in reversed(current):
                    if overlap_size + len(prev) <= chunk_overlap:
                        overlap_units.insert(0, prev)
                        overlap_size += len(prev)
                    else:
                        break

                current = overlap_units
                current_size = overlap_size

            if unit_size > max_chunk_size:
                # Oversized unit: flush any pending content, then split the
                # unit itself with the fixed-size strategy.
                if current:
                    chunks.append(separator.join(current))
                    current = []
                    current_size = 0
                chunks.extend(
                    TextChunker.fixed_size_chunks(unit, max_chunk_size, chunk_overlap)
                )
            else:
                current.append(unit)
                current_size += unit_size

        if current:
            chunks.append(separator.join(current))

        return chunks

    @staticmethod
    def paragraph_chunks(
        text: str,
        max_chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = CHUNK_OVERLAP
    ) -> List[str]:
        """
        Split text into chunks based on paragraphs (blank-line separated).

        Args:
            text: Text to split into chunks.
            max_chunk_size: Maximum size of each chunk, in characters.
            chunk_overlap: Overlap between consecutive chunks, in characters.

        Returns:
            List of text chunks, paragraphs joined with blank lines.
        """
        if not text:
            return []

        paragraphs = [p.strip() for p in re.split(r'\n\s*\n', text) if p.strip()]
        return TextChunker._assemble_chunks(
            paragraphs, '\n\n', max_chunk_size, chunk_overlap
        )

    @staticmethod
    def sentence_chunks(
        text: str,
        max_chunk_size: int = CHUNK_SIZE,
        chunk_overlap: int = CHUNK_OVERLAP
    ) -> List[str]:
        """
        Split text into chunks based on sentences.

        Sentence boundaries are detected after '.', '!' or '?' followed by
        whitespace and an uppercase letter, including Norwegian Æ/Ø/Å.

        Args:
            text: Text to split into chunks.
            max_chunk_size: Maximum size of each chunk, in characters.
            chunk_overlap: Overlap between consecutive chunks, in characters.

        Returns:
            List of text chunks, sentences joined with single spaces.
        """
        if not text:
            return []

        sentence_pattern = r'(?<=[.!?])\s+(?=[A-ZÆØÅ])'
        sentences = [s.strip() for s in re.split(sentence_pattern, text) if s.strip()]
        return TextChunker._assemble_chunks(
            sentences, ' ', max_chunk_size, chunk_overlap
        )

    @staticmethod
    def clean_chunk(chunk: str) -> str:
        """
        Clean a text chunk by collapsing whitespace and normalizing Unicode.

        Args:
            chunk: Text chunk to clean.

        Returns:
            Cleaned, stripped text chunk ("" for empty input).
        """
        if not chunk:
            return ""

        # Collapse runs of whitespace (including newlines) to single spaces.
        cleaned = re.sub(r'\s+', ' ', chunk)

        # BUGFIX: the original "normalized" Norwegian characters with no-op
        # calls like replace('æ', 'æ'). NFC normalization actually composes
        # decomposed forms (e.g. 'a' + combining ring -> 'å') as intended.
        cleaned = unicodedata.normalize('NFC', cleaned)

        return cleaned.strip()
| |
|