"""Integration tests for the full renderer pipeline.
Loads real LLM output from snapshots/llm_tier1_test.json, parses it
through _parse_llm_response(), renders through SnapshotRenderer.render(),
and verifies all output files contain expected content.
"""
from __future__ import annotations
import json
import tempfile
from pathlib import Path
import pytest
from open_range.builder.builder import _parse_llm_response
from open_range.builder.renderer import SnapshotRenderer
# Directory two levels above this test module.
ROOT = Path(__file__).parent.parent
# Recorded LLM response that every fixture in this module starts from.
SNAPSHOT_PATH = ROOT.joinpath("snapshots", "llm_tier1_test.json")
@pytest.fixture
def llm_output() -> dict:
    """Load the recorded LLM output JSON used as input for these tests.

    Returns:
        The decoded JSON document stored at ``SNAPSHOT_PATH``.

    The requesting test is skipped (not failed) when the snapshot file is
    not present on disk.
    """
    if not SNAPSHOT_PATH.exists():
        pytest.skip("llm_tier1_test.json fixture not present")
    # Decode explicitly as UTF-8: without an encoding argument read_text()
    # uses the platform's locale encoding, which can mis-decode non-ASCII
    # characters in the JSON snapshot.
    return json.loads(SNAPSHOT_PATH.read_text(encoding="utf-8"))
@pytest.fixture
def parsed_spec(llm_output):
    """Return the result of ``_parse_llm_response`` for the loaded snapshot.

    The already-decoded snapshot is serialised back to a JSON string first,
    since the parser is handed a string here.
    """
    raw_response = json.dumps(llm_output)
    spec = _parse_llm_response(raw_response)
    return spec
@pytest.fixture
def rendered_dir(parsed_spec):
    """Render ``parsed_spec`` into a temporary directory and yield its path.

    The yield happens inside the ``TemporaryDirectory`` context, so the
    rendered files exist for the duration of the requesting test and are
    removed afterwards.
    """
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "integration_out")
        SnapshotRenderer().render(parsed_spec, target)
        yield target
# ---------------------------------------------------------------------------
# Pipeline: parse -> render round-trip
# ---------------------------------------------------------------------------
class TestParseLLMOutput:
    """Checks on the spec that _parse_llm_response builds from real LLM output."""

    def test_parse_produces_snapshot_spec(self, parsed_spec):
        """The parser returns a SnapshotSpec instance."""
        from open_range.protocols import SnapshotSpec

        assert isinstance(parsed_spec, SnapshotSpec)

    def test_parse_has_topology(self, parsed_spec):
        """The topology carries a 'hosts' entry with exactly eight hosts."""
        topology = parsed_spec.topology
        assert "hosts" in topology
        assert len(topology["hosts"]) == 8

    def test_parse_has_vulns(self, parsed_spec):
        """At least one vulnerability is present and one of them is 'sqli'."""
        vulns = parsed_spec.truth_graph.vulns
        assert len(vulns) >= 1
        seen_types = {vuln.type for vuln in vulns}
        assert "sqli" in seen_types

    def test_parse_has_flags(self, parsed_spec):
        """The spec defines at least two flags."""
        flags = parsed_spec.flags
        assert len(flags) >= 2

    def test_parse_has_golden_path(self, parsed_spec):
        """The golden path is non-empty and every step carries a command."""
        steps = parsed_spec.golden_path
        assert len(steps) >= 1
        for step in steps:
            assert step.command, f"Step {step.step} has empty command"

    def test_parse_has_task_briefings(self, parsed_spec):
        """Both the red and the blue briefing are populated."""
        task = parsed_spec.task
        assert task.red_briefing
        assert task.blue_briefing

    def test_parse_has_files(self, parsed_spec):
        """The files mapping is non-empty and has at least one 'web:' key."""
        files = parsed_spec.files
        assert len(files) > 0
        web_keys = [name for name in files if name.startswith("web:")]
        assert len(web_keys) > 0

    def test_parse_has_npc_personas(self, parsed_spec):
        """At least one NPC persona is defined."""
        personas = parsed_spec.npc_personas
        assert len(personas) >= 1

    def test_golden_path_uses_command_field(self, parsed_spec):
        """LLM output uses 'cmd', parser should map to 'command'."""
        for step in parsed_spec.golden_path:
            # Expected to be filled from the snapshot's 'cmd' key.
            assert step.command

    def test_golden_path_uses_expect_in_stdout(self, parsed_spec):
        """LLM output uses 'expect_stdout', parser maps to 'expect_in_stdout'."""
        for step in parsed_spec.golden_path:
            assert step.expect_in_stdout
