Spaces:
Sleeping
Sleeping
File size: 7,461 Bytes
4ad96d9 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 | from fastapi import APIRouter, HTTPException
from app.api.schemas.telemetry import TelemetryPayload
import numpy as np
router = APIRouter()
def _compute_entropy(intervals: np.ndarray, bins: int = 20) -> float:
    """
    Return the normalized Shannon entropy of the interval distribution.

    The intervals are bucketed into ``bins`` equal-width bins, the bin counts
    are converted to probabilities ``p_i = count_i / total`` and the entropy
    ``-sum(p_i * log2(p_i))`` is divided by its maximum, ``log2(bins)``.
    The result is therefore in [0.0, 1.0] and independent of the unit or
    scale of the intervals.

    Intended reading of the value (per the original notes for this module):
        * around 0.50-0.85: human-like typing ("Goldilocks zone")
        * near 0.0: constant-interval bot
        * near 1.0: bot emitting overly uniform noise

    History: an earlier version returned ``np.std(intervals)``, an unbounded
    value in milliseconds. The version after that integrated a *density*
    histogram, which adds ``log2(bin_width)`` to the entropy and so made the
    score depend on the time unit and saturate at 1.0 for ordinary
    millisecond data. Bin probabilities are used here instead.

    Args:
        intervals: 1-D array of inter-key intervals.
        bins: number of equal-width histogram bins.

    Returns:
        Normalized entropy in [0.0, 1.0]; 0.0 when fewer than three intervals
        are supplied, when ``bins < 2``, or when all samples share one bin.
    """
    # log2(1) == 0 would make the normalisation divide by zero.
    if len(intervals) < 3 or bins < 2:
        return 0.0

    counts, _ = np.histogram(intervals, bins=bins)
    total = counts.sum()
    if total == 0:
        return 0.0

    # Empty bins contribute 0 to the entropy; dropping them avoids log2(0)
    # without needing an epsilon that would bias the result.
    probs = counts[counts > 0] / total
    if probs.size < 2:
        return 0.0

    entropy_bits = -np.sum(probs * np.log2(probs))
    return float(np.clip(entropy_bits / np.log2(bins), 0.0, 1.0))
def _compute_cv(intervals: np.ndarray) -> float:
    """
    Return the coefficient of variation (std / mean) of the intervals.

    The ratio is clipped to [0.0, 3.0]. A mean of exactly zero yields 0.0
    rather than a division by zero.
    """
    avg = np.mean(intervals)
    if avg != 0:
        spread = np.std(intervals)
        return float(np.clip(spread / avg, 0.0, 3.0))
    return 0.0
def _correction_burst_ratio(events: list) -> float:
    """
    Return the fraction of corrections that occur in runs of three or more.

    An event counts as a correction when its ``is_correction`` attribute is
    truthy or its ``key`` attribute equals 'Backspace'. Rationale from the
    original notes: humans tend to correct in bursts (noticing a whole word
    is wrong), while bots correct uniformly or not at all.

    Returns 0.0 when there are no corrections.
    """
    flags = [
        getattr(ev, 'is_correction', False) or getattr(ev, 'key', '') == 'Backspace'
        for ev in events
    ]
    if not any(flags):
        return 0.0

    in_bursts = 0
    run_length = 0
    # The trailing False sentinel flushes a run that reaches the end of the list.
    for flag in [*flags, False]:
        if flag:
            run_length += 1
            continue
        if run_length >= 3:
            in_bursts += run_length
        run_length = 0

    return float(in_bursts / max(sum(flags), 1))
