open-range / scripts /test_tier1_llm.py
Aaron Brown
Production all-in-one container with real service execution
49d1c75
#!/usr/bin/env python3
"""Test Tier 1 snapshot generation with LLM Builder + local Docker.

Usage:
    export AZURE_API_KEY="..."
    export AZURE_API_BASE="..."
    export AZURE_API_VERSION="2025-04-01-preview"
    uv run python scripts/test_tier1_llm.py

Set OPENRANGE_BUILDER_MODEL to override the default builder model
(azure/gpt-5.2-codex).
"""
from __future__ import annotations
import asyncio
import json
import os
import sys
import time
from pathlib import Path
import yaml
# Ensure src is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from open_range.builder.builder import LLMSnapshotBuilder
from open_range.protocols import BuildContext
from open_range.server.environment import RangeEnvironment
from open_range.server.models import RangeAction
def load_manifest(path: str = "manifests/tier1_basic.yaml") -> dict:
    """Load the Tier 1 manifest YAML and return its parsed contents.

    Args:
        path: Manifest location, resolved against the directory one level
            above this script's directory. An absolute path is used as-is.

    Returns:
        Whatever ``yaml.safe_load`` parses from the file (expected to be a
        mapping).

    Raises:
        OSError: If the manifest file cannot be opened.
    """
    manifest_path = Path(__file__).resolve().parent.parent / path
    # Pin the encoding: a bare open() falls back to the locale's preferred
    # encoding, which is not UTF-8 on every platform.
    with open(manifest_path, encoding="utf-8") as f:
        return yaml.safe_load(f)
async def build_snapshot(manifest: dict) -> object:
    """Generate a snapshot from *manifest* with the LLM builder and print a summary.

    The model name is read from ``OPENRANGE_BUILDER_MODEL`` and defaults to
    ``azure/gpt-5.2-codex``. The build context is fixed (seed 42, tier 1,
    no history).

    Args:
        manifest: Parsed manifest passed through to ``LLMSnapshotBuilder.build``.

    Returns:
        The snapshot object returned by the builder.
    """
    model = os.environ.get("OPENRANGE_BUILDER_MODEL", "azure/gpt-5.2-codex")
    banner = "=" * 60
    print(f"\n{banner}")
    print(" BUILDER: Generating Tier 1 snapshot")
    print(f" Model: {model}")
    print(f" API: {os.environ.get('AZURE_API_BASE', 'not set')}")
    print(f"{banner}\n")

    # Codex models don't support temperature
    temp = None if "codex" in model.lower() else 0.7
    builder = LLMSnapshotBuilder(
        model=model,
        temperature=temp,
        max_retries=2,
        max_tokens=32768,
    )
    context = BuildContext(
        seed=42,
        tier=1,
        previous_vuln_classes=[],
        solve_rates={},
        weak_areas=[],
    )

    # perf_counter() is monotonic, so the measured duration cannot go
    # negative or jump if the system clock is adjusted mid-build.
    t0 = time.perf_counter()
    snapshot = await builder.build(manifest, context)
    elapsed = time.perf_counter() - t0

    print(f"Snapshot generated in {elapsed:.1f}s")
    print(f" Topology hosts: {snapshot.topology.get('hosts', [])}")
    print(f" Vulns: {len(snapshot.truth_graph.vulns)}")
    for vuln in snapshot.truth_graph.vulns:
        print(f" - {vuln.id}: {vuln.type} on {vuln.host} ({vuln.service})")
    print(f" Flags: {len(snapshot.flags)}")
    for flag in snapshot.flags:
        # Only the first 30 characters of the flag value are shown.
        print(f" - {flag.id}: {flag.value[:30]}... @ {flag.host}:{flag.path}")
    print(f" Golden path: {len(snapshot.golden_path)} steps")
    for gp in snapshot.golden_path:
        print(f" Step {gp.step}: {gp.command[:60]}")
    print(f" Files: {len(snapshot.files)} entries")
    for key in sorted(snapshot.files):
        size = len(snapshot.files[key])
        print(f" - {key} ({size} chars)")
    print(f" NPC personas: {len(snapshot.npc_personas)}")
    print(f" Task red: {snapshot.task.red_briefing[:80]}...")
    print(f" Task blue: {snapshot.task.blue_briefing[:80]}...")
    return snapshot
