File size: 10,308 Bytes
4454066
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
"""
Test FAISS memory system with deduplication and semantic search.

Run with: python test_memory.py
"""

import os
import sys
import logging
import tempfile
import shutil

# Configure logging for the whole test run: INFO level, timestamped records.
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)

# Module-level logger shared by every test function in this file.
logger = logging.getLogger(__name__)


def test_basic_memory():
    """Check the add -> stats -> search round trip on a fresh memory.

    Stores four task/result pairs, verifies that ``get_stats`` reports the
    same number of items, then searches with a paraphrased query.  The
    similarity assertion is only evaluated when the search returns hits.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 1: Basic Memory Operations ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder)

    # Question -> answer pairs to store (dicts keep insertion order)
    samples = {
        "Calculate the sum of 2 and 2": "4",
        "What is 5 multiplied by 3?": "15",
        "Convert 100 Fahrenheit to Celsius": "37.78",
        "List the first 5 prime numbers": "2, 3, 5, 7, 11",
    }
    expected_count = len(samples)

    for question, answer in samples.items():
        stored = memory.add(question, answer)
        assert stored, f"Failed to add: {question}"

    # The reported item count must match the number of pairs stored
    stats = memory.get_stats()
    logger.info(f"Memory stats: {stats}")
    assert stats["total_items"] == expected_count, f"Expected {expected_count}, got {stats['total_items']}"

    # Query with a paraphrase of the first stored task
    hits = memory.search("What is 2+2?", k=2)
    logger.info(f"Search results for '2+2': {len(hits)} found")

    if hits:
        best = hits[0]
        logger.info(f"Top result: {best['task']} -> {best['result']} (similarity={best['similarity']:.3f})")
        # The "2 and 2" task is the one expected to rank first
        assert best['similarity'] > 0.5, "Expected high similarity"

    logger.info("✓ Basic memory operations passed\n")


def test_deduplication():
    """Verify that ``AgentMemory`` rejects duplicate tasks.

    A memory is created with ``dedup_threshold=0.95``.  One task is stored,
    then the identical task text is added again and must be rejected.  A
    paraphrase is added as well; whether it counts as a duplicate is left
    open, so the final item count is only bounded from above.  Finally
    ``is_duplicate`` is queried directly with the original task text.

    Raises:
        AssertionError: if the original task cannot be stored, the exact
            duplicate is accepted, more than two items end up stored, or
            ``is_duplicate`` does not flag the original text.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 2: Deduplication ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder, dedup_threshold=0.95)

    # Add original task
    task1 = "Calculate the square root of 16"
    result1 = "4"
    added1 = memory.add(task1, result1)
    assert added1, "Failed to add original task"
    logger.info(f"Added original: {task1}")

    # Re-add the identical task text: this must be detected as a duplicate.
    task2 = "Calculate the square root of 16"  # Exact duplicate
    result2 = "4.0"
    added2 = memory.add(task2, result2)
    logger.info(f"Duplicate detection for exact match: {'Blocked' if not added2 else 'Added'}")
    # Previously this outcome was only logged, so a broken dedup path in
    # ``add`` could go unnoticed as long as the paraphrase below was rejected.
    assert not added2, "Exact duplicate should have been rejected"

    # Try slightly different task (might not be duplicate)
    task3 = "What is the square root of 16?"
    result3 = "4"
    added3 = memory.add(task3, result3)
    logger.info(f"Similar but different: {'Blocked' if not added3 else 'Added'}")

    # Original plus, at most, the paraphrase
    stats = memory.get_stats()
    logger.info(f"Total items after dedup test: {stats['total_items']}")
    assert stats['total_items'] <= 2, f"Deduplication failed, expected <=2, got {stats['total_items']}"

    # Explicit duplicate check
    is_dup = memory.is_duplicate("Calculate the square root of 16")
    logger.info(f"Explicit duplicate check: {is_dup}")
    assert is_dup, "Should detect duplicate"

    logger.info("✓ Deduplication passed\n")


