File size: 7,519 Bytes
b62e029
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9496080
b62e029
 
 
 
 
 
9496080
cda6eee
 
b62e029
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9496080
 
 
 
 
 
 
 
 
b62e029
 
 
 
ca57013
b62e029
 
 
ca57013
b62e029
 
 
 
 
 
cda6eee
b62e029
 
 
 
 
cda6eee
 
 
9496080
 
cda6eee
b62e029
 
 
 
 
 
 
 
 
 
cda6eee
b62e029
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cda6eee
b62e029
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# services/search_service.py

import time
from typing import Any, Dict, List

from api.schemas.search import DocumentMetadata, SearchResultItem
from core.exceptions import SearchExecutionError
from core.logger import setup_logger
from models.embedder import TextEmbedder
from models.reranker import TextReranker
from storage.qdrant_client import QdrantStorage
from storage.sqlite_client import SQLiteStorage

# Module-level logger for this file, created through the project's
# setup_logger helper under the name "search_service".
logger = setup_logger("search_service")

class HybridSearchService:
    """
    It is a business logic service that derives final search results by integrating  
    Qdrant (Vector DB), SQLite (RDBMS), Embedder, and Reranker.
    """
    def __init__(self, qdrant: QdrantStorage, sqlite: SQLiteStorage, embedder: TextEmbedder, reranker: TextReranker):
        self.qdrant = qdrant
        self.sqlite = sqlite
        self.embedder = embedder
        self.reranker = reranker

    def search(self, query: str, top_k: int = 5, limit: int = 50, use_reranking: bool = True, include_llm_context: bool = True) -> Dict[str, Any]:
        """
        Receives user queries and performs hybrid search and reranking.
        
        :param query: User search query
        :param top_k: Number of documents to return (after reranking)
        :param limit: Number of candidate documents to fetch from Qdrant (after RRF fusion, before reranking)
        :param use_reranking: Whether to use reranking (if False, it will skip the reranking step and return Qdrant results directly, still mapped with SQLite data)
        :param include_llm_context: Whether to include LLM context in the response (formatted text for LLM consumption)
        :return: A dictionary containing the original query, a list of search results, and latency information. Each search result includes chunk_id, text, relevance score, and metadata.
        """
        start_time = time.time()
        logger.info(f"๐Ÿ” Starting search pipeline for query: '{query}'")

        try:
            # 1. Query Embedding (Dense, Sparse Extraction)
            encoded_query = self.embedder.encode_query(query)

            # 2. Qdrant Hybrid Search (Extract limit of candidates using RRF method)
            qdrant_results = self.qdrant.hybrid_search(
                dense_vector=encoded_query.dense_vector,
                sparse_indices=encoded_query.sparse_indices,
                sparse_values=encoded_query.sparse_values,
                limit=limit
            )
            
            if not qdrant_results:
                logger.warning("No results found in Vector DB.")
                return self._build_empty_response(query, start_time)

            chunk_ids = [res.id for res in qdrant_results]

            # 3. Get Dict in SQLite for O(1) Mapping of Source Text and Metadata
            sqlite_data_map = self.sqlite.get_enriched_chunks_dict(chunk_ids)

            # 4. Data Preparation for Reranking (Merging Qdrant and SQLite Data)
            chunks_for_reranking = []
            for rank, res in enumerate(qdrant_results, start=1):
                # Defense Logic: Skip data inconsistencies (Desync) in Vector DB but not in SQLite
                chunk_info = sqlite_data_map.get(res.id)
                if not chunk_info:
                    logger.warning(f"Data Desync: chunk_id {res.id} found in Qdrant but missing in SQLite.")
                    continue
                
                chunks_for_reranking.append({
                    "chunk_id": res.id,
                    "text": chunk_info["text"],
                    "metadata": chunk_info["metadata"],
                    "rrf_score": res.score,
                    "rrf_rank": rank
                })

            if not chunks_for_reranking:
                return self._build_empty_response(query, start_time)

            # 5. Perform Cross-Encoder Reranking
            # Return a list sorted in descending order after recalculating context-based precise scores
            if use_reranking:
                reranked_docs = self.reranker.rerank(
                    query=query, 
                    documents=chunks_for_reranking, 
                    text_key="text"
                )
            else:
                # If reranking is disabled, use the Qdrant results directly
                reranked_docs = chunks_for_reranking

            # 6. Top-K Truncation and Mapping to Pydantic Schema (SearchResultItem) Specification
            final_results = []
            for doc in reranked_docs[:top_k]:
                display_score = doc.get("rerank_score") if use_reranking else doc.get("rrf_score", 0.0)
                final_results.append(SearchResultItem(
                    chunk_id=doc["chunk_id"],
                    text=doc["text"],
                    score=round(display_score, 4), # Neatly rounded to 4 decimal places
                    metadata=DocumentMetadata(**doc["metadata"])
                ).model_dump()) # Convert to dict for FastAPI compatibility

            latency_ms = int((time.time() - start_time) * 1000)
            logger.info(f"โœ… Search completed in {latency_ms}ms. Found {len(final_results)} final chunks.")

            response = {
                "query": query,
                "results": final_results,
                "latency_ms": latency_ms
            }

            if include_llm_context:
                # 7. Optional: Format results into LLM-friendly context (Markdown/XML mixed format)
                response["llm_context"] = self.format_for_llm(final_results)
            
            return response

        except Exception as e:
            # Wrap unexpected errors in custom errors and throw them to the router
            logger.error(f"โŒ Pipeline failed: {str(e)}", exc_info=True)
            raise SearchExecutionError(f"Search pipeline failed: {str(e)}")

    def _build_empty_response(self, query: str, start_time: float) -> Dict[str, Any]:
        """Build a standard response format when no search results are found"""
        return {
            "query": query,
            "results": [],
            "llm_context": "No relevant knowledge (documents) available.",
            "latency_ms": int((time.time() - start_time) * 1000)
        }

    # ---------------------------------------------------------
    # LLM-Friendly Prompt Formatter 
    # (Utility used when injecting into Agents or VLMs)
    # ---------------------------------------------------------
    def format_for_llm(self, search_results: List[Dict[str, Any]]) -> str:
        """
        Converts the retrieved JSON results into a Markdown/XML mixed format best understood by LLM.  
        (This method can be optionally called by API routers or other Agent systems)
        """
        if not search_results:
            return "No relevant knowledge (documents) available."

        context_blocks = []
        for i, res in enumerate(search_results, start=1):
            meta = res["metadata"]
            source = meta.get("title", f"Document_{meta.get('doc_id')}")
            
            # LLM recognizes text enclosed in XML tags (<doc>) as the clearest 'referencing context'.
            block = (
                f"<doc id=\"{i}\" source=\"{source}\" "
                f"url=\"{meta.get('url', 'N/A')}\" "
                f"relevance_score=\"{res['score']}\">\n"
                f"date_modified=\"{meta.get('date_modified', 'N/A')}\">\n"
                f"{res['text']}\n"
                f"</doc>"
            )
            context_blocks.append(block)

        return "\n\n".join(context_blocks)