Lars Talian commited on
Commit
5b3b677
·
1 Parent(s): 80ef9e0

Tighten graph checks and trust-principal lint

Browse files
src/open_range/lint.py CHANGED
@@ -12,6 +12,7 @@ Usage::
12
  from __future__ import annotations
13
 
14
  import argparse
 
15
  import sys
16
  from pathlib import Path
17
  from typing import Any
@@ -134,22 +135,28 @@ def _check_business_process_flows(manifest: Manifest) -> list[str]:
134
  return errors
135
 
136
 
 
 
 
137
  def _check_trust_relationships(manifest: Manifest) -> list[str]:
138
- """All trust relationships must reference valid usernames."""
139
- user_names = {u.username for u in manifest.users}
 
 
 
 
 
140
  errors: list[str] = []
141
  for rel in manifest.trust_relationships:
142
- if rel.source and rel.source not in user_names:
143
  errors.append(
144
- f"Trust relationship source '{rel.source}' "
145
- f"is not in the users list. "
146
- f"Valid usernames: {sorted(user_names)}"
147
  )
148
- if rel.target and rel.target not in user_names:
149
  errors.append(
150
- f"Trust relationship target '{rel.target}' "
151
- f"is not in the users list. "
152
- f"Valid usernames: {sorted(user_names)}"
153
  )
154
  return errors
155
 
@@ -165,7 +172,7 @@ ALL_CHECKS = [
165
  ("NPC persona usernames", _check_npc_usernames),
166
  ("data inventory hosts", _check_data_inventory_hosts),
167
  ("business process data flows", _check_business_process_flows),
168
- ("trust relationship usernames", _check_trust_relationships),
169
  ]
170
 
171
 
 
12
  from __future__ import annotations
13
 
14
  import argparse
15
+ import re
16
  import sys
17
  from pathlib import Path
18
  from typing import Any
 
135
  return errors
136
 
137
 
138
+ _PRINCIPAL_RE = re.compile(r"^[A-Za-z0-9._@-]+$")
139
+
140
+
141
  def _check_trust_relationships(manifest: Manifest) -> list[str]:
142
+ """Trust principals must be well-formed identifiers.
143
+
144
+ Trust edges may reference people who are not login accounts. Those are
145
+ normalized into the canonical principal catalog at build time, so lint
146
+ should validate identifier quality rather than requiring every principal to
147
+ appear in ``users``.
148
+ """
149
  errors: list[str] = []
150
  for rel in manifest.trust_relationships:
151
+ if rel.source and not _PRINCIPAL_RE.match(rel.source):
152
  errors.append(
153
+ f"Trust relationship source '{rel.source}' is not a valid "
154
+ "principal identifier"
 
155
  )
156
+ if rel.target and not _PRINCIPAL_RE.match(rel.target):
157
  errors.append(
158
+ f"Trust relationship target '{rel.target}' is not a valid "
159
+ "principal identifier"
 
160
  )
161
  return errors
162
 
 
172
  ("NPC persona usernames", _check_npc_usernames),
173
  ("data inventory hosts", _check_data_inventory_hosts),
174
  ("business process data flows", _check_business_process_flows),
175
+ ("trust relationship principals", _check_trust_relationships),
176
  ]
177
 
178
 
src/open_range/validator/graph_reward_grounding.py CHANGED
@@ -2,10 +2,9 @@
2
 
3
  from __future__ import annotations
4
 
5
- from collections import defaultdict, deque
6
-
7
  from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
8
  from open_range.validator.graphs import compile_snapshot_graphs
 
9
 
10
 
11
  class GraphRewardGroundingCheck:
@@ -22,7 +21,7 @@ class GraphRewardGroundingCheck:
22
  error="snapshot has no flags to ground",
23
  )
24
 
25
- adjacency = _adjacency(compiled)
26
  vuln_hosts = {v.host for v in snapshot.truth_graph.vulns if v.host}
27
  for flag in snapshot.flags:
28
  if flag.host not in compiled.hosts:
