| """`sibyl setup` guided onboarding flow (v2): backup -> wire MCP -> extract -> verify -> debloat. |
| |
| Design (operator-locked 2026-05-31): one dynamic, resumable, guided flow that gets a |
| user "set up and optimized" no matter which harness they run. The CLI does the |
| DETERMINISTIC work (back up files, detect state, verify the DB, trim files) and |
| CONDUCTS; the user's own harness does the semantic EXTRACTION (it has the memory |
| tools). Every gap (no plugin, MCP not wired) prints exact per-harness instructions. |
| |
| This module adds the new phases on top of the existing wirers in setup.py |
| (HermesWirer / ClaudeCodeWirer) and adds CodexWirer so all three harnesses are |
| first-class. Nothing here touches live files except the explicitly-confirmed |
| debloat step, and only after a verified backup exists. |
| """ |
| from __future__ import annotations |
|
|
| import json |
| import os |
| import shutil |
| import sqlite3 |
| from dataclasses import dataclass, field |
| from datetime import datetime, timezone |
| from pathlib import Path |
| from typing import Callable, Optional |
|
|
| from . import _aesthetic as A |
|
|
| |
| |
| |
| |
| |
|
|
# Candidate memory/agent files for each harness, written as paths relative to a
# scan root. Key order matters to the scanner: a file that appears under two
# harnesses (e.g. AGENTS.md) is attributed to whichever key comes first.
HARNESS_FILES: dict[str, list[str]] = {
    "claude-code": [
        "CLAUDE.md",
        ".claude/CLAUDE.md",
        ".claude/settings.json",
    ],
    "codex": [
        "AGENTS.md",
        ".codex/config.toml",
        ".codex/AGENTS.md",
    ],
    "hermes": [
        ".hermes/config.yaml",
        ".hermes/memory",
    ],
    "generic": [
        "AGENTS.md",
        "MEMORY.md",
        "memory.md",
        ".cursorrules",
        ".cursor/rules",
    ],
}
|
|
|
|
@dataclass
class FoundFile:
    """A single memory/agent file or directory discovered on disk."""

    harness: str  # harness name the path was found under
    path: Path  # location of the file or directory as it was scanned
    rel: str  # relative path this entry is stored under inside a backup folder
    is_dir: bool  # True when `path` is a directory
    size: int  # size in bytes (for a directory: sum of the files beneath it)
|
|
|
|
| def _backup_rel(p: Path, home: Path, cwd: Optional[Path]) -> str: |
| """Collision-free backup path for a source file. Files under home keep their |
| home-relative path; files outside home (a project elsewhere) get a `project/` |
| prefix; anything else `external/<name>`. This prevents a home file and a |
| same-named project file from clobbering each other in the backup (data-loss bug).""" |
| try: |
| if p.is_relative_to(home): |
| return str(p.relative_to(home)) |
| except (ValueError, OSError): |
| pass |
| if cwd: |
| try: |
| cwd = Path(cwd) |
| if p.is_relative_to(cwd): |
| return "project/" + str(p.relative_to(cwd)) |
| except (ValueError, OSError): |
| pass |
| return "external/" + p.name |
|
|
|
|
def scan_memory_files(home: Optional[Path] = None, cwd: Optional[Path] = None) -> list[FoundFile]:
    """Locate existing memory/agent files for every harness in HARNESS_FILES.

    Each relative name is probed under the home directory (default:
    ``Path.home()``) and, when *cwd* is given, under that project directory too.
    Entries whose resolved paths coincide are reported once; the first hit wins.
    """
    home = Path(home).expanduser() if home else Path.home()
    roots = [home, Path(cwd)] if cwd else [home]

    # Flatten harness -> relative name -> root into one ordered stream.
    candidates = (
        (harness, root / rel)
        for harness, rels in HARNESS_FILES.items()
        for rel in rels
        for root in roots
    )

    visited: set[Path] = set()
    results: list[FoundFile] = []
    for harness, candidate in candidates:
        if not candidate.exists():
            continue
        try:
            identity = candidate.resolve()
        except OSError:
            # Fall back to the unresolved path as the de-dupe key.
            identity = candidate
        if identity in visited:
            continue
        visited.add(identity)
        if candidate.is_dir():
            directory, nbytes = True, _tree_size(candidate)
        else:
            directory, nbytes = False, candidate.stat().st_size
        results.append(
            FoundFile(harness, candidate, _backup_rel(candidate, home, cwd), directory, nbytes)
        )
    return results
