Spaces:
Sleeping
Sleeping
File size: 5,046 Bytes
6874d8b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 |
"""
Qdrant client wrapper for vector database operations.
"""
import logging
from typing import List, Dict, Any
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
import time
logger = logging.getLogger(__name__)
class QdrantManager:
    """Thin wrapper around ``QdrantClient`` for collection setup, upserts and search.

    Each method reports its outcome through the module-level ``logger``.
    """

    def __init__(self, url: str, api_key: str):
        """Create the underlying Qdrant client.

        Args:
            url: Address handed to ``QdrantClient``.
            api_key: API key handed to ``QdrantClient``.
        """
        self.client = QdrantClient(url=url, api_key=api_key)
        logger.info("Connected to Qdrant at %s", url)

    def create_collection(self, collection_name: str, vector_size: int, distance: str = "Cosine"):
        """Create a collection unless one with the same name already exists.

        Args:
            collection_name: Name of the collection.
            vector_size: Dimension of the stored vectors.
            distance: Distance metric name: "Cosine", "Euclidean" or "Dot"
                (matched case-insensitively). Any other value falls back to
                Cosine and a warning is logged.

        Returns:
            True if the collection already existed or was created, False if
            any exception was raised along the way (the error is logged).
        """
        try:
            # Nothing to do when the collection is already there.
            existing_names = {
                col.name for col in self.client.get_collections().collections
            }
            if collection_name in existing_names:
                logger.info("Collection '%s' already exists", collection_name)
                return True

            distance_map = {
                "cosine": Distance.COSINE,
                "euclidean": Distance.EUCLID,
                "dot": Distance.DOT,
            }
            # Normalise case/whitespace so "cosine" or " Dot " are not silently
            # replaced by the default metric.
            key = distance.strip().lower() if isinstance(distance, str) else distance
            metric = distance_map.get(key)
            if metric is None:
                # Keep the historical fallback, but make it visible in the logs.
                logger.warning(
                    "Unknown distance metric %r; falling back to Cosine", distance
                )
                metric = Distance.COSINE

            self.client.create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=vector_size, distance=metric),
            )
            logger.info(
                "Created collection '%s' with vector size %s",
                collection_name,
                vector_size,
            )
            return True
        except Exception as e:
            logger.error("Error creating collection: %s", e)
            return False

    def upsert_points(self, collection_name: str, points_data: List[Dict[str, Any]],
                      embeddings: List[List[float]], max_retries: int = 3):
        """Upsert points into a collection, retrying failed attempts.

        Args:
            collection_name: Name of the collection.
            points_data: One dict per point; each must provide the keys
                'id', 'problem', 'solution' and 'source'.
            embeddings: One vector per entry of ``points_data``, in the same order.
            max_retries: Maximum number of upsert attempts (at least 1).

        Returns:
            True once the upsert succeeded, or immediately when there is
            nothing to upsert.

        Raises:
            ValueError: If ``max_retries`` is below 1, or ``points_data`` and
                ``embeddings`` differ in length.
            KeyError: If a point dict lacks one of the required keys.
            Exception: The error of the last attempt, once every attempt failed.
        """
        if max_retries < 1:
            # With zero attempts the retry loop would never run and the call
            # would look successful to a caller that ignores the return value.
            raise ValueError(f"max_retries must be at least 1, got {max_retries}")

        # Materialise so arbitrary iterables can be measured and reused.
        points_data = list(points_data)
        embeddings = list(embeddings)
        if len(points_data) != len(embeddings):
            # zip() would silently drop the unmatched tail.
            raise ValueError(
                f"points_data has {len(points_data)} items but embeddings has "
                f"{len(embeddings)}"
            )
        if not points_data:
            # Nothing to send; skip the network round trip and its retries.
            return True

        points = [
            PointStruct(
                id=data['id'],
                vector=embedding,
                payload={
                    'problem': data['problem'],
                    'solution': data['solution'],
                    'source': data['source'],
                },
            )
            for data, embedding in zip(points_data, embeddings)
        ]

        for attempt in range(max_retries):
            try:
                self.client.upsert(collection_name=collection_name, points=points)
            except Exception as e:
                logger.warning("Attempt %d failed: %s", attempt + 1, e)
                if attempt == max_retries - 1:
                    logger.error(
                        "Failed to upsert points after %d attempts", max_retries
                    )
                    raise  # bare raise keeps the original traceback
                time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
            else:
                logger.info("Successfully upserted %d points", len(points))
                return True

    def search_similar(self, collection_name: str, query_vector: List[float],
                       limit: int = 3, score_threshold: float = 0.0):
        """Search the collection for vectors similar to ``query_vector``.

        Args:
            collection_name: Name of the collection.
            query_vector: Query embedding vector.
            limit: Maximum number of results to request.
            score_threshold: Score threshold forwarded to the client.
                NOTE(review): for distance-based metrics such as Euclidean,
                Qdrant appears to treat this as an upper bound on distance, so
                the default 0.0 may filter out almost everything -- confirm
                against the Qdrant search documentation.

        Returns:
            The result list returned by ``client.search``, or an empty list if
            the search raised (the error is logged).
        """
        try:
            results = self.client.search(
                collection_name=collection_name,
                query_vector=query_vector,
                limit=limit,
                score_threshold=score_threshold,
            )
            logger.info("Found %d similar results", len(results))
            return results
        except Exception as e:
            logger.error("Error searching collection: %s", e)
            return []

    def get_collection_info(self, collection_name: str):
        """Fetch collection information from the client.

        Args:
            collection_name: Name of the collection.

        Returns:
            Whatever ``client.get_collection`` returns, or None if the lookup
            raised (the error is logged).
        """
        try:
            info = self.client.get_collection(collection_name)
            logger.info("Collection info: %s", info)
            return info
        except Exception as e:
            logger.error("Error getting collection info: %s", e)
            return None
|