#!/usr/bin/env python3
"""Test Tier 1 snapshot generation with LLM Builder + local Docker.
Usage:
export AZURE_API_KEY="..."
export AZURE_API_BASE="..."
export AZURE_API_VERSION="2025-04-01-preview"
uv run python scripts/test_tier1_llm.py
"""
from __future__ import annotations
import asyncio
import json
import os
import sys
import time
from pathlib import Path
import yaml
# Ensure src is importable
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))
from open_range.builder.builder import LLMSnapshotBuilder
from open_range.protocols import BuildContext
from open_range.server.environment import RangeEnvironment
from open_range.server.models import RangeAction
def load_manifest(path: str = "manifests/tier1_basic.yaml") -> dict:
    """Load and return the tier1 manifest as a dict.

    Args:
        path: Manifest location, joined onto the parent of this script's
            directory.

    Returns:
        The mapping parsed from the YAML document.

    Raises:
        OSError: If the manifest file cannot be opened.
        ValueError: If the document does not parse to a mapping (for
            example an empty file, which ``yaml.safe_load`` reads as None).
    """
    manifest_path = Path(__file__).resolve().parent.parent / path
    # Decode explicitly as UTF-8 instead of relying on the platform locale.
    with open(manifest_path, encoding="utf-8") as f:
        manifest = yaml.safe_load(f)
    # Fail here with a clear message rather than later on a subscript error.
    if not isinstance(manifest, dict):
        raise ValueError(
            f"Manifest {manifest_path} must contain a YAML mapping, "
            f"got {type(manifest).__name__}"
        )
    return manifest
async def build_snapshot(manifest: dict) -> object:
    """Call the LLM builder to generate a snapshot spec.

    The model name is read from ``OPENRANGE_BUILDER_MODEL`` (falling back to
    ``azure/gpt-5.2-codex``). A summary of the generated snapshot is printed
    to stdout.

    Args:
        manifest: Parsed manifest passed straight to ``builder.build``.

    Returns:
        Whatever ``LLMSnapshotBuilder.build`` returns for the manifest and a
        fixed build context (seed 42, tier 1, empty history).
    """
    model = os.environ.get("OPENRANGE_BUILDER_MODEL", "azure/gpt-5.2-codex")
    rule = "=" * 60

    print(f"\n{rule}")
    print(" BUILDER: Generating Tier 1 snapshot")
    print(f" Model: {model}")
    print(f" API: {os.environ.get('AZURE_API_BASE', 'not set')}")
    print(f"{rule}\n")

    # Codex models don't support temperature
    temp = None if "codex" in model.lower() else 0.7
    builder = LLMSnapshotBuilder(
        model=model,
        temperature=temp,
        max_retries=2,
        max_tokens=32768,
    )
    context = BuildContext(
        seed=42,
        tier=1,
        previous_vuln_classes=[],
        solve_rates={},
        weak_areas=[],
    )

    # perf_counter is monotonic, so the reported duration cannot be skewed
    # by wall-clock adjustments while the (potentially slow) build runs.
    t0 = time.perf_counter()
    snapshot = await builder.build(manifest, context)
    elapsed = time.perf_counter() - t0

    print(f"Snapshot generated in {elapsed:.1f}s")
    print(f" Topology hosts: {snapshot.topology.get('hosts', [])}")

    print(f" Vulns: {len(snapshot.truth_graph.vulns)}")
    for v in snapshot.truth_graph.vulns:
        print(f" - {v.id}: {v.type} on {v.host} ({v.service})")

    print(f" Flags: {len(snapshot.flags)}")
    for flag in snapshot.flags:
        # Only the first 30 characters of the flag value are shown.
        print(f" - {flag.id}: {flag.value[:30]}... @ {flag.host}:{flag.path}")

    print(f" Golden path: {len(snapshot.golden_path)} steps")
    for gp in snapshot.golden_path:
        print(f" Step {gp.step}: {gp.command[:60]}")

    print(f" Files: {len(snapshot.files)} entries")
    for key in sorted(snapshot.files):
        size = len(snapshot.files[key])
        print(f" - {key} ({size} chars)")

    print(f" NPC personas: {len(snapshot.npc_personas)}")
    print(f" Task red: {snapshot.task.red_briefing[:80]}...")
    print(f" Task blue: {snapshot.task.blue_briefing[:80]}...")
    return snapshot
