File size: 7,461 Bytes
4ad96d9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
from fastapi import APIRouter, HTTPException
from app.api.schemas.telemetry import TelemetryPayload
import numpy as np

# Module-level router; the ghosting simulation endpoint in this module is
# registered on it through the @router.post decorator.
router = APIRouter()


def _compute_entropy(intervals: np.ndarray, bins: int = 20) -> float:
    """Return the normalized Shannon entropy of the interval histogram.

    The intervals are bucketed into ``bins`` equal-width bins spanning their
    own min..max range, the bin counts are converted to probabilities, and the
    discrete Shannon entropy (in bits) is divided by its maximum possible
    value ``log2(bins)``.  Because only bin probabilities are used, the result
    does not depend on the unit the intervals are measured in.

    History:
        * The first version used ``np.std(intervals)`` as the score, which is
          unbounded (a std of ~120 ms produced scores above 100).
        * The next version integrated a *density* histogram, i.e. a
          differential entropy.  That quantity equals the discrete entropy
          plus ``log2(bin_width)``, so for millisecond-scale data it was
          dominated by the bin width and saturated near 1.0 (a two-valued
          100/200 series scored ~0.77).  This version uses probabilities.

    Reference points: identical intervals give 0.0; intervals spread evenly
    over every bin give 1.0.  The scorer in this module treats 0.50-0.85 as
    the most human-like band.

    Args:
        intervals: Inter-event intervals.
        bins: Number of equal-width histogram bins.

    Returns:
        Entropy normalized to [0.0, 1.0]; 0.0 when fewer than three
        intervals are given or when ``bins`` is less than 2.
    """
    # log2(1) == 0 would make the normalization divide by zero, and fewer
    # than three samples carry no usable distribution shape.
    if bins < 2 or len(intervals) < 3:
        return 0.0

    counts, _ = np.histogram(intervals, bins=bins)
    # Empty bins contribute nothing to the entropy; dropping them also
    # avoids log2(0) without needing an epsilon fudge term.
    probs = counts[counts > 0] / counts.sum()
    raw_entropy = -np.sum(probs * np.log2(probs))

    # Normalize against the theoretical maximum (all bins equally likely).
    max_entropy = np.log2(bins)
    return float(np.clip(raw_entropy / max_entropy, 0.0, 1.0))


def _compute_cv(intervals: np.ndarray) -> float:
    """Return the coefficient of variation (std / mean), capped to [0.0, 3.0].

    A mean of exactly zero yields 0.0 instead of dividing by zero.  The
    scorer in this module uses the value as a timing-variability signal.
    """
    avg = np.mean(intervals)
    if avg == 0:
        return 0.0
    spread = np.std(intervals)
    bounded = np.clip(spread / avg, 0.0, 3.0)
    return float(bounded)


def _correction_burst_ratio(events: list) -> float:
    """Return the fraction of corrections that occur in runs of three or more.

    An event counts as a correction when its ``is_correction`` attribute is
    truthy or its ``key`` attribute equals ``'Backspace'``; missing
    attributes are treated as "not a correction".

    Rationale from the original author: humans tend to correct in bursts
    (deleting a whole mistyped word), whereas bots correct uniformly or not
    at all.

    Args:
        events: Sequence of event objects in chronological order.

    Returns:
        Corrections inside runs of length >= 3 divided by all corrections,
        or 0.0 when there are no corrections.
    """
    flags = [
        getattr(ev, 'is_correction', False) or getattr(ev, 'key', '') == 'Backspace'
        for ev in events
    ]
    if not any(flags):
        return 0.0

    in_bursts = 0
    run_length = 0
    # The trailing False sentinel flushes a run that reaches the end of the
    # event list, so no separate post-loop check is needed.
    for flag in [*flags, False]:
        if flag:
            run_length += 1
            continue
        if run_length >= 3:
            in_bursts += run_length
        run_length = 0

    return float(in_bursts / max(sum(flags), 1))


