Spaces:
Runtime error
Runtime error
| """Tests for SnapshotRenderer -- template rendering pipeline.""" | |
| import tempfile | |
| import json | |
| from pathlib import Path | |
| import pytest | |
| from open_range.builder.renderer import SnapshotRenderer, _build_context | |
| from open_range.protocols import ( | |
| FlagSpec, | |
| GoldenPathStep, | |
| NPCTrafficSpec, | |
| SnapshotSpec, | |
| TaskSpec, | |
| TruthGraph, | |
| Vulnerability, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Fixtures | |
| # --------------------------------------------------------------------------- | |
@pytest.fixture
def renderer():
    """Provide a ``SnapshotRenderer`` constructed with its default arguments.

    Registered as a pytest fixture so that tests can request it simply by
    naming a ``renderer`` parameter.
    """
    return SnapshotRenderer()
@pytest.fixture
def sqli_spec():
    """SnapshotSpec with a SQLi vuln -- exercises search_endpoint in nginx.

    The topology lists eight hosts spread over four zones (external, dmz,
    internal, management), two users, and two firewall rules (one ``allow``
    and one ``deny``).  A single file-based flag is placed on the ``web``
    host.  Registered as a pytest fixture so tests can request it by name.
    """
    return SnapshotSpec(
        topology={
            "hosts": [
                "attacker", "firewall", "web", "mail",
                "db", "files", "ldap", "siem",
            ],
            "zones": {
                "external": ["attacker"],
                "dmz": ["web", "mail"],
                "internal": ["db", "files"],
                "management": ["ldap", "siem"],
            },
            "users": [
                {
                    "username": "admin",
                    "password": "Adm1n!Test",
                    "groups": ["admins"],
                    "hosts": ["web", "db"],
                },
                {
                    "username": "testuser",
                    "password": "Test!2024",
                    "groups": ["users"],
                    "hosts": ["web"],
                },
            ],
            "firewall_rules": [
                {
                    "action": "allow",
                    "from_zone": "external",
                    "to_zone": "dmz",
                    "ports": [80, 443, 25],
                },
                {
                    "action": "deny",
                    "from_zone": "internal",
                    "to_zone": "external",
                    "ports": [],
                },
            ],
        },
        truth_graph=TruthGraph(
            vulns=[
                Vulnerability(
                    id="vuln_sqli",
                    type="sqli",
                    host="web",
                    service="nginx+php",
                    injection_point="/search?q=",
                    root_cause="Unsanitized user input",
                )
            ]
        ),
        flags=[
            FlagSpec(
                id="flag1",
                value="FLAG{sql1_t3st_f1ag}",
                path="/var/flags/flag1.txt",
                host="web",
            ),
        ],
        golden_path=[
            GoldenPathStep(step=1, command="nmap -sV 10.0.1.0/24", expect_in_stdout="80/tcp"),
        ],
        npc_traffic=NPCTrafficSpec(level=0, rate_lambda=10.0),
        task=TaskSpec(red_briefing="Find vulns.", blue_briefing="Monitor."),
    )
@pytest.fixture
def path_traversal_spec():
    """SnapshotSpec with a path traversal vuln -- exercises download_endpoint.

    Uses a two-host topology (``web`` in the dmz, ``db`` in internal) with no
    users and no firewall rules, plus one file-based flag on ``web``.
    Registered as a pytest fixture so tests can request it by name.
    """
    return SnapshotSpec(
        topology={
            "hosts": ["web", "db"],
            "zones": {"dmz": ["web"], "internal": ["db"]},
            "users": [],
            "firewall_rules": [],
        },
        truth_graph=TruthGraph(
            vulns=[
                Vulnerability(
                    id="vuln_pt",
                    type="path_traversal",
                    host="web",
                    injection_point="/download?file=",
                )
            ]
        ),
        flags=[
            FlagSpec(
                id="flag1",
                value="FLAG{p4th_tr4v}",
                path="/var/flags/flag1.txt",
                host="web",
            ),
        ],
        golden_path=[],
        task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
    )
@pytest.fixture
def db_flag_spec():
    """SnapshotSpec with a flag stored in the database.

    The flag's ``path`` uses the ``db:``-prefixed form
    (``db:flags.secrets.flag``) instead of a filesystem path, and its host is
    ``db``.  The topology has a single ``dbadmin`` user and an IDOR
    vulnerability on ``web``.  Registered as a pytest fixture so tests can
    request it by name.
    """
    return SnapshotSpec(
        topology={
            "hosts": ["web", "db"],
            "zones": {"dmz": ["web"], "internal": ["db"]},
            "users": [
                {
                    "username": "dbadmin",
                    "password": "DbP@ss!",
                    "groups": ["admins"],
                    "hosts": ["db"],
                },
            ],
            "firewall_rules": [],
        },
        truth_graph=TruthGraph(
            vulns=[
                Vulnerability(id="vuln_idor", type="idor", host="web")
            ]
        ),
        flags=[
            FlagSpec(
                id="flag1",
                value="FLAG{1d0r_fl4g}",
                path="db:flags.secrets.flag",
                host="db",
            ),
        ],
        golden_path=[],
        task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
    )
| # --------------------------------------------------------------------------- | |
| # Render tests -- all output files exist | |
| # --------------------------------------------------------------------------- | |
def test_render_creates_output_dir(renderer, sqli_spec):
    """``render`` returns the path it was given and that path is a directory."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "snapshot_out")
        returned = renderer.render(sqli_spec, target)
        assert returned == target
        assert target.is_dir()
def test_render_produces_all_files(renderer, sqli_spec):
    """Each of the six expected artifacts is written and is non-empty."""
    artifact_names = (
        "docker-compose.yml",
        "Dockerfile.web",
        "Dockerfile.db",
        "nginx.conf",
        "init.sql",
        "iptables.rules",
    )
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "snapshot_out")
        renderer.render(sqli_spec, target)
        for fname in artifact_names:
            artifact = target / fname
            assert artifact.exists(), f"Missing output file: {fname}"
            text = artifact.read_text()
            assert len(text) > 0, f"Empty output file: {fname}"
def test_render_idempotent(renderer, sqli_spec):
    """Two renders into the same directory yield the same docker-compose.yml."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "snapshot_out")
        compose_path = target / "docker-compose.yml"

        renderer.render(sqli_spec, target)
        first_pass = compose_path.read_text()

        # Second render goes to the very same directory on purpose.
        renderer.render(sqli_spec, target)
        second_pass = compose_path.read_text()

        assert first_pass == second_pass
def test_render_writes_payload_manifest_and_files(renderer, sqli_spec):
    """Payloads in ``spec.files`` are listed in file-payloads.json and written.

    The manifest is expected to map every payload key to a path relative to
    the output directory; the file at that path must hold the payload text.
    """
    payloads = {
        "web:/var/www/portal/search.php": "<?php echo 'ok'; ?>\n",
        "siem:/var/log/siem/consolidated/all.log": "Suspicious activity detected\n",
        "db:sql": "USE flags;\nSELECT 1;\n",
    }
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "snapshot_out")
        # Work on a deep copy so the shared fixture object is left untouched.
        spec = sqli_spec.model_copy(deep=True)
        spec.files = dict(payloads)
        renderer.render(spec, target)

        manifest = json.loads((target / "file-payloads.json").read_text())
        # First make sure every key is present, then compare file contents.
        for key in payloads:
            assert key in manifest
        for key, expected_text in payloads.items():
            assert (target / manifest[key]).read_text() == expected_text
