# knowledge-engine/services/search_service.py
# Author: m97j
# Last commit: fix(search): resolve KeyError by adding fallback score logic (ca57013)
# services/search_service.py
import time
from typing import Any, Dict, List
from api.schemas.search import DocumentMetadata, SearchResultItem
from core.exceptions import SearchExecutionError
from core.logger import setup_logger
from models.embedder import TextEmbedder
from models.reranker import TextReranker
from storage.qdrant_client import QdrantStorage
from storage.sqlite_client import SQLiteStorage
# Module-level logger shared by everything in this file, registered under the name "search_service".
logger = setup_logger("search_service")
class HybridSearchService:
    """
    Business-logic service that derives the final search results by integrating
    Qdrant (vector DB), SQLite (RDBMS), an embedder and a reranker.
    """

    def __init__(self, qdrant: QdrantStorage, sqlite: SQLiteStorage, embedder: TextEmbedder, reranker: TextReranker):
        """
        Store the collaborators used by the search pipeline.

        :param qdrant: Vector store client; ``hybrid_search`` is called on it.
        :param sqlite: Relational store client; ``get_enriched_chunks_dict`` is called on it.
        :param embedder: Query encoder; ``encode_query`` is called on it.
        :param reranker: Reranker; ``rerank`` is called on it when reranking is enabled.
        """
        self.qdrant = qdrant
        self.sqlite = sqlite
        self.embedder = embedder
        self.reranker = reranker

    def search(self, query: str, top_k: int = 5, limit: int = 50, use_reranking: bool = True, include_llm_context: bool = True) -> Dict[str, Any]:
        """
        Receive a user query and perform hybrid search followed by optional reranking.

        :param query: User search query
        :param top_k: Number of documents to return (after reranking)
        :param limit: Number of candidate documents to fetch from Qdrant (after RRF fusion, before reranking)
        :param use_reranking: Whether to use reranking (if False, the reranking step is skipped and the
            Qdrant results are returned in their original order, still mapped with SQLite data)
        :param include_llm_context: Whether to include ``llm_context`` (formatted text for LLM consumption)
            in the response; honoured for both empty and non-empty results
        :return: A dictionary with ``query``, ``results`` and ``latency_ms`` (plus ``llm_context`` when
            requested). Each result carries chunk_id, text, relevance score and metadata.
        :raises SearchExecutionError: If any step of the pipeline raises; the original exception is
            attached as ``__cause__``.
        """
        start_time = time.time()
        logger.info("🔍 Starting search pipeline for query: '%s'", query)
        try:
            # 1. Query embedding (dense + sparse representation)
            encoded_query = self.embedder.encode_query(query)

            # 2. Qdrant hybrid search (fetch up to `limit` candidates)
            qdrant_results = self.qdrant.hybrid_search(
                dense_vector=encoded_query.dense_vector,
                sparse_indices=encoded_query.sparse_indices,
                sparse_values=encoded_query.sparse_values,
                limit=limit,
            )
            if not qdrant_results:
                logger.warning("No results found in Vector DB.")
                return self._build_empty_response(query, start_time, include_llm_context)

            chunk_ids = [res.id for res in qdrant_results]

            # 3. Fetch source text and metadata from SQLite, keyed by chunk id
            sqlite_data_map = self.sqlite.get_enriched_chunks_dict(chunk_ids)

            # 4. Merge Qdrant hits with their SQLite rows to build the reranker input
            chunks_for_reranking = []
            for rank, res in enumerate(qdrant_results, start=1):
                chunk_info = sqlite_data_map.get(res.id)
                if not chunk_info:
                    # Defensive: the chunk exists in the vector DB but not in SQLite (desync); skip it.
                    logger.warning(f"Data Desync: chunk_id {res.id} found in Qdrant but missing in SQLite.")
                    continue
                chunks_for_reranking.append({
                    "chunk_id": res.id,
                    "text": chunk_info["text"],
                    "metadata": chunk_info["metadata"],
                    "rrf_score": res.score,
                    "rrf_rank": rank,
                })
            if not chunks_for_reranking:
                return self._build_empty_response(query, start_time, include_llm_context)

            # 5. Cross-encoder reranking.
            # NOTE(review): the reranker is assumed to return the documents ordered best-first and
            # annotated with "rerank_score" -- confirm against TextReranker.rerank.
            if use_reranking:
                reranked_docs = self.reranker.rerank(
                    query=query,
                    documents=chunks_for_reranking,
                    text_key="text",
                )
            else:
                # Reranking disabled: keep the Qdrant ordering as-is.
                reranked_docs = chunks_for_reranking

            # 6. Top-K truncation and mapping onto the SearchResultItem schema
            final_results = []
            for doc in reranked_docs[:top_k]:
                # Prefer the rerank score when reranking ran, but never hand None to round():
                # fall back to the RRF score, then to 0.0.
                display_score = doc.get("rerank_score") if use_reranking else None
                if display_score is None:
                    display_score = doc.get("rrf_score")
                if display_score is None:
                    display_score = 0.0
                final_results.append(SearchResultItem(
                    chunk_id=doc["chunk_id"],
                    text=doc["text"],
                    score=round(float(display_score), 4),  # rounded to 4 decimal places
                    metadata=DocumentMetadata(**doc["metadata"]),
                ).model_dump())  # plain dicts for the API layer

            latency_ms = int((time.time() - start_time) * 1000)
            logger.info("✅ Search completed in %dms. Found %d final chunks.", latency_ms, len(final_results))

            response = {
                "query": query,
                "results": final_results,
                "latency_ms": latency_ms,
            }
            if include_llm_context:
                # 7. Optional: format results into LLM-friendly context
                response["llm_context"] = self.format_for_llm(final_results)
            return response
        except Exception as e:
            # Wrap unexpected errors in the service's own error type for the router,
            # keeping the original exception as the explicit cause.
            logger.error("❌ Pipeline failed: %s", str(e), exc_info=True)
            raise SearchExecutionError(f"Search pipeline failed: {str(e)}") from e

    def _build_empty_response(self, query: str, start_time: float, include_llm_context: bool = True) -> Dict[str, Any]:
        """
        Build the standard response used when no search results are found.

        :param query: The original user query, echoed back in the response
        :param start_time: ``time.time()`` value captured when the search started
        :param include_llm_context: Whether to add the ``llm_context`` placeholder text
            (defaults to True, matching the previous behaviour of this helper)
        :return: Response dict with an empty ``results`` list and the elapsed ``latency_ms``
        """
        response: Dict[str, Any] = {
            "query": query,
            "results": [],
        }
        if include_llm_context:
            # Reuse the formatter's empty-input message so the text lives in one place.
            response["llm_context"] = self.format_for_llm([])
        response["latency_ms"] = int((time.time() - start_time) * 1000)
        return response

    # ---------------------------------------------------------
    # LLM-Friendly Prompt Formatter
    # (Utility used when injecting into Agents or VLMs)
    # ---------------------------------------------------------
    def format_for_llm(self, search_results: List[Dict[str, Any]]) -> str:
        """
        Convert result dicts into ``<doc ...>`` blocks (XML-style tags around the chunk text)
        intended for LLM consumption.
        (This method can also be called directly by API routers or other agent systems.)

        :param search_results: Result dicts, each with ``text``, ``score`` and a ``metadata`` dict
        :return: The blocks joined by blank lines, or a fixed placeholder sentence when the list is empty
        """
        if not search_results:
            return "No relevant knowledge (documents) available."

        context_blocks = []
        for i, res in enumerate(search_results, start=1):
            meta = res["metadata"]
            # `or` (rather than a .get default) so that keys present with a None/empty value
            # also fall back instead of rendering as "None".
            source = meta.get("title") or f"Document_{meta.get('doc_id')}"
            url = meta.get("url") or "N/A"
            date_modified = meta.get("date_modified") or "N/A"
            # All attributes live inside the single opening tag; the chunk text is the tag body.
            block = (
                f"<doc id=\"{i}\" source=\"{source}\" "
                f"url=\"{url}\" "
                f"relevance_score=\"{res['score']}\" "
                f"date_modified=\"{date_modified}\">\n"
                f"{res['text']}\n"
                f"</doc>"
            )
            context_blocks.append(block)
        return "\n\n".join(context_blocks)