""" ChunkingManager.py A manager class that orchestrates document chunking using different strategies. """ from typing import Dict, List, Optional, Union from pathlib import Path from sentence_transformers import SentenceTransformer from langchain_core.documents import Document # Import chunker strategies from core.BaseChunker import BaseChunker from core.PageChunker import PageChunker from core.ParagraphChunker import ParagraphChunker from core.SemanticChunker import SemanticChunker from core.HierarchicalChunker import HierarchicalChunker from core.TokenChunker import TokenChunker import logging # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ChunkingStrategy: """Enumeration of available chunking strategies.""" PAGE = "page" PARAGRAPH = "paragraph" SEMANTIC = "semantic" HIERARCHICAL = "hierarchical" TOKEN = "token" class ChunkingManager: """Manager class for document chunking strategies.""" def __init__( self, embedding_model_name: str = "all-mpnet-base-v2", token_model_name: Optional[str] = None ): """ Initialize chunking manager. Args: embedding_model_name: Name of the sentence transformer model token_model_name: Name of the token counting model """ self.token_model_name = token_model_name self.embedding_model_name = embedding_model_name self._embedding_model = None self._chunkers = {} @property def embedding_model(self): """Lazy-load the embedding model.""" if self._embedding_model is None: try: # Only try to load as SentenceTransformer if it's a known SentenceTransformer model if self.embedding_model_name and not any(x in self.embedding_model_name.lower() for x in ["gpt", "text-embedding", "openai"]): logger.info(f"Loading embedding model: {self.embedding_model_name}") self._embedding_model = SentenceTransformer(self.embedding_model_name) else: # Return a dummy embedding model that returns None logger.info("Using dummy embedding model for tokenization only") class DummyEmbedder: def encode(self, text, **kwargs): return [0.0] * 384 # Return dummy vector self._embedding_model = DummyEmbedder() except Exception as e: logger.error(f"Error loading embedding model: {e}") # Return a dummy embedding model that returns None class DummyEmbedder: def encode(self, text, **kwargs): return [0.0] * 384 # Return dummy vector self._embedding_model = DummyEmbedder() return self._embedding_model def _get_chunker(self, strategy: str) -> BaseChunker: """Get or create chunker for the specified strategy.""" strategy = strategy.lower() if strategy not in self._chunkers: if strategy == ChunkingStrategy.PAGE: self._chunkers[strategy] = PageChunker( model_name=self.token_model_name, embedding_model=self.embedding_model ) elif strategy == ChunkingStrategy.PARAGRAPH: self._chunkers[strategy] = ParagraphChunker( model_name=self.token_model_name, embedding_model=self.embedding_model ) elif strategy == ChunkingStrategy.SEMANTIC: self._chunkers[strategy] = SemanticChunker( embedding_model=self.embedding_model, model_name=self.token_model_name ) elif strategy == ChunkingStrategy.HIERARCHICAL: self._chunkers[strategy] = HierarchicalChunker( model_name=self.token_model_name, embedding_model=self.embedding_model ) elif strategy == ChunkingStrategy.TOKEN: self._chunkers[strategy] = TokenChunker( model_name=self.token_model_name, embedding_model=self.embedding_model, chunk_size=256, # Default values, could be made configurable chunk_overlap=50 ) else: raise ValueError(f"Unknown chunking strategy: {strategy}") return self._chunkers[strategy] def process_document( self, file_path: str, strategy: str = ChunkingStrategy.PARAGRAPH, preprocess: bool = True ) -> Union[List[Document], Dict[str, List[Document]]]: """ Process document using specified chunking strategy. Args: file_path: Path to document file strategy: Chunking strategy to use preprocess: Whether to preprocess text Returns: Chunked document(s) according to strategy """ # Validate file exists path = Path(file_path) if not path.exists(): raise FileNotFoundError(f"File not found: {file_path}") # Determine file type file_extension = path.suffix.lower() # Process based on file type if file_extension == '.csv': return self._process_csv(file_path, strategy) elif file_extension == '.txt': return self._process_txt(file_path, strategy, preprocess) elif file_extension == '.pdf': # Get appropriate chunker and process document chunker = self._get_chunker(strategy) logger.info(f"Processing document using {strategy} chunking strategy") if strategy == ChunkingStrategy.PAGE: return chunker.page_process_document(file_path, preprocess) elif strategy == ChunkingStrategy.PARAGRAPH: return chunker.paragraph_process_document(file_path, preprocess) elif strategy == ChunkingStrategy.SEMANTIC: return chunker.semantic_process_document(file_path, preprocess) elif strategy == ChunkingStrategy.HIERARCHICAL: return chunker.hierarchical_process_document(file_path, preprocess) elif strategy == ChunkingStrategy.TOKEN: return chunker.token_process_document(file_path, preprocess) else: raise ValueError(f"Unknown chunking strategy: {strategy}") else: raise ValueError(f"Unsupported file type: {file_extension}. Supported types: .pdf, .csv, .txt") def process_directory( self, dir_path: str, strategy: str = ChunkingStrategy.PARAGRAPH, preprocess: bool = True ) -> Dict[str, Union[List[Document], Dict[str, List[Document]]]]: """ Process all supported documents in a directory. Args: dir_path: Directory containing files strategy: Chunking strategy to use preprocess: Whether to preprocess text Returns: Dictionary mapping filenames to their processed documents """ path = Path(dir_path) if not path.is_dir(): raise NotADirectoryError(f"Not a directory: {dir_path}") results = {} # Find supported files (PDFs, CSVs, and TXT files) pdf_files = list(path.glob("**/*.pdf")) csv_files = list(path.glob("**/*.csv")) txt_files = list(path.glob("**/*.txt")) all_files = pdf_files + csv_files + txt_files logger.info(f"Found {len(pdf_files)} PDF files, {len(csv_files)} CSV files, and {len(txt_files)} TXT files in {dir_path}") for file in all_files: try: logger.info(f"Processing {file.name}") result = self.process_document( str(file), strategy=strategy, preprocess=preprocess ) results[file.name] = result except Exception as e: logger.error(f"Error processing {file.name}: {e}") results[file.name] = {"error": str(e)} return results def _process_txt(self, file_path: str, strategy: str, preprocess: bool) -> List[Document]: """Process a TXT file into document chunks.""" logger.info(f"Processing TXT file: {file_path}") # Validate strategy for TXT files if strategy not in [ChunkingStrategy.PARAGRAPH, ChunkingStrategy.TOKEN]: raise ValueError(f"TXT files only support paragraph and token chunking strategies. Got: {strategy}") # Get appropriate chunker chunker = self._get_chunker(strategy) # Process based on strategy if strategy == ChunkingStrategy.PARAGRAPH: return chunker.process_text_file(file_path, preprocess) elif strategy == ChunkingStrategy.TOKEN: return chunker.process_text_file(file_path, preprocess) else: raise ValueError(f"Unsupported chunking strategy for TXT: {strategy}") def _process_txt(self, file_path: str, strategy: str, preprocess: bool) -> List[Document]: """Process a TXT file into document chunks.""" logger.info(f"Processing TXT file: {file_path}") # Validate strategy for TXT files if strategy not in [ChunkingStrategy.PARAGRAPH, ChunkingStrategy.TOKEN]: raise ValueError(f"TXT files only support paragraph and token chunking strategies. Got: {strategy}") # Get appropriate chunker chunker = self._get_chunker(strategy) # Process based on strategy if strategy == ChunkingStrategy.PARAGRAPH: return chunker.process_text_file(file_path, preprocess) elif strategy == ChunkingStrategy.TOKEN: return chunker.process_text_file(file_path, preprocess) else: raise ValueError(f"Unsupported chunking strategy for TXT: {strategy}") def _process_csv(self, file_path: str, strategy: str) -> List[Document]: """Process a CSV file into document chunks.""" import pandas as pd logger.info(f"Loading CSV file: {file_path}") # Read the CSV file df = pd.read_csv(file_path) # Determine the chunking approach based on strategy if strategy == ChunkingStrategy.PARAGRAPH: # For these strategies, we treat each row as a separate document # with columns combined into a structured text format return self._chunk_csv_by_row(df, file_path) elif strategy == ChunkingStrategy.PAGE: # For page strategy, we create larger chunks with multiple rows return self._chunk_csv_by_page(df, file_path) elif strategy == ChunkingStrategy.HIERARCHICAL: # For hierarchical, create documents with metadata structure return {"chunks": self._chunk_csv_by_row(df, file_path)} else: raise ValueError(f"Unsupported chunking strategy for CSV: {strategy}") def _chunk_csv_by_row(self, df, file_path: str) -> List[Document]: """Convert each CSV row to a document chunk.""" chunks = [] file_name = Path(file_path).name # Get column names columns = df.columns.tolist() # Process each row for i, row in df.iterrows(): # Convert row to formatted text content = "\n".join([f"{col}: {row[col]}" for col in columns]) # Create metadata metadata = { "source": file_path, "file_name": file_name, "file_type": "csv", "row_index": i, "chunk_type": "csv_row", } # Add columns as additional metadata for col in columns: # Convert to string to ensure compatibility metadata[f"csv_{col}"] = str(row[col]) # Create document doc = Document(page_content=content, metadata=metadata) chunks.append(doc) logger.info(f"Created {len(chunks)} chunks from CSV (row-based)") return chunks def _chunk_csv_by_page(self, df, file_path: str, rows_per_chunk: int = 20) -> List[Document]: """Convert CSV into larger chunks with multiple rows per chunk.""" chunks = [] file_name = Path(file_path).name columns = df.columns.tolist() # Calculate number of chunks total_rows = len(df) chunk_count = (total_rows + rows_per_chunk - 1) // rows_per_chunk # Ceiling division # Generate chunks for chunk_idx in range(chunk_count): start_row = chunk_idx * rows_per_chunk end_row = min(start_row + rows_per_chunk, total_rows) chunk_df = df.iloc[start_row:end_row] # Format the chunk content content = f"CSV Data (Rows {start_row+1}-{end_row}):\n\n" # Add header row content += " | ".join(columns) + "\n" content += "-" * (sum(len(col) for col in columns) + 3 * (len(columns) - 1)) + "\n" # Add data rows for _, row in chunk_df.iterrows(): content += " | ".join(str(row[col]) for col in columns) + "\n" # Create metadata metadata = { "source": file_path, "file_name": file_name, "file_type": "csv", "chunk_type": "csv_page", "start_row": start_row, "end_row": end_row - 1, "row_count": end_row - start_row, } # Create document doc = Document(page_content=content, metadata=metadata) chunks.append(doc) logger.info(f"Created {len(chunks)} chunks from CSV (page-based)") return chunks