File size: 7,766 Bytes
214209a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
"""
User Profile Service

Per-user behavioral profiling and vulnerability scoring.
Tracks scan history, domain patterns, and computes a rolling vulnerability score.
"""

import json
import logging
from pathlib import Path
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
from collections import Counter

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# JSON file that stores every user's profile. It lives two directories above
# the directory that contains this module.
_MODULE_DIR = Path(__file__).parent
PROFILES_FILE = _MODULE_DIR.parent.parent / "user_profiles.json"

# Bootstrap estimate of the global threat ratio: on average 12% of scans are
# threats. The value can be updated periodically.
GLOBAL_AVG_THREAT_RATIO = 0.12


class UserProfileManager:
    """Manage per-user scan history, vulnerability scoring, and anomaly detection.

    Profiles live in ``self.profiles`` (a dict keyed by user id) and are
    written to ``PROFILES_FILE`` as JSON after every recorded scan.
    """

    # Categories that make a scan count as a threat.
    _THREAT_CATEGORIES = ("high_risk", "medium_risk")
    # Retention limits applied by record_scan().
    _MAX_HISTORY_ENTRIES = 500
    _MAX_DAILY_ENTRIES = 90
    _TOP_DOMAIN_COUNT = 10

    def __init__(self):
        """Start with an empty profile map, then try to load saved profiles."""
        self.profiles: Dict[str, Dict[str, Any]] = {}
        self._load()

    def _load(self):
        """Load profiles from ``PROFILES_FILE`` if it exists.

        Any failure (unreadable file, invalid JSON, or a top-level value that
        is not a JSON object) is logged and leaves ``self.profiles`` empty.
        """
        try:
            if PROFILES_FILE.exists():
                with open(PROFILES_FILE, "r", encoding="utf-8") as f:
                    loaded = json.load(f)
                # Every other method indexes self.profiles by user id, so
                # anything but a dict would only fail later and less clearly.
                if not isinstance(loaded, dict):
                    raise ValueError(
                        f"expected a JSON object, got {type(loaded).__name__}"
                    )
                self.profiles = loaded
                logger.info(f"[UserProfile] Loaded {len(self.profiles)} user profiles")
        except Exception as e:
            logger.error(f"[UserProfile] Failed to load profiles: {e}")
            self.profiles = {}

    def _save(self):
        """Persist profiles to ``PROFILES_FILE``; failures are logged, not raised.

        The JSON is written to a sibling ``.tmp`` file first and then moved
        over the real file, so a failure in the middle of serialization does
        not leave a truncated profile store behind.
        """
        try:
            PROFILES_FILE.parent.mkdir(parents=True, exist_ok=True)
            tmp_path = PROFILES_FILE.with_name(PROFILES_FILE.name + ".tmp")
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(self.profiles, f, indent=2, default=str)
            tmp_path.replace(PROFILES_FILE)
        except Exception as e:
            logger.error(f"[UserProfile] Failed to save profiles: {e}")

    def _ensure_profile(self, user_id: str) -> Dict[str, Any]:
        """Return the profile for ``user_id``, creating it if it doesn't exist.

        Profiles loaded from disk may lack some keys; the counters and lists
        that other methods index directly are backfilled with empty values.
        """
        profile = self.profiles.get(user_id)
        if not isinstance(profile, dict):
            profile = {
                "user_id": user_id,
                "created_at": datetime.utcnow().isoformat() + "Z",
            }
            self.profiles[user_id] = profile

        profile.setdefault("total_scans", 0)
        profile.setdefault("total_threats", 0)
        profile.setdefault("domain_history", [])  # list of {domain, timestamp, risk_score, category}
        profile.setdefault("daily_scores", [])    # list of {date, avg_score, scan_count, threat_count}
        profile.setdefault("top_domains", [])     # most frequent domains in domain_history
        return profile

    @staticmethod
    def _parse_timestamp(ts: Any) -> Optional[datetime]:
        """Parse an ISO-8601 string into a naive UTC datetime, or return None.

        A trailing ``Z`` is treated as ``+00:00``. Timestamps that carry a UTC
        offset are converted to UTC before the offset is dropped; timestamps
        without an offset are returned unchanged.
        """
        if not isinstance(ts, str):
            return None
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        try:
            parsed = datetime.fromisoformat(ts)
            offset = parsed.utcoffset()
            if offset is not None:
                parsed = (parsed - offset).replace(tzinfo=None)
        except (ValueError, OverflowError):
            return None
        return parsed

    @staticmethod
    def _normalize_domain(domain: Any) -> str:
        """Lower-case a domain and strip whitespace and a trailing dot."""
        if not isinstance(domain, str):
            return ""
        return domain.strip().lower().rstrip(".")

    def record_scan(
        self,
        user_id: str,
        domain: str,
        risk_score: int,
        category: str
    ) -> None:
        """
        Record a scan event for a user and persist all profiles.

        Args:
            user_id: Anonymous user identifier
            domain: The domain scanned
            risk_score: Risk score from the analysis
            category: Risk category (high_risk, medium_risk, etc.)
        """
        profile = self._ensure_profile(user_id)
        now = datetime.utcnow()
        today_str = now.strftime("%Y-%m-%d")
        is_threat = category in self._THREAT_CATEGORIES

        profile["total_scans"] += 1
        if is_threat:
            profile["total_threats"] += 1

        # Add to domain history (keep only the most recent entries)
        history = profile["domain_history"]
        history.append({
            "domain": domain,
            "timestamp": now.isoformat() + "Z",
            "risk_score": risk_score,
            "category": category
        })
        if len(history) > self._MAX_HISTORY_ENTRIES:
            del history[:-self._MAX_HISTORY_ENTRIES]

        # Update daily scores: fold into today's entry or start a new one
        daily = profile["daily_scores"]
        if daily and daily[-1].get("date") == today_str:
            entry = daily[-1]
            n = entry.get("scan_count", 0)
            # Running mean over today's scans, rounded for storage.
            entry["avg_score"] = round(
                (entry.get("avg_score", 0) * n + risk_score) / (n + 1), 2
            )
            entry["scan_count"] = n + 1
            if is_threat:
                entry["threat_count"] = entry.get("threat_count", 0) + 1
        else:
            daily.append({
                "date": today_str,
                "avg_score": risk_score,
                "scan_count": 1,
                "threat_count": 1 if is_threat else 0
            })

        # Keep only the most recent daily entries
        if len(daily) > self._MAX_DAILY_ENTRIES:
            del daily[:-self._MAX_DAILY_ENTRIES]

        # Update top domains from the retained history
        counter = Counter(
            item["domain"]
            for item in history
            if isinstance(item, dict) and "domain" in item
        )
        profile["top_domains"] = [
            d for d, _ in counter.most_common(self._TOP_DOMAIN_COUNT)
        ]

        self._save()

    def get_vulnerability_score(self, user_id: str) -> float:
        """
        Compute a vulnerability score (0–100) for a user.

        Based on their threat-to-scan ratio over the last 30 days,
        relative to the global average. Returns 0.0 when the user has no
        parseable history entries in that window.

        Note: an unknown ``user_id`` gets an empty in-memory profile.
        """
        profile = self._ensure_profile(user_id)
        cutoff = datetime.utcnow() - timedelta(days=30)

        # Count recent scans and threats; skip malformed entries
        scans_30d = 0
        threats_30d = 0
        for entry in profile["domain_history"]:
            if not isinstance(entry, dict):
                continue
            seen_at = self._parse_timestamp(entry.get("timestamp"))
            if seen_at is None or seen_at <= cutoff:
                continue
            scans_30d += 1
            if entry.get("category") in self._THREAT_CATEGORIES:
                threats_30d += 1

        if scans_30d == 0:
            return 0.0

        # Score relative to global average
        # If user_ratio == global_avg → score 50
        # If user_ratio >= 2 * global_avg → score capped at 100
        # If user_ratio << global_avg → score → 0
        if GLOBAL_AVG_THREAT_RATIO <= 0:
            return 0.0

        user_ratio = threats_30d / scans_30d
        score = min(user_ratio / GLOBAL_AVG_THREAT_RATIO * 50, 100.0)
        return round(score, 1)

    def get_history_anomaly_boost(self, user_id: str, domain: str) -> float:
        """
        Check if a domain is anomalous for this user.

        Returns 15.0 when the domain is unrelated to every entry in the
        user's top domains, otherwise 0.0. A domain is related when it equals
        a top domain or one is a subdomain of the other; the comparison is
        case-insensitive and only matches on a dot boundary, so
        ``evilexample.com`` is not treated as part of ``example.com``.
        Returns 0.0 when the user has no top domains yet or ``domain`` is empty.

        Note: an unknown ``user_id`` gets an empty in-memory profile.
        """
        profile = self._ensure_profile(user_id)
        top_domains = profile.get("top_domains", [])

        if not top_domains:
            return 0.0

        candidate = self._normalize_domain(domain)
        if not candidate:
            return 0.0

        for known in top_domains:
            familiar = self._normalize_domain(known)
            if not familiar:
                continue
            if (
                candidate == familiar
                or candidate.endswith("." + familiar)
                or familiar.endswith("." + candidate)
            ):
                return 0.0

        # Domain is unfamiliar — boost
        return 15.0

    def get_prediction_trend(self, user_id: str, days: int = 7) -> List[Dict[str, Any]]:
        """
        Return the most recent ``days`` daily score entries, oldest first.

        Entries exist only for dates on which a scan was recorded, so the
        result may span more than ``days`` calendar days. A non-positive
        ``days`` yields an empty list.

        Returns list of {date, avg_score, scan_count, threat_count}.

        Note: an unknown ``user_id`` gets an empty in-memory profile.
        """
        profile = self._ensure_profile(user_id)
        daily = profile.get("daily_scores", [])
        # Guard explicitly: daily[-0:] would return the whole list.
        if days <= 0 or not daily:
            return []
        return daily[-days:]

    def get_profile_summary(self, user_id: str) -> Dict[str, Any]:
        """Return a summary of the user's profile for the dashboard.

        The summary holds the vulnerability score, lifetime scan and threat
        counts, top domains, up to 30 daily trend entries, and the profile
        creation timestamp (``member_since``).
        """
        profile = self._ensure_profile(user_id)
        vuln_score = self.get_vulnerability_score(user_id)
        trend = self.get_prediction_trend(user_id, 30)

        return {
            "user_id": user_id,
            "vulnerability_score": vuln_score,
            "total_scans": profile["total_scans"],
            "total_threats": profile["total_threats"],
            "top_domains": profile.get("top_domains", []),
            "daily_trend": trend,
            "member_since": profile.get("created_at", "")
        }


# Global singleton: one shared UserProfileManager, created when this module is
# imported. Its constructor immediately tries to load saved profiles from
# PROFILES_FILE.
user_profile_manager = UserProfileManager()