# hsaq-tools / vault_adapter.py
# mxguru1's picture
# Wire vault_adapter to capability-based PermissionGate: VAULT_APPEND for inserts, VAULT_SCHEMA for migrations (T6 only)
# b413d99 verified
"""
Sovereign Hive HSAQ — Vault adapter
The pure-logic modules (candidate_record.py, kv_profiler.py, bit_width
assignment) deliberately don't import sqlite3, requests, os, pathlib,
subprocess, or socket. They emit row-shaped dataclasses. This module is
the I/O layer that takes those dataclasses and INSERTs them through the
permission gate.
Why this exists in one place:
- "Vault is single source of truth" — every row-write goes through here
so the audit chain (agent_id, agent_tier, written_at) is uniform.
- "No raw sqlite3 elsewhere" — if you find an open() of a .db file in
any other module, that's the bug. Move it here.
- Migration application is centralised so bootstrap order is
deterministic.
PermissionGate routing:
Every privileged operation calls `gate.check(capability, agent_id,
agent_tier)` against the seven-tier model in permission_gate.py.
Default is a strict tier-enforcing gate; pass `PermissionGate.permissive()`
for unit tests that aren't testing authorization.
Capability mapping (per the spec):
- inserts on profile/record tables → VAULT_APPEND (T2+)
- reads of profile/record tables → VAULT_READ (T1+, not gated here)
- apply_migration → VAULT_SCHEMA (T6 only)
Idempotency:
All inserts are `INSERT OR IGNORE` on the migrations' primary keys.
Re-running a profiler on the same (model_hash, calibration_hash,
pipeline_version) is safe — duplicates are silently dropped, not
errored.
"""
from __future__ import annotations
import logging
import sqlite3
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Iterator, Optional, Sequence
from permission_gate import Capability, PermissionDenied, PermissionGate
logger = logging.getLogger("HSAQ.Vault")
# ---------------------------------------------------------------------------
# Adapter
# ---------------------------------------------------------------------------
@dataclass
class InsertResult:
    """Outcome of an insert call: rows offered versus rows actually stored."""

    # Number of rows present in the caller's input.
    requested: int
    # Number of rows that reached storage; ignored duplicates are not counted.
    inserted: int
    # Name of the table the insert was aimed at.
    table: str