| # --------------------------------------------------------------------------- | |
| # docker-compose.yml content checks | |
| # --------------------------------------------------------------------------- | |
def test_compose_contains_services(renderer, sqli_spec):
    """docker-compose.yml has a ``services:`` key and the expected service names."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        compose = (target / "docker-compose.yml").read_text()
        for needle in ("services:", "web:", "db:", "firewall:", "siem:", "attacker:"):
            assert needle in compose
def test_compose_contains_networks(renderer, sqli_spec):
    """docker-compose.yml has a ``networks:`` key and three of the zone names."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        compose = (target / "docker-compose.yml").read_text()
        for needle in ("networks:", "external:", "dmz:", "internal:"):
            assert needle in compose
def test_compose_web_depends_on_db(renderer, sqli_spec):
    """docker-compose.yml contains a ``depends_on:`` key.

    The intent is that the web service depends on db; the check itself only
    looks for the key anywhere in the rendered file.
    """
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        rendered = (target / "docker-compose.yml").read_text()
        assert "depends_on:" in rendered
def test_compose_web_healthcheck_accepts_pre_overlay_http_statuses(renderer, sqli_spec):
    """The compose file uses a shell healthcheck that accepts 2xx/3xx/4xx.

    Also verifies that the plain ``curl -sf`` exec-form healthcheck is absent.
    """
    required_fragments = (
        "CMD-SHELL",
        "http://localhost/ || true",
        '$$status',
        '2*|3*|4*) exit 0',
    )
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        compose = (target / "docker-compose.yml").read_text()
        for fragment in required_fragments:
            assert fragment in compose
        assert 'curl", "-sf", "http://localhost/"' not in compose