def run_episode(snapshot, docker_mode: bool = False) -> dict:
    """Run a scripted episode against the generated snapshot.

    The snapshot's golden path is replayed step by step as Red-mode actions
    until the path is exhausted or the environment reports ``done``.

    Args:
        snapshot: Snapshot to reset the environment with; its
            ``golden_path`` supplies the commands.
        docker_mode: Passed to ``RangeEnvironment`` as ``docker_available``.
            When true, real stdout is previewed per step; otherwise the
            step's expected stdout is shown.

    Returns:
        A dict with ``outcome``, ``steps`` and ``flags_found`` keys. The
        same keys are present whether or not a golden path exists.
    """
    rule = "=" * 60
    print(f"\n{rule}")
    print(" EPISODE: Running against generated snapshot")
    print(f" Docker: {'yes' if docker_mode else 'mock mode'}")
    print(f"{rule}\n")

    env = RangeEnvironment(
        docker_available=docker_mode,
        max_steps=50,
    )

    # Reset with the LLM-generated snapshot
    obs = env.reset(snapshot=snapshot, episode_id="llm-tier1-test")
    print(f"[RESET] {obs.stdout[:200]}")
    print()

    # Use the golden path as a scripted Red agent
    golden_path = snapshot.golden_path
    if not golden_path:
        print("No golden path steps — cannot run scripted episode")
        # Keep the result shape identical to the normal return below.
        return {"outcome": "no_golden_path", "steps": 0, "flags_found": []}

    for step, gp in enumerate(golden_path, start=1):
        action = RangeAction(command=gp.command, mode="red")
        result = env.step(action)
        reward = result.reward if result.reward is not None else 0.0

        status = ""
        if result.flags_captured:
            status = f" FLAGS={result.flags_captured}"
        if result.done:
            status += " [DONE]"

        print(f" [{step:2d}] RED >> {gp.command[:60]}")
        if docker_mode:
            # Show actual output in docker mode
            stdout_preview = result.stdout[:120].replace("\n", " ")
            print(f" stdout: {stdout_preview}")
        else:
            print(f" expect: {gp.expect_in_stdout[:60]}")
        print(f" reward={reward:.4f}{status}")

        if result.done:
            break

    # Final state
    state = env.state
    print(f"\n{rule}")
    print(" RESULT")
    print(rule)
    print(f" Steps: {state.step_count}")
    print(f" Flags found: {state.flags_found}")
    print(f" Tier: {state.tier}")
    print(f" Episode: {state.episode_id}")
    print(f"{rule}\n")

    return {
        "outcome": "flag_captured" if state.flags_found else "no_flag",
        "steps": state.step_count,
        "flags_found": list(state.flags_found),
    }
