# general-reasoning-agent / test_memory.py
# chmielvu's picture
# feat: add production refinements (Phase 1-3)
# 4454066 verified
"""
Test FAISS memory system with deduplication and semantic search.
Run with: python test_memory.py
"""
import os
import sys
import logging
import tempfile
import shutil
# Configure logging once at import time: INFO level, with timestamp, logger
# name and level prepended to every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)
def test_basic_memory():
    """Test basic add/search functionality.

    Adds four task/result pairs, checks that the reported item count matches,
    then searches for a paraphrase of the first task and requires at least one
    hit whose similarity exceeds 0.5.

    Raises:
        AssertionError: If an item is not added, the item count is wrong,
            the search returns nothing, or the top hit is too dissimilar.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 1: Basic Memory Operations ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder)

    # Add some tasks
    tasks = [
        ("Calculate the sum of 2 and 2", "4"),
        ("What is 5 multiplied by 3?", "15"),
        ("Convert 100 Fahrenheit to Celsius", "37.78"),
        ("List the first 5 prime numbers", "2, 3, 5, 7, 11"),
    ]
    for task, result in tasks:
        added = memory.add(task, result)
        assert added, f"Failed to add: {task}"

    # Check stats
    stats = memory.get_stats()
    logger.info(f"Memory stats: {stats}")
    assert stats["total_items"] == len(tasks), f"Expected {len(tasks)}, got {stats['total_items']}"

    # Search for similar task
    results = memory.search("What is 2+2?", k=2)
    logger.info(f"Search results for '2+2': {len(results)} found")

    # Require at least one hit so the similarity check below can never be
    # skipped; an empty result means the search itself is broken.
    assert len(results) > 0, "Search returned no results"

    top = results[0]
    logger.info(f"Top result: {top['task']} -> {top['result']} (similarity={top['similarity']:.3f})")
    # Should find the "2 and 2" task
    assert top['similarity'] > 0.5, "Expected high similarity"

    logger.info("✓ Basic memory operations passed\n")
def test_deduplication():
    """Test deduplication functionality.

    Stores one task, re-adds the identical text (which must be rejected),
    then adds a paraphrase whose outcome is only logged. Finally checks that
    at most two items are stored and that ``is_duplicate`` flags the original
    text.

    Raises:
        AssertionError: If the original cannot be added, the exact duplicate
            is accepted, more than two items end up stored, or the explicit
            duplicate check fails.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 2: Deduplication ===")

    embedder = EmbeddingModel()
    # NOTE(review): dedup_threshold is presumably the similarity at or above
    # which add() rejects an entry -- confirm against core.memory.
    memory = AgentMemory(embedder=embedder, dedup_threshold=0.95)

    # Add original task
    task1 = "Calculate the square root of 16"
    result1 = "4"
    added1 = memory.add(task1, result1)
    assert added1, "Failed to add original task"
    logger.info(f"Added original: {task1}")

    # Try to add very similar task (should be detected as duplicate)
    task2 = "Calculate the square root of 16"  # Exact duplicate
    result2 = "4.0"
    added2 = memory.add(task2, result2)
    logger.info(f"Duplicate detection for exact match: {'Blocked' if not added2 else 'Added'}")
    # Identical text is the one case that must always be rejected, so pin it
    # instead of only logging the outcome.
    assert not added2, "Exact duplicate should be blocked"

    # Try slightly different task (might not be duplicate)
    task3 = "What is the square root of 16?"
    result3 = "4"
    added3 = memory.add(task3, result3)
    logger.info(f"Similar but different: {'Blocked' if not added3 else 'Added'}")

    # Check total items: the original plus, at most, the paraphrase.
    stats = memory.get_stats()
    logger.info(f"Total items after dedup test: {stats['total_items']}")
    assert stats['total_items'] <= 2, f"Deduplication failed, expected <=2, got {stats['total_items']}"

    # Explicit duplicate check
    is_dup = memory.is_duplicate("Calculate the square root of 16")
    logger.info(f"Explicit duplicate check: {is_dup}")
    assert is_dup, "Should detect duplicate"

    logger.info("✓ Deduplication passed\n")
