"""Integration tests for the full renderer pipeline. Loads real LLM output from snapshots/llm_tier1_test.json, parses it through _parse_llm_response(), renders through SnapshotRenderer.render(), and verifies all output files contain expected content. """ from __future__ import annotations import json import tempfile from pathlib import Path import pytest from open_range.builder.builder import _parse_llm_response from open_range.builder.renderer import SnapshotRenderer ROOT = Path(__file__).parent.parent SNAPSHOT_PATH = ROOT / "snapshots" / "llm_tier1_test.json" @pytest.fixture def llm_output() -> dict: """Load the real LLM output JSON.""" if not SNAPSHOT_PATH.exists(): pytest.skip("llm_tier1_test.json fixture not present") return json.loads(SNAPSHOT_PATH.read_text()) @pytest.fixture def parsed_spec(llm_output): """Parse real LLM output through _parse_llm_response.""" return _parse_llm_response(json.dumps(llm_output)) @pytest.fixture def rendered_dir(parsed_spec): """Render the parsed spec and yield the output directory.""" renderer = SnapshotRenderer() with tempfile.TemporaryDirectory() as tmpdir: out = Path(tmpdir) / "integration_out" renderer.render(parsed_spec, out) yield out # --------------------------------------------------------------------------- # Pipeline: parse -> render round-trip # --------------------------------------------------------------------------- class TestParseLLMOutput: """Verify _parse_llm_response correctly handles real LLM output.""" def test_parse_produces_snapshot_spec(self, parsed_spec): from open_range.protocols import SnapshotSpec assert isinstance(parsed_spec, SnapshotSpec) def test_parse_has_topology(self, parsed_spec): assert "hosts" in parsed_spec.topology assert len(parsed_spec.topology["hosts"]) == 8 def test_parse_has_vulns(self, parsed_spec): assert len(parsed_spec.truth_graph.vulns) >= 1 vuln_types = {v.type for v in parsed_spec.truth_graph.vulns} assert "sqli" in vuln_types def test_parse_has_flags(self, parsed_spec): assert len(parsed_spec.flags) >= 2 def test_parse_has_golden_path(self, parsed_spec): assert len(parsed_spec.golden_path) >= 1 # Golden path steps should have commands for step in parsed_spec.golden_path: assert step.command, f"Step {step.step} has empty command" def test_parse_has_task_briefings(self, parsed_spec): assert parsed_spec.task.red_briefing assert parsed_spec.task.blue_briefing def test_parse_has_files(self, parsed_spec): assert len(parsed_spec.files) > 0 # Should include web files and db:sql web_files = [k for k in parsed_spec.files if k.startswith("web:")] assert len(web_files) > 0 def test_parse_has_npc_personas(self, parsed_spec): assert len(parsed_spec.npc_personas) >= 1 def test_golden_path_uses_command_field(self, parsed_spec): """LLM output uses 'cmd', parser should map to 'command'.""" for step in parsed_spec.golden_path: assert step.command # Should be populated from 'cmd' key def test_golden_path_uses_expect_in_stdout(self, parsed_spec): """LLM output uses 'expect_stdout', parser maps to 'expect_in_stdout'.""" for step in parsed_spec.golden_path: assert step.expect_in_stdout # --------------------------------------------------------------------------- # All output files exist # --------------------------------------------------------------------------- class TestRenderedFilesExist: """Verify all 6 template outputs are created.""" EXPECTED_FILES = [ "docker-compose.yml", "Dockerfile.web", "Dockerfile.db", "nginx.conf", "init.sql", "iptables.rules", ] def test_all_output_files_exist(self, rendered_dir): for fname in self.EXPECTED_FILES: path = rendered_dir / fname assert path.exists(), f"Missing output file: {fname}" def test_all_output_files_non_empty(self, rendered_dir): for fname in self.EXPECTED_FILES: content = (rendered_dir / fname).read_text() assert len(content) > 0, f"Empty output file: {fname}" # --------------------------------------------------------------------------- # nginx.conf content verification # --------------------------------------------------------------------------- class TestNginxConf: """Verify rendered nginx.conf has correct content.""" def test_references_php_fpm_socket(self, rendered_dir): nginx = (rendered_dir / "nginx.conf").read_text() assert "php8.1-fpm.sock" in nginx def test_has_server_block(self, rendered_dir): nginx = (rendered_dir / "nginx.conf").read_text() assert "server {" in nginx assert "listen 80" in nginx def test_has_php_location(self, rendered_dir): nginx = (rendered_dir / "nginx.conf").read_text() assert "location ~ \\.php$" in nginx def test_has_fastcgi_pass(self, rendered_dir): nginx = (rendered_dir / "nginx.conf").read_text() assert "fastcgi_pass unix:/run/php/php8.1-fpm.sock" in nginx # --------------------------------------------------------------------------- # docker-compose.yml content verification # --------------------------------------------------------------------------- class TestDockerCompose: """Verify rendered docker-compose.yml has correct static IPs and structure.""" def test_has_services_section(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "services:" in compose def test_has_all_core_services(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() for service in ["attacker:", "firewall:", "web:", "mail:", "db:", "siem:", "ldap:", "files:"]: assert service in compose, f"Missing service: {service}" def test_has_network_definitions(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "networks:" in compose assert "external:" in compose assert "dmz:" in compose assert "internal:" in compose assert "management:" in compose def test_has_static_ips(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() # Key static IPs from the template assert "10.0.0.10" in compose # attacker assert "10.0.0.2" in compose # firewall external assert "10.0.1.10" in compose # web dmz assert "10.0.2.20" in compose # db internal assert "10.0.3.20" in compose # ldap management assert "10.0.3.21" in compose # siem management def test_web_depends_on_db(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() # web service should have depends_on db assert "depends_on:" in compose def test_has_subnet_definitions(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "10.0.0.0/24" in compose # external assert "10.0.1.0/24" in compose # dmz assert "10.0.2.0/24" in compose # internal assert "10.0.3.0/24" in compose # management def test_has_healthchecks(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "healthcheck:" in compose def test_web_healthcheck_does_not_require_pre_overlay_2xx(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "CMD-SHELL" in compose assert "http://localhost/ || true" in compose assert "$$status" in compose assert '2*|3*|4*) exit 0' in compose assert 'curl", "-sf", "http://localhost/"' not in compose def test_attacker_has_net_admin(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "NET_ADMIN" in compose def test_db_has_mysql_env_vars(self, rendered_dir): compose = (rendered_dir / "docker-compose.yml").read_text() assert "MYSQL_ROOT_PASSWORD" in compose assert "MYSQL_DATABASE=" in compose assert "MYSQL_USER=" in compose # --------------------------------------------------------------------------- # init.sql content verification # --------------------------------------------------------------------------- class TestInitSQL: """Verify rendered init.sql has referral_db and runtime-selected DB grants.""" def test_creates_referral_db(self, rendered_dir): sql = (rendered_dir / "init.sql").read_text() assert "referral_db" in sql def test_creates_flags_db(self, rendered_dir): sql = (rendered_dir / "init.sql").read_text() assert "flags" in sql def test_creates_core_tables(self, rendered_dir): sql = (rendered_dir / "init.sql").read_text() assert "CREATE TABLE" in sql assert "users" in sql assert "patients" in sql assert "secrets" in sql def test_creates_healthcare_tables(self, rendered_dir): sql = (rendered_dir / "init.sql").read_text() assert "patient_referrals" in sql assert "billing" in sql def test_grants_runtime_db_user(self, rendered_dir): sql = (rendered_dir / "init.sql").read_text() assert "GRANT" in sql assert "TO '" in sql def test_has_flush_privileges(self, rendered_dir): sql = (rendered_dir / "init.sql").read_text() assert "FLUSH PRIVILEGES" in sql # --------------------------------------------------------------------------- # Dockerfile.web content verification # --------------------------------------------------------------------------- class TestDockerfileWeb: """Verify rendered Dockerfile.web creates users from topology.""" def test_creates_users_from_topology(self, rendered_dir, parsed_spec): dockerfile = (rendered_dir / "Dockerfile.web").read_text() # Should have useradd for users from topology users = parsed_spec.topology.get("users", []) assert len(users) > 0, "Parsed spec should have users" for user in users: username = user.get("username", "") if username: assert "useradd" in dockerfile def test_has_php_fpm(self, rendered_dir): dockerfile = (rendered_dir / "Dockerfile.web").read_text() assert "php8.1-fpm" in dockerfile def test_has_nginx(self, rendered_dir): dockerfile = (rendered_dir / "Dockerfile.web").read_text() assert "nginx" in dockerfile def test_copies_nginx_conf(self, rendered_dir): dockerfile = (rendered_dir / "Dockerfile.web").read_text() assert "COPY nginx.conf" in dockerfile def test_exposes_ports(self, rendered_dir): dockerfile = (rendered_dir / "Dockerfile.web").read_text() assert "EXPOSE" in dockerfile assert "80" in dockerfile def test_plants_file_flags(self, rendered_dir, parsed_spec): """Flags with file paths on web host should appear in Dockerfile.""" dockerfile = (rendered_dir / "Dockerfile.web").read_text() for flag in parsed_spec.flags: if flag.host == "web" and "/" in flag.path: assert flag.value in dockerfile, ( f"Flag {flag.id} ({flag.value}) not in Dockerfile.web" ) def test_db_flags_not_in_dockerfile(self, rendered_dir, parsed_spec): """Flags with db: paths should NOT appear in Dockerfile.web.""" dockerfile = (rendered_dir / "Dockerfile.web").read_text() for flag in parsed_spec.flags: if flag.path.startswith("mysql:") or flag.path.startswith("db:"): assert flag.value not in dockerfile, ( f"DB flag {flag.id} ({flag.value}) should not be in Dockerfile.web" ) # --------------------------------------------------------------------------- # iptables.rules content verification # --------------------------------------------------------------------------- class TestIptablesRules: """Verify rendered iptables.rules has correct structure.""" def test_has_filter_table(self, rendered_dir): rules = (rendered_dir / "iptables.rules").read_text() assert "*filter" in rules assert "COMMIT" in rules def test_has_forward_chain(self, rendered_dir): rules = (rendered_dir / "iptables.rules").read_text() assert "FORWARD" in rules # --------------------------------------------------------------------------- # Full round-trip: files dict is preserved through parse # --------------------------------------------------------------------------- class TestFilesPreserved: """Verify that files from LLM output survive the parse pipeline.""" def test_files_dict_has_web_files(self, parsed_spec): web_files = {k: v for k, v in parsed_spec.files.items() if k.startswith("web:")} assert len(web_files) > 0 def test_files_dict_has_sql(self, parsed_spec): assert "db:sql" in parsed_spec.files def test_index_php_content(self, parsed_spec): key = "web:/var/www/portal/index.php" assert key in parsed_spec.files assert "Meridian Referral Portal" in parsed_spec.files[key] def test_lookup_php_has_sqli(self, parsed_spec): key = "web:/var/www/portal/lookup.php" assert key in parsed_spec.files content = parsed_spec.files[key] # Should contain the vulnerable SQL query assert "last_name LIKE" in content or "$last" in content def test_compliance_report_has_flag(self, parsed_spec): key = "web:/var/www/portal/reports/hipaa_audit.txt" assert key in parsed_spec.files assert "FLAG{1a2b3c4d5e6f7788}" in parsed_spec.files[key] def test_sql_has_user_inserts(self, parsed_spec): sql = parsed_spec.files.get("db:sql", "") assert "dthompson" in sql assert "kwilliams" in sql def test_sql_has_flag_insert(self, parsed_spec): sql = parsed_spec.files.get("db:sql", "") assert "FLAG{9f3a2b4c5d6e7f80}" in sql def test_files_samba_shares(self, parsed_spec): files_entries = {k: v for k, v in parsed_spec.files.items() if k.startswith("files:")} assert len(files_entries) > 0 def test_db_backup_script(self, parsed_spec): key = "db:/opt/scripts/db_backup.sh" assert key in parsed_spec.files assert "mysqldump" in parsed_spec.files[key]