# ---------------------------------------------------------------------------
# All output files exist
# ---------------------------------------------------------------------------
class TestRenderedFilesExist:
    """Every expected template output is written and has content."""

    # File names looked up directly under the render output directory.
    EXPECTED_FILES = [
        "docker-compose.yml",
        "Dockerfile.web",
        "Dockerfile.db",
        "nginx.conf",
        "init.sql",
        "iptables.rules",
    ]

    def test_all_output_files_exist(self, rendered_dir):
        """Each expected file exists in the output directory."""
        for fname in self.EXPECTED_FILES:
            assert (rendered_dir / fname).exists(), f"Missing output file: {fname}"

    def test_all_output_files_non_empty(self, rendered_dir):
        """Each expected file contains at least one character."""
        for fname in self.EXPECTED_FILES:
            text = rendered_dir.joinpath(fname).read_text()
            assert len(text) > 0, f"Empty output file: {fname}"
# ---------------------------------------------------------------------------
# nginx.conf content verification
# ---------------------------------------------------------------------------
class TestNginxConf:
    """Substring checks on the rendered nginx.conf."""

    @staticmethod
    def _read(rendered_dir):
        """Return the text of nginx.conf from the render output directory."""
        return (rendered_dir / "nginx.conf").read_text()

    def test_references_php_fpm_socket(self, rendered_dir):
        """The config mentions the PHP 8.1 FPM socket file name."""
        assert "php8.1-fpm.sock" in self._read(rendered_dir)

    def test_has_server_block(self, rendered_dir):
        """The config has a server block that listens on port 80."""
        conf = self._read(rendered_dir)
        for needle in ("server {", "listen 80"):
            assert needle in conf

    def test_has_php_location(self, rendered_dir):
        """The config has a regex location matching .php requests."""
        assert "location ~ \\.php$" in self._read(rendered_dir)

    def test_has_fastcgi_pass(self, rendered_dir):
        """fastcgi_pass points at the PHP-FPM unix socket."""
        conf = self._read(rendered_dir)
        assert "fastcgi_pass unix:/run/php/php8.1-fpm.sock" in conf
# ---------------------------------------------------------------------------
# docker-compose.yml content verification
# ---------------------------------------------------------------------------
class TestDockerCompose:
    """Substring checks on the rendered docker-compose.yml (services, networks, IPs)."""

    @staticmethod
    def _read(rendered_dir):
        """Return the text of docker-compose.yml from the render output directory."""
        return (rendered_dir / "docker-compose.yml").read_text()

    def test_has_services_section(self, rendered_dir):
        """A top-level services key is present."""
        assert "services:" in self._read(rendered_dir)

    def test_has_all_core_services(self, rendered_dir):
        """Every core service name appears as a key."""
        compose = self._read(rendered_dir)
        core_services = [
            "attacker:",
            "firewall:",
            "web:",
            "mail:",
            "db:",
            "siem:",
            "ldap:",
            "files:",
        ]
        for service in core_services:
            assert service in compose, f"Missing service: {service}"

    def test_has_network_definitions(self, rendered_dir):
        """The networks section and the four network names are present."""
        compose = self._read(rendered_dir)
        for key in ("networks:", "external:", "dmz:", "internal:", "management:"):
            assert key in compose

    def test_has_static_ips(self, rendered_dir):
        """Key static IPs from the template appear in the file."""
        compose = self._read(rendered_dir)
        expected_ips = (
            "10.0.0.10",  # attacker
            "10.0.0.2",  # firewall external
            "10.0.1.10",  # web dmz
            "10.0.2.20",  # db internal
            "10.0.3.20",  # ldap management
            "10.0.3.21",  # siem management
        )
        for ip in expected_ips:
            assert ip in compose

    def test_web_depends_on_db(self, rendered_dir):
        """A depends_on key is present."""
        # NOTE(review): this only checks that some depends_on key exists; it
        # does not tie it to the web service or to db specifically.
        assert "depends_on:" in self._read(rendered_dir)

    def test_has_subnet_definitions(self, rendered_dir):
        """Each network's /24 subnet appears in the file."""
        compose = self._read(rendered_dir)
        expected_subnets = (
            "10.0.0.0/24",  # external
            "10.0.1.0/24",  # dmz
            "10.0.2.0/24",  # internal
            "10.0.3.0/24",  # management
        )
        for subnet in expected_subnets:
            assert subnet in compose

    def test_has_healthchecks(self, rendered_dir):
        """At least one healthcheck is defined."""
        assert "healthcheck:" in self._read(rendered_dir)

    def test_web_healthcheck_does_not_require_pre_overlay_2xx(self, rendered_dir):
        """The shell healthcheck accepts 2xx/3xx/4xx and the strict curl form is absent."""
        compose = self._read(rendered_dir)
        required_fragments = (
            "CMD-SHELL",
            "http://localhost/ || true",
            "$$status",
            '2*|3*|4*) exit 0',
        )
        for fragment in required_fragments:
            assert fragment in compose
        # The exec-form `curl -sf` check must not be emitted.
        assert 'curl", "-sf", "http://localhost/"' not in compose

    def test_attacker_has_net_admin(self, rendered_dir):
        """The NET_ADMIN capability is mentioned in the file."""
        assert "NET_ADMIN" in self._read(rendered_dir)

    def test_db_has_mysql_env_vars(self, rendered_dir):
        """The MySQL root password, database and user variables are set."""
        compose = self._read(rendered_dir)
        for variable in ("MYSQL_ROOT_PASSWORD", "MYSQL_DATABASE=", "MYSQL_USER="):
            assert variable in compose
