Spaces:
Sleeping
Sleeping
| from fastapi import APIRouter, HTTPException | |
| from app.api.schemas.telemetry import TelemetryPayload | |
| import numpy as np | |
| router = APIRouter() | |
def _compute_entropy(intervals: np.ndarray, bins: int = 20) -> float:
    """Return the normalized Shannon entropy of the interval histogram.

    The intervals are bucketed into ``bins`` equal-width bins, the bin counts
    are turned into probabilities, and the discrete Shannon entropy (base 2)
    is divided by its theoretical maximum ``log2(bins)``.

    The result is therefore independent of the unit or scale of the
    intervals: perfectly constant timing yields 0.0 and timing spread evenly
    over every bin yields 1.0.

    Args:
        intervals: Inter-key intervals (any array-like of numbers).
        bins: Number of equal-width histogram bins.

    Returns:
        A float in [0.0, 1.0]. Returns 0.0 when fewer than 3 intervals are
        given or when ``bins`` is smaller than 2.
    """
    values = np.asarray(intervals, dtype=float)
    # With bins < 2 the maximum entropy log2(bins) is 0, so there is nothing
    # meaningful to normalize against.
    if values.size < 3 or bins < 2:
        return 0.0

    # Use raw counts, not density=True: a density depends on the bin width,
    # so an entropy computed from it scales with the data range and saturates
    # at the clip limit for ordinary millisecond-scale inputs.
    counts, _ = np.histogram(values, bins=bins)
    total = counts.sum()
    if total == 0:
        return 0.0

    # Empty bins contribute nothing to the entropy (p * log p -> 0), so drop
    # them instead of adding an epsilon to avoid log(0).
    probabilities = counts[counts > 0] / total
    raw_entropy = -np.sum(probabilities * np.log2(probabilities))

    # Normalize against the maximum entropy of a `bins`-way distribution.
    max_entropy = np.log2(bins)
    return float(np.clip(raw_entropy / max_entropy, 0.0, 1.0))
def _compute_cv(intervals: np.ndarray) -> float:
    """Return the coefficient of variation (std / mean) of the intervals.

    Args:
        intervals: Inter-key intervals (any array-like of numbers).

    Returns:
        ``std / mean`` clipped to [0.0, 3.0]. Returns 0.0 for an empty input
        or when the mean is exactly zero.
    """
    values = np.asarray(intervals, dtype=float)
    # np.mean of an empty array is NaN (with a RuntimeWarning), which would
    # slip past the zero check below and be returned to the caller.
    if values.size == 0:
        return 0.0

    mean = np.mean(values)
    if mean == 0:
        return 0.0
    return float(np.clip(np.std(values) / mean, 0.0, 3.0))
def _correction_burst_ratio(events: list, *, min_burst: int = 3) -> float:
    """Return the fraction of corrections that occur in consecutive bursts.

    An event counts as a correction when its ``is_correction`` attribute is
    truthy or its ``key`` attribute equals ``'Backspace'``. A burst is a run
    of at least ``min_burst`` consecutive corrections.

    Args:
        events: Sequence of event objects; missing attributes are treated as
            "not a correction".
        min_burst: Minimum run length for consecutive corrections to count
            as a burst.

    Returns:
        ``corrections inside bursts / total corrections`` as a float in
        [0.0, 1.0]; 0.0 when there are no corrections at all.
    """
    from itertools import groupby

    # Coerce to bool so a truthy non-bool flag (e.g. 1 or 2) counts as
    # exactly one correction when summed.
    flags = [
        bool(
            getattr(event, 'is_correction', False)
            or getattr(event, 'key', '') == 'Backspace'
        )
        for event in events
    ]
    total_corrections = sum(flags)
    if total_corrections == 0:
        return 0.0

    # groupby yields each maximal run of equal flags; only runs of
    # corrections that reach the minimum length contribute.
    in_bursts = 0
    for is_correction, run in groupby(flags):
        if not is_correction:
            continue
        run_length = sum(1 for _ in run)
        if run_length >= min_burst:
            in_bursts += run_length

    return float(in_bursts / total_corrections)
