deci-core-api / app /core /engine.py
Denisijcu's picture
update
5cfd3b8 verified
import numpy as np
from typing import List, Dict, Any
from app.api.schemas.telemetry import KeystrokeData
class DECI_Engine:
"""
DECI Cognitive Identity Engine v2.1
Vertex Coders LLC — Miami, FL
Analiza patrones de escritura para determinar si el usuario es humano o bot.
Basado en 4 señales clave: Entropía IKL, CV, Velocidad, y Correcciones en ráfaga.
"""
def __init__(self):
self.MIN_KEYSTROKES = 10 # Mínimo de teclas para análisis válido
self.THRESHOLD_HUMAN = 0.65 # Por encima = humano
self.THRESHOLD_SUSPECT = 0.40 # Entre 0.40 y 0.65 = sospechoso
# Pesos de las señales (optimizados con pruebas de DeepSeek)
self.WEIGHTS = {
"entropy": 0.35, # Variabilidad del ritmo
"cv": 0.25, # Coeficiente de variación
"mean": 0.15, # Velocidad media
"corrections": 0.25 # Patrón de correcciones
}
def process_session(self, events: List[KeystrokeData]) -> Dict[str, Any]:
"""
Procesa una sesión de keystrokes y retorna veredicto y score.
Args:
events: Lista de eventos de teclado (KeystrokeData)
Returns:
Dict con score, verdict, is_human y breakdown de métricas
"""
# Validación de entrada
if len(events) < self.MIN_KEYSTROKES:
return {
"score": 0.0,
"verdict": "INCONCLUSIVE",
"is_human": False,
"reason": f"Se necesitan {self.MIN_KEYSTROKES} pulsaciones (recibidas: {len(events)})",
"breakdown": {}
}
# Extraer timestamps y calcular intervalos IKL
timestamps = np.array([float(e.timestamp) for e in events])
intervals = np.diff(timestamps)
intervals = intervals[intervals > 0] # Eliminar intervalos cero
if len(intervals) < 2:
return {
"score": 0.0,
"verdict": "INCONCLUSIVE",
"is_human": False,
"reason": "No hay suficientes intervalos válidos",
"breakdown": {}
}
# === 1. CÁLCULO DE MÉTRICAS ===
# 1.1 Shannon Entropy (normalizada a [0,1])
entropy = self._compute_shannon_entropy(intervals)
# 1.2 Coeficiente de Variación (CV) — detecta latencia uniforme
mean_ikl = np.mean(intervals)
cv = np.std(intervals) / mean_ikl if mean_ikl > 0 else 0.0
# 1.3 Velocidad media (CPM - caracteres por minuto)
total_chars = len(events)
total_time_ms = timestamps[-1] - timestamps[0] if len(timestamps) > 1 else 1
total_time_min = total_time_ms / 60000 # convertir a minutos
typing_speed = total_chars / total_time_min if total_time_min > 0 else 0
# 1.4 Correcciones y ráfagas
corrections = sum(1 for e in events if e.key == 'Backspace')
corr_rate = corrections / len(events)
burst_ratio = self._compute_burst_ratio(events)
# === 2. SCORING POR SEÑAL (Vertex Hardened Layer) ===
# 2.1 Entropy Score: Curva Gaussiana suave centrada en 0.70
s_entropy = float(np.exp(-((entropy - 0.70) ** 2) / 0.12))
s_entropy = max(0.05, min(1.0, s_entropy))
# 2.2 CV Score: Proporcional y continuo
s_cv = float(np.clip(cv / 0.50, 0.1, 1.0))
# 2.3 Mean IKL Score: Penalización suave solo en los extremos
if 80 <= mean_ikl <= 350:
s_mean = 1.0
else:
s_mean = float(np.clip(1.0 - abs(mean_ikl - 215) / 350, 0.2, 1.0))
# 2.4 Correction Score Dinámico (Mitigación de Falsos Positivos en Logins)
if corr_rate == 0:
if len(events) <= 20:
# Si la cadena es corta y no hay errores, evaluamos neutral (0.75) en vez de tratarlo como bot
s_corrections = 0.75
else:
# En textos largos, la ausencia total de errores sí es sospechosa (Bot / LLM)
s_corrections = 0.1
else:
# Si hay correcciones, aplicamos el mapeo continuo con el bonus de ráfaga de DeepSeek
s_corrections = 0.4 + (burst_ratio * 0.4) + (corr_rate * 2.0)
s_corrections = max(0.1, min(1.0, s_corrections))
# === 3. SCORE FINAL PONDERADO ===
final_score = (
s_entropy * self.WEIGHTS["entropy"] +
s_cv * self.WEIGHTS["cv"] +
s_mean * self.WEIGHTS["mean"] +
s_corrections * self.WEIGHTS["corrections"]
)
# Asegurar que el score esté en [0,1]
final_score = float(np.clip(final_score, 0.0, 1.0))
# === 4. VEREDICTO ===
if final_score >= self.THRESHOLD_HUMAN:
verdict = "HUMAN"
is_human = True
elif final_score >= self.THRESHOLD_SUSPECT:
verdict = "SUSPECT"
is_human = False
else:
verdict = "BOT"
is_human = False
# === 5. RESULTADO ===
return {
"score": round(final_score, 4),
"verdict": verdict,
"is_human": is_human,
"confidence": round(final_score, 4),
"breakdown": {
"entropy": round(entropy, 4),
"cv": round(cv, 4),
"mean_ikl_ms": round(mean_ikl, 2),
"typing_speed_cpm": round(typing_speed, 1),
"correction_rate": round(corr_rate, 4),
"burst_ratio": round(burst_ratio, 4),
"keystrokes": len(events),
"signals": {
"s_entropy": round(s_entropy, 4),
"s_cv": round(s_cv, 4),
"s_mean": round(s_mean, 4),
"s_corrections": round(s_corrections, 4)
}
}
}
def _compute_shannon_entropy(self, intervals: np.ndarray, bins: int = 20) -> float:
"""
Calcula entropía de Shannon normalizada a [0,1].
La entropía mide la "impredecibilidad" de los intervalos.
Humanos: 0.50-0.85 (variedad natural)
Bots: 0.00-0.35 (patrones uniformes o predecibles)
"""
if len(intervals) < 3:
return 0.0
# Asegurar bins válido
if bins < 2:
bins = 20
# Crear histograma
hist, bin_edges = np.histogram(intervals, bins=bins, density=True)
hist = hist + 1e-10 # evitar log(0)
# Calcular entropía raw
raw_entropy = -np.sum(hist * np.log2(hist))
# Normalizar por el ancho de los bins (solo si intervals.max() != intervals.min())
interval_range = intervals.max() - intervals.min()
if interval_range > 0 and bins > 0:
bin_width = interval_range / bins
raw_entropy *= bin_width
else:
bin_width = 1.0
# Normalizar a [0,1] dividiendo por la entropía máxima posible (log2(bins))
max_entropy = np.log2(bins)
if max_entropy > 0:
norm_entropy = raw_entropy / max_entropy
else:
norm_entropy = 0.0
return float(np.clip(norm_entropy, 0.0, 1.0))
def _compute_burst_ratio(self, events: List[KeystrokeData]) -> float:
"""
Calcula la proporción de correcciones que ocurren en ráfagas (3+ consecutivas).
Los humanos tienden a corregir en ráfagas (ej: borrar una palabra completa).
Los bots suelen tener correcciones aisladas o ninguna.
"""
if not events:
return 0.0
burst_count = 0
streak = 0
total_corrections = 0
for e in events:
if e.key == 'Backspace':
streak += 1
total_corrections += 1
else:
if streak >= 3:
burst_count += streak
streak = 0
# Check al final del loop
if streak >= 3:
burst_count += streak
if total_corrections == 0:
return 0.0
# Ratio de correcciones que están en ráfagas
return float(min(1.0, burst_count / total_corrections))
def get_health_status(self) -> Dict[str, Any]:
"""Retorna el estado de salud del motor DECI."""
return {
"engine": "DECI_Cognitive_Engine",
"version": "2.1.0",
"min_keystrokes": self.MIN_KEYSTROKES,
"thresholds": {
"human": self.THRESHOLD_HUMAN,
"suspect": self.THRESHOLD_SUSPECT
},
"weights": self.WEIGHTS,
"status": "operational"
}