Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """Test Tier 1 snapshot generation with LLM Builder + local Docker. | |
| Usage: | |
| export AZURE_API_KEY="..." | |
| export AZURE_API_BASE="..." | |
| export AZURE_API_VERSION="2025-04-01-preview" | |
| uv run python scripts/test_tier1_llm.py | |
| """ | |
| from __future__ import annotations | |
| import asyncio | |
| import json | |
| import os | |
| import sys | |
| import time | |
| from pathlib import Path | |
| import yaml | |
| # Ensure src is importable | |
| sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src")) | |
| from open_range.builder.builder import LLMSnapshotBuilder | |
| from open_range.protocols import BuildContext | |
| from open_range.server.environment import RangeEnvironment | |
| from open_range.server.models import RangeAction | |
| def load_manifest(path: str = "manifests/tier1_basic.yaml") -> dict: | |
| """Load and return the tier1 manifest as a dict.""" | |
| manifest_path = Path(__file__).resolve().parent.parent / path | |
| with open(manifest_path) as f: | |
| return yaml.safe_load(f) | |
| async def build_snapshot(manifest: dict) -> object: | |
| """Call the LLM builder to generate a snapshot spec.""" | |
| model = os.environ.get("OPENRANGE_BUILDER_MODEL", "azure/gpt-5.2-codex") | |
| print(f"\n{'='*60}") | |
| print(f" BUILDER: Generating Tier 1 snapshot") | |
| print(f" Model: {model}") | |
| print(f" API: {os.environ.get('AZURE_API_BASE', 'not set')}") | |
| print(f"{'='*60}\n") | |
| # Codex models don't support temperature | |
| temp = None if "codex" in model.lower() else 0.7 | |
| builder = LLMSnapshotBuilder( | |
| model=model, | |
| temperature=temp, | |
| max_retries=2, | |
| max_tokens=32768, | |
| ) | |
| context = BuildContext( | |
| seed=42, | |
| tier=1, | |
| previous_vuln_classes=[], | |
| solve_rates={}, | |
| weak_areas=[], | |
| ) | |
| t0 = time.time() | |
| snapshot = await builder.build(manifest, context) | |
| elapsed = time.time() - t0 | |
| print(f"Snapshot generated in {elapsed:.1f}s") | |
| print(f" Topology hosts: {snapshot.topology.get('hosts', [])}") | |
| print(f" Vulns: {len(snapshot.truth_graph.vulns)}") | |
| for v in snapshot.truth_graph.vulns: | |
| print(f" - {v.id}: {v.type} on {v.host} ({v.service})") | |
| print(f" Flags: {len(snapshot.flags)}") | |
| for f in snapshot.flags: | |
| print(f" - {f.id}: {f.value[:30]}... @ {f.host}:{f.path}") | |
| print(f" Golden path: {len(snapshot.golden_path)} steps") | |
| for gp in snapshot.golden_path: | |
| print(f" Step {gp.step}: {gp.command[:60]}") | |
| print(f" Files: {len(snapshot.files)} entries") | |
| for key in sorted(snapshot.files.keys()): | |
| size = len(snapshot.files[key]) | |
| print(f" - {key} ({size} chars)") | |
| print(f" NPC personas: {len(snapshot.npc_personas)}") | |
| print(f" Task red: {snapshot.task.red_briefing[:80]}...") | |
| print(f" Task blue: {snapshot.task.blue_briefing[:80]}...") | |
| return snapshot | |
| def run_episode(snapshot, docker_mode: bool = False) -> dict: | |
| """Run a scripted episode against the generated snapshot.""" | |
| print(f"\n{'='*60}") | |
| print(f" EPISODE: Running against generated snapshot") | |
| print(f" Docker: {'yes' if docker_mode else 'mock mode'}") | |
| print(f"{'='*60}\n") | |
| env = RangeEnvironment( | |
| docker_available=docker_mode, | |
| max_steps=50, | |
| ) | |
| # Reset with the LLM-generated snapshot | |
| obs = env.reset(snapshot=snapshot, episode_id="llm-tier1-test") | |
| print(f"[RESET] {obs.stdout[:200]}") | |
| print() | |
| # Use the golden path as a scripted Red agent | |
| golden_path = snapshot.golden_path | |
| if not golden_path: | |
| print("No golden path steps — cannot run scripted episode") | |
| return {"outcome": "no_golden_path", "steps": 0} | |
| step = 0 | |
| for gp in golden_path: | |
| step += 1 | |
| action = RangeAction(command=gp.command, mode="red") | |
| result = env.step(action) | |
| reward = result.reward if result.reward is not None else 0.0 | |
| status = "" | |
| if result.flags_captured: | |
| status = f" FLAGS={result.flags_captured}" | |
| if result.done: | |
| status += " [DONE]" | |
| print(f" [{step:2d}] RED >> {gp.command[:60]}") | |
| if docker_mode: | |
| # Show actual output in docker mode | |
| stdout_preview = result.stdout[:120].replace('\n', ' ') | |
| print(f" stdout: {stdout_preview}") | |
| else: | |
| print(f" expect: {gp.expect_in_stdout[:60]}") | |
| print(f" reward={reward:.4f}{status}") | |
| if result.done: | |
| break | |
| # Final state | |
| state = env.state | |
| print(f"\n{'='*60}") | |
| print(f" RESULT") | |
| print(f"{'='*60}") | |
| print(f" Steps: {state.step_count}") | |
| print(f" Flags found: {state.flags_found}") | |
| print(f" Tier: {state.tier}") | |
| print(f" Episode: {state.episode_id}") | |
| print(f"{'='*60}\n") | |
| return { | |
| "outcome": "flag_captured" if state.flags_found else "no_flag", | |
| "steps": state.step_count, | |
| "flags_found": list(state.flags_found), | |
| } | |
| def save_snapshot(snapshot, path: str = "snapshots/llm_tier1_test.json"): | |
| """Save the generated snapshot to disk for reuse.""" | |
| out = Path(__file__).resolve().parent.parent / path | |
| out.parent.mkdir(parents=True, exist_ok=True) | |
| data = { | |
| "topology": snapshot.topology, | |
| "truth_graph": { | |
| "vulns": [ | |
| { | |
| "id": v.id, | |
| "type": v.type, | |
| "host": v.host, | |
| "service": v.service, | |
| "injection_point": v.injection_point, | |
| "vulnerable_code": v.vulnerable_code, | |
| "root_cause": v.root_cause, | |
| "blast_radius": v.blast_radius, | |
| "remediation": v.remediation, | |
| } | |
| for v in snapshot.truth_graph.vulns | |
| ], | |
| "exploit_chain": [ | |
| {"vuln_id": ec.vuln_id, "command": ec.command, "description": ec.description} | |
| for ec in snapshot.truth_graph.exploit_chain | |
| ], | |
| }, | |
| "flags": [ | |
| {"id": f.id, "value": f.value, "path": f.path, "host": f.host} | |
| for f in snapshot.flags | |
| ], | |
| "golden_path": [ | |
| { | |
| "step": gp.step, | |
| "cmd": gp.command, | |
| "expect_stdout": gp.expect_in_stdout, | |
| "description": gp.description, | |
| } | |
| for gp in snapshot.golden_path | |
| ], | |
| "task": { | |
| "red_briefing": snapshot.task.red_briefing, | |
| "blue_briefing": snapshot.task.blue_briefing, | |
| }, | |
| "npc_personas": [ | |
| { | |
| "name": p.name, | |
| "role": p.role, | |
| "department": p.department, | |
| "security_awareness": p.security_awareness, | |
| } | |
| for p in snapshot.npc_personas | |
| ], | |
| "files": snapshot.files, | |
| } | |
| with open(out, "w") as f: | |
| json.dump(data, f, indent=2) | |
| print(f"Snapshot saved to {out}") | |
| async def main(): | |
| # Verify Azure creds are set | |
| required = ["AZURE_API_KEY", "AZURE_API_BASE"] | |
| missing = [k for k in required if not os.environ.get(k)] | |
| if missing: | |
| print(f"ERROR: Missing env vars: {missing}") | |
| print("Set AZURE_API_KEY and AZURE_API_BASE before running.") | |
| sys.exit(1) | |
| # Default to azure/gpt-5.2-codex if not overridden | |
| if not os.environ.get("OPENRANGE_BUILDER_MODEL"): | |
| os.environ["OPENRANGE_BUILDER_MODEL"] = "azure/gpt-5.2-codex" | |
| # Load manifest | |
| manifest = load_manifest() | |
| print(f"Loaded manifest: {manifest['name']} (tier {manifest['tier']})") | |
| print(f" Bug families: {len(manifest['bug_families'])}") | |
| print(f" Hosts: {[h['name'] for h in manifest['topology']['hosts']]}") | |
| # Build snapshot via LLM | |
| snapshot = await build_snapshot(manifest) | |
| # Save snapshot for reuse | |
| save_snapshot(snapshot) | |
| # Check if Docker compose stack is running | |
| docker_mode = False | |
| try: | |
| import docker | |
| client = docker.from_env() | |
| containers = client.containers.list() | |
| range_containers = [c for c in containers if "openrange" in c.name.lower() or "open-range" in c.name.lower()] | |
| if range_containers: | |
| print(f"\nFound {len(range_containers)} running range containers:") | |
| for c in range_containers: | |
| print(f" - {c.name} ({c.status})") | |
| docker_mode = True | |
| else: | |
| print("\nNo range containers running — using mock mode") | |
| print("To run with Docker: docker compose up -d") | |
| client.close() | |
| except Exception: | |
| print("\nDocker SDK unavailable — using mock mode") | |
| # Run episode | |
| result = run_episode(snapshot, docker_mode=docker_mode) | |
| print(f"Final result: {json.dumps(result, indent=2)}") | |
| if __name__ == "__main__": | |
| asyncio.run(main()) | |