File size: 9,600 Bytes
c5a913d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
"""Adversarial / edge-case tests for the sibyl setup migration phases.

Hunts for the failure modes a hand-written happy-path suite misses: large/many/
nested/unicode/binary files, symlinks, corrupted/locked DBs, permission errors,
backup integrity under stress, atomic + idempotent debloat, and a fuzz loop over
random file trees asserting (a) byte-exact backup, (b) sources never modified,
(c) debloat refuses without a backup and round-trips with one.
"""
import os
import random
import sqlite3
import string
import sys
from pathlib import Path

import pytest

from sibyl_memory_cli import migrate as M
from sibyl_memory_client import MemoryClient


# ---------------------------------------------------------------- backup stress

def test_backup_large_file(tmp_path):
    """Back up a single 5 MiB ``CLAUDE.md`` and check nothing is truncated.

    The backup must report success, account for at least the payload size in
    ``total_bytes``, and produce a copy whose on-disk size equals the source's.
    """
    payload_size = 5 * 1024 * 1024
    home_dir = tmp_path / "h"
    home_dir.mkdir()
    source = home_dir / "CLAUDE.md"
    source.write_bytes(b"x" * payload_size)

    discovered = M.scan_memory_files(home_dir, cwd=home_dir)
    result = M.run_backup(discovered, tmp_path / "b")

    assert result.ok
    assert result.total_bytes >= payload_size
    copied = result.backup_dir / "CLAUDE.md"
    assert copied.stat().st_size == source.stat().st_size


def test_backup_many_files_nested(tmp_path):
    """Back up 120 notes stored under ``.hermes/memory/deep/deeper``.

    After a successful backup, exactly 120 files matching ``n*.md`` must be
    found anywhere below the backup directory.
    """
    note_count = 120
    home_dir = tmp_path / "h"
    nested = home_dir / ".hermes" / "memory" / "deep" / "deeper"
    nested.mkdir(parents=True)
    for index in range(note_count):
        (nested / f"n{index}.md").write_text(f"note {index}\n")

    discovered = M.scan_memory_files(home_dir, cwd=home_dir)
    result = M.run_backup(discovered, tmp_path / "b")

    assert result.ok
    backed_up = list(result.backup_dir.rglob("n*.md"))
    assert len(backed_up) == note_count


def test_backup_unicode_and_binary_content(tmp_path):
    """Back up one multi-script UTF-8 file and one file holding all 256 byte values.

    The text copy must still decode as UTF-8 and start with the original
    heading; the binary copy must match the source bytes exactly.
    """
    every_byte = bytes(range(256))
    home_dir = tmp_path / "h"
    home_dir.mkdir()
    (home_dir / "CLAUDE.md").write_text(
        "# café ☕ 你好 \U0001f9e0\nkeep: π=3.14159\n", encoding="utf-8"
    )
    # Raw binary payload, deliberately not valid text.
    (home_dir / "AGENTS.md").write_bytes(every_byte)

    discovered = M.scan_memory_files(home_dir, cwd=home_dir)
    result = M.run_backup(discovered, tmp_path / "b")

    assert result.ok
    text_copy = (result.backup_dir / "CLAUDE.md").read_text(encoding="utf-8")
    assert text_copy.startswith("# café")
    binary_copy = (result.backup_dir / "AGENTS.md").read_bytes()
    assert binary_copy == every_byte


def test_backup_never_modifies_sources(tmp_path):
    """Running a backup must leave source bytes and mtimes untouched.

    Content and nanosecond mtime of each source are captured before the backup
    and compared again afterwards.
    """
    home_dir = tmp_path / "h"
    home_dir.mkdir()

    before = {}
    for name in ("CLAUDE.md", "AGENTS.md"):
        source = home_dir / name
        source.write_text("content " * 50)
        before[source] = (source.read_bytes(), source.stat().st_mtime_ns)

    M.run_backup(M.scan_memory_files(home_dir, cwd=home_dir), tmp_path / "b")

    for source, (content, mtime_ns) in before.items():
        assert source.read_bytes() == content
        assert source.stat().st_mtime_ns == mtime_ns