@@ -32,7 +31,9 @@ class GraphRewardGroundingCheck:
32
  if flag.host in vuln_hosts:
33
  continue
34
 
35
- if vuln_hosts and not any(_has_path(source, flag.host, adjacency) for source in vuln_hosts):
 
 
36
  issues.append(
37
  f"flag '{flag.id}' on '{flag.host}' is not reachable from any vuln host"
38
  )
@@ -44,27 +45,3 @@ class GraphRewardGroundingCheck:
44
  details={"issues": issues},
45
  error="" if passed else "; ".join(issues),
46
  )
47
-
48
-
49
- def _adjacency(compiled) -> dict[str, set[str]]:
50
- adjacency: dict[str, set[str]] = defaultdict(set)
51
- for source, target in compiled.dependency_edges:
52
- adjacency[source].add(target)
53
- return adjacency
54
-
55
-
56
- def _has_path(start: str, target: str, adjacency: dict[str, set[str]]) -> bool:
57
- if start == target:
58
- return True
59
- queue: deque[str] = deque([start])
60
- seen = {start}
61
- while queue:
62
- current = queue.popleft()
63
- for neighbor in adjacency.get(current, set()):
64
- if neighbor == target:
65
- return True
66
- if neighbor in seen:
67
- continue
68
- seen.add(neighbor)
69
- queue.append(neighbor)
70
- return False
 
2
 
3
  from __future__ import annotations
4
 
 
 
5
  from open_range.protocols import CheckResult, ContainerSet, SnapshotSpec
6
  from open_range.validator.graphs import compile_snapshot_graphs
7
+ from open_range.validator.path_solvability import build_host_adjacency, has_host_path
8
 
9
 
10
  class GraphRewardGroundingCheck:
 
21
  error="snapshot has no flags to ground",
22
  )
23
 
24
+ adjacency = build_host_adjacency(snapshot, compiled)
25
  vuln_hosts = {v.host for v in snapshot.truth_graph.vulns if v.host}
26
  for flag in snapshot.flags:
27
  if flag.host not in compiled.hosts:
 
31
  if flag.host in vuln_hosts:
32
  continue
33
 
34
+ if vuln_hosts and not any(
35
+ has_host_path(source, flag.host, adjacency) for source in vuln_hosts
36
+ ):
37
  issues.append(
38
  f"flag '{flag.id}' on '{flag.host}' is not reachable from any vuln host"
39
  )
 
45
  details={"issues": issues},
46
  error="" if passed else "; ".join(issues),
47
  )
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
src/open_range/validator/path_solvability.py CHANGED
@@ -33,7 +33,7 @@ class PathSolvabilityCheck:
33
  error="snapshot has no vuln or flag hosts to solve toward",
34
  )
35
 