def save_snapshot(snapshot, path: str = "snapshots/llm_tier1_test.json"):
    """Write the generated snapshot to disk as indented JSON for reuse.

    The target is ``path`` joined onto the parent of this script's
    directory; missing parent directories are created. The saved location
    is printed once the file has been written.
    """
    destination = Path(__file__).resolve().parent.parent / path
    destination.parent.mkdir(parents=True, exist_ok=True)

    # Assemble the payload section by section, in the key order of the
    # serialized document.
    payload = {"topology": snapshot.topology}

    vuln_records = []
    for vuln in snapshot.truth_graph.vulns:
        vuln_records.append(
            {
                "id": vuln.id,
                "type": vuln.type,
                "host": vuln.host,
                "service": vuln.service,
                "injection_point": vuln.injection_point,
                "vulnerable_code": vuln.vulnerable_code,
                "root_cause": vuln.root_cause,
                "blast_radius": vuln.blast_radius,
                "remediation": vuln.remediation,
            }
        )

    chain_records = []
    for link in snapshot.truth_graph.exploit_chain:
        chain_records.append(
            {
                "vuln_id": link.vuln_id,
                "command": link.command,
                "description": link.description,
            }
        )

    payload["truth_graph"] = {
        "vulns": vuln_records,
        "exploit_chain": chain_records,
    }

    payload["flags"] = [
        {"id": flag.id, "value": flag.value, "path": flag.path, "host": flag.host}
        for flag in snapshot.flags
    ]

    # The on-disk keys for golden-path steps differ from the attribute names.
    payload["golden_path"] = [
        {
            "step": entry.step,
            "cmd": entry.command,
            "expect_stdout": entry.expect_in_stdout,
            "description": entry.description,
        }
        for entry in snapshot.golden_path
    ]

    task = snapshot.task
    payload["task"] = {
        "red_briefing": task.red_briefing,
        "blue_briefing": task.blue_briefing,
    }

    payload["npc_personas"] = [
        {
            "name": persona.name,
            "role": persona.role,
            "department": persona.department,
            "security_awareness": persona.security_awareness,
        }
        for persona in snapshot.npc_personas
    ]

    payload["files"] = snapshot.files

    with open(destination, "w") as handle:
        json.dump(payload, handle, indent=2)
    print(f"Snapshot saved to {destination}")
def _range_containers_running() -> bool:
    """Best-effort check for running OpenRange containers.

    Returns True when at least one running container has "openrange" or
    "open-range" in its name. Any failure (SDK not installed, Docker not
    reachable) is reported on stdout and treated as "not running".
    """
    try:
        import docker
    except ImportError:
        print("\nDocker SDK unavailable — using mock mode")
        return False

    try:
        client = docker.from_env()
        try:
            # Capture name/status while the client is still open.
            found = [
                (c.name, c.status)
                for c in client.containers.list()
                if "openrange" in c.name.lower() or "open-range" in c.name.lower()
            ]
        finally:
            # Always release the client, even if listing containers fails.
            client.close()
    except Exception as exc:
        # Deliberate catch-all: this probe must never abort the test run.
        print(f"\nDocker unavailable ({exc}) — using mock mode")
        return False

    if not found:
        print("\nNo range containers running — using mock mode")
        print("To run with Docker: docker compose up -d")
        return False

    print(f"\nFound {len(found)} running range containers:")
    for name, status in found:
        print(f" - {name} ({status})")
    return True


async def main():
    """Build a Tier 1 snapshot with the LLM builder and replay it.

    Exits with status 1 when the Azure credentials are missing. Otherwise it
    loads the manifest, builds and saves a snapshot, checks for running range
    containers, and runs a scripted episode (mock mode when none are found).
    """
    # Verify Azure creds are set
    required = ["AZURE_API_KEY", "AZURE_API_BASE"]
    missing = [k for k in required if not os.environ.get(k)]
    if missing:
        print(f"ERROR: Missing env vars: {missing}")
        print("Set AZURE_API_KEY and AZURE_API_BASE before running.")
        sys.exit(1)

    # Default to azure/gpt-5.2-codex if not overridden (an empty value counts
    # as "not overridden", hence no setdefault here).
    if not os.environ.get("OPENRANGE_BUILDER_MODEL"):
        os.environ["OPENRANGE_BUILDER_MODEL"] = "azure/gpt-5.2-codex"

    # Load manifest
    manifest = load_manifest()
    print(f"Loaded manifest: {manifest['name']} (tier {manifest['tier']})")
    print(f" Bug families: {len(manifest['bug_families'])}")
    print(f" Hosts: {[h['name'] for h in manifest['topology']['hosts']]}")

    # Build snapshot via LLM
    snapshot = await build_snapshot(manifest)

    # Save snapshot for reuse
    save_snapshot(snapshot)

    # Check if Docker compose stack is running
    docker_mode = _range_containers_running()

    # Run episode
    result = run_episode(snapshot, docker_mode=docker_mode)
    print(f"Final result: {json.dumps(result, indent=2)}")
# Script entry point: only when executed directly (not on import), drive the
# async main() coroutine to completion with asyncio.run.
if __name__ == "__main__":
    asyncio.run(main())
|