hsaq-tools / permission_gate.py
mxguru1's picture
Add PermissionGate — capability-tier model (T0-T6) per Sovereign Hive spec, audit log with severity flagging, permissive() for test code
d5f7354 verified
"""
Sovereign Hive HSAQ — PermissionGate
=====================================
Capability-tier gate for all privileged operations in the HSAQ stack.
Implements the seven-tier model documented in the Sovereign Hive
permission spec (T0 inert → T6 system operator).
Design tenets:
- Capabilities are the unit of authorization, not table names or
function names. Resource types (Vault, filesystem, network, GPU,
agent lifecycle) are decomposed into specific capabilities so a
tier grant is auditable.
- Tiers are cumulative — each tier inherits everything below — but
grants are enumerated per tier explicitly so changes show up in
git history rather than being hidden in inheritance.
- The gate logs every check (granted or denied) to an in-memory
audit log, optionally also to an external sink. High-severity
capabilities (write code, schema, arbitrary egress, unsandboxed
subprocess) log at WARNING level.
- The gate never grants tier-6 capabilities by default. Test code
uses `PermissionGate.permissive()` to opt out of enforcement.
- The orchestration layer is responsible for spawn-tier bounds
(e.g., "T4 can only escalate to T5"). The gate just answers
"does tier N have capability X?" — it does not police lineage.
What this module deliberately doesn't do:
- Doesn't enforce rate limits — that belongs at the inference queue
gateway, not the auth gate.
- Doesn't justify spawn requests — `agent.spawn.escalated` is a
capability that exists; the calling orchestrator records the
justification in its Vault row, not here.
- Doesn't suspend itself — there is no "disable gate" capability,
by design. If the gate's wrong, fix the policy in source.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass
from datetime import datetime, timezone
from enum import Enum
from typing import Callable, Optional
logger = logging.getLogger("HSAQ.PermissionGate")
# ---------------------------------------------------------------------------
# Capability primitives
# ---------------------------------------------------------------------------
# Naming convention matches the spec's `domain.action.qualifier` shape so
# string forms stay human-readable in logs and audit rows.
class Capability(str, Enum):
# Filesystem
FS_READ_CACHE = "fs.read.cache"
FS_READ_CODE = "fs.read.code"
FS_READ_VAULT = "fs.read.vault"
FS_WRITE_SCRATCH = "fs.write.scratch"
FS_WRITE_OUTPUTS = "fs.write.outputs"
FS_WRITE_CONFIG = "fs.write.config"
FS_WRITE_CODE = "fs.write.code"
# Vault
VAULT_READ = "vault.read"
VAULT_APPEND = "vault.append"
VAULT_AMEND = "vault.amend"
VAULT_SCHEMA = "vault.schema"
# Network
NET_EGRESS_ALLOWLISTED = "net.egress.allowlisted"
NET_EGRESS_ARBITRARY = "net.egress.arbitrary"
# Compute
COMPUTE_CPU = "compute.cpu"
COMPUTE_GPU_INFERENCE = "compute.gpu.inference"
COMPUTE_GPU_PROFILE = "compute.gpu.profile"
COMPUTE_GPU_TRAIN = "compute.gpu.train"
COMPUTE_SUBPROCESS_SANDBOXED = "compute.subprocess.sandboxed"
COMPUTE_SUBPROCESS_UNSANDBOXED = "compute.subprocess.unsandboxed"
# Agent lifecycle
AGENT_SPAWN_LATERAL = "agent.spawn.lateral"
AGENT_SPAWN_ESCALATED = "agent.spawn.escalated"
AGENT_TERMINATE_CHILD = "agent.terminate.child"
AGENT_TERMINATE_PEER = "agent.terminate.peer"
# ---------------------------------------------------------------------------
# Tier grants — explicit, per-tier, cumulative-but-enumerated
# ---------------------------------------------------------------------------
# Why enumerated and not union-derived from a lower tier: changes to a tier's
# capability set should show up as a deliberate edit to that tier's frozenset,
# not as a side-effect of editing a lower tier. The cost is repeating
# capabilities across tiers; the benefit is grants being auditable in diffs.
_TIER_GRANTS: dict[int, frozenset[Capability]] = {
# T0 — Inert. Pure compute on own process; nothing privileged.
0: frozenset({
Capability.COMPUTE_CPU,
}),
# T1 — Read-only observer.
1: frozenset({
Capability.COMPUTE_CPU,
Capability.FS_READ_CACHE,
Capability.FS_READ_CODE,
Capability.VAULT_READ,
Capability.AGENT_SPAWN_LATERAL,
Capability.AGENT_TERMINATE_CHILD,
}),
# T2 — Local I/O worker. The default tier for new agents.
2: frozenset({
Capability.COMPUTE_CPU,
Capability.COMPUTE_GPU_INFERENCE,
Capability.FS_READ_CACHE,
Capability.FS_READ_CODE,
Capability.FS_READ_VAULT,
Capability.FS_WRITE_SCRATCH,
Capability.VAULT_READ,
Capability.VAULT_APPEND,
Capability.AGENT_SPAWN_LATERAL,
Capability.AGENT_TERMINATE_CHILD,
}),
# T3 — Network fetcher and long-running profiler.
3: frozenset({
Capability.COMPUTE_CPU,
Capability.COMPUTE_GPU_INFERENCE,
Capability.COMPUTE_GPU_PROFILE,
Capability.FS_READ_CACHE,
Capability.FS_READ_CODE,
Capability.FS_READ_VAULT,
Capability.FS_WRITE_SCRATCH,
Capability.FS_WRITE_OUTPUTS,
Capability.VAULT_READ,
Capability.VAULT_APPEND,
Capability.NET_EGRESS_ALLOWLISTED,
Capability.AGENT_SPAWN_LATERAL,
Capability.AGENT_TERMINATE_CHILD,
}),
# T4 — Trainer & quantizer.
4: frozenset({
Capability.COMPUTE_CPU,
Capability.COMPUTE_GPU_INFERENCE,
Capability.COMPUTE_GPU_PROFILE,
Capability.COMPUTE_GPU_TRAIN,
Capability.COMPUTE_SUBPROCESS_SANDBOXED,
Capability.FS_READ_CACHE,
Capability.FS_READ_CODE,
Capability.FS_READ_VAULT,
Capability.FS_WRITE_SCRATCH,
Capability.FS_WRITE_OUTPUTS,
Capability.VAULT_READ,
Capability.VAULT_APPEND,
Capability.NET_EGRESS_ALLOWLISTED,
Capability.AGENT_SPAWN_LATERAL,
Capability.AGENT_SPAWN_ESCALATED,
Capability.AGENT_TERMINATE_CHILD,
}),
# T5 — Orchestrator. Coordinates; does not do its own real work.
5: frozenset({
Capability.COMPUTE_CPU,
Capability.COMPUTE_GPU_INFERENCE,
Capability.COMPUTE_GPU_PROFILE,
Capability.COMPUTE_GPU_TRAIN,
Capability.COMPUTE_SUBPROCESS_SANDBOXED,
Capability.FS_READ_CACHE,
Capability.FS_READ_CODE,
Capability.FS_READ_VAULT,
Capability.FS_WRITE_SCRATCH,
Capability.FS_WRITE_OUTPUTS,
Capability.FS_WRITE_CONFIG,
Capability.VAULT_READ,
Capability.VAULT_APPEND,
Capability.VAULT_AMEND,
Capability.NET_EGRESS_ALLOWLISTED,
Capability.AGENT_SPAWN_LATERAL,
Capability.AGENT_SPAWN_ESCALATED,
Capability.AGENT_TERMINATE_CHILD,
Capability.AGENT_TERMINATE_PEER,
}),
# T6 — System operator. Short-lived, single-purpose. The ceiling.
6: frozenset({
# All T5 capabilities…
Capability.COMPUTE_CPU,
Capability.COMPUTE_GPU_INFERENCE,
Capability.COMPUTE_GPU_PROFILE,
Capability.COMPUTE_GPU_TRAIN,
Capability.COMPUTE_SUBPROCESS_SANDBOXED,
Capability.FS_READ_CACHE,
Capability.FS_READ_CODE,
Capability.FS_READ_VAULT,
Capability.FS_WRITE_SCRATCH,
Capability.FS_WRITE_OUTPUTS,
Capability.FS_WRITE_CONFIG,
Capability.VAULT_READ,
Capability.VAULT_APPEND,
Capability.VAULT_AMEND,
Capability.NET_EGRESS_ALLOWLISTED,
Capability.AGENT_SPAWN_LATERAL,
Capability.AGENT_SPAWN_ESCALATED,
Capability.AGENT_TERMINATE_CHILD,
Capability.AGENT_TERMINATE_PEER,
# …plus T6 delta:
Capability.FS_WRITE_CODE,
Capability.VAULT_SCHEMA,
Capability.NET_EGRESS_ARBITRARY,
Capability.COMPUTE_SUBPROCESS_UNSANDBOXED,
}),
}
# Capabilities that log at severity HIGH whenever exercised or attempted.
# Matches the spec's "logged with severity HIGH" annotations.
_HIGH_SEVERITY: frozenset[Capability] = frozenset({
Capability.FS_WRITE_CODE,
Capability.VAULT_SCHEMA,
Capability.NET_EGRESS_ARBITRARY,
Capability.COMPUTE_SUBPROCESS_UNSANDBOXED,
})
# ---------------------------------------------------------------------------
# Audit record + exception
# ---------------------------------------------------------------------------
@dataclass(frozen=True)
class AuditRecord:
"""One row in the gate's audit log. Same shape works for an external
sink (e.g. write to vault.audit_log table)."""
timestamp: str # ISO 8601 UTC
capability: str # Capability value (string form)
agent_id: str
agent_tier: int
granted: bool
severity: str # 'NORMAL' or 'HIGH'
reason: Optional[str] = None # populated on denial
class PermissionDenied(Exception):
"""Raised when a capability check fails."""
# ---------------------------------------------------------------------------
# PermissionGate
# ---------------------------------------------------------------------------
class PermissionGate:
"""Capability gate enforcing the seven-tier model.
The gate is a pure-logic check + audit log. It never opens
files, never touches the network, never reaches into a Vault.
The Vault adapter calls .check() before every privileged operation
and surfaces the denial to its caller.
"""
def __init__(
self,
audit_sink: Optional[Callable[[AuditRecord], None]] = None,
) -> None:
self.audit_log: list[AuditRecord] = []
self.audit_sink = audit_sink
self._permissive = False
@classmethod
def permissive(cls) -> "PermissionGate":
"""For test code only. Returns a gate that grants every capability.
Calls still log to audit_log so tests can assert audit behaviour."""
g = cls()
g._permissive = True
return g
def check(
self,
capability: "Capability | str",
agent_id: str,
agent_tier: int,
) -> None:
"""Authorize one capability for one agent. Raises PermissionDenied
if the agent's tier doesn't grant it; always appends an AuditRecord
and (if configured) calls the audit_sink."""
# Accept either enum or string for ergonomics
if isinstance(capability, str):
try:
capability = Capability(capability)
except ValueError as e:
raise PermissionDenied(
f"unknown capability: {capability!r}"
) from e
granted = self._permissive or self._has_capability(agent_tier, capability)
severity = "HIGH" if capability in _HIGH_SEVERITY else "NORMAL"
reason = None if granted else (
f"tier {agent_tier} does not grant {capability.value}"
)
record = AuditRecord(
timestamp=datetime.now(timezone.utc).isoformat(),
capability=capability.value,
agent_id=agent_id,
agent_tier=agent_tier,
granted=granted,
severity=severity,
reason=reason,
)
self.audit_log.append(record)
if self.audit_sink is not None:
try:
self.audit_sink(record)
except Exception as e:
# Never let a sink failure crash the gate path
logger.warning("audit sink raised: %s", e)
if severity == "HIGH":
logger.warning(
"[GATE] %s agent=%s tier=%d granted=%s",
capability.value, agent_id, agent_tier, granted,
)
else:
logger.debug(
"[GATE] %s agent=%s tier=%d granted=%s",
capability.value, agent_id, agent_tier, granted,
)
if not granted:
raise PermissionDenied(reason)
def has_capability(
self,
agent_tier: int,
capability: "Capability | str",
) -> bool:
"""Boolean check without raising or logging. Useful for dry-run
UI ("does this agent have permission to X before I show that
button?")."""
if isinstance(capability, str):
try:
capability = Capability(capability)
except ValueError:
return False
return self._permissive or self._has_capability(agent_tier, capability)
def tier_grants(self, agent_tier: int) -> frozenset[Capability]:
"""Return the full capability set for a tier. Empty frozenset for
unknown tier numbers."""
return _TIER_GRANTS.get(agent_tier, frozenset())
def _has_capability(self, tier: int, capability: Capability) -> bool:
if tier not in _TIER_GRANTS:
return False
return capability in _TIER_GRANTS[tier]