Spaces:
Sleeping
Sleeping
| import numpy as np | |
| from typing import List, Dict, Any | |
| from app.api.schemas.telemetry import KeystrokeData | |
class DECI_Engine:
    """
    DECI Cognitive Identity Engine v2.1
    Vertex Coders LLC — Miami, FL

    Analyzes typing patterns to decide whether the user is a human or a bot.
    The score combines 4 signals computed from the IKL (the intervals between
    consecutive keystroke timestamps) and from the Backspace pattern:
    interval entropy, coefficient of variation (CV), mean interval, and
    corrections (with a bonus for corrections that come in bursts).
    """

    def __init__(self):
        """Set the minimum sample size, the verdict thresholds and the signal weights."""
        self.MIN_KEYSTROKES = 10  # minimum number of keystrokes for a valid analysis
        self.THRESHOLD_HUMAN = 0.65  # score at or above this value = human
        self.THRESHOLD_SUSPECT = 0.40  # from 0.40 up to (excluding) 0.65 = suspect
        # Signal weights (tuned with DeepSeek tests); they add up to 1.0
        self.WEIGHTS = {
            "entropy": 0.35,      # variability of the rhythm
            "cv": 0.25,           # coefficient of variation
            "mean": 0.15,         # mean interval (speed)
            "corrections": 0.25,  # correction pattern
        }

    # The schema name is quoted (forward reference) so that defining the class
    # does not evaluate it at runtime; events are only read through their
    # ``timestamp`` and ``key`` attributes.
    def process_session(self, events: List["KeystrokeData"]) -> Dict[str, Any]:
        """
        Process a keystroke session and return a verdict and a score.

        Args:
            events: List of keyboard events. Each event must expose
                ``timestamp`` (convertible to float; treated as milliseconds,
                since the speed computation divides the elapsed time by 60000
                to obtain minutes) and ``key`` (compared against
                ``'Backspace'``). ``None`` is handled like an empty list.

        Returns:
            Dict with ``score``, ``verdict`` (``"HUMAN"``, ``"SUSPECT"``,
            ``"BOT"`` or ``"INCONCLUSIVE"``), ``is_human`` and ``breakdown``.
            Scored results also carry ``confidence`` (same value as
            ``score``); inconclusive results carry ``reason`` and an empty
            ``breakdown`` instead.
        """
        # Input validation
        events = [] if events is None else events
        n_events = len(events)
        if n_events < self.MIN_KEYSTROKES:
            return self._inconclusive(
                f"Se necesitan {self.MIN_KEYSTROKES} pulsaciones (recibidas: {n_events})"
            )

        # Extract timestamps and compute the IKL intervals
        timestamps = np.array([float(e.timestamp) for e in events])
        intervals = np.diff(timestamps)
        # Keep strictly positive intervals only: this drops zero-length
        # intervals and any negative ones produced by out-of-order events.
        intervals = intervals[intervals > 0]
        if len(intervals) < 2:
            return self._inconclusive("No hay suficientes intervalos válidos")

        # === 1. METRICS ===
        # 1.1 Shannon entropy (normalized to [0, 1])
        entropy = self._compute_shannon_entropy(intervals)

        # 1.2 Coefficient of variation (CV) — detects uniform latency
        mean_ikl = float(np.mean(intervals))
        cv = float(np.std(intervals)) / mean_ikl if mean_ikl > 0 else 0.0

        # 1.3 Average speed (CPM - characters per minute); reported only,
        # it does not enter the score.
        elapsed_min = float(timestamps[-1] - timestamps[0]) / 60000
        typing_speed = n_events / elapsed_min if elapsed_min > 0 else 0.0

        # 1.4 Corrections and bursts
        corrections = sum(1 for e in events if e.key == 'Backspace')
        corr_rate = corrections / n_events
        burst_ratio = self._compute_burst_ratio(events)

        # === 2. PER-SIGNAL SCORING (Vertex Hardened Layer) ===
        # 2.1 Entropy score: smooth Gaussian curve centered at 0.70
        s_entropy = float(np.exp(-((entropy - 0.70) ** 2) / 0.12))
        s_entropy = max(0.05, min(1.0, s_entropy))

        # 2.2 CV score: proportional and continuous, saturating at cv = 0.50
        s_cv = float(np.clip(cv / 0.50, 0.1, 1.0))

        # 2.3 Mean IKL score: full marks inside [80, 350] ms, penalized outside.
        # NOTE(review): the score steps from 1.0 to ~0.61 right at the band
        # edges because the penalty is measured from 215 ms, not from the
        # nearest edge — confirm whether that step is intended.
        if 80 <= mean_ikl <= 350:
            s_mean = 1.0
        else:
            s_mean = float(np.clip(1.0 - abs(mean_ikl - 215) / 350, 0.2, 1.0))

        # 2.4 Dynamic correction score (mitigates false positives on logins)
        s_corrections = self._score_corrections(corr_rate, burst_ratio, n_events)

        # === 3. WEIGHTED FINAL SCORE ===
        weights = self.WEIGHTS
        final_score = (
            s_entropy * weights["entropy"]
            + s_cv * weights["cv"]
            + s_mean * weights["mean"]
            + s_corrections * weights["corrections"]
        )
        # Make sure the score stays in [0, 1]
        final_score = float(np.clip(final_score, 0.0, 1.0))

        # === 4. VERDICT ===
        if final_score >= self.THRESHOLD_HUMAN:
            verdict, is_human = "HUMAN", True
        elif final_score >= self.THRESHOLD_SUSPECT:
            verdict, is_human = "SUSPECT", False
        else:
            verdict, is_human = "BOT", False

        # === 5. RESULT ===
        return {
            "score": round(final_score, 4),
            "verdict": verdict,
            "is_human": is_human,
            "confidence": round(final_score, 4),
            "breakdown": {
                "entropy": round(entropy, 4),
                "cv": round(cv, 4),
                "mean_ikl_ms": round(mean_ikl, 2),
                "typing_speed_cpm": round(typing_speed, 1),
                "correction_rate": round(corr_rate, 4),
                "burst_ratio": round(burst_ratio, 4),
                "keystrokes": n_events,
                "signals": {
                    "s_entropy": round(s_entropy, 4),
                    "s_cv": round(s_cv, 4),
                    "s_mean": round(s_mean, 4),
                    "s_corrections": round(s_corrections, 4),
                },
            },
        }

    @staticmethod
    def _inconclusive(reason: str) -> Dict[str, Any]:
        """Build the zero-score result returned when a session cannot be analyzed."""
        return {
            "score": 0.0,
            "verdict": "INCONCLUSIVE",
            "is_human": False,
            "reason": reason,
            "breakdown": {},
        }

    @staticmethod
    def _score_corrections(corr_rate: float, burst_ratio: float, n_events: int) -> float:
        """Map the correction rate and burst ratio to a score in [0.1, 1.0]."""
        if corr_rate == 0:
            if n_events <= 20:
                # Short input with no mistakes is scored as neutral (0.75)
                # instead of being treated as a bot.
                return 0.75
            # In long texts a total absence of mistakes is suspicious (bot / LLM).
            return 0.1
        # With corrections present: continuous mapping plus the burst bonus.
        raw_score = 0.4 + (burst_ratio * 0.4) + (corr_rate * 2.0)
        return max(0.1, min(1.0, raw_score))

    def _compute_shannon_entropy(self, intervals: np.ndarray, bins: int = 20) -> float:
        """
        Compute the Shannon entropy of the intervals, normalized to [0, 1].

        The intervals are histogrammed into ``bins`` equal-width bins; the
        entropy of the per-bin probabilities (in bits) is divided by the
        maximum possible value, ``log2(bins)``. Because probabilities (not
        densities) are used, the result does not depend on the unit or scale
        of the intervals.

        Ranges expected by the original authors:
            Humans: 0.50-0.85 (natural variety)
            Bots:   0.00-0.35 (uniform or predictable patterns)

        Args:
            intervals: Positive inter-keystroke intervals.
            bins: Number of histogram bins; values below 2 fall back to 20.

        Returns:
            Normalized entropy in [0, 1]; 0.0 when fewer than 3 intervals are given.
        """
        values = np.asarray(intervals, dtype=float)
        if len(values) < 3:
            return 0.0
        # Ensure a valid bin count
        if bins < 2:
            bins = 20

        # Raw counts, not density: a density histogram yields a differential
        # entropy that shifts by log2(bin_width) and is not bounded by log2(bins).
        counts, _ = np.histogram(values, bins=bins)
        total = counts.sum()
        if total == 0:
            return 0.0
        # Empty bins contribute nothing (limit of p*log2(p) as p -> 0), so
        # they are dropped instead of adding an epsilon.
        probs = counts[counts > 0] / total
        raw_entropy = float(-np.sum(probs * np.log2(probs)))

        # Normalize by the maximum possible entropy and clamp against
        # floating-point drift (also turns a possible -0.0 into 0.0).
        norm_entropy = raw_entropy / float(np.log2(bins))
        return float(min(1.0, max(0.0, norm_entropy)))

    def _compute_burst_ratio(self, events: List["KeystrokeData"]) -> float:
        """
        Compute the share of corrections that occur in bursts (3+ in a row).

        Rationale given by the original authors: humans tend to correct in
        bursts (e.g. deleting a whole word), while bots usually show isolated
        corrections or none at all.

        Args:
            events: Keyboard events; only ``key`` is read.

        Returns:
            Backspaces belonging to runs of at least 3 consecutive Backspaces,
            divided by all Backspaces; 0.0 when there are no events or no
            Backspaces.
        """
        if not events:
            return 0.0

        in_burst = 0
        run_length = 0
        total_corrections = 0
        for event in events:
            if event.key == 'Backspace':
                run_length += 1
                total_corrections += 1
                continue
            if run_length >= 3:
                in_burst += run_length
            run_length = 0
        # A run that reaches the end of the session is still open here.
        if run_length >= 3:
            in_burst += run_length

        if total_corrections == 0:
            return 0.0
        # Ratio of corrections that are part of a burst
        return float(min(1.0, in_burst / total_corrections))

    def get_health_status(self) -> Dict[str, Any]:
        """Return the health status and current configuration of the DECI engine."""
        return {
            "engine": "DECI_Cognitive_Engine",
            "version": "2.1.0",
            "min_keystrokes": self.MIN_KEYSTROKES,
            "thresholds": {
                "human": self.THRESHOLD_HUMAN,
                "suspect": self.THRESHOLD_SUSPECT,
            },
            # Copy so callers cannot mutate the engine's live weights.
            "weights": dict(self.WEIGHTS),
            "status": "operational",
        }