Spaces:
Sleeping
Sleeping
| """Integration tests for memory tracking with real stores. | |
| These tests verify that memory tracking works correctly with actual | |
| store implementations - no mocks, no simulations. | |
| """ | |
| from __future__ import annotations | |
| import pytest | |
| from headroom.memory.tracker import MemoryTracker | |
| # Check HNSW availability for skipping tests | |
| try: | |
| from headroom.memory.adapters.hnsw import _check_hnswlib_available | |
| HNSW_AVAILABLE = _check_hnswlib_available() | |
| except ImportError: | |
| HNSW_AVAILABLE = False | |
| class TestCompressionStoreMemoryTracking: | |
| """Tests for CompressionStore memory tracking integration.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| def test_compression_store_reports_memory_stats(self): | |
| """Test that CompressionStore correctly reports memory stats.""" | |
| from headroom.cache.compression_store import CompressionStore | |
| store = CompressionStore(max_entries=100) | |
| # Add some data - store(original, compressed, ...) | |
| store.store("original content 1" * 100, "compressed1") | |
| store.store("original content 2" * 100, "compressed2") | |
| stats = store.get_memory_stats() | |
| assert stats.name == "compression_store" | |
| assert stats.entry_count == 2 | |
| assert stats.size_bytes > 0 | |
| # budget_bytes is None since CompressionStore uses entry count limit not byte limit | |
| def test_compression_store_tracks_hits(self): | |
| """Test that CompressionStore tracks cache hits.""" | |
| from headroom.cache.compression_store import CompressionStore | |
| store = CompressionStore(max_entries=100) | |
| # Store and retrieve (hit) | |
| hash_key = store.store("original content", "compressed") | |
| store.retrieve(hash_key) # Hit - increments retrieval_count | |
| store.retrieve(hash_key) # Another retrieval | |
| store.retrieve("nonexistent_hash") # Miss (not tracked) | |
| stats = store.get_memory_stats() | |
| # Hits counts entries with retrieval_count > 0, not total retrievals | |
| assert stats.hits == 1 # 1 entry has been retrieved | |
| # CompressionStore doesn't track misses | |
| assert stats.misses == 0 | |
| def test_compression_store_registers_with_tracker(self): | |
| """Test that CompressionStore can register with MemoryTracker.""" | |
| from headroom.cache.compression_store import CompressionStore | |
| tracker = MemoryTracker.get() | |
| store = CompressionStore(max_entries=100) | |
| # Register the store | |
| tracker.register("compression_store", store.get_memory_stats) | |
| # Verify it's registered | |
| assert "compression_store" in tracker.registered_components | |
| # Get stats through tracker | |
| stats = tracker.get_component_stats("compression_store") | |
| assert stats is not None | |
| assert stats.name == "compression_store" | |
| class TestBatchContextStoreMemoryTracking: | |
| """Tests for BatchContextStore memory tracking integration.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| def test_batch_context_store_reports_memory_stats(self): | |
| """Test that BatchContextStore correctly reports memory stats.""" | |
| from headroom.ccr.batch_store import ( | |
| BatchContext, | |
| BatchContextStore, | |
| BatchRequestContext, | |
| ) | |
| store = BatchContextStore(ttl=3600, max_contexts=100) | |
| # Add some batch contexts | |
| ctx1 = BatchContext(batch_id="batch_1", provider="anthropic") | |
| ctx1.add_request( | |
| BatchRequestContext( | |
| custom_id="req_1", | |
| messages=[{"role": "user", "content": "Hello world"}], | |
| model="claude-3-opus", | |
| ) | |
| ) | |
| ctx2 = BatchContext(batch_id="batch_2", provider="openai") | |
| ctx2.add_request( | |
| BatchRequestContext( | |
| custom_id="req_2", | |
| messages=[{"role": "user", "content": "Test message"}], | |
| model="gpt-4", | |
| ) | |
| ) | |
| # Store them (sync for testing - accessing internal dict) | |
| store._contexts["batch_1"] = ctx1 | |
| store._contexts["batch_2"] = ctx2 | |
| stats = store.get_memory_stats() | |
| assert stats.name == "batch_context_store" | |
| assert stats.entry_count == 2 | |
| assert stats.size_bytes > 0 | |
| def test_batch_context_store_registers_with_tracker(self): | |
| """Test that BatchContextStore can register with MemoryTracker.""" | |
| from headroom.ccr.batch_store import BatchContextStore | |
| tracker = MemoryTracker.get() | |
| store = BatchContextStore() | |
| # Register the store | |
| tracker.register("batch_context_store", store.get_memory_stats) | |
| # Verify it's registered | |
| assert "batch_context_store" in tracker.registered_components | |
| class TestGraphStoreMemoryTracking: | |
| """Tests for InMemoryGraphStore memory tracking integration.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| async def test_graph_store_reports_memory_stats(self): | |
| """Test that InMemoryGraphStore correctly reports memory stats.""" | |
| from headroom.memory.adapters.graph import InMemoryGraphStore | |
| from headroom.memory.adapters.graph_models import Entity, Relationship | |
| store = InMemoryGraphStore() | |
| # Add some entities using the correct API | |
| entity1 = Entity(id="node1", user_id="test", name="Test Entity 1", entity_type="entity") | |
| entity2 = Entity(id="node2", user_id="test", name="Test Entity 2", entity_type="entity") | |
| entity3 = Entity(id="node3", user_id="test", name="Test Concept", entity_type="concept") | |
| await store.add_entity(entity1) | |
| await store.add_entity(entity2) | |
| await store.add_entity(entity3) | |
| # Add a relationship | |
| rel = Relationship( | |
| source_id="node1", | |
| target_id="node2", | |
| relation_type="related_to", | |
| user_id="test", | |
| ) | |
| await store.add_relationship(rel) | |
| stats = store.get_memory_stats() | |
| assert stats.name == "graph_store" | |
| assert stats.entry_count == 4 # 3 entities + 1 relationship | |
| assert stats.size_bytes > 0 | |
| async def test_graph_store_size_grows_with_data(self): | |
| """Test that reported size grows as data is added.""" | |
| from headroom.memory.adapters.graph import InMemoryGraphStore | |
| from headroom.memory.adapters.graph_models import Entity | |
| store = InMemoryGraphStore() | |
| # Get initial size | |
| initial_stats = store.get_memory_stats() | |
| initial_size = initial_stats.size_bytes | |
| # Add data | |
| for i in range(100): | |
| entity = Entity( | |
| id=f"node_{i}", | |
| user_id="test", | |
| name=f"Entity {i}", | |
| entity_type="entity", | |
| properties={"data": "x" * 100}, | |
| ) | |
| await store.add_entity(entity) | |
| # Get new size | |
| final_stats = store.get_memory_stats() | |
| assert final_stats.size_bytes > initial_size | |
| assert final_stats.entry_count == 100 | |
| class TestHNSWVectorIndexMemoryTracking: | |
| """Tests for HNSWVectorIndex memory tracking integration.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| async def test_hnsw_index_reports_memory_stats(self): | |
| """Test that HNSWVectorIndex correctly reports memory stats.""" | |
| from headroom.memory.adapters.hnsw import HNSWVectorIndex | |
| from headroom.memory.models import Memory | |
| index = HNSWVectorIndex(dimension=128) | |
| # Add some vectors using Memory objects | |
| import numpy as np | |
| for i in range(10): | |
| embedding = np.random.rand(128).astype(np.float32).tolist() | |
| memory = Memory( | |
| id=f"mem_{i}", | |
| content=f"Test memory {i}", | |
| user_id="test_user", | |
| embedding=embedding, | |
| ) | |
| await index.index(memory) | |
| stats = index.get_memory_stats() | |
| assert stats.name == "vector_index" | |
| assert stats.entry_count == 10 | |
| assert stats.size_bytes > 0 | |
| async def test_hnsw_index_size_grows_with_vectors(self): | |
| """Test that reported size grows as vectors are added.""" | |
| from headroom.memory.adapters.hnsw import HNSWVectorIndex | |
| from headroom.memory.models import Memory | |
| index = HNSWVectorIndex(dimension=256) | |
| # Get initial size | |
| initial_stats = index.get_memory_stats() | |
| initial_size = initial_stats.size_bytes | |
| # Add vectors | |
| import numpy as np | |
| for i in range(100): | |
| embedding = np.random.rand(256).astype(np.float32).tolist() | |
| memory = Memory( | |
| id=f"mem_{i}", | |
| content=f"Test memory {i}", | |
| user_id="test_user", | |
| embedding=embedding, | |
| ) | |
| await index.index(memory) | |
| # Get new size | |
| final_stats = index.get_memory_stats() | |
| assert final_stats.size_bytes > initial_size | |
| assert final_stats.entry_count == 100 | |
| class TestTrackerIntegrationWithMultipleStores: | |
| """Tests for MemoryTracker with multiple real stores.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| async def test_tracker_aggregates_multiple_stores(self): | |
| """Test that tracker correctly aggregates stats from multiple stores.""" | |
| from headroom.cache.compression_store import CompressionStore | |
| from headroom.ccr.batch_store import BatchContextStore | |
| from headroom.memory.adapters.graph import InMemoryGraphStore | |
| from headroom.memory.adapters.graph_models import Entity | |
| tracker = MemoryTracker.get() | |
| # Create stores | |
| compression_store = CompressionStore(max_entries=100) | |
| batch_store = BatchContextStore() | |
| graph_store = InMemoryGraphStore() | |
| # Add some data | |
| compression_store.store("original content" * 50, "compressed") | |
| entity = Entity(id="node1", user_id="test", name="Test", entity_type="entity") | |
| await graph_store.add_entity(entity) | |
| # Register all stores | |
| tracker.register("compression_store", compression_store.get_memory_stats) | |
| tracker.register("batch_context_store", batch_store.get_memory_stats) | |
| tracker.register("graph_store", graph_store.get_memory_stats) | |
| # Get total | |
| total = tracker.get_total_tracked_bytes() | |
| # Should be sum of all stores | |
| cs_stats = compression_store.get_memory_stats() | |
| bs_stats = batch_store.get_memory_stats() | |
| gs_stats = graph_store.get_memory_stats() | |
| expected_total = cs_stats.size_bytes + bs_stats.size_bytes + gs_stats.size_bytes | |
| assert total == expected_total | |
| async def test_full_memory_report(self): | |
| """Test generating a full memory report with real stores.""" | |
| from headroom.cache.compression_store import CompressionStore | |
| from headroom.memory.adapters.graph import InMemoryGraphStore | |
| from headroom.memory.adapters.graph_models import Entity | |
| tracker = MemoryTracker.get(target_budget_mb=100.0) | |
| # Create and register stores | |
| compression_store = CompressionStore(max_entries=1000) | |
| graph_store = InMemoryGraphStore() | |
| # Add data | |
| for i in range(10): | |
| compression_store.store(f"original content {i}" * 100, f"compressed_{i}") | |
| entity = Entity( | |
| id=f"node_{i}", | |
| user_id="test", | |
| name=f"Entity {i}", | |
| entity_type="entity", | |
| ) | |
| await graph_store.add_entity(entity) | |
| tracker.register("compression_store", compression_store.get_memory_stats) | |
| tracker.register("graph_store", graph_store.get_memory_stats) | |
| # Get full report | |
| report = tracker.get_report() | |
| # Verify report structure | |
| assert report.process is not None | |
| assert report.process.rss_bytes >= 0 | |
| assert len(report.components) == 2 | |
| assert "compression_store" in report.components | |
| assert "graph_store" in report.components | |
| assert report.total_tracked_bytes > 0 | |
| assert report.target_budget_bytes == 100 * 1024 * 1024 | |
| # Verify serialization | |
| d = report.to_dict() | |
| assert "process" in d | |
| assert "components" in d | |
| assert "total_tracked_mb" in d | |
| class TestMemoryBudgetEnforcement: | |
| """Tests for memory budget checking.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| def test_under_budget(self): | |
| """Test that under-budget is correctly detected.""" | |
| from headroom.cache.compression_store import CompressionStore | |
| tracker = MemoryTracker.get(target_budget_mb=100.0) # 100 MB budget | |
| store = CompressionStore(max_entries=10) # Small store | |
| store.store("original", "compressed") | |
| tracker.register("compression_store", store.get_memory_stats) | |
| report = tracker.get_report() | |
| # Small store should be under budget | |
| assert report.is_over_budget is False | |
| def test_over_budget_detection(self): | |
| """Test that over-budget is correctly detected.""" | |
| tracker = MemoryTracker.get(target_budget_mb=0.001) # Very small budget (1 KB) | |
| # Create a component that reports large size | |
| from headroom.memory.tracker import ComponentStats | |
| def large_component_stats() -> ComponentStats: | |
| return ComponentStats( | |
| name="large_component", | |
| entry_count=1000, | |
| size_bytes=10 * 1024 * 1024, # 10 MB | |
| ) | |
| tracker.register("large_component", large_component_stats) | |
| report = tracker.get_report() | |
| # Should be over budget | |
| assert report.is_over_budget is True | |
| class TestProcessStatsCollection: | |
| """Tests for process-level memory stats.""" | |
| def reset_tracker(self): | |
| """Reset the tracker singleton before each test.""" | |
| MemoryTracker.reset() | |
| yield | |
| MemoryTracker.reset() | |
| def test_process_stats_collected(self): | |
| """Test that process stats are collected from the real process.""" | |
| tracker = MemoryTracker.get() | |
| stats = tracker.get_process_stats() | |
| # psutil is optional — without it, stats are all zeros (graceful degradation) | |
| try: | |
| import psutil # noqa: F401 | |
| assert stats.rss_bytes > 0 # Process must use some memory | |
| assert stats.vms_bytes > 0 | |
| except ImportError: | |
| assert stats.rss_bytes == 0 # No psutil → zeros expected | |
| assert stats.percent >= 0 # Could be 0 on some systems | |
| def test_process_stats_in_report(self): | |
| """Test that process stats are included in report.""" | |
| tracker = MemoryTracker.get() | |
| report = tracker.get_report() | |
| try: | |
| import psutil # noqa: F401 | |
| assert report.process.rss_bytes > 0 | |
| assert report.process.rss_mb > 0 | |
| except ImportError: | |
| # Without psutil, process stats are zeros — that's expected | |
| assert report.process.rss_bytes == 0 | |