Spaces:
Running
Running
| """ | |
| Session Intelligence Engine | |
| =========================== | |
| Implements 8 advanced security capabilities beyond standard login-time auth: | |
| 1. Continuous Verification β verify context throughout session lifecycle | |
| 2. Behavioral Intelligence β typing rhythm, mouse paths, scroll patterns | |
| 3. Dynamic Trust Score β evolving 0-100 score; decays + recovers | |
| 4. Low-Friction Micro-Challenges β triggered only when trust drops | |
| 5. Explainable Risk β factor contributions, confidence, audit trail | |
| 6. AI-Powered Anomaly Scoring β isolation-forest-inspired statistical model | |
| 7. Impossible Travel β haversine geo-velocity detection | |
| 8. Privacy-First Design β client computes signals; server receives only score | |
| """ | |
import hashlib
import math
import random
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional, Tuple

from sqlalchemy.orm import Session

from ..models import (
    AnomalyPattern,
    BehaviorSignalRecord,
    LoginAttempt,
    RiskLevel,
    SessionTrustEvent,
    User,
    UserSession,
)
# ── City geo-coordinates (latitude, longitude) for travel-velocity checks ────
CITY_COORDS: Dict[str, Tuple[float, float]] = {
    "new york":     (40.7128,  -74.0060),
    "moscow":       (55.7558,   37.6173),
    "beijing":      (39.9042,  116.4074),
    "london":       (51.5074,   -0.1278),
    "tokyo":        (35.6762,  139.6503),
    "sydney":       (-33.8688, 151.2093),
    "paris":        (48.8566,    2.3522),
    "dubai":        (25.2048,   55.2708),
    "singapore":    (1.3521,   103.8198),
    "toronto":      (43.6532,  -79.3832),
    "chicago":      (41.8781,  -87.6298),
    "miami":        (25.7617,  -80.1918),
    "berlin":       (52.5200,   13.4050),
    "mumbai":       (19.0760,   72.8777),
    "bangkok":      (13.7563,  100.5018),
    "cairo":        (30.0444,   31.2357),
    "johannesburg": (-26.2041,  28.0473),
    "sao paulo":    (-23.5505, -46.6333),
    "buenos aires": (-34.6037, -58.3816),
    "seattle":      (47.6062, -122.3321),
    "los angeles":  (34.0522, -118.2437),
    "mexico city":  (19.4326,  -99.1332),
}

# ── Behavior baseline (mean, std) for each client-collected metric ───────────
BEHAVIOR_BASELINE: Dict[str, Tuple[float, float]] = {
    "typing_entropy":  (0.70, 0.15),  # 1.0 = perfectly human-like rhythm
    "mouse_linearity": (0.62, 0.18),  # 1.0 = natural curved paths
    "scroll_variance": (0.48, 0.22),  # moderate = organic human scrolling
}

# ── Trust score baselines keyed by numeric security level (0-4) ──────────────
TRUST_BASELINE: Dict[int, float] = {0: 95.0, 1: 80.0, 2: 60.0, 3: 35.0, 4: 10.0}

# ── In-memory caches (process-local; reset on restart) ───────────────────────
# {session_id: {"score": float, "last_update": datetime}}
_trust_cache: Dict[int, Dict[str, Any]] = {}