|
|
|
|
| def _tree_size(p: Path) -> int: |
| return sum(f.stat().st_size for f in p.rglob("*") if f.is_file()) |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class BackupResult:
    """Outcome of one backup run."""

    backup_dir: Path  # the timestamped folder the backup was written to
    files: list[str] = field(default_factory=list)  # relative paths copied so far
    total_bytes: int = 0  # bytes copied so far
    ok: bool = True  # False once any step has failed
    error: Optional[str] = None  # failure description; None when nothing failed
|
|
|
|
def backup_dir_name(now: Optional[datetime] = None) -> str:
    """Build the backup folder name ``sibyl-migration-backup-<YYYY-MM-DDTHH_MM_SS>``.

    *now* supplies the timestamp; when omitted the current UTC time is used.
    """
    stamp = now if now else datetime.now(timezone.utc)
    return f"sibyl-migration-backup-{stamp:%Y-%m-%dT%H_%M_%S}"
|
|
|
|
def run_backup(files: list[FoundFile], dest_parent: Path, *, now: Optional[datetime] = None) -> BackupResult:
    """Copy each found file/dir into a fresh timestamped backup folder under dest_parent.

    Sources are never modified. After every copy the source and destination
    byte counts are compared. The run stops at the first failure and returns a
    result with ``ok=False`` and ``error`` set; entries copied before the
    failure stay listed in ``files``.

    Two *different* sources that map to the same relative backup path are
    treated as a failure rather than letting the later copy silently overwrite
    the earlier one.

    Args:
        files: entries to back up; each is stored at ``<backup>/<entry.rel>``.
        dest_parent: directory in which the timestamped backup folder is created.
        now: timestamp used for the folder name (defaults to the current time).

    Returns:
        A BackupResult describing what was copied and whether the run succeeded.
    """
    dest_parent = Path(dest_parent).expanduser()
    backup = dest_parent / backup_dir_name(now)
    res = BackupResult(backup_dir=backup)
    try:
        # exist_ok=False: never write into a pre-existing backup folder.
        backup.mkdir(parents=True, exist_ok=False)
    except Exception as e:
        res.ok = False
        res.error = f"could not create backup dir: {e}"
        return res

    sources_by_rel: dict[str, Path] = {}
    for f in files:
        earlier = sources_by_rel.get(f.rel)
        if earlier is not None and earlier != f.path:
            # Copying would clobber a different source already in the backup.
            res.ok = False
            res.error = f"backup path collision on {f.rel}: {earlier} and {f.path}"
            return res
        sources_by_rel[f.rel] = f.path

        target = backup / f.rel
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            if f.is_dir:
                shutil.copytree(f.path, target, dirs_exist_ok=True)
                src_sz, dst_sz = _tree_size(f.path), _tree_size(target)
            else:
                shutil.copy2(f.path, target)
                src_sz, dst_sz = f.path.stat().st_size, target.stat().st_size
            if src_sz != dst_sz:
                res.ok = False
                res.error = f"byte mismatch on {f.rel} ({src_sz} != {dst_sz})"
                return res
            res.files.append(f.rel)
            res.total_bytes += dst_sz
        except Exception as e:
            res.ok = False
            res.error = f"copy failed on {f.rel}: {type(e).__name__}: {e}"
            return res
    return res
