| """ |
| Sovereign Hive HSAQ — Vault adapter |
| |
| The pure-logic modules (candidate_record.py, kv_profiler.py, bit_width |
| assignment) deliberately don't import sqlite3, requests, os, pathlib, |
| subprocess, or socket. They emit row-shaped dataclasses. This module is |
| the I/O layer that takes those dataclasses and INSERTs them through the |
| permission gate. |
| |
| Why this exists in one place: |
| - "Vault is single source of truth" — every row-write goes through here |
| so the audit chain (agent_id, agent_tier, written_at) is uniform. |
| - "No raw sqlite3 elsewhere" — if you find an open() of a .db file in |
| any other module, that's the bug. Move it here. |
| - Migration application is centralised so bootstrap order is |
| deterministic. |
| |
| PermissionGate routing: |
| Every privileged operation calls `gate.check(capability, agent_id, |
| agent_tier)` against the seven-tier model in permission_gate.py. |
| Default is a strict tier-enforcing gate; pass `PermissionGate.permissive()` |
| for unit tests that aren't testing authorization. |
| |
| Capability mapping (per the spec): |
| - inserts on profile/record tables → VAULT_APPEND (T2+) |
| - reads of profile/record tables → VAULT_READ (T1+, not gated here) |
| - apply_migration → VAULT_SCHEMA (T6 only) |
| |
| Idempotency: |
| All inserts are `INSERT OR IGNORE` on the migrations' primary keys. |
| Re-running a profiler on the same (model_hash, calibration_hash, |
| pipeline_version) is safe — duplicates are silently dropped, not |
| errored. |
| """ |
| from __future__ import annotations |
|
|
| import logging |
| import sqlite3 |
| from contextlib import contextmanager |
| from dataclasses import dataclass |
| from pathlib import Path |
| from typing import Iterator, Optional, Sequence |
|
|
| from permission_gate import Capability, PermissionDenied, PermissionGate |
|
|
# Fixed logger name (not __name__), so this module's records are always
# filed under "HSAQ.Vault" regardless of how the module is imported.
logger = logging.getLogger(name="HSAQ.Vault")
|
|
|
|
| |
| |
| |
|
|
|
|
@dataclass()
class InsertResult:
    """Outcome of a single Vault insert call.

    Separates what the caller asked for from what actually landed. With
    ``INSERT OR IGNORE`` the gap between the two is the duplicate rows that
    were skipped.
    """

    # How many rows the caller handed to the insert method.
    requested: int
    # How many of those rows were actually written.
    inserted: int
    # Target table name.
    table: str
