File size: 3,456 Bytes
8c486a8
 
 
 
9a22987
 
8c486a8
 
49d1c75
 
 
8c486a8
 
 
49d1c75
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
49d1c75
 
 
9a22987
 
 
 
 
49d1c75
 
9a22987
49d1c75
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
"""Check 6: Isolation + leakage — zones enforced, no flag values in briefings."""

from __future__ import annotations

import logging
import shlex

from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec

# Ports probed from the attacker when looking for zone isolation violations.
# Order matters: open ports are reported in the order they are listed here.
_PROBE_PORTS = (
    22,    # conventionally SSH
    80,    # conventionally HTTP
    443,   # conventionally HTTPS
    3306,  # conventionally MySQL
    445,   # conventionally SMB
    389,   # conventionally LDAP
    636,   # conventionally LDAPS
    8080,  # conventionally alternate HTTP
)


class IsolationCheck:
    """Check that network zones are enforced and that briefings leak nothing.

    Verifies:
    1. Attacker cannot directly reach internal/management zones on any common port.
    2. Task briefings do not contain flag values or golden-path commands.
    """

    # Container the reachability probes are executed from.
    _ATTACKER_HOST = "attacker"
    # Zone names (compared case-insensitively) the attacker must not reach.
    _BLOCKED_ZONES = frozenset({"internal", "management", "mgmt"})
    # Golden-path commands of this length or shorter are never reported as
    # leaks, to avoid false hits on short strings like "nmap".
    _MIN_LEAK_CMD_LEN = 20

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run the zone-isolation probes and the briefing-leak scan.

        Args:
            snapshot: Supplies ``topology["zones"]``, ``flags``, ``golden_path``
                and the ``task`` briefings.
            containers: Used to execute the probe commands on the attacker
                container via ``await containers.exec(host, command)``.

        Returns:
            A ``CheckResult`` named ``"isolation"``. It passes when no issue
            was found; otherwise ``details["issues"]`` lists every issue and
            ``error`` is the same list joined with ``"; "``.
        """
        issues: list[str] = await self._zone_issues(snapshot, containers)
        issues.extend(self._briefing_issues(snapshot))

        passed = not issues
        return CheckResult(
            name="isolation",
            passed=passed,
            details={"issues": issues},
            error="" if passed else "; ".join(issues),
        )

    async def _zone_issues(self, snapshot: SnapshotSpec, containers: ContainerSet) -> list[str]:
        """Return one issue per blocked-zone host the attacker can reach."""
        issues: list[str] = []
        # A topology that carries ``"zones": None`` is treated like no zones.
        zones = snapshot.topology.get("zones", {}) or {}

        for zone_name, hosts in zones.items():
            if str(zone_name).lower() not in self._BLOCKED_ZONES:
                continue
            if not isinstance(hosts, list):
                continue
            for target in hosts:
                target_name = self._host_name(target)
                if not target_name:
                    # Nothing to probe without a host name.
                    continue
                open_ports = await self._open_ports(containers, target_name)
                if open_ports:
                    ports_str = ", ".join(str(p) for p in open_ports)
                    issues.append(
                        f"attacker can reach {target_name} in {zone_name} "
                        f"on port(s) {ports_str}"
                    )
        return issues

    @staticmethod
    def _host_name(target: object) -> str:
        """Return the host name of a zone entry, or ``""`` if it has none."""
        if isinstance(target, str):
            return target
        try:
            name = target.get("name", "")
        except (AttributeError, TypeError):
            # Entry is neither a string nor mapping-like; it cannot be probed.
            return ""
        return name if isinstance(name, str) else ""

    async def _open_ports(self, containers: ContainerSet, target_name: str) -> list[int]:
        """Return the probe ports on *target_name* reachable from the attacker."""
        open_ports: list[int] = []
        quoted_target = shlex.quote(target_name)
        for port in _PROBE_PORTS:
            # bash's /dev/tcp redirection attempts a TCP connection; `timeout 2`
            # bounds each attempt. Host and port are passed as positional
            # arguments ($1/$2) and shell-quoted, so they are never interpolated
            # into the inner script.
            probe_cmd = (
                "timeout 2 bash -lc 'echo > /dev/tcp/\"$1\"/\"$2\"' _ "
                f"{quoted_target} {shlex.quote(str(port))} "
                "2>/dev/null && echo OPEN || echo CLOSED"
            )
            try:
                output = await containers.exec(self._ATTACKER_HOST, probe_cmd)
            except Exception as exc:  # noqa: BLE001
                # Deliberately best-effort: an exec failure is treated as
                # "unreachable". It is logged so a broken probe is diagnosable.
                # NOTE(review): if every exec fails (e.g. the attacker
                # container is missing) this check passes vacuously — confirm
                # that is acceptable.
                logging.getLogger(__name__).debug(
                    "isolation probe of %s:%s from %s failed: %s",
                    target_name,
                    port,
                    self._ATTACKER_HOST,
                    exc,
                )
                continue
            # "CLOSED" does not contain "OPEN", so a substring test is enough.
            if "OPEN" in str(output):
                open_ports.append(port)
        return open_ports

    @staticmethod
    def _briefing_issues(snapshot: SnapshotSpec) -> list[str]:
        """Return leak issues for the red and blue briefings.

        At most one flag issue and one golden-path issue is reported per
        briefing, however many flags or commands match.
        """
        # Empty or missing flag values are skipped: the empty string is a
        # substring of every briefing and would always be reported as a leak.
        flag_values = [f.value for f in snapshot.flags if f.value]
        # Only long commands are considered, to avoid false hits on short
        # strings like "nmap".
        golden_cmds = [
            s.command
            for s in snapshot.golden_path
            if s.command and len(s.command) > IsolationCheck._MIN_LEAK_CMD_LEN
        ]

        issues: list[str] = []
        briefings = (
            ("red_briefing", snapshot.task.red_briefing),
            ("blue_briefing", snapshot.task.blue_briefing),
        )
        for label, text in briefings:
            if not text:
                continue
            if any(value in text for value in flag_values):
                issues.append(f"flag value leaked in {label}")
            if any(cmd in text for cmd in golden_cmds):
                issues.append(f"golden-path command leaked in {label}")
        return issues