"""Qdrant vector database adapter."""
from app.ports.vector_db import VectorDBPort, VectorChunk, VectorSearchResult
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from typing import List
from app.config import get_settings
import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Application settings, loaded once at import time; the adapter below reads
# QDRANT_HOST, QDRANT_PORT and QDRANT_API_KEY from this object.
settings = get_settings()
class QdrantAdapter(VectorDBPort):
    """Qdrant implementation of VectorDBPort.

    The client is created in ``__init__`` from the module-level ``settings``.
    If construction fails, ``self.client`` is set to ``None`` instead of
    raising: ``initialize_collection`` then becomes a logged no-op, while the
    data operations (``store_chunks``, ``search``, ``delete_document``) fail
    with an explicit ``RuntimeError`` rather than an opaque ``AttributeError``
    on ``None``.

    NOTE(review): every method is declared ``async`` but calls the client
    without ``await``. If ``QdrantClient`` is the synchronous client, these
    calls block the event loop - confirm, and consider an async client or
    running the calls in a worker thread.
    """

    # Payload keys written by store_chunks for every point; everything else in
    # a payload is treated as caller-supplied metadata.
    _CORE_PAYLOAD_KEYS = ("document_id", "chunk_index", "text")

    def __init__(self):
        """Build the Qdrant client from application settings.

        Uses an HTTPS URL plus API key when ``settings.QDRANT_API_KEY`` is
        set, otherwise a plain host/port connection. Any exception raised
        while building the client is logged as a warning and leaves
        ``self.client`` as ``None``.
        """
        try:
            # An API key means a hosted (cloud) instance reached over HTTPS.
            if settings.QDRANT_API_KEY:
                self.client = QdrantClient(
                    url=f"https://{settings.QDRANT_HOST}:{settings.QDRANT_PORT}",
                    api_key=settings.QDRANT_API_KEY,
                    timeout=10.0,
                )
                logger.info(f"Connected to Qdrant Cloud at {settings.QDRANT_HOST}")
            else:
                # Local Qdrant instance.
                self.client = QdrantClient(
                    host=settings.QDRANT_HOST,
                    port=settings.QDRANT_PORT,
                    timeout=5.0,
                )
                logger.info(
                    f"Connected to Qdrant at {settings.QDRANT_HOST}:{settings.QDRANT_PORT}"
                )
        except Exception as e:
            # Deliberately best-effort: the adapter can be constructed even
            # when Qdrant is unavailable.
            logger.warning(f"Failed to connect to Qdrant: {e}")
            self.client = None

    def _require_client(self) -> QdrantClient:
        """Return the client, or raise ``RuntimeError`` if it was never created."""
        if self.client is None:
            raise RuntimeError("Qdrant client not available")
        return self.client

    async def initialize_collection(self, collection_name: str, dimension: int) -> None:
        """Create ``collection_name`` if it does not exist yet.

        New collections use cosine distance and vectors of size ``dimension``.
        When no client is available the call logs a warning and returns.

        Args:
            collection_name: Name of the collection to create or verify.
            dimension: Vector size used when the collection is created.

        Raises:
            Exception: Any error raised by the client is logged and re-raised.
        """
        if self.client is None:
            logger.warning("Qdrant client not available - skipping collection initialization")
            return
        try:
            collections = self.client.get_collections().collections
            exists = any(c.name == collection_name for c in collections)
            if not exists:
                self.client.create_collection(
                    collection_name=collection_name,
                    vectors_config=VectorParams(
                        size=dimension,
                        distance=Distance.COSINE,
                    ),
                )
                logger.info(f"Created collection: {collection_name}")
            else:
                logger.info(f"Collection already exists: {collection_name}")
        except Exception as e:
            logger.error(f"Error initializing collection: {e}")
            raise

    async def store_chunks(self, chunks: List[VectorChunk], collection_name: str) -> None:
        """Upsert ``chunks`` into ``collection_name``.

        Each chunk becomes one point whose payload holds ``document_id``,
        ``chunk_index`` and ``text`` plus the chunk's metadata. The core
        fields are written last so a metadata key with the same name cannot
        overwrite them.

        Args:
            chunks: Chunks to store; an empty list is a no-op.
            collection_name: Target collection.

        Raises:
            RuntimeError: If no Qdrant client is available.
            Exception: Any error raised by the client is logged and re-raised.
        """
        if not chunks:
            # Nothing to send; skip the request entirely.
            logger.info(f"No chunks to store in {collection_name} - skipping upsert")
            return
        try:
            client = self._require_client()
            points = [
                PointStruct(
                    id=chunk.id,
                    vector=chunk.embedding,
                    payload={
                        # Metadata first so the core fields below always win.
                        **(chunk.metadata or {}),
                        "document_id": chunk.document_id,
                        "chunk_index": chunk.chunk_index,
                        "text": chunk.text,
                    },
                )
                for chunk in chunks
            ]
            client.upsert(
                collection_name=collection_name,
                points=points,
            )
            logger.info(f"Stored {len(chunks)} chunks in {collection_name}")
        except Exception as e:
            logger.error(f"Error storing chunks: {e}")
            raise

    async def search(
        self,
        query_embedding: List[float],
        org_id: str,
        top_k: int,
        collection_name: str
    ) -> List[VectorSearchResult]:
        """Search ``collection_name`` for vectors similar to ``query_embedding``.

        Results are restricted to points whose payload field ``org_id``
        equals ``org_id``.

        Args:
            query_embedding: Query vector.
            org_id: Value the ``org_id`` payload field must match.
            top_k: Maximum number of hits to return.
            collection_name: Collection to search.

        Returns:
            One ``VectorSearchResult`` per hit; payload keys other than the
            core fields are returned as ``metadata``.

        Raises:
            RuntimeError: If no Qdrant client is available.
            Exception: Any error raised by the client is logged and re-raised.
        """
        try:
            client = self._require_client()
            results = client.search(
                collection_name=collection_name,
                query_vector=query_embedding,
                query_filter=Filter(
                    must=[
                        FieldCondition(
                            key="org_id",
                            match=MatchValue(value=org_id),
                        )
                    ]
                ),
                limit=top_k,
            )
            search_results = [
                VectorSearchResult(
                    document_id=hit.payload["document_id"],
                    chunk_index=hit.payload["chunk_index"],
                    text=hit.payload["text"],
                    score=hit.score,
                    metadata={
                        k: v
                        for k, v in hit.payload.items()
                        if k not in self._CORE_PAYLOAD_KEYS
                    },
                )
                for hit in results
            ]
            logger.info(f"Found {len(search_results)} results for org {org_id}")
            return search_results
        except Exception as e:
            logger.error(f"Error searching vectors: {e}")
            raise

    async def delete_document(self, document_id: str, collection_name: str) -> None:
        """Delete every point whose ``document_id`` payload field matches.

        Args:
            document_id: Value the ``document_id`` payload field must match.
            collection_name: Collection to delete from.

        Raises:
            RuntimeError: If no Qdrant client is available.
            Exception: Any error raised by the client is logged and re-raised.
        """
        try:
            client = self._require_client()
            client.delete(
                collection_name=collection_name,
                points_selector=Filter(
                    must=[
                        FieldCondition(
                            key="document_id",
                            match=MatchValue(value=document_id),
                        )
                    ]
                ),
            )
            logger.info(f"Deleted chunks for document {document_id}")
        except Exception as e:
            logger.error(f"Error deleting document chunks: {e}")
            raise
|