|
|
|
|
| |
| |
| |
| |
|
|
| from .setup import CodexWirer, ClaudeCodeWirer, HermesWirer |
|
|
|
|
| |
| |
def wire_instructions(harness: str) -> list[str]:
    """Return the manual steps for wiring Sibyl Memory into *harness*.

    Codex steps come from ``CodexWirer().instructions()``. Any harness name
    that is not recognised gets a generic MCP-registration hint.
    """
    if harness == "codex":
        return CodexWirer().instructions()

    manual_steps = {
        "claude-code": [
            "Open a new terminal and run:",
            " claude mcp add sibyl-memory -- sibyl-memory-mcp",
            "Restart Claude Code (or /mcp -> reconnect sibyl-memory), then return here.",
        ],
        "hermes": [
            "Open a new terminal and run:",
            " sibyl-memory-hermes install-plugin",
            "Then set memory.provider: sibyl in ~/.hermes/config.yaml and restart Hermes.",
        ],
    }
    generic = [
        "Register an MCP server named 'sibyl-memory' with command 'sibyl-memory-mcp' in your agent's MCP config, then restart it."
    ]
    return manual_steps.get(harness, generic)
|
|
|
|
| |
| |
| |
|
|
def extraction_prompt(harness: str, backup_dir: Path) -> str:
    """Build the prompt the user pastes into their own harness to run extraction.

    The prompt tells the agent to read only from *backup_dir* and never edit
    live files. For "claude-code" and "codex" it names the ``sibyl_remember``
    tool; for any other harness it says "your memory tool".
    """
    if harness in ("claude-code", "codex"):
        tool = "sibyl_remember"
    else:
        tool = "your memory tool"

    opening = (
        f"Read ONLY from the backup folder at {backup_dir} (never touch my live files). "
        "For every piece of accumulated memory in those files (facts and configs, preferences "
        "and patterns, project context, people and relationship notes), write each one into Sibyl "
        f"Memory using {tool}:\n"
    )
    category_rules = [
        " - facts/configs/env: structured key-value content\n",
        " - preferences/patterns: tagged as preference\n",
        " - project context/history: under a project namespace\n",
        " - people/relationships: with the person's name as context\n",
    ]
    closing = (
        "Do not edit, trim, or delete any live file. When done, tell me how many entries you wrote "
        "in each category."
    )
    return opening + "".join(category_rules) + closing
|
|
|
|
| |
| |
| |
|
|
def db_baseline(db_path: Path) -> int:
    """Return the current row count of the ``entities`` table, to diff against later.

    Returns 0 when the database file does not exist yet, and also when it
    cannot be opened or queried (e.g. the table is missing or the file is not
    a SQLite database).

    Args:
        db_path: location of the SQLite database; ``~`` is expanded.
    """
    db_path = Path(db_path).expanduser()
    if not db_path.exists():
        # Do not call connect() here: it would create an empty database file.
        return 0
    con = None
    try:
        con = sqlite3.connect(str(db_path))
        con.row_factory = sqlite3.Row
        n = con.execute("SELECT COUNT(*) c FROM entities").fetchone()["c"]
        return int(n)
    except sqlite3.Error:
        return 0
    finally:
        # Close on the error path too, so a failed query does not leak the handle.
        if con is not None:
            con.close()
