# auth/adaptiveauth/risk/session_intelligence.py
# (Last commit: "ADD: Docker" by Piyush1225, 5dc261b)
"""
Session Intelligence Engine
===========================
Implements 8 advanced security capabilities beyond standard login-time auth:
1. Continuous Verification – verify context throughout session lifecycle
2. Behavioral Intelligence – typing rhythm, mouse paths, scroll patterns
3. Dynamic Trust Score – evolving 0-100 score; decays + recovers
4. Low-Friction Micro-Challenges – triggered only when trust drops
5. Explainable Risk – factor contributions, confidence, audit trail
6. AI-Powered Anomaly Scoring – isolation-forest-inspired statistical model
7. Impossible Travel – haversine geo-velocity detection
8. Privacy-First Design – client computes signals; server receives only score
"""
import hashlib
import hmac
import math
import random
import secrets
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Any, Tuple

from sqlalchemy.orm import Session

from ..models import (
    User, UserSession, LoginAttempt, AnomalyPattern, RiskLevel,
    SessionTrustEvent, BehaviorSignalRecord,
)
# ── City Geo-Coordinates ────────────────────────────────────────────────────
# Fallback (latitude, longitude) per lower-cased city name. Consulted by
# `_resolve_coords` when a login reports a city but no explicit coordinates.
CITY_COORDS: Dict[str, Tuple[float, float]] = {
    "new york": (40.7128, -74.0060),
    "moscow": (55.7558, 37.6173),
    "beijing": (39.9042, 116.4074),
    "london": (51.5074, -0.1278),
    "tokyo": (35.6762, 139.6503),
    "sydney": (-33.8688, 151.2093),
    "paris": (48.8566, 2.3522),
    "dubai": (25.2048, 55.2708),
    "singapore": ( 1.3521, 103.8198),
    "toronto": (43.6532, -79.3832),
    "chicago": (41.8781, -87.6298),
    "miami": (25.7617, -80.1918),
    "berlin": (52.5200, 13.4050),
    "mumbai": (19.0760, 72.8777),
    "bangkok": (13.7563, 100.5018),
    "cairo": (30.0444, 31.2357),
    "johannesburg": (-26.2041, 28.0473),
    "sao paulo": (-23.5505, -46.6333),
    "buenos aires": (-34.6037, -58.3816),
    "seattle": (47.6062, -122.3321),
    "los angeles": (34.0522, -118.2437),
    "mexico city": (19.4326, -99.1332),
}

# ── Behavior Baseline (mean, std) for each client-collected metric ──────────
# Population baselines for the privacy-first 0-1 behavior aggregates that the
# client computes locally (consumed by BehaviorSignalProcessor via _z_anomaly).
BEHAVIOR_BASELINE: Dict[str, Tuple[float, float]] = {
    "typing_entropy": (0.70, 0.15),  # 1.0 = perfectly human-like rhythm
    "mouse_linearity": (0.62, 0.18),  # 1.0 = natural curved paths
    "scroll_variance": (0.48, 0.22),  # moderate = organic human scrolling
}

# ── Trust score baselines by security level ─────────────────────────────────
# Initial trust score per security level, 0 (trusted) through 4 (blocked).
TRUST_BASELINE: Dict[int, float] = {0: 95.0, 1: 80.0, 2: 60.0, 3: 35.0, 4: 10.0}

# ── In-memory caches ─────────────────────────────────────────────────────────
# NOTE(review): per-process only — entries are lost on restart and are not
# shared between workers; confirm a single-worker deployment is assumed.
# {session_id: {"score": float, "last_update": datetime}}
_trust_cache: Dict[int, Dict[str, Any]] = {}
# {challenge_id: {"answer_hash": str, "expires": datetime, "attempts": int}}
_micro_challenges: Dict[str, Dict[str, Any]] = {}
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two geographic points."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2.0
    half_dlmb = math.radians(lon2 - lon1) / 2.0
    # Haversine of the central angle between the two points.
    chord = (math.sin(half_dphi) ** 2
             + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlmb) ** 2)
    return earth_radius_km * 2 * math.atan2(math.sqrt(chord), math.sqrt(1 - chord))
