|
|
Python 3.13.9 (tags/v3.13.9:8183fa5, Oct 14 2025, 14:09:13) [MSC v.1944 64 bit (AMD64)] on win32
|
|
|
Enter "help" below or click "Help" above for more information.
|
|
|
"""
|
|
|
Core Defender Engine
|
|
|
"""
|
|
|
|
|
|
import time
|
|
|
import threading
|
|
|
from datetime import datetime
|
|
|
from collections import deque
|
|
|
import psutil
|
|
|
|
|
|
class DefenderEngine:
    """Collect security events, evaluate them against rules, and track alerts.

    Events and alerts are stored in bounded deques (10000 and 1000 entries
    respectively), so the oldest entries are discarded once a deque is full.
    """

    # Alerts younger than this many seconds count towards the threat level.
    RECENT_ALERT_WINDOW_SECONDS = 3600
    # An alert whose 'threat_level' exceeds this value is "high severity".
    HIGH_SEVERITY_THRESHOLD = 80

    def __init__(self):
        """Create an engine with no rules, no events and an empty status."""
        self.events = deque(maxlen=10000)
        self.alerts = deque(maxlen=1000)
        self.rules = {}
        self.status = {}
        self.start_time = time.time()
        self.events_processed = 0
        # Monotonic counter used for alert ids. len(self.alerts) cannot be
        # used because the deque is bounded and stops growing at maxlen.
        self._alerts_generated = 0

    def process_event(self, event):
        """Record a security event and raise an alert for every matching rule.

        The event dict is mutated in place: a 'processed_at' ISO timestamp is
        added before it is stored.

        Args:
            event: Mutable mapping describing the event.
        """
        event['processed_at'] = datetime.now().isoformat()
        self.events.append(event)
        self.events_processed += 1

        for rule_name, rule in self.rules.items():
            if rule.is_active() and rule.matches(event):
                self.generate_alert({
                    'rule': rule_name,
                    'event': event,
                    'timestamp': datetime.now().isoformat(),
                    'threat_level': rule.get_threat_level(),
                })

    def generate_alert(self, assessment):
        """Store a new alert for *assessment* and print it to stdout.

        Args:
            assessment: Payload describing why the alert was raised;
                process_event passes a dict with 'rule', 'event',
                'timestamp' and 'threat_level' keys.
        """
        self._alerts_generated += 1
        alert = {
            'id': self._alerts_generated,
            'assessment': assessment,
            'timestamp': datetime.now().isoformat(),
            'status': 'NEW',
        }
        self.alerts.append(alert)

        print(f"SECURITY ALERT: {assessment}")

    def get_recent_events(self, limit=100):
        """Return up to *limit* of the most recently stored events.

        Args:
            limit: Maximum number of events to return. Zero or a negative
                value yields an empty list.

        Returns:
            A new list of events, oldest first.
        """
        # Guard explicitly: a slice of [-0:] would return every event.
        if limit <= 0:
            return []
        return list(self.events)[-limit:]

    def get_active_rules(self):
        """Return a dict of the rules whose is_active() is truthy."""
        return {
            name: rule
            for name, rule in self.rules.items()
            if rule.is_active()
        }

    @staticmethod
    def _alert_threat_level(alert):
        """Return the alert's numeric threat level, or 0 when unavailable."""
        assessment = alert.get('assessment')
        if not isinstance(assessment, dict):
            return 0
        return assessment.get('threat_level', 0)

    def get_current_threat_level(self):
        """Summarise the alerts of the last hour as a threat level.

        Returns:
            "CRITICAL" for more than 5 high-severity alerts, "HIGH" for more
            than 2, "MEDIUM" for more than 10 recent alerts of any severity,
            otherwise "LOW".
        """
        # Take "now" once so every alert is measured against the same instant.
        now_ts = datetime.now().timestamp()
        recent_alerts = [
            alert for alert in self.alerts
            if now_ts - datetime.fromisoformat(alert['timestamp']).timestamp()
            < self.RECENT_ALERT_WINDOW_SECONDS
        ]

        if not recent_alerts:
            return "LOW"

        high_severity_count = sum(
            1 for alert in recent_alerts
            if self._alert_threat_level(alert) > self.HIGH_SEVERITY_THRESHOLD
        )

        if high_severity_count > 5:
            return "CRITICAL"
        if high_severity_count > 2:
            return "HIGH"
        if len(recent_alerts) > 10:
            return "MEDIUM"
        return "LOW"

    def update_status(self, status):
        """Replace the stored system status with *status*."""
        self.status = status

    def get_cpu_usage(self):
        """Return the value reported by psutil.cpu_percent()."""
        return psutil.cpu_percent()

    def get_memory_usage(self):
        """Return the 'percent' field of psutil.virtual_memory()."""
        return psutil.virtual_memory().percent

    def get_average_response_time(self):
        """Return the average response time.

        NOTE(review): this is a hard-coded placeholder (2.8); nothing is
        measured and the unit is not stated anywhere in this file.
        """
        return 2.8

    def get_events_processed_count(self):
        """Return the total number of events passed to process_event."""
        return self.events_processed