def test_compose_attacker_has_routed_host_aliases_and_nmap_runtime_lib(renderer, sqli_spec):
    """The compose file carries host aliases plus nmap/iptables command text.

    Checks for the ``libblas3 nmap`` package pair, an ``extra_hosts:`` key with
    the web/db/files address mappings, an ``nmap --version`` invocation, and
    an ``iptables -C FORWARD`` check.
    """
    required_fragments = (
        "libblas3 nmap",
        'extra_hosts:',
        '"web:10.0.1.10"',
        '"db:10.0.2.20"',
        '"files:10.0.2.21"',
        "nmap --version",
        "iptables -C FORWARD",
    )
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        compose = (target / "docker-compose.yml").read_text()
        for fragment in required_fragments:
            assert fragment in compose
| # --------------------------------------------------------------------------- | |
| # Dockerfile.web content checks | |
| # --------------------------------------------------------------------------- | |
def test_dockerfile_web_creates_users(renderer, sqli_spec):
    """Dockerfile.web mentions ``useradd`` and both usernames from the spec."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        dockerfile = (target / "Dockerfile.web").read_text()
        for needle in ("useradd", "admin", "testuser"):
            assert needle in dockerfile
def test_dockerfile_web_plants_flag(renderer, sqli_spec):
    """A file-path flag on the web host shows up in Dockerfile.web.

    Both the flag value and its destination path must be present.
    """
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        dockerfile = (target / "Dockerfile.web").read_text()
        for needle in ("FLAG{sql1_t3st_f1ag}", "/var/flags/flag1.txt"):
            assert needle in dockerfile
def test_dockerfile_web_no_db_flag(renderer, db_flag_spec):
    """A flag that lives in the database must be absent from Dockerfile.web."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(db_flag_spec, target)
        web_dockerfile = (target / "Dockerfile.web").read_text()
        assert "FLAG{1d0r_fl4g}" not in web_dockerfile
| # --------------------------------------------------------------------------- | |
| # nginx.conf content checks | |
| # --------------------------------------------------------------------------- | |
def test_nginx_has_search_for_sqli(renderer, sqli_spec):
    """With a SQLi vulnerability, nginx.conf contains ``/search``."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        nginx_conf = (target / "nginx.conf").read_text()
        assert "/search" in nginx_conf
def test_nginx_has_download_for_path_traversal(renderer, path_traversal_spec):
    """With a path traversal vulnerability, nginx.conf contains ``/download``."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(path_traversal_spec, target)
        nginx_conf = (target / "nginx.conf").read_text()
        assert "/download" in nginx_conf
