File size: 3,150 Bytes
64d7fdf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import asyncio
import contextlib
from typing import Dict, List, Optional

from langchain_qdrant import QdrantVectorStore
from langchain_community.retrievers import BM25Retriever
from langchain_core.retrievers import BaseRetriever
from langchain_core.documents import Document
from qdrant_client import QdrantClient

from ingestion.embedder import embedder
from app.config import config, settings
from app.utils.logger import logger


# Smoothing constant for Reciprocal Rank Fusion (60 is the value proposed by
# Cormack et al., 2009). It damps the advantage of the very top ranks so that
# agreement between retrievers carries real weight.
_RRF_C = 60


def _reciprocal_rank_fusion(
    ranked_lists: List[List[Document]], limit: int, c: int = _RRF_C
) -> List[Document]:
    """Merge several best-first document lists with Reciprocal Rank Fusion.

    Each document scores ``sum(1 / (c + rank))`` over the lists it appears in
    (``rank`` is 1-based; repeats inside one list count once). Documents are
    identified by their full ``page_content``, so the same chunk returned by
    different retrievers is merged, while distinct chunks that merely share a
    prefix are kept apart.

    Args:
        ranked_lists: Result lists, each ordered best-first. On equal scores,
            documents seen in earlier lists come first (``sorted`` is stable).
        limit: Maximum number of documents to return.
        c: RRF smoothing constant.

    Returns:
        At most ``limit`` documents, highest fused score first. For each key,
        the first ``Document`` object encountered is the one returned.
    """
    scores: Dict[str, float] = {}
    first_seen: Dict[str, Document] = {}
    for ranked in ranked_lists:
        counted_here = set()
        for rank, doc in enumerate(ranked, start=1):
            key = doc.page_content
            if key in counted_here:
                continue
            counted_here.add(key)
            first_seen.setdefault(key, doc)
            scores[key] = scores.get(key, 0.0) + 1.0 / (c + rank)
    best_first = sorted(scores, key=scores.__getitem__, reverse=True)
    return [first_seen[key] for key in best_first[:limit]]


class HybridRetriever(BaseRetriever):
    """Retriever that fuses Qdrant vector search with BM25 keyword search.

    The Qdrant vector store is created lazily on the first ``add_documents``
    or retrieval call. Documents added through ``add_documents`` are written
    to Qdrant and also kept in memory, so that the BM25 index can be rebuilt
    over them. The BM25 side therefore only knows documents added through this
    instance. NOTE(review): documents already present in the Qdrant collection
    are presumably not in BM25; confirm if the collection is pre-populated.
    """

    vector_store: Optional[QdrantVectorStore] = None
    bm25_retriever: Optional[BM25Retriever] = None
    # In-memory copy of every added document; the BM25 index is rebuilt from it.
    # NOTE: BaseRetriever is a pydantic model, which copies this mutable default
    # per instance, so the list is not shared between instances.
    documents: List[Document] = []
    k: int = 10
    # Set to True once the Qdrant vector store has been created successfully.
    _initialized: bool = False

    def __init__(self):
        super().__init__()
        # top_k from the application config overrides the class default of 10.
        self.k = config["rag"]["retrieval"]["top_k"]

    def _initialize_vector_store(self):
        """Create the Qdrant-backed vector store if it does not exist yet.

        Failures are logged and swallowed so that retrieval can degrade
        gracefully: ``_initialized`` stays False and the next call retries.
        The client is built inside the ``try`` so that client-construction
        errors take the same path. It is closed on failure so that repeated
        retries do not accumulate open clients.
        """
        if self._initialized:
            return

        qdrant_config = config["database"]["qdrant"]
        client = None
        try:
            client = QdrantClient(
                url=qdrant_config["url"],
                api_key=settings.qdrant_api_key or None,
                timeout=60,
            )
            self.vector_store = QdrantVectorStore(
                client=client,
                collection_name=qdrant_config["collection_name"],
                embedding=embedder.get_embeddings(),
            )
        except Exception as e:
            logger.warning(f"Vector store init skipped: {str(e)}")
            if client is not None:
                # Best effort: the original failure has already been logged.
                with contextlib.suppress(Exception):
                    client.close()
            return

        self._initialized = True
        logger.info(f"Vector store initialized: {qdrant_config['collection_name']}")

    def add_documents(self, documents: List[Document]):
        """Index *documents* in Qdrant and rebuild the BM25 index.

        Args:
            documents: Documents to add. An empty list is a no-op.

        Returns:
            The ids returned by ``QdrantVectorStore.add_documents``, or ``[]``
            when *documents* is empty.

        Raises:
            RuntimeError: If the Qdrant vector store could not be initialized.
        """
        if not documents:
            # Nothing to index. This also avoids building BM25 over an empty corpus.
            return []

        self._initialize_vector_store()
        if not self._initialized:
            raise RuntimeError("Vector store not available; cannot add documents")

        ids = self.vector_store.add_documents(documents)
        self.documents.extend(documents)

        # The index is built with from_documents, so the whole in-memory corpus
        # is re-indexed on every call.
        self.bm25_retriever = BM25Retriever.from_documents(
            self.documents,
            k=self.k,
        )

        logger.info(f"Added {len(documents)} documents (total: {len(self.documents)})")
        return ids

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """Return up to ``k`` documents for *query*.

        Vector and BM25 results are fused with Reciprocal Rank Fusion, so hits
        from both retrievers compete for the ``k`` slots. The old approach
        (vector hits first, then truncate) effectively discarded BM25 results.
        Falls back to vector-only results when no BM25 index exists yet. Returns
        ``[]`` when the vector store is unavailable.
        """
        self._initialize_vector_store()

        if not self._initialized:
            logger.warning("Vector store not available")
            return []

        vector_docs = self.vector_store.similarity_search(query, k=self.k)

        if self.bm25_retriever is None:
            logger.warning("BM25 not initialized, using vector-only retrieval")
            return vector_docs

        bm25_docs = self.bm25_retriever.invoke(query)

        results = _reciprocal_rank_fusion([vector_docs, bm25_docs], limit=self.k)
        logger.info(f"Hybrid search returned {len(results)} documents")
        return results

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """Async variant: run the blocking sync search in a worker thread.

        This keeps network I/O (Qdrant, embeddings) off the event loop thread.
        NOTE(review): concurrent first calls may each attempt lazy
        initialization; confirm whether that matters for this deployment.
        """
        return await asyncio.to_thread(self._get_relevant_documents, query)


# Shared module-level retriever instance. Constructing it reads
# config["rag"]["retrieval"]["top_k"] at import time. The Qdrant vector store
# is not created until the first add_documents() or _get_relevant_documents()
# call.
hybrid_retriever = HybridRetriever()