| """ |
| Document Retriever with Grounding |
| |
| Provides: |
| - Semantic search over document chunks |
| - Metadata filtering (chunk_type, page range, etc.) |
| - Evidence grounding with bbox and page references |
| """ |
|
|
| from typing import List, Optional, Dict, Any, Tuple |
| from pydantic import BaseModel, Field |
| from loguru import logger |
|
|
| from .store import VectorStore, VectorSearchResult, get_vector_store, VectorStoreConfig |
| from .embeddings import EmbeddingAdapter, get_embedding_adapter, EmbeddingConfig |
|
|
| |
# Optional dependency: document schema types for evidence grounding.
# (The previous `if "src.document" in sys.modules or True:` guard was dead
# code -- `or True` made it unconditional -- so the try/except is all that
# is needed. When the document package cannot be imported, evidence
# conversion degrades gracefully: to_evidence_ref() returns None.)
try:
    from ..document.schemas.core import EvidenceRef, BoundingBox, DocumentChunk
    DOCUMENT_TYPES_AVAILABLE = True
except ImportError:
    DOCUMENT_TYPES_AVAILABLE = False
|
|
|
|
class RetrieverConfig(BaseModel):
    """Configuration for document retriever."""

    # --- search -------------------------------------------------------
    default_top_k: int = Field(default=5, ge=1, description="Default number of results")
    similarity_threshold: float = Field(
        default=0.7, ge=0.0, le=1.0, description="Minimum similarity score"
    )
    max_results: int = Field(default=20, ge=1, description="Maximum results to return")

    # --- reranking ----------------------------------------------------
    enable_reranking: bool = Field(default=False, description="Enable result reranking")
    rerank_top_k: int = Field(default=10, ge=1, description="Number to rerank")

    # --- evidence grounding -------------------------------------------
    include_evidence: bool = Field(default=True, description="Include evidence references")
    evidence_snippet_length: int = Field(
        default=200, ge=50, description="Maximum snippet length in evidence"
    )
|
|
|
|
class RetrievedChunk(BaseModel):
    """A retrieved chunk with evidence.

    Carries the matched text and similarity score together with the
    grounding metadata (page, bounding box, source path) needed to point
    back into the source document.
    """

    chunk_id: str
    document_id: str
    text: str
    similarity: float

    # Location within the document (None when the store has no layout info).
    page: Optional[int] = None
    chunk_type: Optional[str] = None

    # Bounding box coordinates; all four must be present for a real bbox
    # to be emitted by to_evidence_ref().
    bbox_x_min: Optional[float] = None
    bbox_y_min: Optional[float] = None
    bbox_x_max: Optional[float] = None
    bbox_y_max: Optional[float] = None

    # Provenance / ordering metadata (taken from the store's metadata dict).
    source_path: Optional[str] = None
    sequence_index: Optional[int] = None
    confidence: Optional[float] = None

    def to_evidence_ref(self, snippet_length: int = 200) -> Optional[Any]:
        """Convert to an EvidenceRef if document types are available.

        Args:
            snippet_length: Maximum number of characters of chunk text to
                include in the evidence snippet. Defaults to 200 (matching
                RetrieverConfig.evidence_snippet_length's default) for
                backward compatibility.

        Returns:
            An EvidenceRef, or None when the document schema types could
            not be imported.
        """
        if not DOCUMENT_TYPES_AVAILABLE:
            return None

        coords = (self.bbox_x_min, self.bbox_y_min, self.bbox_x_max, self.bbox_y_max)
        bbox = None
        if all(v is not None for v in coords):
            bbox = BoundingBox(
                x_min=self.bbox_x_min,
                y_min=self.bbox_y_min,
                x_max=self.bbox_x_max,
                y_max=self.bbox_y_max,
            )

        snippet = self.text[:snippet_length]
        if len(self.text) > snippet_length:
            snippet += "..."

        # BUGFIX: explicit None check -- the previous `self.confidence or
        # self.similarity` silently replaced a legitimate 0.0 confidence
        # with the similarity score.
        confidence = self.confidence if self.confidence is not None else self.similarity

        return EvidenceRef(
            chunk_id=self.chunk_id,
            page=self.page or 0,
            # EvidenceRef appears to require a bbox; fall back to a zero box.
            bbox=bbox or BoundingBox(x_min=0, y_min=0, x_max=0, y_max=0),
            source_type=self.chunk_type or "text",
            snippet=snippet,
            confidence=confidence,
        )
