"""
User Profile Service

Per-user behavioral profiling and vulnerability scoring.
Tracks scan history, domain patterns, and computes a rolling vulnerability score.
"""
import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from collections import Counter
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# JSON file that stores every user profile. It lives in the directory two
# levels above the directory that contains this module.
PROFILES_FILE = Path(__file__).parent.parent.parent.joinpath("user_profiles.json")

# Bootstrap value for the global average threat ratio (can be refreshed
# periodically): on average 12% of scans are threats.
GLOBAL_AVG_THREAT_RATIO = 0.12
class UserProfileManager:
    """Manages per-user scan history, vulnerability scoring, and anomaly detection.

    Profiles live in memory in ``self.profiles`` (a dict keyed by user id) and
    are written to ``PROFILES_FILE`` as JSON after every recorded scan.
    """

    # Scan categories that count as a threat in every tally below.
    _THREAT_CATEGORIES = ("high_risk", "medium_risk")
    # Retention limits for the per-user rolling data.
    _MAX_HISTORY_ENTRIES = 500
    _MAX_DAILY_ENTRIES = 90
    _TOP_DOMAIN_COUNT = 10
    # Look-back window used by the vulnerability score.
    _RECENT_WINDOW_DAYS = 30
    # Risk boost returned for a domain unrelated to the user's top domains.
    _ANOMALY_BOOST = 15.0

    def __init__(self):
        self.profiles: Dict[str, Dict[str, Any]] = {}
        self._load()

    def _load(self):
        """Load profiles from disk.

        Any failure (unreadable file, invalid JSON, or a top-level value that
        is not a JSON object) is logged and leaves an empty profile store.
        """
        try:
            if PROFILES_FILE.exists():
                with open(PROFILES_FILE, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                # Every other method indexes self.profiles by user id, so
                # anything other than a mapping would break them later.
                if not isinstance(loaded, dict):
                    raise ValueError(
                        f"expected a JSON object, got {type(loaded).__name__}"
                    )
                self.profiles = loaded
                logger.info(
                    "[UserProfile] Loaded %d user profiles", len(self.profiles)
                )
        except Exception as e:
            logger.error("[UserProfile] Failed to load profiles: %s", e)
            self.profiles = {}

    def _save(self):
        """Persist profiles to disk.

        The JSON is written to a sibling ``.tmp`` file first and then renamed
        over ``PROFILES_FILE``, so an interrupted write cannot truncate the
        existing store. Failures are logged, not raised.
        """
        try:
            PROFILES_FILE.parent.mkdir(parents=True, exist_ok=True)
            tmp_path = PROFILES_FILE.with_name(PROFILES_FILE.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.profiles, f, indent=2, default=str)
            tmp_path.replace(PROFILES_FILE)
        except Exception as e:
            logger.error("[UserProfile] Failed to save profiles: %s", e)

    def _ensure_profile(self, user_id: str) -> Dict[str, Any]:
        """Return the profile for ``user_id``, creating an empty one if needed.

        The new profile is only added to the in-memory store; it reaches disk
        the next time ``_save`` runs.
        """
        if user_id not in self.profiles:
            self.profiles[user_id] = {
                "user_id": user_id,
                "created_at": datetime.utcnow().isoformat() + "Z",
                "total_scans": 0,
                "total_threats": 0,
                "domain_history": [],  # list of {domain, timestamp, risk_score, category}
                "daily_scores": [],  # list of {date, avg_score, scan_count, threat_count}
                "top_domains": [],  # top 10 most visited domains
            }
        return self.profiles[user_id]

    @staticmethod
    def _normalize_domain(domain: Any) -> str:
        """Lower-case a domain and drop surrounding whitespace and a trailing dot.

        Returns an empty string for anything that is not a string.
        """
        if not isinstance(domain, str):
            return ""
        return domain.strip().lower().rstrip(".")

    @staticmethod
    def _domains_related(a: str, b: str) -> bool:
        """Return True if ``a`` and ``b`` are equal or one is a subdomain of the other."""
        # Match on a label boundary (".") so that "evilgoogle.com" is not
        # treated as belonging to "google.com".
        return a == b or a.endswith("." + b) or b.endswith("." + a)

    def record_scan(
        self,
        user_id: str,
        domain: str,
        risk_score: int,
        category: str
    ) -> None:
        """
        Record a scan event for a user and persist the updated profiles.

        Args:
            user_id: Anonymous user identifier
            domain: The domain scanned
            risk_score: Risk score from the analysis
            category: Risk category (high_risk, medium_risk, etc.)
        """
        profile = self._ensure_profile(user_id)
        now = datetime.utcnow()
        today_str = now.strftime("%Y-%m-%d")
        is_threat = category in self._THREAT_CATEGORIES

        profile["total_scans"] += 1
        if is_threat:
            profile["total_threats"] += 1

        # Add to domain history (keep only the most recent entries).
        history = profile["domain_history"]
        history.append({
            "domain": domain,
            "timestamp": now.isoformat() + "Z",
            "risk_score": risk_score,
            "category": category,
        })
        if len(history) > self._MAX_HISTORY_ENTRIES:
            del history[:-self._MAX_HISTORY_ENTRIES]

        # Update daily scores: fold into today's entry if it exists,
        # otherwise start a new one.
        daily = profile["daily_scores"]
        if daily and daily[-1].get("date") == today_str:
            entry = daily[-1]
            n = entry["scan_count"]
            # Incremental mean: previous average weighted by its sample count.
            entry["avg_score"] = round(
                (entry["avg_score"] * n + risk_score) / (n + 1), 2
            )
            entry["scan_count"] = n + 1
            if is_threat:
                entry["threat_count"] += 1
        else:
            daily.append({
                "date": today_str,
                "avg_score": risk_score,
                "scan_count": 1,
                "threat_count": 1 if is_threat else 0,
            })
        # Keep only the most recent daily entries.
        if len(daily) > self._MAX_DAILY_ENTRIES:
            del daily[:-self._MAX_DAILY_ENTRIES]

        # Update top domains from the retained history.
        counter = Counter(d["domain"] for d in history)
        profile["top_domains"] = [
            d for d, _ in counter.most_common(self._TOP_DOMAIN_COUNT)
        ]
        self._save()

    def get_vulnerability_score(self, user_id: str) -> float:
        """
        Compute a vulnerability score (0–100) for a user.

        Based on their threat-to-scan ratio over the last 30 days,
        relative to the global average. Returns 0.0 when the user has no
        history entry with a parseable timestamp inside that window.
        """
        profile = self._ensure_profile(user_id)
        cutoff = datetime.utcnow() - timedelta(days=self._RECENT_WINDOW_DAYS)

        # Count recent threats and scans
        scans_30d = 0
        threats_30d = 0
        for entry in profile["domain_history"]:
            try:
                ts = entry.get("timestamp", "")
                # A trailing "Z" is rewritten as an explicit offset before parsing.
                if ts.endswith("Z"):
                    ts = ts[:-1] + "+00:00"
                parsed = datetime.fromisoformat(ts)
            except (ValueError, AttributeError, TypeError):
                # Malformed entry or timestamp: ignore it.
                continue
            # The cutoff is naive UTC, so convert aware timestamps to naive
            # UTC instead of merely dropping their offset.
            offset = parsed.utcoffset()
            if offset is not None:
                parsed = (parsed - offset).replace(tzinfo=None)
            if parsed > cutoff:
                scans_30d += 1
                # .get: entries without a category count as non-threats.
                if entry.get("category") in self._THREAT_CATEGORIES:
                    threats_30d += 1

        if scans_30d == 0:
            return 0.0
        user_ratio = threats_30d / scans_30d

        # Score relative to global average
        # If user_ratio == global_avg → score 50
        # If user_ratio >= 2 * global_avg → score capped at 100
        # If user_ratio << global_avg → score → 0
        if GLOBAL_AVG_THREAT_RATIO > 0:
            relative = user_ratio / GLOBAL_AVG_THREAT_RATIO
        else:
            relative = 0.0
        score = min(relative * 50, 100.0)
        return round(score, 1)

    def get_history_anomaly_boost(self, user_id: str, domain: str) -> float:
        """
        Check if a domain is anomalous for this user.

        Returns 0.0 when the user has no top domains yet, when ``domain`` is
        empty, or when ``domain`` equals, is a subdomain of, or is a parent of
        one of the user's top domains (compared case-insensitively). Otherwise
        returns a risk boost of 15.0.
        """
        profile = self._ensure_profile(user_id)
        top_domains = profile.get("top_domains", [])
        if not top_domains:
            return 0.0

        candidate = self._normalize_domain(domain)
        if not candidate:
            # Nothing to compare against the history; no boost.
            return 0.0

        for known in top_domains:
            known_norm = self._normalize_domain(known)
            if known_norm and self._domains_related(candidate, known_norm):
                return 0.0

        # Domain is unfamiliar — boost
        return self._ANOMALY_BOOST

    def get_prediction_trend(self, user_id: str, days: int = 7) -> List[Dict[str, Any]]:
        """
        Return the daily risk score trend: the last ``days`` recorded daily entries.

        Returns list of {date, avg_score, scan_count, threat_count}; empty
        when ``days`` is not positive or nothing has been recorded.
        """
        profile = self._ensure_profile(user_id)
        # Guard: daily[-0:] would return the whole list, not an empty one.
        if days <= 0:
            return []
        daily = profile.get("daily_scores") or []
        return daily[-days:]

    def get_profile_summary(self, user_id: str) -> Dict[str, Any]:
        """Return a summary of the user's profile for the dashboard."""
        profile = self._ensure_profile(user_id)
        vuln_score = self.get_vulnerability_score(user_id)
        trend = self.get_prediction_trend(user_id, 30)
        return {
            "user_id": user_id,
            "vulnerability_score": vuln_score,
            "total_scans": profile["total_scans"],
            "total_threats": profile["total_threats"],
            "top_domains": profile.get("top_domains", []),
            "daily_trend": trend,
            "member_since": profile.get("created_at", ""),
        }
# Global singleton: one shared UserProfileManager created at import time.
# Its constructor calls _load(), so importing this module reads PROFILES_FILE
# if that file exists.
user_profile_manager = UserProfileManager()
|