# Last change by Baktabek (commit 4570874, verified): add support for the :memory: Qdrant URL.
"""
Infrastructure - Qdrant Vector Store
"""
from typing import Dict, List, Optional
from uuid import UUID

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchAny,
    MatchValue,
    PointStruct,
    VectorParams,
)

from app.domain.interfaces import IRetriever, RetrievalResult
class QdrantRetriever(IRetriever):
    """Qdrant vector store implementation of ``IRetriever``.

    Wraps an ``AsyncQdrantClient`` bound to a single collection. Only
    pre-embedded vector search (:meth:`search_by_vector`) and point upserts
    are implemented; :meth:`search` and :meth:`hybrid_search` raise
    ``NotImplementedError``.
    """

    # URL values that select qdrant-client's local in-memory mode instead of
    # connecting to a server.
    _IN_MEMORY_URLS = (":memory:", "memory://localhost", "memory://")

    def __init__(
        self,
        url: str,
        collection_name: str,
        vector_size: int = 384,
        api_key: Optional[str] = None,
    ):
        """Create the underlying Qdrant client.

        Args:
            url: Qdrant server URL, or one of ``":memory:"``,
                ``"memory://localhost"`` or ``"memory://"`` to use
                qdrant-client's in-memory mode.
            collection_name: Collection every operation targets.
            vector_size: Vector dimension used when the collection is
                created by :meth:`initialize_collection`.
            api_key: Optional API key for a remote server; not used in
                in-memory mode.
        """
        self.url = url
        self.collection_name = collection_name
        self.vector_size = vector_size
        if url in self._IN_MEMORY_URLS:
            self.client = AsyncQdrantClient(location=":memory:")
        else:
            self.client = AsyncQdrantClient(url=url, api_key=api_key)

    async def initialize_collection(self) -> None:
        """Create the collection (cosine distance) if it does not exist yet.

        The existence check and the create call are two separate requests,
        so this is not atomic with respect to other clients.
        """
        collections = await self.client.get_collections()
        existing = {c.name for c in collections.collections}
        if self.collection_name not in existing:
            await self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE),
            )

    async def search(
        self,
        query: str,
        top_k: int = 10,
        filters: Optional[dict] = None,
        min_score: float = 0.0,
    ) -> List[RetrievalResult]:
        """Text search is not supported here; the query must be embedded first.

        Raises:
            NotImplementedError: Always. Use :meth:`search_by_vector`.
        """
        raise NotImplementedError("Use search_by_vector instead")

    async def search_by_vector(
        self,
        query_vector: List[float],
        top_k: int = 10,
        filters: Optional[dict] = None,
        min_score: float = 0.0,
    ) -> List[RetrievalResult]:
        """Search the collection with a pre-computed query vector.

        Args:
            query_vector: Embedding of the query.
            top_k: Maximum number of hits to return.
            filters: Optional ``{payload_key: value}`` constraints, all of
                which must hold. A list/tuple/set value matches any of its
                elements; any other value must match exactly.
            min_score: Passed to Qdrant as ``score_threshold``; hits scoring
                below it are not returned (with the default ``0.0``, hits
                with negative scores are excluded).

        Returns:
            Hits converted to ``RetrievalResult``, in the order Qdrant
            returned them.
        """
        # NOTE(review): `AsyncQdrantClient.search` is deprecated in newer
        # qdrant-client releases in favour of `query_points`; migrate when
        # the pinned client version allows.
        hits = await self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=top_k,
            query_filter=self._build_filter(filters),
            score_threshold=min_score,
        )
        return [self._to_result(hit) for hit in hits]

    async def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5,
        filters: Optional[dict] = None,
    ) -> List[RetrievalResult]:
        """Hybrid (semantic + keyword) search is not implemented.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("Hybrid search requires integration with keyword search")

    async def upsert_points(
        self,
        points: List[Dict],
    ) -> None:
        """Insert or overwrite points in the collection.

        Args:
            points: Dicts with keys ``"id"`` (int, str, or ``uuid.UUID``),
                ``"vector"`` and, optionally, ``"payload"``. An empty list
                is a no-op.
        """
        if not points:
            # Nothing to write; skip the request entirely.
            return
        qdrant_points = [
            PointStruct(
                id=self._normalize_id(point["id"]),
                vector=point["vector"],
                # Payload is optional on a Qdrant point.
                payload=point.get("payload"),
            )
            for point in points
        ]
        await self.client.upsert(
            collection_name=self.collection_name,
            points=qdrant_points,
        )

    async def close(self) -> None:
        """Close the underlying Qdrant client and release its connections."""
        await self.client.close()

    @staticmethod
    def _build_filter(filters: Optional[dict]) -> Optional[Filter]:
        """Translate a flat ``{key: value}`` dict into a Qdrant ``must`` filter.

        Returns ``None`` when ``filters`` is empty or ``None``.
        """
        if not filters:
            return None
        conditions = []
        for key, value in filters.items():
            if isinstance(value, (list, tuple, set, frozenset)):
                # MatchValue only accepts a single scalar; collections mean
                # "field equals any of these values".
                match = MatchAny(any=list(value))
            else:
                match = MatchValue(value=value)
            conditions.append(FieldCondition(key=key, match=match))
        return Filter(must=conditions)

    @staticmethod
    def _normalize_id(point_id):
        """Convert ``uuid.UUID`` ids to strings; return other ids unchanged.

        qdrant-client point ids are ints or UUID strings, so UUID objects
        are stringified before being passed to ``PointStruct``.
        """
        return str(point_id) if isinstance(point_id, UUID) else point_id

    @staticmethod
    def _to_result(hit) -> RetrievalResult:
        """Map one Qdrant hit onto a ``RetrievalResult`` using payload defaults."""
        payload = hit.payload or {}
        return RetrievalResult(
            content=payload.get("content", ""),
            score=hit.score,
            document_id=payload.get("document_id", ""),
            chunk_index=payload.get("chunk_index", 0),
            metadata=payload.get("metadata", {}),
        )