36
- adjacency = _adjacency(compiled)
37
  unreachable = [
38
  host
39
  for host in target_hosts
@@ -78,13 +78,64 @@ def _start_hosts(compiled: CompiledGraphs) -> set[str]:
78
  return set()
79
 
80
 
81
- def _adjacency(compiled: CompiledGraphs) -> dict[str, set[str]]:
 
 
 
82
  adjacency: dict[str, set[str]] = defaultdict(set)
83
  for source, target in compiled.dependency_edges:
84
  adjacency[source].add(target)
 
 
 
 
 
 
 
 
 
85
  return adjacency
86
 
87
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
88
  def _reachable_from_any(
89
  target: str,
90
  starts: set[str],
 
33
  error="snapshot has no vuln or flag hosts to solve toward",
34
  )
35
 
36
+ adjacency = build_host_adjacency(snapshot, compiled)
37
  unreachable = [
38
  host
39
  for host in target_hosts
 
78
  return set()
79
 
80
 
81
+ def build_host_adjacency(
82
+ snapshot: SnapshotSpec,
83
+ compiled: CompiledGraphs,
84
+ ) -> dict[str, set[str]]:
85
  adjacency: dict[str, set[str]] = defaultdict(set)
86
  for source, target in compiled.dependency_edges:
87
  adjacency[source].add(target)
88
+
89
+ principal_hosts = _principal_hosts(snapshot)
90
+ for source_principal, target_principal, _edge_type in compiled.trust_edges:
91
+ source_hosts = principal_hosts.get(source_principal, set())
92
+ target_hosts = principal_hosts.get(target_principal, set())
93
+ for source_host in source_hosts:
94
+ for target_host in target_hosts:
95
+ if source_host and target_host:
96
+ adjacency[source_host].add(target_host)
97
  return adjacency
98
 
99
 
100
+ def has_host_path(
101
+ start: str,
102
+ target: str,
103
+ adjacency: dict[str, set[str]],
104
+ ) -> bool:
105
+ return _has_path(start, target, adjacency)
106
+
107
+
108
+ def _principal_hosts(snapshot: SnapshotSpec) -> dict[str, set[str]]:
109
+ topology = snapshot.topology or {}
110
+ mapping: dict[str, set[str]] = defaultdict(set)
111
+
112
+ raw_users = topology.get("users", [])
113
+ if isinstance(raw_users, list):
114
+ for raw in raw_users:
115
+ if not isinstance(raw, dict):
116
+ continue
117
+ username = str(raw.get("username", "")).strip()
118
+ if not username:
119
+ continue
120
+ for raw_host in raw.get("hosts", []):
121
+ host = str(raw_host).strip()
122
+ if host:
123
+ mapping[username].add(host)
124
+
125
+ raw_catalog = topology.get("principal_catalog", {})
126
+ if isinstance(raw_catalog, dict):
127
+ for raw_name, raw_principal in raw_catalog.items():
128
+ name = str(raw_name).strip()
129
+ if not name or not isinstance(raw_principal, dict):
130
+ continue
131
+ for raw_host in raw_principal.get("hosts", []):
132
+ host = str(raw_host).strip()
133
+ if host:
134
+ mapping[name].add(host)
135
+
136
+ return mapping
137
+
138
+
139
  def _reachable_from_any(
140
  target: str,
141
  starts: set[str],
tests/test_lint.py CHANGED
@@ -106,9 +106,10 @@ class TestValidManifest:
106
  assert errors == [], f"Check '{check_name}' failed: {errors}"
107
 
108
  def test_tier1_manifest_loads(self):
109
- """Tier 1 manifest should at least load without schema error."""
110
  result = lint_file(ROOT / "manifests" / "tier1_basic.yaml")
111
  assert result["schema_error"] is None, result["schema_error"]
 
112
 
113
 
114
  # ---------------------------------------------------------------------------
@@ -177,35 +178,35 @@ class TestInvalidUserRefs:
177
  assert len(errors) == 1
178
  assert "ghost_user" in errors[0]
179
 
180
- def test_trust_relationship_invalid_source(self):
181
  data = _minimal_manifest()
182
  data["trust_relationships"] = [
183
  {
184
  "type": "delegates_access",
185
- "from": "nobody",
186
  "to": "admin",
187
  },
188
  ]
189
  manifest = Manifest(**data)
190
  results = lint_manifest(manifest)
191
- errors = results["trust relationship usernames"]
192
  assert len(errors) == 1
193
- assert "nobody" in errors[0]
194
 
195
- def test_trust_relationship_invalid_target(self):
196
  data = _minimal_manifest()
197
  data["trust_relationships"] = [
198
  {
199
  "type": "delegates_access",
200
  "from": "admin",
201
- "to": "phantom",
202
  },
203
  ]
204
  manifest = Manifest(**data)
205
  results = lint_manifest(manifest)
206
- errors = results["trust relationship usernames"]
207
  assert len(errors) == 1
208
- assert "phantom" in errors[0]
209
 
210
 
211
  # ---------------------------------------------------------------------------
 
106
  assert errors == [], f"Check '{check_name}' failed: {errors}"
107
 
108
  def test_tier1_manifest_loads(self):
109
+ """Tier 1 manifest should load and pass lint checks."""
110
  result = lint_file(ROOT / "manifests" / "tier1_basic.yaml")
111
  assert result["schema_error"] is None, result["schema_error"]
112
+ assert result["valid"] is True, result["checks"]
113
 
114
 
115
  # ---------------------------------------------------------------------------
 
178
  assert len(errors) == 1
179
  assert "ghost_user" in errors[0]
180
 
181
+ def test_trust_relationship_invalid_source_identifier(self):
182
  data = _minimal_manifest()
183
  data["trust_relationships"] = [
184
  {
185
  "type": "delegates_access",
186
+ "from": "bad actor!",
187
  "to": "admin",
188
  },
189
  ]
190
  manifest = Manifest(**data)
191
  results = lint_manifest(manifest)
192
+ errors = results["trust relationship principals"]
193
  assert len(errors) == 1
194
+ assert "bad actor!" in errors[0]
195
 
196
+ def test_trust_relationship_invalid_target_identifier(self):
197
  data = _minimal_manifest()
198
  data["trust_relationships"] = [
199
  {
200
  "type": "delegates_access",
201
  "from": "admin",
202
+ "to": "phantom user",
203
  },
204
  ]
205
  manifest = Manifest(**data)
206
  results = lint_manifest(manifest)
207
+ errors = results["trust relationship principals"]
208
  assert len(errors) == 1
209
+ assert "phantom user" in errors[0]
210
 
211
 
212
  # ---------------------------------------------------------------------------
tests/test_validator.py CHANGED
@@ -165,6 +165,42 @@ async def test_graph_reward_grounding_fails_when_flag_host_unreachable(mock_cont
165
  assert "not reachable from any vuln host" in result.error
166
 
167
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
168
  # ---------------------------------------------------------------------------
169
  # Check 1: BuildBoot
170
  # ---------------------------------------------------------------------------
 
165
  assert "not reachable from any vuln host" in result.error
166
 
167
 
168
+ @pytest.mark.asyncio
169
+ async def test_graph_checks_allow_trust_based_host_pivots(mock_containers):
170
+ from open_range.validator.graph_reward_grounding import GraphRewardGroundingCheck
171
+ from open_range.validator.path_solvability import PathSolvabilityCheck
172
+
173
+ spec = SnapshotSpec(
174
+ topology={
175
+ "hosts": ["attacker", "web", "db"],
176
+ "zones": {"external": ["attacker"], "dmz": ["web"], "internal": ["db"]},
177
+ "dependency_edges": [{"source": "attacker", "target": "web"}],
178
+ "trust_edges": [{"source": "websvc", "target": "dbsvc", "type": "credential_reuse"}],
179
+ "host_details": {
180
+ "attacker": {"services": ["nmap"]},
181
+ "web": {"services": ["nginx"]},
182
+ "db": {"services": ["mysql"]},
183
+ },
184
+ "principal_catalog": {
185
+ "websvc": {"username": "websvc", "hosts": ["web"], "is_login_account": False},
186
+ "dbsvc": {"username": "dbsvc", "hosts": ["db"], "is_login_account": False},
187
+ },
188
+ },
189
+ truth_graph=TruthGraph(
190
+ vulns=[Vulnerability(id="v1", type="credential_reuse", host="web", service="nginx")],
191
+ ),
192
+ flags=[FlagSpec(id="f1", value="FLAG{db}", path="/var/flags/flag1.txt", host="db")],
193
+ evidence_spec=[EvidenceItem(type="log_entry", location="db:/var/log/mysql.log")],
194
+ golden_path=[GoldenPathStep(step=1, command="scan", expect_in_stdout="ok")],
195
+ task=TaskSpec(red_briefing="go", blue_briefing="watch"),
196
+ )
197
+
198
+ path_result = await PathSolvabilityCheck().check(spec, mock_containers)
199
+ reward_result = await GraphRewardGroundingCheck().check(spec, mock_containers)
200
+ assert path_result.passed is True
201
+ assert reward_result.passed is True
202
+
203
+
204
  # ---------------------------------------------------------------------------
205
  # Check 1: BuildBoot
206
  # ---------------------------------------------------------------------------