from qdrant_client import QdrantClient, models import os import uuid import logging logger = logging.getLogger(__name__) # --- Qdrant Client Initialization --- def get_qdrant_client(): """Initializes and returns the Qdrant client, prioritizing Cloud over local.""" qdrant_url = os.environ.get("QDRANT_URL") qdrant_api_key = os.environ.get("QDRANT_API_KEY") # Priority 1: Qdrant Cloud (production) if qdrant_url and qdrant_api_key: try: client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key) logger.info(f"Connected to Qdrant Cloud at {qdrant_url}") return client except Exception as e: logger.error(f"Failed to connect to Qdrant Cloud with provided credentials: {e}") raise # If cloud credentials are provided, failure should be fatal. # Priority 2: Local Docker container qdrant_host = os.environ.get("QDRANT_HOST") if qdrant_host and qdrant_host != "localhost": try: client = QdrantClient(host=qdrant_host, port=6333) logger.info(f"Connected to local Qdrant server at {qdrant_host}") return client except Exception as e: logger.warning(f"Failed to connect to local Qdrant server at {qdrant_host}: {e}") # Priority 3: Local file-based storage (fallback for development) try: data_dir = "/app/data/qdrant" os.makedirs(data_dir, exist_ok=True) client = QdrantClient(path=data_dir) logger.info(f"Using file-based Qdrant client at {data_dir}") return client except Exception as e: logger.warning(f"Failed to create file-based Qdrant client: {e}") # Final fallback: in-memory client = QdrantClient(":memory:") logger.info("Using in-memory Qdrant client as final fallback") return client # --- Collection Management --- def create_collection_if_not_exists(client: QdrantClient, collection_name: str, vector_size: int): """Creates a Qdrant collection if it doesn't already exist.""" try: client.get_collection(collection_name=collection_name) logger.info(f"Collection '{collection_name}' already exists") except Exception as e: # If the collection does not exist, this will raise an exception logger.info(f"Collection '{collection_name}' does not exist, creating it...") try: client.create_collection( collection_name=collection_name, vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE), ) logger.info(f"Created new collection '{collection_name}'") except Exception as create_error: logger.error(f"Failed to create collection '{collection_name}': {str(create_error)}") raise # --- User-Specific Collection Management --- def get_user_collection_name(user_id: uuid.UUID) -> str: """ Generate a user-specific collection name. Args: user_id: The user's UUID Returns: Collection name in format 'user_{user_id_without_hyphens}' """ # Convert UUID to string and remove hyphens for valid collection name user_id_str = str(user_id).replace('-', '_') return f"user_{user_id_str}" def ensure_user_collection_exists(client: QdrantClient, user_id: uuid.UUID, vector_size: int) -> str: """ Ensure that a user-specific collection exists in Qdrant. Args: client: Qdrant client instance user_id: The user's UUID vector_size: Size of the embedding vectors Returns: The collection name that was created or verified """ try: collection_name = get_user_collection_name(user_id) logger.info(f"Ensuring collection exists for user {user_id}: {collection_name}") try: # Check if collection exists client.get_collection(collection_name=collection_name) logger.info(f"User collection '{collection_name}' already exists for user {user_id}") except Exception as get_error: # Collection doesn't exist, create it logger.info(f"Collection '{collection_name}' does not exist, creating it for user {user_id}") try: client.create_collection( collection_name=collection_name, vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE), ) logger.info(f"Created new user collection '{collection_name}' for user {user_id}") except Exception as create_error: logger.error(f"Failed to create collection '{collection_name}' for user {user_id}: {str(create_error)}") raise create_error return collection_name except Exception as e: logger.error(f"Error in ensure_user_collection_exists: {str(e)}") logger.error(f"Function called with client={type(client)}, user_id={user_id}, vector_size={vector_size}") raise def collection_exists(client: QdrantClient, collection_name: str) -> bool: """ Check if a collection exists in Qdrant. Args: client: Qdrant client instance collection_name: Name of the collection to check Returns: True if collection exists, False otherwise """ try: client.get_collection(collection_name=collection_name) return True except Exception: return False # --- Vector Operations --- def upsert_vectors(client: QdrantClient, collection_name: str, vectors, payloads): """Upserts vectors and their payloads into the specified collection.""" client.upsert( collection_name=collection_name, points=models.Batch( ids=list(range(len(vectors))), # Generate sequential integer IDs vectors=vectors, payloads=payloads ), wait=True ) def search_vectors(client: QdrantClient, collection_name: str, query_vector, limit: int = 5): """ Searches for similar vectors in the collection. Args: client: Qdrant client instance collection_name: Name of the collection to search query_vector: Query vector for similarity search limit: Maximum number of results to return Returns: Search results, or empty list if collection doesn't exist or is empty """ try: # Check if collection exists first if not collection_exists(client, collection_name): logger.warning(f"Collection '{collection_name}' does not exist") return [] # Check if collection has any points collection_info = client.get_collection(collection_name) if collection_info.points_count == 0: logger.info(f"Collection '{collection_name}' is empty") return [] # Convert numpy array to list if needed query_vector_list = query_vector.tolist() if hasattr(query_vector, 'tolist') else query_vector # Qdrant Cloud uses the newer API (v1.7+) # Use query_points which is the current method try: logger.debug(f"Attempting query_points on collection '{collection_name}'") result = client.query_points( collection_name=collection_name, query=query_vector_list, limit=limit, with_payload=True ) # Extract points from QueryResponse results = result.points if hasattr(result, 'points') else result logger.info(f"Found {len(results)} results using query_points in collection '{collection_name}'") return results except AttributeError as attr_err: # Fallback to older search method for backward compatibility logger.warning(f"query_points failed ({attr_err}), falling back to search method") try: results = client.search( collection_name=collection_name, query_vector=query_vector_list, limit=limit, with_payload=True ) logger.info(f"Found {len(results)} results using search in collection '{collection_name}'") return results except Exception as search_err: logger.error(f"Both query_points and search failed. search error: {search_err}") raise except Exception as e: logger.error(f"Error searching collection '{collection_name}': {str(e)}") logger.error(f"Error type: {type(e).__name__}") import traceback logger.error(f"Traceback: {traceback.format_exc()}") return [] def get_collection_info(client: QdrantClient, collection_name: str) -> dict: """ Get information about a collection. Args: client: Qdrant client instance collection_name: Name of the collection Returns: Dictionary with collection information or None if collection doesn't exist """ try: collection_info = client.get_collection(collection_name) return { "name": collection_name, "points_count": collection_info.points_count, "status": collection_info.status, "vectors_count": collection_info.vectors_count if hasattr(collection_info, 'vectors_count') else None } except Exception as e: logger.error(f"Error getting collection info for '{collection_name}': {str(e)}") return None