| | """ |
| | Document Retriever with Grounding |
| | |
| | Provides: |
| | - Semantic search over document chunks |
| | - Metadata filtering (chunk_type, page range, etc.) |
| | - Evidence grounding with bbox and page references |
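
Example (a minimal usage sketch; assumes the vector store has already been
populated by an indexing pipeline):

    retriever = get_document_retriever()
    chunks = retriever.retrieve("total revenue in 2023", top_k=3)
    context = retriever.build_context(chunks)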
| | """ |
| |
|
| | from typing import List, Optional, Dict, Any, Tuple |
| | from pydantic import BaseModel, Field |
| | from loguru import logger |
| |
|
| | from .store import VectorStore, VectorSearchResult, get_vector_store, VectorStoreConfig |
| | from .embeddings import EmbeddingAdapter, get_embedding_adapter, EmbeddingConfig |
| |
|

# Grounding types live in the optional document package; degrade gracefully
# when it is not importable.
try:
    from ..document.schemas.core import EvidenceRef, BoundingBox, DocumentChunk
    DOCUMENT_TYPES_AVAILABLE = True
except ImportError:
    DOCUMENT_TYPES_AVAILABLE = False


class RetrieverConfig(BaseModel):
    """Configuration for document retriever."""

    default_top_k: int = Field(default=5, ge=1, description="Default number of results")
    similarity_threshold: float = Field(
        default=0.7,
        ge=0.0,
        le=1.0,
        description="Minimum similarity score",
    )
    max_results: int = Field(default=20, ge=1, description="Maximum results to return")

    enable_reranking: bool = Field(default=False, description="Enable result reranking")
    rerank_top_k: int = Field(default=10, ge=1, description="Number to rerank")

    include_evidence: bool = Field(default=True, description="Include evidence references")
    evidence_snippet_length: int = Field(
        default=200,
        ge=50,
        description="Maximum snippet length in evidence",
    )


class RetrievedChunk(BaseModel):
    """A retrieved chunk with evidence."""

    chunk_id: str
    document_id: str
    text: str
    similarity: float

    page: Optional[int] = None
    chunk_type: Optional[str] = None

    bbox_x_min: Optional[float] = None
    bbox_y_min: Optional[float] = None
    bbox_x_max: Optional[float] = None
    bbox_y_max: Optional[float] = None

    source_path: Optional[str] = None
    sequence_index: Optional[int] = None
    confidence: Optional[float] = None

    def to_evidence_ref(self, snippet_length: int = 200) -> Optional[Any]:
        """Convert to an EvidenceRef (None if document types are unavailable)."""
        if not DOCUMENT_TYPES_AVAILABLE:
            return None

        bbox = None
        if all(v is not None for v in (self.bbox_x_min, self.bbox_y_min,
                                       self.bbox_x_max, self.bbox_y_max)):
            bbox = BoundingBox(
                x_min=self.bbox_x_min,
                y_min=self.bbox_y_min,
                x_max=self.bbox_x_max,
                y_max=self.bbox_y_max,
            )

        return EvidenceRef(
            chunk_id=self.chunk_id,
            page=self.page or 0,
            bbox=bbox or BoundingBox(x_min=0, y_min=0, x_max=0, y_max=0),
            source_type=self.chunk_type or "text",
            snippet=self.text[:snippet_length]
            + ("..." if len(self.text) > snippet_length else ""),
            confidence=self.confidence or self.similarity,
        )


class DocumentRetriever:
    """
    Document retriever with grounding support.

    Features:
    - Semantic search over indexed chunks
    - Metadata filtering
    - Evidence grounding
    - Optional reranking (reserved via RetrieverConfig.enable_reranking)
    """

    def __init__(
        self,
        config: Optional[RetrieverConfig] = None,
        vector_store: Optional[VectorStore] = None,
        embedding_adapter: Optional[EmbeddingAdapter] = None,
    ):
        """
        Initialize retriever.

        Args:
            config: Retriever configuration
            vector_store: Vector store instance (or uses global)
            embedding_adapter: Embedding adapter (or uses global)
        """
        self.config = config or RetrieverConfig()
        self._store = vector_store
        self._embedder = embedding_adapter

    @property
    def store(self) -> VectorStore:
        """Get vector store (lazy initialization)."""
        if self._store is None:
            self._store = get_vector_store()
        return self._store

    @property
    def embedder(self) -> EmbeddingAdapter:
        """Get embedding adapter (lazy initialization)."""
        if self._embedder is None:
            self._embedder = get_embedding_adapter()
        return self._embedder

    def retrieve(
        self,
        query: str,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve relevant chunks for a query.

        Args:
            query: Search query
            top_k: Number of results (default from config)
            filters: Metadata filters (document_id, chunk_type, page, etc.)

        Returns:
            List of retrieved chunks with evidence
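
        Example (illustrative; results depend on the indexed corpus):

            chunks = retriever.retrieve(
                "warranty coverage period",
                top_k=3,
                filters={"chunk_type": "text"},
            )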
| | """ |
| | top_k = top_k or self.config.default_top_k |
| |
|
| | |
| | query_embedding = self.embedder.embed_text(query) |
| |
|
| | |
| | results = self.store.search( |
| | query_embedding=query_embedding, |
| | top_k=min(top_k, self.config.max_results), |
| | filters=filters, |
| | ) |
| |
|
| | |
| | chunks = [] |
| | for result in results: |
| | |
| | bbox = result.bbox or {} |
| |
|
| | chunk = RetrievedChunk( |
| | chunk_id=result.chunk_id, |
| | document_id=result.document_id, |
| | text=result.text, |
| | similarity=result.similarity, |
| | page=result.page, |
| | chunk_type=result.chunk_type, |
| | bbox_x_min=bbox.get("x_min"), |
| | bbox_y_min=bbox.get("y_min"), |
| | bbox_x_max=bbox.get("x_max"), |
| | bbox_y_max=bbox.get("y_max"), |
| | source_path=result.metadata.get("source_path"), |
| | sequence_index=result.metadata.get("sequence_index"), |
| | confidence=result.metadata.get("confidence"), |
| | ) |
| | chunks.append(chunk) |
| |
|
| | logger.debug(f"Retrieved {len(chunks)} chunks for query: {query[:50]}...") |
| | return chunks |
| |
|

    def retrieve_with_evidence(
        self,
        query: str,
        top_k: Optional[int] = None,
        filters: Optional[Dict[str, Any]] = None,
    ) -> Tuple[List[RetrievedChunk], List[Any]]:
        """
        Retrieve chunks with evidence references.

        Args:
            query: Search query
            top_k: Number of results
            filters: Metadata filters

        Returns:
            Tuple of (chunks, evidence_refs)
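
        Example (illustrative; evidence_refs is empty unless the optional
        document package is importable):

            chunks, refs = retriever.retrieve_with_evidence("delivery timeline")
            for ref in refs:
                print(ref.chunk_id, ref.page, ref.snippet)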
| | """ |
| | chunks = self.retrieve(query, top_k, filters) |
| |
|
| | evidence_refs = [] |
| | if self.config.include_evidence and DOCUMENT_TYPES_AVAILABLE: |
| | for chunk in chunks: |
| | evidence = chunk.to_evidence_ref() |
| | if evidence: |
| | evidence_refs.append(evidence) |
| |
|
| | return chunks, evidence_refs |
| |
|

    def retrieve_by_document(
        self,
        document_id: str,
        query: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve chunks from a specific document.

        Args:
            document_id: Document to search in
            query: Optional query (returns all if not provided)
            top_k: Number of results

        Returns:
            List of chunks from document
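
        Example (illustrative; "doc-123" is a hypothetical document id):

            chunks = retriever.retrieve_by_document("doc-123", query="payment terms")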
| | """ |
| | filters = {"document_id": document_id} |
| |
|
| | if query: |
| | return self.retrieve(query, top_k, filters) |
| |
|
| | |
| | |
| | return self.retrieve("document content", top_k or 100, filters) |
| |
|

    def retrieve_by_page(
        self,
        query: str,
        page_range: Tuple[int, int],
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve chunks from a specific page range.

        Args:
            query: Search query
            page_range: (start_page, end_page) tuple
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of chunks from the page range
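
        Example (illustrative; page numbers here are the stored, 0-indexed values):

            chunks = retriever.retrieve_by_page("risk factors", page_range=(0, 4))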
| | """ |
| | filters = { |
| | "page": {"min": page_range[0], "max": page_range[1]}, |
| | } |
| |
|
| | if document_id: |
| | filters["document_id"] = document_id |
| |
|
| | return self.retrieve(query, top_k, filters) |
| |
|

    def retrieve_tables(
        self,
        query: str,
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve table chunks.

        Args:
            query: Search query
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of table chunks
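
        Example (illustrative):

            tables = retriever.retrieve_tables("quarterly revenue by region")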
| | """ |
| | filters = {"chunk_type": "table"} |
| |
|
| | if document_id: |
| | filters["document_id"] = document_id |
| |
|
| | return self.retrieve(query, top_k, filters) |
| |
|

    def retrieve_figures(
        self,
        query: str,
        document_id: Optional[str] = None,
        top_k: Optional[int] = None,
    ) -> List[RetrievedChunk]:
        """
        Retrieve figure/chart chunks.

        Args:
            query: Search query
            document_id: Optional document filter
            top_k: Number of results

        Returns:
            List of figure chunks
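
        Example (illustrative; matches both "figure" and "chart" chunk types):

            figures = retriever.retrieve_figures("sales trend", document_id="doc-123")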
| | """ |
| | filters = {"chunk_type": ["figure", "chart"]} |
| |
|
| | if document_id: |
| | filters["document_id"] = document_id |
| |
|
| | return self.retrieve(query, top_k, filters) |
| |
|

    def build_context(
        self,
        chunks: List[RetrievedChunk],
        max_length: Optional[int] = None,
        include_metadata: bool = True,
    ) -> str:
        """
        Build a context string from retrieved chunks.

        Args:
            chunks: Retrieved chunks
            max_length: Maximum context length in characters
            include_metadata: Include chunk metadata headers

        Returns:
            Formatted context string
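
        Example (illustrative output for one chunk with metadata):

            [1] Page 3 (table) - Similarity: 0.84
            <chunk text>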
| | """ |
| | if not chunks: |
| | return "" |
| |
|
| | context_parts = [] |
| |
|
| | for i, chunk in enumerate(chunks, 1): |
| | if include_metadata: |
| | header = f"[{i}] " |
| | if chunk.page is not None: |
| | header += f"Page {chunk.page + 1}" |
| | if chunk.chunk_type: |
| | header += f" ({chunk.chunk_type})" |
| | header += f" - Similarity: {chunk.similarity:.2f}" |
| | context_parts.append(header) |
| |
|
| | context_parts.append(chunk.text) |
| | context_parts.append("") |
| |
|
| | context = "\n".join(context_parts) |
| |
|
| | if max_length and len(context) > max_length: |
| | context = context[:max_length] + "\n...[truncated]" |
| |
|
| | return context |
| |
|


# Module-level singleton instance
_document_retriever: Optional[DocumentRetriever] = None


def get_document_retriever(
    config: Optional[RetrieverConfig] = None,
    vector_store: Optional[VectorStore] = None,
    embedding_adapter: Optional[EmbeddingAdapter] = None,
) -> DocumentRetriever:
    """
    Get or create the singleton document retriever.

    Note: arguments are applied only when the singleton is first created;
    subsequent calls return the existing instance unchanged.

    Args:
        config: Retriever configuration
        vector_store: Optional vector store instance
        embedding_adapter: Optional embedding adapter

    Returns:
        DocumentRetriever instance
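
    Example (illustrative):

        retriever = get_document_retriever(
            config=RetrieverConfig(default_top_k=8),
        )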
| | """ |
| | global _document_retriever |
| |
|
| | if _document_retriever is None: |
| | _document_retriever = DocumentRetriever( |
| | config=config, |
| | vector_store=vector_store, |
| | embedding_adapter=embedding_adapter, |
| | ) |
| |
|
| | return _document_retriever |
| |
|


def reset_document_retriever():
    """Reset the global retriever instance."""
    global _document_retriever
    _document_retriever = None