def test_nginx_no_download_for_sqli(renderer, sqli_spec):
    """A SQLi-only spec must not produce the download location block."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        nginx_conf = (target / "nginx.conf").read_text()
        # ``download.php`` is used as the marker for the download block.
        assert "download.php" not in nginx_conf
def test_compose_firewall_nat_is_subnet_based(renderer, sqli_spec):
    """MASQUERADE rules are expressed per subnet pair, not per interface."""
    subnet_rules = (
        "-s 10.0.0.0/24 -d 10.0.1.0/24 -j MASQUERADE",
        "-s 10.0.1.0/24 -d 10.0.2.0/24 -j MASQUERADE",
    )
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        compose = (target / "docker-compose.yml").read_text()
        for rule in subnet_rules:
            assert rule in compose
        # The interface-based form must not be rendered.
        assert "-o eth1 -j MASQUERADE" not in compose
| # --------------------------------------------------------------------------- | |
| # init.sql content checks | |
| # --------------------------------------------------------------------------- | |
def test_init_sql_creates_tables(renderer, sqli_spec):
    """init.sql has ``CREATE TABLE`` and mentions users, patients and secrets."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        sql = (target / "init.sql").read_text()
        for needle in ("CREATE TABLE", "users", "patients", "secrets"):
            assert needle in sql
def test_init_sql_creates_referral_db(renderer, sqli_spec):
    """init.sql mentions referral_db and its healthcare table names."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        sql = (target / "init.sql").read_text()
        for needle in ("referral_db", "patient_referrals", "billing"):
            assert needle in sql
def test_init_sql_grants_runtime_db_user(renderer, db_flag_spec):
    """init.sql grants privileges to a quoted account that is not ``app_user``."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(db_flag_spec, target)
        sql = (target / "init.sql").read_text()
        for needle in ("GRANT", "TO '"):
            assert needle in sql
        # The literal ``app_user`` account name must not appear in the output.
        assert "app_user" not in sql
def test_init_sql_no_file_flag(renderer, sqli_spec):
    """A flag that has a filesystem path must not be inserted into init.sql."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        init_sql = (target / "init.sql").read_text()
        # The sqli_spec flag is file-based, so its value has no place in SQL.
        assert "FLAG{sql1_t3st_f1ag}" not in init_sql
| # --------------------------------------------------------------------------- | |
| # iptables.rules content checks | |
| # --------------------------------------------------------------------------- | |
def test_iptables_has_rules(renderer, sqli_spec):
    """iptables.rules contains ``*filter``, ``COMMIT`` and ``FORWARD``."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        ruleset = (target / "iptables.rules").read_text()
        for needle in ("*filter", "COMMIT", "FORWARD"):
            assert needle in ruleset