# {challenge_id: {"answer_hash": str, "expires": datetime, "attempts": int}}
_micro_challenges: Dict[str, Dict[str, Any]] = {}
# ───────────────────────────────────────────────────────────────────────────
# Helper math
# ───────────────────────────────────────────────────────────────────────────
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km between two geographic points."""
    earth_radius_km = 6371.0
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    half_dphi = math.radians(lat2 - lat1) / 2.0
    half_dlam = math.radians(lon2 - lon1) / 2.0
    # Haversine of the central angle between the two points.
    chord = (math.sin(half_dphi) ** 2
             + math.cos(phi1) * math.cos(phi2) * math.sin(half_dlam) ** 2)
    return 2.0 * earth_radius_km * math.asin(min(1.0, math.sqrt(chord)))
def _z_anomaly(value: float, mean: float, std: float) -> float:
    """Map a value to a 0-100 anomaly score via a damped Z-score transform."""
    if std < 1e-6:
        # Degenerate baseline: an exact match is normal, anything else mid-range.
        return 0.0 if abs(value - mean) < 0.01 else 50.0
    z_score = abs(value - mean) / std
    # ~3.5 sigma maps to 100; the 0.75 exponent softens the curve near zero.
    damped = (z_score / 3.5) ** 0.75 * 100.0
    return min(100.0, damped)
def _resolve_coords(
    city: Optional[str],
    lat: Optional[float],
    lon: Optional[float],
) -> Tuple[Optional[float], Optional[float]]:
    """Prefer explicit coordinates; otherwise look the city up in CITY_COORDS."""
    explicit = lat is not None and lon is not None
    if explicit:
        return lat, lon
    if not city:
        return None, None
    return CITY_COORDS.get(city.lower().strip(), (None, None))
def _classify_anomaly(score: float) -> str:
    """Bucket a 0-100 anomaly score into a severity label."""
    bands = ((80, "critical"), (60, "high"), (40, "medium"), (20, "low"))
    for threshold, label in bands:
        if score >= threshold:
            return label
    return "normal"
def _build_summary(anomalous_factors: List[str], security_level: int) -> str:
    """One-line human-readable summary of the triggered risk factors."""
    count = len(anomalous_factors)
    if count == 0:
        return "All signals normal. Trusted session context."
    if count == 1:
        return f"{anomalous_factors[0]} flagged. Minimal elevated risk."
    # Name at most three factors; summarize any overflow as a count.
    listed = ", ".join(anomalous_factors[:3])
    tail = f" and {count - 3} more." if count > 3 else "."
    return f"{count} risk signals triggered: {listed}{tail}"
# ───────────────────────────────────────────────────────────────────────────
# Feature 1 & 3 — Continuous Verification + Dynamic Trust Score
# ───────────────────────────────────────────────────────────────────────────
class TrustScoreManager:
    """
    Maintains an evolving Trust Score (0-100) throughout the session lifecycle.

    Decay rules:
      • Time-based: -0.25 pts/min of inactivity
      • Behavior anomaly: -4 to -24 depending on severity
      • Context change (IP/device): -20
      • Impossible travel: -50
    Recovery rules:
      • Good behavior signal: +2 to +6
      • Micro-challenge pass: +20
    """

    DECAY_RATE = 0.25  # trust points lost per minute of inactivity

    def __init__(self, db: "Session"):
        self.db = db

    def get(self, session: "UserSession") -> float:
        """Return the current trust score, hydrating from DB if not cached."""
        sid = session.id
        if sid in _trust_cache:
            return _trust_cache[sid]["score"]
        last = (
            self.db.query(SessionTrustEvent)
            .filter(SessionTrustEvent.session_id == sid)
            .order_by(SessionTrustEvent.created_at.desc())
            .first()
        )
        # No persisted event yet: seed from the baseline matching the session's
        # risk level (string level -> numeric security level -> baseline score).
        score = float(last.trust_score) if last else TRUST_BASELINE.get(
            {"low": 0, "medium": 1, "high": 2, "critical": 3}.get(
                session.current_risk_level or "low", 1
            ), 80.0
        )
        _trust_cache[sid] = {"score": score, "last_update": datetime.utcnow()}
        return score

    def apply_decay(self, session: "UserSession") -> Tuple[float, float]:
        """Apply time-based trust decay. Returns (new_score, delta)."""
        sid = session.id
        cache = _trust_cache.get(sid)
        if not cache:
            # Nothing cached yet -> hydrate; no elapsed time to decay against.
            return self.get(session), 0.0
        elapsed_minutes = (datetime.utcnow() - cache["last_update"]).total_seconds() / 60
        delta = -self.DECAY_RATE * elapsed_minutes
        new_score = max(0.0, cache["score"] + delta)
        _trust_cache[sid] = {"score": new_score, "last_update": datetime.utcnow()}
        return new_score, delta

    def update(
        self,
        session: "UserSession",
        delta: float,
        event_type: str,
        reason: str,
        signals: Optional[Dict] = None,
    ) -> float:
        """Apply delta, clamp to [0, 100], persist to DB, update cache."""
        current = self.get(session)
        new_score = max(0.0, min(100.0, current + delta))
        _trust_cache[session.id] = {"score": new_score, "last_update": datetime.utcnow()}
        self.db.add(SessionTrustEvent(
            session_id=session.id,
            user_id=session.user_id,
            trust_score=new_score,
            delta=delta,
            event_type=event_type,
            reason=reason,
            signals=signals or {},
        ))
        self.db.commit()
        return new_score

    def get_history(self, session_id: int, limit: int = 40) -> List[Dict]:
        """Return up to `limit` trust events for a session, oldest first."""
        events = (
            self.db.query(SessionTrustEvent)
            .filter(SessionTrustEvent.session_id == session_id)
            .order_by(SessionTrustEvent.created_at.desc())
            .limit(limit)
            .all()
        )
        return [
            {
                "score": e.trust_score,
                "delta": e.delta,
                "event_type": e.event_type,
                "reason": e.reason,
                "at": e.created_at.isoformat(),
            }
            for e in reversed(events)
        ]

    # label/color take only a score — they were missing @staticmethod, so
    # instance calls (mgr.label(x)) would raise TypeError. Fixed here;
    # class-level calls (TrustScoreManager.label(x)) keep working unchanged.
    @staticmethod
    def label(score: float) -> str:
        """Human-readable trust band for a score."""
        if score >= 80:
            return "trusted"
        if score >= 60:
            return "watchful"
        if score >= 40:
            return "elevated"
        if score >= 20:
            return "high_risk"
        return "critical"

    @staticmethod
    def color(score: float) -> str:
        """UI hex color for a score band (green -> red)."""
        if score >= 80:
            return "#22c55e"
        if score >= 60:
            return "#84cc16"
        if score >= 40:
            return "#f59e0b"
        if score >= 20:
            return "#f97316"
        return "#ef4444"
# ───────────────────────────────────────────────────────────────────────────
# Feature 2 & 8 — Behavioral Intelligence + Privacy-First Design
# ───────────────────────────────────────────────────────────────────────────
class BehaviorSignalProcessor:
    """
    Processes privacy-first behavior signals submitted by the client.

    The client JS collects raw events (keydown timings, mouse paths, scroll
    deltas) and computes aggregated 0-1 scores LOCALLY. Only those aggregates
    are transmitted — no raw events ever leave the browser (Feature 8:
    Privacy-First Design). The server validates plausibility, computes an
    anomaly score, and derives a trust delta.
    """

    def process(
        self,
        session: UserSession,
        typing_entropy: float,
        mouse_linearity: float,
        scroll_variance: float,
        local_risk_score: float,
        db: Session,
    ) -> Dict[str, Any]:
        def unit_clamp(raw: float) -> float:
            # All client signals are expected in [0, 1]; clamp defensively.
            return max(0.0, min(1.0, float(raw)))

        te = unit_clamp(typing_entropy)
        ml = unit_clamp(mouse_linearity)
        sv = unit_clamp(scroll_variance)
        lr = unit_clamp(local_risk_score)

        def anomaly_of(value: float, key: str) -> float:
            # A zero signal means "not collected" -> neutral mid-range anomaly.
            mean, std = BEHAVIOR_BASELINE[key]
            return _z_anomaly(value, mean, std) if value > 0 else 50.0

        te_a = anomaly_of(te, "typing_entropy")
        ml_a = anomaly_of(ml, "mouse_linearity")
        sv_a = anomaly_of(sv, "scroll_variance")

        # Weighted composite of the server-side per-feature anomalies.
        server_score = 0.40 * te_a + 0.35 * ml_a + 0.25 * sv_a
        # Blend with the client-reported composite (60/40 split).
        final = 0.60 * server_score + 0.40 * (lr * 100.0)

        # Map composite anomaly to a trust delta (reward low, punish high).
        if final < 20:
            trust_delta = +6.0
        elif final < 40:
            trust_delta = +2.0
        elif final < 60:
            trust_delta = -4.0
        elif final < 80:
            trust_delta = -12.0
        else:
            trust_delta = -24.0

        db.add(BehaviorSignalRecord(
            session_id=session.id,
            user_id=session.user_id,
            typing_entropy=te,
            mouse_linearity=ml,
            scroll_variance=sv,
            local_risk_score=lr,
            anomaly_score=final,
        ))
        db.commit()

        return {
            "anomaly_score": round(final, 2),
            "trust_delta": trust_delta,
            "signals": {
                "typing_entropy": round(te, 3),
                "mouse_linearity": round(ml, 3),
                "scroll_variance": round(sv, 3),
                "local_risk_score": round(lr, 3),
            },
            "per_feature_anomaly": {
                "typing": round(te_a, 1),
                "mouse": round(ml_a, 1),
                "scroll": round(sv_a, 1),
            },
            "classification": _classify_anomaly(final),
            "privacy_note": (
                "Raw keystrokes, mouse coordinates, and scroll positions "
                "were processed entirely in-browser. Only aggregated scores "
                "were transmitted to the server."
            ),
        }
# ───────────────────────────────────────────────────────────────────────────
# Feature 7 — Impossible Travel + Pattern Clustering
# ───────────────────────────────────────────────────────────────────────────
class ImpossibleTravelDetector:
    """
    Detects impossible travel using haversine distance + elapsed time.

    Thresholds:
        > 900 km/h  ->  IMPOSSIBLE  (fastest commercial jet ~900 km/h)
        > 400 km/h  ->  SUSPICIOUS  (implausible without air travel)
        < 400 km/h  ->  PLAUSIBLE
    """

    IMPOSSIBLE_KMH = 900.0
    SUSPICIOUS_KMH = 400.0

    def __init__(self, db: Session):
        self.db = db

    def check(
        self,
        user_id: int,
        city_now: str,
        country_now: str,
        lat_now: Optional[float] = None,
        lon_now: Optional[float] = None,
        time_gap_hours: Optional[float] = None,  # override for demo mode
    ) -> Dict[str, Any]:
        """
        Compare the current login location against the most recent successful
        login. Pass time_gap_hours to simulate an arbitrary gap for demos.
        """
        last = (
            self.db.query(LoginAttempt)
            .filter(
                LoginAttempt.user_id == user_id,
                LoginAttempt.success == True,  # noqa: E712 — SQLAlchemy expression
                LoginAttempt.city.isnot(None),
            )
            .order_by(LoginAttempt.attempted_at.desc())
            .first()
        )
        if not last:
            return self._no_history(city_now, country_now, lat_now, lon_now)

        lat1, lon1 = _resolve_coords(last.city, last.latitude, last.longitude)
        lat2, lon2 = _resolve_coords(city_now, lat_now, lon_now)
        if lat1 is None or lat2 is None:
            # One endpoint cannot be geolocated — skip rather than guess.
            return {
                "possible": True,
                "verdict": "coords_unknown",
                "message": f"Cannot resolve coordinates for '{last.city}' or '{city_now}'.",
                "distance_km": 0.0, "speed_kmh": 0.0, "time_gap_minutes": 0.0,
                "trust_delta": 0.0,
            }

        distance_km = haversine(lat1, lon1, lat2, lon2)
        if time_gap_hours is not None:
            time_gap_s = time_gap_hours * 3600
        else:
            time_gap_s = (datetime.utcnow() - last.attempted_at).total_seconds()
        # Floor at 0.001 min so displayed values never divide/format as zero.
        time_gap_min = max(time_gap_s / 60, 0.001)
        speed_kmh = (distance_km / (time_gap_s / 3600)) if time_gap_s > 0 else 0.0

        # Verdict selection: nearby first, then descending speed severity.
        if distance_km < 50:
            verdict, possible, trust_delta = "same_area", True, 0.0
            msg = f"Same area as last login ({last.city}). No anomaly."
        elif speed_kmh > self.IMPOSSIBLE_KMH:
            verdict, possible, trust_delta = "impossible", False, -50.0
            msg = (
                f"IMPOSSIBLE TRAVEL: {distance_km:.0f} km in {time_gap_min:.0f} min "
                f"= {speed_kmh:.0f} km/h (fastest jet ~900 km/h)."
            )
        elif speed_kmh > self.SUSPICIOUS_KMH:
            verdict, possible, trust_delta = "suspicious", True, -20.0
            msg = (
                f"Suspicious speed: {speed_kmh:.0f} km/h over "
                f"{distance_km:.0f} km from {last.city} in {time_gap_min:.0f} min."
            )
        else:
            verdict, possible, trust_delta = "plausible", True, 0.0
            msg = (
                f"Plausible: {distance_km:.0f} km from {last.city} "
                f"at {speed_kmh:.0f} km/h in {time_gap_min:.0f} min."
            )

        return {
            "possible": possible,
            "verdict": verdict,
            "message": msg,
            "distance_km": round(distance_km, 1),
            "speed_kmh": round(speed_kmh, 1),
            "time_gap_minutes": round(time_gap_min, 1),
            "from": {"city": last.city, "lat": lat1, "lon": lon1},
            "to": {"city": city_now, "country": country_now, "lat": lat2, "lon": lon2},
            "trust_delta": trust_delta,
        }

    def _no_history(self, city, country, lat, lon) -> Dict[str, Any]:
        """Neutral result payload when the user has no prior successful login."""
        return {
            "possible": True, "verdict": "no_history",
            "message": "No previous login on record — travel check skipped.",
            "distance_km": 0.0, "speed_kmh": 0.0, "time_gap_minutes": 0.0,
            "from": None,
            "to": {"city": city, "country": country, "lat": lat, "lon": lon},
            "trust_delta": 0.0,
        }
# ───────────────────────────────────────────────────────────────────────────
# Feature 4 — Low-Friction Micro-Challenges
# ───────────────────────────────────────────────────────────────────────────
class MicroChallengeEngine:
    """
    Issues lightweight inline challenges ONLY when trust drops below a threshold.

    Inspired by CAPTCHA but far less intrusive — a single arithmetic question.
    Trust restored: +20 on pass, -15 on fail.
    Threshold: trust < 40 -> challenge recommended.
    """

    THRESHOLD = 40.0  # trust score below which a challenge is recommended

    def should_challenge(self, trust_score: float) -> bool:
        """True when trust has dropped far enough to warrant a challenge."""
        return trust_score < self.THRESHOLD

    def generate(self) -> Dict[str, Any]:
        """Create a one-question math challenge; only the answer's hash is kept."""
        ops = [('+', lambda a, b: a + b), ('-', lambda a, b: a - b),
               ('×', lambda a, b: a * b)]
        sym, fn = random.choice(ops)
        # Keep multiplication operands small so the question stays low-friction.
        a = random.randint(2, 9) if sym == '×' else random.randint(10, 50)
        b = random.randint(2, 9) if sym == '×' else random.randint(1, 20)
        answer = fn(a, b)
        cid = hashlib.sha256(
            f"{datetime.utcnow().isoformat()}{random.random()}".encode()
        ).hexdigest()[:16]
        # Store only a hash of the answer — the plaintext never persists.
        _micro_challenges[cid] = {
            "answer_hash": hashlib.sha256(str(answer).encode()).hexdigest(),
            "expires": datetime.utcnow() + timedelta(minutes=5),
            "attempts": 0,
            "type": "math",
        }
        return {
            "challenge_id": cid,
            "type": "math",
            "question": f"What is {a} {sym} {b} ?",
            "hint": "Answer is an integer.",
            "expires_in_seconds": 300,
        }

    def verify(self, challenge_id: str, response: str) -> Dict[str, Any]:
        """Check a response; challenges are single-use and capped at 3 attempts."""
        ch = _micro_challenges.get(challenge_id)
        if not ch:
            return {"correct": False, "reason": "Challenge not found or already used.", "trust_delta": -5.0}
        if ch["expires"] < datetime.utcnow():
            _micro_challenges.pop(challenge_id, None)
            return {"correct": False, "reason": "Challenge expired.", "trust_delta": -5.0}
        given_hash = hashlib.sha256(response.strip().encode()).hexdigest()
        correct = given_hash == ch["answer_hash"]
        ch["attempts"] = ch.get("attempts", 0) + 1
        # Consume the challenge on success or once attempts are exhausted.
        if correct or ch["attempts"] >= 3:
            _micro_challenges.pop(challenge_id, None)
        return {
            "correct": correct,
            "trust_delta": +20.0 if correct else -15.0,
            "reason": (
                "✓ Correct — trust score restored." if correct
                else f"✗ Incorrect. {'Max attempts reached.' if ch['attempts'] >= 3 else 'Try again.'}"
            ),
        }
