| | """ |
| | Document Intelligence Bridge for RAG |
| | |
| | Bridges the document_intelligence subsystem with the RAG indexer/retriever. |
| | Converts ParseResult to a format compatible with DocumentIndexer. |
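
Example (illustrative sketch only; the file path and query string are
hypothetical, and a configured vector store plus embedding backend are
assumed):

    indexer = get_docint_indexer()
    result = indexer.index_document("contract.pdf", max_pages=20)

    retriever = get_docint_retriever(similarity_threshold=0.5)
    chunks = retriever.retrieve("What are the payment terms?", top_k=3)
    context = retriever.build_context(chunks)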
| | """ |
| |
|
| | from typing import List, Optional, Dict, Any |
| | from pathlib import Path |
| | from pydantic import BaseModel |
| | from loguru import logger |
| |
|
| | from .store import VectorStore, get_vector_store |
| | from .embeddings import EmbeddingAdapter, get_embedding_adapter |
| | from .indexer import IndexingResult, IndexerConfig |
| |
|
| | |
| | try: |
    from ..document_intelligence.chunks import (
        ParseResult,
        DocumentChunk,
        BoundingBox,
        EvidenceRef,
        ChunkType,
    )
    DOCINT_AVAILABLE = True
except ImportError:
    DOCINT_AVAILABLE = False
    logger.warning("document_intelligence module not available")


class DocIntIndexer:
| | """ |
| | Indexes ParseResult from document_intelligence into the vector store. |
| | |
| | This bridges the new document_intelligence subsystem with the existing |
| | RAG infrastructure. |
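
    Example (illustrative sketch only; the file path is hypothetical):

        indexer = DocIntIndexer(config=IndexerConfig())
        result = indexer.index_document("sample.pdf", max_pages=10)
        if result.success:
            print(result.num_chunks_indexed, "chunks indexed")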
| | """ |
| |
|
| | def __init__( |
| | self, |
| | config: Optional[IndexerConfig] = None, |
| | vector_store: Optional[VectorStore] = None, |
| | embedding_adapter: Optional[EmbeddingAdapter] = None, |
| | ): |
| | self.config = config or IndexerConfig() |
| | self._store = vector_store |
| | self._embedder = embedding_adapter |
| |
|
| | @property |
| | def store(self) -> VectorStore: |
| | if self._store is None: |
| | self._store = get_vector_store() |
| | return self._store |
| |
|
| | @property |
| | def embedder(self) -> EmbeddingAdapter: |
| | if self._embedder is None: |
| | self._embedder = get_embedding_adapter() |
| | return self._embedder |
| |
|
| | def index_parse_result( |
| | self, |
| | parse_result: "ParseResult", |
| | source_path: Optional[str] = None, |
| | ) -> IndexingResult: |
| | """ |
| | Index a ParseResult from document_intelligence. |
| | |
| | Args: |
| | parse_result: ParseResult from DocumentParser |
| | source_path: Optional override for source path |
| | |
| | Returns: |
| | IndexingResult with indexing stats |
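
        Example (illustrative sketch only; assumes a ``DocIntIndexer`` named
        ``indexer`` and a ``DocumentParser`` named ``parser``; the file path
        is hypothetical):

            parse_result = parser.parse("sample.pdf")
            result = indexer.index_parse_result(parse_result)
            print(result.num_chunks_indexed, result.num_chunks_skipped)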
| | """ |
| | if not DOCINT_AVAILABLE: |
| | return IndexingResult( |
| | document_id="unknown", |
| | source_path="unknown", |
| | num_chunks_indexed=0, |
| | num_chunks_skipped=0, |
| | success=False, |
| | error="document_intelligence module not available", |
| | ) |
| |
|
| | document_id = parse_result.doc_id |
| | source = source_path or parse_result.filename |
| |
|
| | try: |
| | chunks_to_index = [] |
| | skipped = 0 |
| |
|
| | for chunk in parse_result.chunks: |
| | |
| | if self.config.skip_empty_chunks: |
| | if not chunk.text or len(chunk.text.strip()) < self.config.min_chunk_length: |
| | skipped += 1 |
| | continue |
| |
|
| | chunk_data = { |
| | "chunk_id": chunk.chunk_id, |
| | "document_id": document_id, |
| | "source_path": source, |
| | "text": chunk.text, |
| | "sequence_index": chunk.sequence_index, |
| | "confidence": chunk.confidence, |
| | } |
| |
|
| | if self.config.include_page: |
| | chunk_data["page"] = chunk.page |
| |
|
| | if self.config.include_chunk_type: |
| | chunk_data["chunk_type"] = chunk.chunk_type.value |
| |
|
| | if self.config.include_bbox and chunk.bbox: |
| | chunk_data["bbox"] = { |
| | "x_min": chunk.bbox.x_min, |
| | "y_min": chunk.bbox.y_min, |
| | "x_max": chunk.bbox.x_max, |
| | "y_max": chunk.bbox.y_max, |
| | } |
| |
|
| | chunks_to_index.append(chunk_data) |
| |
|
| | if not chunks_to_index: |
| | return IndexingResult( |
| | document_id=document_id, |
| | source_path=source, |
| | num_chunks_indexed=0, |
| | num_chunks_skipped=skipped, |
| | success=True, |
| | ) |
| |
|
| | |
| | logger.info(f"Generating embeddings for {len(chunks_to_index)} chunks") |
| | texts = [c["text"] for c in chunks_to_index] |
| |
|
| | embeddings = [] |
| | batch_size = self.config.batch_size |
| | for i in range(0, len(texts), batch_size): |
| | batch = texts[i:i + batch_size] |
| | batch_embeddings = self.embedder.embed_batch(batch) |
| | embeddings.extend(batch_embeddings) |
| |
|
| | |
| | logger.info(f"Storing {len(chunks_to_index)} chunks in vector store") |
| | self.store.add_chunks(chunks_to_index, embeddings) |
| |
|
| | logger.info( |
| | f"Indexed document {document_id}: " |
| | f"{len(chunks_to_index)} chunks, {skipped} skipped" |
| | ) |
| |
|
| | return IndexingResult( |
| | document_id=document_id, |
| | source_path=source, |
| | num_chunks_indexed=len(chunks_to_index), |
| | num_chunks_skipped=skipped, |
| | success=True, |
| | ) |
| |
|
| | except Exception as e: |
| | logger.error(f"Failed to index parse result: {e}") |
| | return IndexingResult( |
| | document_id=document_id, |
| | source_path=source, |
| | num_chunks_indexed=0, |
| | num_chunks_skipped=0, |
| | success=False, |
| | error=str(e), |
| | ) |

    def index_document(
        self,
        path: str,
        max_pages: Optional[int] = None,
    ) -> IndexingResult:
        """
        Parse and index a document in one step.

        Args:
            path: Path to document file
            max_pages: Optional limit on pages to process

        Returns:
            IndexingResult
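
        Example (illustrative sketch only; assumes a ``DocIntIndexer`` named
        ``indexer``; the file path is hypothetical):

            result = indexer.index_document("reports/q3.pdf", max_pages=50)
            if not result.success:
                print(result.error)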
| | """ |
| | if not DOCINT_AVAILABLE: |
| | return IndexingResult( |
| | document_id=str(path), |
| | source_path=str(path), |
| | num_chunks_indexed=0, |
| | num_chunks_skipped=0, |
| | success=False, |
| | error="document_intelligence module not available", |
| | ) |
| |
|
| | try: |
| | from ..document_intelligence import DocumentParser, ParserConfig |
| |
|
| | config = ParserConfig(max_pages=max_pages) |
| | parser = DocumentParser(config=config) |
| |
|
| | logger.info(f"Parsing document: {path}") |
| | parse_result = parser.parse(path) |
| |
|
| | return self.index_parse_result(parse_result, source_path=str(path)) |
| |
|
| | except Exception as e: |
| | logger.error(f"Failed to parse and index document: {e}") |
| | return IndexingResult( |
| | document_id=str(path), |
| | source_path=str(path), |
| | num_chunks_indexed=0, |
| | num_chunks_skipped=0, |
| | success=False, |
| | error=str(e), |
| | ) |
| |
|
| | def delete_document(self, document_id: str) -> int: |
| | """Remove a document from the index.""" |
| | return self.store.delete_document(document_id) |
| |
|
| | def get_stats(self) -> Dict[str, Any]: |
| | """Get indexing statistics.""" |
| | total_chunks = self.store.count() |
| |
|
| | return { |
| | "total_chunks": total_chunks, |
| | "embedding_model": self.embedder.model_name, |
| | "embedding_dimension": self.embedder.embedding_dimension, |
| | } |
| |
|
| |
|
| | class DocIntRetriever: |
| | """ |
| | Retriever with document_intelligence EvidenceRef support. |
| | |
| | Wraps DocumentRetriever with conversions to document_intelligence types. |
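
    Example (illustrative sketch only; the query is hypothetical and an
    already-indexed vector store is assumed):

        retriever = DocIntRetriever(similarity_threshold=0.6)
        chunks, evidence = retriever.retrieve_with_evidence(
            "What is the termination clause?", top_k=5
        )
        context = retriever.build_context(chunks)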
| | """ |
| |
|
| | def __init__( |
| | self, |
| | vector_store: Optional[VectorStore] = None, |
| | embedding_adapter: Optional[EmbeddingAdapter] = None, |
| | similarity_threshold: float = 0.5, |
| | ): |
| | self._store = vector_store |
| | self._embedder = embedding_adapter |
| | self.similarity_threshold = similarity_threshold |
| |
|
| | @property |
| | def store(self) -> VectorStore: |
| | if self._store is None: |
| | self._store = get_vector_store() |
| | return self._store |
| |
|
| | @property |
| | def embedder(self) -> EmbeddingAdapter: |
| | if self._embedder is None: |
| | self._embedder = get_embedding_adapter() |
| | return self._embedder |
| |
|
| | def retrieve( |
| | self, |
| | query: str, |
| | top_k: int = 5, |
| | document_id: Optional[str] = None, |
| | chunk_types: Optional[List[str]] = None, |
| | page_range: Optional[tuple] = None, |
| | ) -> List[Dict[str, Any]]: |
| | """ |
| | Retrieve relevant chunks. |
| | |
| | Args: |
| | query: Search query |
| | top_k: Number of results |
| | document_id: Filter by document |
| | chunk_types: Filter by chunk type(s) |
| | page_range: Filter by page range (start, end) |
| | |
| | Returns: |
| | List of chunk dicts with metadata |
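
        Example (illustrative sketch only; the query, document id, and chunk
        type names are hypothetical):

            chunks = retriever.retrieve(
                "total contract value",
                top_k=3,
                document_id="doc-123",
                chunk_types=["table", "text"],
                page_range=(1, 10),
            )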
| | """ |
| | |
| | filters = {} |
| |
|
| | if document_id: |
| | filters["document_id"] = document_id |
| |
|
| | if chunk_types: |
| | filters["chunk_type"] = chunk_types |
| |
|
| | if page_range: |
| | filters["page"] = {"min": page_range[0], "max": page_range[1]} |
| |
|
| | |
| | query_embedding = self.embedder.embed_text(query) |
| |
|
| | |
| | results = self.store.search( |
| | query_embedding=query_embedding, |
| | top_k=top_k, |
| | filters=filters if filters else None, |
| | ) |
| |
|
| | |
| | chunks = [] |
| | for result in results: |
| | if result.similarity < self.similarity_threshold: |
| | continue |
| |
|
| | chunk = { |
| | "chunk_id": result.chunk_id, |
| | "document_id": result.document_id, |
| | "text": result.text, |
| | "similarity": result.similarity, |
| | "page": result.page, |
| | "chunk_type": result.chunk_type, |
| | "bbox": result.bbox, |
| | "source_path": result.metadata.get("source_path"), |
| | "confidence": result.metadata.get("confidence"), |
| | } |
| | chunks.append(chunk) |
| |
|
| | return chunks |

    def retrieve_with_evidence(
        self,
        query: str,
        top_k: int = 5,
        document_id: Optional[str] = None,
        chunk_types: Optional[List[str]] = None,
        page_range: Optional[tuple] = None,
    ) -> tuple:
        """
        Retrieve chunks with EvidenceRef objects.

        Returns:
            Tuple of (chunks, evidence_refs)
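
        Example (illustrative sketch only; the query is hypothetical):

            chunks, evidence_refs = retriever.retrieve_with_evidence(
                "delivery schedule", top_k=5
            )
            for ref in evidence_refs:
                print(ref.doc_id, ref.page, ref.snippet[:40])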
| | """ |
| | chunks = self.retrieve( |
| | query, top_k, document_id, chunk_types, page_range |
| | ) |
| |
|
| | evidence_refs = [] |
| |
|
| | if DOCINT_AVAILABLE: |
| | for chunk in chunks: |
| | bbox = None |
| | if chunk.get("bbox"): |
| | bbox_data = chunk["bbox"] |
| | bbox = BoundingBox( |
| | x_min=bbox_data.get("x_min", 0), |
| | y_min=bbox_data.get("y_min", 0), |
| | x_max=bbox_data.get("x_max", 1), |
| | y_max=bbox_data.get("y_max", 1), |
| | normalized=True, |
| | ) |
| | else: |
| | bbox = BoundingBox(x_min=0, y_min=0, x_max=1, y_max=1) |
| |
|
| | evidence = EvidenceRef( |
| | chunk_id=chunk["chunk_id"], |
| | doc_id=chunk["document_id"], |
| | page=chunk.get("page", 1), |
| | bbox=bbox, |
| | source_type=chunk.get("chunk_type", "text"), |
| | snippet=chunk["text"][:200], |
| | confidence=chunk.get("confidence", chunk["similarity"]), |
| | ) |
| | evidence_refs.append(evidence) |
| |
|
| | return chunks, evidence_refs |

    def build_context(
        self,
        chunks: List[Dict[str, Any]],
        max_length: int = 8000,
    ) -> str:
        """Build context string from retrieved chunks."""
        if not chunks:
            return ""

        parts = []
        for i, chunk in enumerate(chunks, 1):
            header = f"[{i}]"
            if chunk.get("page"):
                header += f" Page {chunk['page']}"
            if chunk.get("chunk_type"):
                header += f" ({chunk['chunk_type']})"
            header += f" [sim={chunk['similarity']:.2f}]"

            parts.append(header)
            parts.append(chunk["text"])
            parts.append("")

        context = "\n".join(parts)

        if len(context) > max_length:
            context = context[:max_length] + "\n...[truncated]"

        return context


# Module-level singletons.
_docint_indexer: Optional[DocIntIndexer] = None
_docint_retriever: Optional[DocIntRetriever] = None


def get_docint_indexer(
    config: Optional[IndexerConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocIntIndexer:
    """Get or create the singleton DocIntIndexer."""
    global _docint_indexer

    if _docint_indexer is None:
        _docint_indexer = DocIntIndexer(
            config=config,
            vector_store=vector_store,
            embedding_adapter=embedding_adapter,
        )

    return _docint_indexer


def get_docint_retriever(
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
    similarity_threshold: float = 0.5,
) -> DocIntRetriever:
    """Get or create the singleton DocIntRetriever."""
    global _docint_retriever

    if _docint_retriever is None:
        _docint_retriever = DocIntRetriever(
            vector_store=vector_store,
            embedding_adapter=embedding_adapter,
            similarity_threshold=similarity_threshold,
        )

    return _docint_retriever


def reset_docint_components():
    """Reset the singleton instances."""
    global _docint_indexer, _docint_retriever
    _docint_indexer = None
    _docint_retriever = None