Spaces:
Runtime error
Runtime error
| """Check 6: Isolation + leakage — zones enforced, no flag values in briefings.""" | |
| from __future__ import annotations | |
| import shlex | |
| from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec | |
| # Common service ports to probe for zone isolation violations. | |
| _PROBE_PORTS = (22, 80, 443, 3306, 445, 389, 636, 8080) | |
class IsolationCheck:
    """Check that zones are isolated and that briefings leak nothing.

    Verifies:

    1. The attacker host cannot open a TCP connection to any host listed in
       an internal/management zone on any port in ``_PROBE_PORTS``.
    2. Task briefings do not contain flag values or (long) golden-path
       commands.
    """

    # Container the reachability probes are executed from.
    _ATTACKER_HOST = "attacker"
    # Zone names (compared case-insensitively) the attacker must not reach.
    _BLOCKED_ZONES = frozenset({"internal", "management", "mgmt"})
    # Golden-path commands of this length or shorter are never reported, to
    # avoid false hits on short strings like "nmap".
    _MIN_LEAK_CMD_LEN = 20

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        """Run both sub-checks and fold their findings into one result.

        Args:
            snapshot: Snapshot whose ``topology``, ``flags``, ``golden_path``
                and ``task`` briefings are inspected.
            containers: Used to execute probe commands on the attacker host.

        Returns:
            A ``CheckResult`` named ``"isolation"``. ``passed`` is true only
            when no issue was found; ``details["issues"]`` lists every
            finding and ``error`` joins them with ``"; "``.
        """
        issues: list[str] = []
        issues.extend(await self._zone_issues(snapshot, containers))
        issues.extend(self._briefing_issues(snapshot))

        passed = not issues
        return CheckResult(
            name="isolation",
            passed=passed,
            details={"issues": issues},
            error="" if passed else "; ".join(issues),
        )

    # --- Zone isolation ---------------------------------------------------

    async def _zone_issues(self, snapshot: SnapshotSpec, containers: ContainerSet) -> list[str]:
        """Return one issue per blocked-zone host the attacker can reach."""
        # ``or {}`` also covers an explicit ``"zones": None`` in the topology.
        zones = snapshot.topology.get("zones") or {}
        issues: list[str] = []
        for zone_name, hosts in zones.items():
            if not isinstance(zone_name, str):
                continue
            if zone_name.lower() not in self._BLOCKED_ZONES:
                continue
            if not isinstance(hosts, list):
                continue
            for target in hosts:
                target_name = self._target_name(target)
                if not target_name:
                    # Nothing to probe: the entry has no usable host name.
                    continue
                open_ports = await self._open_ports(containers, target_name)
                if open_ports:
                    ports_str = ", ".join(str(p) for p in open_ports)
                    issues.append(
                        f"attacker can reach {target_name} in {zone_name} "
                        f"on port(s) {ports_str}"
                    )
        return issues

    @staticmethod
    def _target_name(target: object) -> str:
        """Extract a host name from a zone entry, or ``""`` if there is none.

        Entries are either a plain host-name string or a dict-like object
        with a ``"name"`` key; anything else yields ``""``.
        """
        if isinstance(target, str):
            return target
        getter = getattr(target, "get", None)
        if not callable(getter):
            return ""
        name = getter("name", "")
        return name if isinstance(name, str) else ""

    async def _open_ports(self, containers: ContainerSet, target_name: str) -> list[int]:
        """Probe ``target_name`` from the attacker host on every probe port."""
        open_ports: list[int] = []
        for port in _PROBE_PORTS:
            # The host and port are passed as positional arguments ($1/$2) to
            # the inner shell and quoted for the outer one, so a hostile host
            # name cannot inject shell syntax.
            probe_cmd = (
                "timeout 2 bash -lc 'echo > /dev/tcp/\"$1\"/\"$2\"' _ "
                f"{shlex.quote(target_name)} {shlex.quote(str(port))} "
                "2>/dev/null && echo OPEN || echo CLOSED"
            )
            try:
                output = await containers.exec(self._ATTACKER_HOST, probe_cmd)
            except Exception:  # noqa: BLE001
                # exec failure means unreachable — that is fine.
                # NOTE(review): a missing attacker container presumably lands
                # here too, which would make this check pass vacuously.
                continue
            # Non-string output is treated as "closed", as it was before.
            if isinstance(output, str) and "OPEN" in output:
                open_ports.append(port)
        return open_ports

    # --- Briefing leakage -------------------------------------------------

    def _briefing_issues(self, snapshot: SnapshotSpec) -> list[str]:
        """Return issues for briefings containing flag values or commands."""
        # An empty string is a substring of every string and ``None in text``
        # raises TypeError, so only non-empty string values are searched for.
        flag_values = [
            f.value
            for f in snapshot.flags
            if isinstance(f.value, str) and f.value
        ]
        golden_cmds = [
            s.command
            for s in snapshot.golden_path
            if isinstance(s.command, str)
            and len(s.command) > self._MIN_LEAK_CMD_LEN
        ]
        briefings = (
            ("red_briefing", snapshot.task.red_briefing),
            ("blue_briefing", snapshot.task.blue_briefing),
        )

        issues: list[str] = []
        for label, text in briefings:
            if not text:
                continue
            # One message per briefing and leak kind; further matches would
            # only repeat the identical message.
            if any(fv in text for fv in flag_values):
                issues.append(f"flag value leaked in {label}")
            if any(cmd in text for cmd in golden_cmds):
                issues.append(f"golden-path command leaked in {label}")
        return issues