def _z_anomaly(value: float, mean: float, std: float) -> float:
"""Convert value to 0–100 anomaly score using a Z-score transform."""
if std < 1e-6:
return 0.0 if abs(value - mean) < 0.01 else 50.0
z = abs(value - mean) / std
return min(100.0, (z / 3.5) ** 0.75 * 100.0)
def _resolve_coords(
city: Optional[str],
lat: Optional[float],
lon: Optional[float],
) -> Tuple[Optional[float], Optional[float]]:
if lat is not None and lon is not None:
return lat, lon
if city:
return CITY_COORDS.get(city.lower().strip(), (None, None))
return None, None
def _classify_anomaly(score: float) -> str:
if score >= 80: return "critical"
if score >= 60: return "high"
if score >= 40: return "medium"
if score >= 20: return "low"
return "normal"
def _build_summary(anomalous_factors: List[str], security_level: int) -> str:
if not anomalous_factors:
return "All signals normal. Trusted session context."
if len(anomalous_factors) == 1:
return f"{anomalous_factors[0]} flagged. Minimal elevated risk."
return (
f"{len(anomalous_factors)} risk signals triggered: "
+ ", ".join(anomalous_factors[:3])
+ (f" and {len(anomalous_factors) - 3} more." if len(anomalous_factors) > 3 else ".")
)
# ═══════════════════════════════════════════════════════════════════════════
# Feature 1 & 3 – Continuous Verification + Dynamic Trust Score
# ═══════════════════════════════════════════════════════════════════════════
class TrustScoreManager:
    """
    Maintains an evolving Trust Score (0-100) throughout the session lifecycle.

    Decay rules:
      * Time-based: -0.25 pts/min of inactivity
      * Behavior anomaly: -4 to -24 depending on severity
      * Context change (IP/device): -20
      * Impossible travel: -50
    Recovery rules:
      * Good behavior signal: +2 to +6
      * Micro-challenge pass: +20

    State lives in two places: the module-level `_trust_cache` (fast,
    per-process reads) and `SessionTrustEvent` rows (persistent audit trail).
    """

    DECAY_RATE = 0.25  # pts / minute inactive

    def __init__(self, db: Session):
        # SQLAlchemy session used for reading/writing SessionTrustEvent rows.
        self.db = db

    def get(self, session: UserSession) -> float:
        """Return current trust score, hydrating from DB if not cached."""
        sid = session.id
        if sid in _trust_cache:
            return _trust_cache[sid]["score"]
        # Cache miss: fall back to the most recent persisted trust event.
        last = (
            self.db.query(SessionTrustEvent)
            .filter(SessionTrustEvent.session_id == sid)
            .order_by(SessionTrustEvent.created_at.desc())
            .first()
        )
        # No history either: seed from TRUST_BASELINE via the session's risk
        # level. NOTE(review): this assumes current_risk_level is a plain
        # string ("low"/"medium"/"high"/"critical") — if it is a RiskLevel
        # enum, the inner .get() falls through to index 1 (80.0). Confirm
        # against the model definition.
        score = float(last.trust_score) if last else TRUST_BASELINE.get(
            {"low": 0, "medium": 1, "high": 2, "critical": 3}.get(
                session.current_risk_level or "low", 1
            ), 80.0
        )
        _trust_cache[sid] = {"score": score, "last_update": datetime.utcnow()}
        return score

    def apply_decay(self, session: UserSession) -> Tuple[float, float]:
        """Apply time-based trust decay. Returns (new_score, delta).

        Updates only the in-memory cache; decay is NOT persisted as a
        SessionTrustEvent row (unlike `update`).
        """
        sid = session.id
        cache = _trust_cache.get(sid)
        if not cache:
            # Nothing cached yet: hydrate from DB and report zero decay.
            return self.get(session), 0.0
        elapsed_minutes = (datetime.utcnow() - cache["last_update"]).total_seconds() / 60
        delta = -self.DECAY_RATE * elapsed_minutes
        new_score = max(0.0, cache["score"] + delta)  # floor at 0
        _trust_cache[sid] = {"score": new_score, "last_update": datetime.utcnow()}
        return new_score, delta

    def update(
        self,
        session: UserSession,
        delta: float,
        event_type: str,
        reason: str,
        signals: Optional[Dict] = None,
    ) -> float:
        """Apply delta, clamp to [0, 100], persist to DB, update cache."""
        current = self.get(session)
        new_score = max(0.0, min(100.0, current + delta))
        _trust_cache[session.id] = {"score": new_score, "last_update": datetime.utcnow()}
        # Persist an audit-trail row (consumed by get_history / Feature 5).
        self.db.add(SessionTrustEvent(
            session_id=session.id,
            user_id=session.user_id,
            trust_score=new_score,
            delta=delta,
            event_type=event_type,
            reason=reason,
            signals=signals or {},
        ))
        self.db.commit()
        return new_score

    def get_history(self, session_id: int, limit: int = 40) -> List[Dict]:
        """Return up to `limit` trust events as dicts, oldest first."""
        events = (
            self.db.query(SessionTrustEvent)
            .filter(SessionTrustEvent.session_id == session_id)
            .order_by(SessionTrustEvent.created_at.desc())
            .limit(limit)
            .all()
        )
        # Query runs newest-first so LIMIT keeps the most recent events;
        # reverse back to chronological order for presentation.
        return [
            {
                "score": e.trust_score,
                "delta": e.delta,
                "event_type": e.event_type,
                "reason": e.reason,
                "at": e.created_at.isoformat(),
            }
            for e in reversed(events)
        ]

    @staticmethod
    def label(score: float) -> str:
        """Human-readable band name for a trust score."""
        if score >= 80: return "trusted"
        if score >= 60: return "watchful"
        if score >= 40: return "elevated"
        if score >= 20: return "high_risk"
        return "critical"

    @staticmethod
    def color(score: float) -> str:
        """UI hex color for a trust score band (green through red)."""
        if score >= 80: return "#22c55e"
        if score >= 60: return "#84cc16"
        if score >= 40: return "#f59e0b"
        if score >= 20: return "#f97316"
        return "#ef4444"
