Ragora-Server / app /services /vector_db_adapter.py
Peterase's picture
feat: add Neon and Qdrant Cloud support
405d962
"""Qdrant vector database adapter."""
from app.ports.vector_db import VectorDBPort, VectorChunk, VectorSearchResult
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct, Filter, FieldCondition, MatchValue
from typing import List
from app.config import get_settings
import logging
logger = logging.getLogger(__name__)
settings = get_settings()
class QdrantAdapter(VectorDBPort):
"""Qdrant implementation of VectorDBPort."""
def __init__(self):
try:
# Check if using Qdrant Cloud (with API key)
if settings.QDRANT_API_KEY:
self.client = QdrantClient(
url=f"https://{settings.QDRANT_HOST}:{settings.QDRANT_PORT}",
api_key=settings.QDRANT_API_KEY,
timeout=10.0
)
logger.info(f"Connected to Qdrant Cloud at {settings.QDRANT_HOST}")
else:
# Local Qdrant instance
self.client = QdrantClient(
host=settings.QDRANT_HOST,
port=settings.QDRANT_PORT,
timeout=5.0
)
logger.info(f"Connected to Qdrant at {settings.QDRANT_HOST}:{settings.QDRANT_PORT}")
except Exception as e:
logger.warning(f"Failed to connect to Qdrant: {e}")
self.client = None
async def initialize_collection(self, collection_name: str, dimension: int) -> None:
"""Initialize vector collection."""
if self.client is None:
logger.warning("Qdrant client not available - skipping collection initialization")
return
try:
collections = self.client.get_collections().collections
exists = any(c.name == collection_name for c in collections)
if not exists:
self.client.create_collection(
collection_name=collection_name,
vectors_config=VectorParams(
size=dimension,
distance=Distance.COSINE
)
)
logger.info(f"Created collection: {collection_name}")
else:
logger.info(f"Collection already exists: {collection_name}")
except Exception as e:
logger.error(f"Error initializing collection: {e}")
raise
async def store_chunks(self, chunks: List[VectorChunk], collection_name: str) -> None:
"""Store vector chunks."""
try:
points = [
PointStruct(
id=chunk.id,
vector=chunk.embedding,
payload={
"document_id": chunk.document_id,
"chunk_index": chunk.chunk_index,
"text": chunk.text,
**chunk.metadata
}
)
for chunk in chunks
]
self.client.upsert(
collection_name=collection_name,
points=points
)
logger.info(f"Stored {len(chunks)} chunks in {collection_name}")
except Exception as e:
logger.error(f"Error storing chunks: {e}")
raise
async def search(
self,
query_embedding: List[float],
org_id: str,
top_k: int,
collection_name: str
) -> List[VectorSearchResult]:
"""Search for similar vectors."""
try:
results = self.client.search(
collection_name=collection_name,
query_vector=query_embedding,
query_filter=Filter(
must=[
FieldCondition(
key="org_id",
match=MatchValue(value=org_id)
)
]
),
limit=top_k
)
search_results = [
VectorSearchResult(
document_id=hit.payload["document_id"],
chunk_index=hit.payload["chunk_index"],
text=hit.payload["text"],
score=hit.score,
metadata={k: v for k, v in hit.payload.items()
if k not in ["document_id", "chunk_index", "text"]}
)
for hit in results
]
logger.info(f"Found {len(search_results)} results for org {org_id}")
return search_results
except Exception as e:
logger.error(f"Error searching vectors: {e}")
raise
async def delete_document(self, document_id: str, collection_name: str) -> None:
"""Delete all chunks for a document."""
try:
self.client.delete(
collection_name=collection_name,
points_selector=Filter(
must=[
FieldCondition(
key="document_id",
match=MatchValue(value=document_id)
)
]
)
)
logger.info(f"Deleted chunks for document {document_id}")
except Exception as e:
logger.error(f"Error deleting document chunks: {e}")
raise