""" Infrastructure - Qdrant Vector Store """ from typing import Dict, List, Optional from uuid import UUID from qdrant_client import AsyncQdrantClient from qdrant_client.models import Distance, PointStruct, VectorParams, Filter, FieldCondition, MatchValue from app.domain.interfaces import IRetriever, RetrievalResult class QdrantRetriever(IRetriever): """Qdrant vector store implementation""" def __init__( self, url: str, collection_name: str, vector_size: int = 384, api_key: Optional[str] = None, ): self.url = url self.collection_name = collection_name self.vector_size = vector_size # Handle in-memory mode if url.startswith("memory://"): self.client = AsyncQdrantClient(location=":memory:") else: self.client = AsyncQdrantClient(url=url, api_key=api_key) async def initialize_collection(self) -> None: """Initialize Qdrant collection""" collections = await self.client.get_collections() collection_names = [c.name for c in collections.collections] if self.collection_name not in collection_names: await self.client.create_collection( collection_name=self.collection_name, vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE), ) async def search( self, query: str, top_k: int = 10, filters: Optional[dict] = None, min_score: float = 0.0, ) -> List[RetrievalResult]: """Search for relevant documents""" # Note: This requires embedding the query first # In practice, this would be called with query_vector raise NotImplementedError("Use search_by_vector instead") async def search_by_vector( self, query_vector: List[float], top_k: int = 10, filters: Optional[dict] = None, min_score: float = 0.0, ) -> List[RetrievalResult]: """Search by pre-computed vector""" # Build filter qdrant_filter = None if filters: conditions = [] for key, value in filters.items(): conditions.append( FieldCondition(key=key, match=MatchValue(value=value)) ) if conditions: qdrant_filter = Filter(must=conditions) # Search search_result = await self.client.search( collection_name=self.collection_name, query_vector=query_vector, limit=top_k, query_filter=qdrant_filter, score_threshold=min_score, ) # Convert to RetrievalResult results = [] for hit in search_result: payload = hit.payload or {} results.append( RetrievalResult( content=payload.get("content", ""), score=hit.score, document_id=payload.get("document_id", ""), chunk_index=payload.get("chunk_index", 0), metadata=payload.get("metadata", {}), ) ) return results async def hybrid_search( self, query: str, top_k: int = 10, alpha: float = 0.5, filters: Optional[dict] = None, ) -> List[RetrievalResult]: """Hybrid search (semantic + keyword)""" # Simplified - in production combine with keyword search raise NotImplementedError("Hybrid search requires integration with keyword search") async def upsert_points( self, points: List[Dict], ) -> None: """Upsert points to collection""" qdrant_points = [ PointStruct( id=point["id"], vector=point["vector"], payload=point["payload"], ) for point in points ] await self.client.upsert( collection_name=self.collection_name, points=qdrant_points, )