Spaces:
Runtime error
Runtime error
File size: 3,456 Bytes
8c486a8 9a22987 8c486a8 49d1c75 8c486a8 49d1c75 8c486a8 49d1c75 9a22987 49d1c75 9a22987 49d1c75 8c486a8 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 | """Check 6: Isolation + leakage — zones enforced, no flag values in briefings."""
from __future__ import annotations
import shlex
from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
# Common service ports to probe for zone isolation violations.
_PROBE_PORTS = (22, 80, 443, 3306, 445, 389, 636, 8080)
class IsolationCheck:
"""Verify:
1. Attacker cannot directly reach internal/management zones on any common port.
2. Task briefings do not contain flag values or golden-path commands.
"""
async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
issues: list[str] = []
# --- Zone isolation ---------------------------------------------------
# Internal hosts that the attacker should NOT reach directly.
zones = snapshot.topology.get("zones", {})
attacker_host = "attacker"
blocked_zones = {"internal", "management", "mgmt"}
for zone_name, hosts in zones.items():
if zone_name.lower() not in blocked_zones:
continue
if not isinstance(hosts, list):
continue
for target in hosts:
target_name = target if isinstance(target, str) else target.get("name", "")
open_ports: list[int] = []
for port in _PROBE_PORTS:
try:
probe_cmd = (
"timeout 2 bash -lc 'echo > /dev/tcp/\"$1\"/\"$2\"' _ "
f"{shlex.quote(target_name)} {shlex.quote(str(port))} "
"2>/dev/null && echo OPEN || echo CLOSED"
)
output = await containers.exec(
attacker_host,
probe_cmd,
)
if "OPEN" in output:
open_ports.append(port)
except Exception: # noqa: BLE001
pass # exec failure means unreachable — that is fine
if open_ports:
ports_str = ", ".join(str(p) for p in open_ports)
issues.append(
f"attacker can reach {target_name} in {zone_name} "
f"on port(s) {ports_str}"
)
# --- Briefing leakage -------------------------------------------------
flag_values = [f.value for f in snapshot.flags]
golden_cmds = [s.command for s in snapshot.golden_path]
briefings = [
("red_briefing", snapshot.task.red_briefing),
("blue_briefing", snapshot.task.blue_briefing),
]
for label, text in briefings:
if not text:
continue
for fv in flag_values:
if fv in text:
issues.append(f"flag value leaked in {label}")
for cmd in golden_cmds:
# Only flag exact long commands (>20 chars) to avoid false hits on
# short strings like "nmap".
if len(cmd) > 20 and cmd in text:
issues.append(f"golden-path command leaked in {label}")
passed = len(issues) == 0
return CheckResult(
name="isolation",
passed=passed,
details={"issues": issues},
error="" if passed else "; ".join(issues),
)
|