deci-core-api / attack.py
Denisijcu's picture
Upload 2 files
4ad96d9 verified
from fastapi import APIRouter, HTTPException
from app.api.schemas.telemetry import TelemetryPayload
import numpy as np
router = APIRouter()
def _compute_entropy(intervals: np.ndarray, bins: int = 20) -> float:
"""
Shannon entropy normalizada β€” valor entre 0.0 y 1.0.
ANTES (bug de Gemini): entropy_score = np.std(intervals)
β†’ DevolvΓ­a ~120ms (el std en milisegundos) β†’ score de 129.74
β†’ Cualquier std > 10ms daba verdict HUMAN β†’ bypass trivial
AHORA: Shannon entropy del histograma de distribuciΓ³n, normalizada.
Humans: 0.50–0.85 (Goldilocks zone)
Bots constantes: ~0.0
Bots con ruido puro (demasiado uniforme): ~1.0
"""
if len(intervals) < 3:
return 0.0
hist, _ = np.histogram(intervals, bins=bins, density=True)
hist = hist + 1e-10 # evitar log(0)
raw_entropy = -np.sum(hist * np.log2(hist)) * (intervals.max() - intervals.min()) / bins
# Normalizar contra entropΓ­a mΓ‘xima teΓ³rica
max_entropy = np.log2(bins)
return float(np.clip(raw_entropy / max_entropy, 0.0, 1.0))
def _compute_cv(intervals: np.ndarray) -> float:
"""Coeficiente de variaciΓ³n β€” mide irregularidad orgΓ‘nica."""
mean = np.mean(intervals)
if mean == 0:
return 0.0
return float(np.clip(np.std(intervals) / mean, 0.0, 3.0))
def _correction_burst_ratio(events: list) -> float:
"""
QuΓ© fracciΓ³n de correcciones vienen en bursts de 3+.
Humanos corrigen en rΓ‘fagas (darse cuenta de una palabra entera mal).
Bots corrigen uniformemente o no corrigen.
"""
corrections = [getattr(e, 'is_correction', False) or getattr(e, 'key', '') == 'Backspace'
for e in events]
if not any(corrections):
return 0.0
burst = streak = 0
for c in corrections:
if c:
streak += 1
else:
if streak >= 3:
burst += streak
streak = 0
if streak >= 3:
burst += streak
total_corrections = sum(corrections)
return float(burst / max(total_corrections, 1))
def _score_ghosting(intervals: np.ndarray, events: list) -> tuple[float, dict]:
"""
Scoring multi-seΓ±al con pesos.
Retorna (score_final, signal_breakdown) donde score ∈ [0.0, 1.0].
"""
entropy = _compute_entropy(intervals)
cv = _compute_cv(intervals)
burst = _correction_burst_ratio(events)
mean_ms = float(np.mean(intervals))
# ── Signal 1: Entropy (Goldilocks zone) ──────────────────────────────────
# Demasiado bajo = bot regular. Demasiado alto = bot con ruido puro.
if 0.50 <= entropy <= 0.85:
s_entropy = 1.0
elif 0.35 <= entropy <= 0.95:
s_entropy = 0.5
else:
s_entropy = 0.05
# ── Signal 2: CV β€” variabilidad orgΓ‘nica ─────────────────────────────────
if cv > 0.50:
s_cv = 1.0
elif cv > 0.30:
s_cv = 0.6
elif cv > 0.15:
s_cv = 0.3
else:
s_cv = 0.05 # cv casi 0 = bot metronomo
# ── Signal 3: Mean IKL en rango humano ───────────────────────────────────
if 60 <= mean_ms <= 500:
s_mean = 1.0
elif 40 <= mean_ms <= 700:
s_mean = 0.5
else:
s_mean = 0.1
# ── Signal 4: Correction burst ratio ─────────────────────────────────────
# Zero corrections = penalizaciΓ³n. Bots no cometen errores orgΓ‘nicos.
total_events = len(events)
corrections = sum(1 for e in events
if getattr(e, 'is_correction', False)
or getattr(e, 'key', '') == 'Backspace')
corr_rate = corrections / max(total_events, 1)
if corr_rate == 0.0:
s_corrections = 0.05 # zero corrections β†’ bot tell
elif 0.02 <= corr_rate <= 0.15:
s_corrections = 0.5 + burst * 0.5 # rate OK + burst bonus
else:
s_corrections = 0.2 # rate fuera de rango
# ── Weighted final score ──────────────────────────────────────────────────
weights = {"entropy": 0.35, "cv": 0.30, "mean": 0.15, "corrections": 0.20}
raw = (s_entropy * weights["entropy"] +
s_cv * weights["cv"] +
s_mean * weights["mean"] +
s_corrections * weights["corrections"])
final_score = float(np.clip(raw, 0.0, 1.0))
breakdown = {
"entropy": round(entropy, 4),
"cv": round(cv, 4),
"mean_ikl_ms": round(mean_ms, 2),
"corr_rate": round(corr_rate, 4),
"burst_ratio": round(burst, 4),
"s_entropy": round(s_entropy, 3),
"s_cv": round(s_cv, 3),
"s_mean": round(s_mean, 3),
"s_corrections": round(s_corrections, 3),
}
return final_score, breakdown
@router.post("/simulate/ghosting")
async def simulate_ghosting(payload: TelemetryPayload):
"""
Ghosting attack detector β€” v2 (fixed).
FIX: score ahora es Shannon entropy normalizada ∈ [0.0, 1.0]
con scoring multi-seΓ±al (entropy + CV + IKL mean + corrections).
BUG anterior: entropy_score = np.std(intervals)
β†’ std en ms (~120) nunca bounded β†’ score 129.74 β†’ bypass trivial.
"""
events = payload.events
if not events or len(events) < 2:
raise HTTPException(status_code=400, detail="Minimum 2 events required")
# Extraer timestamps β€” compatible con ambos schemas (timestamp y timestamp_ms)
timestamps = []
for e in events:
ts = getattr(e, 'timestamp_ms', None) or getattr(e, 'timestamp', None)
if ts is not None:
timestamps.append(float(ts))
if len(timestamps) < 2:
raise HTTPException(status_code=400, detail="Could not extract timestamps from events")
intervals = np.diff(np.array(timestamps))
intervals = intervals[intervals > 0] # filtrar intervalos imposibles
if len(intervals) < 2:
raise HTTPException(status_code=400, detail="Not enough valid intervals")
# MΓ­nimo de keystrokes para anΓ‘lisis confiable
if len(events) < 15:
return {
"session_id": payload.session_id,
"entropy_score": 0.0,
"score": 0.0,
"verdict": "INCONCLUSIVE",
"reason": f"Need at least 15 keystrokes, got {len(events)}",
"signal_breakdown": {},
}
final_score, breakdown = _score_ghosting(intervals, events)
# Thresholds alineados con engine.py de Claude
if final_score >= 0.65:
verdict = "HUMAN"
elif final_score >= 0.40:
verdict = "SUSPECT"
else:
verdict = "BOT"
return {
"session_id": payload.session_id,
"entropy_score": breakdown["entropy"], # mantener campo para compatibilidad
"score": round(final_score, 4), # el score real normalizado
"verdict": verdict,
"signal_breakdown": breakdown,
}