"""
Document Retriever with Grounding

Provides:
- Semantic search over document chunks
- Metadata filtering (chunk_type, page range, etc.)
- Evidence grounding with bbox and page references
"""

from typing import List, Optional, Dict, Any, Tuple
from pydantic import BaseModel, Field
from loguru import logger

from .store import VectorStore, VectorSearchResult, get_vector_store, VectorStoreConfig
from .embeddings import EmbeddingAdapter, get_embedding_adapter, EmbeddingConfig

# Import evidence types from document module
import sys  # noqa: F401  (kept so existing references to this module's `sys` keep working)

# The evidence types are optional: when the document package cannot be
# imported, retrieval still works but no EvidenceRef objects are produced.
try:
    from ..document.schemas.core import EvidenceRef, BoundingBox, DocumentChunk
    DOCUMENT_TYPES_AVAILABLE = True
except ImportError:
    DOCUMENT_TYPES_AVAILABLE = False


class RetrieverConfig(BaseModel):
    """Configuration for document retriever."""

    # Search parameters
    default_top_k: int = Field(default=5, ge=1, description="Default number of results")
    # NOTE(review): similarity_threshold is not read anywhere in this module;
    # confirm whether the vector store (or another layer) enforces it.
    similarity_threshold: float = Field(
        default=0.7,
        ge=0.0,
        le=1.0,
        description="Minimum similarity score",
    )
    # Hard upper bound applied to every search issued by DocumentRetriever.
    max_results: int = Field(default=20, ge=1, description="Maximum results to return")

    # Reranking
    # NOTE(review): the reranking settings are declared here but not used by
    # the retriever code in this module.
    enable_reranking: bool = Field(default=False, description="Enable result reranking")
    rerank_top_k: int = Field(default=10, ge=1, description="Number to rerank")

    # Evidence settings
    include_evidence: bool = Field(default=True, description="Include evidence references")
    evidence_snippet_length: int = Field(
        default=200,
        ge=50,
        description="Maximum snippet length in evidence",
    )


class RetrievedChunk(BaseModel):
    """A retrieved chunk with evidence."""

    chunk_id: str
    document_id: str
    text: str
    similarity: float

    # Location
    page: Optional[int] = None
    chunk_type: Optional[str] = None

    # Bounding box
    bbox_x_min: Optional[float] = None
    bbox_y_min: Optional[float] = None
    bbox_x_max: Optional[float] = None
    bbox_y_max: Optional[float] = None

    # Source
    source_path: Optional[str] = None
    sequence_index: Optional[int] = None
    confidence: Optional[float] = None

    def to_evidence_ref(self, snippet_length: int = 200) -> Optional[Any]:
        """
        Convert to EvidenceRef if document types available.

        Args:
            snippet_length: Maximum number of characters of ``text`` copied
                into the evidence snippet; longer text is cut and suffixed
                with "...". Negative values are treated as 0.

        Returns:
            An EvidenceRef, or None when the document schema types could not
            be imported.
        """
        if not DOCUMENT_TYPES_AVAILABLE:
            return None

        corners = (self.bbox_x_min, self.bbox_y_min, self.bbox_x_max, self.bbox_y_max)
        if all(value is not None for value in corners):
            bbox = BoundingBox(
                x_min=self.bbox_x_min,
                y_min=self.bbox_y_min,
                x_max=self.bbox_x_max,
                y_max=self.bbox_y_max,
            )
        else:
            # A partially specified box is unusable; fall back to an empty box.
            bbox = BoundingBox(x_min=0, y_min=0, x_max=0, y_max=0)

        limit = max(0, snippet_length)
        snippet = self.text[:limit]
        if len(self.text) > limit:
            snippet += "..."

        # Use an explicit None check so a genuine confidence of 0.0 is kept
        # instead of being replaced by the similarity score.
        confidence = self.confidence if self.confidence is not None else self.similarity

        return EvidenceRef(
            chunk_id=self.chunk_id,
            page=self.page if self.page is not None else 0,
            bbox=bbox,
            source_type=self.chunk_type or "text",
            snippet=snippet,
            confidence=confidence,
        )


