# J / src / vector_db.py
# Uploaded by Andro0s ("Upload 13 files", revision 85fa7d2, verified)
"""
Vector Database - Almacenamiento y recuperación de embeddings
"""
import json
import math
import uuid
from datetime import datetime
from typing import List, Dict, Optional

from loguru import logger
# Qdrant is an optional dependency. When the client library cannot be
# imported the module still loads; QDRANT_AVAILABLE records the outcome so
# the rest of the file can choose between Qdrant and in-memory storage.
try:
    from qdrant_client import QdrantClient
    from qdrant_client.models import Distance, VectorParams, PointStruct
except ImportError:
    QDRANT_AVAILABLE = False
    logger.warning("Qdrant no disponible, usando almacenamiento en memoria")
else:
    # Only reached when both imports above succeeded.
    QDRANT_AVAILABLE = True
class VectorDatabase:
    """
    Store and retrieve search embeddings together with their results.

    A Qdrant collection is used when the client library could be imported
    and the server answers during construction. Otherwise every operation
    works against ``memory_store``, a plain dict keyed by ``query_id`` that
    lives only as long as this object.
    """

    def __init__(self, host="localhost", port=6333, collection_name="aliah_faces",
                 vector_size=512):
        """
        Connect to Qdrant, or fall back to in-memory storage.

        Args:
            host: Qdrant server host.
            port: Qdrant server port.
            collection_name: Collection that holds the stored points.
            vector_size: Dimension used when the collection has to be
                created (defaults to the previously hard-coded 512).
        """
        self.collection_name = collection_name
        self.vector_size = vector_size
        self.memory_store = {}  # Fallback storage, also used when an upsert fails
        self.client = None
        self.use_qdrant = False

        if not QDRANT_AVAILABLE:
            logger.info("Usando almacenamiento en memoria")
            return

        try:
            self.client = QdrantClient(host=host, port=port)
            # First real round-trip to the server; it raises when the server
            # is unreachable, which is what triggers the memory fallback.
            self._init_collection()
        except Exception as e:
            logger.warning(f"No se pudo conectar a Qdrant, usando memoria: {e}")
        else:
            self.use_qdrant = True
            logger.info(f"Conectado a Qdrant: {host}:{port}")

    def _init_collection(self):
        """
        Create the Qdrant collection if it does not exist yet.

        Raises:
            Exception: Whatever the client raised, re-raised after logging so
                that ``__init__`` can switch to in-memory storage instead of
                reporting a connection that does not work.
        """
        try:
            existing = {c.name for c in self.client.get_collections().collections}
            if self.collection_name not in existing:
                self.client.create_collection(
                    collection_name=self.collection_name,
                    vectors_config=VectorParams(
                        size=self.vector_size,
                        distance=Distance.COSINE,
                    ),
                )
                logger.info(f"Colección '{self.collection_name}' creada")
        except Exception as e:
            logger.error(f"Error inicializando colección: {e}")
            raise

    @staticmethod
    def _point_id(query_id) -> str:
        """
        Derive a stable Qdrant point ID (UUID string) from ``query_id``.

        The built-in ``hash()`` of a string is salted per process, so it
        cannot be used as a persistent identifier; UUIDv5 yields the same
        value in every run, so storing the same ``query_id`` twice
        overwrites the point instead of duplicating it.
        """
        return str(uuid.uuid5(uuid.NAMESPACE_URL, str(query_id)))

    @staticmethod
    def _as_list(embedding):
        """Convert array-like objects exposing ``tolist()`` to plain lists."""
        return embedding.tolist() if hasattr(embedding, 'tolist') else embedding

    @staticmethod
    def _cosine_similarity(a, b) -> Optional[float]:
        """
        Return the cosine similarity of two vectors.

        Returns None when the vectors cannot be compared: missing, empty,
        different lengths, non-numeric content, or zero norm.
        """
        try:
            if a is None or b is None or len(a) == 0 or len(a) != len(b):
                return None
            dot = sum(x * y for x, y in zip(a, b))
            norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
        except TypeError:
            return None
        if norm == 0:
            return None
        return dot / norm

    def _search_memory(self, query, limit: int) -> List[Dict]:
        """Rank the entries of ``memory_store`` by cosine similarity to ``query``."""
        if limit <= 0:
            return []
        scored = []
        for entry in self.memory_store.values():
            score = self._cosine_similarity(query, entry.get('embedding'))
            if score is None:
                continue
            scored.append({
                'query_id': entry['query_id'],
                'similarity': score,
                'timestamp': entry['timestamp'],
                'num_results': entry['num_results'],
            })
        scored.sort(key=lambda item: item['similarity'], reverse=True)
        return scored[:limit]

    def store_result(self, query_id: str, embedding: List[float], results: List[Dict]):
        """
        Store the embedding and the results of one search.

        When Qdrant is in use and the upsert fails, the record is kept in
        ``memory_store`` so it is not lost.

        Args:
            query_id: Unique ID of the search.
            embedding: Embedding vector (list, or array-like with ``tolist``).
            results: List of verified results.
        """
        data = {
            'query_id': query_id,
            'embedding': self._as_list(embedding),
            'results': results,
            'timestamp': datetime.now().isoformat(),
            'num_results': len(results),
        }

        if self.use_qdrant:
            try:
                point = PointStruct(
                    id=self._point_id(query_id),
                    vector=data['embedding'],
                    payload={
                        'query_id': query_id,
                        # Stored as a JSON string; get_result decodes it again.
                        'results': json.dumps(results),
                        'timestamp': data['timestamp'],
                        'num_results': data['num_results'],
                    },
                )
                self.client.upsert(
                    collection_name=self.collection_name,
                    points=[point],
                )
                logger.info(f"Resultado almacenado en Qdrant: {query_id}")
            except Exception as e:
                logger.error(f"Error almacenando en Qdrant: {e}")
                self.memory_store[query_id] = data
            return

        self.memory_store[query_id] = data
        logger.debug(f"Resultado almacenado en memoria: {query_id}")

    def get_result(self, query_id: str) -> Optional[Dict]:
        """
        Return the stored record of a previous search.

        Qdrant is asked first (payload filter on ``query_id``); if it has no
        match or the request fails, ``memory_store`` is consulted.

        Args:
            query_id: ID of the search.

        Returns:
            A dict with ``query_id``, ``results``, ``timestamp`` and
            ``num_results`` (records served from memory also carry
            ``embedding``), or None when nothing is stored under that ID.
        """
        if self.use_qdrant:
            try:
                # scroll() returns (points, next_page_offset).
                points, _next_offset = self.client.scroll(
                    collection_name=self.collection_name,
                    scroll_filter={
                        "must": [
                            {
                                "key": "query_id",
                                "match": {"value": query_id}
                            }
                        ]
                    },
                    limit=1,
                )
                if points:
                    payload = points[0].payload
                    return {
                        'query_id': payload['query_id'],
                        'results': json.loads(payload['results']),
                        'timestamp': payload['timestamp'],
                        'num_results': payload['num_results'],
                    }
            except Exception as e:
                logger.error(f"Error recuperando de Qdrant: {e}")

        return self.memory_store.get(query_id)

    def search_similar(self, embedding: List[float], limit: int = 10) -> List[Dict]:
        """
        Find previously stored searches whose embedding is close to ``embedding``.

        Uses Qdrant when it is active. In memory mode, or when the Qdrant
        request fails, the records in ``memory_store`` are ranked by cosine
        similarity instead.

        Args:
            embedding: Query embedding vector (list, or array-like with ``tolist``).
            limit: Maximum number of matches to return.

        Returns:
            List of dicts with ``query_id``, ``similarity``, ``timestamp``
            and ``num_results``, best match first.
        """
        query = self._as_list(embedding)

        if self.use_qdrant:
            try:
                if hasattr(self.client, "search"):
                    hits = self.client.search(
                        collection_name=self.collection_name,
                        query_vector=query,
                        limit=limit,
                    )
                else:
                    # Client versions without search() expose query_points().
                    hits = self.client.query_points(
                        collection_name=self.collection_name,
                        query=query,
                        limit=limit,
                    ).points
                return [
                    {
                        'query_id': hit.payload['query_id'],
                        'similarity': hit.score,
                        'timestamp': hit.payload['timestamp'],
                        'num_results': hit.payload['num_results'],
                    }
                    for hit in hits
                ]
            except Exception as e:
                logger.error(f"Error buscando similares: {e}")

        return self._search_memory(query, limit)