Spaces:
Sleeping
Sleeping
File size: 4,103 Bytes
1c85a69 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 | import requests
import time
import random
import uuid
from datetime import datetime, timezone
BASE_URL = "http://localhost:8000/api"
def register_agent():
print("π€ Registering malicious agent...")
system_info = {
'hostname': f"DESKTOP-DEMO-{random.randint(1000,9999)}",
'username': "demo_user",
'os': "Windows",
'os_version': "10.0.19045",
'ip_address': "192.168.1.105"
}
try:
response = requests.post(f"{BASE_URL}/agent/register", json=system_info)
if response.status_code == 200:
data = response.json()
print(f"β
Registered Agent ID: {data['employee_id']}")
return data['employee_id']
except Exception as e:
print(f"β Registration failed: {e}")
return None
def send_events(employee_id, count=10, type="normal"):
print(f"π‘ Sending {count} {type} events...")
events = []
for i in range(count):
if type == "normal":
cpu = random.uniform(5.0, 15.0)
ram = random.uniform(30.0, 40.0)
event_type = "file_access"
elif type == "attack":
# Simulate Crypto Miner
cpu = random.uniform(85.0, 99.0)
ram = random.uniform(70.0, 90.0)
event_type = "process_start"
event = {
"employee_id": employee_id,
"event_type": event_type,
"timestamp": datetime.now(timezone.utc).isoformat(),
"location": "Office_Network",
"ip_address": "192.168.1.105",
"port": 443,
"file_path": "C:\\Users\\demo\\AppData\\Local\\Temp\\miner.exe" if type == "attack" else "C:\\Users\\demo\\Documents\\report.docx",
"action": "execute" if type == "attack" else "read",
"success": True,
"cpu_usage": cpu,
"memory_usage": ram
}
events.append(event)
try:
# Use bulk endpoint which now triggers anomalies
response = requests.post(f"{BASE_URL}/events/bulk", json=events)
if response.status_code == 200:
print(f"β
Sent {len(events)} events.")
else:
print(f"β Failed to send events: {response.text}")
except Exception as e:
print(f"β Error sending events: {e}")
def trigger_training():
print("π§ Triggering model training...")
try:
# Check health first
health = requests.get(f"{BASE_URL}/health/system").json()
if health['ml_model']['status'] == 'active':
print("β
Model is already active.")
return True
response = requests.post(f"{BASE_URL}/ml/train")
if response.status_code == 200:
print("β
Model trained successfully!")
return True
else:
print(f"β οΈ Model training skipped/failed: {response.text}")
return False
except Exception as e:
print(f"β Training error: {e}")
return False
def main():
print("π SENTINEL AI - HACKATHON DEMO SIMULATION π")
print("===============================================")
# 1. Register
emp_id = register_agent()
if not emp_id:
return
# 2. Establish Baseline (Normal Behavior)
# We need enough data for the model to learn "normal"
print("\n[Phase 1] Establishing Baseline...")
for _ in range(3):
send_events(emp_id, count=5, type="normal")
time.sleep(1)
# 3. Train Model (if needed)
print("\n[Phase 2] Training Model...")
trigger_training()
time.sleep(2)
# 4. Launch Attack
print("\n[Phase 3] βοΈ LAUNCHING CRYPTO-MINING ATTACK βοΈ")
input("Press Enter to simulate attack...")
send_events(emp_id, count=5, type="attack")
print("\nβ
Attack simulation complete.")
print("π Check the Dashboard for 'High CPU Usage' alerts and MITRE T1496 mapping.")
if __name__ == "__main__":
main()
|