# ───────────────────────────────────────────────────────────────────────────
# Feature 5 — Explainable Risk Transparency
# ───────────────────────────────────────────────────────────────────────────
class RiskExplainer:
    """
    Generates audit-ready, human-readable risk explanations.

    Shows exactly: which signals contributed, their magnitude, and confidence.
    """

    # Per-factor display metadata; weights mirror the trained risk model.
    _FACTOR_META = {
        "location": {"icon": "🌍", "label": "Location", "weight": "97.68%"},
        "device": {"icon": "💻", "label": "Device", "weight": "0.21%"},
        "time": {"icon": "🕐", "label": "Time Pattern", "weight": "0.02%"},
        "velocity": {"icon": "⚡", "label": "Velocity", "weight": "2.08%"},
        "behavior": {"icon": "🧠", "label": "Behavior", "weight": "0.01%"},
    }
    _LEVEL_LABEL = {
        0: "No step-up — trusted context",
        1: "Standard password auth",
        2: "Email verification required (new IP)",
        3: "2FA required (unknown device)",
        4: "Access BLOCKED — critical risk",
    }

    def explain_login(
        self,
        risk_factors: Dict[str, float],
        risk_level: str,
        security_level: int,
    ) -> Dict[str, Any]:
        """Build the factor-by-factor explanation payload for a login decision."""
        factors = []
        for key, score in risk_factors.items():
            meta = self._FACTOR_META.get(key, {"icon": "📊", "label": key.title(), "weight": "N/A"})
            anomalous = score > 30.0  # per-factor anomaly threshold
            factors.append({
                "factor": meta["label"],
                "score": round(score, 1),
                "icon": meta["icon"],
                "model_weight": meta["weight"],
                "status": "anomalous" if anomalous else "normal",
                "detail": self._detail(key, anomalous),
                # Anomalous factors push trust down; normal ones nudge it up.
                "contribution": round(-score * 0.4 if anomalous else (30 - score) * 0.1, 1),
            })
        # Most influential factors first.
        factors.sort(key=lambda x: abs(x["contribution"]), reverse=True)
        anomalous_names = [f["factor"] for f in factors if f["status"] == "anomalous"]
        # Confidence degrades with each anomalous signal, floored at 0.50.
        confidence = max(0.50, min(0.99, 1.0 - len(anomalous_names) * 0.08))
        return {
            "audit_id": hashlib.sha256(
                f"{datetime.utcnow().isoformat()}{str(risk_factors)}".encode()
            ).hexdigest()[:12],
            "timestamp": datetime.utcnow().isoformat(),
            "risk_level": risk_level,
            "security_level": security_level,
            "action": self._LEVEL_LABEL.get(security_level, "Unknown"),
            "confidence": round(confidence, 2),
            "factors": factors,
            "summary": _build_summary(anomalous_names, security_level),
        }

    def explain_trust_event(self, event_type: str, delta: float, signals: Dict) -> str:
        """One-line narrative for a trust-score event."""
        msgs = {
            "behavior_good": f"Human-like behavior signals (+{abs(delta):.0f} trust)",
            "behavior_anomaly": f"Unusual behavior pattern (−{abs(delta):.0f} trust)",
            "decay": f"Inactivity decay (−{abs(delta):.1f} trust)",
            "context_change": f"IP or device changed (−{abs(delta):.0f} trust)",
            "micro_challenge_pass": "Micro-challenge passed (+20 trust)",
            "micro_challenge_fail": "Micro-challenge failed (−15 trust)",
            "impossible_travel": "Impossible travel detected (−50 trust)",
            "init": "Session initialised",
        }
        return msgs.get(event_type, f"Trust updated by {delta:+.1f}")

    # Was defined without `self`, so the self._detail(key, anomalous) call in
    # explain_login raised TypeError. @staticmethod fixes the arity.
    @staticmethod
    def _detail(key: str, anomalous: bool) -> str:
        """Return the normal/anomalous explanation sentence for a factor key."""
        details = {
            "location": ("Location matches behavioral profile.", "New or unexpected country/city."),
            "device": ("Known device fingerprint.", "Unknown device fingerprint."),
            "time": ("Login within typical hours.", "Login outside typical hours."),
            "velocity": ("Normal login frequency.", "Rapid or repeated attempts detected."),
            "behavior": ("Behavior matches past patterns.", "Behavioral deviation detected."),
        }
        pair = details.get(key, ("Normal.", "Anomalous."))
        return pair[1] if anomalous else pair[0]
