File size: 6,084 Bytes
8c486a8
 
 
 
 
ca6fc4d
 
8c486a8
 
 
 
 
 
 
 
 
 
 
ca6fc4d
 
8c486a8
 
 
 
 
 
 
 
 
 
 
7fedc25
 
 
 
 
 
 
 
 
8c486a8
 
 
7fedc25
8c486a8
 
 
 
 
 
 
 
 
7fedc25
8c486a8
 
 
 
 
 
 
 
 
 
7fedc25
8c486a8
 
ca6fc4d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
8c486a8
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""Check 7: Task feasibility — golden path references real hosts, evidence targets exist."""

from __future__ import annotations

from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
from open_range.validator.graphs import compile_snapshot_graphs
from open_range.validator.path_solvability import build_host_adjacency, has_host_path


class TaskFeasibilityCheck:
    """Verify:
    1. Every golden-path step references a host that exists in the topology.
    2. Every evidence_spec item references a container that exists.
    3. Red's exploit chain references vulns that exist in truth_graph.
    """

    async def check(self, snapshot: SnapshotSpec, containers: ContainerSet) -> CheckResult:
        issues: list[str] = []
        compiled = compile_snapshot_graphs(snapshot)
        adjacency = build_host_adjacency(snapshot, compiled)

        # Collect known host names from topology.
        topo_hosts: set[str] = set()
        raw_hosts = snapshot.topology.get("hosts", [])
        for h in raw_hosts:
            if isinstance(h, dict):
                topo_hosts.add(h.get("name", ""))
            else:
                topo_hosts.add(str(h))
        topo_hosts.discard("")

        # Fail early if topology has no hosts.
        if not topo_hosts:
            return CheckResult(
                name="task_feasibility",
                passed=False,
                details={"issues": ["Topology has no hosts defined"]},
                error="Topology has no hosts defined",
            )

        # 1. Golden-path hosts exist in topology.
        for step in snapshot.golden_path:
            host = getattr(step, "host", None) or "attacker"
            if host not in topo_hosts:
                issues.append(f"golden path step {step.step}: host '{host}' not in topology")

        # 2. Evidence targets reference existing containers.
        for item in snapshot.evidence_spec:
            loc = item.location
            if ":" in loc:
                host = loc.split(":")[0]
            else:
                host = "siem"
            if host not in topo_hosts:
                issues.append(f"evidence item '{item.type}' references unknown host '{host}'")

        # 3. Exploit chain vuln IDs exist in truth_graph.
        vuln_ids = {v.id for v in snapshot.truth_graph.vulns}
        for step in snapshot.truth_graph.exploit_chain:
            if step.vuln_id and step.vuln_id not in vuln_ids:
                issues.append(f"exploit chain references unknown vuln '{step.vuln_id}'")

        # 4. Flag hosts exist in topology.
        for flag in snapshot.flags:
            if flag.host not in topo_hosts:
                issues.append(f"flag '{flag.id}' references unknown host '{flag.host}'")

        vuln_by_id = {v.id: v for v in snapshot.truth_graph.vulns}
        flag_by_value = {flag.value: flag for flag in snapshot.flags}
        submit_steps = {
            step.command.removeprefix("submit_flag ").strip(): step
            for step in snapshot.golden_path
            if step.command.strip().startswith("submit_flag ")
        }
        exploit_steps = {step.vuln_id: step for step in snapshot.truth_graph.exploit_chain}

        plan = snapshot.mutation_plan
        if plan is not None:
            for op in plan.ops:
                if op.op_type != "seed_vuln":
                    continue

                vuln_id = str(op.params.get("instantiated_vuln_id", "")).strip()
                flag_value = str(op.params.get("instantiated_flag_value", "")).strip()
                exploit_command = str(op.params.get("instantiated_exploit_command", "")).strip()
                flag_host = str(op.params.get("instantiated_flag_host", "")).strip()

                vuln = vuln_by_id.get(vuln_id)
                if vuln is None:
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' did not materialize a vuln id"
                    )
                    continue

                flag = flag_by_value.get(flag_value)
                if flag is None:
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' did not materialize a flag"
                    )
                    continue

                if flag_host and flag.host != flag_host:
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' flag host mismatch "
                        f"('{flag.host}' != '{flag_host}')"
                    )

                if flag_value not in submit_steps:
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' missing submit_flag step for "
                        f"'{flag_value}'"
                    )

                exploit = exploit_steps.get(vuln_id)
                if exploit is None:
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' missing exploit_chain linkage"
                    )
                elif exploit_command and exploit.command != exploit_command:
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' exploit command mismatch"
                    )

                if exploit_command and not any(
                    exploit_command in step.command for step in snapshot.golden_path
                ):
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' missing golden-path exploit step"
                    )

                if flag.host != vuln.host and not has_host_path(vuln.host, flag.host, adjacency):
                    issues.append(
                        f"seed_vuln mutation '{op.mutation_id}' flag host '{flag.host}' "
                        f"is unreachable from vuln host '{vuln.host}'"
                    )

        passed = len(issues) == 0
        return CheckResult(
            name="task_feasibility",
            passed=passed,
            details={"issues": issues},
            error="" if passed else f"{len(issues)} feasibility issue(s)",
        )