def test_semantic_search():
    """Run topic-flavoured queries against a memory holding mixed topics.

    Six tasks (capitals, baking, arithmetic) are stored in a memory created
    with ``similarity_threshold=0.5``.  Three queries are issued with
    ``k=3`` and their hits are logged.  Only the capital-city query carries
    an assertion, and only when it returns at least one hit.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    def log_hits(header, hits):
        # Emit the summary line followed by one indented line per hit.
        logger.info(header)
        for hit in hits:
            logger.info(f"  - {hit['task'][:50]} (sim={hit['similarity']:.3f})")

    logger.info("=== Test 3: Semantic Search ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder, similarity_threshold=0.5)

    # Corpus spanning three unrelated topics
    corpus = (
        ("What is the capital of France?", "Paris"),
        ("What is the capital of Germany?", "Berlin"),
        ("How do I bake a chocolate cake?", "Mix flour, sugar, eggs, cocoa..."),
        ("What's the recipe for cookies?", "Mix butter, sugar, flour..."),
        ("Solve the equation x + 5 = 10", "x = 5"),
        ("What is 15 divided by 3?", "5"),
    )
    for question, answer in corpus:
        memory.add(question, answer)

    # Capital-city query: the top hit should be another capital question
    capital_hits = memory.search("What is the capital of Spain?", k=3)
    log_hits(f"Search 'capital of Spain' found {len(capital_hits)} results:", capital_hits)
    if capital_hits:
        assert "capital" in capital_hits[0]["task"].lower(), "Should find capital-related tasks"

    # Equation query (logged only)
    math_hits = memory.search("Solve x + 10 = 20", k=3)
    log_hits(f"Search 'solve equation' found {len(math_hits)} results:", math_hits)

    # Baking query (logged only)
    baking_hits = memory.search("How to make brownies?", k=3)
    log_hits(f"Search 'make brownies' found {len(baking_hits)} results:", baking_hits)

    logger.info("✓ Semantic search passed\n")


def test_persistence():
    """Round-trip a populated memory through ``save`` and ``load``.

    Three tasks are stored and saved under a temporary directory; the test
    expects ``<path>.index`` and ``<path>.meta`` to exist afterwards.  A
    second memory loads the same path and must report the same item count
    and dimension, and a search for "Task 1" must return it.  The temporary
    directory is removed whether or not the assertions pass.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 4: Persistence (Save/Load) ===")

    # Scratch directory holding the saved files
    temp_dir = tempfile.mkdtemp()
    try:
        save_path = os.path.join(temp_dir, "test_memory")

        # Build the memory that will be written to disk
        embedder = EmbeddingModel()
        original = AgentMemory(embedder=embedder)
        for number in (1, 2, 3):
            original.add(f"Task {number}", f"Result {number}", metadata={"source": "test"})

        # Write it out and confirm both files were produced
        original.save(save_path)
        logger.info(f"Saved memory to {save_path}")
        index_file = f"{save_path}.index"
        meta_file = f"{save_path}.meta"
        assert os.path.exists(index_file), "Index file not created"
        assert os.path.exists(meta_file), "Metadata file not created"

        # Read it back into a separate instance
        restored = AgentMemory(embedder=embedder)
        restored.load(save_path)
        logger.info(f"Loaded memory from {save_path}")

        # Both instances must agree on size and dimension
        stats_original = original.get_stats()
        stats_restored = restored.get_stats()
        assert stats_original["total_items"] == stats_restored["total_items"], "Item count mismatch"
        assert stats_original["dimension"] == stats_restored["dimension"], "Dimension mismatch"
        logger.info(f"Loaded {stats_restored['total_items']} items with dim={stats_restored['dimension']}")

        # The restored instance must still be searchable
        hits = restored.search("Task 1", k=1)
        assert len(hits) > 0, "Search in loaded memory failed"
        top_task = hits[0]["task"]
        assert "Task 1" in top_task, "Loaded data doesn't match"
        logger.info(f"Search in loaded memory: {hits[0]['task']}")

        logger.info("✓ Persistence passed\n")

    finally:
        # Always remove the scratch directory
        shutil.rmtree(temp_dir)
        logger.info(f"Cleaned up {temp_dir}")


