Spaces:
Sleeping
Sleeping
File size: 2,816 Bytes
# storage/qdrant_client.py
from typing import List
from qdrant_client import QdrantClient, models
from core.exceptions import DatabaseError
from core.logger import setup_logger
# Module-level logger used by QdrantStorage for its connection, search-failure
# and shutdown messages.
logger = setup_logger("qdrant_client")
class QdrantStorage:
    """Qdrant client wrapper that runs hybrid search over dense and sparse vectors.

    The instance can be used as a context manager; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, url: str, collection_name: str = "knowledge_base"):
        """Create the underlying ``QdrantClient`` for ``url``.

        Args:
            url: Address handed to ``QdrantClient(url=...)``.
            collection_name: Collection queried by :meth:`hybrid_search`.

        Raises:
            Exception: Any error raised while constructing ``QdrantClient`` is
                logged at CRITICAL level and re-raised unchanged.
        """
        self.url = url
        self.collection_name = collection_name
        try:
            # NOTE(review): this was previously described as a local
            # file-system connection, but the client is built with ``url=``;
            # qdrant-client selects on-disk local mode with ``path=``.
            # Confirm which mode is intended.
            self.client = QdrantClient(url=self.url, timeout=60.0)
        except Exception as e:
            logger.critical(f"❌ Qdrant connection failed: {e}")
            # Bare ``raise`` re-raises the active exception with its
            # original traceback.
            raise
        else:
            logger.info(
                f"✅ Connected to local Qdrant at {self.url} "
                f"(Collection: {self.collection_name})"
            )

    def __enter__(self) -> "QdrantStorage":
        """Return ``self`` so the storage can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the client on leaving a ``with`` block; exceptions propagate."""
        self.close()

    def hybrid_search(
        self,
        dense_vector: List[float],
        sparse_indices: List[int],
        sparse_values: List[float],
        limit: int = 100
    ) -> List[models.ScoredPoint]:
        """Run a hybrid search through Qdrant's native fusion API.

        Two prefetch queries (one against the ``"sparse"`` named vector, one
        against the ``"dense"`` named vector) are fused with RRF (Reciprocal
        Rank Fusion) on the database side.

        Args:
            dense_vector: Query vector for the ``"dense"`` named vector.
            sparse_indices: Indices of the sparse query vector.
            sparse_values: Values of the sparse query vector, passed together
                with ``sparse_indices`` to ``models.SparseVector``.
            limit: Maximum number of points requested from each prefetch and
                from the fused result.

        Returns:
            The ``points`` of the query response, with payloads included.

        Raises:
            DatabaseError: If the query fails for any reason; the original
                exception is attached as ``__cause__``.
        """
        try:
            # Qdrant v1.10+ syntax: prefetch several searches, then fuse them.
            results = self.client.query_points(
                collection_name=self.collection_name,
                prefetch=[
                    # 1. Sparse search query
                    models.Prefetch(
                        query=models.SparseVector(
                            indices=sparse_indices,
                            values=sparse_values,
                        ),
                        using="sparse",
                        limit=limit,
                    ),
                    # 2. Dense search query
                    models.Prefetch(
                        query=dense_vector,
                        using="dense",
                        limit=limit,
                    ),
                ],
                # 3. Merge the two result lists above with RRF.
                query=models.FusionQuery(fusion=models.Fusion.RRF),
                limit=limit,
                with_payload=True,
            )
            return results.points
        except Exception as e:
            logger.error(f"❌ Hybrid search failed: {e}", exc_info=True)
            # Chain explicitly so callers can inspect the underlying error.
            raise DatabaseError(f"Qdrant Hybrid search execution failed: {e}") from e

    def close(self):
        """Close the Qdrant client if one was created.

        Does nothing when ``__init__`` failed before ``self.client`` was
        assigned, or when ``self.client`` is falsy.
        """
        client = getattr(self, "client", None)
        if client:
            client.close()
            logger.info("🔌 Qdrant client connection closed.")
|