def test_semantic_search():
    """Test semantic similarity search.

    Stores six tasks on three topics (capitals, baking, arithmetic) and runs
    one query per topic. Only the capital-city query is asserted: it must
    return at least one hit and the top hit must mention "capital". The math
    and baking queries are logged for manual inspection only.

    Raises:
        AssertionError: If the capital query returns nothing or its top hit
            is not a capital-related task.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 3: Semantic Search ===")

    embedder = EmbeddingModel()
    memory = AgentMemory(embedder=embedder, similarity_threshold=0.5)

    # Add tasks with different topics
    tasks = [
        ("What is the capital of France?", "Paris"),
        ("What is the capital of Germany?", "Berlin"),
        ("How do I bake a chocolate cake?", "Mix flour, sugar, eggs, cocoa..."),
        ("What's the recipe for cookies?", "Mix butter, sugar, flour..."),
        ("Solve the equation x + 5 = 10", "x = 5"),
        ("What is 15 divided by 3?", "5"),
    ]
    for task, result in tasks:
        memory.add(task, result)

    def _search_and_log(query, label):
        """Run a k=3 search for *query* and log every hit under *label*."""
        hits = memory.search(query, k=3)
        logger.info(f"Search '{label}' found {len(hits)} results:")
        for hit in hits:
            logger.info(f" - {hit['task'][:50]} (sim={hit['similarity']:.3f})")
        return hits

    # Search for capital city query (should find similar tasks)
    results = _search_and_log("What is the capital of Spain?", "capital of Spain")
    # Require a hit so the topic check below cannot be skipped silently.
    assert len(results) > 0, "Search returned no results"
    # Should find other capital queries
    assert "capital" in results[0]["task"].lower(), "Should find capital-related tasks"

    # Search for math query (informational only)
    _search_and_log("Solve x + 10 = 20", "solve equation")

    # Search for baking query (informational only)
    _search_and_log("How to make brownies?", "make brownies")

    logger.info("✓ Semantic search passed\n")
def test_persistence():
    """Round-trip a populated memory through ``save`` and ``load``.

    Three tagged entries are written to a scratch directory; the test expects
    ``<path>.index`` and ``<path>.meta`` to appear, then loads them into a
    second memory and compares item count, dimension and a sample search.
    The scratch directory is removed whether or not the checks pass.

    Raises:
        AssertionError: If a file is missing, the stats differ after loading,
            or the loaded memory cannot find "Task 1".
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 4: Persistence (Save/Load) ===")

    # Scratch location for the on-disk artifacts
    scratch_dir = tempfile.mkdtemp()

    try:
        save_path = os.path.join(scratch_dir, "test_memory")

        # Build the source memory
        embedder = EmbeddingModel()
        original = AgentMemory(embedder=embedder)
        for n in (1, 2, 3):
            original.add(f"Task {n}", f"Result {n}", metadata={"source": "test"})

        # Write it out and confirm both expected files exist
        original.save(save_path)
        logger.info(f"Saved memory to {save_path}")
        for suffix, label in ((".index", "Index"), (".meta", "Metadata")):
            assert os.path.exists(save_path + suffix), f"{label} file not created"

        # Read it back into a fresh instance sharing the same embedder
        restored = AgentMemory(embedder=embedder)
        restored.load(save_path)
        logger.info(f"Loaded memory from {save_path}")

        # The two memories must report the same size and dimension
        stats_saved = original.get_stats()
        stats_loaded = restored.get_stats()
        assert stats_saved["total_items"] == stats_loaded["total_items"], "Item count mismatch"
        assert stats_saved["dimension"] == stats_loaded["dimension"], "Dimension mismatch"
        logger.info(f"Loaded {stats_loaded['total_items']} items with dim={stats_loaded['dimension']}")

        # The restored memory must still be searchable
        hits = restored.search("Task 1", k=1)
        assert len(hits) > 0, "Search in loaded memory failed"
        best = hits[0]
        assert "Task 1" in best["task"], "Loaded data doesn't match"
        logger.info(f"Search in loaded memory: {best['task']}")

        logger.info("✓ Persistence passed\n")

    finally:
        # Always remove the scratch directory
        shutil.rmtree(scratch_dir)
        logger.info(f"Cleaned up {scratch_dir}")