# ═══════════════════════════════════════════════════════════════════════════
# Feature 2 & 8 – Behavioral Intelligence + Privacy-First Design
# ═══════════════════════════════════════════════════════════════════════════
class BehaviorSignalProcessor:
    """
    Converts client-aggregated behavior signals into an anomaly score and a
    trust delta, persisting one BehaviorSignalRecord row per submission.

    Privacy-first contract (Feature 8): the browser reduces raw keystroke
    timings, mouse paths and scroll deltas to 0-1 aggregate scores LOCALLY;
    only those aggregates (plus the client's own composite risk estimate)
    ever reach this server.
    """

    # Blended-anomaly bands -> trust delta, scanned highest floor first.
    _TRUST_BANDS = ((80.0, -24.0), (60.0, -12.0), (40.0, -4.0), (20.0, +2.0), (0.0, +6.0))

    def process(
        self,
        session: UserSession,
        typing_entropy: float,
        mouse_linearity: float,
        scroll_variance: float,
        local_risk_score: float,
        db: Session,
    ) -> Dict[str, Any]:
        """Validate, score, persist, and report one batch of signals."""

        def clamp01(raw: float) -> float:
            # Client input is untrusted — force every signal into [0, 1].
            return max(0.0, min(1.0, float(raw)))

        te = clamp01(typing_entropy)
        ml = clamp01(mouse_linearity)
        sv = clamp01(scroll_variance)
        lr = clamp01(local_risk_score)

        def feature_anomaly(value: float, key: str) -> float:
            # A zero/absent signal is itself suspicious: flat 50/100 score.
            return _z_anomaly(value, *BEHAVIOR_BASELINE[key]) if value > 0 else 50.0

        te_a = feature_anomaly(te, "typing_entropy")
        ml_a = feature_anomaly(ml, "mouse_linearity")
        sv_a = feature_anomaly(sv, "scroll_variance")

        # Server-side composite: typing weighted heaviest, scroll least.
        server_score = 0.40 * te_a + 0.35 * ml_a + 0.25 * sv_a
        # Blend 60% server judgement with 40% of the client's own estimate.
        final = 0.60 * server_score + 0.40 * (lr * 100.0)

        trust_delta = next(d for floor, d in self._TRUST_BANDS if final >= floor)

        db.add(BehaviorSignalRecord(
            session_id=session.id,
            user_id=session.user_id,
            typing_entropy=te,
            mouse_linearity=ml,
            scroll_variance=sv,
            local_risk_score=lr,
            anomaly_score=final,
        ))
        db.commit()

        return {
            "anomaly_score": round(final, 2),
            "trust_delta": trust_delta,
            "signals": {
                "typing_entropy": round(te, 3),
                "mouse_linearity": round(ml, 3),
                "scroll_variance": round(sv, 3),
                "local_risk_score": round(lr, 3),
            },
            "per_feature_anomaly": {
                "typing": round(te_a, 1),
                "mouse": round(ml_a, 1),
                "scroll": round(sv_a, 1),
            },
            "classification": _classify_anomaly(final),
            "privacy_note": (
                "Raw keystrokes, mouse coordinates, and scroll positions "
                "were processed entirely in-browser. Only aggregated scores "
                "were transmitted to the server."
            ),
        }
