|
|
from qdrant_client import QdrantClient, models |
|
|
import os |
|
|
import uuid |
|
|
import logging |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
|
|
|
|
|
|
def get_qdrant_client(): |
|
|
"""Initializes and returns the Qdrant client, prioritizing Cloud over local.""" |
|
|
qdrant_url = os.environ.get("QDRANT_URL") |
|
|
qdrant_api_key = os.environ.get("QDRANT_API_KEY") |
|
|
|
|
|
|
|
|
if qdrant_url and qdrant_api_key: |
|
|
try: |
|
|
client = QdrantClient(url=qdrant_url, api_key=qdrant_api_key) |
|
|
logger.info(f"Connected to Qdrant Cloud at {qdrant_url}") |
|
|
return client |
|
|
except Exception as e: |
|
|
logger.error(f"Failed to connect to Qdrant Cloud with provided credentials: {e}") |
|
|
raise |
|
|
|
|
|
|
|
|
qdrant_host = os.environ.get("QDRANT_HOST") |
|
|
if qdrant_host and qdrant_host != "localhost": |
|
|
try: |
|
|
client = QdrantClient(host=qdrant_host, port=6333) |
|
|
logger.info(f"Connected to local Qdrant server at {qdrant_host}") |
|
|
return client |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to connect to local Qdrant server at {qdrant_host}: {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
data_dir = "/app/data/qdrant" |
|
|
os.makedirs(data_dir, exist_ok=True) |
|
|
client = QdrantClient(path=data_dir) |
|
|
logger.info(f"Using file-based Qdrant client at {data_dir}") |
|
|
return client |
|
|
except Exception as e: |
|
|
logger.warning(f"Failed to create file-based Qdrant client: {e}") |
|
|
|
|
|
client = QdrantClient(":memory:") |
|
|
logger.info("Using in-memory Qdrant client as final fallback") |
|
|
return client |
|
|
|
|
|
|
|
|
|
|
|
def create_collection_if_not_exists(client: QdrantClient, collection_name: str, vector_size: int): |
|
|
"""Creates a Qdrant collection if it doesn't already exist.""" |
|
|
try: |
|
|
client.get_collection(collection_name=collection_name) |
|
|
logger.info(f"Collection '{collection_name}' already exists") |
|
|
except Exception as e: |
|
|
|
|
|
logger.info(f"Collection '{collection_name}' does not exist, creating it...") |
|
|
try: |
|
|
client.create_collection( |
|
|
collection_name=collection_name, |
|
|
vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE), |
|
|
) |
|
|
logger.info(f"Created new collection '{collection_name}'") |
|
|
except Exception as create_error: |
|
|
logger.error(f"Failed to create collection '{collection_name}': {str(create_error)}") |
|
|
raise |
|
|
|
|
|
|
|
|
|
|
|
def get_user_collection_name(user_id: uuid.UUID) -> str: |
|
|
""" |
|
|
Generate a user-specific collection name. |
|
|
|
|
|
Args: |
|
|
user_id: The user's UUID |
|
|
|
|
|
Returns: |
|
|
Collection name in format 'user_{user_id_without_hyphens}' |
|
|
""" |
|
|
|
|
|
user_id_str = str(user_id).replace('-', '_') |
|
|
return f"user_{user_id_str}" |
|
|
|
|
|
def ensure_user_collection_exists(client: QdrantClient, user_id: uuid.UUID, vector_size: int) -> str: |
|
|
""" |
|
|
Ensure that a user-specific collection exists in Qdrant. |
|
|
|
|
|
Args: |
|
|
client: Qdrant client instance |
|
|
user_id: The user's UUID |
|
|
vector_size: Size of the embedding vectors |
|
|
|
|
|
Returns: |
|
|
The collection name that was created or verified |
|
|
""" |
|
|
try: |
|
|
collection_name = get_user_collection_name(user_id) |
|
|
logger.info(f"Ensuring collection exists for user {user_id}: {collection_name}") |
|
|
|
|
|
try: |
|
|
|
|
|
client.get_collection(collection_name=collection_name) |
|
|
logger.info(f"User collection '{collection_name}' already exists for user {user_id}") |
|
|
except Exception as get_error: |
|
|
|
|
|
logger.info(f"Collection '{collection_name}' does not exist, creating it for user {user_id}") |
|
|
try: |
|
|
client.create_collection( |
|
|
collection_name=collection_name, |
|
|
vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE), |
|
|
) |
|
|
logger.info(f"Created new user collection '{collection_name}' for user {user_id}") |
|
|
except Exception as create_error: |
|
|
logger.error(f"Failed to create collection '{collection_name}' for user {user_id}: {str(create_error)}") |
|
|
raise create_error |
|
|
|
|
|
return collection_name |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error in ensure_user_collection_exists: {str(e)}") |
|
|
logger.error(f"Function called with client={type(client)}, user_id={user_id}, vector_size={vector_size}") |
|
|
raise |
|
|
|
|
|
def collection_exists(client: QdrantClient, collection_name: str) -> bool: |
|
|
""" |
|
|
Check if a collection exists in Qdrant. |
|
|
|
|
|
Args: |
|
|
client: Qdrant client instance |
|
|
collection_name: Name of the collection to check |
|
|
|
|
|
Returns: |
|
|
True if collection exists, False otherwise |
|
|
""" |
|
|
try: |
|
|
client.get_collection(collection_name=collection_name) |
|
|
return True |
|
|
except Exception: |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
def upsert_vectors(client: QdrantClient, collection_name: str, vectors, payloads): |
|
|
"""Upserts vectors and their payloads into the specified collection.""" |
|
|
client.upsert( |
|
|
collection_name=collection_name, |
|
|
points=models.Batch( |
|
|
ids=list(range(len(vectors))), |
|
|
vectors=vectors, |
|
|
payloads=payloads |
|
|
), |
|
|
wait=True |
|
|
) |
|
|
|
|
|
def search_vectors(client: QdrantClient, collection_name: str, query_vector, limit: int = 5): |
|
|
""" |
|
|
Searches for similar vectors in the collection. |
|
|
|
|
|
Args: |
|
|
client: Qdrant client instance |
|
|
collection_name: Name of the collection to search |
|
|
query_vector: Query vector for similarity search |
|
|
limit: Maximum number of results to return |
|
|
|
|
|
Returns: |
|
|
Search results, or empty list if collection doesn't exist or is empty |
|
|
""" |
|
|
try: |
|
|
|
|
|
if not collection_exists(client, collection_name): |
|
|
logger.warning(f"Collection '{collection_name}' does not exist") |
|
|
return [] |
|
|
|
|
|
|
|
|
collection_info = client.get_collection(collection_name) |
|
|
if collection_info.points_count == 0: |
|
|
logger.info(f"Collection '{collection_name}' is empty") |
|
|
return [] |
|
|
|
|
|
|
|
|
query_vector_list = query_vector.tolist() if hasattr(query_vector, 'tolist') else query_vector |
|
|
|
|
|
|
|
|
|
|
|
try: |
|
|
logger.debug(f"Attempting query_points on collection '{collection_name}'") |
|
|
result = client.query_points( |
|
|
collection_name=collection_name, |
|
|
query=query_vector_list, |
|
|
limit=limit, |
|
|
with_payload=True |
|
|
) |
|
|
|
|
|
results = result.points if hasattr(result, 'points') else result |
|
|
logger.info(f"Found {len(results)} results using query_points in collection '{collection_name}'") |
|
|
return results |
|
|
except AttributeError as attr_err: |
|
|
|
|
|
logger.warning(f"query_points failed ({attr_err}), falling back to search method") |
|
|
try: |
|
|
results = client.search( |
|
|
collection_name=collection_name, |
|
|
query_vector=query_vector_list, |
|
|
limit=limit, |
|
|
with_payload=True |
|
|
) |
|
|
logger.info(f"Found {len(results)} results using search in collection '{collection_name}'") |
|
|
return results |
|
|
except Exception as search_err: |
|
|
logger.error(f"Both query_points and search failed. search error: {search_err}") |
|
|
raise |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Error searching collection '{collection_name}': {str(e)}") |
|
|
logger.error(f"Error type: {type(e).__name__}") |
|
|
import traceback |
|
|
logger.error(f"Traceback: {traceback.format_exc()}") |
|
|
return [] |
|
|
|
|
|
def get_collection_info(client: QdrantClient, collection_name: str) -> dict: |
|
|
""" |
|
|
Get information about a collection. |
|
|
|
|
|
Args: |
|
|
client: Qdrant client instance |
|
|
collection_name: Name of the collection |
|
|
|
|
|
Returns: |
|
|
Dictionary with collection information or None if collection doesn't exist |
|
|
""" |
|
|
try: |
|
|
collection_info = client.get_collection(collection_name) |
|
|
return { |
|
|
"name": collection_name, |
|
|
"points_count": collection_info.points_count, |
|
|
"status": collection_info.status, |
|
|
"vectors_count": collection_info.vectors_count if hasattr(collection_info, 'vectors_count') else None |
|
|
} |
|
|
except Exception as e: |
|
|
logger.error(f"Error getting collection info for '{collection_name}': {str(e)}") |
|
|
return None |
|
|
|