""" Sovereign Hive HSAQ — Vault adapter The pure-logic modules (candidate_record.py, kv_profiler.py, bit_width assignment) deliberately don't import sqlite3, requests, os, pathlib, subprocess, or socket. They emit row-shaped dataclasses. This module is the I/O layer that takes those dataclasses and INSERTs them through the permission gate. Why this exists in one place: - "Vault is single source of truth" — every row-write goes through here so the audit chain (agent_id, agent_tier, written_at) is uniform. - "No raw sqlite3 elsewhere" — if you find an open() of a .db file in any other module, that's the bug. Move it here. - Migration application is centralised so bootstrap order is deterministic. PermissionGate routing: Every privileged operation calls `gate.check(capability, agent_id, agent_tier)` against the seven-tier model in permission_gate.py. Default is a strict tier-enforcing gate; pass `PermissionGate.permissive()` for unit tests that aren't testing authorization. Capability mapping (per the spec): - inserts on profile/record tables → VAULT_APPEND (T2+) - reads of profile/record tables → VAULT_READ (T1+, not gated here) - apply_migration → VAULT_SCHEMA (T6 only) Idempotency: All inserts are `INSERT OR IGNORE` on the migrations' primary keys. Re-running a profiler on the same (model_hash, calibration_hash, pipeline_version) is safe — duplicates are silently dropped, not errored. """ from __future__ import annotations import logging import sqlite3 from contextlib import contextmanager from dataclasses import dataclass from pathlib import Path from typing import Iterator, Optional, Sequence from permission_gate import Capability, PermissionDenied, PermissionGate logger = logging.getLogger("HSAQ.Vault") # --------------------------------------------------------------------------- # Adapter # --------------------------------------------------------------------------- @dataclass class InsertResult: """What an insert call returns. Distinguishes attempted vs actually-written.""" requested: int # rows in the input inserted: int # rows that hit storage (ignored duplicates excluded) table: str class VaultAdapter: """SQLite-backed Vault for HSAQ row persistence. Single open connection per adapter instance. Caller is responsible for closing (use `with` for automatic cleanup). """ def __init__( self, db_path: str | Path, permission_gate: Optional[PermissionGate] = None, *, timeout: float = 30.0, ) -> None: self.db_path = Path(db_path) self.conn = sqlite3.connect(str(self.db_path), timeout=timeout) # Standard hygiene self.conn.execute("PRAGMA foreign_keys = ON") self.conn.execute("PRAGMA journal_mode = WAL") self.conn.row_factory = sqlite3.Row self.gate = permission_gate or PermissionGate() self._ensure_schema_migrations_table() def _ensure_schema_migrations_table(self) -> None: """Bootstrap. The schema_migrations table normally comes from migration 001, but we create it idempotently here so the adapter works on a fresh DB before any migration has been applied.""" self.conn.execute(""" CREATE TABLE IF NOT EXISTS schema_migrations ( version TEXT PRIMARY KEY, applied_at TEXT NOT NULL, description TEXT ) """) self.conn.commit() def close(self) -> None: if self.conn: self.conn.close() self.conn = None # type: ignore def __enter__(self) -> "VaultAdapter": return self def __exit__(self, *_a) -> None: self.close() # ----- Migration management --------------------------------------------- def applied_migrations(self) -> set[str]: cur = self.conn.execute("SELECT version FROM schema_migrations") return {row[0] for row in cur.fetchall()} def apply_migration( self, sql_path: str | Path, *, agent_id: str = "human-operator", agent_tier: int = 6, ) -> bool: """Apply a migration file. Requires VAULT_SCHEMA (T6 only). Idempotent — checks schema_migrations first and skips if already recorded. Returns True if applied, False if skipped. Per the spec, migrations are human-applied only; the agent_id / agent_tier args default to the human-operator T6 identity for the typical `sqlite3 vault.db < migration.sql`-shaped invocation, but a future maintenance agent could pass its own identity for the audit row.""" self.gate.check(Capability.VAULT_SCHEMA, agent_id, agent_tier) sql_path = Path(sql_path) sql = sql_path.read_text(encoding="utf-8") # The migration file itself inserts its version row, so we just # need to detect whether it has already run. Convention: filename # `vault_migration_NNN_*` carries the version. name = sql_path.name try: version = name.split("_")[2] # vault_migration_003_kv... -> "003" except IndexError as e: raise ValueError(f"can't derive version from {name}") from e if version in self.applied_migrations(): logger.info("migration %s already applied — skipping", version) return False # Some SQLite drivers can't handle multiple BEGIN/COMMIT in one # executescript() call. Stripping is the simplest workaround. cleaned = "\n".join( ln for ln in sql.splitlines() if ln.strip().upper() not in ("BEGIN;", "COMMIT;") ) self.conn.executescript(cleaned) self.conn.commit() logger.info("migration %s applied from %s", version, name) return True # ----- KV sensitivity profile ------------------------------------------- KV_INSERT_SQL = """ INSERT OR IGNORE INTO kv_sensitivity_profile ( model_hash, calibration_hash, pipeline_version, layer_idx, k_bits, v_bits, quantizer, drift_attn_output, drift_metric, bytes_per_kv_token, max_seq_len_observed, num_kv_heads, head_dim, profiled_at, profiled_by_agent_id, profiled_by_agent_tier ) VALUES ( :model_hash, :calibration_hash, :pipeline_version, :layer_idx, :k_bits, :v_bits, :quantizer, :drift_attn_output, :drift_metric, :bytes_per_kv_token, :max_seq_len_observed, :num_kv_heads, :head_dim, :profiled_at, :profiled_by_agent_id, :profiled_by_agent_tier ) """ def insert_kv_profile_rows( self, rows: Sequence[object], # ProfileRow from kv_profiler ) -> InsertResult: """Bulk-insert ProfileRows from kv_profiler.profile_kv_sensitivity(). Each row carries its own agent_id + agent_tier; we check the gate once per distinct (agent_id, agent_tier) pair seen in the batch. """ if not rows: return InsertResult(requested=0, inserted=0, table="kv_sensitivity_profile") # Gate check — one call per distinct agent seen_agents: set[tuple[str, int]] = set() for r in rows: seen_agents.add((r.profiled_by_agent_id, r.profiled_by_agent_tier)) for agent_id, agent_tier in seen_agents: self.gate.check(Capability.VAULT_APPEND, agent_id, agent_tier) # Convert dataclasses to dict payloads. ProfileRow.to_vault_payload() # is the contract — if it changes, this assert will fire loudly. payloads = [] for r in rows: if hasattr(r, "to_vault_payload"): payloads.append(r.to_vault_payload()) else: # Be tolerant of plain dicts too payloads.append(dict(r)) # Count before/after to surface duplicates before = self._count_kv_rows() self.conn.executemany(self.KV_INSERT_SQL, payloads) self.conn.commit() after = self._count_kv_rows() return InsertResult( requested=len(payloads), inserted=after - before, table="kv_sensitivity_profile", ) def _count_kv_rows(self) -> int: cur = self.conn.execute("SELECT COUNT(*) FROM kv_sensitivity_profile") return cur.fetchone()[0] def fetch_kv_profile_rows( self, model_hash: str, calibration_hash: str, pipeline_version: str, ) -> list[dict]: """Return all KV profile rows for an invalidation key, as dicts. Caller can either consume the dicts directly or rehydrate them into ProfileRow objects via the constructor. """ cur = self.conn.execute(""" SELECT * FROM kv_sensitivity_profile WHERE model_hash = ? AND calibration_hash = ? AND pipeline_version = ? ORDER BY layer_idx, k_bits, v_bits, quantizer """, (model_hash, calibration_hash, pipeline_version)) return [dict(row) for row in cur.fetchall()] def has_kv_profile( self, model_hash: str, calibration_hash: str, pipeline_version: str, ) -> bool: """Quick existence check — does the cache have this key?""" cur = self.conn.execute(""" SELECT 1 FROM kv_sensitivity_profile WHERE model_hash = ? AND calibration_hash = ? AND pipeline_version = ? LIMIT 1 """, (model_hash, calibration_hash, pipeline_version)) return cur.fetchone() is not None # ----- Candidate record (migration 002) --------------------------------- CANDIDATE_INSERT_SQL = """ INSERT OR IGNORE INTO candidate_record ( model_id, model_hash, source, discovered_at, arch_type, param_count, hidden_size, num_layers, num_attention_heads, num_kv_heads, head_dim, max_position_embeddings, license, license_commercial_ok, tokenizer_family, tokenizer_compat_score, has_published_sensitivity_profile, published_profile_source, kv_bytes_per_token_fp16, kv_bytes_per_token_int8, predicted_vram_weights_mixed_34, predicted_vram_kv_4k_int8, predicted_vram_total_4k, predicted_headroom_gb, pruning_eligible, pruning_eligible_reason, hsaq_eligibility, eligibility_reasons, discovered_by_agent_id, discovered_by_agent_tier ) VALUES ( :model_id, :model_hash, :source, :discovered_at, :arch_type, :param_count, :hidden_size, :num_layers, :num_attention_heads, :num_kv_heads, :head_dim, :max_position_embeddings, :license, :license_commercial_ok, :tokenizer_family, :tokenizer_compat_score, :has_published_sensitivity_profile, :published_profile_source, :kv_bytes_per_token_fp16, :kv_bytes_per_token_int8, :predicted_vram_weights_mixed_34, :predicted_vram_kv_4k_int8, :predicted_vram_total_4k, :predicted_headroom_gb, :pruning_eligible, :pruning_eligible_reason, :hsaq_eligibility, :eligibility_reasons, :discovered_by_agent_id, :discovered_by_agent_tier ) """ def insert_candidate_record(self, record: object) -> InsertResult: """Insert one CandidateRecord (from candidate_record.py). The record's to_vault_payload() shape is the contract.""" import json if not hasattr(record, "to_vault_payload"): raise TypeError( "CandidateRecord-like object required (has .to_vault_payload())" ) payload = record.to_vault_payload() # eligibility_reasons is a list in Python; JSON-encode for storage if isinstance(payload.get("eligibility_reasons"), list): payload["eligibility_reasons"] = json.dumps(payload["eligibility_reasons"]) # Coerce bools to 0/1 (sqlite stores them either way but the CHECK # constraints in 002 want integers) for k in ("license_commercial_ok", "has_published_sensitivity_profile", "pruning_eligible"): if k in payload and isinstance(payload[k], bool): payload[k] = 1 if payload[k] else 0 self.gate.check( Capability.VAULT_APPEND, payload["discovered_by_agent_id"], payload["discovered_by_agent_tier"], ) before = self.conn.execute( "SELECT COUNT(*) FROM candidate_record" ).fetchone()[0] self.conn.execute(self.CANDIDATE_INSERT_SQL, payload) self.conn.commit() after = self.conn.execute( "SELECT COUNT(*) FROM candidate_record" ).fetchone()[0] return InsertResult( requested=1, inserted=after - before, table="candidate_record", ) # ----- Convenience: transaction context manager ------------------------- @contextmanager def transaction(self) -> Iterator["VaultAdapter"]: """Manual transaction grouping. Commits on success, rolls back on exception. Use when you want multiple inserts atomic.""" try: yield self self.conn.commit() except Exception: self.conn.rollback() raise # --------------------------------------------------------------------------- # Convenience: open a Vault at a default path # --------------------------------------------------------------------------- def default_vault_path() -> Path: """Where the Vault DB lives by default. Caller can override. Convention: $SOVEREIGN_HIVE_VAULT or ~/.sovereign_hive/vault.db. """ import os env = os.environ.get("SOVEREIGN_HIVE_VAULT") if env: return Path(env) return Path.home() / ".sovereign_hive" / "vault.db" def open_vault( db_path: str | Path | None = None, permission_gate: Optional[PermissionGate] = None, ) -> VaultAdapter: """Open the Vault. If `db_path` is None, uses default_vault_path() and creates the parent directory if needed.""" path = Path(db_path) if db_path else default_vault_path() path.parent.mkdir(parents=True, exist_ok=True) return VaultAdapter(path, permission_gate=permission_gate)