|
|
|
|
class DocumentRetriever:
    """
    Document retriever with grounding support.

    Features:
    - Semantic search over indexed chunks
    - Metadata filtering (document, chunk type, page range)
    - Evidence grounding (page/bbox references per hit)
    - Optional reranking (config flag exists; see NOTE in retrieve())
    """

    def __init__(
        self,
        config: Optional[RetrieverConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize retriever.

        Args:
            config: Retriever configuration (defaults to RetrieverConfig())
            vector_store: Vector store instance (or uses global)
            embedding_adapter: Embedding adapter (or uses global)
        """
        self.config = config or RetrieverConfig()
        # Backends are resolved lazily (see `store` / `embedder` properties)
        # so constructing a retriever never initializes them as a side effect.
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Vector store, resolved from the global factory on first access."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Embedding adapter, resolved from the global factory on first access."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve relevant chunks for a query.

        Args:
            query: Search query
            top_k: Number of results (config default when None)
            filters: Metadata filters (document_id, chunk_type, page, etc.)

        Returns:
            List of retrieved chunks with grounding metadata, capped at
            config.max_results.
        """
        # BUGFIX: explicit None test -- `top_k or default` would also
        # silently discard a caller-supplied falsy value.
        if top_k is None:
            top_k = self.config.default_top_k

        query_embedding = self.embedder.embed_text(query)

        results = self.store.search(
            query_embedding=query_embedding,
            top_k=min(top_k, self.config.max_results),
            filters=filters,
        )

        # NOTE(review): config.similarity_threshold and config.enable_reranking
        # are declared but never applied here -- confirm whether the store
        # enforces them or the features are simply unimplemented.
        chunks = [self._to_chunk(result) for result in results]

        logger.debug(f"Retrieved {len(chunks)} chunks for query: {query[:50]}...")
        return chunks

    @staticmethod
    def _to_chunk(result: VectorSearchResult) -> RetrievedChunk:
        """Map a raw vector-store hit onto a RetrievedChunk."""
        # bbox is expected to be a dict of coordinates (or None/empty).
        bbox = result.bbox or {}
        return RetrievedChunk(
            chunk_id=result.chunk_id,
            document_id=result.document_id,
            text=result.text,
            similarity=result.similarity,
            page=result.page,
            chunk_type=result.chunk_type,
            bbox_x_min=bbox.get("x_min"),
            bbox_y_min=bbox.get("y_min"),
            bbox_x_max=bbox.get("x_max"),
            bbox_y_max=bbox.get("y_max"),
            source_path=result.metadata.get("source_path"),
            sequence_index=result.metadata.get("sequence_index"),
            confidence=result.metadata.get("confidence"),
        )

    def retrieve_with_evidence(
        self,
        query: str,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[RetrievedChunk], List[Any]]:
        """
        Retrieve chunks together with evidence references.

        Args:
            query: Search query
            top_k: Number of results
            filters: Metadata filters

        Returns:
            Tuple of (chunks, evidence_refs); evidence_refs is empty when
            evidence is disabled in config or the document schema types
            are unavailable.
        """
        chunks = self.retrieve(query, top_k, filters)

        evidence_refs: List[Any] = []
        if self.config.include_evidence and DOCUMENT_TYPES_AVAILABLE:
            for chunk in chunks:
                evidence = chunk.to_evidence_ref()
                if evidence:
                    evidence_refs.append(evidence)

        return chunks, evidence_refs

    def retrieve_by_document(
        self,
        document_id: str,
        query: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve chunks from a specific document.

        Args:
            document_id: Document to search in
            query: Optional query (a generic probe query is used if omitted)
            top_k: Number of results (defaults to 100 when no query is given)

        Returns:
            List of chunks from the document
        """
        filters = {"document_id": document_id}

        if query:
            return self.retrieve(query, top_k, filters)

        # No query given: use a generic probe query so vector search still
        # returns chunks for this document. NOTE(review): a true "list all
        # chunks" store API would be preferable -- confirm if one exists.
        return self.retrieve("document content", top_k or 100, filters)

    def retrieve_by_page(
        self,
        query: str,
        page_range: Tuple[int, int],
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve chunks from a specific page range.

        Args:
            query: Search query
            page_range: (start_page, end_page) tuple
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of chunks from the page range
        """
        filters: Dict[str, Any] = {
            "page": {"min": page_range[0], "max": page_range[1]},
        }
        return self.retrieve(query, top_k, self._scoped(filters, document_id))

    def retrieve_tables(
        self,
        query: str,
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve table chunks.

        Args:
            query: Search query
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of table chunks
        """
        filters: Dict[str, Any] = {"chunk_type": "table"}
        return self.retrieve(query, top_k, self._scoped(filters, document_id))

    def retrieve_figures(
        self,
        query: str,
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve figure/chart chunks.

        Args:
            query: Search query
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of figure and chart chunks
        """
        filters: Dict[str, Any] = {"chunk_type": ["figure", "chart"]}
        return self.retrieve(query, top_k, self._scoped(filters, document_id))

    @staticmethod
    def _scoped(filters: Dict[str, Any], document_id: Optional[str]) -> Dict[str, Any]:
        """Add a document_id constraint to *filters* when one was provided."""
        if document_id:
            filters["document_id"] = document_id
        return filters

    def build_context(
        self,
        chunks: List[RetrievedChunk],
        max_length: Optional[int] = None,
        include_metadata: bool = True,
    ) -> str:
        """
        Build a prompt-ready context string from retrieved chunks.

        Args:
            chunks: Retrieved chunks
            max_length: Maximum context length in characters; longer output
                is truncated with a "...[truncated]" marker
            include_metadata: Prefix each chunk with a numbered header
                (page, chunk type, similarity score)

        Returns:
            Formatted context string ("" for an empty chunk list)
        """
        if not chunks:
            return ""

        context_parts = []

        for i, chunk in enumerate(chunks, 1):
            if include_metadata:
                header = f"[{i}] "
                # chunk.page appears to be stored 0-based; displayed 1-based.
                if chunk.page is not None:
                    header += f"Page {chunk.page + 1}"
                if chunk.chunk_type:
                    header += f" ({chunk.chunk_type})"
                header += f" - Similarity: {chunk.similarity:.2f}"
                context_parts.append(header)

            context_parts.append(chunk.text)
            context_parts.append("")  # blank separator line between chunks

        context = "\n".join(context_parts)

        if max_length and len(context) > max_length:
            context = context[:max_length] + "\n...[truncated]"

        return context
|
|
|
|
| |
# Module-level singleton, managed exclusively by get_document_retriever()
# and reset_document_retriever().
_document_retriever: Optional[DocumentRetriever] = None
|
|
|
|
def get_document_retriever(
    config: Optional[RetrieverConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentRetriever:
    """
    Get or create the singleton document retriever.

    The arguments only take effect on the first call; once the singleton
    exists, later calls return it unchanged (use reset_document_retriever()
    to start over).

    Args:
        config: Retriever configuration
        vector_store: Optional vector store instance
        embedding_adapter: Optional embedding adapter

    Returns:
        DocumentRetriever instance
    """
    global _document_retriever

    # Guard clause: reuse the existing instance when one was already built.
    if _document_retriever is not None:
        return _document_retriever

    _document_retriever = DocumentRetriever(
        config=config,
        vector_store=vector_store,
        embedding_adapter=embedding_adapter,
    )
    return _document_retriever
|
|
|
|
def reset_document_retriever() -> None:
    """Reset the global retriever instance.

    After this call the next get_document_retriever() builds a fresh
    DocumentRetriever (useful in tests or after reconfiguration).
    """
    global _document_retriever
    _document_retriever = None
|
|