Spaces:
Sleeping
Sleeping
File size: 9,203 Bytes
import numpy as np
from typing import List, Dict, Any
from app.api.schemas.telemetry import KeystrokeData
class DECI_Engine:
    """DECI Cognitive Identity Engine v2.1 (Vertex Coders LLC — Miami, FL).

    Scores a keystroke session to decide whether the typist looks human or
    automated. Four signals are combined into a weighted score:

    * entropy of the inter-key latencies (IKL),
    * coefficient of variation (CV) of the IKL,
    * mean IKL,
    * backspace ("correction") behaviour, including burst corrections.

    Events only need to expose ``timestamp`` (convertible to ``float``) and
    ``key`` attributes.
    """

    def __init__(self):
        self.MIN_KEYSTROKES = 10       # minimum events for a valid analysis
        self.THRESHOLD_HUMAN = 0.65    # score >= this -> HUMAN
        self.THRESHOLD_SUSPECT = 0.40  # score in [0.40, 0.65) -> SUSPECT
        # Per-signal weights of the final score (they sum to 1.0).
        self.WEIGHTS = {
            "entropy": 0.35,      # rhythm variability
            "cv": 0.25,           # coefficient of variation
            "mean": 0.15,         # mean inter-key latency
            "corrections": 0.25,  # correction pattern
        }

    def process_session(self, events: List["KeystrokeData"]) -> Dict[str, Any]:
        """Score one keystroke session and return the verdict.

        Args:
            events: Keyboard events in the order they were captured. The
                timestamps are treated as milliseconds (the typing speed is
                derived by dividing the elapsed time by 60000).

        Returns:
            A dict with ``score``, ``verdict`` (``"HUMAN"``, ``"SUSPECT"``,
            ``"BOT"`` or ``"INCONCLUSIVE"``), ``is_human`` and ``breakdown``.
            Scored sessions also carry ``confidence``; inconclusive sessions
            carry ``reason`` and an empty ``breakdown`` instead.
        """
        # --- Input validation ---
        if len(events) < self.MIN_KEYSTROKES:
            return self._inconclusive(
                f"Se necesitan {self.MIN_KEYSTROKES} pulsaciones (recibidas: {len(events)})"
            )

        # Inter-key latencies; zero or negative gaps are discarded.
        timestamps = np.array([float(e.timestamp) for e in events])
        intervals = np.diff(timestamps)
        intervals = intervals[intervals > 0]
        if len(intervals) < 2:
            return self._inconclusive("No hay suficientes intervalos válidos")

        # === 1. METRICS ===
        # 1.1 Shannon entropy of the latencies, normalized to [0, 1].
        entropy = self._compute_shannon_entropy(intervals)

        # 1.2 Coefficient of variation: a low CV means uniform latency.
        mean_ikl = float(np.mean(intervals))
        cv = float(np.std(intervals)) / mean_ikl if mean_ikl > 0 else 0.0

        # 1.3 Typing speed in characters per minute (reported, not scored).
        total_time_ms = timestamps[-1] - timestamps[0] if len(timestamps) > 1 else 1
        total_time_min = total_time_ms / 60000
        typing_speed = float(len(events) / total_time_min) if total_time_min > 0 else 0.0

        # 1.4 Corrections and correction bursts.
        corrections = sum(1 for e in events if e.key == 'Backspace')
        corr_rate = corrections / len(events)
        burst_ratio = self._compute_burst_ratio(events)

        # === 2. PER-SIGNAL SCORES ===
        # 2.1 Entropy: smooth Gaussian bump centred on 0.70, floored at 0.05.
        s_entropy = float(np.exp(-((entropy - 0.70) ** 2) / 0.12))
        s_entropy = max(0.05, min(1.0, s_entropy))

        # 2.2 CV: proportional, saturating at CV = 0.50.
        s_cv = float(np.clip(cv / 0.50, 0.1, 1.0))

        # 2.3 Mean IKL: full marks inside [80, 350] ms, linear falloff outside.
        # NOTE(review): the falloff is measured from 215 ms, so the score
        # drops from 1.0 to ~0.61 right at the band edges — confirm that
        # this step is intended.
        if 80 <= mean_ikl <= 350:
            s_mean = 1.0
        else:
            s_mean = float(np.clip(1.0 - abs(mean_ikl - 215) / 350, 0.2, 1.0))

        # 2.4 Corrections (mitigates false positives on short login-style input).
        if corr_rate == 0:
            # A short, error-free string is scored neutrally; a long text
            # with no corrections at all is treated as suspicious.
            s_corrections = 0.75 if len(events) <= 20 else 0.1
        else:
            # Continuous mapping with a bonus for burst corrections.
            s_corrections = 0.4 + (burst_ratio * 0.4) + (corr_rate * 2.0)
            s_corrections = max(0.1, min(1.0, s_corrections))

        # === 3. WEIGHTED FINAL SCORE, clamped to [0, 1] ===
        final_score = (
            s_entropy * self.WEIGHTS["entropy"]
            + s_cv * self.WEIGHTS["cv"]
            + s_mean * self.WEIGHTS["mean"]
            + s_corrections * self.WEIGHTS["corrections"]
        )
        final_score = float(np.clip(final_score, 0.0, 1.0))

        # === 4. VERDICT ===
        if final_score >= self.THRESHOLD_HUMAN:
            verdict = "HUMAN"
        elif final_score >= self.THRESHOLD_SUSPECT:
            verdict = "SUSPECT"
        else:
            verdict = "BOT"

        # === 5. RESULT ===
        return {
            "score": round(final_score, 4),
            "verdict": verdict,
            "is_human": verdict == "HUMAN",
            "confidence": round(final_score, 4),
            "breakdown": {
                "entropy": round(entropy, 4),
                "cv": round(cv, 4),
                "mean_ikl_ms": round(mean_ikl, 2),
                "typing_speed_cpm": round(typing_speed, 1),
                "correction_rate": round(corr_rate, 4),
                "burst_ratio": round(burst_ratio, 4),
                "keystrokes": len(events),
                "signals": {
                    "s_entropy": round(s_entropy, 4),
                    "s_cv": round(s_cv, 4),
                    "s_mean": round(s_mean, 4),
                    "s_corrections": round(s_corrections, 4),
                },
            },
        }

    def _inconclusive(self, reason: str) -> Dict[str, Any]:
        """Build the payload returned when a session cannot be scored."""
        return {
            "score": 0.0,
            "verdict": "INCONCLUSIVE",
            "is_human": False,
            "reason": reason,
            "breakdown": {},
        }

    def _compute_shannon_entropy(self, intervals: np.ndarray, bins: int = 20) -> float:
        """Return the Shannon entropy of ``intervals``, normalized to [0, 1].

        The intervals are histogrammed into ``bins`` equal-width bins and the
        entropy of the resulting bin probabilities is divided by
        ``log2(bins)``, the entropy of a perfectly even spread. Bin
        probabilities (not densities) are used so the result does not depend
        on the time scale of the intervals.

        Args:
            intervals: Positive inter-key latencies.
            bins: Number of histogram bins; values below 2 fall back to 20.

        Returns:
            0.0 for fewer than three intervals or when every interval lands
            in the same bin, up to 1.0 when all bins are equally populated.
        """
        if len(intervals) < 3:
            return 0.0
        if bins < 2:
            bins = 20

        counts, _ = np.histogram(intervals, bins=bins)
        total = counts.sum()
        if total == 0:
            return 0.0

        # Empty bins contribute nothing (0 * log 0 := 0), so skip them
        # instead of adding an epsilon that would bias the sum.
        probs = counts[counts > 0] / total
        entropy_bits = -np.sum(probs * np.log2(probs))
        return float(np.clip(entropy_bits / np.log2(bins), 0.0, 1.0))

    def _compute_burst_ratio(self, events: List["KeystrokeData"]) -> float:
        """Return the share of Backspace presses that belong to a burst.

        A burst is a run of three or more consecutive Backspace events.

        Returns:
            A value in [0, 1]; 0.0 when there are no events or no Backspace
            presses.
        """
        if not events:
            return 0.0

        burst_count = 0
        streak = 0
        total_corrections = 0
        for e in events:
            if e.key == 'Backspace':
                streak += 1
                total_corrections += 1
            else:
                if streak >= 3:
                    burst_count += streak
                streak = 0
        # A burst may run to the very end of the session.
        if streak >= 3:
            burst_count += streak

        if total_corrections == 0:
            return 0.0
        return float(min(1.0, burst_count / total_corrections))

    def get_health_status(self) -> Dict[str, Any]:
        """Return a snapshot of the engine configuration and status."""
        return {
            "engine": "DECI_Cognitive_Engine",
            "version": "2.1.0",
            "min_keystrokes": self.MIN_KEYSTROKES,
            "thresholds": {
                "human": self.THRESHOLD_HUMAN,
                "suspect": self.THRESHOLD_SUSPECT,
            },
            # Copy so callers cannot mutate the live scoring weights.
            "weights": dict(self.WEIGHTS),
            "status": "operational",
        }