class DocumentRetriever:
    """
    Document retriever with grounding support.

    Features:
    - Semantic search over indexed chunks
    - Metadata filtering
    - Evidence grounding

    NOTE(review): RetrieverConfig exposes reranking options, but no reranking
    step is performed by the methods of this class.
    """

    def __init__(
        self,
        config: Optional[RetrieverConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize retriever.

        Args:
            config: Retriever configuration (defaults to RetrieverConfig())
            vector_store: Vector store instance (or uses global)
            embedding_adapter: Embedding adapter (or uses global)
        """
        self.config = config or RetrieverConfig()
        # Both collaborators are resolved lazily by the properties below.
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Get vector store (lazy initialization via get_vector_store())."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Get embedding adapter (lazy initialization via get_embedding_adapter())."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve relevant chunks for a query.

        Args:
            query: Search query
            top_k: Number of results (default from config); always capped at
                ``config.max_results``
            filters: Metadata filters (document_id, chunk_type, page, etc.),
                passed through unchanged to the vector store

        Returns:
            List of retrieved chunks with evidence
        """
        top_k = top_k or self.config.default_top_k

        # Embed query
        query_embedding = self.embedder.embed_text(query)

        # Search
        results = self.store.search(
            query_embedding=query_embedding,
            top_k=min(top_k, self.config.max_results),
            filters=filters,
        )

        # Convert to RetrievedChunk
        chunks = []
        for result in results:
            # Missing bbox/metadata are treated as empty mappings so that a
            # sparse search result does not abort the whole retrieval.
            bbox = result.bbox or {}
            metadata = result.metadata or {}

            chunks.append(
                RetrievedChunk(
                    chunk_id=result.chunk_id,
                    document_id=result.document_id,
                    text=result.text,
                    similarity=result.similarity,
                    page=result.page,
                    chunk_type=result.chunk_type,
                    bbox_x_min=bbox.get("x_min"),
                    bbox_y_min=bbox.get("y_min"),
                    bbox_x_max=bbox.get("x_max"),
                    bbox_y_max=bbox.get("y_max"),
                    source_path=metadata.get("source_path"),
                    sequence_index=metadata.get("sequence_index"),
                    confidence=metadata.get("confidence"),
                )
            )

        logger.debug(f"Retrieved {len(chunks)} chunks for query: {query[:50]}...")
        return chunks

    def retrieve_with_evidence(
        self,
        query: str,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[RetrievedChunk], List[Any]]:
        """
        Retrieve chunks with evidence references.

        Args:
            query: Search query
            top_k: Number of results
            filters: Metadata filters

        Returns:
            Tuple of (chunks, evidence_refs). ``evidence_refs`` is empty when
            ``config.include_evidence`` is False or the document schema types
            are unavailable.
        """
        chunks = self.retrieve(query, top_k, filters)

        evidence_refs: List[Any] = []
        if self.config.include_evidence and DOCUMENT_TYPES_AVAILABLE:
            snippet_length = self.config.evidence_snippet_length
            for chunk in chunks:
                evidence = chunk.to_evidence_ref(snippet_length=snippet_length)
                if evidence is not None:
                    evidence_refs.append(evidence)

        return chunks, evidence_refs

    def retrieve_by_document(
        self,
        document_id: str,
        query: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve chunks from a specific document.

        Args:
            document_id: Document to search in
            query: Optional query. When omitted, a generic placeholder query
                is searched instead, so results are still similarity-ranked
                and capped at ``config.max_results`` rather than being an
                exhaustive listing.
            top_k: Number of results (defaults to 100 when no query is given,
                before the ``max_results`` cap is applied)

        Returns:
            List of chunks from document
        """
        filters = {"document_id": document_id}

        if query:
            return self.retrieve(query, top_k, filters)

        # Without query, use a generic query to trigger search
        return self.retrieve("document content", top_k or 100, filters)

    def retrieve_by_page(
        self,
        query: str,
        page_range: Tuple[int, int],
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve chunks from specific page range.

        Args:
            query: Search query
            page_range: (start_page, end_page) tuple, forwarded to the store
                as a ``{"min": ..., "max": ...}`` page filter
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of chunks from page range
        """
        filters: Dict[str, Any] = {
            "page": {"min": page_range[0], "max": page_range[1]},
        }
        if document_id:
            filters["document_id"] = document_id

        return self.retrieve(query, top_k, filters)

    def retrieve_tables(
        self,
        query: str,
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve table chunks.

        Args:
            query: Search query
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of table chunks
        """
        filters: Dict[str, Any] = {"chunk_type": "table"}
        if document_id:
            filters["document_id"] = document_id

        return self.retrieve(query, top_k, filters)

    def retrieve_figures(
        self,
        query: str,
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve figure/chart chunks.

        Args:
            query: Search query
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of figure chunks
        """
        filters: Dict[str, Any] = {"chunk_type": ["figure", "chart"]}
        if document_id:
            filters["document_id"] = document_id

        return self.retrieve(query, top_k, filters)

    def build_context(
        self,
        chunks: List[RetrievedChunk],
        max_length: Optional[int] = None,
        include_metadata: bool = True,
    ) -> str:
        """
        Build context string from retrieved chunks.

        Args:
            chunks: Retrieved chunks
            max_length: Maximum number of characters kept from the joined
                context. When the context is cut, a "...[truncated]" marker
                line is appended after the cut, so the returned string can be
                slightly longer than ``max_length``.
            include_metadata: Include a "[n] Page p (type) - Similarity: s"
                header line before each chunk

        Returns:
            Formatted context string ("" when ``chunks`` is empty)
        """
        if not chunks:
            return ""

        context_parts = []
        for i, chunk in enumerate(chunks, 1):
            if include_metadata:
                # Collect only the pieces that exist so a missing page does
                # not leave a stray double space in the header.
                labels = []
                if chunk.page is not None:
                    # presumably pages are stored 0-based and shown 1-based
                    # -- TODO confirm against the indexer
                    labels.append(f"Page {chunk.page + 1}")
                if chunk.chunk_type:
                    labels.append(f"({chunk.chunk_type})")
                labels.append(f"- Similarity: {chunk.similarity:.2f}")
                context_parts.append(f"[{i}] " + " ".join(labels))

            context_parts.append(chunk.text)
            context_parts.append("")  # Empty line separator

        context = "\n".join(context_parts)

        if max_length and len(context) > max_length:
            context = context[:max_length] + "\n...[truncated]"

        return context


# Global instance and factory
_document_retriever: Optional[DocumentRetriever] = None


def get_document_retriever(
    config: Optional[RetrieverConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentRetriever:
    """
    Get or create singleton document retriever.

    The arguments are only used when the singleton is first created; later
    calls return the existing instance and ignore them. Call
    reset_document_retriever() first to rebuild with different settings.

    Args:
        config: Retriever configuration
        vector_store: Optional vector store instance
        embedding_adapter: Optional embedding adapter

    Returns:
        DocumentRetriever instance
    """
    global _document_retriever

    if _document_retriever is None:
        _document_retriever = DocumentRetriever(
            config=config,
            vector_store=vector_store,
            embedding_adapter=embedding_adapter,
        )
    elif config is not None or vector_store is not None or embedding_adapter is not None:
        # Make the silently ignored arguments visible when debugging.
        logger.debug(
            "get_document_retriever: existing instance returned; supplied arguments ignored"
        )

    return _document_retriever


def reset_document_retriever() -> None:
    """Reset the global retriever instance so the next get call creates a new one."""
    global _document_retriever
    _document_retriever = None