"""
Smart Text Chunker - Splits text into optimal chunks for fine-tuning.
Respects sentence and paragraph boundaries.
"""

import logging
import re
from typing import List, Optional

logger = logging.getLogger("Chunker")


class TextChunker:
    """Splits text into chunks suitable for instruction dataset generation.

    Text is cut at paragraph boundaries (blank lines). Consecutive paragraphs
    are packed into one chunk until adding the next paragraph would push the
    chunk past ``chunk_size``; the next chunk is then seeded with the tail of
    the previous one so neighbouring chunks share some context.

    Note: a single paragraph longer than ``chunk_size`` is not split further,
    so ``chunk_size`` is a target rather than a hard limit.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        overlap: int = 100,
        min_chunk_size: int = 200,
    ):
        """Store the chunking parameters.

        Args:
            chunk_size: Target number of characters per chunk.
            overlap: Number of trailing characters of a finished chunk that
                are carried into the start of the next one. Zero or a
                negative value disables overlap.
            min_chunk_size: Chunks (and whole inputs) shorter than this many
                characters are discarded.
        """
        self.chunk_size = chunk_size  # Target chars per chunk
        self.overlap = overlap
        self.min_chunk_size = min_chunk_size

    def chunk_text(self, text: str, metadata: Optional[dict] = None) -> List[dict]:
        """Split text into overlapping chunks respecting paragraph boundaries.

        Args:
            text: The raw text to split. Falsy input, or input whose stripped
                length is below ``min_chunk_size``, yields no chunks.
            metadata: Optional mapping; its ``source``, ``url`` and ``title``
                entries are copied onto every chunk.

        Returns:
            A list of chunk dictionaries as built by ``_create_chunk``, in
            document order, with consecutive ``chunk_index`` values from 0.
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            return []

        chunks: List[dict] = []
        current_chunk = ""
        chunk_index = 0

        for para in self._split_paragraphs(text):
            # Flush when this paragraph would overflow a non-empty chunk.
            if current_chunk and len(current_chunk) + len(para) > self.chunk_size:
                chunk_data = self._create_chunk(
                    current_chunk.strip(), chunk_index, metadata
                )
                # Too-short chunks are dropped and do not consume an index.
                if chunk_data is not None:
                    chunks.append(chunk_data)
                    chunk_index += 1

                # Seed the next chunk with the tail of the one just closed.
                current_chunk = self._get_overlap(current_chunk) + para
            elif current_chunk:
                current_chunk = current_chunk + "\n\n" + para
            else:
                current_chunk = para

        # Emit whatever is left after the last paragraph.
        if current_chunk.strip():
            chunk_data = self._create_chunk(
                current_chunk.strip(), chunk_index, metadata
            )
            if chunk_data is not None:
                chunks.append(chunk_data)

        logger.info(f" Split into {len(chunks)} chunks")
        return chunks

    def _split_paragraphs(self, text: str) -> List[str]:
        """Split text on blank lines and return the non-empty, stripped paragraphs."""
        paragraphs = re.split(r'\n\s*\n', text)
        return [p.strip() for p in paragraphs if p.strip()]

    def _get_overlap(self, text: str) -> str:
        """Return the text to prepend to the next chunk.

        Takes the last ``self.overlap`` characters of ``text``; if that tail
        contains a sentence terminator followed by whitespace, everything up
        to and including the first such boundary is dropped so the overlap
        starts at the beginning of a sentence.

        Args:
            text: The chunk that has just been closed.

        Returns:
            The overlap text followed by a single space, or an empty string
            when ``text`` is empty or overlap is disabled.
        """
        # ``text[-0:]`` is the WHOLE string, so a zero overlap must be handled
        # explicitly; negative values are likewise treated as "no overlap".
        if not text or self.overlap <= 0:
            return ""

        if len(text) <= self.overlap:
            # The whole chunk is carried over; keep the separator so it does
            # not fuse with the first word of the next paragraph.
            return text + " "

        overlap_text = text[-self.overlap:]

        # Try to start at a sentence boundary
        sentence_start = re.search(r'[.!?]\s+', overlap_text)
        if sentence_start:
            overlap_text = overlap_text[sentence_start.end():]

        return overlap_text + " "

    def _create_chunk(
        self, text: str, index: int, metadata: Optional[dict] = None
    ) -> Optional[dict]:
        """Create a chunk dictionary with metadata.

        Args:
            text: The chunk text.
            index: Position of the chunk within its document.
            metadata: Optional mapping providing ``source``, ``url`` and
                ``title``; missing keys default to an empty string.

        Returns:
            A dict with ``text``, ``chunk_index``, ``char_count`` and
            ``word_count`` (plus ``source``/``url``/``title`` when metadata
            is given), or ``None`` if ``text`` is shorter than
            ``min_chunk_size``.
        """
        if len(text) < self.min_chunk_size:
            return None

        chunk = {
            "text": text,
            "chunk_index": index,
            "char_count": len(text),
            "word_count": len(text.split()),
        }

        if metadata:
            chunk.update({
                "source": metadata.get("source", ""),
                "url": metadata.get("url", ""),
                "title": metadata.get("title", ""),
            })

        return chunk

    def chunk_all_documents(self, documents: List[dict]) -> List[dict]:
        """Chunk every document in a list and return all chunks in order.

        Args:
            documents: Dicts whose ``text`` entry is chunked; ``source``,
                ``url`` and ``title`` entries (empty string when absent) are
                attached to each resulting chunk.

        Returns:
            The concatenation of ``chunk_text`` results for each document.
            ``chunk_index`` restarts at 0 for every document.
        """
        all_chunks: List[dict] = []

        for doc in documents:
            metadata = {
                "source": doc.get("source", ""),
                "url": doc.get("url", ""),
                "title": doc.get("title", ""),
            }
            all_chunks.extend(self.chunk_text(doc.get("text", ""), metadata))

        logger.info(f"Total chunks from {len(documents)} documents: {len(all_chunks)}")
        return all_chunks