# ───────────────────────────────────────────────────────────────────────────
# Feature 6 — AI-Powered Anomaly Detection (Statistical Isolation Forest)
# ───────────────────────────────────────────────────────────────────────────
class StatisticalAnomalyDetector:
    """
    Isolation-Forest-inspired anomaly scorer.

    Scores a multi-dimensional feature vector against a learned baseline.
    Each feature's anomaly contribution is computed via a Z-score transform,
    then weighted into a composite 0-100 anomaly score.

    In production this would be replaced with a trained sklearn IsolationForest
    or an LSTM sequence model; the interface is kept identical for easy swap-in.
    """

    BASELINE: Dict[str, Tuple[float, float]] = {
        # feature -> (mean, std) derived from research on legitimate user behavior
        "typing_entropy": (0.70, 0.15),
        "mouse_linearity": (0.62, 0.18),
        "scroll_variance": (0.48, 0.22),
        "hour_normalized": (0.55, 0.28),   # 0 = midnight, 1 = noon normalised to [0,1]
        "failed_attempts_norm": (0.03, 0.10),  # recent failed attempts / 20
    }
    WEIGHTS: Dict[str, float] = {
        "typing_entropy": 0.28,
        "mouse_linearity": 0.24,
        "scroll_variance": 0.14,
        "hour_normalized": 0.18,
        "failed_attempts_norm": 0.16,
    }

    def score(self, features: Dict[str, float]) -> Dict[str, Any]:
        """
        Score a feature vector. Returns anomaly_score in [0, 100].
        Higher means more anomalous. Missing features default to their
        baseline mean (i.e. contribute zero anomaly).
        """
        per_feature: Dict[str, float] = {}
        total = 0.0
        for feat, (mean, std) in self.BASELINE.items():
            val = features.get(feat, mean)
            a = _z_anomaly(float(val), mean, std)
            per_feature[feat] = round(a, 1)
            total += a * self.WEIGHTS.get(feat, 0.10)
        label, color = self._classify(total)
        # Confidence grows with the anomaly magnitude, capped at 0.99.
        confidence = round(min(0.99, 0.50 + total / 200.0), 2)
        return {
            "anomaly_score": round(total, 2),
            "classification": label,
            "color": color,
            "confidence": confidence,
            "per_feature": per_feature,
            "baseline_comparison": {
                feat: {
                    "your_value": round(features.get(feat, mean), 3),
                    "typical_mean": mean,
                    "typical_std": std,
                    "z_score": round(abs(features.get(feat, mean) - mean) / max(std, 1e-6), 2),
                }
                for feat, (mean, std) in self.BASELINE.items()
            },
            "method": "statistical_isolation_forest_analogy",
        }

    # Was defined without `self`, so the self._classify(total) call in score()
    # raised TypeError. @staticmethod fixes the arity.
    @staticmethod
    def _classify(score: float) -> Tuple[str, str]:
        """Map a 0-100 anomaly score to (severity label, UI hex color)."""
        if score >= 80:
            return "CRITICAL", "#ef4444"
        if score >= 60:
            return "HIGH", "#f97316"
        if score >= 40:
            return "MEDIUM", "#f59e0b"
        if score >= 20:
            return "LOW", "#84cc16"
        return "NORMAL", "#22c55e"