import os from typing import List, Dict, Any, Union from qdrant_client import QdrantClient from qdrant_client.http import models class CognitiveVault: """ CognitiveVault v2.2 — Vertex Coders LLC Gestiona la capa de persistencia vectorial en el Cognitive DNA Vault. Aplica aislamiento de fallos (Fault Isolation) con conmutación dinámica a entornos volátiles en memoria nativa si la infraestructura física decae. """ def __init__(self) -> None: self.collection_name: str = "user_signatures" # Inyección dinámica de configuración de infraestructura self.host: str = os.getenv("QDRANT_HOST", "localhost") self.port: int = int(os.getenv("QDRANT_PORT", 6333)) self.is_volatile: bool = False # Inicialización resiliente del cliente vectorial self.client: QdrantClient = self._initialize_client() # Verificación no destructiva del esquema de colecciones self._ensure_collection_schema() def _initialize_client(self) -> QdrantClient: """Establece el canal de comunicación con Qdrant aplicando políticas de resguardo.""" try: print(f"📡 [VAULT] Estableciendo enlace con Qdrant en {self.host}:{self.port}...") # Timeout explícito de 2.0s para mitigar bloqueos síncronos en el arranque client = QdrantClient(host=self.host, port=self.port, timeout=2.0) # Verificación en frío de socket abierto client.get_collections() print("🚀 [VAULT] Conexión establecida con éxito con el clúster físico.") self.is_volatile = False return client except Exception as conn_err: print(f"⚠ [VAULT_WARNING] Infraestructura física no disponible: {str(conn_err)}") print("🚨 [FALLBACK_ACTIVE] Conmutando a motor de vectores embebido en memoria RAM (:memory:).") self.is_volatile = True return QdrantClient(":memory:") def _ensure_collection_schema(self) -> None: """Garantiza la existencia e integridad del esquema vectorial sin destruir datos previos.""" try: collections_response = self.client.get_collections() exists = any(c.name == self.collection_name for c in collections_response.collections) if not exists: self.client.recreate_collection( collection_name=self.collection_name, vectors_config=models.VectorParams( size=128, # Dimensión fija de la firma biométrica DECI distance=models.Distance.COSINE ) ) print(f"📁 [VAULT] Colección '{self.collection_name}' estructurada correctamente.") else: print(f"ℹ [VAULT] Esquema de la colección '{self.collection_name}' validado e íntegro.") except Exception as schema_err: print(f"🚨 [VAULT_CRITICAL] Error en la fase de asimilación del esquema: {str(schema_err)}") async def save_signature(self, user_id: Union[str, int], vector: List[float], metadata: Dict[str, Any]) -> bool: """ Registra un nuevo vector de firma cognitiva en el almacén disponible. Returns: bool: True si la operación fue confirmada; False ante fallos de persistencia. """ try: # Cast explícito para mitigar inconsistencias de serialización de tipos en numpy sanitized_vector: List[float] = [float(v) for v in vector] self.client.upsert( collection_name=self.collection_name, points=[ models.PointStruct( id=user_id, vector=sanitized_vector, payload=metadata ) ] ) return True except Exception as upsert_err: print(f"⚠ [VAULT_WRITE_ERROR] No se pudo guardar la firma en el registro: {str(upsert_err)}") return False async def verify_identity(self, vector: List[float]) -> List[Any]: """ Ejecuta una búsqueda analítica por similitud de coseno en el espacio vectorial. Returns: List[Any]: Lista de coincidencias más cercanas encontradas en la colección. """ try: sanitized_vector: List[float] = [float(v) for v in vector] search_result = self.client.search( collection_name=self.collection_name, query_vector=sanitized_vector, limit=1 ) return search_result except Exception as search_err: print(f"⚠ [VAULT_READ_ERROR] Falló la resolución de similitud en el espacio vectorial: {str(search_err)}") return []