|
|
|
|
def verify_new_entries(db_path: Path, baseline_total: int) -> dict:
    """Report how many ``entities`` rows were added since *baseline_total*.

    Args:
        db_path: location of the SQLite database; ``~`` is expanded.
        baseline_total: row count captured before extraction.

    Returns:
        A dict with:
          * ``new_total``   - current row count minus the baseline, floored at 0
          * ``by_category`` - current row count per category (whole table, not
            only the new rows)
          * ``ok``          - True when ``new_total`` is greater than 0
          * ``error``       - present only when the database could not be queried
        A missing database file yields the zeroed result without ``error``.
    """
    db_path = Path(db_path).expanduser()
    out = {"new_total": 0, "by_category": {}, "ok": False}
    if not db_path.exists():
        return out
    con = None
    try:
        con = sqlite3.connect(str(db_path))
        con.row_factory = sqlite3.Row
        total = con.execute("SELECT COUNT(*) c FROM entities").fetchone()["c"]
        cats = con.execute(
            "SELECT category, COUNT(*) c FROM entities GROUP BY category ORDER BY c DESC"
        ).fetchall()
        out["new_total"] = max(0, int(total) - int(baseline_total))
        out["by_category"] = {r["category"]: int(r["c"]) for r in cats}
        out["ok"] = out["new_total"] > 0
    except sqlite3.Error as e:
        out["error"] = str(e)
    finally:
        # Close on the error path too, so a failed query does not leak the handle.
        if con is not None:
            con.close()
    return out
|
|
|
|
| |
| |
| |
|
|
# Markers a file can use to say exactly which span must survive trimming.
KEEP_START, KEEP_END = "<!-- sibyl:keep -->", "<!-- /sibyl:keep -->"


def heuristic_lean(text: str) -> str:
    """Produce a conservative lean version of *text* when the agent supplied none.

    If the text contains both keep markers, only the span between the first
    KEEP_START and the following KEEP_END is kept (markers excluded).
    Otherwise everything before the *second* ``## `` heading is kept: the
    preamble plus the first H2 section.

    A pointer comment saying the remaining memory now lives in Sibyl Memory is
    appended. The function is idempotent: input that already ends with that
    pointer (a file trimmed on an earlier run) does not get a second copy.
    """
    pointer = ("\n\n<!-- The rest of this file's accumulated memory now lives in Sibyl Memory "
               "and is recalled on demand. Full pre-migration backup is preserved. -->\n")
    if KEEP_START in text and KEEP_END in text:
        core = text.split(KEEP_START, 1)[1].split(KEEP_END, 1)[0].strip()
    else:
        core_lines = []
        seen_h2 = 0
        for ln in text.splitlines():
            if ln.startswith("## "):
                seen_h2 += 1
                if seen_h2 > 1:
                    # Second H2 reached: everything from here on is dropped.
                    break
            core_lines.append(ln)
        core = "\n".join(core_lines).strip()

    # Drop any pointer left by a previous trim so re-runs do not stack copies.
    marker = pointer.strip()
    while core.endswith(marker):
        core = core[: -len(marker)].rstrip()
    return core + pointer
|
|
|
|
def debloat_file(live_path: Path, lean_text: str, *, backup_exists: bool, dry_run: bool = False) -> dict:
    """Atomically replace live_path with lean_text. REFUSES unless backup_exists is True.

    The new content is written to a sibling ``<name>.sibyl-tmp`` file and then
    moved over the live file with ``os.replace``. The original file's
    permission bits are carried over where possible. If writing or replacing
    fails, the temporary file is removed and the exception is re-raised.

    Args:
        live_path: file to trim; ``~`` is expanded.
        lean_text: replacement content, written as UTF-8.
        backup_exists: must be True, otherwise nothing is touched.
        dry_run: when True, only report sizes without writing.

    Returns:
        A dict with ``before`` (current size in bytes), ``after`` (encoded size
        of lean_text), ``written`` (bool), and ``error`` only when the call was
        refused or the live file is missing.

    Raises:
        OSError: when the temporary file cannot be written or moved into place.
    """
    live_path = Path(live_path).expanduser()
    out = {"before": 0, "after": len(lean_text.encode()), "written": False}
    if not backup_exists:
        out["error"] = "refused: no verified backup exists"
        return out
    if not live_path.exists():
        out["error"] = "live file not found"
        return out
    out["before"] = live_path.stat().st_size
    if dry_run:
        return out

    tmp = live_path.with_suffix(live_path.suffix + ".sibyl-tmp")
    try:
        tmp.write_text(lean_text, encoding="utf-8")
        try:
            # A freshly created temp file gets default permissions; keep the
            # live file's mode so the replacement does not change its access.
            shutil.copymode(live_path, tmp)
        except OSError:
            # Best-effort: an unsupported chmod must not block the trim itself.
            pass
        os.replace(tmp, live_path)
    except Exception:
        # Do not leave a stray .sibyl-tmp next to the user's file.
        try:
            tmp.unlink()
        except OSError:
            pass
        raise
    out["written"] = True
    return out