def _score_ghosting(intervals: np.ndarray, events: list) -> tuple[float, dict]:
    """
    Combine four weighted signals into one score.

    Signals: interval entropy, coefficient of variation, mean inter-key
    latency, and correction behaviour.

    Returns:
        ``(final_score, breakdown)``. ``final_score`` is clipped to
        [0.0, 1.0]. ``breakdown`` holds the rounded raw measurements and the
        per-signal sub-scores.
    """
    entropy = _compute_entropy(intervals)
    cv = _compute_cv(intervals)
    burst = _correction_burst_ratio(events)
    mean_ms = float(np.mean(intervals))

    # Signal 1: entropy band. Full credit in the inner band, half credit in
    # the wider band, near-zero outside (too regular or too noisy).
    s_entropy = 0.05
    if 0.35 <= entropy <= 0.95:
        s_entropy = 0.5
    if 0.50 <= entropy <= 0.85:
        s_entropy = 1.0

    # Signal 2: coefficient of variation. Highest tier whose floor is exceeded
    # wins; a CV near zero is metronome-like.
    s_cv = 0.05
    for floor, credit in ((0.50, 1.0), (0.30, 0.6), (0.15, 0.3)):
        if cv > floor:
            s_cv = credit
            break

    # Signal 3: mean inter-key latency, banded the same way as entropy.
    s_mean = 0.1
    if 40 <= mean_ms <= 700:
        s_mean = 0.5
    if 60 <= mean_ms <= 500:
        s_mean = 1.0

    # Signal 4: correction rate. Zero corrections is penalised hardest; an
    # in-range rate earns a base of 0.5 plus a bonus scaled by the burst ratio.
    n_corrections = 0
    for ev in events:
        if getattr(ev, 'is_correction', False) or getattr(ev, 'key', '') == 'Backspace':
            n_corrections += 1
    corr_rate = n_corrections / max(len(events), 1)
    if corr_rate == 0.0:
        s_corrections = 0.05
    elif corr_rate < 0.02 or corr_rate > 0.15:
        s_corrections = 0.2
    else:
        s_corrections = 0.5 + burst * 0.5

    # Weighted sum, accumulated in a fixed order so the float result is stable.
    weights = {"entropy": 0.35, "cv": 0.30, "mean": 0.15, "corrections": 0.20}
    raw = s_entropy * weights["entropy"]
    raw += s_cv * weights["cv"]
    raw += s_mean * weights["mean"]
    raw += s_corrections * weights["corrections"]
    final_score = float(np.clip(raw, 0.0, 1.0))

    breakdown = {
        "entropy": round(entropy, 4),
        "cv": round(cv, 4),
        "mean_ikl_ms": round(mean_ms, 2),
        "corr_rate": round(corr_rate, 4),
        "burst_ratio": round(burst, 4),
        "s_entropy": round(s_entropy, 3),
        "s_cv": round(s_cv, 3),
        "s_mean": round(s_mean, 3),
        "s_corrections": round(s_corrections, 3),
    }
    return final_score, breakdown
@router.post("/simulate/ghosting")
async def simulate_ghosting(payload: TelemetryPayload):
    """
    Ghosting attack detector (v2).

    Scores a keystroke session with a multi-signal model (entropy, CV, mean
    inter-key latency, corrections) and maps the bounded score in [0.0, 1.0]
    to a verdict. An earlier version used ``np.std(intervals)`` as the score,
    which is unbounded (milliseconds) and made the HUMAN threshold trivial to
    pass.

    Returns:
        A dict with ``session_id``, ``entropy_score``, ``score``, ``verdict``
        and ``signal_breakdown``. Sessions with fewer than 15 events get an
        ``INCONCLUSIVE`` verdict plus a ``reason`` field.

    Raises:
        HTTPException(400): fewer than 2 events, fewer than 2 extractable
            timestamps, or fewer than 2 strictly positive intervals.
    """
    events = payload.events
    if not events or len(events) < 2:
        raise HTTPException(status_code=400, detail="Minimum 2 events required")

    # Accept either schema: prefer `timestamp_ms`, fall back to `timestamp`.
    # The fallback tests `is None` explicitly so a legitimate timestamp of 0
    # (e.g. a session-relative clock) is not mistaken for a missing value.
    timestamps = []
    for e in events:
        ts = getattr(e, 'timestamp_ms', None)
        if ts is None:
            ts = getattr(e, 'timestamp', None)
        if ts is not None:
            timestamps.append(float(ts))
    if len(timestamps) < 2:
        raise HTTPException(status_code=400, detail="Could not extract timestamps from events")

    intervals = np.diff(np.array(timestamps))
    intervals = intervals[intervals > 0]  # drop zero/negative (impossible) intervals
    if len(intervals) < 2:
        raise HTTPException(status_code=400, detail="Not enough valid intervals")

    # Minimum number of keystrokes for a reliable analysis.
    if len(events) < 15:
        return {
            "session_id": payload.session_id,
            "entropy_score": 0.0,
            "score": 0.0,
            "verdict": "INCONCLUSIVE",
            "reason": f"Need at least 15 keystrokes, got {len(events)}",
            "signal_breakdown": {},
        }

    final_score, breakdown = _score_ghosting(intervals, events)

    # Verdict thresholds (the original note says they are aligned with engine.py).
    if final_score >= 0.65:
        verdict = "HUMAN"
    elif final_score >= 0.40:
        verdict = "SUSPECT"
    else:
        verdict = "BOT"

    return {
        "session_id": payload.session_id,
        "entropy_score": breakdown["entropy"],  # kept for backward compatibility
        "score": round(final_score, 4),         # the actual normalized score
        "verdict": verdict,
        "signal_breakdown": breakdown,
    }