|
|
""" |
|
|
Vector Database - Almacenamiento y recuperación de embeddings |
|
|
""" |
|
|
|
|
|
from typing import List, Dict, Optional |
|
|
import json |
|
|
from datetime import datetime |
|
|
from loguru import logger |
|
|
try: |
|
|
from qdrant_client import QdrantClient |
|
|
from qdrant_client.models import Distance, VectorParams, PointStruct |
|
|
QDRANT_AVAILABLE = True |
|
|
except ImportError: |
|
|
QDRANT_AVAILABLE = False |
|
|
logger.warning("Qdrant no disponible, usando almacenamiento en memoria") |
|
|
|
|
|
|
|
|
class VectorDatabase:
    """
    Store and retrieve search embeddings together with their results.

    A Qdrant collection is used when the client library could be imported and
    the server answered while the instance was being constructed. Otherwise
    (and whenever a Qdrant write fails) data is kept in the in-process
    ``memory_store`` dict, keyed by ``query_id``.
    """

    def __init__(self, host="localhost", port=6333, collection_name="aliah_faces",
                 vector_size=512):
        """
        Connect to Qdrant, or fall back to the in-memory store.

        Args:
            host: Qdrant host name.
            port: Qdrant port.
            collection_name: Name of the collection to use (created if missing).
            vector_size: Dimensionality used when the collection has to be
                created. Defaults to 512, the previously hard-coded value.
        """
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.memory_store = {}
        # Always define these so later attribute access cannot fail, whatever
        # branch is taken below.
        self.client = None
        self.use_qdrant = False

        if not QDRANT_AVAILABLE:
            logger.info("Usando almacenamiento en memoria")
            return

        try:
            self.client = QdrantClient(host=host, port=port)
            # First real round-trip to the server; it raises when the server
            # cannot be reached, which is what triggers the fallback below.
            self._init_collection()
        except Exception as e:
            logger.warning(f"No se pudo conectar a Qdrant, usando memoria: {e}")
            return

        self.use_qdrant = True
        logger.info(f"Conectado a Qdrant: {host}:{port}")

    def _init_collection(self):
        """
        Create the Qdrant collection if it does not exist yet.

        Raises:
            Exception: Whatever the Qdrant client raised. The error is logged
                and re-raised so the constructor can switch to memory mode
                instead of reporting a connection that never succeeded.
        """
        try:
            existing = {c.name for c in self.client.get_collections().collections}
            if self.collection_name not in existing:
                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=self.vector_size,
                        distance=Distance.COSINE,
                    ),
                )
                logger.info(f"Colección '{self.collection_name}' creada")
        except Exception as e:
            logger.error(f"Error inicializando colección: {e}")
            raise

    @staticmethod
    def _point_id(query_id) -> str:
        """
        Return a deterministic Qdrant point id for ``query_id``.

        The built-in ``hash()`` of a string is salted per interpreter process,
        so it cannot be used for ids that must stay stable across restarts.
        A UUIDv5 is a pure function of its input and is returned in the
        canonical string form.
        """
        import uuid  # local import: the block does not rely on file-level imports

        return str(uuid.uuid5(uuid.NAMESPACE_URL, str(query_id)))

    def store_result(self, query_id: str, embedding: List[float], results: List[Dict]):
        """
        Store the embedding and results of one search.

        In Qdrant mode the point is upserted; if that fails the error is
        logged and the record is kept in ``memory_store`` instead.

        Args:
            query_id: Unique id of the search.
            embedding: Embedding vector (a list, or any object exposing
                ``tolist()``).
            results: List of verified results. They are JSON-encoded for the
                Qdrant payload.
        """
        vector = embedding.tolist() if hasattr(embedding, 'tolist') else embedding
        data = {
            'query_id': query_id,
            'embedding': vector,
            'results': results,
            'timestamp': datetime.now().isoformat(),
            'num_results': len(results),
        }

        if not self.use_qdrant:
            self.memory_store[query_id] = data
            logger.debug(f"Resultado almacenado en memoria: {query_id}")
            return

        try:
            point = PointStruct(
                # Stable id: storing the same query_id again overwrites the
                # existing point instead of adding a duplicate.
                id=self._point_id(query_id),
                vector=vector,
                payload={
                    'query_id': query_id,
                    'results': json.dumps(results),
                    'timestamp': data['timestamp'],
                    'num_results': len(results),
                },
            )
            self.client.upsert(
                collection_name=self.collection_name,
                points=[point],
            )
            logger.info(f"Resultado almacenado en Qdrant: {query_id}")
        except Exception as e:
            logger.error(f"Error almacenando en Qdrant: {e}")
            # Best effort: keep the record locally so it is not lost.
            self.memory_store[query_id] = data

    def get_result(self, query_id: str) -> Optional[Dict]:
        """
        Retrieve the results of a previous search.

        Qdrant is queried first (when enabled) with a payload filter on
        ``query_id``; on a miss or an error the in-memory store is consulted.

        Args:
            query_id: Id of the search.

        Returns:
            A dict with ``query_id``, ``results``, ``timestamp`` and
            ``num_results`` (records served from memory also carry
            ``embedding``), or ``None`` when the id is unknown.
        """
        if self.use_qdrant:
            try:
                response = self.client.scroll(
                    collection_name=self.collection_name,
                    scroll_filter={
                        "must": [
                            {
                                "key": "query_id",
                                "match": {"value": query_id},
                            }
                        ]
                    },
                    limit=1,
                )
                # The original code indexes the response as (points, ...).
                points = response[0]
                if points:
                    payload = points[0].payload
                    return {
                        'query_id': payload['query_id'],
                        'results': json.loads(payload['results']),
                        'timestamp': payload['timestamp'],
                        'num_results': payload['num_results'],
                    }
            except Exception as e:
                logger.error(f"Error recuperando de Qdrant: {e}")

        return self.memory_store.get(query_id)

    def _search_memory(self, embedding, limit: int) -> List[Dict]:
        """Rank the records in ``memory_store`` by cosine similarity to ``embedding``."""
        import math  # local import: the block does not rely on file-level imports

        if embedding is None or limit <= 0:
            return []
        query = embedding.tolist() if hasattr(embedding, 'tolist') else list(embedding)
        try:
            query_norm = math.sqrt(sum(x * x for x in query))
        except TypeError:
            return []
        if query_norm == 0:
            # Cosine similarity is undefined for a zero vector.
            return []

        scored = []
        for entry in self.memory_store.values():
            stored = entry.get('embedding')
            try:
                if not stored or len(stored) != len(query):
                    continue
                stored_norm = math.sqrt(sum(x * x for x in stored))
                if stored_norm == 0:
                    continue
                dot = sum(a * b for a, b in zip(query, stored))
            except TypeError:
                # Entry whose embedding is not a flat numeric sequence.
                continue
            scored.append({
                'query_id': entry['query_id'],
                'similarity': dot / (query_norm * stored_norm),
                'timestamp': entry['timestamp'],
                'num_results': entry['num_results'],
            })

        scored.sort(key=lambda item: item['similarity'], reverse=True)
        return scored[:limit]

    def search_similar(self, embedding: List[float], limit: int = 10) -> List[Dict]:
        """
        Find previously stored searches whose embedding is close to ``embedding``.

        In Qdrant mode the collection is searched. In memory mode, or when the
        Qdrant call fails, the in-memory records are ranked by cosine
        similarity instead of returning nothing.

        Args:
            embedding: Query embedding vector.
            limit: Maximum number of matches to return.

        Returns:
            A list of dicts with ``query_id``, ``similarity``, ``timestamp``
            and ``num_results``, best match first.
        """
        if self.use_qdrant:
            try:
                hits = self.client.search(
                    collection_name=self.collection_name,
                    query_vector=embedding,
                    limit=limit,
                )
                return [
                    {
                        'query_id': hit.payload['query_id'],
                        'similarity': hit.score,
                        'timestamp': hit.payload['timestamp'],
                        'num_results': hit.payload['num_results'],
                    }
                    for hit in hits
                ]
            except Exception as e:
                logger.error(f"Error buscando similares: {e}")

        return self._search_memory(embedding, limit)
|
|
|