import numpy as np from typing import List, Dict, Any from app.api.schemas.telemetry import KeystrokeData class DECI_Engine: """ DECI Cognitive Identity Engine v2.1 Vertex Coders LLC — Miami, FL Analiza patrones de escritura para determinar si el usuario es humano o bot. Basado en 4 señales clave: Entropía IKL, CV, Velocidad, y Correcciones en ráfaga. """ def __init__(self): self.MIN_KEYSTROKES = 10 # Mínimo de teclas para análisis válido self.THRESHOLD_HUMAN = 0.65 # Por encima = humano self.THRESHOLD_SUSPECT = 0.40 # Entre 0.40 y 0.65 = sospechoso # Pesos de las señales (optimizados con pruebas de DeepSeek) self.WEIGHTS = { "entropy": 0.35, # Variabilidad del ritmo "cv": 0.25, # Coeficiente de variación "mean": 0.15, # Velocidad media "corrections": 0.25 # Patrón de correcciones } def process_session(self, events: List[KeystrokeData]) -> Dict[str, Any]: """ Procesa una sesión de keystrokes y retorna veredicto y score. Args: events: Lista de eventos de teclado (KeystrokeData) Returns: Dict con score, verdict, is_human y breakdown de métricas """ # Validación de entrada if len(events) < self.MIN_KEYSTROKES: return { "score": 0.0, "verdict": "INCONCLUSIVE", "is_human": False, "reason": f"Se necesitan {self.MIN_KEYSTROKES} pulsaciones (recibidas: {len(events)})", "breakdown": {} } # Extraer timestamps y calcular intervalos IKL timestamps = np.array([float(e.timestamp) for e in events]) intervals = np.diff(timestamps) intervals = intervals[intervals > 0] # Eliminar intervalos cero if len(intervals) < 2: return { "score": 0.0, "verdict": "INCONCLUSIVE", "is_human": False, "reason": "No hay suficientes intervalos válidos", "breakdown": {} } # === 1. CÁLCULO DE MÉTRICAS === # 1.1 Shannon Entropy (normalizada a [0,1]) entropy = self._compute_shannon_entropy(intervals) # 1.2 Coeficiente de Variación (CV) — detecta latencia uniforme mean_ikl = np.mean(intervals) cv = np.std(intervals) / mean_ikl if mean_ikl > 0 else 0.0 # 1.3 Velocidad media (CPM - caracteres por minuto) total_chars = len(events) total_time_ms = timestamps[-1] - timestamps[0] if len(timestamps) > 1 else 1 total_time_min = total_time_ms / 60000 # convertir a minutos typing_speed = total_chars / total_time_min if total_time_min > 0 else 0 # 1.4 Correcciones y ráfagas corrections = sum(1 for e in events if e.key == 'Backspace') corr_rate = corrections / len(events) burst_ratio = self._compute_burst_ratio(events) # === 2. SCORING POR SEÑAL (Vertex Hardened Layer) === # 2.1 Entropy Score: Curva Gaussiana suave centrada en 0.70 s_entropy = float(np.exp(-((entropy - 0.70) ** 2) / 0.12)) s_entropy = max(0.05, min(1.0, s_entropy)) # 2.2 CV Score: Proporcional y continuo s_cv = float(np.clip(cv / 0.50, 0.1, 1.0)) # 2.3 Mean IKL Score: Penalización suave solo en los extremos if 80 <= mean_ikl <= 350: s_mean = 1.0 else: s_mean = float(np.clip(1.0 - abs(mean_ikl - 215) / 350, 0.2, 1.0)) # 2.4 Correction Score Dinámico (Mitigación de Falsos Positivos en Logins) if corr_rate == 0: if len(events) <= 20: # Si la cadena es corta y no hay errores, evaluamos neutral (0.75) en vez de tratarlo como bot s_corrections = 0.75 else: # En textos largos, la ausencia total de errores sí es sospechosa (Bot / LLM) s_corrections = 0.1 else: # Si hay correcciones, aplicamos el mapeo continuo con el bonus de ráfaga de DeepSeek s_corrections = 0.4 + (burst_ratio * 0.4) + (corr_rate * 2.0) s_corrections = max(0.1, min(1.0, s_corrections)) # === 3. SCORE FINAL PONDERADO === final_score = ( s_entropy * self.WEIGHTS["entropy"] + s_cv * self.WEIGHTS["cv"] + s_mean * self.WEIGHTS["mean"] + s_corrections * self.WEIGHTS["corrections"] ) # Asegurar que el score esté en [0,1] final_score = float(np.clip(final_score, 0.0, 1.0)) # === 4. VEREDICTO === if final_score >= self.THRESHOLD_HUMAN: verdict = "HUMAN" is_human = True elif final_score >= self.THRESHOLD_SUSPECT: verdict = "SUSPECT" is_human = False else: verdict = "BOT" is_human = False # === 5. RESULTADO === return { "score": round(final_score, 4), "verdict": verdict, "is_human": is_human, "confidence": round(final_score, 4), "breakdown": { "entropy": round(entropy, 4), "cv": round(cv, 4), "mean_ikl_ms": round(mean_ikl, 2), "typing_speed_cpm": round(typing_speed, 1), "correction_rate": round(corr_rate, 4), "burst_ratio": round(burst_ratio, 4), "keystrokes": len(events), "signals": { "s_entropy": round(s_entropy, 4), "s_cv": round(s_cv, 4), "s_mean": round(s_mean, 4), "s_corrections": round(s_corrections, 4) } } } def _compute_shannon_entropy(self, intervals: np.ndarray, bins: int = 20) -> float: """ Calcula entropía de Shannon normalizada a [0,1]. La entropía mide la "impredecibilidad" de los intervalos. Humanos: 0.50-0.85 (variedad natural) Bots: 0.00-0.35 (patrones uniformes o predecibles) """ if len(intervals) < 3: return 0.0 # Asegurar bins válido if bins < 2: bins = 20 # Crear histograma hist, bin_edges = np.histogram(intervals, bins=bins, density=True) hist = hist + 1e-10 # evitar log(0) # Calcular entropía raw raw_entropy = -np.sum(hist * np.log2(hist)) # Normalizar por el ancho de los bins (solo si intervals.max() != intervals.min()) interval_range = intervals.max() - intervals.min() if interval_range > 0 and bins > 0: bin_width = interval_range / bins raw_entropy *= bin_width else: bin_width = 1.0 # Normalizar a [0,1] dividiendo por la entropía máxima posible (log2(bins)) max_entropy = np.log2(bins) if max_entropy > 0: norm_entropy = raw_entropy / max_entropy else: norm_entropy = 0.0 return float(np.clip(norm_entropy, 0.0, 1.0)) def _compute_burst_ratio(self, events: List[KeystrokeData]) -> float: """ Calcula la proporción de correcciones que ocurren en ráfagas (3+ consecutivas). Los humanos tienden a corregir en ráfagas (ej: borrar una palabra completa). Los bots suelen tener correcciones aisladas o ninguna. """ if not events: return 0.0 burst_count = 0 streak = 0 total_corrections = 0 for e in events: if e.key == 'Backspace': streak += 1 total_corrections += 1 else: if streak >= 3: burst_count += streak streak = 0 # Check al final del loop if streak >= 3: burst_count += streak if total_corrections == 0: return 0.0 # Ratio de correcciones que están en ráfagas return float(min(1.0, burst_count / total_corrections)) def get_health_status(self) -> Dict[str, Any]: """Retorna el estado de salud del motor DECI.""" return { "engine": "DECI_Cognitive_Engine", "version": "2.1.0", "min_keystrokes": self.MIN_KEYSTROKES, "thresholds": { "human": self.THRESHOLD_HUMAN, "suspect": self.THRESHOLD_SUSPECT }, "weights": self.WEIGHTS, "status": "operational" }