| import numpy as np | |
| from sklearn.ensemble import IsolationForest | |
| import pickle | |
| import os | |
| import threading | |
| class AnomalyModel: | |
| def __init__(self, buffer_size=500): | |
| self.model = None | |
| self.feature_buffer = [] | |
| self.buffer_size = buffer_size | |
| self.is_trained = False | |
| self.lock = threading.Lock() | |
| def _extract_features(self, packet_info): | |
| size = packet_info['size'] | |
| proto = packet_info['protocol'] | |
| proto_map = {'TCP': 0, 'UDP': 1, 'ICMP': 2, 'OTHER': 3} | |
| proto_code = proto_map.get(proto, 3) | |
| return [size, proto_code] | |
| def add_packet(self, packet_info): | |
| feats = self._extract_features(packet_info) | |
| with self.lock: | |
| self.feature_buffer.append(feats) | |
| if len(self.feature_buffer) >= self.buffer_size and not self.is_trained: | |
| self._train() | |
| return feats | |
| def _train(self): | |
| X = np.array(self.feature_buffer) | |
| self.model = IsolationForest(contamination=0.1, random_state=42) | |
| self.model.fit(X) | |
| self.is_trained = True | |
| print(f"[AI] Anomaly model trained on {len(X)} samples.") | |
| def predict_anomaly_score(self, packet_info): | |
| if not self.is_trained: | |
| return 0.0 | |
| feats = self._extract_features(packet_info) | |
| pred = self.model.predict([feats])[0] | |
| if pred == -1: | |
| score = -self.model.decision_function([feats])[0] | |
| score = np.clip(score, 0, 1) | |
| else: | |
| score = 0.0 | |
| return score |