agentcache / tests /test_properties.py
Yash030's picture
feat: migrate agentmemory to agentcache namespace, endpoints, and tools
12a6c9a
Raw
History Blame Contribute Delete
10.2 kB
"""
tests/test_properties.py — C2.1
Hypothesis property-based tests for the folder-based memory system.
All 8 properties from the spec.
Note: Each property test creates a fresh isolated SQLite DB per hypothesis
example using a shared counter, avoiding state accumulation between examples.
"""
import sys
import os
import datetime
import tempfile
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
try:
from hypothesis import given, settings, assume, HealthCheck
from hypothesis import strategies as st
HYPOTHESIS_AVAILABLE = True
except ImportError:
HYPOTHESIS_AVAILABLE = False
pytestmark = pytest.mark.skipif(
not HYPOTHESIS_AVAILABLE,
reason="hypothesis not installed — run: pip install hypothesis"
)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
_counter = [0]
def _fresh_kv():
"""Create a brand-new isolated StateKV in a temp directory."""
from db import StateKV
os.environ.pop("AGENTCACHE_SECRET", None)
_counter[0] += 1
d = tempfile.mkdtemp(prefix=f"agmem_prop_{_counter[0]}_")
return StateKV(db_path=os.path.join(d, "test.db"))
def _now():
return datetime.datetime.utcnow().isoformat() + "Z"
def _safe_path():
"""Strategy for valid, non-traversal folder paths."""
return st.from_regex(
r"[a-zA-Z][a-zA-Z0-9_-]{0,20}/[a-zA-Z][a-zA-Z0-9_-]{0,20}",
fullmatch=True,
)
def _safe_agent():
return st.from_regex(r"[a-z][a-z0-9_-]{1,12}", fullmatch=True)
def _safe_text():
return st.text(
alphabet=st.characters(
whitelist_categories=("Lu", "Ll", "Nd", "Zs"),
whitelist_characters="_-.,()",
),
min_size=5,
max_size=200,
)
# ---------------------------------------------------------------------------
# Property 1: Pair Isolation
# Two distinct (folderPath, agentId) pairs never share observations.
# ---------------------------------------------------------------------------
@settings(max_examples=50, deadline=None)
@given(
path1=_safe_path(),
agent1=_safe_agent(),
path2=_safe_path(),
agent2=_safe_agent(),
text=_safe_text(),
)
def test_property_1_pair_isolation(path1, agent1, path2, agent2, text):
assume((path1, agent1) != (path2, agent2))
from functions import folder_observe, KV
kv = _fresh_kv()
folder_observe(kv, {"folderPath": path1, "agentId": agent1, "text": text, "timestamp": _now()})
scope1 = KV.folder_obs(path1, agent1)
scope2 = KV.folder_obs(path2, agent2)
obs1_ids = {o["id"] for o in kv.list(scope1)}
obs2_ids = {o["id"] for o in kv.list(scope2)}
assert obs1_ids.isdisjoint(obs2_ids)
# ---------------------------------------------------------------------------
# Property 2: Observation Count Consistency
# meta.obsCount == len(kv.list(folder_obs_scope))
# ---------------------------------------------------------------------------
@settings(max_examples=50, deadline=None)
@given(
path=_safe_path(),
agent=_safe_agent(),
texts=st.lists(_safe_text(), min_size=1, max_size=10),
)
def test_property_2_obs_count_consistency(path, agent, texts):
from functions import folder_observe, KV
kv = _fresh_kv()
for text in texts:
folder_observe(kv, {"folderPath": path, "agentId": agent, "text": text, "timestamp": _now()})
meta_scope = KV.folder_meta(path, agent)
meta = kv.get(meta_scope, "meta")
assert meta is not None
actual_obs = kv.list(KV.folder_obs(path, agent))
assert meta["obsCount"] == len(actual_obs)
# ---------------------------------------------------------------------------
# Property 3: Index Coverage
# Every written pair has a KV.folders entry.
# ---------------------------------------------------------------------------
@settings(max_examples=50, deadline=None)
@given(
path=_safe_path(),
agent=_safe_agent(),
text=_safe_text(),
)
def test_property_3_index_coverage(path, agent, text):
from functions import folder_observe, KV
kv = _fresh_kv()
folder_observe(kv, {"folderPath": path, "agentId": agent, "text": text, "timestamp": _now()})
index_entries = kv.list(KV.folders)
normalized_path = path.replace("\\", "/").strip("/")
normalized_agent = agent.strip()
matching = [
e for e in index_entries
if e.get("folderPath") == normalized_path and e.get("agentId") == normalized_agent
]
assert len(matching) >= 1
# ---------------------------------------------------------------------------
# Property 4: Privacy Invariant
# No stored obs text contains raw secrets after folder_observe().
# ---------------------------------------------------------------------------
@settings(max_examples=30, deadline=None)
@given(
path=_safe_path(),
agent=_safe_agent(),
prefix=st.text(alphabet="abcdefghijklmnop", min_size=3, max_size=10),
)
def test_property_4_privacy_invariant(path, agent, prefix):
from functions import folder_observe, KV
kv = _fresh_kv()
secret_text = f"My api_key = sk-proj-{prefix}abc123def456ghi789jkl012 in production"
folder_observe(kv, {
"folderPath": path,
"agentId": agent,
"text": secret_text,
"timestamp": _now(),
})
obs_list = kv.list(KV.folder_obs(path, agent))
for obs in obs_list:
stored_text = obs.get("text", "")
assert "sk-proj-" not in stored_text
# ---------------------------------------------------------------------------
# Property 5: Timeline Ordering
# folder_timeline() always returns results sorted newest-first.
# ---------------------------------------------------------------------------
@settings(max_examples=40, deadline=None)
@given(
path=_safe_path(),
agent=_safe_agent(),
n=st.integers(min_value=2, max_value=8),
)
def test_property_5_timeline_ordering(path, agent, n):
from functions import folder_observe, folder_timeline
kv = _fresh_kv()
base_ts = datetime.datetime(2025, 1, 1, 0, 0, 0)
for i in range(n):
ts = (base_ts + datetime.timedelta(minutes=i)).isoformat() + "Z"
folder_observe(kv, {
"folderPath": path,
"agentId": agent,
"text": f"Observation number {i}",
"timestamp": ts,
})
results = folder_timeline(kv, limit=100, folder_path=path, agent_id=agent)
timestamps = [r["timestamp"] for r in results]
assert timestamps == sorted(timestamps, reverse=True)
# ---------------------------------------------------------------------------
# Property 6: Forget Completeness
# After forget({folderPath, agentId}), all three scopes are empty.
# ---------------------------------------------------------------------------
@settings(max_examples=40, deadline=None)
@given(
path=_safe_path(),
agent=_safe_agent(),
texts=st.lists(_safe_text(), min_size=1, max_size=5),
)
def test_property_6_forget_completeness(path, agent, texts):
from functions import folder_observe, forget, KV
kv = _fresh_kv()
for text in texts:
folder_observe(kv, {"folderPath": path, "agentId": agent, "text": text, "timestamp": _now()})
assert len(kv.list(KV.folder_obs(path, agent))) > 0
forget(kv, {"folderPath": path, "agentId": agent})
normalized_path = path.replace("\\", "/").strip("/")
normalized_agent = agent.strip()
index_key = f"{normalized_path}:{normalized_agent}"
assert kv.list(KV.folder_obs(normalized_path, normalized_agent)) == []
assert kv.get(KV.folder_meta(normalized_path, normalized_agent), "meta") is None
assert kv.get(KV.folders, index_key) is None
# ---------------------------------------------------------------------------
# Property 7: Memory Version Uniqueness
# Superseded memories have parentId; at least one memory is always latest.
# ---------------------------------------------------------------------------
@settings(max_examples=30, suppress_health_check=[HealthCheck.filter_too_much], deadline=None)
@given(
base_content=st.text(
alphabet="abcdefghijklmnopqrstuvwxyz ",
min_size=30,
max_size=80,
).filter(lambda s: len(s.split()) >= 6),
n_variants=st.integers(min_value=2, max_value=4),
)
def test_property_7_memory_version_uniqueness(base_content, n_variants):
from functions import remember, KV
kv = _fresh_kv()
for i in range(n_variants):
content = base_content + f" variant {i}"
remember(kv, {"content": content})
all_mems = kv.list(KV.memories)
# Build sets for validation
all_ids = {m["id"] for m in all_mems}
superseded_ids = {m["id"] for m in all_mems if m.get("isLatest") is False}
# Every superseded memory must be referenced by exactly one newer memory via parentId
for m in all_mems:
pid = m.get("parentId")
if pid:
# The parentId must point to a memory that exists and is marked isLatest=False
parent = next((x for x in all_mems if x["id"] == pid), None)
assert parent is not None, f"parentId {pid} not found in memories"
assert parent.get("isLatest") is False, "Parent of superseding memory must be isLatest=False"
# At least one memory must be latest
latest_count = sum(1 for m in all_mems if m.get("isLatest") is True)
assert latest_count >= 1
# ---------------------------------------------------------------------------
# Property 8: Path Normalization Idempotency
# normalize(normalize(p)) == normalize(p) for all valid inputs.
# ---------------------------------------------------------------------------
@settings(max_examples=100, deadline=None)
@given(
path=st.text(
alphabet="abcdefghijklmnopqrstuvwxyz0123456789/_-.",
min_size=2,
max_size=100,
).filter(lambda p: ".." not in p and p.strip("/") != ""),
)
def test_property_8_path_normalization_idempotency(path):
from functions import normalize_folder_path
try:
normalized_once = normalize_folder_path(path)
normalized_twice = normalize_folder_path(normalized_once)
assert normalized_once == normalized_twice
except ValueError:
pass