def test_threshold_behavior():
    """Compare search hit counts under a strict and a lenient threshold.

    The same single entry and the same query are used with
    ``similarity_threshold`` 0.9 and then 0.3; the lenient run must return at
    least as many hits as the strict one.

    Raises:
        AssertionError: If the lenient memory returns fewer hits.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 5: Threshold Behavior ===")

    embedder = EmbeddingModel()

    def _hit_count(threshold):
        """Return the number of hits from a fresh one-item memory at *threshold*."""
        memory = AgentMemory(embedder=embedder, similarity_threshold=threshold)
        memory.add("Python programming language", "A high-level language")
        return len(memory.search("Java programming", k=5))

    # Strict run first, then lenient, each logged as soon as it completes
    strict_hits = _hit_count(0.9)
    logger.info(f"Strict threshold (0.9): {strict_hits} results")

    lenient_hits = _hit_count(0.3)
    logger.info(f"Lenient threshold (0.3): {lenient_hits} results")

    # Loosening the threshold must never reduce the number of hits
    assert lenient_hits >= strict_hits, "Lenient threshold should find more results"

    logger.info("✓ Threshold behavior passed\n")
def test_metadata():
    """Check that metadata stored with an entry comes back from a search.

    One entry is added with an ``execution_time``/``tokens``/``model``
    metadata dict; the top search hit must expose that dict under
    ``"metadata"`` with ``execution_time`` unchanged.

    Raises:
        AssertionError: If the search returns nothing or the metadata is
            missing or altered.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 6: Metadata ===")

    memory = AgentMemory(embedder=EmbeddingModel())

    # Store a single entry carrying metadata
    extra = {
        "execution_time": 1.5,
        "tokens": 100,
        "model": "test-model",
    }
    memory.add(task="Complex calculation", result="42", metadata=extra)

    # Fetch it back and inspect what was attached
    hits = memory.search("calculation", k=1)
    assert len(hits) > 0, "Search failed"

    meta = hits[0]["metadata"]
    logger.info(f"Retrieved metadata: {meta}")
    assert "execution_time" in meta, "Metadata missing"
    assert meta["execution_time"] == 1.5, "Metadata value incorrect"

    logger.info("✓ Metadata passed\n")
def test_clear():
    """Check that ``clear`` empties a populated memory.

    Five entries are added and the item count is verified, then ``clear`` is
    called and the count must drop to zero.

    Raises:
        AssertionError: If the count is not 5 before or not 0 after clearing.
    """
    from core.memory import AgentMemory
    from core.embeddings import EmbeddingModel

    logger.info("=== Test 7: Clear Memory ===")

    memory = AgentMemory(embedder=EmbeddingModel())

    # Populate with five numbered entries
    for n in range(5):
        memory.add(f"Task {n}", f"Result {n}")

    count_before = memory.get_stats()["total_items"]
    logger.info(f"Before clear: {count_before} items")
    assert count_before == 5

    # Wipe everything and re-read the count
    memory.clear()

    count_after = memory.get_stats()["total_items"]
    logger.info(f"After clear: {count_after} items")
    assert count_after == 0, "Memory not cleared"

    logger.info("✓ Clear passed\n")
def run_all_tests():
    """Run every memory test in order, stopping at the first failure.

    On success a banner is logged. If any test raises, the error is logged
    with its traceback and the process exits with status 1.
    """
    logger.info("Starting FAISS Memory System Tests\n")

    try:
        # Execution order matches the numbering in each test's log header.
        for test_fn in (
            test_basic_memory,
            test_deduplication,
            test_semantic_search,
            test_persistence,
            test_threshold_behavior,
            test_metadata,
            test_clear,
        ):
            test_fn()

        banner = "=" * 50
        logger.info(banner)
        logger.info("All tests passed! ✓")
        logger.info(banner)

    except Exception as e:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception(f"Test failed: {e}")
        sys.exit(1)
# Script entry point: run the whole suite only when this file is executed
# directly (python test_memory.py), not when it is imported.
if __name__ == "__main__":
    run_all_tests()