Spaces:
Sleeping
Sleeping
| # services/search_service.py | |
import html
import time
from typing import Any, Dict, List

from api.schemas.search import DocumentMetadata, SearchResultItem
from core.exceptions import SearchExecutionError
from core.logger import setup_logger
from models.embedder import TextEmbedder
from models.reranker import TextReranker
from storage.qdrant_client import QdrantStorage
from storage.sqlite_client import SQLiteStorage
# Module-level logger used by HybridSearchService for pipeline progress, warnings and errors.
# Created via the project's core.logger.setup_logger helper (its configuration is not visible here).
logger = setup_logger("search_service")
class HybridSearchService:
    """
    Business-logic service that produces final search results by combining
    Qdrant (vector DB), SQLite (RDBMS), the text embedder and the reranker.

    Pipeline (see :meth:`search`): embed the query -> Qdrant hybrid search
    (RRF fusion) -> enrich hits with SQLite text/metadata -> optional
    cross-encoder reranking -> top-k truncation -> optional LLM-ready context.
    """

    # Shared by the empty response and format_for_llm so both paths emit identical text.
    _EMPTY_CONTEXT = "No relevant knowledge (documents) available."

    def __init__(
        self,
        qdrant: QdrantStorage,
        sqlite: SQLiteStorage,
        embedder: TextEmbedder,
        reranker: TextReranker,
    ):
        """Store the storage backends and models used by the pipeline (no I/O happens here)."""
        self.qdrant = qdrant
        self.sqlite = sqlite
        self.embedder = embedder
        self.reranker = reranker

    def search(
        self,
        query: str,
        top_k: int = 5,
        limit: int = 50,
        use_reranking: bool = True,
        include_llm_context: bool = True,
    ) -> Dict[str, Any]:
        """
        Run hybrid search (and optional reranking) for a user query.

        :param query: User search query.
        :param top_k: Number of results to return (after reranking, or after RRF
            fusion when reranking is disabled). Negative values are treated as 0.
        :param limit: Number of candidate chunks to fetch from Qdrant
            (after RRF fusion, before reranking).
        :param use_reranking: If True, rescore candidates with the reranker; if False,
            keep Qdrant's order and RRF scores (results are still mapped with SQLite data).
        :param include_llm_context: If True, add an ``llm_context`` string built by
            :meth:`format_for_llm` (also honored when nothing is found).
        :return: Dict with ``query``, ``results`` (dumped ``SearchResultItem`` dicts with
            chunk_id, text, score rounded to 4 decimals, metadata), ``latency_ms`` and,
            optionally, ``llm_context``.
        :raises SearchExecutionError: If any pipeline stage raises; the original
            exception is chained as ``__cause__``.
        """
        # perf_counter is monotonic, so latency is immune to wall-clock adjustments.
        start_time = time.perf_counter()
        logger.info(f"🔍 Starting search pipeline for query: '{query}'")
        try:
            # 1. Query embedding (dense + sparse vectors).
            encoded_query = self.embedder.encode_query(query)

            # 2. Qdrant hybrid search: fetch `limit` candidates fused with RRF.
            qdrant_results = self.qdrant.hybrid_search(
                dense_vector=encoded_query.dense_vector,
                sparse_indices=encoded_query.sparse_indices,
                sparse_values=encoded_query.sparse_values,
                limit=limit,
            )
            if not qdrant_results:
                logger.warning("No results found in Vector DB.")
                return self._build_empty_response(query, start_time, include_llm_context)

            # 3. Source text and metadata from SQLite, keyed by chunk_id for O(1) lookups.
            sqlite_data_map = self.sqlite.get_enriched_chunks_dict(
                [res.id for res in qdrant_results]
            )

            # 4. Merge Qdrant hits with their SQLite rows.
            candidates = self._merge_candidates(qdrant_results, sqlite_data_map)
            if not candidates:
                return self._build_empty_response(query, start_time, include_llm_context)

            # 5. Optional cross-encoder reranking. The reranker is expected to return the
            #    documents re-sorted by descending relevance, each carrying "rerank_score".
            if use_reranking:
                ranked_docs = self.reranker.rerank(
                    query=query,
                    documents=candidates,
                    text_key="text",
                )
            else:
                ranked_docs = candidates

            # 6. Top-K truncation and mapping to the SearchResultItem schema.
            #    Clamp so a negative top_k cannot slice from the end of the list.
            final_results: List[Dict[str, Any]] = []
            for doc in ranked_docs[: max(top_k, 0)]:
                display_score = (
                    doc.get("rerank_score") if use_reranking else doc.get("rrf_score", 0.0)
                )
                final_results.append(
                    SearchResultItem(
                        chunk_id=doc["chunk_id"],
                        text=doc["text"],
                        score=round(display_score, 4),  # rounded to 4 decimals for display
                        metadata=DocumentMetadata(**doc["metadata"]),
                    ).model_dump()  # plain dict for FastAPI compatibility
                )

            latency_ms = int((time.perf_counter() - start_time) * 1000)
            logger.info(
                f"✅ Search completed in {latency_ms}ms. Found {len(final_results)} final chunks."
            )

            response: Dict[str, Any] = {
                "query": query,
                "results": final_results,
                "latency_ms": latency_ms,
            }
            if include_llm_context:
                # 7. Optional: XML-tagged context for LLM consumption.
                response["llm_context"] = self.format_for_llm(final_results)
            return response
        except Exception as e:
            # Wrap unexpected errors in the domain error the router handles,
            # keeping the root cause attached via exception chaining.
            logger.error(f"❌ Pipeline failed: {e}", exc_info=True)
            raise SearchExecutionError(f"Search pipeline failed: {e}") from e

    def _merge_candidates(
        self, qdrant_results: List[Any], sqlite_data_map: Dict[Any, Any]
    ) -> List[Dict[str, Any]]:
        """Join Qdrant hits with their SQLite rows, keeping Qdrant's order and 1-based rank."""
        candidates: List[Dict[str, Any]] = []
        for rank, res in enumerate(qdrant_results, start=1):
            chunk_info = sqlite_data_map.get(res.id)
            if not chunk_info:
                # Desync defense: the chunk exists in Qdrant but not in SQLite -> skip it.
                # The rank still counts the skipped position in the Qdrant list.
                logger.warning(
                    f"Data Desync: chunk_id {res.id} found in Qdrant but missing in SQLite."
                )
                continue
            candidates.append(
                {
                    "chunk_id": res.id,
                    "text": chunk_info["text"],
                    "metadata": chunk_info["metadata"],
                    "rrf_score": res.score,
                    "rrf_rank": rank,
                }
            )
        return candidates

    def _build_empty_response(
        self, query: str, start_time: float, include_llm_context: bool = True
    ) -> Dict[str, Any]:
        """
        Build the standard response returned when no search results are found.

        :param query: Original user query, echoed back.
        :param start_time: ``time.perf_counter()`` value taken when the search started.
        :param include_llm_context: Whether to add the fixed "no knowledge" ``llm_context``
            (mirrors the flag of the same name on :meth:`search`).
        :return: Dict with ``query``, empty ``results``, ``latency_ms`` and optionally
            ``llm_context``.
        """
        response: Dict[str, Any] = {
            "query": query,
            "results": [],
            "latency_ms": int((time.perf_counter() - start_time) * 1000),
        }
        if include_llm_context:
            response["llm_context"] = self._EMPTY_CONTEXT
        return response

    # ---------------------------------------------------------
    # LLM-Friendly Prompt Formatter
    # (Utility used when injecting into Agents or VLMs)
    # ---------------------------------------------------------
    def format_for_llm(self, search_results: List[Dict[str, Any]]) -> str:
        """
        Render search results as XML-tagged blocks for LLM prompts.

        Each result becomes one block; blocks are separated by a blank line::

            <doc id="N" source="..." url="..." relevance_score="..." date_modified="...">
            chunk text
            </doc>

        Attribute values are XML-escaped so quotes or angle brackets in titles/URLs
        cannot break the tag. Missing or empty title falls back to ``Document_<doc_id>``;
        missing or empty url/date_modified fall back to ``N/A``. Chunk text is inserted verbatim.

        :param search_results: Result dicts as produced by :meth:`search`
            (keys ``text``, ``score`` and ``metadata`` are read).
        :return: The formatted context, or a fixed "no knowledge" message when empty.
        """
        if not search_results:
            return self._EMPTY_CONTEXT

        context_blocks = []
        for i, res in enumerate(search_results, start=1):
            meta = res["metadata"]
            # `or` (not a .get default) so None/empty values also fall back.
            attributes = {
                "id": i,
                "source": meta.get("title") or f"Document_{meta.get('doc_id')}",
                "url": meta.get("url") or "N/A",
                "relevance_score": res["score"],
                "date_modified": meta.get("date_modified") or "N/A",
            }
            attr_text = " ".join(
                f'{name}="{html.escape(str(value), quote=True)}"'
                for name, value in attributes.items()
            )
            # All attributes live in the opening tag; the body holds only the chunk text.
            context_blocks.append(f"<doc {attr_text}>\n{res['text']}\n</doc>")
        return "\n\n".join(context_blocks)