| """Tests for jarvis.memory.""" |
|
|
| import sqlite3 |
| from pathlib import Path |
|
|
| import pytest |
|
|
| from jarvis.memory import MemoryStore |
|
|
|
|
| def test_memory_store_accepts_relative_path(tmp_path, monkeypatch): |
| monkeypatch.chdir(tmp_path) |
| store = MemoryStore("memory.sqlite") |
| try: |
| store.add_memory("Relative path works.") |
| finally: |
| store.close() |
| assert (Path(tmp_path) / "memory.sqlite").exists() |
|
|
|
|
| def test_update_task_step_returns_false_for_missing_step(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| plan_id = store.add_task_plan("Test", ["A", "B"]) |
| updated = store.update_task_step(plan_id, 99, "done") |
| finally: |
| store.close() |
| assert updated is False |
|
|
|
|
| def test_add_task_plan_requires_non_empty_steps(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| with pytest.raises(ValueError): |
| store.add_task_plan("Plan", [" ", "\n"]) |
| finally: |
| store.close() |
|
|
|
|
| def test_update_task_step_rejects_invalid_status(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| plan_id = store.add_task_plan("Plan", ["step"]) |
| with pytest.raises(ValueError): |
| store.update_task_step(plan_id, 0, "finished") |
| finally: |
| store.close() |
|
|
|
|
| def test_recent_tolerates_invalid_tags_payload(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| memory_id = store.add_memory("Note", tags=["valid"]) |
| store._conn.execute("UPDATE memory SET tags = ? WHERE id = ?", ("{not-json", memory_id)) |
| store._conn.commit() |
| rows = store.recent(limit=1) |
| assert len(rows) == 1 |
| assert rows[0].tags == [] |
| finally: |
| store.close() |
|
|
|
|
| def test_memory_store_close_is_idempotent(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| store.close() |
| store.close() |
|
|
|
|
| def test_memory_store_enables_foreign_keys(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| value = store._conn.execute("PRAGMA foreign_keys;").fetchone()[0] |
| assert int(value) == 1 |
| finally: |
| store.close() |
|
|
|
|
| def test_memory_store_sets_busy_timeout(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| value = store._conn.execute("PRAGMA busy_timeout;").fetchone()[0] |
| assert int(value) == 5000 |
| finally: |
| store.close() |
|
|
|
|
| def test_task_plan_reopens_when_step_marked_not_done(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| plan_id = store.add_task_plan("Plan", ["A", "B"]) |
| assert store.update_task_step(plan_id, 0, "done") |
| assert store.update_task_step(plan_id, 1, "done") |
| closed = store.list_task_plans(open_only=False)[0] |
| assert closed.status == "closed" |
|
|
| assert store.update_task_step(plan_id, 1, "pending") |
| reopened = store.list_task_plans(open_only=False)[0] |
| assert reopened.status == "open" |
| finally: |
| store.close() |
|
|
|
|
| def test_search_limits_are_clamped_in_store(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| for idx in range(10): |
| store.add_memory(f"memory {idx}") |
| results = store.search_v2("memory", limit=10000) |
| assert len(results) <= 200 |
| finally: |
| store.close() |
|
|
|
|
| def test_recent_bool_limit_uses_default_store_limit(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| for idx in range(10): |
| store.add_memory(f"memory {idx}") |
| rows = store.recent(limit=True) |
| assert len(rows) == 5 |
| finally: |
| store.close() |
|
|
|
|
| def test_search_fractional_limit_uses_default_store_limit(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| for idx in range(10): |
| store.add_memory(f"memory {idx}") |
| rows = store.search_v2("memory", limit=2.7) |
| assert len(rows) == 5 |
| finally: |
| store.close() |
|
|
|
|
| def test_memory_optimize_and_vacuum_update_status(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| store.optimize() |
| store.vacuum() |
| status = store.memory_status() |
| assert status["last_optimize"] is not None |
| assert status["last_vacuum"] is not None |
| finally: |
| store.close() |
|
|
|
|
| def test_get_summary_returns_topic_case_insensitive(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| store.upsert_summary("Persona_Style", "friendly") |
| summary = store.get_summary("persona_style") |
| assert summary is not None |
| assert summary.summary == "friendly" |
| finally: |
| store.close() |
|
|
|
|
| def test_add_task_plan_rolls_back_on_step_insert_failure(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| store._conn.execute( |
| """ |
| CREATE TRIGGER fail_second_step_insert |
| BEFORE INSERT ON task_steps |
| WHEN NEW.idx = 1 |
| BEGIN |
| SELECT RAISE(FAIL, 'step insert failure'); |
| END; |
| """ |
| ) |
| with pytest.raises(sqlite3.DatabaseError): |
| store.add_task_plan("Plan", ["A", "B"]) |
| plan_count = store._conn.execute("SELECT COUNT(*) FROM task_plans").fetchone()[0] |
| step_count = store._conn.execute("SELECT COUNT(*) FROM task_steps").fetchone()[0] |
| assert int(plan_count) == 0 |
| assert int(step_count) == 0 |
| finally: |
| store.close() |
|
|
|
|
| def test_update_task_step_rolls_back_on_plan_status_failure(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| plan_id = store.add_task_plan("Plan", ["A"]) |
| store._conn.execute( |
| """ |
| CREATE TRIGGER fail_plan_status_update |
| BEFORE UPDATE ON task_plans |
| BEGIN |
| SELECT RAISE(FAIL, 'plan status update failure'); |
| END; |
| """ |
| ) |
| with pytest.raises(sqlite3.DatabaseError): |
| store.update_task_step(plan_id, 0, "done") |
| step_status = store._conn.execute( |
| "SELECT status FROM task_steps WHERE plan_id = ? AND idx = 0", |
| (plan_id,), |
| ).fetchone()[0] |
| plan_status = store._conn.execute( |
| "SELECT status FROM task_plans WHERE id = ?", |
| (plan_id,), |
| ).fetchone()[0] |
| assert step_status == "pending" |
| assert plan_status == "open" |
| finally: |
| store.close() |
|
|
|
|
| def test_timer_store_lifecycle(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| now = 1_700_000_000.0 |
| timer_id = store.add_timer( |
| due_at=now + 120.0, |
| duration_sec=120.0, |
| label="tea", |
| created_at=now, |
| ) |
| active = store.list_timers(status="active", include_expired=False, now=now) |
| assert [timer.id for timer in active] == [timer_id] |
| assert active[0].label == "tea" |
|
|
| assert store.cancel_timer(timer_id) is True |
| assert store.cancel_timer(timer_id) is False |
| cancelled = store.list_timers(status="cancelled") |
| assert [timer.id for timer in cancelled] == [timer_id] |
| finally: |
| store.close() |
|
|
|
|
| def test_expire_timers_moves_due_rows_to_expired(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| now = 1_700_000_000.0 |
| _ = store.add_timer( |
| due_at=now - 5.0, |
| duration_sec=30.0, |
| label="old", |
| created_at=now - 35.0, |
| ) |
| changed = store.expire_timers(now=now) |
| assert changed == 1 |
| active = store.list_timers(status="active", include_expired=False, now=now) |
| assert active == [] |
| expired = store.list_timers(status="expired") |
| assert len(expired) == 1 |
| counts = store.timer_counts() |
| assert counts["expired"] == 1 |
| assert counts["active"] == 0 |
| finally: |
| store.close() |
|
|
|
|
| def test_reminder_store_lifecycle(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| now = 1_700_000_000.0 |
| reminder_id = store.add_reminder( |
| text="take medicine", |
| due_at=now + 300.0, |
| created_at=now, |
| ) |
|
|
| pending = store.list_reminders(status="pending", now=now) |
| assert [reminder.id for reminder in pending] == [reminder_id] |
| assert pending[0].text == "take medicine" |
| assert pending[0].notified_at is None |
|
|
| assert store.mark_reminder_notified(reminder_id, notified_at=now + 60.0) is True |
| pending_after_notify = store.list_reminders(status="pending", include_notified=False, now=now) |
| assert pending_after_notify == [] |
|
|
| assert store.complete_reminder(reminder_id, completed_at=now + 120.0) is True |
| assert store.complete_reminder(reminder_id, completed_at=now + 121.0) is False |
| completed = store.list_reminders(status="completed") |
| assert [reminder.id for reminder in completed] == [reminder_id] |
| assert completed[0].completed_at == pytest.approx(now + 120.0) |
| finally: |
| store.close() |
|
|
|
|
| def test_list_reminders_due_only_filter(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| now = 1_700_000_000.0 |
| _ = store.add_reminder(text="overdue", due_at=now - 5.0, created_at=now - 10.0) |
| _ = store.add_reminder(text="future", due_at=now + 600.0, created_at=now - 5.0) |
|
|
| due = store.list_reminders(status="pending", due_only=True, now=now) |
| assert len(due) == 1 |
| assert due[0].text == "overdue" |
| finally: |
| store.close() |
|
|
|
|
| def test_prune_retention_removes_old_non_active_data(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| cutoff = 1_700_000_000.0 |
| old_ts = cutoff - 10_000.0 |
| new_ts = cutoff + 10_000.0 |
|
|
| old_memory = store.add_memory("old memory") |
| new_memory = store.add_memory("new memory") |
| store._conn.execute("UPDATE memory SET created_at = ? WHERE id = ?", (old_ts, old_memory)) |
| store._conn.execute("UPDATE memory SET created_at = ? WHERE id = ?", (new_ts, new_memory)) |
|
|
| old_plan = store.add_task_plan("old plan", ["a"]) |
| new_plan = store.add_task_plan("new plan", ["b"]) |
| store._conn.execute("UPDATE task_plans SET created_at = ? WHERE id = ?", (old_ts, old_plan)) |
| store._conn.execute("UPDATE task_plans SET created_at = ? WHERE id = ?", (new_ts, new_plan)) |
|
|
| store.upsert_summary("old_topic", "old summary") |
| store.upsert_summary("new_topic", "new summary") |
| store._conn.execute("UPDATE memory_summaries SET updated_at = ? WHERE topic = 'old_topic'", (old_ts,)) |
| store._conn.execute("UPDATE memory_summaries SET updated_at = ? WHERE topic = 'new_topic'", (new_ts,)) |
|
|
| old_timer = store.add_timer(due_at=old_ts + 60.0, duration_sec=60.0, label="old", created_at=old_ts) |
| _ = store.cancel_timer(old_timer, cancelled_at=old_ts + 30.0) |
| _ = store.add_timer(due_at=new_ts + 60.0, duration_sec=60.0, label="new-active", created_at=old_ts) |
|
|
| old_reminder = store.add_reminder(text="old completed", due_at=old_ts + 120.0, created_at=old_ts) |
| _ = store.complete_reminder(old_reminder, completed_at=old_ts + 300.0) |
| _ = store.add_reminder(text="old pending", due_at=old_ts + 180.0, created_at=old_ts) |
|
|
| store._conn.commit() |
|
|
| deleted = store.prune_retention(cutoff_ts=cutoff) |
| assert deleted["memory"] >= 1 |
| assert deleted["task_plans"] >= 1 |
| assert deleted["task_steps"] >= 1 |
| assert deleted["memory_summaries"] >= 1 |
| assert deleted["timers"] >= 1 |
| assert deleted["reminders"] >= 1 |
|
|
| remaining_memory = [row.id for row in store.recent(limit=10)] |
| assert new_memory in remaining_memory |
| assert old_memory not in remaining_memory |
|
|
| plans = store.list_task_plans(open_only=False) |
| assert any(plan.id == new_plan for plan in plans) |
| assert all(plan.id != old_plan for plan in plans) |
|
|
| summaries = store.list_summaries(limit=10) |
| topics = {item.topic for item in summaries} |
| assert "new_topic" in topics |
| assert "old_topic" not in topics |
|
|
| active_timers = store.list_timers(status="active", include_expired=True, now=cutoff + 1.0) |
| assert any(timer.label == "new-active" for timer in active_timers) |
| finally: |
| store.close() |
|
|
|
|
| def test_memory_store_encryption_round_trip(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite"), encryption_key="top-secret") |
| try: |
| memory_id = store.add_memory("secret note", kind="note") |
| raw_text = store._conn.execute("SELECT text FROM memory WHERE id = ?", (memory_id,)).fetchone()[0] |
| assert str(raw_text).startswith("enc:v1:") |
|
|
| rows = store.search_v2("secret", limit=5) |
| assert rows |
| assert rows[0].text == "secret note" |
|
|
| reminder_id = store.add_reminder(text="secret reminder", due_at=1_700_000_500.0, created_at=1_700_000_000.0) |
| raw_reminder = store._conn.execute("SELECT text FROM reminders WHERE id = ?", (reminder_id,)).fetchone()[0] |
| assert str(raw_reminder).startswith("enc:v1:") |
| reminders = store.list_reminders(status="pending") |
| assert reminders[0].text == "secret reminder" |
|
|
| status = store.memory_status() |
| assert status["encrypted"] is True |
| assert status["fts"] is False |
| finally: |
| store.close() |
|
|
|
|
| def test_memory_store_encryption_reads_legacy_plaintext(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite"), encryption_key="top-secret") |
| try: |
| store._conn.execute( |
| "INSERT INTO memory(created_at, kind, text, tags, importance, sensitivity, source) VALUES (?, ?, ?, ?, ?, ?, ?)", |
| (1_700_000_000.0, "note", "legacy row", "[]", 0.5, 0.0, "user"), |
| ) |
| store._conn.commit() |
|
|
| rows = store.search_v2("legacy", limit=5) |
| assert rows |
| assert rows[0].text == "legacy row" |
| finally: |
| store.close() |
|
|
|
|
| def test_search_v2_prefers_stronger_lexical_match_when_importance_weight_is_lower(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| best_id = store.add_memory( |
| "Project atlas launch deadline is Friday.", |
| importance=0.2, |
| source="notes", |
| ) |
| _ = store.add_memory( |
| "Atlas launch planning note.", |
| importance=1.0, |
| source="notes", |
| ) |
|
|
| rows = store.search_v2( |
| "project atlas launch deadline", |
| limit=2, |
| hybrid_weight=0.2, |
| ) |
| assert rows |
| assert rows[0].id == best_id |
| finally: |
| store.close() |
|
|
|
|
| def test_inspect_memory_candidate_detects_duplicates_and_contradictions(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| contradiction_id = store.add_memory("favorite color is blue", source="profile") |
| duplicate_id = store.add_memory("buy oat milk every week", source="profile") |
|
|
| contradiction_report = store.inspect_memory_candidate("favorite color is not blue", limit=5, fanout=50) |
| contradiction_ids = [row["memory_id"] for row in contradiction_report["contradictions"]] |
| assert contradiction_id in contradiction_ids |
|
|
| duplicate_report = store.inspect_memory_candidate("buy oat milk every week", limit=5, fanout=50) |
| duplicate_ids = [row["memory_id"] for row in duplicate_report["near_duplicates"]] |
| assert duplicate_id in duplicate_ids |
| finally: |
| store.close() |
|
|
|
|
| def test_add_memory_refreshes_embedding_when_enabled(tmp_path, monkeypatch): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| store._embedding_enabled = True |
| calls: list[tuple[int, str]] = [] |
|
|
| def _fake_refresh(memory_id: int, text: str) -> None: |
| calls.append((int(memory_id), text)) |
|
|
| monkeypatch.setattr(store, "_refresh_embedding_for_memory", _fake_refresh) |
| memory_id = store.add_memory("Lights should be warm white") |
| assert calls == [(memory_id, "Lights should be warm white")] |
| finally: |
| store.close() |
|
|
|
|
| def test_search_v2_uses_vector_results_when_lexical_misses(tmp_path, monkeypatch): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| target_id = store.add_memory("Configure evening lights to warm white.", importance=0.2) |
| _ = store.add_memory("Buy paper towels this weekend.", importance=0.9) |
| store._embedding_enabled = True |
| target = next(entry for entry in store.recent(limit=20) if entry.id == target_id) |
|
|
| def _fake_vector_search(*_args, **_kwargs): |
| return [(target, 0.99)] |
|
|
| monkeypatch.setattr(store, "_search_vector", _fake_vector_search) |
| rows = store.search_v2("make the room cozy for movie night", limit=1, hybrid_weight=0.2) |
| assert rows |
| assert rows[0].id == target_id |
| finally: |
| store.close() |
|
|
|
|
| def test_bitemporal_assertion_conflict_invalidates_prior_memory(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| first_id = store.add_memory("favorite color is blue", source="profile") |
| second_id = store.add_memory("favorite color is not blue", source="profile") |
|
|
| first_row = store._conn.execute( |
| "SELECT valid_to, superseded_by, invalidated_reason FROM memory WHERE id = ?", |
| (first_id,), |
| ).fetchone() |
| assert first_row is not None |
| assert first_row["valid_to"] is not None |
| assert int(first_row["superseded_by"]) == second_id |
| assert str(first_row["invalidated_reason"]) == "contradiction" |
|
|
| recent = store.recent(limit=10) |
| recent_ids = [row.id for row in recent] |
| assert second_id in recent_ids |
| assert first_id not in recent_ids |
| finally: |
| store.close() |
|
|
|
|
| def test_memory_doctor_and_graph_snapshot_report_assertions(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite")) |
| try: |
| _ = store.add_memory("favorite drink is coffee", source="profile") |
|
|
| doctor = store.memory_doctor() |
| assert doctor["status"] in {"ok", "degraded"} |
| assert doctor["entries_total"] == 1 |
| assert doctor["entries_active"] == 1 |
|
|
| graph = store.entity_graph_snapshot(limit=10) |
| assert graph["edge_count"] >= 1 |
| assert "favorite drink" in graph["nodes"] |
| finally: |
| store.close() |
|
|
|
|
| def test_pre_compaction_flush_updates_status_timestamp(tmp_path): |
| store = MemoryStore(str(tmp_path / "memory.sqlite"), ingest_async_enabled=True) |
| try: |
| _ = store.add_memory("status flush note") |
| flush = store.pre_compaction_flush(reason="test") |
| assert flush["reason"] == "test" |
| status = store.memory_status() |
| assert status["bitemporal"]["last_pre_compaction_flush"] is not None |
| finally: |
| store.close() |
|
|