def test_backup_dir_collision_errors_cleanly(tmp_path):
    """A backup whose target directory already exists must fail without raising.

    The directory name generated by an earlier backup is pre-created under a
    fresh root, and ``run_backup`` is then called with ``now`` set to the
    timestamp recovered from that name. The result must have ``ok is False``
    and an ``error`` mentioning "backup dir".
    """
    from datetime import datetime

    home_dir = tmp_path / "h"
    home_dir.mkdir()
    (home_dir / "CLAUDE.md").write_text("x")
    scanned = M.scan_memory_files(home_dir, cwd=home_dir)

    # Two ordinary backups into separate roots; the second one supplies the
    # generated directory name that is reused below.
    M.run_backup(scanned, tmp_path / "b")
    reference = M.run_backup(scanned, tmp_path / "b2")
    taken_name = reference.backup_dir.name

    # Occupy that exact directory name under a third root before backing up.
    occupied_root = tmp_path / "b3"
    occupied_root.mkdir()
    (occupied_root / taken_name).mkdir()

    # NOTE(review): assumes the name is "sibyl-migration-backup-" followed by
    # an ISO timestamp with ":" written as "_" -- confirm against run_backup.
    stamp = taken_name.replace("sibyl-migration-backup-", "").replace("_", ":")
    clash = M.run_backup(scanned, occupied_root, now=datetime.fromisoformat(stamp))

    assert clash.ok is False
    assert "backup dir" in (clash.error or "")


@pytest.mark.skipif(
    # os.geteuid is only available on Unix. The condition is evaluated when
    # the module is imported, so guard the lookup to keep collection from
    # failing with AttributeError on platforms that lack it.
    hasattr(os, "geteuid") and os.geteuid() == 0,
    reason="root bypasses file permissions",
)
def test_backup_permission_denied_source_aborts(tmp_path):
    """An unreadable source file must never make ``run_backup`` raise.

    The file is scanned while still readable and then set to mode ``0o000``
    before the backup runs. Both outcomes are accepted: the copy may succeed
    (some filesystems ignore the mode), or the backup may report failure, in
    which case ``error`` must mention "copy failed". The original mode is
    restored in ``finally`` whatever the outcome.
    """
    home = tmp_path / "h"
    home.mkdir()
    source = home / "CLAUDE.md"
    source.write_text("secret")
    files = M.scan_memory_files(home, cwd=home)

    os.chmod(source, 0o000)
    try:
        res = M.run_backup(files, tmp_path / "b")
        # Either it copies (some FS) or aborts cleanly -- it must NOT raise.
        assert res.ok in (True, False)
        if not res.ok:
            assert "copy failed" in (res.error or "")
    finally:
        os.chmod(source, 0o644)


# ---------------------------------------------------------------- scan edge

def test_scan_handles_broken_symlink(tmp_path):
    """Scanning a home whose ``CLAUDE.md`` is a dangling symlink must not raise.

    Only the return type is checked; whether the dangling link is reported or
    skipped is left to the scanner.
    """
    home_dir = tmp_path / "h"
    home_dir.mkdir()
    missing_target = home_dir / "does-not-exist"
    dangling = home_dir / "CLAUDE.md"
    dangling.symlink_to(missing_target)  # target is never created

    result = M.scan_memory_files(home_dir, cwd=home_dir)

    assert isinstance(result, list)


def test_scan_no_files_returns_empty(tmp_path):
    """Scanning an empty home directory yields an empty list."""
    empty_home = tmp_path / "empty"
    empty_home.mkdir()
    discovered = M.scan_memory_files(empty_home, cwd=empty_home)
    assert discovered == []


# ---------------------------------------------------------------- verify / DB

def test_verify_corrupt_db_is_contained(tmp_path):
    """A file of random bytes posing as the DB must be handled without raising.

    ``db_baseline`` must return 0 and ``verify_new_entries`` must report
    ``ok`` as ``False``.
    """
    db_path = tmp_path / "memory.db"
    db_path.write_bytes(os.urandom(4096))  # 4 KiB of noise, not a sqlite file

    baseline = M.db_baseline(db_path)
    assert baseline == 0

    report = M.verify_new_entries(db_path, 0)
    assert report["ok"] is False


