import numpy as np
from fastapi import APIRouter, HTTPException

from app.api.schemas.telemetry import TelemetryPayload

router = APIRouter()


def _compute_entropy(intervals: np.ndarray, bins: int = 20) -> float:
    """Return the normalized Shannon entropy of the interval histogram.

    The intervals are bucketed into ``bins`` equal-width bins, the bin counts
    are converted to probabilities, and the discrete Shannon entropy (base 2)
    is divided by its theoretical maximum ``log2(bins)``.

    The result does not depend on the unit or scale of ``intervals``. The
    previous formula used ``density=True`` and multiplied by the bin width,
    which evaluates to ``H(p) + log2(bin_width)``; for millisecond data with
    bins wider than 1 ms that saturated at 1.0 after clipping.

    Scoring bands used by ``_score_ghosting``: 0.50-0.85 is treated as the
    human "Goldilocks" zone; values near 0.0 indicate constant timing and
    values near 1.0 indicate a near-uniform (pure noise) distribution.

    Args:
        intervals: Inter-key intervals (any consistent time unit).
        bins: Number of equal-width histogram bins.

    Returns:
        Entropy in ``[0.0, 1.0]``. ``0.0`` when fewer than three intervals
        are given or when ``bins < 2`` (the maximum entropy would be zero).
    """
    if len(intervals) < 3 or bins < 2:
        return 0.0

    counts, _ = np.histogram(intervals, bins=bins)
    total = counts.sum()
    if total == 0:
        return 0.0

    # Drop empty bins instead of adding an epsilon: 0 * log2(0) is defined
    # as 0 for entropy, so empty bins simply do not contribute.
    probs = counts[counts > 0] / total
    raw_entropy = -np.sum(probs * np.log2(probs))

    # Normalize against the theoretical maximum (uniform over all bins).
    max_entropy = np.log2(bins)
    return float(np.clip(raw_entropy / max_entropy, 0.0, 1.0))


def _compute_cv(intervals: np.ndarray) -> float:
    """Return the coefficient of variation (std / mean), clipped to [0, 3].

    Returns ``0.0`` for empty input or when the mean is zero, so the caller
    never receives NaN or a division-by-zero result.
    """
    if len(intervals) == 0:
        return 0.0
    mean = np.mean(intervals)
    if mean == 0:
        return 0.0
    return float(np.clip(np.std(intervals) / mean, 0.0, 3.0))


def _is_correction(event: object) -> bool:
    """Return True if the event is flagged as a correction or is a Backspace.

    Both attributes are read with ``getattr`` defaults so events lacking
    ``is_correction`` or ``key`` are simply treated as non-corrections.
    """
    return bool(getattr(event, "is_correction", False)) or (
        getattr(event, "key", "") == "Backspace"
    )


def _correction_burst_ratio(events: list) -> float:
    """Return the fraction of corrections that occur in runs of 3 or more.

    The heuristic assumes humans tend to correct in bursts (e.g. deleting a
    whole mistyped word), whereas bots correct uniformly or not at all.

    Returns:
        A value in ``[0.0, 1.0]``; ``0.0`` when there are no corrections.
    """
    flags = [_is_correction(e) for e in events]
    total_corrections = sum(flags)
    if total_corrections == 0:
        return 0.0

    in_bursts = 0
    streak = 0
    for is_corr in flags:
        if is_corr:
            streak += 1
            continue
        if streak >= 3:
            in_bursts += streak
        streak = 0
    # A burst that runs to the end of the event list is not closed by a
    # non-correction event, so account for it here.
    if streak >= 3:
        in_bursts += streak

    return float(in_bursts / total_corrections)


