# risk_engine.py (Optimized)
# - Accepts optional `recent_events` to avoid repeated disk/IO calls
# - Uses light-weight counters and caching for frequency checks
# - Returns (level, score) as before
import random
import time
from collections import Counter

from utils.logger import get_recent_events
# small in-memory cache for source counts to avoid repeated scans;
# rebuilt at most once per `ttl` seconds by compute_risk_score().
_SRC_CACHE = {
"ts": 0,  # time.time() of the last rebuild (0 = never built)
"counts": {},  # mapping of src_ip -> occurrence count from the last scan
"ttl": 2.0 # seconds
}
def _build_source_cache(recent_events):
counts = {}
for e in recent_events:
s = e.get("src_ip")
if s:
counts[s] = counts.get(s, 0) + 1
return counts
def compute_risk_score(evt, recent_events=None):
    """Compute an adaptive risk score for a single event.

    Args:
        evt: Event dict; reads the "prediction" (traffic label) and
            "src_ip" keys.
        recent_events: Optional pre-fetched list of event dicts. When
            provided, source-frequency counts are rebuilt from it directly
            (the previous version silently ignored it while the cache was
            warm). When None, cached counts are reused until the TTL
            expires, and ``get_recent_events()`` is only called on a
            cache miss.

    Returns:
        Tuple ``(level, score)`` where level is "High" / "Medium" / "Low"
        and score is an int clamped to 0..100.
    """
    label = (evt.get("prediction") or "").upper()
    src_ip = evt.get("src_ip") or ""

    # Base severity per predicted traffic class; unknown labels get 35.
    base_map = {
        "TOR": 90,
        "I2P": 85,
        "ZERONET": 70,
        "VPN": 55,
        "FREENET": 60,
        "HTTP": 30,
        "DNS": 25,
    }
    base = base_map.get(label, 35)

    now = time.time()
    cache_stale = (
        now - _SRC_CACHE.get("ts", 0) > _SRC_CACHE.get("ttl", 2.0)
        or not _SRC_CACHE.get("counts")
    )
    if recent_events is not None:
        # Caller supplied fresh events: always honor them rather than
        # possibly-stale cached counts.
        _SRC_CACHE["counts"] = _build_source_cache(recent_events)
        _SRC_CACHE["ts"] = now
    elif cache_stale:
        # Only hit the event store on a cache miss — previously
        # get_recent_events() ran on every call even with a warm cache,
        # defeating the cache's stated purpose.
        _SRC_CACHE["counts"] = _build_source_cache(get_recent_events())
        _SRC_CACHE["ts"] = now
    freq = _SRC_CACHE["counts"].get(src_ip, 0)

    # Repeat offenders get a boost: 3+ recent sightings -> +5, 6+ -> +15.
    if freq >= 6:
        freq_boost = 15
    elif freq >= 3:
        freq_boost = 5
    else:
        freq_boost = 0

    # Small jitter so identical events don't always score identically.
    noise = random.randint(-3, 3)
    score = min(100, max(0, base + freq_boost + noise))

    if score >= 80:
        level = "High"
    elif score >= 50:
        level = "Medium"
    else:
        level = "Low"
    return level, score