mxguru1 commited on
Commit
b413d99
·
verified ·
1 Parent(s): d5f7354

Wire vault_adapter to capability-based PermissionGate: VAULT_APPEND for inserts, VAULT_SCHEMA for migrations (T6 only)

Browse files
Files changed (1) hide show
  1. vault_adapter.py +33 -41
vault_adapter.py CHANGED
@@ -15,12 +15,16 @@ Why this exists in one place:
15
  - Migration application is centralised so bootstrap order is
16
  deterministic.
17
 
18
- PermissionGate interface:
19
- This module defines a minimal default (`PermissionGate`) that
20
- accepts everything but records the request. Replace with the real
21
- policy implementation when it exists. The adapter calls
22
- `gate.check_write(table, agent_id, agent_tier)` before every INSERT;
23
- the gate raises `PermissionDenied` to abort, or returns to allow.
 
 
 
 
24
 
25
  Idempotency:
26
  All inserts are `INSERT OR IGNORE` on the migrations' primary keys.
@@ -37,35 +41,9 @@ from dataclasses import dataclass
37
  from pathlib import Path
38
  from typing import Iterator, Optional, Sequence
39
 
40
- logger = logging.getLogger("HSAQ.Vault")
41
-
42
-
43
- # ---------------------------------------------------------------------------
44
- # Permission gate (default + interface)
45
- # ---------------------------------------------------------------------------
46
-
47
-
48
- class PermissionDenied(Exception):
49
- """Raised by a PermissionGate when a write is rejected."""
50
-
51
 
52
- class PermissionGate:
53
- """Default gate: permissive, but records every check.
54
-
55
- Override `check_write` for real policy:
56
- class RealGate(PermissionGate):
57
- def check_write(self, table, agent_id, agent_tier):
58
- if agent_tier < self.min_tier_for(table):
59
- raise PermissionDenied(...)
60
- super().check_write(table, agent_id, agent_tier)
61
- """
62
- def __init__(self) -> None:
63
- self.audit_log: list[tuple[str, str, int]] = []
64
-
65
- def check_write(self, table: str, agent_id: str, agent_tier: int) -> None:
66
- self.audit_log.append((table, agent_id, agent_tier))
67
- logger.debug("permission check: write %s by agent %s (tier %d)",
68
- table, agent_id, agent_tier)
69
 
70
 
71
  # ---------------------------------------------------------------------------
@@ -134,10 +112,24 @@ class VaultAdapter:
134
  cur = self.conn.execute("SELECT version FROM schema_migrations")
135
  return {row[0] for row in cur.fetchall()}
136
 
137
- def apply_migration(self, sql_path: str | Path) -> bool:
138
- """Apply a migration file. Idempotent — checks schema_migrations
139
- first and skips if already recorded. Returns True if applied,
140
- False if skipped."""
 
 
 
 
 
 
 
 
 
 
 
 
 
 
141
  sql_path = Path(sql_path)
142
  sql = sql_path.read_text(encoding="utf-8")
143
 
@@ -202,7 +194,7 @@ class VaultAdapter:
202
  for r in rows:
203
  seen_agents.add((r.profiled_by_agent_id, r.profiled_by_agent_tier))
204
  for agent_id, agent_tier in seen_agents:
205
- self.gate.check_write("kv_sensitivity_profile", agent_id, agent_tier)
206
 
207
  # Convert dataclasses to dict payloads. ProfileRow.to_vault_payload()
208
  # is the contract — if it changes, this assert will fire loudly.
@@ -319,8 +311,8 @@ class VaultAdapter:
319
  if k in payload and isinstance(payload[k], bool):
320
  payload[k] = 1 if payload[k] else 0
321
 
322
- self.gate.check_write(
323
- "candidate_record",
324
  payload["discovered_by_agent_id"],
325
  payload["discovered_by_agent_tier"],
326
  )
 
15
  - Migration application is centralised so bootstrap order is
16
  deterministic.
17
 
18
+ PermissionGate routing:
19
+ Every privileged operation calls `gate.check(capability, agent_id,
20
+ agent_tier)` against the seven-tier model in permission_gate.py.
21
+ Default is a strict tier-enforcing gate; pass `PermissionGate.permissive()`
22
+ for unit tests that aren't testing authorization.
23
+
24
+ Capability mapping (per the spec):
25
+ - inserts on profile/record tables → VAULT_APPEND (T2+)
26
+ - reads of profile/record tables → VAULT_READ (T1+, not gated here)
27
+ - apply_migration → VAULT_SCHEMA (T6 only)
28
 
29
  Idempotency:
30
  All inserts are `INSERT OR IGNORE` on the migrations' primary keys.
 
41
  from pathlib import Path
42
  from typing import Iterator, Optional, Sequence
43
 
44
+ from permission_gate import Capability, PermissionDenied, PermissionGate
 
 
 
 
 
 
 
 
 
 
45
 
46
+ logger = logging.getLogger("HSAQ.Vault")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
 
49
  # ---------------------------------------------------------------------------
 
112
  cur = self.conn.execute("SELECT version FROM schema_migrations")
113
  return {row[0] for row in cur.fetchall()}
114
 
115
+ def apply_migration(
116
+ self,
117
+ sql_path: str | Path,
118
+ *,
119
+ agent_id: str = "human-operator",
120
+ agent_tier: int = 6,
121
+ ) -> bool:
122
+ """Apply a migration file. Requires VAULT_SCHEMA (T6 only).
123
+ Idempotent — checks schema_migrations first and skips if already
124
+ recorded. Returns True if applied, False if skipped.
125
+
126
+ Per the spec, migrations are human-applied only; the agent_id /
127
+ agent_tier args default to the human-operator T6 identity for
128
+ the typical `sqlite3 vault.db < migration.sql`-shaped invocation,
129
+ but a future maintenance agent could pass its own identity for
130
+ the audit row."""
131
+ self.gate.check(Capability.VAULT_SCHEMA, agent_id, agent_tier)
132
+
133
  sql_path = Path(sql_path)
134
  sql = sql_path.read_text(encoding="utf-8")
135
 
 
194
  for r in rows:
195
  seen_agents.add((r.profiled_by_agent_id, r.profiled_by_agent_tier))
196
  for agent_id, agent_tier in seen_agents:
197
+ self.gate.check(Capability.VAULT_APPEND, agent_id, agent_tier)
198
 
199
  # Convert dataclasses to dict payloads. ProfileRow.to_vault_payload()
200
  # is the contract — if it changes, this assert will fire loudly.
 
311
  if k in payload and isinstance(payload[k], bool):
312
  payload[k] = 1 if payload[k] else 0
313
 
314
+ self.gate.check(
315
+ Capability.VAULT_APPEND,
316
  payload["discovered_by_agent_id"],
317
  payload["discovered_by_agent_tier"],
318
  )