abcj / ai /anomaly_model.py
import numpy as np
from sklearn.ensemble import IsolationForest
import pickle
import os
import threading
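class AnomalyModel:
    """Unsupervised packet anomaly scorer built on an IsolationForest.

    Each packet is reduced to two features (size and a protocol code).
    The first ``buffer_size`` packets are buffered and used to fit the
    forest once; until then every packet scores 0.0.

    NOTE(review): the baseline is learned from whatever traffic arrives
    first and is never refreshed by this class. If that warm-up window is
    not known-clean, whatever it contained becomes "normal" for the model.
    """

    def __init__(self, buffer_size=500):
        """Create an untrained model that fits after *buffer_size* packets."""
        self.model = None
        self.feature_buffer = []
        self.buffer_size = buffer_size
        self.is_trained = False
        # Serialises buffer mutation and the one-off training step.
        self.lock = threading.Lock()

    def _extract_features(self, packet_info):
        """Return ``[size, proto_code]`` for one packet record.

        Raises:
            KeyError: if ``'size'`` or ``'protocol'`` is missing.
            ValueError: if ``'size'`` is not a finite real number.
        """
        size = packet_info['size']
        proto = packet_info['protocol']

        # Reject malformed sizes here: a single None/str/NaN row in the
        # buffer would otherwise make every later fit() attempt fail.
        if not isinstance(size, (int, float, np.integer, np.floating)):
            raise ValueError(f"packet size must be numeric, got {type(size).__name__}")
        if size != size or size in (float("inf"), float("-inf")):
            raise ValueError("packet size must be finite")

        # Lookup is case-sensitive; anything unrecognised maps to OTHER (3).
        proto_map = {'TCP': 0, 'UDP': 1, 'ICMP': 2, 'OTHER': 3}
        proto_code = proto_map.get(proto, 3)
        return [size, proto_code]

    def add_packet(self, packet_info):
        """Buffer one packet's features and train once the buffer is full.

        Returns the extracted feature list. Training runs while the lock is
        held, so concurrent ``add_packet`` calls wait until it finishes.
        """
        feats = self._extract_features(packet_info)
        with self.lock:
            self.feature_buffer.append(feats)
            if not self.is_trained:
                if len(self.feature_buffer) >= self.buffer_size:
                    self._train()
            else:
                # The model is fitted only once, so keep just the most
                # recent buffer_size rows; otherwise the list grows with
                # every packet seen for the life of the process.
                overflow = len(self.feature_buffer) - self.buffer_size
                if overflow > 0:
                    del self.feature_buffer[:overflow]
        return feats

    def _train(self):
        """Fit the forest on the current buffer; caller holds ``self.lock``."""
        X = np.array(self.feature_buffer)
        # contamination=0.1 tells the forest to expect ~10% outliers in the
        # training data; random_state is fixed so fits are reproducible.
        self.model = IsolationForest(contamination=0.1, random_state=42)
        self.model.fit(X)
        # Flip the flag only after fit() so scorers never see an unfitted model.
        self.is_trained = True
        print(f"[AI] Anomaly model trained on {len(X)} samples.")

    def predict_anomaly_score(self, packet_info):
        """Return an anomaly score in ``[0.0, 1.0]`` for one packet.

        0.0 means "not flagged" (or the model is not trained yet). For a
        packet the forest labels as an outlier, the score is the negated
        decision-function value clipped to ``[0, 1]``.
        """
        if not self.is_trained:
            return 0.0
        feats = self._extract_features(packet_info)
        model = self.model
        if model.predict([feats])[0] != -1:
            return 0.0
        raw = -model.decision_function([feats])[0]
        # Plain float so both branches return the same type.
        return float(np.clip(raw, 0, 1))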