def _score_ghosting(intervals: np.ndarray, events: list) -> tuple[float, dict]:
    """Compute the weighted multi-signal "human-likeness" score.

    Four signals are each mapped to a sub-score and combined with fixed
    weights (entropy 0.35, CV 0.30, mean interval 0.15, corrections 0.20).

    Args:
        intervals: Positive inter-key intervals. The mean-interval bands
            below (60-500, 40-700) are compared directly against the mean,
            so the values are presumably milliseconds - confirm with caller.
        events: Raw keystroke events; only ``is_correction`` and ``key``
            attributes are read (both optional).

    Returns:
        ``(final_score, breakdown)`` where ``final_score`` is in
        ``[0.0, 1.0]`` and ``breakdown`` holds the rounded raw signals and
        their sub-scores.
    """
    entropy = _compute_entropy(intervals)
    cv = _compute_cv(intervals)
    burst = _correction_burst_ratio(events)
    # Guard against empty input so the mean is never NaN.
    mean_ms = float(np.mean(intervals)) if len(intervals) else 0.0

    # Signal 1: entropy (Goldilocks zone).
    # Too low = regular bot. Too high = bot emitting pure noise.
    if 0.50 <= entropy <= 0.85:
        s_entropy = 1.0
    elif 0.35 <= entropy <= 0.95:
        s_entropy = 0.5
    else:
        s_entropy = 0.05

    # Signal 2: CV - organic variability.
    if cv > 0.50:
        s_cv = 1.0
    elif cv > 0.30:
        s_cv = 0.6
    elif cv > 0.15:
        s_cv = 0.3
    else:
        s_cv = 0.05  # CV near 0 = metronome-like bot

    # Signal 3: mean inter-key latency within the human range.
    if 60 <= mean_ms <= 500:
        s_mean = 1.0
    elif 40 <= mean_ms <= 700:
        s_mean = 0.5
    else:
        s_mean = 0.1

    # Signal 4: correction rate plus burst bonus.
    # Zero corrections is penalized: bots do not make organic mistakes.
    total_events = len(events)
    corrections = sum(1 for e in events if _is_correction(e))
    corr_rate = corrections / max(total_events, 1)
    if corr_rate == 0.0:
        s_corrections = 0.05  # zero corrections -> bot tell
    elif 0.02 <= corr_rate <= 0.15:
        s_corrections = 0.5 + burst * 0.5  # rate OK + burst bonus
    else:
        s_corrections = 0.2  # rate outside the expected range

    # Weighted final score (weights sum to 1.0).
    weights = {"entropy": 0.35, "cv": 0.30, "mean": 0.15, "corrections": 0.20}
    raw = (
        s_entropy * weights["entropy"]
        + s_cv * weights["cv"]
        + s_mean * weights["mean"]
        + s_corrections * weights["corrections"]
    )
    final_score = float(np.clip(raw, 0.0, 1.0))

    breakdown = {
        "entropy": round(entropy, 4),
        "cv": round(cv, 4),
        "mean_ikl_ms": round(mean_ms, 2),
        "corr_rate": round(corr_rate, 4),
        "burst_ratio": round(burst, 4),
        "s_entropy": round(s_entropy, 3),
        "s_cv": round(s_cv, 3),
        "s_mean": round(s_mean, 3),
        "s_corrections": round(s_corrections, 3),
    }
    return final_score, breakdown


@router.post("/simulate/ghosting")
async def simulate_ghosting(payload: TelemetryPayload):
    """Ghosting attack detector - v2.

    Extracts keystroke timestamps from ``payload.events``, derives the
    positive inter-key intervals and scores them with ``_score_ghosting``.
    The returned ``score`` is a bounded value in ``[0.0, 1.0]``; the earlier
    implementation reported ``np.std(intervals)`` (an unbounded value in
    time units), which made the HUMAN threshold trivially bypassable.

    Returns:
        A dict with ``session_id``, ``entropy_score`` (kept for backward
        compatibility), ``score``, ``verdict`` (``HUMAN`` / ``SUSPECT`` /
        ``BOT`` / ``INCONCLUSIVE``) and ``signal_breakdown``. Sessions with
        fewer than 15 events are reported as ``INCONCLUSIVE`` with an
        additional ``reason`` field.

    Raises:
        HTTPException: 400 when fewer than 2 events are supplied, fewer
            than 2 timestamps can be extracted, or fewer than 2 positive
            intervals remain after filtering.
    """
    events = payload.events
    if not events or len(events) < 2:
        raise HTTPException(status_code=400, detail="Minimum 2 events required")

    # Extract timestamps - compatible with both schemas (timestamp_ms and
    # timestamp). Use explicit None checks: a timestamp of 0 is valid and
    # must not be treated as "missing" the way a truthiness test would.
    timestamps = []
    for e in events:
        ts = getattr(e, "timestamp_ms", None)
        if ts is None:
            ts = getattr(e, "timestamp", None)
        if ts is not None:
            timestamps.append(float(ts))

    if len(timestamps) < 2:
        raise HTTPException(
            status_code=400, detail="Could not extract timestamps from events"
        )

    intervals = np.diff(np.array(timestamps))
    intervals = intervals[intervals > 0]  # drop impossible (non-positive) intervals

    if len(intervals) < 2:
        raise HTTPException(status_code=400, detail="Not enough valid intervals")

    # Minimum number of keystrokes for a reliable analysis.
    if len(events) < 15:
        return {
            "session_id": payload.session_id,
            "entropy_score": 0.0,
            "score": 0.0,
            "verdict": "INCONCLUSIVE",
            "reason": f"Need at least 15 keystrokes, got {len(events)}",
            "signal_breakdown": {},
        }

    final_score, breakdown = _score_ghosting(intervals, events)

    # Thresholds are stated by the original author to be aligned with
    # engine.py (not visible from this file).
    if final_score >= 0.65:
        verdict = "HUMAN"
    elif final_score >= 0.40:
        verdict = "SUSPECT"
    else:
        verdict = "BOT"

    return {
        "session_id": payload.session_id,
        "entropy_score": breakdown["entropy"],  # kept for backward compatibility
        "score": round(final_score, 4),  # the actual normalized score
        "verdict": verdict,
        "signal_breakdown": breakdown,
    }