kiafa's picture
Premium UI/UX Overhaul & Optimization Update
b96f3a5 verified
"""
Smart Text Chunker - Splits text into optimal chunks for fine-tuning.
Respects sentence and paragraph boundaries.
"""
import logging
import re
from typing import List, Optional
logger = logging.getLogger("Chunker")


class TextChunker:
    """Split text into chunks suitable for instruction dataset generation.

    Paragraphs (separated by blank lines) are packed greedily into chunks of
    roughly ``chunk_size`` characters. Each new chunk starts with a tail of
    at most ``overlap`` characters taken from the previous one, and chunks
    shorter than ``min_chunk_size`` characters are discarded.

    Note:
        A single paragraph longer than ``chunk_size`` is never split, so an
        individual chunk can exceed the target size.
    """

    # Compiled once; both patterns are used for every document.
    _PARAGRAPH_BREAK = re.compile(r'\n\s*\n')
    _SENTENCE_END = re.compile(r'[.!?]\s+')

    def __init__(self, chunk_size: int = 1000, overlap: int = 100,
                 min_chunk_size: int = 200):
        """Store the chunking parameters (all measured in characters).

        Args:
            chunk_size: Target size of a chunk.
            overlap: Maximum number of trailing characters of a chunk that
                are repeated at the start of the next one. Values <= 0
                disable overlap.
            min_chunk_size: Chunks (and whole texts) shorter than this are
                dropped.
        """
        self.chunk_size = chunk_size  # Target chars per chunk
        self.overlap = overlap
        self.min_chunk_size = min_chunk_size

    def chunk_text(self, text: str,
                   metadata: Optional[dict] = None) -> List[dict]:
        """Split text into overlapping chunks respecting paragraph boundaries.

        Args:
            text: The raw text to split.
            metadata: Optional mapping; its ``source``, ``url`` and ``title``
                entries are copied onto every chunk.

        Returns:
            A list of chunk dictionaries (see ``_create_chunk``). Empty when
            the stripped text is shorter than ``min_chunk_size``.
        """
        if not text or len(text.strip()) < self.min_chunk_size:
            return []

        chunks: List[dict] = []
        current_chunk = ""

        for para in self._split_paragraphs(text):
            if current_chunk and len(current_chunk) + len(para) > self.chunk_size:
                # Adding this paragraph would overflow: flush what we have
                # and seed the next chunk with the overlap tail.
                self._emit_chunk(chunks, current_chunk, metadata)
                current_chunk = self._get_overlap(current_chunk) + para
            elif current_chunk:
                current_chunk += "\n\n" + para
            else:
                current_chunk = para

        # Don't forget the last chunk
        self._emit_chunk(chunks, current_chunk, metadata)

        logger.info(" Split into %d chunks", len(chunks))
        return chunks

    def _emit_chunk(self, chunks: List[dict], text: str,
                    metadata: Optional[dict]) -> None:
        """Append the chunk built from ``text`` to ``chunks`` if it qualifies.

        The chunk index is the number of chunks already emitted, so indexes
        stay contiguous even when an undersized chunk is dropped.
        """
        stripped = text.strip()
        if not stripped:
            return
        chunk_data = self._create_chunk(stripped, len(chunks), metadata)
        if chunk_data:
            chunks.append(chunk_data)

    def _split_paragraphs(self, text: str) -> List[str]:
        """Split text on blank lines and return the non-empty, stripped parts."""
        pieces = (p.strip() for p in self._PARAGRAPH_BREAK.split(text))
        return [p for p in pieces if p]

    def _get_overlap(self, text: str) -> str:
        """Return the tail of ``text`` to prepend to the next chunk.

        At most ``self.overlap`` trailing characters are taken. If that tail
        contains a sentence end, it is trimmed to start right after the
        first one. A non-empty result always ends with a single space so it
        does not run into the following paragraph.

        Returns:
            The overlap text, or ``""`` when overlap is disabled or ``text``
            is empty.
        """
        # Guard first: text[-0:] would be the *entire* string, and a
        # negative overlap would slice from the wrong end.
        if self.overlap <= 0 or not text:
            return ""

        if len(text) <= self.overlap:
            return text + " "

        overlap_text = text[-self.overlap:]
        # Try to start at a sentence boundary
        boundary = self._SENTENCE_END.search(overlap_text)
        if boundary:
            overlap_text = overlap_text[boundary.end():]
        return overlap_text + " "

    def _create_chunk(self, text: str, index: int,
                      metadata: Optional[dict] = None) -> Optional[dict]:
        """Create a chunk dictionary with metadata.

        Args:
            text: Chunk text (expected to be already stripped).
            index: Position of the chunk within its document.
            metadata: Optional mapping providing ``source``, ``url`` and
                ``title``; missing entries default to ``""``.

        Returns:
            A dict with ``text``, ``chunk_index``, ``char_count`` and
            ``word_count`` (plus ``source``/``url``/``title`` when metadata
            is given), or ``None`` if ``text`` is shorter than
            ``min_chunk_size``.
        """
        if len(text) < self.min_chunk_size:
            return None

        chunk = {
            "text": text,
            "chunk_index": index,
            "char_count": len(text),
            "word_count": len(text.split()),
        }
        if metadata:
            chunk.update({
                "source": metadata.get("source", ""),
                "url": metadata.get("url", ""),
                "title": metadata.get("title", ""),
            })
        return chunk

    def chunk_all_documents(self, documents: List[dict]) -> List[dict]:
        """Chunk every document and return all chunks in document order.

        Args:
            documents: Dicts with a ``text`` entry and optional ``source``,
                ``url`` and ``title`` entries (each defaults to ``""``).

        Returns:
            The concatenated chunk lists of all documents.
        """
        all_chunks: List[dict] = []
        for doc in documents:
            metadata = {
                "source": doc.get("source", ""),
                "url": doc.get("url", ""),
                "title": doc.get("title", ""),
            }
            all_chunks.extend(self.chunk_text(doc.get("text", ""), metadata))

        logger.info("Total chunks from %d documents: %d",
                    len(documents), len(all_chunks))
        return all_chunks