# ═══════════════════════════════════════════════════════════════════════════
# Feature 7 – Impossible Travel + Pattern Clustering
# ═══════════════════════════════════════════════════════════════════════════
class ImpossibleTravelDetector:
    """
    Flags logins whose implied travel speed between the previous and the
    current location is physically implausible.

    Speed bands (great-circle distance / elapsed time):
      > 900 km/h  -> impossible  (faster than a commercial jet)
      > 400 km/h  -> suspicious  (implausible without air travel)
      otherwise   -> plausible   (< 50 km counts as the same area)
    """
    IMPOSSIBLE_KMH = 900.0
    SUSPICIOUS_KMH = 400.0

    def __init__(self, db: Session):
        # SQLAlchemy session used to read the user's login history.
        self.db = db

    def check(
        self,
        user_id: int,
        city_now: str,
        country_now: str,
        lat_now: Optional[float] = None,
        lon_now: Optional[float] = None,
        time_gap_hours: Optional[float] = None,  # override for demo mode
    ) -> Dict[str, Any]:
        """
        Compare the current login location against the most recent successful
        login. Pass `time_gap_hours` to simulate an arbitrary gap for demos.
        """
        prev = (
            self.db.query(LoginAttempt)
            .filter(
                LoginAttempt.user_id == user_id,
                LoginAttempt.success == True,
                LoginAttempt.city.isnot(None),
            )
            .order_by(LoginAttempt.attempted_at.desc())
            .first()
        )
        if prev is None:
            return self._no_history(city_now, country_now, lat_now, lon_now)

        lat1, lon1 = _resolve_coords(prev.city, prev.latitude, prev.longitude)
        lat2, lon2 = _resolve_coords(city_now, lat_now, lon_now)
        if lat1 is None or lat2 is None:
            # At least one endpoint is an unknown city with no explicit coords.
            return {
                "possible": True,
                "verdict": "coords_unknown",
                "message": f"Cannot resolve coordinates for '{prev.city}' or '{city_now}'.",
                "distance_km": 0.0, "speed_kmh": 0.0, "time_gap_minutes": 0.0,
                "trust_delta": 0.0,
            }

        dist_km = haversine(lat1, lon1, lat2, lon2)
        gap_seconds = (
            time_gap_hours * 3600
            if time_gap_hours is not None
            else (datetime.utcnow() - prev.attempted_at).total_seconds()
        )
        gap_minutes = max(gap_seconds / 60, 0.001)  # keep messages divide-safe
        speed = (dist_km / (gap_seconds / 3600)) if gap_seconds > 0 else 0.0

        if dist_km < 50:
            verdict, possible, trust_delta = "same_area", True, 0.0
            message = f"Same area as last login ({prev.city}). No anomaly."
        elif speed > self.IMPOSSIBLE_KMH:
            verdict, possible, trust_delta = "impossible", False, -50.0
            message = (
                f"IMPOSSIBLE TRAVEL: {dist_km:.0f} km in {gap_minutes:.0f} min "
                f"= {speed:.0f} km/h (fastest jet ~900 km/h)."
            )
        elif speed > self.SUSPICIOUS_KMH:
            verdict, possible, trust_delta = "suspicious", True, -20.0
            message = (
                f"Suspicious speed: {speed:.0f} km/h over "
                f"{dist_km:.0f} km from {prev.city} in {gap_minutes:.0f} min."
            )
        else:
            verdict, possible, trust_delta = "plausible", True, 0.0
            message = (
                f"Plausible: {dist_km:.0f} km from {prev.city} "
                f"at {speed:.0f} km/h in {gap_minutes:.0f} min."
            )

        return {
            "possible": possible,
            "verdict": verdict,
            "message": message,
            "distance_km": round(dist_km, 1),
            "speed_kmh": round(speed, 1),
            "time_gap_minutes": round(gap_minutes, 1),
            "from": {"city": prev.city, "lat": lat1, "lon": lon1},
            "to": {"city": city_now, "country": country_now, "lat": lat2, "lon": lon2},
            "trust_delta": trust_delta,
        }

    def _no_history(self, city, country, lat, lon) -> Dict[str, Any]:
        """Neutral verdict when the user has no prior successful login."""
        return {
            "possible": True, "verdict": "no_history",
            "message": "No previous login on record β€” travel check skipped.",
            "distance_km": 0.0, "speed_kmh": 0.0, "time_gap_minutes": 0.0,
            "from": None,
            "to": {"city": city, "country": country, "lat": lat, "lon": lon},
            "trust_delta": 0.0,
        }