def test_verify_empty_schema_db(tmp_path):
    """A freshly initialised DB with no entries has baseline 0 and fails verify."""
    db_path = tmp_path / "memory.db"
    # Opening a local client is what initialises the database; nothing is written.
    MemoryClient.local(str(db_path), tenant_id="qa")

    baseline = M.db_baseline(db_path)
    assert baseline == 0

    report = M.verify_new_entries(db_path, 0)
    assert report["ok"] is False


def test_verify_counts_after_writes(tmp_path):
    """Writing 25 ``facts`` entities after the baseline is reported as 25 new entries.

    Both the overall ``new_total`` and the per-category count for ``facts``
    must equal the number of writes.
    """
    writes = 25
    db_path = tmp_path / "memory.db"
    client = MemoryClient.local(str(db_path), tenant_id="qa")
    baseline = M.db_baseline(db_path)

    for number in range(writes):
        client.set_entity("facts", f"f{number}", {"value": number})

    report = M.verify_new_entries(db_path, baseline)
    assert report["new_total"] == writes
    assert report["by_category"]["facts"] == writes


# ---------------------------------------------------------------- debloat safety

def test_debloat_atomic_no_partial_on_success(tmp_path):
    """A successful debloat writes exactly the lean text and leaves no temp file.

    After the call the target must contain the lean content, and no
    ``*.sibyl-tmp`` file may remain next to it.
    """
    target = tmp_path / "CLAUDE.md"
    target.write_text("# A\n" + "junk\n" * 1000)
    lean_text = M.heuristic_lean(target.read_text())

    outcome = M.debloat_file(target, lean_text, backup_exists=True)

    assert outcome["written"]
    assert target.read_text() == lean_text
    leftovers = list(tmp_path.glob("*.sibyl-tmp"))
    assert leftovers == []


def test_debloat_idempotent_rerun(tmp_path):
    """Debloating an already-debloated file keeps the ``identity`` line.

    The second pass feeds the output of the first pass back through
    ``heuristic_lean`` and ``debloat_file``.
    """
    target = tmp_path / "CLAUDE.md"
    target.write_text("# A\nidentity\n## later\njunk\n")

    first_lean = M.heuristic_lean(target.read_text())
    M.debloat_file(target, first_lean, backup_exists=True)

    # Second pass operates on whatever the first pass left on disk.
    after_first = target.read_text()
    second_lean = M.heuristic_lean(after_first)
    M.debloat_file(target, second_lean, backup_exists=True)

    assert "identity" in target.read_text()


def test_debloat_preserves_unicode(tmp_path):
    """Non-ASCII lean content read back as UTF-8 matches what was passed in."""
    unicode_text = "# café ☕\nrule π\n"
    target = tmp_path / "CLAUDE.md"
    target.write_text(unicode_text, encoding="utf-8")

    M.debloat_file(target, unicode_text, backup_exists=True)

    assert target.read_text(encoding="utf-8") == unicode_text


def test_debloat_refuses_no_backup_under_all_inputs(tmp_path):
    """With ``backup_exists=False`` nothing is written, whatever the lean text.

    Candidates tried: empty, one character, identical to the original, and a
    10,000-character string. The file must keep its original content each time.
    """
    original = "# keep\n" * 10
    target = tmp_path / "CLAUDE.md"
    target.write_text(original)

    candidates = ("", "x", original, "a" * 10000)
    for candidate in candidates:
        outcome = M.debloat_file(target, candidate, backup_exists=False)
        assert not outcome["written"]
        assert target.read_text() == original


# ---------------------------------------------------------------- heuristic_lean edge

def test_lean_empty_and_no_sections(tmp_path):
    """``heuristic_lean`` copes with empty input and with text lacking headings.

    Empty input must still produce output containing "Sibyl Memory"; heading-free
    text must keep its first line.
    """
    from_empty = M.heuristic_lean("")
    assert "Sibyl Memory" in from_empty

    heading_free = "just one line, no headings at all\nsecond line\n"
    from_flat = M.heuristic_lean(heading_free)
    assert "just one line" in from_flat


