Spaces:
Sleeping
Sleeping
| """ | |
| tests/test_properties.py — C2.1 | |
| Hypothesis property-based tests for the folder-based memory system. | |
| All 8 properties from the spec. | |
| Note: Each property test creates a fresh isolated SQLite DB per hypothesis | |
| example using a shared counter, avoiding state accumulation between examples. | |
| """ | |
| import sys | |
| import os | |
| import datetime | |
| import tempfile | |
| import pytest | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) | |
| try: | |
| from hypothesis import given, settings, assume, HealthCheck | |
| from hypothesis import strategies as st | |
| HYPOTHESIS_AVAILABLE = True | |
| except ImportError: | |
| HYPOTHESIS_AVAILABLE = False | |
| pytestmark = pytest.mark.skipif( | |
| not HYPOTHESIS_AVAILABLE, | |
| reason="hypothesis not installed — run: pip install hypothesis" | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Helpers | |
| # --------------------------------------------------------------------------- | |
| _counter = [0] | |
| def _fresh_kv(): | |
| """Create a brand-new isolated StateKV in a temp directory.""" | |
| from db import StateKV | |
| os.environ.pop("AGENTCACHE_SECRET", None) | |
| _counter[0] += 1 | |
| d = tempfile.mkdtemp(prefix=f"agmem_prop_{_counter[0]}_") | |
| return StateKV(db_path=os.path.join(d, "test.db")) | |
| def _now(): | |
| return datetime.datetime.utcnow().isoformat() + "Z" | |
| def _safe_path(): | |
| """Strategy for valid, non-traversal folder paths.""" | |
| return st.from_regex( | |
| r"[a-zA-Z][a-zA-Z0-9_-]{0,20}/[a-zA-Z][a-zA-Z0-9_-]{0,20}", | |
| fullmatch=True, | |
| ) | |
| def _safe_agent(): | |
| return st.from_regex(r"[a-z][a-z0-9_-]{1,12}", fullmatch=True) | |
| def _safe_text(): | |
| return st.text( | |
| alphabet=st.characters( | |
| whitelist_categories=("Lu", "Ll", "Nd", "Zs"), | |
| whitelist_characters="_-.,()", | |
| ), | |
| min_size=5, | |
| max_size=200, | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Property 1: Pair Isolation | |
| # Two distinct (folderPath, agentId) pairs never share observations. | |
| # --------------------------------------------------------------------------- | |
| def test_property_1_pair_isolation(path1, agent1, path2, agent2, text): | |
| assume((path1, agent1) != (path2, agent2)) | |
| from functions import folder_observe, KV | |
| kv = _fresh_kv() | |
| folder_observe(kv, {"folderPath": path1, "agentId": agent1, "text": text, "timestamp": _now()}) | |
| scope1 = KV.folder_obs(path1, agent1) | |
| scope2 = KV.folder_obs(path2, agent2) | |
| obs1_ids = {o["id"] for o in kv.list(scope1)} | |
| obs2_ids = {o["id"] for o in kv.list(scope2)} | |
| assert obs1_ids.isdisjoint(obs2_ids) | |
| # --------------------------------------------------------------------------- | |
| # Property 2: Observation Count Consistency | |
| # meta.obsCount == len(kv.list(folder_obs_scope)) | |
| # --------------------------------------------------------------------------- | |
| def test_property_2_obs_count_consistency(path, agent, texts): | |
| from functions import folder_observe, KV | |
| kv = _fresh_kv() | |
| for text in texts: | |
| folder_observe(kv, {"folderPath": path, "agentId": agent, "text": text, "timestamp": _now()}) | |
| meta_scope = KV.folder_meta(path, agent) | |
| meta = kv.get(meta_scope, "meta") | |
| assert meta is not None | |
| actual_obs = kv.list(KV.folder_obs(path, agent)) | |
| assert meta["obsCount"] == len(actual_obs) | |
| # --------------------------------------------------------------------------- | |
| # Property 3: Index Coverage | |
| # Every written pair has a KV.folders entry. | |
| # --------------------------------------------------------------------------- | |
| def test_property_3_index_coverage(path, agent, text): | |
| from functions import folder_observe, KV | |
| kv = _fresh_kv() | |
| folder_observe(kv, {"folderPath": path, "agentId": agent, "text": text, "timestamp": _now()}) | |
| index_entries = kv.list(KV.folders) | |
| normalized_path = path.replace("\\", "/").strip("/") | |
| normalized_agent = agent.strip() | |
| matching = [ | |
| e for e in index_entries | |
| if e.get("folderPath") == normalized_path and e.get("agentId") == normalized_agent | |
| ] | |
| assert len(matching) >= 1 | |
| # --------------------------------------------------------------------------- | |
| # Property 4: Privacy Invariant | |
| # No stored obs text contains raw secrets after folder_observe(). | |
| # --------------------------------------------------------------------------- | |
| def test_property_4_privacy_invariant(path, agent, prefix): | |
| from functions import folder_observe, KV | |
| kv = _fresh_kv() | |
| secret_text = f"My api_key = sk-proj-{prefix}abc123def456ghi789jkl012 in production" | |
| folder_observe(kv, { | |
| "folderPath": path, | |
| "agentId": agent, | |
| "text": secret_text, | |
| "timestamp": _now(), | |
| }) | |
| obs_list = kv.list(KV.folder_obs(path, agent)) | |
| for obs in obs_list: | |
| stored_text = obs.get("text", "") | |
| assert "sk-proj-" not in stored_text | |
| # --------------------------------------------------------------------------- | |
| # Property 5: Timeline Ordering | |
| # folder_timeline() always returns results sorted newest-first. | |
| # --------------------------------------------------------------------------- | |
| def test_property_5_timeline_ordering(path, agent, n): | |
| from functions import folder_observe, folder_timeline | |
| kv = _fresh_kv() | |
| base_ts = datetime.datetime(2025, 1, 1, 0, 0, 0) | |
| for i in range(n): | |
| ts = (base_ts + datetime.timedelta(minutes=i)).isoformat() + "Z" | |
| folder_observe(kv, { | |
| "folderPath": path, | |
| "agentId": agent, | |
| "text": f"Observation number {i}", | |
| "timestamp": ts, | |
| }) | |
| results = folder_timeline(kv, limit=100, folder_path=path, agent_id=agent) | |
| timestamps = [r["timestamp"] for r in results] | |
| assert timestamps == sorted(timestamps, reverse=True) | |
| # --------------------------------------------------------------------------- | |
| # Property 6: Forget Completeness | |
| # After forget({folderPath, agentId}), all three scopes are empty. | |
| # --------------------------------------------------------------------------- | |
| def test_property_6_forget_completeness(path, agent, texts): | |
| from functions import folder_observe, forget, KV | |
| kv = _fresh_kv() | |
| for text in texts: | |
| folder_observe(kv, {"folderPath": path, "agentId": agent, "text": text, "timestamp": _now()}) | |
| assert len(kv.list(KV.folder_obs(path, agent))) > 0 | |
| forget(kv, {"folderPath": path, "agentId": agent}) | |
| normalized_path = path.replace("\\", "/").strip("/") | |
| normalized_agent = agent.strip() | |
| index_key = f"{normalized_path}:{normalized_agent}" | |
| assert kv.list(KV.folder_obs(normalized_path, normalized_agent)) == [] | |
| assert kv.get(KV.folder_meta(normalized_path, normalized_agent), "meta") is None | |
| assert kv.get(KV.folders, index_key) is None | |
| # --------------------------------------------------------------------------- | |
| # Property 7: Memory Version Uniqueness | |
| # Superseded memories have parentId; at least one memory is always latest. | |
| # --------------------------------------------------------------------------- | |
| def test_property_7_memory_version_uniqueness(base_content, n_variants): | |
| from functions import remember, KV | |
| kv = _fresh_kv() | |
| for i in range(n_variants): | |
| content = base_content + f" variant {i}" | |
| remember(kv, {"content": content}) | |
| all_mems = kv.list(KV.memories) | |
| # Build sets for validation | |
| all_ids = {m["id"] for m in all_mems} | |
| superseded_ids = {m["id"] for m in all_mems if m.get("isLatest") is False} | |
| # Every superseded memory must be referenced by exactly one newer memory via parentId | |
| for m in all_mems: | |
| pid = m.get("parentId") | |
| if pid: | |
| # The parentId must point to a memory that exists and is marked isLatest=False | |
| parent = next((x for x in all_mems if x["id"] == pid), None) | |
| assert parent is not None, f"parentId {pid} not found in memories" | |
| assert parent.get("isLatest") is False, "Parent of superseding memory must be isLatest=False" | |
| # At least one memory must be latest | |
| latest_count = sum(1 for m in all_mems if m.get("isLatest") is True) | |
| assert latest_count >= 1 | |
| # --------------------------------------------------------------------------- | |
| # Property 8: Path Normalization Idempotency | |
| # normalize(normalize(p)) == normalize(p) for all valid inputs. | |
| # --------------------------------------------------------------------------- | |
| def test_property_8_path_normalization_idempotency(path): | |
| from functions import normalize_folder_path | |
| try: | |
| normalized_once = normalize_folder_path(path) | |
| normalized_twice = normalize_folder_path(normalized_once) | |
| assert normalized_once == normalized_twice | |
| except ValueError: | |
| pass | |