File size: 19,677 Bytes
8c486a8
 
 
307d729
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
307d729
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3d5d7e9
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b99233
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
5b99233
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
3d5d7e9
8c486a8
 
 
3d5d7e9
 
8c486a8
 
 
 
3d5d7e9
 
 
8c486a8
 
dabed55
 
8c486a8
 
 
 
3d5d7e9
dabed55
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
"""Tests for SnapshotRenderer -- template rendering pipeline."""

import tempfile
import json
from pathlib import Path

import pytest

from open_range.builder.renderer import SnapshotRenderer, _build_context
from open_range.protocols import (
    FlagSpec,
    GoldenPathStep,
    NPCTrafficSpec,
    SnapshotSpec,
    TaskSpec,
    TruthGraph,
    Vulnerability,
)


# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------


@pytest.fixture
def renderer():
    return SnapshotRenderer()


@pytest.fixture
def sqli_spec():
    """SnapshotSpec with a SQLi vuln -- exercises search_endpoint in nginx."""
    return SnapshotSpec(
        topology={
            "hosts": [
                "attacker", "firewall", "web", "mail",
                "db", "files", "ldap", "siem",
            ],
            "zones": {
                "external": ["attacker"],
                "dmz": ["web", "mail"],
                "internal": ["db", "files"],
                "management": ["ldap", "siem"],
            },
            "users": [
                {
                    "username": "admin",
                    "password": "Adm1n!Test",
                    "groups": ["admins"],
                    "hosts": ["web", "db"],
                },
                {
                    "username": "testuser",
                    "password": "Test!2024",
                    "groups": ["users"],
                    "hosts": ["web"],
                },
            ],
            "firewall_rules": [
                {
                    "action": "allow",
                    "from_zone": "external",
                    "to_zone": "dmz",
                    "ports": [80, 443, 25],
                },
                {
                    "action": "deny",
                    "from_zone": "internal",
                    "to_zone": "external",
                    "ports": [],
                },
            ],
        },
        truth_graph=TruthGraph(
            vulns=[
                Vulnerability(
                    id="vuln_sqli",
                    type="sqli",
                    host="web",
                    service="nginx+php",
                    injection_point="/search?q=",
                    root_cause="Unsanitized user input",
                )
            ]
        ),
        flags=[
            FlagSpec(
                id="flag1",
                value="FLAG{sql1_t3st_f1ag}",
                path="/var/flags/flag1.txt",
                host="web",
            ),
        ],
        golden_path=[
            GoldenPathStep(step=1, command="nmap -sV 10.0.1.0/24", expect_in_stdout="80/tcp"),
        ],
        npc_traffic=NPCTrafficSpec(level=0, rate_lambda=10.0),
        task=TaskSpec(red_briefing="Find vulns.", blue_briefing="Monitor."),
    )


@pytest.fixture
def path_traversal_spec():
    """SnapshotSpec with a path traversal vuln -- exercises download_endpoint."""
    return SnapshotSpec(
        topology={
            "hosts": ["web", "db"],
            "zones": {"dmz": ["web"], "internal": ["db"]},
            "users": [],
            "firewall_rules": [],
        },
        truth_graph=TruthGraph(
            vulns=[
                Vulnerability(
                    id="vuln_pt",
                    type="path_traversal",
                    host="web",
                    injection_point="/download?file=",
                )
            ]
        ),
        flags=[
            FlagSpec(
                id="flag1",
                value="FLAG{p4th_tr4v}",
                path="/var/flags/flag1.txt",
                host="web",
            ),
        ],
        golden_path=[],
        task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
    )


