""" Document Indexer for RAG Handles indexing processed documents into the vector store. """ from typing import List, Optional, Dict, Any, Union from pathlib import Path from pydantic import BaseModel, Field from loguru import logger from .store import VectorStore, get_vector_store from .embeddings import EmbeddingAdapter, get_embedding_adapter try: from ..document.schemas.core import ProcessedDocument, DocumentChunk from ..document.pipeline import process_document, PipelineConfig DOCUMENT_MODULE_AVAILABLE = True except ImportError: DOCUMENT_MODULE_AVAILABLE = False logger.warning("Document module not available for indexing") class IndexerConfig(BaseModel): """Configuration for document indexer.""" # Batch settings batch_size: int = Field(default=32, ge=1, description="Embedding batch size") # Metadata to index include_bbox: bool = Field(default=True, description="Include bounding boxes") include_page: bool = Field(default=True, description="Include page numbers") include_chunk_type: bool = Field(default=True, description="Include chunk types") # Processing options skip_empty_chunks: bool = Field(default=True, description="Skip empty text chunks") min_chunk_length: int = Field(default=10, ge=1, description="Minimum chunk text length") class IndexingResult(BaseModel): """Result of indexing operation.""" document_id: str source_path: str num_chunks_indexed: int num_chunks_skipped: int success: bool error: Optional[str] = None class DocumentIndexer: """ Indexes documents into the vector store for RAG. Workflow: 1. Process document (if not already processed) 2. Extract chunks with metadata 3. Generate embeddings 4. Store in vector database """ def __init__( self, config: Optional[IndexerConfig] = None, vector_store: Optional[VectorStore] = None, embedding_adapter: Optional[EmbeddingAdapter] = None, ): """ Initialize indexer. Args: config: Indexer configuration vector_store: Vector store instance embedding_adapter: Embedding adapter instance """ self.config = config or IndexerConfig() self._store = vector_store self._embedder = embedding_adapter @property def store(self) -> VectorStore: """Get vector store (lazy initialization).""" if self._store is None: self._store = get_vector_store() return self._store @property def embedder(self) -> EmbeddingAdapter: """Get embedding adapter (lazy initialization).""" if self._embedder is None: self._embedder = get_embedding_adapter() return self._embedder def index_document( self, source: Union[str, Path], document_id: Optional[str] = None, pipeline_config: Optional[Any] = None, ) -> IndexingResult: """ Index a document from file. Args: source: Path to document document_id: Optional document ID pipeline_config: Optional pipeline configuration Returns: IndexingResult """ if not DOCUMENT_MODULE_AVAILABLE: return IndexingResult( document_id=document_id or str(source), source_path=str(source), num_chunks_indexed=0, num_chunks_skipped=0, success=False, error="Document processing module not available", ) try: # Process document logger.info(f"Processing document: {source}") processed = process_document(source, document_id, pipeline_config) # Index the processed document return self.index_processed_document(processed) except Exception as e: logger.error(f"Failed to index document: {e}") return IndexingResult( document_id=document_id or str(source), source_path=str(source), num_chunks_indexed=0, num_chunks_skipped=0, success=False, error=str(e), ) def index_processed_document( self, document: "ProcessedDocument", ) -> IndexingResult: """ Index an already-processed document. Args: document: ProcessedDocument instance Returns: IndexingResult """ document_id = document.metadata.document_id source_path = document.metadata.source_path try: # Prepare chunks for indexing chunks_to_index = [] skipped = 0 for chunk in document.chunks: # Skip empty or short chunks if self.config.skip_empty_chunks: if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length: skipped += 1 continue chunk_data = { "chunk_id": chunk.chunk_id, "document_id": document_id, "source_path": source_path, "text": chunk.text, "sequence_index": chunk.sequence_index, "confidence": chunk.confidence, } if self.config.include_page: chunk_data["page"] = chunk.page if self.config.include_chunk_type: chunk_data["chunk_type"] = chunk.chunk_type.value if self.config.include_bbox and chunk.bbox: chunk_data["bbox"] = { "x_min": chunk.bbox.x_min, "y_min": chunk.bbox.y_min, "x_max": chunk.bbox.x_max, "y_max": chunk.bbox.y_max, } chunks_to_index.append(chunk_data) if not chunks_to_index: return IndexingResult( document_id=document_id, source_path=source_path, num_chunks_indexed=0, num_chunks_skipped=skipped, success=True, ) # Generate embeddings in batches logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks") texts = [c["text"] for c in chunks_to_index] embeddings = self.embedder.embed_batch(texts) # Store in vector database logger.info(f"Storing {len(chunks_to_index)} chunks in vector store") self.store.add_chunks(chunks_to_index, embeddings) logger.info( f"Indexed document {document_id}: " f"{len(chunks_to_index)} chunks, {skipped} skipped" ) return IndexingResult( document_id=document_id, source_path=source_path, num_chunks_indexed=len(chunks_to_index), num_chunks_skipped=skipped, success=True, ) except Exception as e: logger.error(f"Failed to index processed document: {e}") return IndexingResult( document_id=document_id, source_path=source_path, num_chunks_indexed=0, num_chunks_skipped=0, success=False, error=str(e), ) def index_batch( self, sources: List[Union[str, Path]], pipeline_config: Optional[Any] = None, ) -> List[IndexingResult]: """ Index multiple documents. Args: sources: List of document paths pipeline_config: Optional pipeline configuration Returns: List of IndexingResult """ results = [] for source in sources: result = self.index_document(source, pipeline_config=pipeline_config) results.append(result) # Summary successful = sum(1 for r in results if r.success) total_chunks = sum(r.num_chunks_indexed for r in results) logger.info( f"Batch indexing complete: " f"{successful}/{len(results)} documents, " f"{total_chunks} total chunks" ) return results def delete_document(self, document_id: str) -> int: """ Remove a document from the index. Args: document_id: Document ID to remove Returns: Number of chunks deleted """ return self.store.delete_document(document_id) def get_index_stats(self) -> Dict[str, Any]: """ Get indexing statistics. Returns: Dictionary with index stats """ total_chunks = self.store.count() # Try to get document count try: if hasattr(self.store, 'list_documents'): doc_ids = self.store.list_documents() num_documents = len(doc_ids) else: num_documents = None except: num_documents = None return { "total_chunks": total_chunks, "num_documents": num_documents, "embedding_model": self.embedder.model_name, "embedding_dimension": self.embedder.embedding_dimension, } # Global instance and factory _document_indexer: Optional[DocumentIndexer] = None def get_document_indexer( config: Optional[IndexerConfig] = None, vector_store: Optional[VectorStore] = None, embedding_adapter: Optional[EmbeddingAdapter] = None, ) -> DocumentIndexer: """ Get or create singleton document indexer. Args: config: Indexer configuration vector_store: Optional vector store instance embedding_adapter: Optional embedding adapter Returns: DocumentIndexer instance """ global _document_indexer if _document_indexer is None: _document_indexer = DocumentIndexer( config=config, vector_store=vector_store, embedding_adapter=embedding_adapter, ) return _document_indexer def reset_document_indexer(): """Reset the global indexer instance.""" global _document_indexer _document_indexer = None