def test_threshold_behavior():
    """Compare search hit counts under a strict and a lenient threshold.

    Two memories are built with ``similarity_threshold`` 0.9 and 0.3, each
    holding the same single item, and both are searched with the same
    query.  The lenient memory must return at least as many hits as the
    strict one.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 5: Threshold Behavior ===")

    embedder = EmbeddingModel()

    def hits_at(threshold):
        # Fresh memory with one item, searched with a related query.
        mem = AgentMemory(embedder=embedder, similarity_threshold=threshold)
        mem.add("Python programming language", "A high-level language")
        return mem.search("Java programming", k=5)

    # Strict threshold first, then lenient
    strict_hits = hits_at(0.9)
    logger.info(f"Strict threshold (0.9): {len(strict_hits)} results")

    lenient_hits = hits_at(0.3)
    logger.info(f"Lenient threshold (0.3): {len(lenient_hits)} results")

    # A lower threshold must never yield fewer hits
    assert len(lenient_hits) >= len(strict_hits), "Lenient threshold should find more results"

    logger.info("✓ Threshold behavior passed\n")


def test_metadata():
    """Check that metadata passed to ``add`` comes back from ``search``.

    One task is stored with a three-key metadata dict.  The top search hit
    must expose a ``"metadata"`` entry containing ``execution_time`` with
    the value that was stored.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 6: Metadata ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder)

    # Metadata attached to the stored task
    attached = {
        "execution_time": 1.5,
        "tokens": 100,
        "model": "test-model"
    }
    memory.add(task="Complex calculation", result="42", metadata=attached)

    # Fetch the task back and inspect what came with it
    hits = memory.search("calculation", k=1)
    assert len(hits) > 0, "Search failed"

    meta = hits[0]["metadata"]
    logger.info(f"Retrieved metadata: {meta}")
    assert "execution_time" in meta, "Metadata missing"
    assert meta["execution_time"] == 1.5, "Metadata value incorrect"

    logger.info("✓ Metadata passed\n")


def test_clear():
    """Check that ``clear`` empties a populated memory.

    Five tasks are stored and the reported item count is verified, then
    ``clear`` is called and the count must drop to zero.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 7: Clear Memory ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder)

    # Populate with five numbered tasks
    item_count = 5
    for index in range(item_count):
        memory.add(f"Task {index}", f"Result {index}")

    populated = memory.get_stats()
    logger.info(f"Before clear: {populated['total_items']} items")
    assert populated["total_items"] == item_count

    # Wipe the memory and re-read the stats
    memory.clear()

    emptied = memory.get_stats()
    logger.info(f"After clear: {emptied['total_items']} items")
    assert emptied["total_items"] == 0, "Memory not cleared"

    logger.info("✓ Clear passed\n")


def run_all_tests():
    """Run every memory test in order, stopping at the first failure.

    On success a banner is logged.  Any ``Exception`` raised by a test
    (including a failed ``assert``) is logged with its traceback and the
    process exits with status 1.
    """
    logger.info("Starting FAISS Memory System Tests\n")

    banner = "=" * 50
    try:
        # Execution order matches the numbering used in the test log headers
        suite = (
            test_basic_memory,
            test_deduplication,
            test_semantic_search,
            test_persistence,
            test_threshold_behavior,
            test_metadata,
            test_clear,
        )
        for case in suite:
            case()

        logger.info(banner)
        logger.info("All tests passed! ✓")
        logger.info(banner)

    except Exception as e:
        logger.error(f"Test failed: {e}", exc_info=True)
        sys.exit(1)


# Script entry point: run the suite only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_all_tests()