|
|
""" |
|
|
Vector database operations for document storage and retrieval. |
|
|
""" |
|
|
from typing import List, Dict, Any, Optional |
|
|
|
|
|
from langchain_chroma import Chroma |
|
|
from langchain.text_splitter import RecursiveCharacterTextSplitter |
|
|
from langchain_cohere import CohereEmbeddings |
|
|
from langchain_community.vectorstores import FAISS |
|
|
from langchain_core.documents import Document |
|
|
|
|
|
from app.config.settings import ( |
|
|
CHUNK_SIZE, |
|
|
CHUNK_OVERLAP, |
|
|
EMBEDDING_MODEL, |
|
|
RERANKER_MODEL, |
|
|
COHERERANK_TOPN, |
|
|
VECTOSTORE_TOPK, |
|
|
) |
|
|
import cohere |
|
|
|
|
|
|
|
|
class Retriever:
    """
    Wrapper for vector database operations including document storage,
    similarity search, and reranking of results.
    """

    def __init__(self, model: str = EMBEDDING_MODEL):
        """
        Initialize the retriever with embedding model and text splitter.

        Args:
            model: The embedding model name to use for vectorization
        """
        self.cohere_client = cohere.Client()
        # Populated lazily by create_from_documents(); search methods
        # raise until it is set.
        self.faiss = None
        self.embedding_model = CohereEmbeddings(model=model)
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=CHUNK_SIZE,
            chunk_overlap=CHUNK_OVERLAP
        )

    def create_from_documents(self, result: Dict[str, Any]) -> Dict[str, Any]:
        """
        Create the FAISS vector store from an extracted document's text.

        Args:
            result: Dictionary describing one extracted document, with
                'filename' and 'text' keys.

        Returns:
            The same dictionary, updated in place with a 'chunk_size' key
            holding the number of chunks produced.

        Raises:
            ValueError: If the document contains no text to index.
        """
        chunks: List[Document] = []
        filename = result['filename']
        text = result['text']

        if text:
            document = Document(
                page_content=text,
                metadata={"filename": filename}
            )
            doc_chunks = self.text_splitter.split_documents([document])
            chunks.extend(doc_chunks)

        # Always record the chunk count, even for empty text, so callers
        # can rely on the key being present.
        result['chunk_size'] = len(chunks)

        if not chunks:
            # FAISS.from_documents fails with an opaque error on an empty
            # list; fail fast with a clear message instead.
            raise ValueError(
                f"Document '{filename}' contains no extractable text to index"
            )

        self.faiss = FAISS.from_documents(
            chunks,
            embedding=self.embedding_model
        )
        return result

    def similarity_search(self, query: str, k: int = 5, filter: Optional[Dict[str, Any]] = None) -> List[Document]:
        """
        Perform similarity search in the vector database.

        Args:
            query: The search query text
            k: Number of results to return
            filter: Optional metadata filter for the search
                (e.g. {"filename": ...})

        Returns:
            List of document chunks most similar to the query

        Raises:
            ValueError: If vector store has not been initialized
        """
        if not self.faiss:
            raise ValueError("Vector store has not been initialized with documents")

        return self.faiss.similarity_search(query=query, k=k, filter=filter)

    def reranking(self, query: str, docs: List[Document], top_n: int = 10) -> List[str]:
        """
        Rerank documents using Cohere's reranking model.

        Args:
            query: The search query text
            docs: List of documents to rerank
            top_n: Number of top results to return

        Returns:
            List of reranked document contents, most relevant first
        """
        doc_texts = [doc.page_content for doc in docs]
        rerank_response = self.cohere_client.rerank(
            model=RERANKER_MODEL,
            query=query,
            documents=doc_texts,
            top_n=top_n
        )
        # result.index points back into the input list; reuse the texts we
        # already extracted instead of re-reading page_content.
        return [doc_texts[result.index] for result in rerank_response.results]

    def get_relevant_docs(self, chromdb_query: str, rerank_query: str,
                          filter: Optional[Dict[str, Any]] = None,
                          chunk_size: int = VECTOSTORE_TOPK) -> List[str]:
        """
        Perform a two-stage retrieval: vector search followed by reranking.

        Args:
            chromdb_query: Query for the initial vector search
            rerank_query: Query for the reranking step (can be different)
            filter: Optional metadata filter for the search
            chunk_size: Number of chunks in the document(s); caps both
                retrieval depths so we never request more results than
                chunks exist

        Returns:
            List of the most relevant document contents

        Raises:
            ValueError: If vector store has not been initialized
        """
        # Cap both stages by the document's chunk count so neither the
        # vector search nor the reranker is asked for more than exists.
        dense_topk = min(chunk_size, VECTOSTORE_TOPK)
        reranking_topk = min(chunk_size, COHERERANK_TOPN)

        docs = self.similarity_search(chromdb_query, filter=filter, k=dense_topk)

        if docs:
            return self.reranking(rerank_query, docs, top_n=reranking_topk)
        return []