|
|
|
|
| |
| |
| |
|
|
def detect_state(home: Optional[Path] = None, cwd: Optional[Path] = None, db_path: Optional[Path] = None) -> dict:
    """Take a snapshot of the current setup state, used for resuming the flow.

    Returns a dict with:
      * ``files``      - result of scanning home (and *cwd*) for memory files
      * ``harnesses``  - per harness: ``present`` plus whatever the wirer's
        ``current_state()`` reports
      * ``db_entries`` - current entity count in the database
      * ``db_path``    - database path that was inspected (defaults to
        ``<home>/.sibyl-memory/memory.db``)
    """
    from .setup import HermesWirer, ClaudeCodeWirer

    home = Path(home).expanduser() if home else Path.home()
    if db_path:
        db_path = Path(db_path).expanduser()
    else:
        db_path = home / ".sibyl-memory" / "memory.db"

    wirers = {"claude-code": ClaudeCodeWirer(), "codex": CodexWirer(), "hermes": HermesWirer()}

    found = scan_memory_files(home, cwd)
    harness_state = {}
    for label, wirer in wirers.items():
        harness_state[label] = {"present": wirer.is_present(), **wirer.current_state()}
    entry_count = db_baseline(db_path)

    return {
        "files": found,
        "harnesses": harness_state,
        "db_entries": entry_count,
        "db_path": db_path,
    }
|
|
|
|
class GuidedIO:
    """IO seam so the guided flow is testable non-interactively.

    Pass `scripted` answers (any iterable) to drive confirms/pauses without a
    TTY. Each confirm()/pause() consumes the next scripted answer; once the
    list is empty, both fall back to ``input()``. Everything passed to say()
    is collected in ``lines``.
    """

    def __init__(self, scripted=None):
        self.scripted = list(scripted or [])
        self.lines: list[str] = []

    def say(self, s: str = "") -> None:
        """Record one line of output (stringified) in ``lines``."""
        self.lines.append(str(s))

    def confirm(self, q: str, *, default: bool = True) -> bool:
        """Ask a yes/no question and return the decision.

        A scripted boolean is returned as-is. An empty answer (or EOF on
        stdin) yields *default*. Any other answer counts as "yes" only when it
        starts with "y", case-insensitively.
        """
        if self.scripted:
            ans = self.scripted.pop(0)
        else:
            try:
                ans = input(f"{q} [{'Y/n' if default else 'y/N'}]: ").strip()
            except EOFError:
                ans = ""
        if isinstance(ans, bool):
            # Scripted True/False is an explicit decision, not "use the default".
            return ans
        if not ans:
            return default
        return str(ans).strip().lower().startswith("y")

    def pause(self, q: str = "press Enter to continue") -> None:
        """Wait for the user to continue; consumes one scripted answer if any remain."""
        if self.scripted:
            self.scripted.pop(0)
            return
        try:
            input(q)
        except EOFError:
            pass
