Spaces:
Sleeping
Sleeping
File size: 4,293 Bytes
4570874 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
"""
Infrastructure - Qdrant Vector Store
"""
from typing import Dict, List, Optional
from uuid import UUID
from qdrant_client import AsyncQdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams, Filter, FieldCondition, MatchValue
from app.domain.interfaces import IRetriever, RetrievalResult
class QdrantRetriever(IRetriever):
"""Qdrant vector store implementation"""
def __init__(
self,
url: str,
collection_name: str,
vector_size: int = 384,
api_key: Optional[str] = None,
):
self.url = url
self.collection_name = collection_name
self.vector_size = vector_size
# Handle in-memory mode - accept both :memory: and memory:// formats
if url in (":memory:", "memory://localhost", "memory://"):
self.client = AsyncQdrantClient(location=":memory:")
else:
self.client = AsyncQdrantClient(url=url, api_key=api_key)
async def initialize_collection(self) -> None:
"""Initialize Qdrant collection"""
collections = await self.client.get_collections()
collection_names = [c.name for c in collections.collections]
if self.collection_name not in collection_names:
await self.client.create_collection(
collection_name=self.collection_name,
vectors_config=VectorParams(size=self.vector_size, distance=Distance.COSINE),
)
async def search(
self,
query: str,
top_k: int = 10,
filters: Optional[dict] = None,
min_score: float = 0.0,
) -> List[RetrievalResult]:
"""Search for relevant documents"""
# Note: This requires embedding the query first
# In practice, this would be called with query_vector
raise NotImplementedError("Use search_by_vector instead")
async def search_by_vector(
self,
query_vector: List[float],
top_k: int = 10,
filters: Optional[dict] = None,
min_score: float = 0.0,
) -> List[RetrievalResult]:
"""Search by pre-computed vector"""
# Build filter
qdrant_filter = None
if filters:
conditions = []
for key, value in filters.items():
conditions.append(
FieldCondition(key=key, match=MatchValue(value=value))
)
if conditions:
qdrant_filter = Filter(must=conditions)
# Search
search_result = await self.client.search(
collection_name=self.collection_name,
query_vector=query_vector,
limit=top_k,
query_filter=qdrant_filter,
score_threshold=min_score,
)
# Convert to RetrievalResult
results = []
for hit in search_result:
payload = hit.payload or {}
results.append(
RetrievalResult(
content=payload.get("content", ""),
score=hit.score,
document_id=payload.get("document_id", ""),
chunk_index=payload.get("chunk_index", 0),
metadata=payload.get("metadata", {}),
)
)
return results
async def hybrid_search(
self,
query: str,
top_k: int = 10,
alpha: float = 0.5,
filters: Optional[dict] = None,
) -> List[RetrievalResult]:
"""Hybrid search (semantic + keyword)"""
# Simplified - in production combine with keyword search
raise NotImplementedError("Hybrid search requires integration with keyword search")
async def upsert_points(
self,
points: List[Dict],
) -> None:
"""Upsert points to collection"""
qdrant_points = [
PointStruct(
id=point["id"],
vector=point["vector"],
payload=point["payload"],
)
for point in points
]
await self.client.upsert(
collection_name=self.collection_name,
points=qdrant_points,
)
|