"""
Sovereign Hive HSAQ — PermissionGate
=====================================

Capability-tier gate for all privileged operations in the HSAQ stack.
Implements the seven-tier model documented in the Sovereign Hive
permission spec (T0 inert → T6 system operator).

Design tenets:
  - Capabilities are the unit of authorization, not table names or
    function names. Resource types (Vault, filesystem, network, GPU,
    agent lifecycle) are decomposed into specific capabilities so a
    tier grant is auditable.
  - Tiers are cumulative — each tier inherits everything below — but
    grants are enumerated per tier explicitly so changes show up in
    git history rather than being hidden in inheritance.
  - The gate logs every check (granted or denied) to an in-memory
    audit log, optionally also to an external sink. High-severity
    capabilities (write code, schema, arbitrary egress, unsandboxed
    subprocess) log at WARNING level.
  - The gate never grants tier-6 capabilities by default. Test code
    uses `PermissionGate.permissive()` to opt out of enforcement.
  - The orchestration layer is responsible for spawn-tier bounds
    (e.g., "T4 can only escalate to T5"). The gate just answers
    "does tier N have capability X?" — it does not police lineage.

What this module deliberately doesn't do:
  - Doesn't enforce rate limits — that belongs at the inference queue
    gateway, not the auth gate.
  - Doesn't justify spawn requests — `agent.spawn.escalated` is a
    capability that exists; the calling orchestrator records the
    justification in its Vault row, not here.
  - Doesn't suspend itself — there is no "disable gate" capability,
    by design. If the gate's wrong, fix the policy in source.
"""
from __future__ import annotations

import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, Optional

# Module-wide logger. The gate emits its "[GATE] ..." decision lines and
# its "audit sink raised" warnings through this named logger, so operators
# can route or filter gate output independently of the rest of the stack.
logger = logging.getLogger("HSAQ.PermissionGate")
|
|
|
|
| |
| |
| |
| |
| |
|
|
|
|
class Capability(str, Enum):
    """Closed set of capability identifiers the gate can authorize.

    Each member's value is a dotted string (``"<family>.<action>..."``).
    Because the enum mixes in ``str``, a member compares equal to its
    dotted value, and ``Capability("vault.read")`` resolves a string back
    to its member (raising ``ValueError`` for unknown strings).
    """

    # --- Filesystem -------------------------------------------------------
    FS_READ_CACHE = "fs.read.cache"
    FS_READ_CODE = "fs.read.code"
    FS_READ_VAULT = "fs.read.vault"
    FS_WRITE_SCRATCH = "fs.write.scratch"
    FS_WRITE_OUTPUTS = "fs.write.outputs"
    FS_WRITE_CONFIG = "fs.write.config"
    FS_WRITE_CODE = "fs.write.code"

    # --- Vault ------------------------------------------------------------
    VAULT_READ = "vault.read"
    VAULT_APPEND = "vault.append"
    VAULT_AMEND = "vault.amend"
    VAULT_SCHEMA = "vault.schema"

    # --- Network egress ---------------------------------------------------
    NET_EGRESS_ALLOWLISTED = "net.egress.allowlisted"
    NET_EGRESS_ARBITRARY = "net.egress.arbitrary"

    # --- Compute (CPU, GPU, subprocess) -----------------------------------
    COMPUTE_CPU = "compute.cpu"
    COMPUTE_GPU_INFERENCE = "compute.gpu.inference"
    COMPUTE_GPU_PROFILE = "compute.gpu.profile"
    COMPUTE_GPU_TRAIN = "compute.gpu.train"
    COMPUTE_SUBPROCESS_SANDBOXED = "compute.subprocess.sandboxed"
    COMPUTE_SUBPROCESS_UNSANDBOXED = "compute.subprocess.unsandboxed"

    # --- Agent lifecycle --------------------------------------------------
    AGENT_SPAWN_LATERAL = "agent.spawn.lateral"
    AGENT_SPAWN_ESCALATED = "agent.spawn.escalated"
    AGENT_TERMINATE_CHILD = "agent.terminate.child"
    AGENT_TERMINATE_PEER = "agent.terminate.peer"
