Removed langchain and llama-cpp-python (not actively supported anymore) dependencies. Updated packages. Updated default dataset
5b2f824
| """ | |
| Custom text splitter to replace langchain RecursiveCharacterTextSplitter. | |
| """ | |
| from typing import List, Optional, Callable | |
| import re | |
class RecursiveCharacterTextSplitter:
    """Split text into chunks, trying a list of separators in order.

    Drop-in replacement for langchain's ``RecursiveCharacterTextSplitter``:
    the text is split on each separator in turn, pieces still longer than
    ``chunk_size`` fall through to the next separator, and the empty-string
    separator is a character-level fallback that always succeeds.
    """

    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        separators: Optional[List[str]] = None,
        length_function: Optional[Callable[[str], int]] = None,
        add_start_index: bool = False
    ):
        """
        Args:
            chunk_size: Maximum chunk length as measured by ``length_function``.
            chunk_overlap: Number of characters shared between consecutive
                chunks. Must be strictly smaller than ``chunk_size``.
            separators: Separators tried in order; ``""`` (character-level
                split) is the final fallback. Defaults to paragraph, line,
                sentence, word, then character boundaries.
            length_function: Callable that measures text length; defaults
                to ``len``.
            add_start_index: If True, ``create_documents`` records each
                chunk's offset in the source text under ``"start_index"``.

        Raises:
            ValueError: If ``chunk_overlap >= chunk_size`` (the overlap
                would otherwise make the character-level fallback stride
                zero or negative and crash or loop forever).
        """
        if chunk_overlap >= chunk_size:
            raise ValueError(
                f"chunk_overlap ({chunk_overlap}) must be smaller than "
                f"chunk_size ({chunk_size})"
            )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.separators = separators if separators else ["\n\n", "\n", ". ", "! ", "? ", " ", ""]
        self.length_function = length_function if length_function else len
        self.add_start_index = add_start_index

    def split_text(self, text: str) -> List[str]:
        """Split ``text`` into chunks of roughly ``chunk_size``.

        After splitting, each chunk except the first is prefixed with the
        tail of its predecessor, so returned chunks may exceed
        ``chunk_size`` by up to ``chunk_overlap`` characters.

        Args:
            text: The text to split; an empty string yields ``[]``.

        Returns:
            The list of (possibly overlapping) chunks.
        """
        if not text:
            return []

        # Start with the full text and refine it separator by separator.
        splits = [text]
        for separator in self.separators:
            if not separator:
                # Character-level fallback: hard-cut any oversized piece.
                # Overlap is applied once, uniformly, in the pass below, so
                # a plain chunk_size stride is used here (the previous
                # implementation used a (chunk_size - chunk_overlap) stride
                # AND the final pass, double-applying overlap on this path).
                new_splits = []
                for split in splits:
                    if self.length_function(split) <= self.chunk_size:
                        new_splits.append(split)
                    else:
                        for i in range(0, len(split), self.chunk_size):
                            chunk = split[i:i + self.chunk_size]
                            if chunk:
                                new_splits.append(chunk)
                splits = new_splits
                break

            new_splits = []
            for split in splits:
                if self.length_function(split) <= self.chunk_size:
                    new_splits.append(split)
                else:
                    # Split on the separator, then greedily re-merge the
                    # parts (separator included) back up to chunk_size.
                    parts = split.split(separator)
                    current_chunk = ""
                    for part in parts:
                        part_with_sep = part if not current_chunk else separator + part
                        if self.length_function(current_chunk + part_with_sep) <= self.chunk_size:
                            current_chunk += part_with_sep
                        else:
                            if current_chunk:
                                new_splits.append(current_chunk)
                            current_chunk = part_with_sep
                    if current_chunk:
                        new_splits.append(current_chunk)
            splits = new_splits

            # Stop refining once every piece fits.
            if all(self.length_function(s) <= self.chunk_size for s in splits):
                break

        # Prefix each chunk (after the first) with the tail of its
        # predecessor so consecutive chunks share chunk_overlap characters.
        if self.chunk_overlap > 0 and len(splits) > 1:
            overlapped_splits = []
            for i, split in enumerate(splits):
                if i == 0:
                    overlapped_splits.append(split)
                else:
                    prev_chunk = splits[i - 1]
                    overlap_text = prev_chunk[-self.chunk_overlap:] if len(prev_chunk) > self.chunk_overlap else prev_chunk
                    overlapped_splits.append(overlap_text + split)
            splits = overlapped_splits

        return splits

    def create_documents(
        self,
        texts: List[str],
        metadatas: Optional[List[dict]] = None
    ) -> List:
        """Split each text and wrap the resulting chunks in ``Document``s.

        Args:
            texts: Source texts to split.
            metadatas: Optional per-text metadata dicts (parallel to
                ``texts``); each chunk receives its own copy. Defaults to
                empty metadata.

        Returns:
            A flat list of ``tools.document.Document`` instances.
        """
        from tools.document import Document

        all_docs = []
        metadatas = metadatas if metadatas else [{}] * len(texts)
        for text, metadata in zip(texts, metadatas):
            splits = self.split_text(text)
            # Track a search offset so that chunks with identical text map
            # to successive occurrences instead of always the first one.
            search_from = 0
            for split in splits:
                doc_metadata = metadata.copy()
                if self.add_start_index:
                    start_idx = text.find(split, search_from)
                    if start_idx == -1:
                        # Separator/overlap handling can alter chunk text;
                        # fall back to searching from the beginning.
                        start_idx = text.find(split)
                    if start_idx != -1:
                        doc_metadata["start_index"] = start_idx
                        # Chunks may overlap, so advance by one, not by len.
                        search_from = start_idx + 1
                all_docs.append(Document(page_content=split, metadata=doc_metadata))
        return all_docs