MnemoCore / tests /test_consolidation.py
Granis87's picture
Initial upload of MnemoCore
dbb04e4 verified
"""
Tests for Semantic Consolidation (Phase 4.0+)
=============================================
Tests for SemanticConsolidator class verifying:
- Similar memories are merged correctly
- Distinct memories are preserved
- Queries find consolidated memories
- Highest strength is preserved during consolidation
"""
import pytest
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import numpy as np
from mnemocore.core.consolidation import SemanticConsolidator
from mnemocore.core.binary_hdv import BinaryHDV, majority_bundle
from mnemocore.core.node import MemoryNode
# Helper to create similar vectors
def create_similar_vector(base: BinaryHDV, flip_bits: int = 100) -> BinaryHDV:
"""Create a vector similar to base by flipping a small number of bits."""
bits = np.unpackbits(base.data).copy()
dim = len(bits)
# Flip random bits
indices = np.random.choice(dim, size=flip_bits, replace=False)
bits[indices] = 1 - bits[indices]
return BinaryHDV(data=np.packbits(bits), dimension=dim)
def create_distinct_vector(dimension: int = 16384) -> BinaryHDV:
"""Create a random vector (expected ~0.5 similarity to any other)."""
return BinaryHDV.random(dimension)
class TestSemanticConsolidator:
"""Tests for the SemanticConsolidator class."""
@pytest.fixture
def mock_tier_manager(self):
"""Create a mock TierManager for testing."""
manager = MagicMock()
manager.get_hot_snapshot = AsyncMock(return_value=[])
manager.list_warm = AsyncMock(return_value=[])
manager.delete_memory = AsyncMock(return_value=True)
return manager
@pytest.fixture
def consolidator(self, mock_tier_manager):
"""Create a SemanticConsolidator instance."""
return SemanticConsolidator(
tier_manager=mock_tier_manager,
similarity_threshold=0.85,
min_cluster_size=2,
)
def test_find_clusters_empty_list(self, consolidator):
"""Test find_clusters with empty input returns empty list."""
clusters = consolidator.find_clusters([], threshold=0.85)
assert clusters == []
def test_find_clusters_single_node(self, consolidator):
"""Test find_clusters with single node returns empty list."""
node = MemoryNode(
id="test1",
hdv=BinaryHDV.random(16384),
content="test content",
)
clusters = consolidator.find_clusters([node], threshold=0.85)
assert clusters == []
def test_similar_memories_are_clustered(self, consolidator):
"""Test that similar memories are grouped into clusters."""
# Create a base vector
base_vec = BinaryHDV.random(16384)
# Create similar vectors (flip ~200 bits = ~1.2% different = ~98.8% similar)
similar_vecs = [create_similar_vector(base_vec, flip_bits=200) for _ in range(3)]
# Create nodes
nodes = [
MemoryNode(id=f"similar_{i}", hdv=vec, content=f"similar content {i}")
for i, vec in enumerate(similar_vecs)
]
# Find clusters with high threshold
clusters = consolidator.find_clusters(nodes, threshold=0.95)
# All similar nodes should be in one cluster
assert len(clusters) == 1
assert len(clusters[0]) == 3
def test_distinct_memories_are_preserved(self, consolidator):
"""Test that distinct memories form separate clusters or no clusters."""
# Create distinct vectors (random = ~50% similar)
distinct_vecs = [BinaryHDV.random(16384) for _ in range(4)]
# Create nodes
nodes = [
MemoryNode(id=f"distinct_{i}", hdv=vec, content=f"distinct content {i}")
for i, vec in enumerate(distinct_vecs)
]
# Find clusters with high threshold
clusters = consolidator.find_clusters(nodes, threshold=0.85)
# With random vectors at 0.85 threshold, no clusters should form
# (random vectors have ~0.5 similarity)
assert len(clusters) == 0
def test_mixed_similar_and_distinct(self, consolidator):
"""Test with both similar and distinct memories."""
# Create two groups of similar vectors
base1 = BinaryHDV.random(16384)
base2 = BinaryHDV.random(16384)
group1 = [create_similar_vector(base1, 150) for _ in range(3)]
group2 = [create_similar_vector(base2, 150) for _ in range(2)]
# Create nodes
nodes = [
*[MemoryNode(id=f"group1_{i}", hdv=vec, content=f"group1 {i}")
for i, vec in enumerate(group1)],
*[MemoryNode(id=f"group2_{i}", hdv=vec, content=f"group2 {i}")
for i, vec in enumerate(group2)],
]
# Find clusters
clusters = consolidator.find_clusters(nodes, threshold=0.95)
# Should have 2 clusters
assert len(clusters) == 2
cluster_sizes = sorted([len(c) for c in clusters])
assert cluster_sizes == [2, 3]
def test_merge_cluster_selects_highest_ltp(self, consolidator):
"""Test that merge_cluster selects the node with highest LTP as representative."""
base_vec = BinaryHDV.random(16384)
similar_vecs = [create_similar_vector(base_vec, 100) for _ in range(3)]
# Create nodes with different LTP strengths
nodes = []
for i, vec in enumerate(similar_vecs):
node = MemoryNode(
id=f"node_{i}",
hdv=vec,
content=f"content {i}",
ltp_strength=0.3 + i * 0.2, # 0.3, 0.5, 0.7
)
nodes.append(node)
# Merge cluster
representative, pruned_ids = consolidator.merge_cluster(nodes)
# Representative should be node_2 (highest LTP = 0.7)
assert representative.id == "node_2"
assert len(pruned_ids) == 2
assert "node_0" in pruned_ids
assert "node_1" in pruned_ids
assert "node_2" not in pruned_ids
def test_merge_cluster_updates_metadata(self, consolidator):
"""Test that merge_cluster updates metadata correctly."""
base_vec = BinaryHDV.random(16384)
similar_vecs = [create_similar_vector(base_vec, 100) for _ in range(3)]
nodes = []
for i, vec in enumerate(similar_vecs):
node = MemoryNode(
id=f"node_{i}",
hdv=vec,
content=f"content {i}",
ltp_strength=0.5,
access_count=10 + i * 5,
)
nodes.append(node)
# Merge cluster
representative, _ = consolidator.merge_cluster(nodes)
# Check metadata updates
assert "consolidation_history" in representative.metadata
assert representative.metadata["consolidation_history"][0]["merged_count"] == 2
assert representative.access_count == 10 + 15 + 20 # Sum of all access counts
assert representative.ltp_strength > 0.5 # Should be boosted
def test_merge_cluster_produces_proto_vector(self, consolidator):
"""Test that merge_cluster creates a proper proto-vector via bundling."""
# Create vectors that will produce a predictable bundle
vec1 = BinaryHDV.random(16384)
vec2 = create_similar_vector(vec1, 200)
vec3 = create_similar_vector(vec1, 200)
nodes = [
MemoryNode(id="n1", hdv=vec1, content="c1", ltp_strength=0.9),
MemoryNode(id="n2", hdv=vec2, content="c2", ltp_strength=0.5),
MemoryNode(id="n3", hdv=vec3, content="c3", ltp_strength=0.3),
]
# Expected proto vector
expected_proto = majority_bundle([vec1, vec2, vec3])
# Merge cluster
representative, _ = consolidator.merge_cluster(nodes)
# Check the proto-vector matches expected
assert representative.hdv == expected_proto
@pytest.mark.asyncio
async def test_consolidate_tier_hot(self, consolidator, mock_tier_manager):
"""Test consolidate_tier with HOT tier."""
# Create similar nodes
base_vec = BinaryHDV.random(16384)
similar_vecs = [create_similar_vector(base_vec, 150) for _ in range(3)]
nodes = [
MemoryNode(id=f"hot_{i}", hdv=vec, content=f"hot content {i}", ltp_strength=0.5 + i * 0.1)
for i, vec in enumerate(similar_vecs)
]
mock_tier_manager.get_hot_snapshot.return_value = nodes
# Run consolidation
stats = await consolidator.consolidate_tier("hot", threshold=0.95)
# Check stats
assert stats["nodes_processed"] == 3
assert stats["clusters_found"] == 1
assert stats["nodes_merged"] == 2
assert stats["nodes_pruned"] == 2
# Verify delete was called for pruned nodes
assert mock_tier_manager.delete_memory.call_count == 2
@pytest.mark.asyncio
async def test_consolidate_tier_warm(self, consolidator, mock_tier_manager):
"""Test consolidate_tier with WARM tier."""
# Create distinct nodes (no clustering expected)
nodes = [
MemoryNode(id=f"warm_{i}", hdv=BinaryHDV.random(16384), content=f"warm content {i}")
for i in range(4)
]
mock_tier_manager.list_warm.return_value = nodes
# Run consolidation
stats = await consolidator.consolidate_tier("warm", threshold=0.85)
# With distinct vectors, no clusters should form
assert stats["nodes_processed"] == 4
assert stats["clusters_found"] == 0
assert stats["nodes_merged"] == 0
assert stats["nodes_pruned"] == 0
@pytest.mark.asyncio
async def test_consolidate_tier_empty(self, consolidator, mock_tier_manager):
"""Test consolidate_tier with empty tier."""
mock_tier_manager.get_hot_snapshot.return_value = []
stats = await consolidator.consolidate_tier("hot", threshold=0.85)
assert stats["nodes_processed"] == 0
assert stats["clusters_found"] == 0
class TestConsolidationIntegration:
"""Integration tests for consolidation with query finding."""
@pytest.fixture
def mock_tier_manager(self):
"""Create a mock TierManager with in-memory storage."""
manager = MagicMock()
manager.hot_storage = {}
manager.get_hot_snapshot = AsyncMock(return_value=[])
manager.list_warm = AsyncMock(return_value=[])
manager.delete_memory = AsyncMock(side_effect=lambda nid: True)
return manager
@pytest.fixture
def consolidator(self, mock_tier_manager):
"""Create a SemanticConsolidator instance."""
return SemanticConsolidator(
tier_manager=mock_tier_manager,
similarity_threshold=0.85,
min_cluster_size=2,
)
@pytest.mark.asyncio
async def test_query_finds_consolidated_memory(self, consolidator, mock_tier_manager):
"""Test that a query can find a consolidated/merged memory."""
# Create a cluster of similar memories about "machine learning"
base_vec = BinaryHDV.random(16384)
similar_vecs = [create_similar_vector(base_vec, 150) for _ in range(3)]
nodes = [
MemoryNode(
id=f"ml_{i}",
hdv=vec,
content=f"machine learning concept {i}",
ltp_strength=0.5 + i * 0.1,
)
for i, vec in enumerate(similar_vecs)
]
# Set up mock to return these nodes
mock_tier_manager.get_hot_snapshot.return_value = nodes
# Store original vectors for later query simulation
query_vec = similar_vecs[0] # Query with one of the similar vectors
# Run consolidation
stats = await consolidator.consolidate_tier("hot", threshold=0.95)
# Verify consolidation happened
assert stats["nodes_merged"] == 2
assert stats["nodes_pruned"] == 2
# The representative should have a proto-vector that is still similar
# to the query vector (majority bundle preserves semantic content)
representative = max(nodes, key=lambda n: n.ltp_strength)
similarity = query_vec.similarity(representative.hdv)
# The proto-vector should be highly similar to the query
# (since all cluster members were similar)
assert similarity >= 0.90, f"Expected similarity >= 0.90, got {similarity}"
@pytest.mark.asyncio
async def test_consolidation_preserves_highest_strength(self, consolidator, mock_tier_manager):
"""Test that consolidation preserves and boosts the highest LTP strength."""
base_vec = BinaryHDV.random(16384)
similar_vecs = [create_similar_vector(base_vec, 100) for _ in range(4)]
# Create nodes with varying LTP strengths
ltp_values = [0.3, 0.5, 0.9, 0.4] # Index 2 has highest
nodes = [
MemoryNode(
id=f"node_{i}",
hdv=vec,
content=f"content {i}",
ltp_strength=ltp_values[i],
)
for i, vec in enumerate(similar_vecs)
]
original_highest_ltp = max(ltp_values) # 0.9
original_highest_id = "node_2"
mock_tier_manager.get_hot_snapshot.return_value = nodes
# Run consolidation
await consolidator.consolidate_tier("hot", threshold=0.95)
# Find the representative (should be node_2)
representative = next(n for n in nodes if n.id == original_highest_id)
# Verify LTP was boosted
assert representative.ltp_strength > original_highest_ltp, \
f"LTP should be boosted from {original_highest_ltp} to {representative.ltp_strength}"
# Verify other nodes would be pruned
assert representative.id == original_highest_id
class TestConsolidationThreshold:
"""Tests for threshold behavior."""
@pytest.fixture
def consolidator(self):
"""Create a basic consolidator."""
manager = MagicMock()
manager.get_hot_snapshot = AsyncMock(return_value=[])
manager.list_warm = AsyncMock(return_value=[])
manager.delete_memory = AsyncMock(return_value=True)
return SemanticConsolidator(
tier_manager=manager,
similarity_threshold=0.85,
)
def test_threshold_85_clusters_similar(self, consolidator):
"""Test that 0.85 threshold correctly clusters similar memories."""
# Create vectors with ~10% difference (0.90 similarity)
base = BinaryHDV.random(16384)
# Flip ~1640 bits for 10% difference
similar = create_similar_vector(base, flip_bits=1640)
nodes = [
MemoryNode(id="n1", hdv=base, content="base"),
MemoryNode(id="n2", hdv=similar, content="similar"),
]
# At 0.85 threshold, these should cluster
clusters = consolidator.find_clusters(nodes, threshold=0.85)
assert len(clusters) == 1
def test_threshold_85_separates_distinct(self, consolidator):
"""Test that 0.85 threshold keeps distinct memories separate."""
# Create truly random vectors (expected ~0.5 similarity)
nodes = [
MemoryNode(id=f"rand_{i}", hdv=BinaryHDV.random(16384), content=f"random {i}")
for i in range(3)
]
# At 0.85 threshold, these should NOT cluster
clusters = consolidator.find_clusters(nodes, threshold=0.85)
# Random vectors are ~0.5 similar, so no clusters at 0.85
assert len(clusters) == 0
def test_threshold_motivation(self, consolidator):
"""
Test the motivation for 0.85 threshold.
Rationale:
- Random binary HDVs have expected similarity ~0.5 (Kanerva, 2009)
- Similarity >= 0.85 is well above random chance
- For 16384 dimensions: 0.85 similarity = 2457 differing bits
- This captures semantic kinship while avoiding false positives
"""
# Verify random vectors cluster at ~0.5 similarity
random_pairs_similarities = []
for _ in range(10):
v1 = BinaryHDV.random(16384)
v2 = BinaryHDV.random(16384)
sim = v1.similarity(v2)
random_pairs_similarities.append(sim)
avg_random_similarity = np.mean(random_pairs_similarities)
# Random vectors should be ~0.5 similar
assert 0.45 <= avg_random_similarity <= 0.55, \
f"Random vectors should be ~0.5 similar, got {avg_random_similarity}"
# Create semantically similar vectors (flip 10% of bits)
base = BinaryHDV.random(16384)
similar = create_similar_vector(base, flip_bits=1640)
similar_similarity = base.similarity(similar)
# Should be ~0.90 similar (10% flipped)
assert similar_similarity >= 0.85, \
f"Similar vectors should be >= 0.85 similar, got {similar_similarity}"
if __name__ == "__main__":
pytest.main([__file__, "-v"])