|
|
|
|
| |
| |
| |
| |
| |
| |
| |
|
|
# Tier -> complete capability set. Every tier lists its full grant set
# explicitly (nothing is computed by inheritance) so that a policy change
# is visible as a plain diff of this table. Members inside each set are
# grouped by resource family: filesystem, vault, network, compute, agent.
_TIER_GRANTS: dict[int, frozenset[Capability]] = {
    # Tier 0: CPU compute only.
    0: frozenset({
        Capability.COMPUTE_CPU,
    }),

    # Tier 1: adds cache/code reads, vault read, lateral spawn and
    # child termination.
    1: frozenset({
        Capability.FS_READ_CACHE,
        Capability.FS_READ_CODE,
        Capability.VAULT_READ,
        Capability.COMPUTE_CPU,
        Capability.AGENT_SPAWN_LATERAL,
        Capability.AGENT_TERMINATE_CHILD,
    }),

    # Tier 2: adds vault-file reads, scratch writes, vault append and
    # GPU inference.
    2: frozenset({
        Capability.FS_READ_CACHE,
        Capability.FS_READ_CODE,
        Capability.FS_READ_VAULT,
        Capability.FS_WRITE_SCRATCH,
        Capability.VAULT_READ,
        Capability.VAULT_APPEND,
        Capability.COMPUTE_CPU,
        Capability.COMPUTE_GPU_INFERENCE,
        Capability.AGENT_SPAWN_LATERAL,
        Capability.AGENT_TERMINATE_CHILD,
    }),

    # Tier 3: adds output writes, allowlisted egress and GPU profiling.
    3: frozenset({
        Capability.FS_READ_CACHE,
        Capability.FS_READ_CODE,
        Capability.FS_READ_VAULT,
        Capability.FS_WRITE_SCRATCH,
        Capability.FS_WRITE_OUTPUTS,
        Capability.VAULT_READ,
        Capability.VAULT_APPEND,
        Capability.NET_EGRESS_ALLOWLISTED,
        Capability.COMPUTE_CPU,
        Capability.COMPUTE_GPU_INFERENCE,
        Capability.COMPUTE_GPU_PROFILE,
        Capability.AGENT_SPAWN_LATERAL,
        Capability.AGENT_TERMINATE_CHILD,
    }),

    # Tier 4: adds GPU training, sandboxed subprocesses and escalated spawn.
    4: frozenset({
        Capability.FS_READ_CACHE,
        Capability.FS_READ_CODE,
        Capability.FS_READ_VAULT,
        Capability.FS_WRITE_SCRATCH,
        Capability.FS_WRITE_OUTPUTS,
        Capability.VAULT_READ,
        Capability.VAULT_APPEND,
        Capability.NET_EGRESS_ALLOWLISTED,
        Capability.COMPUTE_CPU,
        Capability.COMPUTE_GPU_INFERENCE,
        Capability.COMPUTE_GPU_PROFILE,
        Capability.COMPUTE_GPU_TRAIN,
        Capability.COMPUTE_SUBPROCESS_SANDBOXED,
        Capability.AGENT_SPAWN_LATERAL,
        Capability.AGENT_SPAWN_ESCALATED,
        Capability.AGENT_TERMINATE_CHILD,
    }),

    # Tier 5: adds config writes, vault amend and peer termination.
    5: frozenset({
        Capability.FS_READ_CACHE,
        Capability.FS_READ_CODE,
        Capability.FS_READ_VAULT,
        Capability.FS_WRITE_SCRATCH,
        Capability.FS_WRITE_OUTPUTS,
        Capability.FS_WRITE_CONFIG,
        Capability.VAULT_READ,
        Capability.VAULT_APPEND,
        Capability.VAULT_AMEND,
        Capability.NET_EGRESS_ALLOWLISTED,
        Capability.COMPUTE_CPU,
        Capability.COMPUTE_GPU_INFERENCE,
        Capability.COMPUTE_GPU_PROFILE,
        Capability.COMPUTE_GPU_TRAIN,
        Capability.COMPUTE_SUBPROCESS_SANDBOXED,
        Capability.AGENT_SPAWN_LATERAL,
        Capability.AGENT_SPAWN_ESCALATED,
        Capability.AGENT_TERMINATE_CHILD,
        Capability.AGENT_TERMINATE_PEER,
    }),

    # Tier 6: everything in tier 5 ...
    6: frozenset({
        Capability.FS_READ_CACHE,
        Capability.FS_READ_CODE,
        Capability.FS_READ_VAULT,
        Capability.FS_WRITE_SCRATCH,
        Capability.FS_WRITE_OUTPUTS,
        Capability.FS_WRITE_CONFIG,
        Capability.VAULT_READ,
        Capability.VAULT_APPEND,
        Capability.VAULT_AMEND,
        Capability.NET_EGRESS_ALLOWLISTED,
        Capability.COMPUTE_CPU,
        Capability.COMPUTE_GPU_INFERENCE,
        Capability.COMPUTE_GPU_PROFILE,
        Capability.COMPUTE_GPU_TRAIN,
        Capability.COMPUTE_SUBPROCESS_SANDBOXED,
        Capability.AGENT_SPAWN_LATERAL,
        Capability.AGENT_SPAWN_ESCALATED,
        Capability.AGENT_TERMINATE_CHILD,
        Capability.AGENT_TERMINATE_PEER,
        # ... plus the four capabilities granted to no lower tier: code
        # writes, vault schema changes, arbitrary egress and unsandboxed
        # subprocesses.
        Capability.FS_WRITE_CODE,
        Capability.VAULT_SCHEMA,
        Capability.NET_EGRESS_ARBITRARY,
        Capability.COMPUTE_SUBPROCESS_UNSANDBOXED,
    }),
}
|
|
|
|
| |
| |
# Capabilities whose checks are audited with severity "HIGH" and logged at
# WARNING level (granted or denied): code writes, vault schema changes,
# arbitrary network egress and unsandboxed subprocess execution.
_HIGH_SEVERITY: frozenset[Capability] = frozenset({
    Capability.COMPUTE_SUBPROCESS_UNSANDBOXED,
    Capability.NET_EGRESS_ARBITRARY,
    Capability.VAULT_SCHEMA,
    Capability.FS_WRITE_CODE,
})
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass(frozen=True)
class AuditRecord:
    """Immutable record of a single gate decision.

    One instance is one row of the gate's in-memory audit log; the same
    shape is handed to an external audit sink when one is configured
    (e.g. to be written to a vault.audit_log table).
    """

    # ISO-8601 timestamp string of when the decision was made.
    timestamp: str
    # Dotted capability value that was checked, e.g. "vault.read".
    capability: str
    # Identifier of the agent the check was made for.
    agent_id: str
    # Tier the agent claimed at check time.
    agent_tier: int
    # True when the capability was granted, False when denied.
    granted: bool
    # Severity label of the checked capability ("HIGH" or "NORMAL").
    severity: str
    # Human-readable denial explanation; None for granted checks.
    reason: Optional[str] = None