# ---------------------------------------------------------------------------
# init.sql content verification
# ---------------------------------------------------------------------------
class TestInitSQL:
    """Substring checks on the rendered init.sql (databases, tables, grants)."""

    @staticmethod
    def _read(rendered_dir):
        """Return the text of init.sql from the render output directory."""
        return (rendered_dir / "init.sql").read_text()

    def test_creates_referral_db(self, rendered_dir):
        """The script mentions referral_db."""
        assert "referral_db" in self._read(rendered_dir)

    def test_creates_flags_db(self, rendered_dir):
        """The script mentions flags."""
        assert "flags" in self._read(rendered_dir)

    def test_creates_core_tables(self, rendered_dir):
        """CREATE TABLE and the core table names appear in the script."""
        sql = self._read(rendered_dir)
        for needle in ("CREATE TABLE", "users", "patients", "secrets"):
            assert needle in sql

    def test_creates_healthcare_tables(self, rendered_dir):
        """The healthcare-specific table names appear in the script."""
        sql = self._read(rendered_dir)
        for needle in ("patient_referrals", "billing"):
            assert needle in sql

    def test_grants_runtime_db_user(self, rendered_dir):
        """A GRANT statement with a quoted grantee is present."""
        sql = self._read(rendered_dir)
        for needle in ("GRANT", "TO '"):
            assert needle in sql

    def test_has_flush_privileges(self, rendered_dir):
        """The script contains FLUSH PRIVILEGES."""
        assert "FLUSH PRIVILEGES" in self._read(rendered_dir)
# ---------------------------------------------------------------------------
# Dockerfile.web content verification
# ---------------------------------------------------------------------------
class TestDockerfileWeb:
    """Substring checks on the rendered Dockerfile.web (users, packages, flags)."""

    @staticmethod
    def _read(rendered_dir):
        """Return the text of Dockerfile.web from the render output directory."""
        return (rendered_dir / "Dockerfile.web").read_text()

    def test_creates_users_from_topology(self, rendered_dir, parsed_spec):
        """The topology has users and the Dockerfile contains a useradd."""
        dockerfile = self._read(rendered_dir)
        users = parsed_spec.topology.get("users", [])
        assert len(users) > 0, "Parsed spec should have users"
        for entry in users:
            if not entry.get("username", ""):
                continue
            # NOTE(review): only the presence of `useradd` is checked, not
            # that each individual username appears in the Dockerfile.
            assert "useradd" in dockerfile

    def test_has_php_fpm(self, rendered_dir):
        """The Dockerfile mentions php8.1-fpm."""
        assert "php8.1-fpm" in self._read(rendered_dir)

    def test_has_nginx(self, rendered_dir):
        """The Dockerfile mentions nginx."""
        assert "nginx" in self._read(rendered_dir)

    def test_copies_nginx_conf(self, rendered_dir):
        """The Dockerfile copies nginx.conf into the image."""
        assert "COPY nginx.conf" in self._read(rendered_dir)

    def test_exposes_ports(self, rendered_dir):
        """The Dockerfile contains EXPOSE and the text 80."""
        dockerfile = self._read(rendered_dir)
        for needle in ("EXPOSE", "80"):
            assert needle in dockerfile

    def test_plants_file_flags(self, rendered_dir, parsed_spec):
        """Flags with file paths on web host should appear in Dockerfile."""
        dockerfile = self._read(rendered_dir)
        for flag in parsed_spec.flags:
            on_web_filesystem = flag.host == "web" and "/" in flag.path
            if not on_web_filesystem:
                continue
            assert flag.value in dockerfile, (
                f"Flag {flag.id} ({flag.value}) not in Dockerfile.web"
            )

    def test_db_flags_not_in_dockerfile(self, rendered_dir, parsed_spec):
        """Flags with db: paths should NOT appear in Dockerfile.web."""
        dockerfile = self._read(rendered_dir)
        for flag in parsed_spec.flags:
            if not flag.path.startswith(("mysql:", "db:")):
                continue
            assert flag.value not in dockerfile, (
                f"DB flag {flag.id} ({flag.value}) should not be in Dockerfile.web"
            )