@pytest.fixture
def db_flag_spec():
    """SnapshotSpec with a flag stored in the database."""
    return SnapshotSpec(
        topology={
            "hosts": ["web", "db"],
            "zones": {"dmz": ["web"], "internal": ["db"]},
            "users": [
                {
                    "username": "dbadmin",
                    "password": "DbP@ss!",
                    "groups": ["admins"],
                    "hosts": ["db"],
                },
            ],
            "firewall_rules": [],
        },
        truth_graph=TruthGraph(
            vulns=[
                Vulnerability(id="vuln_idor", type="idor", host="web")
            ]
        ),
        flags=[
            FlagSpec(
                id="flag1",
                value="FLAG{1d0r_fl4g}",
                path="db:flags.secrets.flag",
                host="db",
            ),
        ],
        golden_path=[],
        task=TaskSpec(red_briefing="Go.", blue_briefing="Watch."),
    )


# ---------------------------------------------------------------------------
# Render tests -- all output files exist
# ---------------------------------------------------------------------------


def test_render_creates_output_dir(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "snapshot_out"
        result = renderer.render(sqli_spec, out)
        assert result == out
        assert out.is_dir()


def test_render_produces_all_files(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "snapshot_out"
        renderer.render(sqli_spec, out)
        expected_files = [
            "docker-compose.yml",
            "Dockerfile.web",
            "Dockerfile.db",
            "nginx.conf",
            "init.sql",
            "iptables.rules",
        ]
        for fname in expected_files:
            assert (out / fname).exists(), f"Missing output file: {fname}"
            content = (out / fname).read_text()
            assert len(content) > 0, f"Empty output file: {fname}"


def test_render_idempotent(renderer, sqli_spec):
    """Rendering twice to the same dir should overwrite cleanly."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "snapshot_out"
        renderer.render(sqli_spec, out)
        content1 = (out / "docker-compose.yml").read_text()
        renderer.render(sqli_spec, out)
        content2 = (out / "docker-compose.yml").read_text()
        assert content1 == content2


def test_render_writes_payload_manifest_and_files(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "snapshot_out"
        spec = sqli_spec.model_copy(deep=True)
        spec.files = {
            "web:/var/www/portal/search.php": "<?php echo 'ok'; ?>\n",
            "siem:/var/log/siem/consolidated/all.log": "Suspicious activity detected\n",
            "db:sql": "USE flags;\nSELECT 1;\n",
        }

        renderer.render(spec, out)

        manifest = json.loads((out / "file-payloads.json").read_text())
        assert "web:/var/www/portal/search.php" in manifest
        assert "siem:/var/log/siem/consolidated/all.log" in manifest
        assert "db:sql" in manifest

        assert (out / manifest["web:/var/www/portal/search.php"]).read_text() == "<?php echo 'ok'; ?>\n"
        assert (out / manifest["siem:/var/log/siem/consolidated/all.log"]).read_text() == "Suspicious activity detected\n"
        assert (out / manifest["db:sql"]).read_text() == "USE flags;\nSELECT 1;\n"


# ---------------------------------------------------------------------------
# docker-compose.yml content checks
# ---------------------------------------------------------------------------


def test_compose_contains_services(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        compose = (out / "docker-compose.yml").read_text()
        assert "services:" in compose
        assert "web:" in compose
        assert "db:" in compose
        assert "firewall:" in compose
        assert "siem:" in compose
        assert "attacker:" in compose


def test_compose_contains_networks(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        compose = (out / "docker-compose.yml").read_text()
        assert "networks:" in compose
        assert "external:" in compose
        assert "dmz:" in compose
        assert "internal:" in compose


def test_compose_web_depends_on_db(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        compose = (out / "docker-compose.yml").read_text()
        # The web service should depend on db
        assert "depends_on:" in compose


def test_compose_web_healthcheck_accepts_pre_overlay_http_statuses(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        compose = (out / "docker-compose.yml").read_text()
        assert "CMD-SHELL" in compose
        assert "http://localhost/ || true" in compose
        assert '$$status' in compose
        assert '2*|3*|4*) exit 0' in compose
        assert 'curl", "-sf", "http://localhost/"' not in compose


def test_compose_attacker_has_routed_host_aliases_and_nmap_runtime_lib(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        compose = (out / "docker-compose.yml").read_text()
        assert "libblas3 nmap" in compose
        assert 'extra_hosts:' in compose
        assert '"web:10.0.1.10"' in compose
        assert '"db:10.0.2.20"' in compose
        assert '"files:10.0.2.21"' in compose
        assert "nmap --version" in compose
        assert "iptables -C FORWARD" in compose


# ---------------------------------------------------------------------------
# Dockerfile.web content checks
# ---------------------------------------------------------------------------


def test_dockerfile_web_creates_users(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        dockerfile = (out / "Dockerfile.web").read_text()
        assert "useradd" in dockerfile
        assert "admin" in dockerfile
        assert "testuser" in dockerfile


def test_dockerfile_web_plants_flag(renderer, sqli_spec):
    """Flag on web host with a file path should appear in Dockerfile."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        dockerfile = (out / "Dockerfile.web").read_text()
        assert "FLAG{sql1_t3st_f1ag}" in dockerfile
        assert "/var/flags/flag1.txt" in dockerfile


def test_dockerfile_web_no_db_flag(renderer, db_flag_spec):
    """Flag stored in db should NOT appear in Dockerfile.web."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(db_flag_spec, out)
        dockerfile = (out / "Dockerfile.web").read_text()
        assert "FLAG{1d0r_fl4g}" not in dockerfile


# ---------------------------------------------------------------------------
# nginx.conf content checks
# ---------------------------------------------------------------------------


def test_nginx_has_search_for_sqli(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        nginx = (out / "nginx.conf").read_text()
        assert "/search" in nginx


def test_nginx_has_download_for_path_traversal(renderer, path_traversal_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(path_traversal_spec, out)
        nginx = (out / "nginx.conf").read_text()
        assert "/download" in nginx


def test_nginx_no_download_for_sqli(renderer, sqli_spec):
    """SQLi spec should not enable download endpoint."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        nginx = (out / "nginx.conf").read_text()
        # The download location block should not be rendered
        assert "download.php" not in nginx


def test_compose_firewall_nat_is_subnet_based(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        compose = (out / "docker-compose.yml").read_text()
        assert "-s 10.0.0.0/24 -d 10.0.1.0/24 -j MASQUERADE" in compose
        assert "-s 10.0.1.0/24 -d 10.0.2.0/24 -j MASQUERADE" in compose
        assert "-o eth1 -j MASQUERADE" not in compose


# ---------------------------------------------------------------------------
# init.sql content checks
# ---------------------------------------------------------------------------


def test_init_sql_creates_tables(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        sql = (out / "init.sql").read_text()
        assert "CREATE TABLE" in sql
        assert "users" in sql
        assert "patients" in sql
        assert "secrets" in sql


def test_init_sql_creates_referral_db(renderer, sqli_spec):
    """Template creates referral_db with healthcare tables."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        sql = (out / "init.sql").read_text()
        assert "referral_db" in sql
        assert "patient_referrals" in sql
        assert "billing" in sql


def test_init_sql_grants_runtime_db_user(renderer, db_flag_spec):
    """Template grants privileges to the runtime-selected DB account."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(db_flag_spec, out)
        sql = (out / "init.sql").read_text()
        assert "GRANT" in sql
        assert "TO '" in sql
        assert "app_user" not in sql


def test_init_sql_no_file_flag(renderer, sqli_spec):
    """Flag with a file path should not be inserted into SQL."""
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        sql = (out / "init.sql").read_text()
        # The flag value should NOT be in SQL (it's a file-based flag)
        assert "FLAG{sql1_t3st_f1ag}" not in sql


# ---------------------------------------------------------------------------
# iptables.rules content checks
# ---------------------------------------------------------------------------


def test_iptables_has_rules(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        rules = (out / "iptables.rules").read_text()
        assert "*filter" in rules
        assert "COMMIT" in rules
        assert "FORWARD" in rules


def test_iptables_allow_rules(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        rules = (out / "iptables.rules").read_text()
        assert "--dport 80" in rules
        assert "ACCEPT" in rules


def test_iptables_deny_rules(renderer, sqli_spec):
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "out"
        renderer.render(sqli_spec, out)
        rules = (out / "iptables.rules").read_text()
        assert "DROP" in rules


# ---------------------------------------------------------------------------
# Context builder unit tests
# ---------------------------------------------------------------------------


def test_build_context_has_expected_keys(sqli_spec):
    ctx = _build_context(sqli_spec)
    # These keys are always present
    expected_keys = [
        "snapshot_id", "networks", "hosts", "host_names",
        "db_host", "db_user", "db_pass", "mysql_root_password",
        "domain", "users", "flags", "server_name",
        "firewall_rules", "zone_cidrs", "app_files",
    ]
    for key in expected_keys:
        assert key in ctx, f"Missing context key: {key}"
    # search_endpoint/download_endpoint are conditionally present
    # (only when True, because templates use `is defined`)
    assert ctx.get("search_endpoint") is True  # sqli -> search enabled


def test_build_context_hosts_are_dicts(sqli_spec):
    ctx = _build_context(sqli_spec)
    for h in ctx["hosts"]:
        assert isinstance(h, dict)
        assert "name" in h
        assert "zone" in h
        assert "networks" in h


def test_build_context_networks_have_names(sqli_spec):
    ctx = _build_context(sqli_spec)
    net_names = [n["name"] for n in ctx["networks"]]
    assert "external" in net_names
    assert "dmz" in net_names


def test_build_context_search_enabled_for_sqli(sqli_spec):
    ctx = _build_context(sqli_spec)
    assert ctx.get("search_endpoint") is True


def test_build_context_download_disabled_for_sqli(sqli_spec):
    ctx = _build_context(sqli_spec)
    assert "download_endpoint" not in ctx  # omitted = undefined in template


def test_build_context_download_enabled_for_path_traversal(path_traversal_spec):
    ctx = _build_context(path_traversal_spec)
    assert ctx.get("download_endpoint") is True


# ---------------------------------------------------------------------------
# Minimal / empty spec
# ---------------------------------------------------------------------------


def test_render_minimal_spec(renderer):
    """Even a near-empty spec should render without errors."""
    spec = SnapshotSpec(
        topology={
            "hosts": ["web"],
            "zones": {"dmz": ["web"]},
            "users": [],
            "firewall_rules": [],
        },
    )
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "minimal"
        renderer.render(spec, out)
        assert (out / "docker-compose.yml").exists()
        assert (out / "init.sql").exists()


# ---------------------------------------------------------------------------
# Integration: TemplateOnlyBuilder -> Renderer
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_builder_to_renderer_pipeline(tier1_manifest):
    """Full pipeline: TemplateOnlyBuilder generates spec, Renderer renders it."""
    from open_range.builder.builder import TemplateOnlyBuilder
    from open_range.protocols import BuildContext

    builder = TemplateOnlyBuilder()
    ctx = BuildContext(seed=42, tier=1)
    spec = await builder.build(tier1_manifest, ctx)

    renderer = SnapshotRenderer()
    with tempfile.TemporaryDirectory() as tmpdir:
        out = Path(tmpdir) / "pipeline_out"
        renderer.render(spec, out)

        # All 6 artifacts should exist
        for fname in [
            "docker-compose.yml", "Dockerfile.web", "Dockerfile.db",
            "nginx.conf", "init.sql", "iptables.rules",
        ]:
            assert (out / fname).exists(), f"Missing: {fname}"

        # docker-compose should reference the web service
        compose = (out / "docker-compose.yml").read_text()
        assert "web:" in compose

        # At least one flag should be in the rendered artifacts
        flag_value = spec.flags[0].value
        all_content = ""
        for fname in ["Dockerfile.web", "Dockerfile.db", "init.sql"]:
            all_content += (out / fname).read_text()
        assert flag_value in all_content, (
            f"Flag {flag_value} not found in any rendered artifact"
        )