Spaces:
Running
Running
| """ | |
| Infrastructure - Qdrant Vector Store | |
| """ | |
from typing import Dict, List, Optional
from uuid import UUID

from qdrant_client import AsyncQdrantClient
from qdrant_client.models import (
    Distance,
    FieldCondition,
    Filter,
    MatchAny,
    MatchValue,
    PointStruct,
    VectorParams,
)

from app.domain.interfaces import IRetriever, RetrievalResult
class QdrantRetriever(IRetriever):
    """Qdrant vector store implementation of ``IRetriever``.

    Wraps an ``AsyncQdrantClient`` bound to a single collection. Only
    pre-embedded vector search (:meth:`search_by_vector`) and point upserts
    are implemented; :meth:`search` and :meth:`hybrid_search` raise
    ``NotImplementedError``.
    """

    # URL values that select qdrant-client's in-process ``:memory:`` mode
    # instead of connecting to a server.
    _IN_MEMORY_URLS = (":memory:", "memory://localhost", "memory://")

    def __init__(
        self,
        url: str,
        collection_name: str,
        vector_size: int = 384,
        api_key: Optional[str] = None,
    ):
        """Create the underlying async client.

        Args:
            url: Qdrant server URL, or one of ``":memory:"``,
                ``"memory://localhost"``, ``"memory://"`` for in-process mode.
            collection_name: Collection every operation targets.
            vector_size: Dimensionality used when creating the collection.
            api_key: Passed to the client in server mode only; ignored for
                in-memory mode.
        """
        self.url = url
        self.collection_name = collection_name
        self.vector_size = vector_size

        if url in self._IN_MEMORY_URLS:
            self.client = AsyncQdrantClient(location=":memory:")
        else:
            self.client = AsyncQdrantClient(url=url, api_key=api_key)

    async def initialize_collection(self) -> None:
        """Create the collection (cosine distance) if it does not already exist.

        Existing collections are left untouched, even if their vector size
        differs from ``self.vector_size``.
        """
        collections = await self.client.get_collections()
        existing = {c.name for c in collections.collections}
        if self.collection_name not in existing:
            await self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.vector_size, distance=Distance.COSINE
                ),
            )

    async def search(
        self,
        query: str,
        top_k: int = 10,
        filters: Optional[dict] = None,
        min_score: float = 0.0,
    ) -> List[RetrievalResult]:
        """Text search is not supported here.

        This store holds only vectors, so the query has to be embedded by the
        caller first.

        Raises:
            NotImplementedError: Always; use :meth:`search_by_vector` instead.
        """
        raise NotImplementedError("Use search_by_vector instead")

    @staticmethod
    def _build_filter(filters: Optional[dict]) -> Optional[Filter]:
        """Translate a ``{payload_key: value}`` dict into a Qdrant ``must`` filter.

        Scalar values become exact-match conditions (``MatchValue``).
        List/tuple/set values match when the payload field equals any of the
        given values (``MatchAny``). Keys whose value is ``None`` are skipped.
        Returns ``None`` when no condition remains, meaning "unfiltered".
        """
        if not filters:
            return None

        conditions = []
        for key, value in filters.items():
            if value is None:
                # MatchValue rejects None; treat it as "no constraint on this key"
                # rather than failing the whole search.
                continue
            if isinstance(value, (list, tuple, set, frozenset)):
                match = MatchAny(any=list(value))
            else:
                match = MatchValue(value=value)
            conditions.append(FieldCondition(key=key, match=match))

        return Filter(must=conditions) if conditions else None

    async def search_by_vector(
        self,
        query_vector: List[float],
        top_k: int = 10,
        filters: Optional[dict] = None,
        min_score: float = 0.0,
    ) -> List[RetrievalResult]:
        """Search the collection with a pre-computed query vector.

        Args:
            query_vector: Embedding of the query. Presumably it must have
                ``self.vector_size`` dimensions; Qdrant enforces this server-side.
            top_k: Maximum number of hits to return.
            filters: Optional ``{payload_key: value}`` constraints, combined
                with AND. A list/tuple/set value means "any of these".
            min_score: Hits scoring below this threshold are dropped.

        Returns:
            ``RetrievalResult`` objects built from each hit's payload keys
            ``content``, ``document_id``, ``chunk_index`` and ``metadata``.
            Missing keys fall back to ``""``, ``""``, ``0`` and ``{}``.
        """
        # NOTE(review): ``client.search`` is deprecated in newer qdrant-client
        # releases in favour of ``query_points``; confirm the pinned version.
        hits = await self.client.search(
            collection_name=self.collection_name,
            query_vector=query_vector,
            limit=top_k,
            query_filter=self._build_filter(filters),
            score_threshold=min_score,
        )

        results = []
        for hit in hits:
            payload = hit.payload or {}
            results.append(
                RetrievalResult(
                    content=payload.get("content", ""),
                    score=hit.score,
                    document_id=payload.get("document_id", ""),
                    chunk_index=payload.get("chunk_index", 0),
                    metadata=payload.get("metadata", {}),
                )
            )
        return results

    async def hybrid_search(
        self,
        query: str,
        top_k: int = 10,
        alpha: float = 0.5,
        filters: Optional[dict] = None,
    ) -> List[RetrievalResult]:
        """Hybrid (semantic + keyword) search is not implemented for this store.

        Raises:
            NotImplementedError: Always; a keyword index would need to be
                integrated first.
        """
        raise NotImplementedError("Hybrid search requires integration with keyword search")

    async def upsert_points(
        self,
        points: List[Dict],
    ) -> None:
        """Insert or overwrite points in the collection.

        Args:
            points: Dicts with required keys ``"id"`` and ``"vector"`` and an
                optional ``"payload"`` dict.

        Raises:
            KeyError: If a point lacks ``"id"`` or ``"vector"``.
        """
        if not points:
            # Nothing to write; skip the client round-trip entirely.
            return

        qdrant_points = [
            PointStruct(
                id=point["id"],
                vector=point["vector"],
                # Payload is optional on PointStruct; don't require the key.
                payload=point.get("payload"),
            )
            for point in points
        ]
        await self.client.upsert(
            collection_name=self.collection_name,
            points=qdrant_points,
        )