AI_Toolkit / src /core /ChunkingManager.py
NavyDevilDoc's picture
Upload 10 files
c0f31c1 verified
"""
ChunkingManager.py
A manager class that orchestrates document chunking using different strategies.
"""
from typing import Dict, List, Optional, Union
from pathlib import Path
from sentence_transformers import SentenceTransformer
from langchain_core.documents import Document
# Import chunker strategies
from core.BaseChunker import BaseChunker
from core.PageChunker import PageChunker
from core.ParagraphChunker import ParagraphChunker
from core.SemanticChunker import SemanticChunker
from core.HierarchicalChunker import HierarchicalChunker
from core.TokenChunker import TokenChunker
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ChunkingStrategy:
"""Enumeration of available chunking strategies."""
PAGE = "page"
PARAGRAPH = "paragraph"
SEMANTIC = "semantic"
HIERARCHICAL = "hierarchical"
TOKEN = "token"
class ChunkingManager:
"""Manager class for document chunking strategies."""
def __init__(
self,
embedding_model_name: str = "all-mpnet-base-v2",
token_model_name: Optional[str] = None
):
"""
Initialize chunking manager.
Args:
embedding_model_name: Name of the sentence transformer model
token_model_name: Name of the token counting model
"""
self.token_model_name = token_model_name
self.embedding_model_name = embedding_model_name
self._embedding_model = None
self._chunkers = {}
@property
def embedding_model(self):
"""Lazy-load the embedding model."""
if self._embedding_model is None:
try:
# Only try to load as SentenceTransformer if it's a known SentenceTransformer model
if self.embedding_model_name and not any(x in self.embedding_model_name.lower() for x in ["gpt", "text-embedding", "openai"]):
logger.info(f"Loading embedding model: {self.embedding_model_name}")
self._embedding_model = SentenceTransformer(self.embedding_model_name)
else:
# Return a dummy embedding model that returns None
logger.info("Using dummy embedding model for tokenization only")
class DummyEmbedder:
def encode(self, text, **kwargs):
return [0.0] * 384 # Return dummy vector
self._embedding_model = DummyEmbedder()
except Exception as e:
logger.error(f"Error loading embedding model: {e}")
# Return a dummy embedding model that returns None
class DummyEmbedder:
def encode(self, text, **kwargs):
return [0.0] * 384 # Return dummy vector
self._embedding_model = DummyEmbedder()
return self._embedding_model
def _get_chunker(self, strategy: str) -> BaseChunker:
"""Get or create chunker for the specified strategy."""
strategy = strategy.lower()
if strategy not in self._chunkers:
if strategy == ChunkingStrategy.PAGE:
self._chunkers[strategy] = PageChunker(
model_name=self.token_model_name,
embedding_model=self.embedding_model
)
elif strategy == ChunkingStrategy.PARAGRAPH:
self._chunkers[strategy] = ParagraphChunker(
model_name=self.token_model_name,
embedding_model=self.embedding_model
)
elif strategy == ChunkingStrategy.SEMANTIC:
self._chunkers[strategy] = SemanticChunker(
embedding_model=self.embedding_model,
model_name=self.token_model_name
)
elif strategy == ChunkingStrategy.HIERARCHICAL:
self._chunkers[strategy] = HierarchicalChunker(
model_name=self.token_model_name,
embedding_model=self.embedding_model
)
elif strategy == ChunkingStrategy.TOKEN:
self._chunkers[strategy] = TokenChunker(
model_name=self.token_model_name,
embedding_model=self.embedding_model,
chunk_size=256, # Default values, could be made configurable
chunk_overlap=50
)
else:
raise ValueError(f"Unknown chunking strategy: {strategy}")
return self._chunkers[strategy]
def process_document(
self,
file_path: str,
strategy: str = ChunkingStrategy.PARAGRAPH,
preprocess: bool = True
) -> Union[List[Document], Dict[str, List[Document]]]:
"""
Process document using specified chunking strategy.
Args:
file_path: Path to document file
strategy: Chunking strategy to use
preprocess: Whether to preprocess text
Returns:
Chunked document(s) according to strategy
"""
# Validate file exists
path = Path(file_path)
if not path.exists():
raise FileNotFoundError(f"File not found: {file_path}")
# Determine file type
file_extension = path.suffix.lower()
# Process based on file type
if file_extension == '.csv':
return self._process_csv(file_path, strategy)
elif file_extension == '.txt':
return self._process_txt(file_path, strategy, preprocess)
elif file_extension == '.pdf':
# Get appropriate chunker and process document
chunker = self._get_chunker(strategy)
logger.info(f"Processing document using {strategy} chunking strategy")
if strategy == ChunkingStrategy.PAGE:
return chunker.page_process_document(file_path, preprocess)
elif strategy == ChunkingStrategy.PARAGRAPH:
return chunker.paragraph_process_document(file_path, preprocess)
elif strategy == ChunkingStrategy.SEMANTIC:
return chunker.semantic_process_document(file_path, preprocess)
elif strategy == ChunkingStrategy.HIERARCHICAL:
return chunker.hierarchical_process_document(file_path, preprocess)
elif strategy == ChunkingStrategy.TOKEN:
return chunker.token_process_document(file_path, preprocess)
else:
raise ValueError(f"Unknown chunking strategy: {strategy}")
else:
raise ValueError(f"Unsupported file type: {file_extension}. Supported types: .pdf, .csv, .txt")
def process_directory(
self,
dir_path: str,
strategy: str = ChunkingStrategy.PARAGRAPH,
preprocess: bool = True
) -> Dict[str, Union[List[Document], Dict[str, List[Document]]]]:
"""
Process all supported documents in a directory.
Args:
dir_path: Directory containing files
strategy: Chunking strategy to use
preprocess: Whether to preprocess text
Returns:
Dictionary mapping filenames to their processed documents
"""
path = Path(dir_path)
if not path.is_dir():
raise NotADirectoryError(f"Not a directory: {dir_path}")
results = {}
# Find supported files (PDFs, CSVs, and TXT files)
pdf_files = list(path.glob("**/*.pdf"))
csv_files = list(path.glob("**/*.csv"))
txt_files = list(path.glob("**/*.txt"))
all_files = pdf_files + csv_files + txt_files
logger.info(f"Found {len(pdf_files)} PDF files, {len(csv_files)} CSV files, and {len(txt_files)} TXT files in {dir_path}")
for file in all_files:
try:
logger.info(f"Processing {file.name}")
result = self.process_document(
str(file),
strategy=strategy,
preprocess=preprocess
)
results[file.name] = result
except Exception as e:
logger.error(f"Error processing {file.name}: {e}")
results[file.name] = {"error": str(e)}
return results
def _process_txt(self, file_path: str, strategy: str, preprocess: bool) -> List[Document]:
"""Process a TXT file into document chunks."""
logger.info(f"Processing TXT file: {file_path}")
# Validate strategy for TXT files
if strategy not in [ChunkingStrategy.PARAGRAPH, ChunkingStrategy.TOKEN]:
raise ValueError(f"TXT files only support paragraph and token chunking strategies. Got: {strategy}")
# Get appropriate chunker
chunker = self._get_chunker(strategy)
# Process based on strategy
if strategy == ChunkingStrategy.PARAGRAPH:
return chunker.process_text_file(file_path, preprocess)
elif strategy == ChunkingStrategy.TOKEN:
return chunker.process_text_file(file_path, preprocess)
else:
raise ValueError(f"Unsupported chunking strategy for TXT: {strategy}")
def _process_txt(self, file_path: str, strategy: str, preprocess: bool) -> List[Document]:
"""Process a TXT file into document chunks."""
logger.info(f"Processing TXT file: {file_path}")
# Validate strategy for TXT files
if strategy not in [ChunkingStrategy.PARAGRAPH, ChunkingStrategy.TOKEN]:
raise ValueError(f"TXT files only support paragraph and token chunking strategies. Got: {strategy}")
# Get appropriate chunker
chunker = self._get_chunker(strategy)
# Process based on strategy
if strategy == ChunkingStrategy.PARAGRAPH:
return chunker.process_text_file(file_path, preprocess)
elif strategy == ChunkingStrategy.TOKEN:
return chunker.process_text_file(file_path, preprocess)
else:
raise ValueError(f"Unsupported chunking strategy for TXT: {strategy}")
def _process_csv(self, file_path: str, strategy: str) -> List[Document]:
"""Process a CSV file into document chunks."""
import pandas as pd
logger.info(f"Loading CSV file: {file_path}")
# Read the CSV file
df = pd.read_csv(file_path)
# Determine the chunking approach based on strategy
if strategy == ChunkingStrategy.PARAGRAPH:
# For these strategies, we treat each row as a separate document
# with columns combined into a structured text format
return self._chunk_csv_by_row(df, file_path)
elif strategy == ChunkingStrategy.PAGE:
# For page strategy, we create larger chunks with multiple rows
return self._chunk_csv_by_page(df, file_path)
elif strategy == ChunkingStrategy.HIERARCHICAL:
# For hierarchical, create documents with metadata structure
return {"chunks": self._chunk_csv_by_row(df, file_path)}
else:
raise ValueError(f"Unsupported chunking strategy for CSV: {strategy}")
def _chunk_csv_by_row(self, df, file_path: str) -> List[Document]:
"""Convert each CSV row to a document chunk."""
chunks = []
file_name = Path(file_path).name
# Get column names
columns = df.columns.tolist()
# Process each row
for i, row in df.iterrows():
# Convert row to formatted text
content = "\n".join([f"{col}: {row[col]}" for col in columns])
# Create metadata
metadata = {
"source": file_path,
"file_name": file_name,
"file_type": "csv",
"row_index": i,
"chunk_type": "csv_row",
}
# Add columns as additional metadata
for col in columns:
# Convert to string to ensure compatibility
metadata[f"csv_{col}"] = str(row[col])
# Create document
doc = Document(page_content=content, metadata=metadata)
chunks.append(doc)
logger.info(f"Created {len(chunks)} chunks from CSV (row-based)")
return chunks
def _chunk_csv_by_page(self, df, file_path: str, rows_per_chunk: int = 20) -> List[Document]:
"""Convert CSV into larger chunks with multiple rows per chunk."""
chunks = []
file_name = Path(file_path).name
columns = df.columns.tolist()
# Calculate number of chunks
total_rows = len(df)
chunk_count = (total_rows + rows_per_chunk - 1) // rows_per_chunk # Ceiling division
# Generate chunks
for chunk_idx in range(chunk_count):
start_row = chunk_idx * rows_per_chunk
end_row = min(start_row + rows_per_chunk, total_rows)
chunk_df = df.iloc[start_row:end_row]
# Format the chunk content
content = f"CSV Data (Rows {start_row+1}-{end_row}):\n\n"
# Add header row
content += " | ".join(columns) + "\n"
content += "-" * (sum(len(col) for col in columns) + 3 * (len(columns) - 1)) + "\n"
# Add data rows
for _, row in chunk_df.iterrows():
content += " | ".join(str(row[col]) for col in columns) + "\n"
# Create metadata
metadata = {
"source": file_path,
"file_name": file_name,
"file_type": "csv",
"chunk_type": "csv_page",
"start_row": start_row,
"end_row": end_row - 1,
"row_count": end_row - start_row,
}
# Create document
doc = Document(page_content=content, metadata=metadata)
chunks.append(doc)
logger.info(f"Created {len(chunks)} chunks from CSV (page-based)")
return chunks