|
|
|
|
def run_guided_setup(*, home=None, cwd=None, db_path=None, backup_parent=None,
                     io: Optional[GuidedIO] = None, wirers: Optional[dict] = None,
                     extract_fn: Optional[Callable[[Path, Path], None]] = None,
                     debloat: bool = True, force: bool = False, now=None) -> dict:
    """Run the assembled guided flow and return a structured report.

    Phases, in order: backup -> auto-wire each detected harness (manual
    instructions are printed when auto-wiring does not succeed) -> extraction
    handoff -> verify -> confirmed debloat of CLAUDE.md.

    `extract_fn(backup_dir, db_path)` performs or simulates extraction; when it
    is None the extraction prompt is printed and the flow pauses until the user
    confirms it has run. `wirers` is injectable so callers can avoid touching
    real configuration. The flow stops early, with ``report["ok"]`` False, when
    no files are found or the backup fails.
    """
    from .setup import ALL_WIRERS

    if not io:
        io = GuidedIO()
    home = Path(home).expanduser() if home else Path.home()
    if db_path:
        db_path = Path(db_path).expanduser()
    else:
        db_path = home / ".sibyl-memory" / "memory.db"
    backup_parent = Path(backup_parent).expanduser() if backup_parent else home
    phases: dict = {}
    report: dict = {"ok": True, "phases": phases}

    # Phase 1: scan and back up. Nothing later runs without a good backup.
    found = scan_memory_files(home, cwd)
    report["files"] = [item.rel for item in found]
    if not found:
        io.say("No memory/agent files found. Nothing to migrate.")
        report["ok"] = False
        return report
    backup = run_backup(found, backup_parent, now=now)
    phases["backup"] = {"ok": backup.ok, "dir": str(backup.backup_dir), "files": len(backup.files)}
    if not backup.ok:
        io.say(f"Backup failed: {backup.error}. Aborting; nothing else touched.")
        report["ok"] = False
        return report
    io.say(f"Backed up {len(backup.files)} files -> {backup.backup_dir} (originals untouched)")

    # Phase 2: wire every harness that is present and not yet wired.
    if wirers is None:
        wirers = {label: factory() for label, factory in ALL_WIRERS.items()}
    detected = {label: wirer for label, wirer in wirers.items() if wirer.is_present()}
    wire_report = {}
    for name, wirer in detected.items():
        if wirer.current_state().get("wired_with_sibyl"):
            wire_report[name] = "already"
            continue
        outcome = wirer.wire(force=force)
        wire_report[name] = outcome.status
        if outcome.status in ("wired", "already"):
            continue
        io.say(f"{name}: auto-wire incomplete ({outcome.message}). Do this manually:")
        for step in wire_instructions(name):
            io.say(" " + step)
    phases["wire"] = wire_report

    # Phase 3: extraction handoff. Capture the baseline first so the verify
    # phase can count only what extraction added.
    baseline = db_baseline(db_path)
    if extract_fn is not None:
        extract_fn(backup.backup_dir, db_path)
    else:
        # The prompt is tailored to the first detected harness, if any.
        target = next(iter(detected), "claude-code")
        io.say("Run this in your agent (it reads the backup, writes to Sibyl):")
        io.say(extraction_prompt(target, backup.backup_dir))
        io.pause("After it finishes, press Enter to verify")

    # Phase 4: verify.
    verification = verify_new_entries(db_path, baseline)
    phases["verify"] = verification
    io.say(f"Verified {verification['new_total']} new entries in Sibyl Memory.")

    # Phase 5: debloat CLAUDE.md, only after verified entries and explicit consent.
    claude_md = (Path(cwd) if cwd else home) / "CLAUDE.md"
    eligible = debloat and verification["ok"] and claude_md.exists()
    if eligible and io.confirm(
        f"Trim {claude_md.name} to lean now? Full backup is safe at {backup.backup_dir}",
        default=False,
    ):
        lean = heuristic_lean(claude_md.read_text(encoding="utf-8", errors="replace"))
        outcome = debloat_file(claude_md, lean, backup_exists=backup.ok)
        phases["debloat"] = {
            "written": outcome["written"],
            "before": outcome["before"],
            "after": outcome["after"],
        }
        io.say(f"Trimmed {claude_md.name}. Backup safe at {backup.backup_dir}")
    return report
|
|