"""MemoryClient: the public API for sibyl-memory-client. Polymorphic constructor: open by local path OR by hosted-tier URL (v2+, not implemented yet). The local-first plugin v1 only uses the local path. The API surface mirrors the canonical sibyl_memory.* table shape so callers can move between local-SQLite-backed and Postgres-backed clients without re-learning the model. """ from __future__ import annotations import sqlite3 from pathlib import Path from typing import Any from .exceptions import NotFoundError, StorageError, TenantError, ValidationError from .storage import Storage, dumps, loads, new_id, _utc_now_iso # ---------------------------------------------------------------------- # Identifier validation (v0.4.0, KAPPA YELLOW finding) # ---------------------------------------------------------------------- # Entity names, state keys, and reference doc keys are user-supplied # identifiers. SQL is parameterized everywhere so injection is closed today, # but null bytes break downstream consumers (logs, exports, CLI display), # empty strings are nonsense as primary keys, and unbounded length is a # latent vector if any code path ever spills to filesystem. Validate on # WRITE only: reads of already-stored bad identifiers still work so users # can introspect and migrate. _IDENT_MAX_LENGTH = 1024 # Control chars (0x00-0x1F + DEL) are rejected. Tab/newline/CR included by # design: identifiers are short single-line strings, not arbitrary payloads. _IDENT_FORBIDDEN_CODE_POINTS = frozenset(range(0, 0x20)) | {0x7F} # v0.4.4 (KAPPA #3 defense-in-depth): SQL is parameterized so injection is # closed at the DB, but identifiers flow into consumers that do NOT parameterize # -- filesystem export (a `name` becomes a path component), CLI display, log # lines, future per-entity backends. Reject path-traversal shapes and the # shell/redirection/quote metacharacters that have no place in a short flat key. # Apostrophe is deliberately ALLOWED (legit in name-shaped keys like "o'brien"); # double-quote is rejected because it is also the FTS5 phrase delimiter. # # NOTE: we reject the traversal MARKER ".." (catches KAPPA's "../../etc/passwd" # and "..\\..\\windows") but NOT bare "/" or "\\" -- the v0.4.0 contract # explicitly permits slash-containing keys ("with/slash"). Rejecting raw path # separators for export-safety would be a public-contract change; flagged for # the team rather than taken unilaterally. _IDENT_FORBIDDEN_SUBSTRINGS = ("..",) _IDENT_FORBIDDEN_CHARS = frozenset('<>|;"`') def validate_identifier(value: Any, *, field_name: str) -> str: """Validate a user-supplied identifier (entity name, state key, etc.). Rejects: non-string, empty, control characters / null bytes, length > 1024. Args: value: the identifier to validate. field_name: name of the field for error messages. Returns: the validated string (unchanged on success). Raises: ValidationError on rejection, with a recovery hint. """ if not isinstance(value, str): raise ValidationError( f"{field_name} must be a string (got {type(value).__name__})", recovery=f"Pass a non-empty string for {field_name}.", ) if not value: raise ValidationError( f"{field_name} cannot be empty", recovery=f"Pass a non-empty string for {field_name}.", ) if len(value) > _IDENT_MAX_LENGTH: raise ValidationError( f"{field_name} too long ({len(value)} chars, max {_IDENT_MAX_LENGTH})", recovery=f"Use a shorter {field_name} (under {_IDENT_MAX_LENGTH} chars).", ) for ch in value: if ord(ch) in _IDENT_FORBIDDEN_CODE_POINTS: raise ValidationError( f"{field_name} contains a forbidden control character " f"(code point 0x{ord(ch):02x} at index {value.index(ch)})", recovery=( f"Identifiers must be printable single-line strings. " f"Remove control characters / null bytes / tabs / newlines." ), ) # v0.4.4: path-traversal + dangerous metacharacter defense-in-depth. for bad in _IDENT_FORBIDDEN_SUBSTRINGS: if bad in value: raise ValidationError( f"{field_name} contains a forbidden path sequence ({bad!r})", recovery=( "Identifiers are flat keys, not paths. Remove '/', '\\', " "and '..' sequences." ), ) bad_chars = sorted(_IDENT_FORBIDDEN_CHARS & set(value)) if bad_chars: raise ValidationError( f"{field_name} contains forbidden character(s): {' '.join(bad_chars)}", recovery=( "Remove shell / redirection / quote metacharacters " "( < > | ; \" ` ) from the identifier. Apostrophe is allowed." ), ) return value # ---------------------------------------------------------------------- # FTS5 error surface (v0.4.0, KAPPA YELLOW finding) # ---------------------------------------------------------------------- # Previously search() and search_entities() silently swallowed # sqlite3.OperationalError into `return []` / `pass`. KAPPA's complaint: # "a user has no signal whether their query was malformed or just genuinely # returned nothing." Now we classify: schema-missing → silent (defensive # against partial init), FTS5-syntax-error → ValidationError (caller bug), # anything else → StorageError (real backend issue). # Substrings that mark FTS5 query syntax errors. Matched case-insensitively # against str(OperationalError). Curated against the actual messages SQLite # emits in 3.38+ for FTS5 parse failures. _FTS5_QUERY_ERROR_MARKERS = ( "fts5", "malformed match", "syntax error near", "no such column", ) # Substring marking the schema-missing case: keep silent (return empty) # for defense against partial schema state on very old DBs. _SCHEMA_MISSING_MARKER = "no such table" def _classify_fts5_error(err: sqlite3.OperationalError) -> Exception | None: """Translate an FTS5-related sqlite OperationalError. Returns: None → schema-missing case; caller should treat as empty results. ValidationError → user-visible query syntax problem; raise. StorageError → real backend issue; raise. """ msg = str(err).lower() if _SCHEMA_MISSING_MARKER in msg: return None # defensive: schema partially applied, return empty if any(marker in msg for marker in _FTS5_QUERY_ERROR_MARKERS): return ValidationError( f"FTS5 rejected the search query: {err}", recovery=( "The query passed sanitization but the FTS5 engine still " "rejected it. Pass plain text or simple word tokens; FTS5 " "operator syntax (NEAR, AND/OR/NOT, column filters) is " "treated as literal text after sanitization." ), ) return StorageError( f"SQLite error during FTS5 search: {err}", recovery=( "Backend error. Check disk space, file permissions, and that " "the schema is intact. See exception chain for the underlying " "sqlite3 message." ), ) # External-content FTS5 indexes can be rebuilt from their base table via the # 'rebuild' command. journal_events_fts is contentless and cannot — corruption # there is contained (tier skipped), not self-healed. Names are a fixed # allowlist, never user input, so interpolation below is injection-safe. _EXTERNAL_CONTENT_FTS = frozenset({ "entities_fts", "state_documents_fts", "reference_documents_fts", }) def _heal_fts(conn: sqlite3.Connection, fts_table: str) -> bool: """Rebuild a corrupted external-content FTS5 index from its base table. Returns True only if the rebuild ran without error. A poisoned/desynced external-content index (sqlite3.DatabaseError: "database disk image is malformed") is reconstructed from the intact base table; the base data is never touched. Contentless or unknown tables return False (uncontainable by rebuild). """ if fts_table not in _EXTERNAL_CONTENT_FTS: return False try: conn.execute(f"INSERT INTO {fts_table}({fts_table}) VALUES('rebuild')") conn.commit() return True except sqlite3.Error: return False def _fts_query( conn: sqlite3.Connection, sql: str, params: tuple, fts_table: str, ) -> list: """Run one FTS5 MATCH query with classification + corruption containment. OperationalError → classified (schema-missing → []; query-syntax → ValidationError; other → StorageError), preserving the v0.4.0 KAPPA behavior. A broader DatabaseError (index corruption) is contained: self-heal the external-content index once and retry; if the retry still fails — or the table is contentless — return [] so a single poisoned row can never crash the caller's search. Corruption surfaces under varied messages depending on failure mode ("vtable constructor failed", "database disk image is malformed", "file is not a database"), so containment keys on the exception CLASS, not a message substring. ProgrammingError is re-raised: it signals a code or binding bug in our own SQL and must never be masked as empty results. """ try: return conn.execute(sql, params).fetchall() except sqlite3.OperationalError as e: exc = _classify_fts5_error(e) if exc is None: return [] raise exc from e except sqlite3.ProgrammingError: raise except sqlite3.DatabaseError: if _heal_fts(conn, fts_table): try: return conn.execute(sql, params).fetchall() except sqlite3.DatabaseError: return [] return [] # ---------------------------------------------------------------------- # FTS5 query sanitization # ---------------------------------------------------------------------- # v0.3.3 hardens search() / search_entities() against FTS5 injection + DoS # (audit SEC-3). User input is wrapped as a single quoted FTS5 phrase so # column-filter syntax (`name:`, `category:`, `rowid:`, etc.) and unclosed # quotes can't escape into the FTS5 parser. Caller can still get prefix # matching by passing prefix=True. # Column names + FTS5 reserved operators we reject if they appear unquoted. _FTS5_COLUMN_TOKENS = frozenset({"name", "category", "body", "tenant_id", "entity_id", "document_key", "doc_key", "payload", "ts", "rowid"}) # v0.4.4 (chainriffs Discord report + KAPPA #4): bare uppercase FTS5 operator # keywords typed inside a natural-language query ("auth AND db", "cache NEAR # eviction") were being phrase-quoted into REQUIRED LITERAL tokens, so a matched # row had to literally contain the word "AND" / "NEAR" -- recall silently # collapsed to ~0 hits. Users mean these as connectors, not search terms. Drop # them during tokenization so the remaining terms AND together (FTS5's implicit # space-join), which is the natural intent. If a query is ONLY operator keywords, # keep them as literals so a genuine search for the word "and" still resolves. _FTS5_OPERATOR_KEYWORDS = frozenset({"AND", "OR", "NOT", "NEAR"}) def _drop_fts5_operator_tokens(tokens: list[str]) -> list[str]: """Drop standalone FTS5 operator keywords; keep all tokens if that empties it.""" kept = [t for t in tokens if t.upper() not in _FTS5_OPERATOR_KEYWORDS] return kept or tokens def _sanitize_fts5_query(raw: str, *, prefix: bool = False, as_phrase: bool = False) -> str: """Wrap a user query as a safe FTS5 MATCH expression. Three modes: - Default (``prefix=False, as_phrase=False``): tokenize input into alphanumeric + underscore tokens, wrap each as a single-term phrase, and join with spaces. FTS5 treats space-joined terms as implicit AND so every token must appear in the matched row (in any order). This is the natural-language behaviour most callers want: ``search("H&M tops bought")`` now matches rows containing "H", "M", "tops", and "bought" anywhere. Each token is phrase-quoted so embedded FTS5 operators stay literal. - Explicit phrase (``as_phrase=True``): wrap the entire input as a single double-quoted phrase. Use when consecutive-token phrase match is what the caller actually wants. Embedded double-quotes are doubled per FTS5 escape rules. Safe against injection. - Prefix (``prefix=True``, mutually exclusive with as_phrase; prefix wins): strip to alphanumeric tokens, append ``*`` to the last token for prefix matching. Empty / whitespace-only queries return an empty string; callers should short-circuit on empty. Behaviour change in v0.4.2 (2026-05-22): default mode flipped from phrase-match to AND-of-tokens. Phrase-match was an unintuitive default because it made natural-language queries fail silently - ``search("H&M tops bought")`` returned 0 hits even when the haystack contained all three words. Callers who relied on phrase semantics must now pass ``as_phrase=True`` explicitly. Surfaced by the LongMemEval 50-Q benchmark on 2026-05-22 as the dominant default-UX gap for Hermes-plugin users (every natural-language query hit 0). """ if not raw or not isinstance(raw, str): return "" s = raw.strip() if not s: return "" # Strip control characters that could confuse the FTS5 tokenizer s = "".join(ch for ch in s if ch.isprintable() or ch in (" ", "\t")) if not s.strip(): return "" if prefix: # Reduce to safe bare tokens: alphanumeric + underscore only. # Anything else (quotes, colons, hyphens, FTS5 operators) becomes # a space, then we split-and-rejoin to get clean whitespace. cleaned = "".join(ch if (ch.isalnum() or ch == "_") else " " for ch in s) tokens = [t for t in cleaned.split() if t] if not tokens: return "" # In prefix mode, never use the keep-all fallback: appending `*` to a # raw FTS5 operator keyword (OR*, AND*, NOT*) produces an invalid query # that crashes the FTS5 parser (acerieus stress test # LEARNING-SEARCH-PREFIX-OPERATOR-MUTATIONS-STAY-LITERAL, 2026-06-01). # Hard-drop operators with no fallback; an all-operator prefix query # has no safe FTS5 expansion so we return empty (no match). tokens = [t for t in tokens if t.upper() not in _FTS5_OPERATOR_KEYWORDS] if not tokens: return "" if len(tokens) == 1: return f"{tokens[0]}*" # Multiple tokens: all earlier tokens are literal, the last gets `*`. return " ".join(tokens[:-1]) + f" {tokens[-1]}*" if as_phrase: # Explicit phrase mode (legacy default before v0.4.2). Escape # embedded double-quotes per FTS5 rules. escaped = s.replace('"', '""') return f'"{escaped}"' # NEW default (v0.4.2+): tokenize into alphanumeric + underscore # tokens, wrap each as a single-term phrase, join with spaces. FTS5 # treats space-joined terms as implicit AND. cleaned = "".join(ch if (ch.isalnum() or ch == "_") else " " for ch in s) tokens = [t for t in cleaned.split() if t] if not tokens: # All-symbol input: fall back to the legacy phrase wrap so the # query still has SOME defensible shape rather than empty. escaped = s.replace('"', '""') return f'"{escaped}"' tokens = _drop_fts5_operator_tokens(tokens) return " ".join(f'"{t}"' for t in tokens) # --------------------------------------------------------------------------- # Proximity re-ranking (v0.4.10): precision boost for multi-word search. # # The default sanitizer (v0.4.2+) ANDs query tokens, so every token must appear # somewhere in a matched row, in any order. That gives full recall but lets # "near-negative decoy" rows (short docs that contain the same tokens in an # unrelated context) out-rank the real answer under BM25, which rewards term # density over proximity (chainriffs + KAPPA Discord reports against v0.4.2 and # v0.4.4: precision ~73% at recall 100%). # # Fix: after BM25 ranking, bucket each hit by how tightly it matches the query, # then sort by (bucket, bm25_rank). Recall is untouched: no hit is dropped, the # candidate set is identical, only the order changes before the limit applies. # Single-token queries are a no-op (every hit is bucket 0), so the single-token # searches issued by multi_record_search (the anchor-first resolver) are # unaffected. Prefix searches are also skipped (different intent). # # bucket 0: query tokens appear as a contiguous phrase, in order # bucket 1: all query tokens appear within a small window, any order # bucket 2: tokens are scattered, or cannot be located in the extracted text _PROXIMITY_WINDOW_SLACK = 4 def _match_tokens(query: str) -> list[str]: """Lowercased alphanumeric+underscore tokens, FTS5 operator words dropped. Mirrors the tokenization the default sanitizer ANDs together, so the re-ranker reasons over the same tokens the MATCH actually required. """ if not query or not isinstance(query, str): return [] cleaned = "".join(ch if (ch.isalnum() or ch == "_") else " " for ch in query.lower()) toks = [t for t in cleaned.split() if t] return _drop_fts5_operator_tokens(toks) if toks else [] def _normalize_text(value: Any) -> str: """Flatten a hit's searchable content to a single space-joined token string. Serializes structured bodies via JSON so the re-ranker sees the same text (keys + values) that FTS5 indexed for the row. """ if isinstance(value, str): raw = value else: try: raw = dumps(value) except (TypeError, ValueError): raw = str(value) cleaned = "".join(ch if (ch.isalnum() or ch == "_") else " " for ch in raw.lower()) return " ".join(cleaned.split()) def _min_cover_span(positions: dict[str, list[int]]) -> int | None: """Smallest window (max-min+1) of doc indices covering every token once.""" merged = sorted((i, t) for t, idxs in positions.items() for i in idxs) if not merged: return None need = len(positions) have: dict[str, int] = {} best: int | None = None left = 0 for right in range(len(merged)): have[merged[right][1]] = have.get(merged[right][1], 0) + 1 while len(have) == need: width = merged[right][0] - merged[left][0] + 1 if best is None or width < best: best = width tl = merged[left][1] have[tl] -= 1 if have[tl] == 0: del have[tl] left += 1 return best def _proximity_bucket(query_tokens: list[str], text: str) -> int: """0 = contiguous phrase, 1 = tight window, 2 = scattered/absent.""" n = len(query_tokens) if n < 2: return 0 if f" {' '.join(query_tokens)} " in f" {text} ": return 0 # exact contiguous phrase, in query order doc_tokens = text.split() if not doc_tokens: return 2 positions: dict[str, list[int]] = {t: [] for t in set(query_tokens)} for i, tok in enumerate(doc_tokens): if tok in positions: positions[tok].append(i) if any(not idxs for idxs in positions.values()): return 2 # at least one query token absent from the extracted text span = _min_cover_span(positions) if span is not None and span <= n + _PROXIMITY_WINDOW_SLACK: return 1 return 2 # The default tenant for single-user local installs. DEFAULT_TENANT = "00000000-0000-0000-0000-000000000001" def _check_json(payload: Any, field: str = "body") -> str: """Validate that payload is JSON-serializable, return the encoded string.""" try: return dumps(payload) except (TypeError, ValueError) as e: raise ValidationError( f"{field} is not JSON-serializable: {e}", recovery=f"Pass a dict, list, or JSON primitive as {field}.", ) from e def _require_container(body: Any, field: str = "body") -> None: """Enforce the structured-body contract for entity + state writes. set_entity/set_state declare ``body: dict | list``. A bare primitive (str/int/float/bool/None) is valid JSON, so without this guard it would persist silently and break downstream tools that assume a structured container. reference_documents intentionally takes a free-text str body and does NOT go through here. """ if not isinstance(body, (dict, list)): raise ValidationError( f"{field} must be a dict or list, got {type(body).__name__}", recovery=( f"Wrap the value in a container, e.g. {{'value': ...}} or " f"[...]. Primitive {field} values are rejected because " "downstream consumers assume structured entity/state bodies." ), ) class MemoryClient: """Single canonical interface for reading and writing Sibyl Memory state.""" # Paid-tier-only features. Free tier raises TierGateError; upgrading to any # paid tier unlocks both self-learning and the memory linter. _PAID_ONLY_TIERS = frozenset({"sync", "team", "lifetime", "stake", "enterprise"}) def __init__( self, storage: Storage, *, tenant_id: str = DEFAULT_TENANT, tier: str = "free", account_id: str | None = None, session_token: str | None = None, cap_gate: Any = None, credentials_claim: dict[str, Any] | None = None, credentials_signature: str | None = None, ) -> None: self._storage = storage self._tenant_id = tenant_id self._tier = tier self._account_id = account_id self._session_token = session_token # Cap gate: enforces the 2 MB free-tier cap with server-authoritative # tier verification at the boundary. See _capcheck.py for the design. if cap_gate is None: from ._capcheck import CapGate, TierCache cap_gate = CapGate( account_id=account_id, session_token=session_token, db_size_fn=lambda: ( Path(storage.db_path).stat().st_size if Path(storage.db_path).exists() else 0 ), local_tier_hint=tier, cache=TierCache( Path(storage.db_path).parent / "tier_cache.json" ), credentials_claim=credentials_claim, credentials_signature=credentials_signature, ) self._cap_gate = cap_gate # ------------------------------------------------------------------ # Constructors # ------------------------------------------------------------------ @classmethod def local( cls, path: str | Path = "~/.sibyl-memory/memory.db", *, tenant_id: str = DEFAULT_TENANT, tier: str = "free", account_id: str | None = None, session_token: str | None = None, credentials_claim: dict[str, Any] | None = None, credentials_signature: str | None = None, ) -> "MemoryClient": """Open a local SQLite-backed MemoryClient. The directory at ``path``'s parent is created with mode 0700 if missing. The schema is applied on first open and is idempotent. Set ``tier`` to the user's plugin tier so paid-only features (self-learning + memory linter) gate correctly. Defaults to "free". Pass ``account_id`` and ``session_token`` from credentials.json so the SDK can verify the user's tier against the server when they approach the 2 MB free-tier cap. Without these, the SDK enforces a strict local 2 MB cap (no server check possible). """ storage = Storage(path) return cls( storage, tenant_id=tenant_id, tier=tier, account_id=account_id, session_token=session_token, credentials_claim=credentials_claim, credentials_signature=credentials_signature, ) # ------------------------------------------------------------------ # Tenant management # ------------------------------------------------------------------ def get_tenant(self) -> str: return self._tenant_id def set_tenant(self, tenant_id: str) -> None: if not tenant_id or not isinstance(tenant_id, str): raise TenantError("tenant_id must be a non-empty string") self._tenant_id = tenant_id @property def storage(self) -> Storage: return self._storage def schema_version(self) -> int | None: return self._storage.schema_version() # ------------------------------------------------------------------ # Tier (paid-tier-only feature gating) # ------------------------------------------------------------------ def get_tier(self) -> str: return self._tier def set_tier(self, tier: str) -> None: """Update the user's tier. Called by the credentials loader when the activation flow returns a tier upgrade.""" if not isinstance(tier, str) or not tier: raise ValidationError("tier must be a non-empty string") self._tier = tier def _require_paid_tier(self, feature: str) -> None: """Raise TierGateError if the current tier is not paid-tier.""" from .exceptions import TierGateError if self._tier not in self._PAID_ONLY_TIERS: raise TierGateError( f"{feature} requires a paid tier. Current tier: {self._tier!r}.", feature=feature, current_tier=self._tier, ) # ------------------------------------------------------------------ # Entities (WARM tier): single source of truth per rule 43 # ------------------------------------------------------------------ def set_entity( self, category: str, name: str, body: dict[str, Any] | list[Any], *, status: str | None = None, ) -> dict[str, Any]: """Insert or update an entity. UNIQUE (tenant_id, category, name) is enforced at the DB level. On conflict the existing row is updated (body + status + updated_at). Returns the resulting entity row as a dict. Subject to the 2 MB free-tier cap when tier='free'. Raises CapExceededError if the write would push the local DB past the cap and the server-authoritative tier check confirms the account is still free. v0.4.0: category and name are validated as identifiers (non-empty string, no control characters, length <= 1024). Raises ValidationError on rejection.""" validate_identifier(category, field_name="category") validate_identifier(name, field_name="name") _require_container(body) body_json = _check_json(body) # Cap gate: rough byte estimate (FTS5 + indexes add overhead) self._cap_gate.check(proposed_delta_bytes=len(body_json) + len(name) + len(category) + 200) with self._storage.transaction() as conn: existing = conn.execute( "SELECT id FROM entities WHERE tenant_id = ? AND category = ? AND name = ?", (self._tenant_id, category, name), ).fetchone() if existing is None: ent_id = new_id() conn.execute( "INSERT INTO entities (id, tenant_id, category, name, status, body) " "VALUES (?, ?, ?, ?, ?, ?)", (ent_id, self._tenant_id, category, name, status, body_json), ) else: ent_id = existing["id"] conn.execute( "UPDATE entities SET status = ?, body = ?, " "updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now') " "WHERE id = ?", (status, body_json, ent_id), ) return self.get_entity(category, name) def get_entity(self, category: str, name: str) -> dict[str, Any]: with self._storage.connection() as conn: row = conn.execute( "SELECT id, tenant_id, category, name, status, body, created_at, updated_at " "FROM entities WHERE tenant_id = ? AND category = ? AND name = ?", (self._tenant_id, category, name), ).fetchone() if row is None: raise NotFoundError(f"entity {category}/{name} not found for tenant {self._tenant_id}") return self._row_to_entity(row) def list_entities( self, category: str | None = None, *, status: str | None = None, limit: int = 100, ) -> list[dict[str, Any]]: sql = "SELECT id, tenant_id, category, name, status, body, created_at, updated_at FROM entities WHERE tenant_id = ?" params: list[Any] = [self._tenant_id] if category is not None: sql += " AND category = ?" params.append(category) if status is not None: sql += " AND status = ?" params.append(status) sql += " ORDER BY updated_at DESC LIMIT ?" params.append(limit) with self._storage.connection() as conn: rows = conn.execute(sql, params).fetchall() return [self._row_to_entity(r) for r in rows] def delete_entity(self, category: str, name: str) -> bool: with self._storage.transaction() as conn: cur = conn.execute( "DELETE FROM entities WHERE tenant_id = ? AND category = ? AND name = ?", (self._tenant_id, category, name), ) return cur.rowcount > 0 # ------------------------------------------------------------------ # State documents (HOT tier) # ------------------------------------------------------------------ def set_state(self, key: str, body: dict[str, Any] | list[Any]) -> None: """Insert or update a HOT-tier state document. v0.4.0: ``key`` is validated as an identifier (non-empty string, no control characters, length <= 1024). Raises ValidationError on rejection.""" validate_identifier(key, field_name="key") _require_container(body) body_json = _check_json(body) self._cap_gate.check(proposed_delta_bytes=len(body_json) + len(key) + 150) with self._storage.transaction() as conn: conn.execute( "INSERT INTO state_documents (tenant_id, document_key, body) VALUES (?, ?, ?) " "ON CONFLICT(tenant_id, document_key) DO UPDATE SET body = excluded.body, " "updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", (self._tenant_id, key, body_json), ) def get_state(self, key: str) -> dict[str, Any] | None: with self._storage.connection() as conn: row = conn.execute( "SELECT body, updated_at FROM state_documents WHERE tenant_id = ? AND document_key = ?", (self._tenant_id, key), ).fetchone() if row is None: return None return {"body": loads(row["body"]), "updated_at": row["updated_at"]} # ------------------------------------------------------------------ # Journal (COLD tier): append-only event log # ------------------------------------------------------------------ def write_event( self, *, evaluated: Any = None, acted: Any = None, forward: Any = None, extra: Any = None, ts: str | None = None, ) -> str: # Estimate byte cost from each non-None payload delta = 200 # row + index overhead for payload in (evaluated, acted, forward, extra): if payload is not None: try: delta += len(dumps(payload)) except (TypeError, ValueError): delta += 100 # estimate; the JSON check below will catch real failures self._cap_gate.check(proposed_delta_bytes=delta) ev_id = new_id() with self._storage.transaction() as conn: conn.execute( "INSERT INTO journal_events (id, tenant_id, ts, evaluated, acted, forward, extra) " "VALUES (?, ?, ?, ?, ?, ?, ?)", ( ev_id, self._tenant_id, ts or _utc_now_iso(), _check_json(evaluated, "evaluated") if evaluated is not None else None, _check_json(acted, "acted") if acted is not None else None, _check_json(forward, "forward") if forward is not None else None, _check_json(extra, "extra") if extra is not None else None, ), ) return ev_id def read_events( self, *, limit: int = 50, since: str | None = None, until: str | None = None, ) -> list[dict[str, Any]]: sql = "SELECT id, tenant_id, ts, evaluated, acted, forward, extra FROM journal_events WHERE tenant_id = ?" params: list[Any] = [self._tenant_id] if since is not None: sql += " AND ts >= ?" params.append(since) if until is not None: sql += " AND ts <= ?" params.append(until) sql += " ORDER BY ts DESC, id DESC LIMIT ?" params.append(limit) with self._storage.connection() as conn: rows = conn.execute(sql, params).fetchall() return [ { "id": r["id"], "ts": r["ts"], "evaluated": loads(r["evaluated"]), "acted": loads(r["acted"]), "forward": loads(r["forward"]), "extra": loads(r["extra"]), } for r in rows ] # ------------------------------------------------------------------ # Reference (REFERENCE tier): static lookup documents # ------------------------------------------------------------------ def set_reference( self, key: str, body: str, *, metadata: dict[str, Any] | None = None, ) -> None: """Insert or update a REFERENCE-tier document. v0.4.0: ``key`` is validated as an identifier (non-empty string, no control characters, length <= 1024). Raises ValidationError on rejection.""" validate_identifier(key, field_name="key") meta_json = _check_json(metadata, "metadata") if metadata is not None else None delta = len(body) + len(key) + (len(meta_json) if meta_json else 0) + 200 self._cap_gate.check(proposed_delta_bytes=delta) with self._storage.transaction() as conn: conn.execute( "INSERT INTO reference_documents (tenant_id, doc_key, body, metadata) VALUES (?, ?, ?, ?) " "ON CONFLICT(tenant_id, doc_key) DO UPDATE SET body = excluded.body, " "metadata = excluded.metadata, updated_at = strftime('%Y-%m-%dT%H:%M:%fZ', 'now')", (self._tenant_id, key, body, meta_json), ) def get_reference(self, key: str) -> dict[str, Any] | None: with self._storage.connection() as conn: row = conn.execute( "SELECT body, metadata, updated_at FROM reference_documents WHERE tenant_id = ? AND doc_key = ?", (self._tenant_id, key), ).fetchone() if row is None: return None return {"body": row["body"], "metadata": loads(row["metadata"]), "updated_at": row["updated_at"]} # ------------------------------------------------------------------ # Archive # ------------------------------------------------------------------ def archive_entity(self, category: str, name: str, reason: str | None = None) -> dict[str, Any]: """Move an entity to the archive table and delete from the active set. T1-3 fix: previously this bypassed the cap-gate. A free user at 1.9 MB could archive their largest entities (body copied into archived_entities, doubling footprint temporarily before the DELETE lands) to keep writing past the 2 MB cap. Now gated on the size of the body being copied + 200 bytes overhead. Reads the body first so we know the actual delta. NotFoundError still raised before any cap-gate work. """ # Read the row first so we can size the archive insert. NotFoundError # propagates as before: no cap-gate side effect for missing entities. with self._storage.connection() as conn: preview = conn.execute( "SELECT id, body FROM entities WHERE tenant_id = ? AND category = ? AND name = ?", (self._tenant_id, category, name), ).fetchone() if preview is None: raise NotFoundError(f"entity {category}/{name} not found") body_bytes = len(preview["body"] or "") if preview["body"] else 0 # The archive insert copies the body. Delta = body + name + category # + reason + ~200B SQLite/row overhead. Conservative estimate. delta = body_bytes + len(name) + len(category) + len(reason or "") + 200 self._cap_gate.check(proposed_delta_bytes=delta) with self._storage.transaction() as conn: row = conn.execute( "SELECT id, body FROM entities WHERE tenant_id = ? AND category = ? AND name = ?", (self._tenant_id, category, name), ).fetchone() if row is None: raise NotFoundError(f"entity {category}/{name} not found") arch_id = new_id() conn.execute( "INSERT INTO archived_entities (id, tenant_id, original_entity_id, category, name, body, archive_reason) " "VALUES (?, ?, ?, ?, ?, ?, ?)", (arch_id, self._tenant_id, row["id"], category, name, row["body"], reason), ) conn.execute("DELETE FROM entities WHERE id = ?", (row["id"],)) return {"archived_id": arch_id, "original_id": row["id"]} # ------------------------------------------------------------------ # Self-learning + lint (v0.2.0): paid-tier only # ------------------------------------------------------------------ # Both convenience entrypoints below gate on tier and raise # TierGateError for free-tier callers. The underlying Learner / Linter # classes remain available for power users via direct import, but # the documented surface is the gated convenience API. def learner(self, **kwargs: Any): """Return a Learner bound to this client's storage + tenant. Paid-tier only. Lazy import so the lower SDK stays usable without loading the learning module. Threads the client's CapGate into the Learner so accept_proposal calls go through the cap-check (T1-3 fix). Callers can override cap_gate=None explicitly to opt out for tests.""" self._require_paid_tier("self-learning") from .learning import Learner kwargs.setdefault("cap_gate", self._cap_gate) return Learner(self._storage, tenant_id=self._tenant_id, **kwargs) def learn(self, **kwargs: Any): """Convenience: construct a default Learner and run one pass. Returns a LearningRunReport. Paid-tier only.""" return self.learner(**kwargs).run() def list_skill_proposals( self, *, status: str = "pending", limit: int = 50, ) -> list[Any]: """Paid-tier only.""" return self.learner().list_proposals(status=status, limit=limit) def accept_skill_proposal( self, proposal_id: str, *, note: str | None = None, ) -> dict[str, Any]: """Paid-tier only.""" return self.learner().accept_proposal(proposal_id, note=note) def reject_skill_proposal( self, proposal_id: str, *, note: str | None = None, ) -> dict[str, Any]: """Paid-tier only.""" return self.learner().reject_proposal(proposal_id, note=note) def lint(self, **kwargs: Any): """Run the local memory linter against this tenant. Returns a LintReport with `.findings`, `.counts`, `.ok`, and `.to_ascii()`. Paid-tier only. Free-tier callers raise TierGateError pointing at the upgrade page. """ self._require_paid_tier("memory linter") from .lint import Linter # If the caller didn't supply soft_cap_bytes, look up by tier if "soft_cap_bytes" not in kwargs: from .lint import TIER_SOFT_CAPS, DEFAULT_SOFT_CAP_BYTES cap = TIER_SOFT_CAPS.get(self._tier, DEFAULT_SOFT_CAP_BYTES) # Paid tiers map to None: pass a huge cap so the check effectively never fires kwargs["soft_cap_bytes"] = cap if cap is not None else (1 << 62) return Linter(self._storage, tenant_id=self._tenant_id, **kwargs).run() # ------------------------------------------------------------------ # Free-tier read access (no gating): visibility into the upgrade pressure # ------------------------------------------------------------------ def free_tier_status(self) -> dict[str, Any]: """Return current free-tier state: DB size, soft cap, % used. Always available regardless of tier: free-tier callers use this to render the "you're at X% of your free cap" upgrade prompt without needing to call the (gated) linter. """ from .lint import TIER_SOFT_CAPS, DEFAULT_SOFT_CAP_BYTES from pathlib import Path db_size = Path(self._storage.db_path).stat().st_size if Path(self._storage.db_path).exists() else 0 cap = TIER_SOFT_CAPS.get(self._tier, DEFAULT_SOFT_CAP_BYTES) # Paid tier → no cap if cap is None: return { "tier": self._tier, "db_size_bytes": db_size, "soft_cap_bytes": None, "pct_used": None, "uncapped": True, } return { "tier": self._tier, "db_size_bytes": db_size, "soft_cap_bytes": cap, "pct_used": db_size / cap if cap else None, "uncapped": False, "at_or_above_warning": db_size >= 0.8 * cap, "at_or_above_cap": db_size >= cap, "upgrade_url": "https://sibyllabs.org/plugin#tier", } # ------------------------------------------------------------------ # FTS5 search # ------------------------------------------------------------------ def search_entities(self, query: str, *, limit: int = 20, prefix: bool = False, category: str | None = None) -> list[dict[str, Any]]: """Full-text search over entity name + category + body via FTS5. Returns warm-tier entity rows only. For cross-tier search (entities + state + reference + journal in one call), use ``search()``. Query is sanitized as a single FTS5 phrase: column-filter syntax (``name:foo``) and unclosed quotes can't escape into the parser. Set ``prefix=True`` for prefix matching on the final token. Pass ``category=""`` to anchor the search to a single entity category (exact match); this removes topical bleed across categories on multi-entity workloads (tester email 19e7e75af0b7780a). Omit to search all categories. Returns: list of entity rows. Each row is a dict with keys id, tenant_id, category, name, status, body, created_at, updated_at (body is JSON-deserialized). Raises: StorageError on backend failure; empty list on empty / invalid query. """ limit = max(0, limit) # negative limit must not broaden: SQLite LIMIT -1 = unbounded match_q = _sanitize_fts5_query(query, prefix=prefix) if not match_q: return [] # external-content FTS5: join by rowid back to base table. # _fts_query handles classification (v0.4.0 KAPPA) + corruption # containment (poisoned-index DatabaseError self-heals or returns []). cat_clause = " AND e.category = ?" if category else "" params = ((match_q, self._tenant_id, category, limit) if category else (match_q, self._tenant_id, limit)) with self._storage.connection() as conn: rows = _fts_query( conn, "SELECT e.id, e.tenant_id, e.category, e.name, e.status, e.body, e.created_at, e.updated_at " "FROM entities_fts f " "JOIN entities e ON e.rowid = f.rowid " "WHERE entities_fts MATCH ? AND f.tenant_id = ?" + cat_clause + " " "ORDER BY rank LIMIT ?", params, "entities_fts", ) ents = [self._row_to_entity(r) for r in rows] # v0.4.10: proximity re-rank (see search()). Multi-word, non-prefix only; # re-orders the fetched rows, never drops one. query_tokens = _match_tokens(query) if not prefix and len(query_tokens) >= 2 and len(ents) > 1: keyed = [] for idx, e in enumerate(ents): text = " ".join(( _normalize_text(e.get("body")), _normalize_text(e.get("name", "")), _normalize_text(e.get("category") or ""), )) keyed.append((_proximity_bucket(query_tokens, text), idx, e)) keyed.sort(key=lambda t: (t[0], t[1])) ents = [t[2] for t in keyed] return ents def search(self, query: str, *, limit: int = 20, prefix: bool = False, tiers: tuple[str, ...] | None = None) -> list[dict[str, Any]]: """Cross-tier full-text search over entities + state + reference + journal. Each hit is tier-tagged so callers know which tier surfaced the match. Returns: list of dicts shaped: { "tier": "entity" | "state" | "reference" | "journal", "key": , "category": , "body": , "snippet": , "rank": , "ts": } Ordered by FTS5 rank across the union. The default ``limit`` applies globally (combined across tiers). Pass ``tiers=("entity", "state")`` to restrict. Query is sanitized as a single FTS5 phrase (see ``search_entities`` notes). Empty / invalid queries return []. Raises: StorageError on backend failure. """ limit = max(0, limit) # negative limit must not broaden: SQLite LIMIT -1 = unbounded match_q = _sanitize_fts5_query(query, prefix=prefix) if not match_q: return [] allowed = set(tiers) if tiers else {"entity", "state", "reference", "journal"} hits: list[dict[str, Any]] = [] with self._storage.connection() as conn: # v0.4.0 (KAPPA YELLOW finding): per-tier OperationalError handling # now classifies via _classify_fts5_error. Schema-missing keeps the # previous behavior (skip this tier silently, other tiers continue). # FTS5 syntax / real backend errors raise: the query is bad for # ALL tiers, no point continuing through the union. if "entity" in allowed: for r in _fts_query( conn, "SELECT 'entity' AS tier, e.name AS key, e.category, e.body, " " e.updated_at AS ts, " " snippet(entities_fts, 2, '[', ']', '...', 12) AS snip, " " rank " "FROM entities_fts f JOIN entities e ON e.rowid = f.rowid " "WHERE entities_fts MATCH ? AND f.tenant_id = ? " "ORDER BY rank LIMIT ?", (match_q, self._tenant_id, limit), "entities_fts", ): hits.append({ "tier": "entity", "key": r["key"], "category": r["category"], "body": loads(r["body"]), "snippet": r["snip"], "rank": r["rank"], "ts": r["ts"], }) if "state" in allowed: for r in _fts_query( conn, "SELECT 'state' AS tier, s.document_key AS key, s.body, " " s.updated_at AS ts, " " snippet(state_documents_fts, 1, '[', ']', '...', 12) AS snip, " " rank " "FROM state_documents_fts f JOIN state_documents s " " ON s.rowid = f.rowid " "WHERE state_documents_fts MATCH ? AND f.tenant_id = ? " "ORDER BY rank LIMIT ?", (match_q, self._tenant_id, limit), "state_documents_fts", ): hits.append({ "tier": "state", "key": r["key"], "category": None, "body": loads(r["body"]), "snippet": r["snip"], "rank": r["rank"], "ts": r["ts"], }) if "reference" in allowed: for r in _fts_query( conn, "SELECT 'reference' AS tier, d.doc_key AS key, d.body, " " d.updated_at AS ts, " " snippet(reference_documents_fts, 1, '[', ']', '...', 12) AS snip, " " rank " "FROM reference_documents_fts f JOIN reference_documents d " " ON d.rowid = f.rowid " "WHERE reference_documents_fts MATCH ? AND f.tenant_id = ? " "ORDER BY rank LIMIT ?", (match_q, self._tenant_id, limit), "reference_documents_fts", ): hits.append({ "tier": "reference", "key": r["key"], "category": None, "body": r["body"], "snippet": r["snip"], "rank": r["rank"], "ts": r["ts"], }) if "journal" in allowed: # Journal FTS5 is standalone/contentless: fetch event_id from # the FTS5 table, then join to journal_events by id (TEXT PK) # for typed body fields. Contentless tables can't 'rebuild', # so _fts_query contains corruption by returning [] (tier # skipped) rather than crashing the whole search. # # v0.4.7: cap the journal tier's contribution. Journal entries # are long and share many common terms (Project, Research, # Decision...), so on mixed-keyword queries they were dominating # 50-80% of hits and burying real entities/state/reference. Give # journal at most a quarter of the global limit; the structured # tiers keep the rest. The global rank-sort + limit still applies. journal_limit = max(1, limit // 4) if limit > 0 else 0 for r in _fts_query( conn, "SELECT 'journal' AS tier, j.id AS key, j.ts, " " j.evaluated, j.acted, j.forward, j.extra, " " snippet(journal_events_fts, 1, '[', ']', '...', 12) AS snip, " " f.rank AS rank " "FROM journal_events_fts f JOIN journal_events j " " ON j.id = f.event_id " "WHERE journal_events_fts MATCH ? AND f.tenant_id = ? " "ORDER BY f.rank LIMIT ?", (match_q, self._tenant_id, journal_limit), "journal_events_fts", ): hits.append({ "tier": "journal", "key": r["key"], "category": None, "body": { "evaluated": loads(r["evaluated"]), "acted": loads(r["acted"]), "forward": loads(r["forward"]), "extra": loads(r["extra"]), }, "snippet": r["snip"], "rank": r["rank"], "ts": r["ts"], }) # Sort by rank (lower = better in FTS5), with a tier tiebreaker: at # comparable rank the content tiers (entity/state/reference) sort before # the contentless journal tier, whose BM25 scores are not on the same # scale (cross-tier rank comparability, tester email 19e7eb3096b4dae5). _tier_rank = {"entity": 0, "state": 0, "reference": 0, "journal": 1} # v0.4.10: proximity re-rank. For multi-word (non-prefix) queries, bucket # each hit by how tightly it matches (contiguous phrase > tight window > # scattered) and sort by (bucket, rank, tier). This demotes short # "near-negative decoy" rows that share the query tokens in an unrelated # context, without dropping any hit (recall unchanged). Single-token and # prefix queries keep the plain BM25 order, so multi_record_search (which # only issues single-token searches) is unaffected. query_tokens = _match_tokens(query) if not prefix and len(query_tokens) >= 2: keyed = [] for h in hits: text = " ".join(( _normalize_text(h.get("body")), _normalize_text(h.get("key", "")), _normalize_text(h.get("category") or ""), )) keyed.append(( _proximity_bucket(query_tokens, text), h["rank"], _tier_rank.get(h["tier"], 0), h, )) keyed.sort(key=lambda t: (t[0], t[1], t[2])) hits = [t[3] for t in keyed] else: hits.sort(key=lambda h: (h["rank"], _tier_rank.get(h["tier"], 0))) return hits[:limit] # ------------------------------------------------------------------ # Helpers # ------------------------------------------------------------------ def _row_to_entity(self, row: sqlite3.Row) -> dict[str, Any]: return { "id": row["id"], "tenant_id": row["tenant_id"], "category": row["category"], "name": row["name"], "status": row["status"], "body": loads(row["body"]), "created_at": row["created_at"], "updated_at": row["updated_at"], }