def test_lean_keepblock_exact(tmp_path):
    """The first output line is the content wrapped in ``sibyl:keep`` markers."""
    marked = "x\n<!-- sibyl:keep -->\nONLY THIS\n<!-- /sibyl:keep -->\ny\n"
    output_lines = M.heuristic_lean(marked).split("\n")
    assert output_lines[0] == "ONLY THIS"


# ---------------------------------------------------------------- codex wirer edge

def test_codex_malformed_toml_no_crash(tmp_path):
    """``CodexWirer.current_state`` tolerates a config file that is not valid TOML.

    The state must report that the config exists and that it is not wired
    with sibyl.
    """
    config_file = tmp_path / "config.toml"
    config_file.write_text("this is = = not valid toml [[[\n")

    wirer = M.CodexWirer(config_path=config_file)
    state = wirer.current_state()  # the call itself must not raise

    assert state["config_exists"]
    assert state["wired_with_sibyl"] is False


def test_codex_already_wired_detected(tmp_path):
    """A config with an ``[mcp_servers.sibyl_memory]`` table is reported as wired."""
    config_file = tmp_path / "config.toml"
    config_file.write_text('model="o4"\n[mcp_servers.sibyl_memory]\ncommand = "sibyl-memory-mcp"\n')

    state = M.CodexWirer(config_path=config_file).current_state()

    assert state["wired_with_sibyl"] is True


# ---------------------------------------------------------------- FUZZ

def _rand_text(rng, n):
    """Return ``n`` characters picked from ``string.printable`` using ``rng``.

    One ``rng.choice`` call is made per character, so the text is reproducible
    for a seeded generator. A non-positive ``n`` yields an empty string.
    """
    alphabet = string.printable
    picked = [rng.choice(alphabet) for _ in range(n)]
    return "".join(picked)


def test_fuzz_backup_roundtrip_and_source_immutability(tmp_path):
    """Fuzz 60 seeded random home trees through scan, backup and debloat.

    Each iteration writes a random subset of four known memory-file paths with
    random printable content, backs them up, and checks that every backed-up
    file is byte-identical to its source and that no source changed content or
    mtime. When ``CLAUDE.md`` was generated, ``debloat_file`` must refuse to
    write with ``backup_exists=False``; it is then called again with
    ``backup_exists=res.ok``.
    """
    # Fixed seed: every run generates the same sequence of trees.
    rng = random.Random(20260531)
    for it in range(60):
        home = tmp_path / f"h{it}"; home.mkdir()
        # path -> (bytes, mtime_ns) captured right after writing each source
        snap = {}
        # random subset of known memory files with random content
        for rel in ("CLAUDE.md", "AGENTS.md", ".codex/config.toml", "MEMORY.md"):
            # each candidate path is written with probability 0.6
            if rng.random() < 0.6:
                p = home / rel; p.parent.mkdir(parents=True, exist_ok=True)
                # content length is drawn from 0..4000 characters, so empty files occur
                p.write_text(_rand_text(rng, rng.randint(0, 4000)), encoding="utf-8")
                snap[p] = (p.read_bytes(), p.stat().st_mtime_ns)
        files = M.scan_memory_files(home, cwd=home)
        # each iteration backs up into its own root directory
        res = M.run_backup(files, tmp_path / f"b{it}")
        assert res.ok, res.error
        # byte-exact copies: compare each scanned entry's source path with the
        # file at the same relative location inside the backup directory
        for f in files:
            assert (res.backup_dir / f.rel).read_bytes() == f.path.read_bytes()
        # sources untouched
        for p, (b, mt) in snap.items():
            assert p.read_bytes() == b and p.stat().st_mtime_ns == mt
        # debloat round-trip on CLAUDE.md if present
        cm = home / "CLAUDE.md"
        if cm.exists():
            # without a backup the write must be refused
            assert not M.debloat_file(cm, "lean", backup_exists=False)["written"]
            # with the backup confirmed (res.ok), debloat using the heuristic lean text
            M.debloat_file(cm, M.heuristic_lean(cm.read_text(encoding="utf-8", errors="replace")), backup_exists=res.ok)