class VaultAdapter:
    """SQLite-backed Vault for HSAQ row persistence.

    Single open connection per adapter instance. Caller is responsible
    for closing (use `with` for automatic cleanup).

    Inserts commit immediately when called on their own. Inside a
    `transaction()` block the commit is deferred to the end of the block,
    so several inserts are applied (or rolled back) as one unit.
    """

    def __init__(
        self,
        db_path: str | Path,
        permission_gate: Optional[PermissionGate] = None,
        *,
        timeout: float = 30.0,
    ) -> None:
        """Open (or create) the SQLite file at `db_path`.

        `permission_gate` defaults to a fresh `PermissionGate()` only when
        it is None; any gate object the caller passes is used as-is.
        `timeout` is forwarded to `sqlite3.connect`.
        """
        self.db_path = Path(db_path)
        self.conn = sqlite3.connect(str(self.db_path), timeout=timeout)
        try:
            # Standard hygiene
            self.conn.execute("PRAGMA foreign_keys = ON")
            self.conn.execute("PRAGMA journal_mode = WAL")
            self.conn.row_factory = sqlite3.Row
            # `is None` rather than `or`: a gate object that happens to be
            # falsy must not be silently swapped for the default gate.
            self.gate = (
                permission_gate if permission_gate is not None else PermissionGate()
            )
            # >0 while inside transaction(); inserts then skip their commit.
            self._txn_depth = 0
            self._ensure_schema_migrations_table()
        except Exception:
            # Don't leak the connection if setup fails half-way.
            self.conn.close()
            raise

    def _ensure_schema_migrations_table(self) -> None:
        """Bootstrap. The schema_migrations table normally comes from
        migration 001, but we create it idempotently here so the adapter
        works on a fresh DB before any migration has been applied."""
        self.conn.execute(
            """
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version TEXT PRIMARY KEY,
                applied_at TEXT NOT NULL,
                description TEXT
            )
            """
        )
        self.conn.commit()

    def close(self) -> None:
        """Close the connection. Safe to call more than once."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None  # type: ignore

    def __enter__(self) -> "VaultAdapter":
        return self

    def __exit__(self, *_a) -> None:
        self.close()

    # ----- Internal helpers -------------------------------------------------

    def _commit_unless_in_transaction(self) -> None:
        """Commit now, unless a `transaction()` block owns the commit."""
        if self._txn_depth == 0:
            self.conn.commit()

    def _execute_insert(self, sql: str, payloads: Sequence[dict]) -> int:
        """Run `sql` once per payload and return how many rows were written.

        The count comes from the cursor's rowcount, so rows skipped by
        `INSERT OR IGNORE` are excluded and writes made by other
        connections are never attributed to this call.
        """
        cur = self.conn.executemany(sql, payloads)
        self._commit_unless_in_transaction()
        return max(cur.rowcount, 0)

    @staticmethod
    def _to_payload(row: object) -> dict:
        """Return a private dict payload for a row object or a plain mapping."""
        if hasattr(row, "to_vault_payload"):
            return dict(row.to_vault_payload())
        # Be tolerant of plain dicts too
        return dict(row)

    @staticmethod
    def _migration_version(filename: str) -> str:
        """Extract the version token from a `vault_migration_NNN_*` filename.

        The file suffix is dropped first so `vault_migration_003.sql`
        yields "003" rather than "003.sql". Raises ValueError when the
        name has no (non-empty) third `_`-separated token.
        """
        parts = Path(filename).stem.split("_")
        if len(parts) < 3 or not parts[2]:
            raise ValueError(f"can't derive version from {filename}")
        return parts[2]  # vault_migration_003_kv... -> "003"

    # ----- Migration management ---------------------------------------------

    def applied_migrations(self) -> set[str]:
        """Return the set of versions recorded in schema_migrations."""
        cur = self.conn.execute("SELECT version FROM schema_migrations")
        return {row[0] for row in cur.fetchall()}

    def apply_migration(
        self,
        sql_path: str | Path,
        *,
        agent_id: str = "human-operator",
        agent_tier: int = 6,
    ) -> bool:
        """Apply a migration file. Requires VAULT_SCHEMA (T6 only).

        Idempotent — checks schema_migrations first and skips if already
        recorded. Returns True if applied, False if skipped.

        Per the spec, migrations are human-applied only; the agent_id /
        agent_tier args default to the human-operator T6 identity for
        the typical `sqlite3 vault.db < migration.sql`-shaped invocation,
        but a future maintenance agent could pass its own identity for
        the audit row.

        Raises whatever `gate.check` raises on denial, and ValueError if
        no version can be derived from the filename.

        NOTE(review): executescript() commits any pending transaction
        before running and adds no transaction of its own, and the
        file's BEGIN;/COMMIT; lines are stripped below — so a script that
        fails half-way leaves its earlier statements applied. Do not call
        this inside `transaction()`.
        """
        self.gate.check(Capability.VAULT_SCHEMA, agent_id, agent_tier)
        sql_path = Path(sql_path)
        sql = sql_path.read_text(encoding="utf-8")
        # The migration file itself inserts its version row, so we just
        # need to detect whether it has already run. Convention: filename
        # `vault_migration_NNN_*` carries the version.
        name = sql_path.name
        version = self._migration_version(name)
        if version in self.applied_migrations():
            logger.info("migration %s already applied — skipping", version)
            return False
        # Some SQLite drivers can't handle multiple BEGIN/COMMIT in one
        # executescript() call. Stripping is the simplest workaround.
        cleaned = "\n".join(
            ln for ln in sql.splitlines()
            if ln.strip().upper() not in ("BEGIN;", "COMMIT;")
        )
        self.conn.executescript(cleaned)
        self.conn.commit()
        logger.info("migration %s applied from %s", version, name)
        return True

    # ----- KV sensitivity profile -------------------------------------------

    KV_INSERT_SQL = """
        INSERT OR IGNORE INTO kv_sensitivity_profile (
            model_hash, calibration_hash, pipeline_version,
            layer_idx, k_bits, v_bits, quantizer,
            drift_attn_output, drift_metric,
            bytes_per_kv_token, max_seq_len_observed,
            num_kv_heads, head_dim,
            profiled_at, profiled_by_agent_id, profiled_by_agent_tier
        ) VALUES (
            :model_hash, :calibration_hash, :pipeline_version,
            :layer_idx, :k_bits, :v_bits, :quantizer,
            :drift_attn_output, :drift_metric,
            :bytes_per_kv_token, :max_seq_len_observed,
            :num_kv_heads, :head_dim,
            :profiled_at, :profiled_by_agent_id, :profiled_by_agent_tier
        )
    """

    def insert_kv_profile_rows(
        self,
        rows: Sequence[object],  # ProfileRow from kv_profiler
    ) -> InsertResult:
        """Bulk-insert ProfileRows from kv_profiler.profile_kv_sensitivity().

        Each row carries its own agent_id + agent_tier; we check the gate
        once per distinct (agent_id, agent_tier) pair seen in the batch.
        Rows may be objects exposing `to_vault_payload()` or plain dicts
        with the same keys. The gate is checked against the payload
        values, i.e. exactly the identity that gets written.

        Returns an InsertResult whose `inserted` excludes rows dropped as
        duplicates by `INSERT OR IGNORE`.
        """
        table = "kv_sensitivity_profile"
        if not rows:
            return InsertResult(requested=0, inserted=0, table=table)

        # Convert first so dict rows and dataclass rows are handled the
        # same way. to_vault_payload() is the contract; a payload missing
        # the agent keys fails here with KeyError before anything is written.
        payloads = [self._to_payload(r) for r in rows]

        # Gate check — one call per distinct agent, in first-seen order.
        distinct_agents = dict.fromkeys(
            (p["profiled_by_agent_id"], p["profiled_by_agent_tier"])
            for p in payloads
        )
        for agent_id, agent_tier in distinct_agents:
            self.gate.check(Capability.VAULT_APPEND, agent_id, agent_tier)

        inserted = self._execute_insert(self.KV_INSERT_SQL, payloads)
        return InsertResult(
            requested=len(payloads),
            inserted=inserted,
            table=table,
        )

    def _count_kv_rows(self) -> int:
        """Return the total row count of kv_sensitivity_profile.

        Kept for callers that want a table size; insert accounting no
        longer relies on it.
        """
        cur = self.conn.execute("SELECT COUNT(*) FROM kv_sensitivity_profile")
        return cur.fetchone()[0]

    def fetch_kv_profile_rows(
        self,
        model_hash: str,
        calibration_hash: str,
        pipeline_version: str,
    ) -> list[dict]:
        """Return all KV profile rows for an invalidation key, as dicts.

        Caller can either consume the dicts directly or rehydrate them
        into ProfileRow objects via the constructor.
        """
        cur = self.conn.execute(
            """
            SELECT * FROM kv_sensitivity_profile
            WHERE model_hash = ?
              AND calibration_hash = ?
              AND pipeline_version = ?
            ORDER BY layer_idx, k_bits, v_bits, quantizer
            """,
            (model_hash, calibration_hash, pipeline_version),
        )
        return [dict(row) for row in cur.fetchall()]

    def has_kv_profile(
        self,
        model_hash: str,
        calibration_hash: str,
        pipeline_version: str,
    ) -> bool:
        """Quick existence check — does the cache have this key?"""
        cur = self.conn.execute(
            """
            SELECT 1 FROM kv_sensitivity_profile
            WHERE model_hash = ?
              AND calibration_hash = ?
              AND pipeline_version = ?
            LIMIT 1
            """,
            (model_hash, calibration_hash, pipeline_version),
        )
        return cur.fetchone() is not None

    # ----- Candidate record (migration 002) ---------------------------------

    CANDIDATE_INSERT_SQL = """
        INSERT OR IGNORE INTO candidate_record (
            model_id, model_hash, source, discovered_at,
            arch_type, param_count, hidden_size, num_layers,
            num_attention_heads, num_kv_heads, head_dim, max_position_embeddings,
            license, license_commercial_ok, tokenizer_family, tokenizer_compat_score,
            has_published_sensitivity_profile, published_profile_source,
            kv_bytes_per_token_fp16, kv_bytes_per_token_int8,
            predicted_vram_weights_mixed_34, predicted_vram_kv_4k_int8,
            predicted_vram_total_4k, predicted_headroom_gb,
            pruning_eligible, pruning_eligible_reason,
            hsaq_eligibility, eligibility_reasons,
            discovered_by_agent_id, discovered_by_agent_tier
        ) VALUES (
            :model_id, :model_hash, :source, :discovered_at,
            :arch_type, :param_count, :hidden_size, :num_layers,
            :num_attention_heads, :num_kv_heads, :head_dim, :max_position_embeddings,
            :license, :license_commercial_ok, :tokenizer_family, :tokenizer_compat_score,
            :has_published_sensitivity_profile, :published_profile_source,
            :kv_bytes_per_token_fp16, :kv_bytes_per_token_int8,
            :predicted_vram_weights_mixed_34, :predicted_vram_kv_4k_int8,
            :predicted_vram_total_4k, :predicted_headroom_gb,
            :pruning_eligible, :pruning_eligible_reason,
            :hsaq_eligibility, :eligibility_reasons,
            :discovered_by_agent_id, :discovered_by_agent_tier
        )
    """

    def insert_candidate_record(self, record: object) -> InsertResult:
        """Insert one CandidateRecord (from candidate_record.py). The
        record's to_vault_payload() shape is the contract.

        Raises TypeError if `record` has no `to_vault_payload()`, and
        whatever `gate.check` raises if VAULT_APPEND is denied for the
        record's discovering agent. The payload is copied before being
        normalised, so the dict returned by the record is left untouched.
        """
        import json

        if not hasattr(record, "to_vault_payload"):
            raise TypeError(
                "CandidateRecord-like object required (has .to_vault_payload())"
            )
        payload = dict(record.to_vault_payload())

        # eligibility_reasons is a list in Python; JSON-encode for storage
        if isinstance(payload.get("eligibility_reasons"), list):
            payload["eligibility_reasons"] = json.dumps(payload["eligibility_reasons"])

        # Coerce bools to 0/1 (sqlite stores them either way but the CHECK
        # constraints in 002 want integers)
        for key in (
            "license_commercial_ok",
            "has_published_sensitivity_profile",
            "pruning_eligible",
        ):
            if isinstance(payload.get(key), bool):
                payload[key] = 1 if payload[key] else 0

        self.gate.check(
            Capability.VAULT_APPEND,
            payload["discovered_by_agent_id"],
            payload["discovered_by_agent_tier"],
        )
        inserted = self._execute_insert(self.CANDIDATE_INSERT_SQL, [payload])
        return InsertResult(
            requested=1,
            inserted=inserted,
            table="candidate_record",
        )

    # ----- Convenience: transaction context manager -------------------------

    @contextmanager
    def transaction(self) -> Iterator["VaultAdapter"]:
        """Manual transaction grouping. Commits on success, rolls back on
        exception. Use when you want multiple inserts atomic.

        While the block is active the insert methods skip their own
        commit. Nested blocks join the outermost one: only the outermost
        exit commits, and an exception at any level rolls everything back.
        """
        self._txn_depth += 1
        try:
            yield self
            if self._txn_depth == 1:
                self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        finally:
            self._txn_depth -= 1
# ---------------------------------------------------------------------------
# Convenience: open a Vault at a default path
# ---------------------------------------------------------------------------
def default_vault_path() -> Path:
    """Where the Vault DB lives by default. Caller can override.

    Convention: $SOVEREIGN_HIVE_VAULT if it is set and non-empty,
    otherwise ~/.sovereign_hive/vault.db. A leading `~` in the
    environment value is expanded to the user's home directory, so
    `SOVEREIGN_HIVE_VAULT=~/hive/vault.db` does not create a directory
    literally named `~`.
    """
    import os

    configured = os.environ.get("SOVEREIGN_HIVE_VAULT")
    if configured:
        return Path(configured).expanduser()
    return Path.home() / ".sovereign_hive" / "vault.db"
def open_vault(
    db_path: str | Path | None = None,
    permission_gate: Optional[PermissionGate] = None,
) -> VaultAdapter:
    """Open the Vault and return a ready VaultAdapter.

    A falsy `db_path` (None or empty) selects default_vault_path().
    The parent directory of the chosen path is created if it does not
    exist yet — for an explicit path as well as for the default one.
    `permission_gate` is handed through to VaultAdapter unchanged.
    """
    if db_path:
        target = Path(db_path)
    else:
        target = default_vault_path()
    # Make sure sqlite can create the file: its directory has to exist.
    target.parent.mkdir(parents=True, exist_ok=True)
    return VaultAdapter(target, permission_gate=permission_gate)