Spaces:
Sleeping
Sleeping
File size: 7,519 Bytes
b62e029 9496080 b62e029 9496080 cda6eee b62e029 9496080 b62e029 ca57013 b62e029 ca57013 b62e029 cda6eee b62e029 cda6eee 9496080 cda6eee b62e029 cda6eee b62e029 cda6eee b62e029 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 | # services/search_service.py
import time
from typing import Any, Dict, List
from api.schemas.search import DocumentMetadata, SearchResultItem
from core.exceptions import SearchExecutionError
from core.logger import setup_logger
from models.embedder import TextEmbedder
from models.reranker import TextReranker
from storage.qdrant_client import QdrantStorage
from storage.sqlite_client import SQLiteStorage
logger = setup_logger("search_service")
class HybridSearchService:
"""
It is a business logic service that derives final search results by integrating
Qdrant (Vector DB), SQLite (RDBMS), Embedder, and Reranker.
"""
def __init__(self, qdrant: QdrantStorage, sqlite: SQLiteStorage, embedder: TextEmbedder, reranker: TextReranker):
self.qdrant = qdrant
self.sqlite = sqlite
self.embedder = embedder
self.reranker = reranker
def search(self, query: str, top_k: int = 5, limit: int = 50, use_reranking: bool = True, include_llm_context: bool = True) -> Dict[str, Any]:
"""
Receives user queries and performs hybrid search and reranking.
:param query: User search query
:param top_k: Number of documents to return (after reranking)
:param limit: Number of candidate documents to fetch from Qdrant (after RRF fusion, before reranking)
:param use_reranking: Whether to use reranking (if False, it will skip the reranking step and return Qdrant results directly, still mapped with SQLite data)
:param include_llm_context: Whether to include LLM context in the response (formatted text for LLM consumption)
:return: A dictionary containing the original query, a list of search results, and latency information. Each search result includes chunk_id, text, relevance score, and metadata.
"""
start_time = time.time()
logger.info(f"๐ Starting search pipeline for query: '{query}'")
try:
# 1. Query Embedding (Dense, Sparse Extraction)
encoded_query = self.embedder.encode_query(query)
# 2. Qdrant Hybrid Search (Extract limit of candidates using RRF method)
qdrant_results = self.qdrant.hybrid_search(
dense_vector=encoded_query.dense_vector,
sparse_indices=encoded_query.sparse_indices,
sparse_values=encoded_query.sparse_values,
limit=limit
)
if not qdrant_results:
logger.warning("No results found in Vector DB.")
return self._build_empty_response(query, start_time)
chunk_ids = [res.id for res in qdrant_results]
# 3. Get Dict in SQLite for O(1) Mapping of Source Text and Metadata
sqlite_data_map = self.sqlite.get_enriched_chunks_dict(chunk_ids)
# 4. Data Preparation for Reranking (Merging Qdrant and SQLite Data)
chunks_for_reranking = []
for rank, res in enumerate(qdrant_results, start=1):
# Defense Logic: Skip data inconsistencies (Desync) in Vector DB but not in SQLite
chunk_info = sqlite_data_map.get(res.id)
if not chunk_info:
logger.warning(f"Data Desync: chunk_id {res.id} found in Qdrant but missing in SQLite.")
continue
chunks_for_reranking.append({
"chunk_id": res.id,
"text": chunk_info["text"],
"metadata": chunk_info["metadata"],
"rrf_score": res.score,
"rrf_rank": rank
})
if not chunks_for_reranking:
return self._build_empty_response(query, start_time)
# 5. Perform Cross-Encoder Reranking
# Return a list sorted in descending order after recalculating context-based precise scores
if use_reranking:
reranked_docs = self.reranker.rerank(
query=query,
documents=chunks_for_reranking,
text_key="text"
)
else:
# If reranking is disabled, use the Qdrant results directly
reranked_docs = chunks_for_reranking
# 6. Top-K Truncation and Mapping to Pydantic Schema (SearchResultItem) Specification
final_results = []
for doc in reranked_docs[:top_k]:
display_score = doc.get("rerank_score") if use_reranking else doc.get("rrf_score", 0.0)
final_results.append(SearchResultItem(
chunk_id=doc["chunk_id"],
text=doc["text"],
score=round(display_score, 4), # Neatly rounded to 4 decimal places
metadata=DocumentMetadata(**doc["metadata"])
).model_dump()) # Convert to dict for FastAPI compatibility
latency_ms = int((time.time() - start_time) * 1000)
logger.info(f"โ
Search completed in {latency_ms}ms. Found {len(final_results)} final chunks.")
response = {
"query": query,
"results": final_results,
"latency_ms": latency_ms
}
if include_llm_context:
# 7. Optional: Format results into LLM-friendly context (Markdown/XML mixed format)
response["llm_context"] = self.format_for_llm(final_results)
return response
except Exception as e:
# Wrap unexpected errors in custom errors and throw them to the router
logger.error(f"โ Pipeline failed: {str(e)}", exc_info=True)
raise SearchExecutionError(f"Search pipeline failed: {str(e)}")
def _build_empty_response(self, query: str, start_time: float) -> Dict[str, Any]:
"""Build a standard response format when no search results are found"""
return {
"query": query,
"results": [],
"llm_context": "No relevant knowledge (documents) available.",
"latency_ms": int((time.time() - start_time) * 1000)
}
# ---------------------------------------------------------
# LLM-Friendly Prompt Formatter
# (Utility used when injecting into Agents or VLMs)
# ---------------------------------------------------------
def format_for_llm(self, search_results: List[Dict[str, Any]]) -> str:
"""
Converts the retrieved JSON results into a Markdown/XML mixed format best understood by LLM.
(This method can be optionally called by API routers or other Agent systems)
"""
if not search_results:
return "No relevant knowledge (documents) available."
context_blocks = []
for i, res in enumerate(search_results, start=1):
meta = res["metadata"]
source = meta.get("title", f"Document_{meta.get('doc_id')}")
# LLM recognizes text enclosed in XML tags (<doc>) as the clearest 'referencing context'.
block = (
f"<doc id=\"{i}\" source=\"{source}\" "
f"url=\"{meta.get('url', 'N/A')}\" "
f"relevance_score=\"{res['score']}\">\n"
f"date_modified=\"{meta.get('date_modified', 'N/A')}\">\n"
f"{res['text']}\n"
f"</doc>"
)
context_blocks.append(block)
return "\n\n".join(context_blocks) |