Spaces:
Runtime error
Runtime error
| """ | |
| Qdrant Manager - Vector database client for RAG persistence and retrieval | |
| Manages collections, embeddings, and search operations | |
| """ | |
| from qdrant_client import QdrantClient | |
| from qdrant_client.models import Distance, VectorParams, PointStruct | |
| from typing import List, Dict, Any, Optional | |
| from uuid import uuid4 | |
| from backend.config import settings | |
| class QdrantManager: | |
| """Singleton for managing Qdrant vector database""" | |
| _instance = None | |
| _client = None | |
| def __new__(cls): | |
| if cls._instance is None: | |
| cls._instance = super().__new__(cls) | |
| cls._instance._initialized = False | |
| return cls._instance | |
| def __init__(self): | |
| if not self._initialized: | |
| # Initialize Qdrant client | |
| self.client = QdrantClient( | |
| url=settings.qdrant_url, | |
| api_key=settings.qdrant_api_key | |
| ) | |
| self._initialized = True | |
| self._ensure_collections() | |
| def _ensure_collections(self): | |
| """Create collections if they don't exist""" | |
| collections = [ | |
| settings.kb_collection_name, | |
| settings.history_collection_name | |
| ] | |
| for collection_name in collections: | |
| try: | |
| # Try to get collection info | |
| self.client.get_collection(collection_name) | |
| except Exception: | |
| # Create if doesn't exist | |
| self.client.create_collection( | |
| collection_name=collection_name, | |
| vectors_config=VectorParams( | |
| size=settings.vector_dimension, | |
| distance=Distance.COSINE | |
| ) | |
| ) | |
| print(f"✅ Created collection: {collection_name}") | |
| def add_to_kb(self, documents: List[Dict[str, Any]]) -> None: | |
| """ | |
| Add documents to knowledge base collection | |
| Args: | |
| documents: List of dicts with 'id', 'text', 'embedding' keys | |
| """ | |
| from rag.embedding_manager import embedding_manager | |
| points = [] | |
| for doc in documents: | |
| embedding = embedding_manager.embed(doc['text']) | |
| point = PointStruct( | |
| id=int(uuid4().int % (10**8)), | |
| vector=embedding, | |
| payload={ | |
| "text": doc['text'], | |
| "source": doc.get('source', 'unknown'), | |
| "document_id": doc.get('document_id', str(uuid4())) | |
| } | |
| ) | |
| points.append(point) | |
| self.client.upsert( | |
| collection_name=settings.kb_collection_name, | |
| points=points | |
| ) | |
| print(f"✅ Added {len(documents)} documents to KB collection") | |
| def search_kb(self, query: str, limit: int = 3) -> List[Dict[str, Any]]: | |
| """ | |
| Search knowledge base | |
| Args: | |
| query: Search query text | |
| limit: Number of results to return | |
| Returns: | |
| List of similar documents | |
| """ | |
| from rag.embedding_manager import embedding_manager | |
| query_embedding = embedding_manager.embed(query) | |
| results = self.client.query_points( | |
| collection_name=settings.kb_collection_name, | |
| query=query_embedding, | |
| limit=limit | |
| ).points | |
| return [ | |
| { | |
| "text": r.payload['text'], | |
| "source": r.payload.get('source', 'unknown'), | |
| "score": r.score | |
| } | |
| for r in results | |
| ] | |
| def search_history(self, query: str, customer_id: str, limit: int = 3) -> List[Dict[str, Any]]: | |
| """ | |
| Search customer history with customer_id filter | |
| Args: | |
| query: Search query text | |
| customer_id: Filter by customer | |
| limit: Number of results | |
| Returns: | |
| List of similar history records for customer | |
| """ | |
| from rag.embedding_manager import embedding_manager | |
| query_embedding = embedding_manager.embed(query) | |
| results = self.client.query_points( | |
| collection_name=settings.history_collection_name, | |
| query=query_embedding, | |
| limit=limit, | |
| query_filter={ | |
| "must": [ | |
| { | |
| "key": "customer_id", | |
| "match": {"value": customer_id} | |
| } | |
| ] | |
| } | |
| ).points | |
| return [ | |
| { | |
| "text": r.payload['text'], | |
| "customer_id": r.payload.get('customer_id'), | |
| "interaction_type": r.payload.get('interaction_type'), | |
| "score": r.score | |
| } | |
| for r in results | |
| ] | |
| def add_to_history(self, customer_id: str, text: str, interaction_type: str) -> None: | |
| """ | |
| Add conversation to customer history | |
| Args: | |
| customer_id: Customer identifier | |
| text: Conversation text | |
| interaction_type: Type of interaction (e.g., 'complaint', 'refund_request') | |
| """ | |
| from rag.embedding_manager import embedding_manager | |
| embedding = embedding_manager.embed(text) | |
| point = PointStruct( | |
| id=int(uuid4().int % (10**8)), | |
| vector=embedding, | |
| payload={ | |
| "customer_id": customer_id, | |
| "text": text, | |
| "interaction_type": interaction_type, | |
| "timestamp": str(__import__('datetime').datetime.now()) | |
| } | |
| ) | |
| self.client.upsert( | |
| collection_name=settings.history_collection_name, | |
| points=[point] | |
| ) | |
| def get_collection_info(self, collection_name: str) -> Dict[str, Any]: | |
| """Get collection statistics""" | |
| info = self.client.get_collection(collection_name) | |
| return { | |
| "name": collection_name, | |
| "points_count": info.points_count, | |
| "vectors_count": info.vectors_count | |
| } | |
| # Global singleton instance | |
| qdrant_manager = QdrantManager() | |