|
|
|
|
class PermissionDenied(Exception):
    """Signal that a capability check was refused.

    The exception message carries the denial reason (for example that
    the tier does not grant the capability, or that the capability name
    is unknown).
    """
|
|
|
|
| |
| |
| |
|
|
|
|
class PermissionGate:
    """Capability gate enforcing the seven-tier model.

    The gate is a pure-logic check + audit log. It never opens
    files, never touches the network, never reaches into a Vault.
    The Vault adapter calls .check() before every privileged operation
    and surfaces the denial to its caller.

    Attributes:
        audit_log: In-memory list of every AuditRecord produced by
            ``check()``, in call order.
        audit_sink: Optional callable that also receives each record.
    """

    def __init__(
        self,
        audit_sink: Optional[Callable[[AuditRecord], None]] = None,
    ) -> None:
        """Create an enforcing gate.

        Args:
            audit_sink: Optional callable invoked with each AuditRecord
                right after it is appended to ``audit_log``. Exceptions
                raised by the sink are logged and swallowed.
        """
        self.audit_log: list[AuditRecord] = []
        self.audit_sink = audit_sink
        self._permissive = False

    @classmethod
    def permissive(cls) -> "PermissionGate":
        """For test code only. Returns a gate that grants every known
        capability. Calls still log to audit_log so tests can assert
        audit behaviour."""
        gate = cls()
        gate._permissive = True
        return gate

    def check(
        self,
        capability: "Capability | str",
        agent_id: str,
        agent_tier: int,
    ) -> None:
        """Authorize one capability for one agent.

        Every call — granted, denied, or naming an unknown capability —
        appends an AuditRecord and (if configured) calls the audit_sink.

        Args:
            capability: A Capability member or its dotted string value.
            agent_id: Identifier of the requesting agent (recorded as-is).
            agent_tier: Tier of the requesting agent.

        Raises:
            PermissionDenied: If the capability is unknown, or the tier
                does not grant it (and the gate is not permissive).
        """
        try:
            # Capability(...) accepts both members and their string values;
            # anything else (unknown string, None, int, ...) is rejected.
            cap = Capability(capability)
        except (ValueError, TypeError) as exc:
            reason = f"unknown capability: {capability!r}"
            # Audit the rejected request too: a probe for a capability that
            # does not exist must not bypass the audit trail.
            label = capability if isinstance(capability, str) else repr(capability)
            self._audit(
                label, agent_id, agent_tier,
                granted=False, severity="NORMAL", reason=reason,
            )
            raise PermissionDenied(reason) from exc

        granted = self._permissive or self._has_capability(agent_tier, cap)
        severity = "HIGH" if cap in _HIGH_SEVERITY else "NORMAL"
        reason = None if granted else (
            f"tier {agent_tier} does not grant {cap.value}"
        )

        # Record first, raise second, so denials are always on the log.
        self._audit(
            cap.value, agent_id, agent_tier,
            granted=granted, severity=severity, reason=reason,
        )

        if not granted:
            raise PermissionDenied(reason)

    def has_capability(
        self,
        agent_tier: int,
        capability: "Capability | str",
    ) -> bool:
        """Boolean check without raising or logging. Useful for dry-run
        UI ("does this agent have permission to X before I show that
        button?").

        Returns False for anything that is not a known capability, even
        on a permissive gate.
        """
        try:
            cap = Capability(capability)
        except (ValueError, TypeError):
            return False
        return self._permissive or self._has_capability(agent_tier, cap)

    def tier_grants(self, agent_tier: int) -> frozenset[Capability]:
        """Return the full capability set for a tier. Empty frozenset for
        unknown tier numbers."""
        return _TIER_GRANTS.get(agent_tier, frozenset())

    def _has_capability(self, tier: int, capability: Capability) -> bool:
        """Return True if ``tier`` exists in the policy table and its
        grant set contains ``capability``; unknown tiers grant nothing."""
        if tier not in _TIER_GRANTS:
            return False
        return capability in _TIER_GRANTS[tier]

    def _audit(
        self,
        capability: str,
        agent_id: str,
        agent_tier: int,
        *,
        granted: bool,
        severity: str,
        reason: Optional[str],
    ) -> None:
        """Append one AuditRecord, forward it to the sink, and log it."""
        record = AuditRecord(
            timestamp=datetime.now(timezone.utc).isoformat(),
            capability=capability,
            agent_id=agent_id,
            agent_tier=agent_tier,
            granted=granted,
            severity=severity,
            reason=reason,
        )
        self.audit_log.append(record)

        if self.audit_sink is not None:
            try:
                self.audit_sink(record)
            except Exception as e:
                # Deliberately best-effort: a failing external sink must not
                # change the authorization outcome. The record is already in
                # the in-memory log.
                logger.warning("audit sink raised: %s", e)

        # High-severity capabilities are surfaced at WARNING whether they
        # were granted or denied; everything else stays at DEBUG.
        emit = logger.warning if severity == "HIGH" else logger.debug
        emit(
            "[GATE] %s agent=%s tier=%d granted=%s",
            capability, agent_id, agent_tier, granted,
        )
|
|