def run_episode(snapshot, docker_mode: bool = False) -> dict:
    """Replay the snapshot's golden path as a scripted Red agent.

    A ``RangeEnvironment`` (50-step cap) is reset with *snapshot*, then each
    golden-path command is submitted in order as a ``red`` action. Replay
    stops early as soon as a step reports ``done``.

    Args:
        snapshot: Snapshot providing ``golden_path`` and accepted by
            ``RangeEnvironment.reset``.
        docker_mode: Passed to the environment as ``docker_available``; also
            selects whether real stdout or the expected stdout is printed.

    Returns:
        ``{"outcome", "steps", "flags_found"}`` taken from the final
        environment state, or ``{"outcome": "no_golden_path", "steps": 0}``
        when the snapshot has no golden-path steps.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(" EPISODE: Running against generated snapshot")
    print(f" Docker: {'yes' if docker_mode else 'mock mode'}")
    print(f"{rule}\n")

    env = RangeEnvironment(docker_available=docker_mode, max_steps=50)

    # Start the episode from the LLM-generated snapshot.
    obs = env.reset(snapshot=snapshot, episode_id="llm-tier1-test")
    print(f"[RESET] {obs.stdout[:200]}")
    print()

    # The golden path doubles as the scripted Red agent.
    scripted_steps = snapshot.golden_path
    if not scripted_steps:
        print("No golden path steps — cannot run scripted episode")
        return {"outcome": "no_golden_path", "steps": 0}

    for step, gp in enumerate(scripted_steps, start=1):
        result = env.step(RangeAction(command=gp.command, mode="red"))
        reward = 0.0 if result.reward is None else result.reward

        markers = []
        if result.flags_captured:
            markers.append(f" FLAGS={result.flags_captured}")
        if result.done:
            markers.append(" [DONE]")
        status = "".join(markers)

        print(f" [{step:2d}] RED >> {gp.command[:60]}")
        if docker_mode:
            # Real execution: show a single-line preview of the output.
            stdout_preview = result.stdout[:120].replace('\n', ' ')
            print(f" stdout: {stdout_preview}")
        else:
            print(f" expect: {gp.expect_in_stdout[:60]}")
        print(f" reward={reward:.4f}{status}")

        if result.done:
            break

    # Summarize the final environment state.
    state = env.state
    print(f"\n{rule}")
    print(" RESULT")
    print(f"{rule}")
    print(f" Steps: {state.step_count}")
    print(f" Flags found: {state.flags_found}")
    print(f" Tier: {state.tier}")
    print(f" Episode: {state.episode_id}")
    print(f"{rule}\n")

    outcome = "flag_captured" if state.flags_found else "no_flag"
    return {
        "outcome": outcome,
        "steps": state.step_count,
        "flags_found": list(state.flags_found),
    }
def save_snapshot(snapshot, path: str = "snapshots/llm_tier1_test.json") -> None:
    """Serialize *snapshot* to a JSON file so it can be reused later.

    Args:
        snapshot: Object exposing ``topology``, ``truth_graph`` (with
            ``vulns`` and ``exploit_chain``), ``flags``, ``golden_path``,
            ``task``, ``npc_personas`` and ``files``.
        path: Output location, resolved against the directory one level
            above this script's directory. An absolute path is used as-is.
            Missing parent directories are created.

    Raises:
        OSError: If the directory or file cannot be created.
        TypeError: If a snapshot field is not JSON-serializable.
    """
    out = Path(__file__).resolve().parent.parent / path
    out.parent.mkdir(parents=True, exist_ok=True)

    data = {
        "topology": snapshot.topology,
        "truth_graph": {
            "vulns": [
                {
                    "id": v.id,
                    "type": v.type,
                    "host": v.host,
                    "service": v.service,
                    "injection_point": v.injection_point,
                    "vulnerable_code": v.vulnerable_code,
                    "root_cause": v.root_cause,
                    "blast_radius": v.blast_radius,
                    "remediation": v.remediation,
                }
                for v in snapshot.truth_graph.vulns
            ],
            "exploit_chain": [
                {
                    "vuln_id": ec.vuln_id,
                    "command": ec.command,
                    "description": ec.description,
                }
                for ec in snapshot.truth_graph.exploit_chain
            ],
        },
        "flags": [
            {
                "id": flag.id,
                "value": flag.value,
                "path": flag.path,
                "host": flag.host,
            }
            for flag in snapshot.flags
        ],
        # NOTE(review): these entries are written under "cmd" and
        # "expect_stdout" rather than the attribute names; presumably that is
        # the on-disk snapshot schema — confirm against the loader.
        "golden_path": [
            {
                "step": gp.step,
                "cmd": gp.command,
                "expect_stdout": gp.expect_in_stdout,
                "description": gp.description,
            }
            for gp in snapshot.golden_path
        ],
        "task": {
            "red_briefing": snapshot.task.red_briefing,
            "blue_briefing": snapshot.task.blue_briefing,
        },
        "npc_personas": [
            {
                "name": p.name,
                "role": p.role,
                "department": p.department,
                "security_awareness": p.security_awareness,
            }
            for p in snapshot.npc_personas
        ],
        "files": snapshot.files,
    }

    # Explicit encoding keeps the written file independent of the locale.
    with open(out, "w", encoding="utf-8") as fh:
        json.dump(data, fh, indent=2)
    print(f"Snapshot saved to {out}")
def _range_containers_running() -> bool:
    """Report whether any running container looks like part of the range stack.

    A container matches when its lower-cased name contains ``openrange`` or
    ``open-range``. The probe is best-effort: if the Docker SDK is missing or
    the daemon cannot be queried, the reason is printed and ``False`` is
    returned so the caller can fall back to mock mode.
    """
    try:
        import docker
    except ImportError:
        print("\nDocker SDK unavailable — using mock mode")
        return False

    client = None
    try:
        client = docker.from_env()
        range_containers = [
            c
            for c in client.containers.list()
            if any(tag in c.name.lower() for tag in ("openrange", "open-range"))
        ]
    except Exception as exc:
        # Deliberately broad: any failure talking to Docker means mock mode,
        # but the cause is reported instead of being hidden.
        print(f"\nDocker not reachable ({exc}) — using mock mode")
        return False
    finally:
        # Release the client even when listing containers fails.
        if client is not None:
            client.close()

    if not range_containers:
        print("\nNo range containers running — using mock mode")
        print("To run with Docker: docker compose up -d")
        return False

    print(f"\nFound {len(range_containers)} running range containers:")
    for c in range_containers:
        print(f" - {c.name} ({c.status})")
    return True


async def main():
    """Run the end-to-end check: load manifest, build, save, replay.

    Exits with status 1 when ``AZURE_API_KEY`` or ``AZURE_API_BASE`` is unset
    or empty. The episode runs in Docker mode only when matching range
    containers are found; otherwise it runs in mock mode.
    """
    # Verify Azure creds are set
    required = ["AZURE_API_KEY", "AZURE_API_BASE"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        print(f"ERROR: Missing env vars: {missing}")
        print("Set AZURE_API_KEY and AZURE_API_BASE before running.")
        sys.exit(1)

    # Default to azure/gpt-5.2-codex if not overridden. An empty value is
    # treated as unset, which setdefault() would not do.
    if not os.environ.get("OPENRANGE_BUILDER_MODEL"):
        os.environ["OPENRANGE_BUILDER_MODEL"] = "azure/gpt-5.2-codex"

    # Load manifest
    manifest = load_manifest()
    print(f"Loaded manifest: {manifest['name']} (tier {manifest['tier']})")
    print(f" Bug families: {len(manifest['bug_families'])}")
    print(f" Hosts: {[h['name'] for h in manifest['topology']['hosts']]}")

    # Build snapshot via LLM
    snapshot = await build_snapshot(manifest)

    # Save snapshot for reuse
    save_snapshot(snapshot)

    # Check if Docker compose stack is running
    docker_mode = _range_containers_running()

    # Run episode
    result = run_episode(snapshot, docker_mode=docker_mode)
    print(f"Final result: {json.dumps(result, indent=2)}")
if __name__ == "__main__":
    # Script entry point: run the async workflow to completion.
    asyncio.run(main())