File size: 10,445 Bytes
d520909 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 |
"""
Document Indexer for RAG
Handles indexing processed documents into the vector store.
"""
from typing import List, Optional, Dict, Any, Union
from pathlib import Path
from pydantic import BaseModel, Field
from loguru import logger
from .store import VectorStore, get_vector_store
from .embeddings import EmbeddingAdapter, get_embedding_adapter
try:
from ..document.schemas.core import ProcessedDocument, DocumentChunk
from ..document.pipeline import process_document, PipelineConfig
DOCUMENT_MODULE_AVAILABLE = True
except ImportError:
DOCUMENT_MODULE_AVAILABLE = False
logger.warning("Document module not available for indexing")
class IndexerConfig(BaseModel):
"""Configuration for document indexer."""
# Batch settings
batch_size: int = Field(default=32, ge=1, description="Embedding batch size")
# Metadata to index
include_bbox: bool = Field(default=True, description="Include bounding boxes")
include_page: bool = Field(default=True, description="Include page numbers")
include_chunk_type: bool = Field(default=True, description="Include chunk types")
# Processing options
skip_empty_chunks: bool = Field(default=True, description="Skip empty text chunks")
min_chunk_length: int = Field(default=10, ge=1, description="Minimum chunk text length")
class IndexingResult(BaseModel):
"""Result of indexing operation."""
document_id: str
source_path: str
num_chunks_indexed: int
num_chunks_skipped: int
success: bool
error: Optional[str] = None
class DocumentIndexer:
"""
Indexes documents into the vector store for RAG.
Workflow:
1. Process document (if not already processed)
2. Extract chunks with metadata
3. Generate embeddings
4. Store in vector database
"""
def __init__(
self,
config: Optional[IndexerConfig] = None,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
):
"""
Initialize indexer.
Args:
config: Indexer configuration
vector_store: Vector store instance
embedding_adapter: Embedding adapter instance
"""
self.config = config or IndexerConfig()
self._store = vector_store
self._embedder = embedding_adapter
@property
def store(self) -> VectorStore:
"""Get vector store (lazy initialization)."""
if self._store is None:
self._store = get_vector_store()
return self._store
@property
def embedder(self) -> EmbeddingAdapter:
"""Get embedding adapter (lazy initialization)."""
if self._embedder is None:
self._embedder = get_embedding_adapter()
return self._embedder
def index_document(
self,
source: Union[str, Path],
document_id: Optional[str] = None,
pipeline_config: Optional[Any] = None,
) -> IndexingResult:
"""
Index a document from file.
Args:
source: Path to document
document_id: Optional document ID
pipeline_config: Optional pipeline configuration
Returns:
IndexingResult
"""
if not DOCUMENT_MODULE_AVAILABLE:
return IndexingResult(
document_id=document_id or str(source),
source_path=str(source),
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error="Document processing module not available",
)
try:
# Process document
logger.info(f"Processing document: {source}")
processed = process_document(source, document_id, pipeline_config)
# Index the processed document
return self.index_processed_document(processed)
except Exception as e:
logger.error(f"Failed to index document: {e}")
return IndexingResult(
document_id=document_id or str(source),
source_path=str(source),
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error=str(e),
)
def index_processed_document(
self,
document: "ProcessedDocument",
) -> IndexingResult:
"""
Index an already-processed document.
Args:
document: ProcessedDocument instance
Returns:
IndexingResult
"""
document_id = document.metadata.document_id
source_path = document.metadata.source_path
try:
# Prepare chunks for indexing
chunks_to_index = []
skipped = 0
for chunk in document.chunks:
# Skip empty or short chunks
if self.config.skip_empty_chunks:
if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length:
skipped += 1
continue
chunk_data = {
"chunk_id": chunk.chunk_id,
"document_id": document_id,
"source_path": source_path,
"text": chunk.text,
"sequence_index": chunk.sequence_index,
"confidence": chunk.confidence,
}
if self.config.include_page:
chunk_data["page"] = chunk.page
if self.config.include_chunk_type:
chunk_data["chunk_type"] = chunk.chunk_type.value
if self.config.include_bbox and chunk.bbox:
chunk_data["bbox"] = {
"x_min": chunk.bbox.x_min,
"y_min": chunk.bbox.y_min,
"x_max": chunk.bbox.x_max,
"y_max": chunk.bbox.y_max,
}
chunks_to_index.append(chunk_data)
if not chunks_to_index:
return IndexingResult(
document_id=document_id,
source_path=source_path,
num_chunks_indexed=0,
num_chunks_skipped=skipped,
success=True,
)
# Generate embeddings in batches
logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks")
texts = [c["text"] for c in chunks_to_index]
embeddings = self.embedder.embed_batch(texts)
# Store in vector database
logger.info(f"Storing {len(chunks_to_index)} chunks in vector store")
self.store.add_chunks(chunks_to_index, embeddings)
logger.info(
f"Indexed document {document_id}: "
f"{len(chunks_to_index)} chunks, {skipped} skipped"
)
return IndexingResult(
document_id=document_id,
source_path=source_path,
num_chunks_indexed=len(chunks_to_index),
num_chunks_skipped=skipped,
success=True,
)
except Exception as e:
logger.error(f"Failed to index processed document: {e}")
return IndexingResult(
document_id=document_id,
source_path=source_path,
num_chunks_indexed=0,
num_chunks_skipped=0,
success=False,
error=str(e),
)
def index_batch(
self,
sources: List[Union[str, Path]],
pipeline_config: Optional[Any] = None,
) -> List[IndexingResult]:
"""
Index multiple documents.
Args:
sources: List of document paths
pipeline_config: Optional pipeline configuration
Returns:
List of IndexingResult
"""
results = []
for source in sources:
result = self.index_document(source, pipeline_config=pipeline_config)
results.append(result)
# Summary
successful = sum(1 for r in results if r.success)
total_chunks = sum(r.num_chunks_indexed for r in results)
logger.info(
f"Batch indexing complete: "
f"{successful}/{len(results)} documents, "
f"{total_chunks} total chunks"
)
return results
def delete_document(self, document_id: str) -> int:
"""
Remove a document from the index.
Args:
document_id: Document ID to remove
Returns:
Number of chunks deleted
"""
return self.store.delete_document(document_id)
def get_index_stats(self) -> Dict[str, Any]:
"""
Get indexing statistics.
Returns:
Dictionary with index stats
"""
total_chunks = self.store.count()
# Try to get document count
try:
if hasattr(self.store, 'list_documents'):
doc_ids = self.store.list_documents()
num_documents = len(doc_ids)
else:
num_documents = None
except:
num_documents = None
return {
"total_chunks": total_chunks,
"num_documents": num_documents,
"embedding_model": self.embedder.model_name,
"embedding_dimension": self.embedder.embedding_dimension,
}
# Global instance and factory
_document_indexer: Optional[DocumentIndexer] = None
def get_document_indexer(
config: Optional[IndexerConfig] = None,
vector_store: Optional[VectorStore] = None,
embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentIndexer:
"""
Get or create singleton document indexer.
Args:
config: Indexer configuration
vector_store: Optional vector store instance
embedding_adapter: Optional embedding adapter
Returns:
DocumentIndexer instance
"""
global _document_indexer
if _document_indexer is None:
_document_indexer = DocumentIndexer(
config=config,
vector_store=vector_store,
embedding_adapter=embedding_adapter,
)
return _document_indexer
def reset_document_indexer():
"""Reset the global indexer instance."""
global _document_indexer
_document_indexer = None
|