# ═══════════════════════════════════════════════════════════════════════════
# Feature 4 – Low-Friction Micro-Challenges
# ═══════════════════════════════════════════════════════════════════════════
class MicroChallengeEngine:
    """
    Issues lightweight inline challenges ONLY when trust drops below a
    threshold. Inspired by CAPTCHA but far less intrusive — a single
    arithmetic question.

    Trust restored: +20 on pass, -15 on fail.
    Threshold: trust < 40 → challenge recommended.

    Fixes over the previous revision:
      * Challenge ids now come from `secrets.token_hex` (CSPRNG) instead of
        a predictable sha256(timestamp + random.random()) construction —
        `random` is not suitable for security tokens.
      * Answer hashes are compared with `hmac.compare_digest` (constant
        time) rather than `==`.
      * Mojibake in user-facing strings repaired ('Γ—' → '×', 'βœ…' → '✅').
    """
    THRESHOLD = 40.0          # trust score below which a challenge is advised
    MAX_ATTEMPTS = 3          # challenge consumed after this many failures
    TTL = timedelta(minutes=5)

    def should_challenge(self, trust_score: float) -> bool:
        """Return True when the trust score warrants a micro-challenge."""
        return trust_score < self.THRESHOLD

    def generate(self) -> Dict[str, Any]:
        """Create a single-operation arithmetic challenge.

        Only the SHA-256 hash of the answer is stored (in the module-level
        in-memory challenge store), keyed by an unguessable id.
        """
        ops = [('+', lambda a, b: a + b), ('-', lambda a, b: a - b),
               ('×', lambda a, b: a * b)]
        sym, fn = random.choice(ops)
        # Keep multiplication operands small so the question stays low-friction.
        a = random.randint(2, 9) if sym == '×' else random.randint(10, 50)
        b = random.randint(2, 9) if sym == '×' else random.randint(1, 20)
        answer = fn(a, b)
        # CSPRNG id: 16 hex chars, same length/format as the old sha256 prefix.
        cid = secrets.token_hex(8)
        _micro_challenges[cid] = {
            "answer_hash": hashlib.sha256(str(answer).encode()).hexdigest(),
            "expires": datetime.utcnow() + self.TTL,
            "attempts": 0,
            "type": "math",
        }
        return {
            "challenge_id": cid,
            "type": "math",
            "question": f"What is {a} {sym} {b} ?",
            "hint": "Answer is an integer.",
            "expires_in_seconds": 300,
        }

    def verify(self, challenge_id: str, response: str) -> Dict[str, Any]:
        """Check a challenge response.

        Returns `correct`, a `trust_delta` (+20 pass / -15 fail, -5 for a
        missing or expired challenge) and a human-readable `reason`.
        Challenges are single-use on success and are consumed after
        MAX_ATTEMPTS failures; expired challenges are purged on sight.
        """
        ch = _micro_challenges.get(challenge_id)
        if not ch:
            return {"correct": False, "reason": "Challenge not found or already used.", "trust_delta": -5.0}
        if ch["expires"] < datetime.utcnow():
            _micro_challenges.pop(challenge_id, None)
            return {"correct": False, "reason": "Challenge expired.", "trust_delta": -5.0}
        given_hash = hashlib.sha256(response.strip().encode()).hexdigest()
        # Constant-time comparison avoids leaking hash-prefix timing info.
        correct = hmac.compare_digest(given_hash, ch["answer_hash"])
        ch["attempts"] = ch.get("attempts", 0) + 1
        if correct or ch["attempts"] >= self.MAX_ATTEMPTS:
            _micro_challenges.pop(challenge_id, None)
        return {
            "correct": correct,
            "trust_delta": +20.0 if correct else -15.0,
            "reason": (
                "✅ Correct – trust score restored." if correct
                else f"❌ Incorrect. {'Max attempts reached.' if ch['attempts'] >= self.MAX_ATTEMPTS else 'Try again.'}"
            ),
        }
