|
|
import asyncio |
|
|
from typing import List |
|
|
from concurrent.futures import ThreadPoolExecutor |
|
|
|
|
|
from evoagentx.core.logging import logger |
|
|
from llama_index.core.node_parser import SimpleNodeParser |
|
|
from .base import BaseChunker, ChunkingStrategy |
|
|
from evoagentx.rag.schema import Document, Corpus, Chunk |
|
|
|
|
|
class SimpleChunker(BaseChunker):
    """Chunker that splits documents into fixed-size chunks using multi-threading and async parsing.

    Uses LlamaIndex's SimpleNodeParser with async support to create chunks with a specified size
    and overlap, suitable for general-purpose text splitting in RAG pipelines. Each document is
    parsed in its own worker thread; results are collected in submission order so the returned
    Corpus preserves the input document order.

    Attributes:
        chunk_size (int): The target size of each chunk.
            NOTE(review): LlamaIndex's SimpleNodeParser measures chunk_size in tokens,
            not characters — confirm against the llama_index documentation.
        chunk_overlap (int): The overlap between adjacent chunks (same unit as chunk_size).
        parser (SimpleNodeParser): The LlamaIndex parser that performs the actual splitting.
        max_workers (int): Maximum number of threads for parallel processing.
    """

    def __init__(
        self,
        chunk_size: int = 1024,
        chunk_overlap: int = 20,
        tokenizer=None,
        chunking_tokenizer_fn=None,
        include_metadata: bool = True,
        include_prev_next_rel: bool = True,
        max_workers: int = 4,
    ):
        """Initialize the SimpleChunker.

        Args:
            chunk_size (int, optional): Target size of each chunk (default: 1024).
            chunk_overlap (int, optional): Overlap between adjacent chunks (default: 20).
            tokenizer: Optional tokenizer for chunking.
            chunking_tokenizer_fn: Optional tokenizer function for chunking.
            include_metadata (bool): Whether to include metadata in nodes (default: True).
            include_prev_next_rel (bool): Whether to include previous/next relationships (default: True).
            max_workers (int): Maximum number of threads for parallel processing (default: 4).
        """
        # NOTE(review): BaseChunker.__init__ is not invoked here; confirm the base
        # class requires no initialization before adding a super().__init__() call.
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.tokenizer = tokenizer
        self.chunking_tokenizer_fn = chunking_tokenizer_fn
        self.max_workers = max_workers
        self.parser = SimpleNodeParser(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            tokenizer=tokenizer,
            chunking_tokenizer_fn=chunking_tokenizer_fn,
            include_metadata=include_metadata,
            include_prev_next_rel=include_prev_next_rel,
        )

    def _process_document(self, doc: Document) -> List[Chunk]:
        """Process a single document into chunks in a thread.

        Args:
            doc (Document): The document to chunk.

        Returns:
            List[Chunk]: List of Chunk objects with metadata. Empty on failure
            (errors are logged, not raised, so one bad document cannot abort a batch).
        """
        try:
            llama_doc = doc.to_llama_document()
            # Propagate the source document id so each chunk can be traced back.
            llama_doc.metadata["doc_id"] = doc.doc_id

            # asyncio.run is safe here: each worker thread has no running event
            # loop, so a fresh loop is created and torn down per document.
            nodes = asyncio.run(self.parser.aget_nodes_from_documents([llama_doc]))

            chunks = []
            for node in nodes:
                chunk = Chunk.from_llama_node(node)
                # Tag the chunk with the strategy that produced it.
                chunk.metadata.chunking_strategy = ChunkingStrategy.SIMPLE
                chunks.append(chunk)
            logger.debug(f"Processed document {doc.doc_id} into {len(chunks)} chunks")
            return chunks
        except Exception as e:
            # Best-effort boundary: log and return an empty list rather than
            # propagating, so sibling documents still get processed.
            logger.error(f"Failed to process document {doc.doc_id}: {str(e)}")
            return []

    def chunk(self, documents: List[Document], **kwargs) -> Corpus:
        """Chunk documents into fixed-size chunks using multi-threading.

        Args:
            documents (List[Document]): List of Document objects to chunk.
            **kwargs: Accepted for interface compatibility; currently unused.

        Returns:
            Corpus: A collection of Chunk objects with metadata, in input
            document order. Empty if no documents are provided.
        """
        if not documents:
            logger.info("No documents provided, returning empty Corpus")
            return Corpus([])

        chunks = []
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Submit all documents, then collect in submission order so the
            # resulting Corpus preserves the caller's document ordering.
            future_to_doc = {executor.submit(self._process_document, doc): doc for doc in documents}
            for future, doc in future_to_doc.items():
                try:
                    chunks.extend(future.result())
                except Exception as e:
                    # _process_document already swallows its own errors; this
                    # guards against failures raised by the future machinery itself.
                    logger.error(f"Error processing document {doc.doc_id}: {str(e)}")

        logger.info(f"Chunked {len(documents)} documents into {len(chunks)} chunks")
        return Corpus(chunks=chunks)