|
|
|
|
class VaultAdapter:
    """SQLite-backed Vault for HSAQ row persistence.

    Single open connection per adapter instance. Caller is responsible
    for closing (use `with` for automatic cleanup).

    Commit behaviour: each insert method commits on its own when called
    outside ``transaction()``. Inside a ``with vault.transaction():`` block
    the inserts only stage their rows; the block commits them together on
    success or rolls all of them back on an exception.
    """

    def __init__(
        self,
        db_path: str | Path,
        permission_gate: Optional[PermissionGate] = None,
        *,
        timeout: float = 30.0,
    ) -> None:
        """Open ``db_path`` and prepare it for use.

        Args:
            db_path: SQLite database file (sqlite3 creates it if missing).
            permission_gate: Gate consulted by privileged operations. When
                None, a default ``PermissionGate()`` is constructed.
            timeout: Seconds sqlite3 waits on a locked database before
                raising.
        """
        self.db_path = Path(db_path)
        self.gate = (
            permission_gate if permission_gate is not None else PermissionGate()
        )
        # Nesting depth of active transaction() blocks; 0 = writes auto-commit.
        self._txn_depth = 0
        self.conn = sqlite3.connect(str(self.db_path), timeout=timeout)
        try:
            # foreign_keys is a per-connection setting in SQLite, so it has to
            # be switched on for every new connection.
            self.conn.execute("PRAGMA foreign_keys = ON")
            self.conn.execute("PRAGMA journal_mode = WAL")
            self.conn.row_factory = sqlite3.Row
            self._ensure_schema_migrations_table()
        except Exception:
            # Don't leak the connection when setup fails half-way.
            self.conn.close()
            raise

    def _ensure_schema_migrations_table(self) -> None:
        """Create ``schema_migrations`` if it does not exist yet.

        The table normally comes from migration 001; creating it
        idempotently here lets the adapter work on a fresh DB before any
        migration has been applied.
        """
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS schema_migrations (
                version     TEXT PRIMARY KEY,
                applied_at  TEXT NOT NULL,
                description TEXT
            )
        """)
        self.conn.commit()

    def close(self) -> None:
        """Close the connection. Calling it again is a no-op."""
        if self.conn is not None:
            self.conn.close()
            self.conn = None

    def __enter__(self) -> "VaultAdapter":
        return self

    def __exit__(self, *_a) -> None:
        self.close()

    # ------------------------------------------------------------------
    # Migrations
    # ------------------------------------------------------------------

    def applied_migrations(self) -> set[str]:
        """Return the set of versions recorded in ``schema_migrations``."""
        cur = self.conn.execute("SELECT version FROM schema_migrations")
        return {row[0] for row in cur.fetchall()}

    def apply_migration(
        self,
        sql_path: str | Path,
        *,
        agent_id: str = "human-operator",
        agent_tier: int = 6,
    ) -> bool:
        """Apply a migration file. Requires VAULT_SCHEMA (T6 only).

        Idempotent: if the version derived from the file name is already in
        ``schema_migrations``, the file is not run and False is returned.
        Otherwise the whole file runs inside one transaction, together with
        an ``INSERT OR IGNORE`` of its ``schema_migrations`` row (version,
        UTC timestamp, file name). A migration therefore either lands
        completely and is recorded, or not at all. Returns True on success.

        Per the spec, migrations are human-applied only. ``agent_id`` /
        ``agent_tier`` default to the human-operator T6 identity used for
        ``sqlite3 vault.db < migration.sql``-style runs. They are only used
        for the permission check; they are not written anywhere.

        NOTE: because the file runs inside a transaction, statements that
        SQLite refuses or ignores inside one (``VACUUM``, ``PRAGMA
        foreign_keys``) will not behave as in autocommit mode. Calling this
        inside ``transaction()`` commits the work staged there first, because
        ``executescript`` always commits a pending transaction.

        Raises:
            ValueError: if no version can be derived from the file name.
            sqlite3.Error: if the migration SQL fails (its changes are
                rolled back).
        """
        import re
        from datetime import datetime, timezone

        self.gate.check(Capability.VAULT_SCHEMA, agent_id, agent_tier)

        sql_path = Path(sql_path)
        sql = sql_path.read_text(encoding="utf-8")

        name = sql_path.name
        # NOTE(review): the version is taken as the third "_"-separated field
        # of the file name (index 2); confirm against the migrations' naming.
        try:
            version = name.split("_")[2]
        except IndexError as e:
            raise ValueError(f"can't derive version from {name}") from e

        if version in self.applied_migrations():
            logger.info("migration %s already applied — skipping", version)
            return False

        # Drop the file's own stand-alone transaction-control lines so the
        # whole migration runs in the single transaction opened below.
        # Trigger bodies (a bare "BEGIN" line ... "END;") never match.
        txn_control = re.compile(
            r"(?:BEGIN(?:\s+(?:DEFERRED|IMMEDIATE|EXCLUSIVE))?(?:\s+TRANSACTION)?"
            r"|COMMIT(?:\s+TRANSACTION)?"
            r"|END\s+TRANSACTION)\s*;",
            re.IGNORECASE,
        )
        cleaned = "\n".join(
            ln for ln in sql.splitlines() if not txn_control.fullmatch(ln.strip())
        )

        applied_at = datetime.now(timezone.utc).isoformat()
        try:
            # executescript() commits any pending transaction and performs no
            # implicit BEGIN of its own. Open one explicitly so the schema
            # change and its bookkeeping row commit (or roll back) together.
            self.conn.executescript("BEGIN;\n" + cleaned)
            # OR IGNORE: a migration file that records itself wins.
            self.conn.execute(
                "INSERT OR IGNORE INTO schema_migrations "
                "(version, applied_at, description) VALUES (?, ?, ?)",
                (version, applied_at, name),
            )
            self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        logger.info("migration %s applied from %s", version, name)
        return True

    # ------------------------------------------------------------------
    # Write helpers
    # ------------------------------------------------------------------

    def _commit_unless_in_transaction(self) -> None:
        """Commit now, unless an enclosing transaction() will commit later."""
        if self._txn_depth == 0:
            self.conn.commit()

    def _execute_write(self, sql: str, param_sets: list[dict]) -> int:
        """Run ``sql`` once per parameter dict and return the rows written.

        The count comes from the cursor's ``rowcount``: rows changed by these
        statements on this connection. Rows skipped by INSERT OR IGNORE are
        not counted, and writers on other connections cannot skew it.

        Outside transaction(), a failure rolls back rows already written by
        earlier parameter sets, so a partial batch can't be committed later by
        an unrelated commit(). Inside transaction(), that block decides.
        """
        try:
            cur = self.conn.executemany(sql, param_sets)
        except Exception:
            if self._txn_depth == 0:
                self.conn.rollback()
            raise
        self._commit_unless_in_transaction()
        return cur.rowcount

    # ------------------------------------------------------------------
    # KV sensitivity profile
    # ------------------------------------------------------------------

    KV_INSERT_SQL = """
        INSERT OR IGNORE INTO kv_sensitivity_profile (
            model_hash, calibration_hash, pipeline_version,
            layer_idx, k_bits, v_bits, quantizer,
            drift_attn_output, drift_metric,
            bytes_per_kv_token, max_seq_len_observed,
            num_kv_heads, head_dim,
            profiled_at, profiled_by_agent_id, profiled_by_agent_tier
        ) VALUES (
            :model_hash, :calibration_hash, :pipeline_version,
            :layer_idx, :k_bits, :v_bits, :quantizer,
            :drift_attn_output, :drift_metric,
            :bytes_per_kv_token, :max_seq_len_observed,
            :num_kv_heads, :head_dim,
            :profiled_at, :profiled_by_agent_id, :profiled_by_agent_tier
        )
    """

    def insert_kv_profile_rows(
        self,
        rows: Sequence[object],
    ) -> InsertResult:
        """Bulk-insert ProfileRows from kv_profiler.profile_kv_sensitivity().

        Each row becomes a payload dict: ``row.to_vault_payload()`` when the
        row provides it, otherwise ``dict(row)``, so plain mappings work too.
        Any iterable of rows is accepted and consumed exactly once.

        Each payload carries its own ``profiled_by_agent_id`` /
        ``profiled_by_agent_tier``. VAULT_APPEND is checked once per distinct
        pair, using the exact values that will be written, and all checks run
        before anything is written.

        Returns:
            InsertResult whose ``inserted`` excludes duplicates skipped by
            INSERT OR IGNORE.
        """
        payloads = [
            r.to_vault_payload() if hasattr(r, "to_vault_payload") else dict(r)
            for r in rows
        ]
        if not payloads:
            return InsertResult(requested=0, inserted=0, table="kv_sensitivity_profile")

        # dict.fromkeys de-duplicates while keeping first-seen order, so the
        # gate is consulted in a deterministic order.
        agents = dict.fromkeys(
            (p["profiled_by_agent_id"], p["profiled_by_agent_tier"]) for p in payloads
        )
        for agent_id, agent_tier in agents:
            self.gate.check(Capability.VAULT_APPEND, agent_id, agent_tier)

        inserted = self._execute_write(self.KV_INSERT_SQL, payloads)
        return InsertResult(
            requested=len(payloads),
            inserted=inserted,
            table="kv_sensitivity_profile",
        )

    def _count_kv_rows(self) -> int:
        """Total number of rows in ``kv_sensitivity_profile``."""
        cur = self.conn.execute("SELECT COUNT(*) FROM kv_sensitivity_profile")
        return cur.fetchone()[0]

    def fetch_kv_profile_rows(
        self,
        model_hash: str,
        calibration_hash: str,
        pipeline_version: str,
    ) -> list[dict]:
        """Return all KV profile rows for an invalidation key, as dicts.

        Rows are ordered by layer_idx, k_bits, v_bits, quantizer. The caller
        can consume the dicts directly or rehydrate them into ProfileRow
        objects via the constructor.
        """
        cur = self.conn.execute("""
            SELECT * FROM kv_sensitivity_profile
            WHERE model_hash = ?
              AND calibration_hash = ?
              AND pipeline_version = ?
            ORDER BY layer_idx, k_bits, v_bits, quantizer
        """, (model_hash, calibration_hash, pipeline_version))
        return [dict(row) for row in cur.fetchall()]

    def has_kv_profile(
        self,
        model_hash: str,
        calibration_hash: str,
        pipeline_version: str,
    ) -> bool:
        """Quick existence check: is at least one row cached for this key?"""
        cur = self.conn.execute("""
            SELECT 1 FROM kv_sensitivity_profile
            WHERE model_hash = ?
              AND calibration_hash = ?
              AND pipeline_version = ?
            LIMIT 1
        """, (model_hash, calibration_hash, pipeline_version))
        return cur.fetchone() is not None

    # ------------------------------------------------------------------
    # Candidate records
    # ------------------------------------------------------------------

    CANDIDATE_INSERT_SQL = """
        INSERT OR IGNORE INTO candidate_record (
            model_id, model_hash, source, discovered_at,
            arch_type, param_count, hidden_size, num_layers,
            num_attention_heads, num_kv_heads, head_dim, max_position_embeddings,
            license, license_commercial_ok, tokenizer_family, tokenizer_compat_score,
            has_published_sensitivity_profile, published_profile_source,
            kv_bytes_per_token_fp16, kv_bytes_per_token_int8,
            predicted_vram_weights_mixed_34, predicted_vram_kv_4k_int8,
            predicted_vram_total_4k, predicted_headroom_gb,
            pruning_eligible, pruning_eligible_reason,
            hsaq_eligibility, eligibility_reasons,
            discovered_by_agent_id, discovered_by_agent_tier
        ) VALUES (
            :model_id, :model_hash, :source, :discovered_at,
            :arch_type, :param_count, :hidden_size, :num_layers,
            :num_attention_heads, :num_kv_heads, :head_dim, :max_position_embeddings,
            :license, :license_commercial_ok, :tokenizer_family, :tokenizer_compat_score,
            :has_published_sensitivity_profile, :published_profile_source,
            :kv_bytes_per_token_fp16, :kv_bytes_per_token_int8,
            :predicted_vram_weights_mixed_34, :predicted_vram_kv_4k_int8,
            :predicted_vram_total_4k, :predicted_headroom_gb,
            :pruning_eligible, :pruning_eligible_reason,
            :hsaq_eligibility, :eligibility_reasons,
            :discovered_by_agent_id, :discovered_by_agent_tier
        )
    """

    def insert_candidate_record(self, record: object) -> InsertResult:
        """Insert one CandidateRecord (from candidate_record.py).

        The record's ``to_vault_payload()`` shape is the contract. The
        payload is copied before being adapted for SQLite, so the record's
        own data is never mutated:
          * ``eligibility_reasons`` given as a list/tuple is stored as JSON;
          * bool flags are stored as 0/1.

        VAULT_APPEND is checked against the payload's
        ``discovered_by_agent_id`` / ``discovered_by_agent_tier``.

        Raises:
            TypeError: if ``record`` has no ``to_vault_payload()`` method.
        """
        import json

        if not hasattr(record, "to_vault_payload"):
            raise TypeError(
                "CandidateRecord-like object required (has .to_vault_payload())"
            )

        # Copy: to_vault_payload() may hand back a dict the record still owns.
        payload = dict(record.to_vault_payload())

        if isinstance(payload.get("eligibility_reasons"), (list, tuple)):
            payload["eligibility_reasons"] = json.dumps(payload["eligibility_reasons"])

        # bool is an int subclass, so sqlite3 would store 1/0 anyway; the
        # explicit conversion keeps the stored representation obvious.
        for k in ("license_commercial_ok", "has_published_sensitivity_profile",
                  "pruning_eligible"):
            if k in payload and isinstance(payload[k], bool):
                payload[k] = 1 if payload[k] else 0

        self.gate.check(
            Capability.VAULT_APPEND,
            payload["discovered_by_agent_id"],
            payload["discovered_by_agent_tier"],
        )

        inserted = self._execute_write(self.CANDIDATE_INSERT_SQL, [payload])
        return InsertResult(
            requested=1,
            inserted=inserted,
            table="candidate_record",
        )

    # ------------------------------------------------------------------
    # Transactions
    # ------------------------------------------------------------------

    @contextmanager
    def transaction(self) -> Iterator["VaultAdapter"]:
        """Group several writes into one atomic unit.

        Insert methods called inside the block do not commit individually.
        The outermost block commits once on success. An exception at any
        nesting level rolls back everything uncommitted and is re-raised.
        """
        self._txn_depth += 1
        try:
            yield self
            if self._txn_depth == 1:
                self.conn.commit()
        except Exception:
            self.conn.rollback()
            raise
        finally:
            self._txn_depth -= 1
|
|
|
|
| |
| |
| |
|
|
|
|
def default_vault_path() -> Path:
    """Return where the Vault DB lives by default. Callers can override it.

    Resolution order:
      1. ``$SOVEREIGN_HIVE_VAULT`` when set to a non-empty value. A leading
         ``~`` is expanded, because quoted shell assignments leave it literal.
      2. Otherwise ``~/.sovereign_hive/vault.db``.
    """
    import os

    override = os.environ.get("SOVEREIGN_HIVE_VAULT")
    if override:
        return Path(override).expanduser()
    return Path.home() / ".sovereign_hive" / "vault.db"
|
|
|
|
def open_vault(
    db_path: str | Path | None = None,
    permission_gate: Optional[PermissionGate] = None,
    *,
    timeout: float = 30.0,
) -> VaultAdapter:
    """Open (creating if necessary) the Vault database and return an adapter.

    Args:
        db_path: Database file. When None (or empty), default_vault_path()
            is used.
        permission_gate: Forwarded to VaultAdapter. None means the adapter
            builds its default gate.
        timeout: Seconds sqlite3 waits on a locked database; forwarded to
            VaultAdapter (same default as its constructor).

    The parent directory of the resolved path is created if missing. This
    applies to explicit paths as well as the default one, because sqlite3
    creates the DB file but not its directory.
    """
    path = Path(db_path) if db_path else default_vault_path()
    path.parent.mkdir(parents=True, exist_ok=True)
    return VaultAdapter(path, permission_gate=permission_gate, timeout=timeout)
|
|