# ═══════════════════════════════════════════════════════════════════════════
# Feature 5 – Explainable Risk Transparency
# ═══════════════════════════════════════════════════════════════════════════
class RiskExplainer:
    """
    Produces audit-ready, human-readable risk explanations: which signals
    contributed, their magnitude, and the model's confidence.
    """

    # Presentation metadata per known factor key: icon, display label, and
    # the advertised model weight for that factor.
    _FACTOR_META = {
        "location": {"icon": "🌍", "label": "Location", "weight": "97.68%"},
        "device": {"icon": "πŸ’»", "label": "Device", "weight": "0.21%"},
        "time": {"icon": "πŸ•", "label": "Time Pattern", "weight": "0.02%"},
        "velocity": {"icon": "⚑", "label": "Velocity", "weight": "2.08%"},
        "behavior": {"icon": "🧠", "label": "Behavior", "weight": "0.01%"},
    }
    # Step-up action wording per security level (0 = trusted ... 4 = blocked).
    _LEVEL_LABEL = {
        0: "No step-up – trusted context",
        1: "Standard password auth",
        2: "Email verification required (new IP)",
        3: "2FA required (unknown device)",
        4: "Access BLOCKED – critical risk",
    }

    def explain_login(
        self,
        risk_factors: Dict[str, float],
        risk_level: str,
        security_level: int,
    ) -> Dict[str, Any]:
        """Build the full explanation payload for a single login decision."""
        rows = []
        for key, score in risk_factors.items():
            meta = self._FACTOR_META.get(key, {"icon": "πŸ“Œ", "label": key.title(), "weight": "N/A"})
            is_anomalous = score > 30.0
            # Anomalous factors contribute negatively in proportion to their
            # score; normal factors add a small positive contribution.
            contribution = -score * 0.4 if is_anomalous else (30 - score) * 0.1
            rows.append({
                "factor": meta["label"],
                "score": round(score, 1),
                "icon": meta["icon"],
                "model_weight": meta["weight"],
                "status": "anomalous" if is_anomalous else "normal",
                "detail": self._detail(key, is_anomalous),
                "contribution": round(contribution, 1),
            })
        # Largest absolute contribution first.
        rows.sort(key=lambda row: abs(row["contribution"]), reverse=True)
        flagged = [row["factor"] for row in rows if row["status"] == "anomalous"]
        # Confidence shrinks by 8% per anomalous factor, clamped to [0.50, 0.99].
        confidence = max(0.50, min(0.99, 1.0 - len(flagged) * 0.08))
        audit_seed = f"{datetime.utcnow().isoformat()}{str(risk_factors)}"
        return {
            "audit_id": hashlib.sha256(audit_seed.encode()).hexdigest()[:12],
            "timestamp": datetime.utcnow().isoformat(),
            "risk_level": risk_level,
            "security_level": security_level,
            "action": self._LEVEL_LABEL.get(security_level, "Unknown"),
            "confidence": round(confidence, 2),
            "factors": rows,
            "summary": _build_summary(flagged, security_level),
        }

    def explain_trust_event(self, event_type: str, delta: float, signals: Dict) -> str:
        """One-line wording for a trust event; generic fallback for unknown
        event types. `signals` is accepted for interface parity only."""
        wording = {
            "behavior_good": f"Human-like behavior signals (+{abs(delta):.0f} trust)",
            "behavior_anomaly": f"Unusual behavior pattern (–{abs(delta):.0f} trust)",
            "decay": f"Inactivity decay (–{abs(delta):.1f} trust)",
            "context_change": f"IP or device changed (–{abs(delta):.0f} trust)",
            "micro_challenge_pass": "Micro-challenge passed (+20 trust)",
            "micro_challenge_fail": "Micro-challenge failed (–15 trust)",
            "impossible_travel": "Impossible travel detected (–50 trust)",
            "init": "Session initialised",
        }
        return wording.get(event_type, f"Trust updated by {delta:+.1f}")

    @staticmethod
    def _detail(key: str, anomalous: bool) -> str:
        """Pick the normal or anomalous wording for a factor key."""
        details = {
            "location": ("Location matches behavioral profile.", "New or unexpected country/city."),
            "device": ("Known device fingerprint.", "Unknown device fingerprint."),
            "time": ("Login within typical hours.", "Login outside typical hours."),
            "velocity": ("Normal login frequency.", "Rapid or repeated attempts detected."),
            "behavior": ("Behavior matches past patterns.", "Behavioral deviation detected."),
        }
        normal_text, anomalous_text = details.get(key, ("Normal.", "Anomalous."))
        return anomalous_text if anomalous else normal_text