|
|
|
|
|
|
|
|
|
class SecurityRule:
    """Base class for rules that decide whether an event is suspicious.

    Subclasses override matches(); the base implementation raises
    NotImplementedError.
    """

    def __init__(self, name, config):
        """Store the rule name and read common settings from *config*.

        Args:
            name: Identifier of the rule.
            config: Mapping of rule settings. 'active' defaults to True and
                'threat_level' defaults to 50. None is treated as an empty
                configuration.
        """
        if config is None:
            config = {}
        self.name = name
        self.config = config
        self.active = config.get('active', True)
        self.threat_level = config.get('threat_level', 50)

    def is_active(self):
        """Return the rule's 'active' setting."""
        return self.active

    def matches(self, event):
        """Check if event matches this rule.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError(
            f"{type(self).__name__} must implement matches()"
        )

    def get_threat_level(self):
        """Return the threat level configured for this rule."""
        return self.threat_level
|
|
|
|
|
|
|
|
|
class NetworkTrafficSpikeRule(SecurityRule):
    """Match 'network_traffic' events whose traffic exceeds a spike limit."""

    def matches(self, event):
        """Return True when the event's traffic is above the spike limit.

        The limit is config 'baseline_mbps' (default 100) scaled by config
        'spike_threshold_percent' (default 200), i.e. 200 Mbps by default.

        Args:
            event: Mapping with 'type' and, optionally, 'traffic_mbps'
                (treated as 0 when absent).

        Returns:
            False for any event whose 'type' is not 'network_traffic';
            otherwise whether 'traffic_mbps' is strictly above the limit.
        """
        if event.get('type') != 'network_traffic':
            return False

        current_traffic = event.get('traffic_mbps', 0)
        baseline = self.config.get('baseline_mbps', 100)
        threshold = self.config.get('spike_threshold_percent', 200)

        spike_threshold = baseline * (threshold / 100)
        return current_traffic > spike_threshold
|
|
|
|
|
|
|
|
|
class ResourceUsageSpikeRule(SecurityRule):
    """Match 'resource_usage' events that exceed a per-resource threshold."""

    # Process names that never trigger this rule unless the configuration
    # supplies its own 'excluded_processes' list.
    DEFAULT_EXCLUDED_PROCESSES = ('system_update', 'windows_update', 'apt-get')

    def matches(self, event):
        """Return True when usage is over threshold for a non-excluded process.

        The threshold is read from config key '<resource_type>_threshold'
        (default 80), where <resource_type> is the event's 'resource_type'.

        Args:
            event: Mapping with 'type' and, optionally, 'resource_type',
                'usage_percent' (default 0) and 'process_name' (default '').

        Returns:
            False for any event whose 'type' is not 'resource_usage';
            otherwise whether 'usage_percent' is strictly above the threshold
            and 'process_name' is not in the exclusion list.
        """
        if event.get('type') != 'resource_usage':
            return False

        usage_type = event.get('resource_type')
        current_usage = event.get('usage_percent', 0)
        threshold = self.config.get(f'{usage_type}_threshold', 80)

        process_name = event.get('process_name', '')
        # NOTE(review): exclusion is by exact process name only, so a process
        # merely named like an updater is also skipped.
        excluded_processes = self.config.get(
            'excluded_processes', self.DEFAULT_EXCLUDED_PROCESSES
        )

        return (current_usage > threshold and
                process_name not in excluded_processes)
|
|
|
|
|
|
|
|
|
class DataExfiltrationRule(SecurityRule):
    """Match 'data_transfer' events that look like data exfiltration."""

    def matches(self, event):
        """Return True when a transfer is large, suspicious or blacklisted.

        A 'data_transfer' event matches when any of these holds:
          * 'size_mb' is strictly above config 'size_threshold_mb'
            (default 100);
          * 'file_extension' is listed in config 'suspicious_extensions'
            (default '.zip', '.rar', '.exe'), compared case-insensitively;
          * 'destination_ip' is listed in config 'blacklisted_ips'
            (default empty).

        Args:
            event: Mapping with 'type' and, optionally, 'size_mb',
                'file_extension' and 'destination_ip'.

        Returns:
            False for any event whose 'type' is not 'data_transfer' or when
            none of the conditions above holds.
        """
        if event.get('type') != 'data_transfer':
            return False

        transfer_size = event.get('size_mb', 0)
        size_threshold = self.config.get('size_threshold_mb', 100)
        if transfer_size > size_threshold:
            return True

        file_ext = event.get('file_extension', '')
        suspicious_exts = self.config.get(
            'suspicious_extensions', ['.zip', '.rar', '.exe']
        )
        # Compare case-insensitively so '.ZIP' cannot slip past a '.zip' entry.
        if isinstance(file_ext, str):
            normalized_exts = {str(ext).lower() for ext in suspicious_exts}
            if file_ext.lower() in normalized_exts:
                return True

        dest_ip = event.get('destination_ip', '')
        blacklist = self.config.get('blacklisted_ips', [])
        if dest_ip in blacklist:
            return True

        return False
|
|
|
...
|
|
|
...
|
|
|
class UnauthorizedAccessRule(SecurityRule):
    """Match 'login_attempt' events that suggest unauthorized access."""

    def matches(self, event):
        """Return True for repeated failures or logins to restricted accounts.

        A 'login_attempt' event matches when either of these holds:
          * 'failed_attempts' is strictly above config 'attempt_threshold'
            (default 5);
          * 'username' is listed in config 'restricted_accounts'
            (default 'admin', 'root').

        The username check does not look at 'failed_attempts', so every login
        attempt on a restricted account matches.

        Args:
            event: Mapping with 'type' and, optionally, 'failed_attempts'
                (default 0) and 'username' (default '').

        Returns:
            False for any event whose 'type' is not 'login_attempt' or when
            neither condition above holds.
        """
        if event.get('type') != 'login_attempt':
            return False

        failed_attempts = event.get('failed_attempts', 0)
        threshold = self.config.get('attempt_threshold', 5)
        if failed_attempts > threshold:
            return True

        username = event.get('username', '')
        restricted_accounts = self.config.get(
            'restricted_accounts', ['admin', 'root']
        )
        if username in restricted_accounts:
            return True

        return False
|
|
|
|