# ---------------------------------------------------------------------------
# iptables.rules content verification
# ---------------------------------------------------------------------------
class TestIptablesRules:
    """Structural substring checks on the rendered iptables.rules."""

    @staticmethod
    def _read(rendered_dir):
        """Return the text of iptables.rules from the render output directory."""
        return (rendered_dir / "iptables.rules").read_text()

    def test_has_filter_table(self, rendered_dir):
        """The rules file has a *filter table marker and a COMMIT."""
        rules = self._read(rendered_dir)
        for needle in ("*filter", "COMMIT"):
            assert needle in rules

    def test_has_forward_chain(self, rendered_dir):
        """The rules file mentions the FORWARD chain."""
        assert "FORWARD" in self._read(rendered_dir)
# ---------------------------------------------------------------------------
# Full round-trip: files dict is preserved through parse
# ---------------------------------------------------------------------------
class TestFilesPreserved:
    """Files supplied in the LLM output are still present after parsing."""

    def test_files_dict_has_web_files(self, parsed_spec):
        """At least one files key starts with 'web:'."""
        web_keys = [name for name in parsed_spec.files if name.startswith("web:")]
        assert len(web_keys) > 0

    def test_files_dict_has_sql(self, parsed_spec):
        """The 'db:sql' entry is present."""
        assert "db:sql" in parsed_spec.files

    def test_index_php_content(self, parsed_spec):
        """index.php is present and contains the portal title."""
        files = parsed_spec.files
        key = "web:/var/www/portal/index.php"
        assert key in files
        assert "Meridian Referral Portal" in files[key]

    def test_lookup_php_has_sqli(self, parsed_spec):
        """lookup.php is present and contains the vulnerable SQL query."""
        files = parsed_spec.files
        key = "web:/var/www/portal/lookup.php"
        assert key in files
        body = files[key]
        # Either marker is accepted as evidence of the vulnerable query.
        assert any(marker in body for marker in ("last_name LIKE", "$last"))

    def test_compliance_report_has_flag(self, parsed_spec):
        """The HIPAA audit report is present and embeds its flag value."""
        files = parsed_spec.files
        key = "web:/var/www/portal/reports/hipaa_audit.txt"
        assert key in files
        assert "FLAG{1a2b3c4d5e6f7788}" in files[key]

    def test_sql_has_user_inserts(self, parsed_spec):
        """The SQL payload mentions both expected user names."""
        sql = parsed_spec.files.get("db:sql", "")
        for username in ("dthompson", "kwilliams"):
            assert username in sql

    def test_sql_has_flag_insert(self, parsed_spec):
        """The SQL payload contains the database flag value."""
        sql = parsed_spec.files.get("db:sql", "")
        assert "FLAG{9f3a2b4c5d6e7f80}" in sql

    def test_files_samba_shares(self, parsed_spec):
        """At least one files key starts with 'files:'."""
        share_keys = [name for name in parsed_spec.files if name.startswith("files:")]
        assert len(share_keys) > 0

    def test_db_backup_script(self, parsed_spec):
        """The DB backup script is present and invokes mysqldump."""
        files = parsed_spec.files
        key = "db:/opt/scripts/db_backup.sh"
        assert key in files
        assert "mysqldump" in files[key]
|