def test_iptables_allow_rules(renderer, sqli_spec):
    """The allow rule for port 80 is rendered with an ``ACCEPT`` target."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        ruleset = (target / "iptables.rules").read_text()
        for needle in ("--dport 80", "ACCEPT"):
            assert needle in ruleset
def test_iptables_deny_rules(renderer, sqli_spec):
    """iptables.rules contains a ``DROP`` target."""
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "out")
        renderer.render(sqli_spec, target)
        ruleset = (target / "iptables.rules").read_text()
        assert "DROP" in ruleset
| # --------------------------------------------------------------------------- | |
| # Context builder unit tests | |
| # --------------------------------------------------------------------------- | |
def test_build_context_has_expected_keys(sqli_spec):
    """The context dict carries every always-present key, plus search_endpoint."""
    always_present = (
        "snapshot_id", "networks", "hosts", "host_names",
        "db_host", "db_user", "db_pass", "mysql_root_password",
        "domain", "users", "flags", "server_name",
        "firewall_rules", "zone_cidrs", "app_files",
    )
    context = _build_context(sqli_spec)
    for key in always_present:
        assert key in context, f"Missing context key: {key}"
    # search_endpoint/download_endpoint are only set when True, because the
    # templates test them with `is defined`; SQLi turns search on.
    assert context.get("search_endpoint") is True
def test_build_context_hosts_are_dicts(sqli_spec):
    """Every host entry is a dict with ``name``, ``zone`` and ``networks``."""
    for host in _build_context(sqli_spec)["hosts"]:
        assert isinstance(host, dict)
        for field in ("name", "zone", "networks"):
            assert field in host
def test_build_context_networks_have_names(sqli_spec):
    """The ``networks`` entries include ones named external and dmz."""
    context = _build_context(sqli_spec)
    names = [network["name"] for network in context["networks"]]
    for zone in ("external", "dmz"):
        assert zone in names
def test_build_context_search_enabled_for_sqli(sqli_spec):
    """A SQLi spec sets ``search_endpoint`` to exactly ``True``."""
    search_flag = _build_context(sqli_spec).get("search_endpoint")
    assert search_flag is True
def test_build_context_download_disabled_for_sqli(sqli_spec):
    """A SQLi spec leaves ``download_endpoint`` out of the context entirely."""
    context = _build_context(sqli_spec)
    # Omitted key == undefined in the template.
    assert "download_endpoint" not in context
def test_build_context_download_enabled_for_path_traversal(path_traversal_spec):
    """A path traversal spec sets ``download_endpoint`` to exactly ``True``."""
    download_flag = _build_context(path_traversal_spec).get("download_endpoint")
    assert download_flag is True
| # --------------------------------------------------------------------------- | |
| # Minimal / empty spec | |
| # --------------------------------------------------------------------------- | |
def test_render_minimal_spec(renderer):
    """A spec with only a one-host topology still renders compose and SQL files."""
    bare_topology = {
        "hosts": ["web"],
        "zones": {"dmz": ["web"]},
        "users": [],
        "firewall_rules": [],
    }
    spec = SnapshotSpec(topology=bare_topology)
    with tempfile.TemporaryDirectory() as scratch:
        target = Path(scratch, "minimal")
        renderer.render(spec, target)
        for fname in ("docker-compose.yml", "init.sql"):
            assert (target / fname).exists()
| # --------------------------------------------------------------------------- | |
| # Integration: TemplateOnlyBuilder -> Renderer | |
| # --------------------------------------------------------------------------- | |
# NOTE(review): assumes the project runs coroutine tests through
# pytest-asyncio; the marker is required in its strict mode and harmless in
# auto mode -- confirm against the project's pytest configuration.
@pytest.mark.asyncio
async def test_builder_to_renderer_pipeline(tier1_manifest):
    """Full pipeline: TemplateOnlyBuilder generates spec, Renderer renders it.

    Builds a spec from the ``tier1_manifest`` fixture with a fixed seed,
    renders it into a temporary directory, and checks that all six artifacts
    exist, that the compose file references the web service, and that the
    first flag's value appears in at least one Dockerfile or in init.sql.
    """
    from open_range.builder.builder import TemplateOnlyBuilder
    from open_range.protocols import BuildContext

    builder = TemplateOnlyBuilder()
    ctx = BuildContext(seed=42, tier=1)
    spec = await builder.build(tier1_manifest, ctx)
    # Fail with a readable message instead of an IndexError further down.
    assert spec.flags, "Builder produced a spec without any flags"

    renderer = SnapshotRenderer()
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "pipeline_out"
        renderer.render(spec, out)

        # All 6 artifacts should exist
        for fname in [
            "docker-compose.yml", "Dockerfile.web", "Dockerfile.db",
            "nginx.conf", "init.sql", "iptables.rules",
        ]:
            assert (out / fname).exists(), f"Missing: {fname}"

        # docker-compose should reference the web service
        compose = (out / "docker-compose.yml").read_text()
        assert "web:" in compose

        # At least one flag should be in the rendered artifacts
        flag_value = spec.flags[0].value
        all_content = "".join(
            (out / fname).read_text()
            for fname in ["Dockerfile.web", "Dockerfile.db", "init.sql"]
        )
        assert flag_value in all_content, (
            f"Flag {flag_value} not found in any rendered artifact"
        )