# Source: commit db7c1e8 by Naveedtechlab - "Add full AI Native Textbook project source code"
"""
Qdrant client setup for the AI Backend with RAG + Authentication
Implements Qdrant client initialization and connection management
"""
from qdrant_client import QdrantClient
from qdrant_client.http import models
from qdrant_client.http.exceptions import UnexpectedResponse
from typing import Optional
import logging
from ..config.settings import settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Vector size used when creating and validating the Qdrant collection below.
# NOTE(review): the original comment calls 1536 "standard" for Gemini
# text-embedding-004, but Google documents that model as producing
# 768-dimensional embeddings; 1536 matches e.g. OpenAI text-embedding-3-small.
# Confirm which embedding model feeds this collection before relying on it.
VECTOR_DIMENSIONS = 1536
class QdrantService:
    """Own a single ``QdrantClient`` and expose basic connection helpers.

    The client is built in the constructor from ``settings.qdrant_url`` and,
    when it is set, ``settings.qdrant_api_key``.
    """

    # Timeout, in seconds, passed to the client for its requests.
    REQUEST_TIMEOUT_SECONDS = 10.0

    def __init__(self):
        """Create the service and build its client immediately.

        Raises:
            Exception: whatever is raised while building the client; the
                error is logged and then re-raised unchanged.
        """
        self.client: Optional[QdrantClient] = None
        self._initialize_client()

    def _initialize_client(self):
        """Build ``self.client`` from the configured URL and optional API key.

        Raises:
            Exception: any error raised while reading the settings or
                constructing the client (logged, then re-raised).
        """
        try:
            # The previous https / non-https / no-key branches issued the same
            # call and differed only in whether ``api_key`` was supplied, so a
            # single call with an optional keyword covers all of them.
            client_kwargs = {
                "url": settings.qdrant_url,
                "timeout": self.REQUEST_TIMEOUT_SECONDS,
            }
            if settings.qdrant_api_key:
                client_kwargs["api_key"] = settings.qdrant_api_key
            self.client = QdrantClient(**client_kwargs)
            logger.info("Qdrant client initialized successfully")
        except Exception as e:
            logger.error(f"Failed to initialize Qdrant client: {e}")
            raise

    async def health_check(self) -> bool:
        """Report whether the Qdrant server answers a simple request.

        The probe lists collections, the same client call this module uses
        when creating collections. The call is made without ``await`` on the
        synchronous client, so it runs inline in the calling coroutine.

        Returns:
            True when listing collections succeeds; False when no client is
            set or the request raises.
        """
        if self.client is None:
            return False
        try:
            # NOTE(review): the earlier probe called
            # ``self.client.get_cluster_info()``, which is not part of the
            # documented QdrantClient API; the resulting AttributeError was
            # swallowed below and the check always reported failure.
            response = self.client.get_collections()
            logger.info(
                f"Qdrant health check passed. Collections: {len(response.collections)}"
            )
            return True
        except Exception as e:
            logger.error(f"Qdrant health check failed: {e}")
            return False

    def get_client(self) -> QdrantClient:
        """Return the initialized client.

        Raises:
            RuntimeError: if no client has been set on this service.
        """
        if self.client is None:
            raise RuntimeError("Qdrant client not initialized")
        return self.client
# Module-level QdrantService instance used by get_qdrant_client() below.
# It is constructed when this module is imported, so the client is built from
# settings at import time and any error raised while building it propagates
# to the importer.
qdrant_service = QdrantService()
def get_qdrant_client() -> QdrantClient:
    """Return the client held by the module-level ``qdrant_service``.

    Raises:
        RuntimeError: propagated from ``qdrant_service.get_client()`` when
            the service has no client set.
    """
    shared_client = qdrant_service.get_client()
    return shared_client
def _collect_vector_sizes(vector_config):
    """Map each vector in a collection's vector config to its size.

    A single unnamed vector (an object with ``size``, or a plain dict with an
    integer ``"size"`` entry) is reported under the key ``None``. Any other
    dict is treated as ``name -> params`` (named vectors). Anything else
    yields an empty mapping, meaning the size could not be determined.
    """
    if hasattr(vector_config, "size"):
        return {None: vector_config.size}
    if not isinstance(vector_config, dict):
        return {}
    if isinstance(vector_config.get("size"), int):
        return {None: vector_config["size"]}
    return {
        name: params.get("size") if isinstance(params, dict) else getattr(params, "size", None)
        for name, params in vector_config.items()
    }


async def initialize_qdrant_collections(
    collection_name: str = "documents",
    vector_size: Optional[int] = None,
):
    """Ensure the collection exists and check its vector size.

    Creates the collection with cosine distance when it is missing, then
    reads the collection back and logs whether its vector size(s) match the
    expected size. A size mismatch is only logged as a warning; it does not
    make the function fail. The client calls are made without ``await`` on
    the synchronous client, so they run inline in the calling coroutine.

    Args:
        collection_name: Name of the collection to create or verify.
        vector_size: Expected vector size; ``None`` means the module-level
            ``VECTOR_DIMENSIONS``.

    Returns:
        True when the collection exists (or was created) and could be
        inspected; False when any exception was raised along the way.
    """
    expected_size = VECTOR_DIMENSIONS if vector_size is None else vector_size
    try:
        client = get_qdrant_client()

        existing_names = {
            collection.name for collection in client.get_collections().collections
        }
        if collection_name not in existing_names:
            client.create_collection(
                collection_name=collection_name,
                vectors_config=models.VectorParams(
                    size=expected_size,
                    distance=models.Distance.COSINE,
                ),
            )
            logger.info(f"Created '{collection_name}' collection in Qdrant")
        else:
            logger.info(f"Collection '{collection_name}' already exists in Qdrant")

        # Verify the stored configuration. Indexing a named-vector dict with
        # ['size'] (as the previous code did) raises KeyError, which surfaced
        # as a generic failure, so sizes are extracted per vector instead.
        collection_info = client.get_collection(collection_name=collection_name)
        sizes = _collect_vector_sizes(collection_info.config.params.vectors)
        mismatched = {
            name: size for name, size in sizes.items() if size != expected_size
        }

        if not sizes:
            logger.warning(
                f"Could not determine vector size for collection '{collection_name}', "
                f"expected {expected_size}"
            )
        elif mismatched:
            for name, size in mismatched.items():
                if name is None:
                    logger.warning(
                        f"Collection vector size is {size}, expected {expected_size}"
                    )
                else:
                    logger.warning(
                        f"Collection vector '{name}' size is {size}, expected {expected_size}"
                    )
        else:
            logger.info(
                f"Collection '{collection_name}' has correct vector size: {expected_size}"
            )
        return True
    except UnexpectedResponse as e:
        logger.error(f"Qdrant API error during collection initialization: {e}")
        return False
    except Exception as e:
        logger.error(f"Unexpected error during collection initialization: {e}")
        return False
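# Collection setup entry point. Note: nothing in this module calls it on import;
# the application must await setup_qdrant() itself (e.g. during startup).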
async def setup_qdrant():
    """Run collection initialization and log the overall outcome.

    Returns:
        The value returned by ``initialize_qdrant_collections()``.
    """
    outcome = await initialize_qdrant_collections()
    if not outcome:
        logger.error("Qdrant setup failed")
        return outcome
    logger.info("Qdrant setup completed successfully")
    return outcome