""" |
|
|
Document Indexer for RAG |
|
|
|
|
|
Handles indexing processed documents into the vector store. |
|
|
""" |
|
|
|
|
|
from typing import List, Optional, Dict, Any, Union |
|
|
from pathlib import Path |
|
|
from pydantic import BaseModel, Field |
|
|
from loguru import logger |
|
|
|
|
|
from .store import VectorStore, get_vector_store |
|
|
from .embeddings import EmbeddingAdapter, get_embedding_adapter |
|
|
|
|
|
try: |
|
|
from ..document.schemas.core import ProcessedDocument, DocumentChunk |
|
|
from ..document.pipeline import process_document, PipelineConfig |
|
|
DOCUMENT_MODULE_AVAILABLE = True |
|
|
except ImportError: |
|
|
DOCUMENT_MODULE_AVAILABLE = False |
|
|
logger.warning("Document module not available for indexing") |
|
|
|
|
|
|
|
|
class IndexerConfig(BaseModel): |
|
|
"""Configuration for document indexer.""" |
|
|
|
|
|
batch_size: int = Field(default=32, ge=1, description="Embedding batch size") |
|
|
|
|
|
|
|
|
include_bbox: bool = Field(default=True, description="Include bounding boxes") |
|
|
include_page: bool = Field(default=True, description="Include page numbers") |
|
|
include_chunk_type: bool = Field(default=True, description="Include chunk types") |
|
|
|
|
|
|
|
|
skip_empty_chunks: bool = Field(default=True, description="Skip empty text chunks") |
|
|
min_chunk_length: int = Field(default=10, ge=1, description="Minimum chunk text length") |
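
# Example configuration (an illustrative sketch; only the fields defined above
# are used). A config like this would drop bounding boxes from the stored
# metadata and skip chunks shorter than 25 characters:
#
#     IndexerConfig(include_bbox=False, min_chunk_length=25)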


class IndexingResult(BaseModel):
    """Result of indexing operation."""

    document_id: str
    source_path: str
    num_chunks_indexed: int
    num_chunks_skipped: int
    success: bool
    error: Optional[str] = None


class DocumentIndexer:
    """
    Indexes documents into the vector store for RAG.

    Workflow:
    1. Process document (if not already processed)
    2. Extract chunks with metadata
    3. Generate embeddings
    4. Store in vector database
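
    Example (an illustrative sketch; the file path is hypothetical):
        indexer = DocumentIndexer()
        result = indexer.index_document("reports/q3.pdf")
        print(result.num_chunks_indexed, result.num_chunks_skipped)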
    """

    def __init__(
        self,
        config: Optional[IndexerConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize indexer.

        Args:
            config: Indexer configuration
            vector_store: Vector store instance
            embedding_adapter: Embedding adapter instance
        """
        self.config = config or IndexerConfig()
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Get vector store (lazy initialization)."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Get embedding adapter (lazy initialization)."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def index_document(
        self,
        source: Union[str, Path],
        document_id: Optional[str] = None,
        pipeline_config: Optional[Any] = None,
    ) -> IndexingResult:
        """
        Index a document from file.

        Args:
            source: Path to document
            document_id: Optional document ID
            pipeline_config: Optional pipeline configuration

        Returns:
            IndexingResult
        """
        if not DOCUMENT_MODULE_AVAILABLE:
            return IndexingResult(
                document_id=document_id or str(source),
                source_path=str(source),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error="Document processing module not available",
            )

        try:
            logger.info(f"Processing document: {source}")
            processed = process_document(source, document_id, pipeline_config)
            return self.index_processed_document(processed)
        except Exception as e:
            logger.error(f"Failed to index document: {e}")
            return IndexingResult(
                document_id=document_id or str(source),
                source_path=str(source),
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def index_processed_document(
        self,
        document: "ProcessedDocument",
    ) -> IndexingResult:
        """
        Index an already-processed document.

        Args:
            document: ProcessedDocument instance

        Returns:
            IndexingResult
        """
        document_id = document.metadata.document_id
        source_path = document.metadata.source_path

        try:
            chunks_to_index = []
            skipped = 0

            for chunk in document.chunks:
                if self.config.skip_empty_chunks:
                    if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length:
                        skipped += 1
                        continue

                chunk_data = {
                    "chunk_id": chunk.chunk_id,
                    "document_id": document_id,
                    "source_path": source_path,
                    "text": chunk.text,
                    "sequence_index": chunk.sequence_index,
                    "confidence": chunk.confidence,
                }

                if self.config.include_page:
                    chunk_data["page"] = chunk.page

                if self.config.include_chunk_type:
                    chunk_data["chunk_type"] = chunk.chunk_type.value

                if self.config.include_bbox and chunk.bbox:
                    chunk_data["bbox"] = {
                        "x_min": chunk.bbox.x_min,
                        "y_min": chunk.bbox.y_min,
                        "x_max": chunk.bbox.x_max,
                        "y_max": chunk.bbox.y_max,
                    }

                chunks_to_index.append(chunk_data)

            if not chunks_to_index:
                return IndexingResult(
                    document_id=document_id,
                    source_path=source_path,
                    num_chunks_indexed=0,
                    num_chunks_skipped=skipped,
                    success=True,
                )

            logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
            texts = [c["text"] for c in chunks_to_index]
            embeddings = self.embedder.embed_batch(texts)

            logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
            self.store.add_chunks(chunks_to_index, embeddings)

            logger.info(
                f"Indexed document {document_id}: "
                f"{len(chunks_to_index)} chunks, {skipped} skipped"
            )

            return IndexingResult(
                document_id=document_id,
                source_path=source_path,
                num_chunks_indexed=len(chunks_to_index),
                num_chunks_skipped=skipped,
                success=True,
            )

        except Exception as e:
            logger.error(f"Failed to index processed document: {e}")
            return IndexingResult(
                document_id=document_id,
                source_path=source_path,
                num_chunks_indexed=0,
                num_chunks_skipped=0,
                success=False,
                error=str(e),
            )

    def index_batch(
        self,
        sources: List[Union[str, Path]],
        pipeline_config: Optional[Any] = None,
    ) -> List[IndexingResult]:
        """
        Index multiple documents.

        Args:
            sources: List of document paths
            pipeline_config: Optional pipeline configuration

        Returns:
            List of IndexingResult
        """
        results = []

        for source in sources:
            result = self.index_document(source, pipeline_config=pipeline_config)
            results.append(result)

        successful = sum(1 for r in results if r.success)
        total_chunks = sum(r.num_chunks_indexed for r in results)

        logger.info(
            f"Batch indexing complete: "
            f"{successful}/{len(results)} documents, "
            f"{total_chunks} total chunks"
        )

        return results

    def delete_document(self, document_id: str) -> int:
        """
        Remove a document from the index.

        Args:
            document_id: Document ID to remove

        Returns:
            Number of chunks deleted
        """
        return self.store.delete_document(document_id)

    def get_index_stats(self) -> Dict[str, Any]:
        """
        Get indexing statistics.

        Returns:
            Dictionary with index stats
        """
        total_chunks = self.store.count()

        try:
            if hasattr(self.store, "list_documents"):
                doc_ids = self.store.list_documents()
                num_documents = len(doc_ids)
            else:
                num_documents = None
        except Exception:
            num_documents = None

        return {
            "total_chunks": total_chunks,
            "num_documents": num_documents,
            "embedding_model": self.embedder.model_name,
            "embedding_dimension": self.embedder.embedding_dimension,
        }


_document_indexer: Optional[DocumentIndexer] = None


def get_document_indexer(
    config: Optional[IndexerConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentIndexer:
    """
    Get or create singleton document indexer.

    Args:
        config: Indexer configuration
        vector_store: Optional vector store instance
        embedding_adapter: Optional embedding adapter

    Returns:
        DocumentIndexer instance
    """
    global _document_indexer

    if _document_indexer is None:
        _document_indexer = DocumentIndexer(
            config=config,
            vector_store=vector_store,
            embedding_adapter=embedding_adapter,
        )

    return _document_indexer


def reset_document_indexer():
    """Reset the global indexer instance."""
    global _document_indexer
    _document_indexer = None
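

# Illustrative usage sketch. The file paths below are hypothetical; the vector
# store and embedding adapter are resolved lazily via get_vector_store() and
# get_embedding_adapter() the first time they are used.
if __name__ == "__main__":
    indexer = get_document_indexer(config=IndexerConfig(min_chunk_length=25))
    batch_results = indexer.index_batch(["reports/q3.pdf", "reports/q4.pdf"])
    for item in batch_results:
        print(item.document_id, item.success, item.num_chunks_indexed)
    print(indexer.get_index_stats())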