mxguru1 commited on
Commit
836357f
Β·
verified Β·
1 Parent(s): d156365

Add permission_gate smoke tests: per-tier capability sets + monotonicity + severity + sink + permissive + unknown-cap diagnostic

Browse files
Files changed (1) hide show
  1. smoke_test_permission_gate.py +222 -0
smoke_test_permission_gate.py ADDED
@@ -0,0 +1,222 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ Smoke test for permission_gate.
3
+
4
+ Coverage:
5
+ 1. T0 (inert) has only compute.cpu β€” denied everything privileged.
6
+ 2. T1 (read-only observer) has vault.read but NOT vault.append.
7
+ 3. T2 (default I/O worker) has vault.append but NOT vault.schema.
8
+ 4. T3 has net.egress.allowlisted but NOT net.egress.arbitrary.
9
+ 5. T4 has compute.gpu.train but NOT vault.schema.
10
+ 6. T5 has vault.amend but NOT vault.schema.
11
+ 7. T6 has every capability defined.
12
+ 8. Tier upgrade is monotonic (T_{n+1} grants βŠ‡ T_n grants).
13
+ 9. High-severity capabilities flagged correctly in audit records.
14
+ 10. Audit sink callable receives every check.
15
+ 11. permissive() grants everything regardless of tier.
16
+ 12. Unknown capability string raises PermissionDenied with diagnostic.
17
+ """
18
+
19
+ import sys
20
+ from pathlib import Path
21
+
22
+ sys.path.insert(0, str(Path(__file__).resolve().parent))
23
+
24
+ import permission_gate as pg
25
+ from permission_gate import Capability, PermissionGate, PermissionDenied
26
+
27
+
28
+ def hr(title: str) -> None:
29
+ print(f"\n{'=' * 6} {title} {'=' * 6}")
30
+
31
+
32
+ def expect_denied(gate, capability, tier, *, label):
33
+ try:
34
+ gate.check(capability, f"agent-tier{tier}", tier)
35
+ print(f" FAIL: T{tier} {capability.value} should have been denied β€” {label}")
36
+ sys.exit(1)
37
+ except PermissionDenied:
38
+ return # expected
39
+
40
+
41
+ def expect_granted(gate, capability, tier, *, label):
42
+ try:
43
+ gate.check(capability, f"agent-tier{tier}", tier)
44
+ except PermissionDenied as e:
45
+ print(f" FAIL: T{tier} {capability.value} should have been granted β€” {label}: {e}")
46
+ sys.exit(1)
47
+
48
+
49
+ # ===========================================================================
50
+ # 1. T0 β€” inert. Only compute.cpu.
51
+ # ===========================================================================
52
+ hr("1. T0 (inert)")
53
+ gate = PermissionGate()
54
+ expect_granted(gate, Capability.COMPUTE_CPU, 0, label="T0 compute.cpu")
55
+ expect_denied(gate, Capability.VAULT_READ, 0, label="T0 cannot vault.read")
56
+ expect_denied(gate, Capability.VAULT_APPEND, 0, label="T0 cannot vault.append")
57
+ expect_denied(gate, Capability.FS_READ_CACHE, 0, label="T0 cannot fs.read.cache")
58
+ print(" βœ“ T0 capability set behaves as inert")
59
+
60
+
61
+ # ===========================================================================
62
+ # 2. T1 β€” read-only observer
63
+ # ===========================================================================
64
+ hr("2. T1 (read-only observer)")
65
+ gate = PermissionGate()
66
+ expect_granted(gate, Capability.VAULT_READ, 1, label="T1 vault.read")
67
+ expect_granted(gate, Capability.FS_READ_CACHE, 1, label="T1 fs.read.cache")
68
+ expect_granted(gate, Capability.FS_READ_CODE, 1, label="T1 fs.read.code")
69
+ expect_denied(gate, Capability.VAULT_APPEND, 1, label="T1 cannot vault.append")
70
+ expect_denied(gate, Capability.FS_WRITE_SCRATCH, 1, label="T1 cannot fs.write.scratch")
71
+ expect_denied(gate, Capability.COMPUTE_GPU_INFERENCE, 1, label="T1 cannot compute.gpu.inference")
72
+ print(" βœ“ T1 read-only enforced")
73
+
74
+
75
+ # ===========================================================================
76
+ # 3. T2 β€” default I/O worker
77
+ # ===========================================================================
78
+ hr("3. T2 (default I/O worker)")
79
+ gate = PermissionGate()
80
+ expect_granted(gate, Capability.VAULT_APPEND, 2, label="T2 vault.append")
81
+ expect_granted(gate, Capability.COMPUTE_GPU_INFERENCE, 2, label="T2 compute.gpu.inference")
82
+ expect_granted(gate, Capability.FS_WRITE_SCRATCH, 2, label="T2 fs.write.scratch")
83
+ expect_denied(gate, Capability.COMPUTE_GPU_PROFILE, 2, label="T2 cannot compute.gpu.profile")
84
+ expect_denied(gate, Capability.FS_WRITE_OUTPUTS, 2, label="T2 cannot fs.write.outputs")
85
+ expect_denied(gate, Capability.NET_EGRESS_ALLOWLISTED, 2, label="T2 cannot net.egress")
86
+ expect_denied(gate, Capability.VAULT_SCHEMA, 2, label="T2 cannot vault.schema")
87
+ print(" βœ“ T2 default-tier behavior correct")
88
+
89
+
90
+ # ===========================================================================
91
+ # 4. T3 β€” net-fetcher / profiler
92
+ # ===========================================================================
93
+ hr("4. T3 (network fetcher / profiler)")
94
+ gate = PermissionGate()
95
+ expect_granted(gate, Capability.NET_EGRESS_ALLOWLISTED, 3, label="T3 net.egress.allowlisted")
96
+ expect_granted(gate, Capability.COMPUTE_GPU_PROFILE, 3, label="T3 compute.gpu.profile")
97
+ expect_granted(gate, Capability.FS_WRITE_OUTPUTS, 3, label="T3 fs.write.outputs")
98
+ expect_denied(gate, Capability.COMPUTE_GPU_TRAIN, 3, label="T3 cannot compute.gpu.train")
99
+ expect_denied(gate, Capability.NET_EGRESS_ARBITRARY, 3, label="T3 cannot net.egress.arbitrary")
100
+ expect_denied(gate, Capability.FS_WRITE_CONFIG, 3, label="T3 cannot fs.write.config")
101
+ print(" βœ“ T3 profiler tier correct")
102
+
103
+
104
+ # ===========================================================================
105
+ # 5. T4 β€” trainer / quantizer
106
+ # ===========================================================================
107
+ hr("5. T4 (trainer / quantizer)")
108
+ gate = PermissionGate()
109
+ expect_granted(gate, Capability.COMPUTE_GPU_TRAIN, 4, label="T4 compute.gpu.train")
110
+ expect_granted(gate, Capability.COMPUTE_SUBPROCESS_SANDBOXED, 4,
111
+ label="T4 compute.subprocess.sandboxed")
112
+ expect_granted(gate, Capability.AGENT_SPAWN_ESCALATED, 4, label="T4 agent.spawn.escalated")
113
+ expect_denied(gate, Capability.FS_WRITE_CONFIG, 4, label="T4 cannot fs.write.config")
114
+ expect_denied(gate, Capability.VAULT_AMEND, 4, label="T4 cannot vault.amend")
115
+ expect_denied(gate, Capability.COMPUTE_SUBPROCESS_UNSANDBOXED, 4,
116
+ label="T4 cannot compute.subprocess.unsandboxed")
117
+ print(" βœ“ T4 trainer tier correct")
118
+
119
+
120
+ # ===========================================================================
121
+ # 6. T5 β€” orchestrator
122
+ # ===========================================================================
123
+ hr("6. T5 (orchestrator)")
124
+ gate = PermissionGate()
125
+ expect_granted(gate, Capability.FS_WRITE_CONFIG, 5, label="T5 fs.write.config")
126
+ expect_granted(gate, Capability.VAULT_AMEND, 5, label="T5 vault.amend")
127
+ expect_granted(gate, Capability.AGENT_TERMINATE_PEER, 5, label="T5 agent.terminate.peer")
128
+ expect_denied(gate, Capability.VAULT_SCHEMA, 5, label="T5 cannot vault.schema")
129
+ expect_denied(gate, Capability.FS_WRITE_CODE, 5, label="T5 cannot fs.write.code")
130
+ expect_denied(gate, Capability.NET_EGRESS_ARBITRARY, 5, label="T5 cannot net.egress.arbitrary")
131
+ print(" βœ“ T5 orchestrator tier correct")
132
+
133
+
134
+ # ===========================================================================
135
+ # 7. T6 β€” system operator. Ceiling.
136
+ # ===========================================================================
137
+ hr("7. T6 (system operator)")
138
+ gate = PermissionGate()
139
+ for cap in Capability:
140
+ expect_granted(gate, cap, 6, label=f"T6 {cap.value}")
141
+ print(f" βœ“ T6 grants all {len(list(Capability))} capabilities")
142
+
143
+
144
+ # ===========================================================================
145
+ # 8. Monotonic tier grants
146
+ # ===========================================================================
147
+ hr("8. Tier upgrades are monotonic")
148
+ gate = PermissionGate()
149
+ for tier in range(6):
150
+ lower = gate.tier_grants(tier)
151
+ upper = gate.tier_grants(tier + 1)
152
+ missing = lower - upper
153
+ assert not missing, (
154
+ f"T{tier+1} missing capabilities T{tier} had: {sorted(c.value for c in missing)}"
155
+ )
156
+ print(f" T{tier} ({len(lower)} caps) βŠ† T{tier+1} ({len(upper)} caps) βœ“")
157
+
158
+
159
+ # ===========================================================================
160
+ # 9. High-severity flagging
161
+ # ===========================================================================
162
+ hr("9. High-severity capabilities marked in audit")
163
+ gate = PermissionGate()
164
+ gate.check(Capability.VAULT_APPEND, "agent-A", 2) # NORMAL
165
+ gate.check(Capability.VAULT_SCHEMA, "agent-A", 6) # HIGH
166
+ gate.check(Capability.FS_WRITE_CODE, "agent-A", 6) # HIGH
167
+ gate.check(Capability.NET_EGRESS_ARBITRARY, "agent-A", 6) # HIGH
168
+
169
+ by_severity = {}
170
+ for r in gate.audit_log:
171
+ by_severity.setdefault(r.severity, []).append(r.capability)
172
+ print(f" NORMAL entries: {by_severity.get('NORMAL', [])}")
173
+ print(f" HIGH entries: {by_severity.get('HIGH', [])}")
174
+ assert len(by_severity["NORMAL"]) == 1
175
+ assert len(by_severity["HIGH"]) == 3
176
+
177
+
178
+ # ===========================================================================
179
+ # 10. Audit sink callable
180
+ # ===========================================================================
181
+ hr("10. Audit sink receives every check")
182
+ sunk = []
183
+ gate = PermissionGate(audit_sink=lambda r: sunk.append(r))
184
+ gate.check(Capability.VAULT_READ, "agent-S", 1)
185
+ gate.check(Capability.VAULT_APPEND, "agent-S", 2)
186
+ try:
187
+ gate.check(Capability.VAULT_SCHEMA, "agent-S", 2)
188
+ except PermissionDenied:
189
+ pass
190
+ assert len(sunk) == 3, f"expected 3 sink calls, got {len(sunk)}"
191
+ assert sunk[2].granted is False
192
+ print(f" sink received {len(sunk)} records (incl. 1 denial) βœ“")
193
+
194
+
195
+ # ===========================================================================
196
+ # 11. permissive() grants everything
197
+ # ===========================================================================
198
+ hr("11. PermissionGate.permissive() bypasses tier checks")
199
+ gate = PermissionGate.permissive()
200
+ gate.check(Capability.FS_WRITE_CODE, "test-agent", 0)
201
+ gate.check(Capability.VAULT_SCHEMA, "test-agent", 0)
202
+ gate.check(Capability.NET_EGRESS_ARBITRARY, "test-agent", 0)
203
+ assert len(gate.audit_log) == 3
204
+ assert all(r.granted for r in gate.audit_log)
205
+ print(f" permissive granted {len(gate.audit_log)} caps to T0 (test-only)")
206
+
207
+
208
+ # ===========================================================================
209
+ # 12. Unknown capability raises with diagnostic
210
+ # ===========================================================================
211
+ hr("12. Unknown capability string fails fast")
212
+ gate = PermissionGate()
213
+ try:
214
+ gate.check("foo.bar.baz", "agent-X", 6)
215
+ print(f" FAIL: unknown capability should have raised")
216
+ sys.exit(1)
217
+ except PermissionDenied as e:
218
+ assert "unknown capability" in str(e).lower()
219
+ print(f" caught: {e} βœ“")
220
+
221
+
222
+ print("\nAll assertions passed.")