# ═══════════════════════════════════════════════════════════════════════════
# Feature 6 – AI-Powered Anomaly Detection (Statistical Isolation Forest)
# ═══════════════════════════════════════════════════════════════════════════
class StatisticalAnomalyDetector:
    """
    Isolation-Forest-inspired statistical anomaly scorer.

    Each feature is compared against a (mean, std) baseline via a Z-score
    transform; the per-feature anomalies are combined with fixed weights
    into a composite 0-100 score (higher = more anomalous).

    In production this would be replaced with a trained sklearn
    IsolationForest or an LSTM sequence model; the interface is kept
    identical for easy swap-in.
    """
    BASELINE: Dict[str, Tuple[float, float]] = {
        # feature -> (mean, std) derived from research on legitimate user behavior
        "typing_entropy": (0.70, 0.15),
        "mouse_linearity": (0.62, 0.18),
        "scroll_variance": (0.48, 0.22),
        "hour_normalized": (0.55, 0.28),       # 0 = midnight, 1 = noon normalised to [0,1]
        "failed_attempts_norm": (0.03, 0.10),  # recent failed attempts / 20
    }
    WEIGHTS: Dict[str, float] = {
        "typing_entropy": 0.28,
        "mouse_linearity": 0.24,
        "scroll_variance": 0.14,
        "hour_normalized": 0.18,
        "failed_attempts_norm": 0.16,
    }

    @staticmethod
    def _feature_anomaly(value: float, mean: float, std: float) -> float:
        """Z-score transform of one feature onto 0-100 (same formula as the
        module-level `_z_anomaly`, inlined so the class is self-contained)."""
        if std < 1e-6:
            return 0.0 if abs(value - mean) < 0.01 else 50.0
        z = abs(value - mean) / std
        return min(100.0, (z / 3.5) ** 0.75 * 100.0)

    def score(self, features: Dict[str, float]) -> Dict[str, Any]:
        """Score a feature vector; missing features default to their mean
        (contributing zero anomaly). Returns anomaly_score in [0, 100]."""
        per_feature: Dict[str, float] = {}
        comparison: Dict[str, Dict[str, float]] = {}
        composite = 0.0
        for feat, (mean, std) in self.BASELINE.items():
            raw = features.get(feat, mean)
            anomaly = self._feature_anomaly(float(raw), mean, std)
            per_feature[feat] = round(anomaly, 1)
            composite += anomaly * self.WEIGHTS.get(feat, 0.10)
            comparison[feat] = {
                "your_value": round(raw, 3),
                "typical_mean": mean,
                "typical_std": std,
                "z_score": round(abs(raw - mean) / max(std, 1e-6), 2),
            }
        label, color = self._classify(composite)
        return {
            "anomaly_score": round(composite, 2),
            "classification": label,
            "color": color,
            "confidence": round(min(0.99, 0.50 + composite / 200.0), 2),
            "per_feature": per_feature,
            "baseline_comparison": comparison,
            "method": "statistical_isolation_forest_analogy",
        }

    @staticmethod
    def _classify(score: float) -> Tuple[str, str]:
        """Map a composite score to an (UPPERCASE label, UI hex color) pair."""
        if score >= 80: return "CRITICAL", "#ef4444"
        if score >= 60: return "HIGH", "#f97316"
        if score >= 40: return "MEDIUM", "#f59e0b"
        if score >= 20: return "LOW", "#84cc16"
        return "NORMAL", "#22c55e"