def _score_ghosting(intervals: np.ndarray, events: list) -> tuple[float, dict]:
    """Combine four weighted signals into one human-likeness score.

    Signals and weights: normalized entropy (0.35), coefficient of variation
    (0.30), mean inter-key interval (0.15) and correction behaviour (0.20).
    Each signal is first mapped to a sub-score via fixed threshold tiers.

    Args:
        intervals: Inter-event intervals; the mean-interval tiers and the
            ``mean_ikl_ms`` breakdown key treat them as milliseconds.
        events: Event objects; only their ``is_correction`` and ``key``
            attributes are read here.

    Returns:
        ``(final_score, breakdown)`` where ``final_score`` is clipped to
        [0.0, 1.0] and ``breakdown`` holds the rounded raw metrics and the
        per-signal sub-scores.
    """
    entropy = _compute_entropy(intervals)
    cv = _compute_cv(intervals)
    burst = _correction_burst_ratio(events)
    mean_ms = float(np.mean(intervals))

    # Signal 1: entropy. Best inside the middle band: too low looks like a
    # regular bot, too high looks like a bot injecting pure noise.
    entropy_bands = ((0.50, 0.85, 1.0), (0.35, 0.95, 0.5))
    s_entropy = next(
        (pts for low, high, pts in entropy_bands if low <= entropy <= high),
        0.05,
    )

    # Signal 2: coefficient of variation. Higher variability scores better;
    # a CV near zero is a metronome-like bot.
    cv_floors = ((0.50, 1.0), (0.30, 0.6), (0.15, 0.3))
    s_cv = next((pts for floor, pts in cv_floors if cv > floor), 0.05)

    # Signal 3: mean inter-key interval inside the expected human range.
    mean_bands = ((60, 500, 1.0), (40, 700, 0.5))
    s_mean = next(
        (pts for low, high, pts in mean_bands if low <= mean_ms <= high),
        0.1,
    )

    # Signal 4: correction rate, with a bonus for corrections made in bursts.
    correction_count = len([
        ev for ev in events
        if getattr(ev, 'is_correction', False)
        or getattr(ev, 'key', '') == 'Backspace'
    ])
    corr_rate = correction_count / max(len(events), 1)

    if corr_rate == 0.0:
        # No corrections at all is treated as a bot tell.
        s_corrections = 0.05
    elif 0.02 <= corr_rate <= 0.15:
        # Plausible rate: base 0.5 plus up to 0.5 from the burst ratio.
        s_corrections = 0.5 + burst * 0.5
    else:
        # Rate outside the plausible range.
        s_corrections = 0.2

    # Weighted sum, kept as explicit left-to-right additions so the floating
    # point result does not depend on how sum() accumulates.
    weights = {"entropy": 0.35, "cv": 0.30, "mean": 0.15, "corrections": 0.20}
    raw = (
        s_entropy * weights["entropy"]
        + s_cv * weights["cv"]
        + s_mean * weights["mean"]
        + s_corrections * weights["corrections"]
    )
    final_score = float(np.clip(raw, 0.0, 1.0))

    breakdown = {
        "entropy": round(entropy, 4),
        "cv": round(cv, 4),
        "mean_ikl_ms": round(mean_ms, 2),
        "corr_rate": round(corr_rate, 4),
        "burst_ratio": round(burst, 4),
        "s_entropy": round(s_entropy, 3),
        "s_cv": round(s_cv, 3),
        "s_mean": round(s_mean, 3),
        "s_corrections": round(s_corrections, 3),
    }
    return final_score, breakdown


@router.post("/simulate/ghosting")
async def simulate_ghosting(payload: TelemetryPayload):
    """Score a keystroke session and classify it as HUMAN, SUSPECT or BOT.

    Timestamps are read from each event (``timestamp_ms`` preferred,
    ``timestamp`` as fallback), converted to positive inter-event intervals
    and passed to the multi-signal scorer (entropy, CV, mean interval,
    corrections).  The returned ``score`` is normalized to [0.0, 1.0].

    History: an earlier version used ``np.std(intervals)`` as the score; the
    std in ms (~120) was unbounded, which made the detector trivially
    bypassable.

    Args:
        payload: Request body; ``events`` and ``session_id`` are read here.

    Returns:
        A dict with ``session_id``, ``entropy_score``, ``score``, ``verdict``
        and ``signal_breakdown``.  Sessions with fewer than 15 events get
        the verdict ``INCONCLUSIVE`` plus a ``reason`` field.

    Raises:
        HTTPException: 400 when fewer than 2 events are supplied, fewer than
            2 timestamps can be extracted, or fewer than 2 positive
            intervals remain.
    """
    events = payload.events
    if not events or len(events) < 2:
        raise HTTPException(status_code=400, detail="Minimum 2 events required")

    # Extract timestamps, supporting both schemas (timestamp_ms / timestamp).
    # Explicit None checks are required: a truthiness test would discard a
    # legitimate timestamp of 0 (e.g. the first event of a relative timeline).
    timestamps = []
    for e in events:
        ts = getattr(e, 'timestamp_ms', None)
        if ts is None:
            ts = getattr(e, 'timestamp', None)
        if ts is not None:
            timestamps.append(float(ts))

    if len(timestamps) < 2:
        raise HTTPException(status_code=400, detail="Could not extract timestamps from events")

    intervals = np.diff(np.array(timestamps))
    intervals = intervals[intervals > 0]  # drop zero/negative (impossible) intervals

    if len(intervals) < 2:
        raise HTTPException(status_code=400, detail="Not enough valid intervals")

    # Minimum number of keystrokes for a reliable analysis.
    if len(events) < 15:
        return {
            "session_id": payload.session_id,
            "entropy_score": 0.0,
            "score": 0.0,
            "verdict": "INCONCLUSIVE",
            "reason": f"Need at least 15 keystrokes, got {len(events)}",
            "signal_breakdown": {},
        }

    final_score, breakdown = _score_ghosting(intervals, events)

    # NOTE(review): the original comment says these thresholds are aligned
    # with engine.py, which is not visible from this module - confirm.
    if final_score >= 0.65:
        verdict = "HUMAN"
    elif final_score >= 0.40:
        verdict = "SUSPECT"
    else:
        verdict = "BOT"

    return {
        "session_id":      payload.session_id,
        "entropy_score":   breakdown["entropy"],   # field kept for backward compatibility
        "score":           round(final_score, 4),  # the actual normalized score
        "verdict":         verdict,
        "signal_breakdown": breakdown,
    }