def _score_ghosting(intervals: np.ndarray, events: list) -> tuple[float, dict]:
    """Combine four typing signals into one weighted score.

    The signals are the interval entropy, the coefficient of variation, the
    mean inter-key interval and the correction behaviour. Each one is mapped
    to a sub-score and the sub-scores are blended with fixed weights
    (entropy 0.35, cv 0.30, mean 0.15, corrections 0.20).

    Args:
        intervals: Inter-key intervals used for the timing signals.
        events: Event objects inspected for corrections.

    Returns:
        ``(final_score, breakdown)`` where ``final_score`` is clipped to
        [0.0, 1.0] and ``breakdown`` holds the rounded raw signals and their
        sub-scores.
    """

    def _band_score(value, bands, fallback):
        # The first inclusive [low, high] band containing the value wins.
        for low, high, points in bands:
            if low <= value <= high:
                return points
        return fallback

    entropy_value = _compute_entropy(intervals)
    cv_value = _compute_cv(intervals)
    burst_ratio = _correction_burst_ratio(events)
    mean_interval = float(np.mean(intervals))

    # Signal 1: entropy. Full credit only inside the middle band; values
    # that are too low or too high both fall through to the low score.
    entropy_signal = _band_score(
        entropy_value,
        ((0.50, 0.85, 1.0), (0.35, 0.95, 0.5)),
        0.05,
    )

    # Signal 2: coefficient of variation, compared against strict lower
    # thresholds from highest to lowest.
    cv_signal = 0.05
    for threshold, points in ((0.50, 1.0), (0.30, 0.6), (0.15, 0.3)):
        if cv_value > threshold:
            cv_signal = points
            break

    # Signal 3: mean inter-key interval.
    mean_signal = _band_score(
        mean_interval,
        ((60, 500, 1.0), (40, 700, 0.5)),
        0.1,
    )

    # Signal 4: corrections. A session without any correction gets the
    # lowest score; a rate inside the accepted band earns a base score plus
    # a bonus proportional to the burst ratio.
    event_count = len(events)
    correction_count = 0
    for event in events:
        if (getattr(event, 'is_correction', False)
                or getattr(event, 'key', '') == 'Backspace'):
            correction_count += 1
    corr_rate = correction_count / max(event_count, 1)

    if 0.02 <= corr_rate <= 0.15:
        corrections_signal = 0.5 + burst_ratio * 0.5
    elif corr_rate == 0.0:
        corrections_signal = 0.05
    else:
        corrections_signal = 0.2

    # Weighted blend. The additions are written out in a fixed order so the
    # floating-point result does not depend on a summation routine.
    weight_entropy, weight_cv, weight_mean, weight_corrections = 0.35, 0.30, 0.15, 0.20
    blended = (entropy_signal * weight_entropy +
               cv_signal * weight_cv +
               mean_signal * weight_mean +
               corrections_signal * weight_corrections)
    final_score = float(np.clip(blended, 0.0, 1.0))

    breakdown = {
        "entropy": round(entropy_value, 4),
        "cv": round(cv_value, 4),
        "mean_ikl_ms": round(mean_interval, 2),
        "corr_rate": round(corr_rate, 4),
        "burst_ratio": round(burst_ratio, 4),
        "s_entropy": round(entropy_signal, 3),
        "s_cv": round(cv_signal, 3),
        "s_mean": round(mean_signal, 3),
        "s_corrections": round(corrections_signal, 3),
    }
    return final_score, breakdown
async def simulate_ghosting(payload: TelemetryPayload):
    """Classify a keystroke session as HUMAN, SUSPECT or BOT.

    Timestamps are read from each event (``timestamp_ms`` first, then
    ``timestamp``), converted to positive inter-key intervals and scored by
    ``_score_ghosting``. The returned ``score`` is in [0.0, 1.0].

    Args:
        payload: Object exposing ``events`` and ``session_id``.

    Returns:
        A dict with ``session_id``, ``entropy_score``, ``score``,
        ``verdict`` and ``signal_breakdown``. Sessions with fewer than 15
        events return verdict ``INCONCLUSIVE`` with an extra ``reason`` key.

    Raises:
        HTTPException: 400 when fewer than 2 events are sent, fewer than 2
            timestamps can be extracted, or fewer than 2 positive intervals
            remain.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view - confirm how it is registered on `router`.
    events = payload.events
    if not events or len(events) < 2:
        raise HTTPException(status_code=400, detail="Minimum 2 events required")

    # Support both schemas (timestamp_ms and timestamp). The check must be
    # `is None`, not truthiness: a timestamp of 0 is valid (e.g. the first
    # event of a session measured relative to its start) and must not be
    # dropped or replaced by the other field.
    timestamps = []
    for event in events:
        ts = getattr(event, 'timestamp_ms', None)
        if ts is None:
            ts = getattr(event, 'timestamp', None)
        if ts is not None:
            timestamps.append(float(ts))

    if len(timestamps) < 2:
        raise HTTPException(status_code=400, detail="Could not extract timestamps from events")

    intervals = np.diff(np.array(timestamps))
    intervals = intervals[intervals > 0]  # drop zero or negative (impossible) intervals

    if len(intervals) < 2:
        raise HTTPException(status_code=400, detail="Not enough valid intervals")

    # Require a minimum number of keystrokes before attempting a verdict.
    if len(events) < 15:
        return {
            "session_id": payload.session_id,
            "entropy_score": 0.0,
            "score": 0.0,
            "verdict": "INCONCLUSIVE",
            "reason": f"Need at least 15 keystrokes, got {len(events)}",
            "signal_breakdown": {},
        }

    final_score, breakdown = _score_ghosting(intervals, events)

    if final_score >= 0.65:
        verdict = "HUMAN"
    elif final_score >= 0.40:
        verdict = "SUSPECT"
    else:
        verdict = "BOT"

    return {
        "session_id": payload.session_id,
        "entropy_score": breakdown["entropy"],  # kept for backward compatibility
        "score": round(final_score, 4),  # the normalized multi-signal score
        "verdict": verdict,
        "signal_breakdown": breakdown,
    }