deci-core-api / app /database /qdrant_vault.py
Denisijcu's picture
update
c91db00 verified
import os
from typing import List, Dict, Any, Union
from qdrant_client import QdrantClient
from qdrant_client.http import models
class CognitiveVault:
"""
CognitiveVault v2.2 — Vertex Coders LLC
Gestiona la capa de persistencia vectorial en el Cognitive DNA Vault.
Aplica aislamiento de fallos (Fault Isolation) con conmutación dinámica
a entornos volátiles en memoria nativa si la infraestructura física decae.
"""
def __init__(self) -> None:
self.collection_name: str = "user_signatures"
# Inyección dinámica de configuración de infraestructura
self.host: str = os.getenv("QDRANT_HOST", "localhost")
self.port: int = int(os.getenv("QDRANT_PORT", 6333))
self.is_volatile: bool = False
# Inicialización resiliente del cliente vectorial
self.client: QdrantClient = self._initialize_client()
# Verificación no destructiva del esquema de colecciones
self._ensure_collection_schema()
def _initialize_client(self) -> QdrantClient:
"""Establece el canal de comunicación con Qdrant aplicando políticas de resguardo."""
try:
print(f"📡 [VAULT] Estableciendo enlace con Qdrant en {self.host}:{self.port}...")
# Timeout explícito de 2.0s para mitigar bloqueos síncronos en el arranque
client = QdrantClient(host=self.host, port=self.port, timeout=2.0)
# Verificación en frío de socket abierto
client.get_collections()
print("🚀 [VAULT] Conexión establecida con éxito con el clúster físico.")
self.is_volatile = False
return client
except Exception as conn_err:
print(f"⚠ [VAULT_WARNING] Infraestructura física no disponible: {str(conn_err)}")
print("🚨 [FALLBACK_ACTIVE] Conmutando a motor de vectores embebido en memoria RAM (:memory:).")
self.is_volatile = True
return QdrantClient(":memory:")
def _ensure_collection_schema(self) -> None:
"""Garantiza la existencia e integridad del esquema vectorial sin destruir datos previos."""
try:
collections_response = self.client.get_collections()
exists = any(c.name == self.collection_name for c in collections_response.collections)
if not exists:
self.client.recreate_collection(
collection_name=self.collection_name,
vectors_config=models.VectorParams(
size=128, # Dimensión fija de la firma biométrica DECI
distance=models.Distance.COSINE
)
)
print(f"📁 [VAULT] Colección '{self.collection_name}' estructurada correctamente.")
else:
print(f"ℹ [VAULT] Esquema de la colección '{self.collection_name}' validado e íntegro.")
except Exception as schema_err:
print(f"🚨 [VAULT_CRITICAL] Error en la fase de asimilación del esquema: {str(schema_err)}")
async def save_signature(self, user_id: Union[str, int], vector: List[float], metadata: Dict[str, Any]) -> bool:
"""
Registra un nuevo vector de firma cognitiva en el almacén disponible.
Returns:
bool: True si la operación fue confirmada; False ante fallos de persistencia.
"""
try:
# Cast explícito para mitigar inconsistencias de serialización de tipos en numpy
sanitized_vector: List[float] = [float(v) for v in vector]
self.client.upsert(
collection_name=self.collection_name,
points=[
models.PointStruct(
id=user_id,
vector=sanitized_vector,
payload=metadata
)
]
)
return True
except Exception as upsert_err:
print(f"⚠ [VAULT_WRITE_ERROR] No se pudo guardar la firma en el registro: {str(upsert_err)}")
return False
async def verify_identity(self, vector: List[float]) -> List[Any]:
"""
Ejecuta una búsqueda analítica por similitud de coseno en el espacio vectorial.
Returns:
List[Any]: Lista de coincidencias más cercanas encontradas en la colección.
"""
try:
sanitized_vector: List[float] = [float(v) for v in vector]
search_result = self.client.search(
collection_name=self.collection_name,
query_vector=sanitized_vector,
limit=1
)
return search_result
except Exception as search_err:
print(f"⚠ [VAULT_READ_ERROR] Falló la resolución de similitud en el espacio vectorial: {str(search_err)}")
return []