"""
Vector Database - storage and retrieval of embeddings.
"""
import json
import math
import uuid
from datetime import datetime
from typing import Dict, List, Optional

# loguru is the preferred logger; fall back to the standard library so the
# module stays importable in minimal environments (same optional-dependency
# pattern used for Qdrant just below).
try:
    from loguru import logger
except ImportError:
    import logging

    logger = logging.getLogger(__name__)

try:
    from qdrant_client import QdrantClient
    from qdrant_client.models import Distance, VectorParams, PointStruct
    QDRANT_AVAILABLE = True
except ImportError:
    QDRANT_AVAILABLE = False
    logger.warning("Qdrant no disponible, usando almacenamiento en memoria")


class VectorDatabase:
    """
    Manage storage of search embeddings and their results.

    Uses Qdrant when the client library is importable and the server can be
    reached at construction time; otherwise everything is kept in an
    in-process dictionary (``memory_store``) keyed by ``query_id``.
    """

    def __init__(self, host="localhost", port=6333,
                 collection_name="aliah_faces", vector_size=512):
        """
        Set up the vector store connection.

        Args:
            host: Qdrant host name.
            port: Qdrant port.
            collection_name: Name of the Qdrant collection to use/create.
            vector_size: Dimensionality used when the collection has to be
                created (an already existing collection is left untouched).
        """
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.memory_store = {}  # in-memory fallback, keyed by query_id
        self.client = None
        self.use_qdrant = False

        if not QDRANT_AVAILABLE:
            logger.info("Usando almacenamiento en memoria")
            return

        try:
            self.client = QdrantClient(host=host, port=port)
            # _init_collection performs the first real request to the server
            # and re-raises on failure, so an unreachable server lands in the
            # except branch below instead of being reported as connected.
            self._init_collection()
            self.use_qdrant = True
            logger.info(f"Conectado a Qdrant: {host}:{port}")
        except Exception as e:
            logger.warning(f"No se pudo conectar a Qdrant, usando memoria: {e}")
            self.use_qdrant = False

    def _init_collection(self):
        """
        Create the Qdrant collection if it does not exist yet.

        Raises:
            Exception: whatever the Qdrant client raised; the error is logged
                and then propagated so the caller can fall back to memory.
        """
        try:
            existing = {c.name for c in self.client.get_collections().collections}
            if self.collection_name not in existing:
                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=self.vector_size,
                        distance=Distance.COSINE,
                    ),
                )
                logger.info(f"Colección '{self.collection_name}' creada")
        except Exception as e:
            logger.error(f"Error inicializando colección: {e}")
            raise

    @staticmethod
    def _point_id(query_id: str) -> str:
        """
        Return a deterministic Qdrant point ID (UUID string) for ``query_id``.

        The built-in ``hash()`` of a string is salted per process, so it
        cannot be used as a persistent identifier: the same query would get a
        different point ID after every restart. A name-based UUID is stable.
        """
        return str(uuid.uuid5(uuid.NAMESPACE_URL, str(query_id)))

    @staticmethod
    def _as_plain_list(embedding):
        """Convert array-like embeddings (anything with ``tolist``) to lists."""
        return embedding.tolist() if hasattr(embedding, 'tolist') else embedding

    def store_result(self, query_id: str, embedding: List[float], results: List[Dict]):
        """
        Store the embedding and the results of a search.

        When Qdrant is in use the record is upserted there; if that fails
        (or Qdrant is not in use) the record is kept in ``memory_store``.

        Args:
            query_id: Unique ID of the search.
            embedding: Embedding vector (list, or array-like with ``tolist``).
            results: List of verified results.
        """
        data = {
            'query_id': query_id,
            'embedding': self._as_plain_list(embedding),
            'results': results,
            'timestamp': datetime.now().isoformat(),
            'num_results': len(results)
        }

        if not self.use_qdrant:
            self.memory_store[query_id] = data
            logger.debug(f"Resultado almacenado en memoria: {query_id}")
            return

        try:
            point = PointStruct(
                id=self._point_id(query_id),  # stable across processes
                vector=data['embedding'],
                payload={
                    'query_id': query_id,
                    # results are stored as a JSON string in the payload
                    'results': json.dumps(results),
                    'timestamp': data['timestamp'],
                    'num_results': len(results)
                }
            )
            self.client.upsert(
                collection_name=self.collection_name,
                points=[point]
            )
            logger.info(f"Resultado almacenado en Qdrant: {query_id}")
        except Exception as e:
            logger.error(f"Error almacenando en Qdrant: {e}")
            # Keep the record locally so it is not lost.
            self.memory_store[query_id] = data

    def get_result(self, query_id: str) -> Optional[Dict]:
        """
        Retrieve the results of a previous search.

        Qdrant is queried first (by the ``query_id`` payload field); if it is
        not in use, fails, or has no match, ``memory_store`` is consulted.
        Records coming from Qdrant do not carry the ``embedding`` key, records
        coming from memory do.

        Args:
            query_id: ID of the search.

        Returns:
            Dictionary with the stored data, or None when nothing is found.
        """
        if self.use_qdrant:
            try:
                # scroll() returns (points, next_page_offset)
                points, _next_offset = self.client.scroll(
                    collection_name=self.collection_name,
                    scroll_filter={
                        "must": [
                            {
                                "key": "query_id",
                                "match": {"value": query_id}
                            }
                        ]
                    },
                    limit=1
                )
                if points:
                    payload = points[0].payload
                    return {
                        'query_id': payload['query_id'],
                        'results': json.loads(payload['results']),
                        'timestamp': payload['timestamp'],
                        'num_results': payload['num_results']
                    }
            except Exception as e:
                logger.error(f"Error recuperando de Qdrant: {e}")

        # Fall back to the in-memory store
        return self.memory_store.get(query_id)

    def _search_memory(self, embedding, limit: int) -> List[Dict]:
        """
        Rank the records in ``memory_store`` by cosine similarity.

        Records whose embedding is missing, has a different length than the
        query, is all zeros, or is not a flat numeric sequence are skipped.
        """
        try:
            query = [float(x) for x in embedding]
        except (TypeError, ValueError):
            return []
        query_norm = math.sqrt(sum(x * x for x in query))
        if limit <= 0 or query_norm == 0:
            return []

        scored = []
        for entry in self.memory_store.values():
            stored = entry.get('embedding')
            if not isinstance(stored, (list, tuple)) or len(stored) != len(query):
                continue
            try:
                stored_norm = math.sqrt(sum(x * x for x in stored))
                dot = sum(a * b for a, b in zip(query, stored))
            except TypeError:
                continue  # nested or non-numeric embedding
            if stored_norm == 0:
                continue
            scored.append({
                'query_id': entry['query_id'],
                'similarity': dot / (query_norm * stored_norm),
                'timestamp': entry['timestamp'],
                'num_results': entry['num_results']
            })

        scored.sort(key=lambda item: item['similarity'], reverse=True)
        return scored[:limit]

    def search_similar(self, embedding: List[float], limit: int = 10) -> List[Dict]:
        """
        Search for previously stored embeddings similar to ``embedding``.

        Qdrant is used when active; if it is not in use or the query fails,
        the in-memory store is searched with cosine similarity instead.

        Args:
            embedding: Query embedding vector (list, or array-like with
                ``tolist``).
            limit: Maximum number of results.

        Returns:
            List of dicts with ``query_id``, ``similarity``, ``timestamp`` and
            ``num_results``, best match first.
        """
        vector = self._as_plain_list(embedding)

        if self.use_qdrant:
            try:
                hits = self.client.search(
                    collection_name=self.collection_name,
                    query_vector=vector,
                    limit=limit
                )
                return [
                    {
                        'query_id': hit.payload['query_id'],
                        'similarity': hit.score,
                        'timestamp': hit.payload['timestamp'],
                        'num_results': hit.payload['num_results']
                    }
                    for hit in hits
                ]
            except Exception as e:
                logger.error(f"Error buscando similares: {e}")

        return self._search_memory(vector, limit)