headroom / tests /test_memory /test_memory_tracker_integration.py
tudragon154203
fix: route count_tokens to api.anthropic.com, not proxy base_url
0adb431
"""Integration tests for memory tracking with real stores.
These tests verify that memory tracking works correctly with actual
store implementations - no mocks, no simulations.
"""
from __future__ import annotations
import pytest
from headroom.memory.tracker import MemoryTracker
# Check HNSW availability for skipping tests
try:
from headroom.memory.adapters.hnsw import _check_hnswlib_available
HNSW_AVAILABLE = _check_hnswlib_available()
except ImportError:
HNSW_AVAILABLE = False
class TestCompressionStoreMemoryTracking:
"""Tests for CompressionStore memory tracking integration."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
def test_compression_store_reports_memory_stats(self):
"""Test that CompressionStore correctly reports memory stats."""
from headroom.cache.compression_store import CompressionStore
store = CompressionStore(max_entries=100)
# Add some data - store(original, compressed, ...)
store.store("original content 1" * 100, "compressed1")
store.store("original content 2" * 100, "compressed2")
stats = store.get_memory_stats()
assert stats.name == "compression_store"
assert stats.entry_count == 2
assert stats.size_bytes > 0
# budget_bytes is None since CompressionStore uses entry count limit not byte limit
def test_compression_store_tracks_hits(self):
"""Test that CompressionStore tracks cache hits."""
from headroom.cache.compression_store import CompressionStore
store = CompressionStore(max_entries=100)
# Store and retrieve (hit)
hash_key = store.store("original content", "compressed")
store.retrieve(hash_key) # Hit - increments retrieval_count
store.retrieve(hash_key) # Another retrieval
store.retrieve("nonexistent_hash") # Miss (not tracked)
stats = store.get_memory_stats()
# Hits counts entries with retrieval_count > 0, not total retrievals
assert stats.hits == 1 # 1 entry has been retrieved
# CompressionStore doesn't track misses
assert stats.misses == 0
def test_compression_store_registers_with_tracker(self):
"""Test that CompressionStore can register with MemoryTracker."""
from headroom.cache.compression_store import CompressionStore
tracker = MemoryTracker.get()
store = CompressionStore(max_entries=100)
# Register the store
tracker.register("compression_store", store.get_memory_stats)
# Verify it's registered
assert "compression_store" in tracker.registered_components
# Get stats through tracker
stats = tracker.get_component_stats("compression_store")
assert stats is not None
assert stats.name == "compression_store"
class TestBatchContextStoreMemoryTracking:
"""Tests for BatchContextStore memory tracking integration."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
def test_batch_context_store_reports_memory_stats(self):
"""Test that BatchContextStore correctly reports memory stats."""
from headroom.ccr.batch_store import (
BatchContext,
BatchContextStore,
BatchRequestContext,
)
store = BatchContextStore(ttl=3600, max_contexts=100)
# Add some batch contexts
ctx1 = BatchContext(batch_id="batch_1", provider="anthropic")
ctx1.add_request(
BatchRequestContext(
custom_id="req_1",
messages=[{"role": "user", "content": "Hello world"}],
model="claude-3-opus",
)
)
ctx2 = BatchContext(batch_id="batch_2", provider="openai")
ctx2.add_request(
BatchRequestContext(
custom_id="req_2",
messages=[{"role": "user", "content": "Test message"}],
model="gpt-4",
)
)
# Store them (sync for testing - accessing internal dict)
store._contexts["batch_1"] = ctx1
store._contexts["batch_2"] = ctx2
stats = store.get_memory_stats()
assert stats.name == "batch_context_store"
assert stats.entry_count == 2
assert stats.size_bytes > 0
def test_batch_context_store_registers_with_tracker(self):
"""Test that BatchContextStore can register with MemoryTracker."""
from headroom.ccr.batch_store import BatchContextStore
tracker = MemoryTracker.get()
store = BatchContextStore()
# Register the store
tracker.register("batch_context_store", store.get_memory_stats)
# Verify it's registered
assert "batch_context_store" in tracker.registered_components
class TestGraphStoreMemoryTracking:
"""Tests for InMemoryGraphStore memory tracking integration."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
@pytest.mark.asyncio
async def test_graph_store_reports_memory_stats(self):
"""Test that InMemoryGraphStore correctly reports memory stats."""
from headroom.memory.adapters.graph import InMemoryGraphStore
from headroom.memory.adapters.graph_models import Entity, Relationship
store = InMemoryGraphStore()
# Add some entities using the correct API
entity1 = Entity(id="node1", user_id="test", name="Test Entity 1", entity_type="entity")
entity2 = Entity(id="node2", user_id="test", name="Test Entity 2", entity_type="entity")
entity3 = Entity(id="node3", user_id="test", name="Test Concept", entity_type="concept")
await store.add_entity(entity1)
await store.add_entity(entity2)
await store.add_entity(entity3)
# Add a relationship
rel = Relationship(
source_id="node1",
target_id="node2",
relation_type="related_to",
user_id="test",
)
await store.add_relationship(rel)
stats = store.get_memory_stats()
assert stats.name == "graph_store"
assert stats.entry_count == 4 # 3 entities + 1 relationship
assert stats.size_bytes > 0
@pytest.mark.asyncio
async def test_graph_store_size_grows_with_data(self):
"""Test that reported size grows as data is added."""
from headroom.memory.adapters.graph import InMemoryGraphStore
from headroom.memory.adapters.graph_models import Entity
store = InMemoryGraphStore()
# Get initial size
initial_stats = store.get_memory_stats()
initial_size = initial_stats.size_bytes
# Add data
for i in range(100):
entity = Entity(
id=f"node_{i}",
user_id="test",
name=f"Entity {i}",
entity_type="entity",
properties={"data": "x" * 100},
)
await store.add_entity(entity)
# Get new size
final_stats = store.get_memory_stats()
assert final_stats.size_bytes > initial_size
assert final_stats.entry_count == 100
@pytest.mark.skipif(not HNSW_AVAILABLE, reason="hnswlib not available")
class TestHNSWVectorIndexMemoryTracking:
"""Tests for HNSWVectorIndex memory tracking integration."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
@pytest.mark.asyncio
async def test_hnsw_index_reports_memory_stats(self):
"""Test that HNSWVectorIndex correctly reports memory stats."""
from headroom.memory.adapters.hnsw import HNSWVectorIndex
from headroom.memory.models import Memory
index = HNSWVectorIndex(dimension=128)
# Add some vectors using Memory objects
import numpy as np
for i in range(10):
embedding = np.random.rand(128).astype(np.float32).tolist()
memory = Memory(
id=f"mem_{i}",
content=f"Test memory {i}",
user_id="test_user",
embedding=embedding,
)
await index.index(memory)
stats = index.get_memory_stats()
assert stats.name == "vector_index"
assert stats.entry_count == 10
assert stats.size_bytes > 0
@pytest.mark.asyncio
async def test_hnsw_index_size_grows_with_vectors(self):
"""Test that reported size grows as vectors are added."""
from headroom.memory.adapters.hnsw import HNSWVectorIndex
from headroom.memory.models import Memory
index = HNSWVectorIndex(dimension=256)
# Get initial size
initial_stats = index.get_memory_stats()
initial_size = initial_stats.size_bytes
# Add vectors
import numpy as np
for i in range(100):
embedding = np.random.rand(256).astype(np.float32).tolist()
memory = Memory(
id=f"mem_{i}",
content=f"Test memory {i}",
user_id="test_user",
embedding=embedding,
)
await index.index(memory)
# Get new size
final_stats = index.get_memory_stats()
assert final_stats.size_bytes > initial_size
assert final_stats.entry_count == 100
class TestTrackerIntegrationWithMultipleStores:
"""Tests for MemoryTracker with multiple real stores."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
@pytest.mark.asyncio
async def test_tracker_aggregates_multiple_stores(self):
"""Test that tracker correctly aggregates stats from multiple stores."""
from headroom.cache.compression_store import CompressionStore
from headroom.ccr.batch_store import BatchContextStore
from headroom.memory.adapters.graph import InMemoryGraphStore
from headroom.memory.adapters.graph_models import Entity
tracker = MemoryTracker.get()
# Create stores
compression_store = CompressionStore(max_entries=100)
batch_store = BatchContextStore()
graph_store = InMemoryGraphStore()
# Add some data
compression_store.store("original content" * 50, "compressed")
entity = Entity(id="node1", user_id="test", name="Test", entity_type="entity")
await graph_store.add_entity(entity)
# Register all stores
tracker.register("compression_store", compression_store.get_memory_stats)
tracker.register("batch_context_store", batch_store.get_memory_stats)
tracker.register("graph_store", graph_store.get_memory_stats)
# Get total
total = tracker.get_total_tracked_bytes()
# Should be sum of all stores
cs_stats = compression_store.get_memory_stats()
bs_stats = batch_store.get_memory_stats()
gs_stats = graph_store.get_memory_stats()
expected_total = cs_stats.size_bytes + bs_stats.size_bytes + gs_stats.size_bytes
assert total == expected_total
@pytest.mark.asyncio
async def test_full_memory_report(self):
"""Test generating a full memory report with real stores."""
from headroom.cache.compression_store import CompressionStore
from headroom.memory.adapters.graph import InMemoryGraphStore
from headroom.memory.adapters.graph_models import Entity
tracker = MemoryTracker.get(target_budget_mb=100.0)
# Create and register stores
compression_store = CompressionStore(max_entries=1000)
graph_store = InMemoryGraphStore()
# Add data
for i in range(10):
compression_store.store(f"original content {i}" * 100, f"compressed_{i}")
entity = Entity(
id=f"node_{i}",
user_id="test",
name=f"Entity {i}",
entity_type="entity",
)
await graph_store.add_entity(entity)
tracker.register("compression_store", compression_store.get_memory_stats)
tracker.register("graph_store", graph_store.get_memory_stats)
# Get full report
report = tracker.get_report()
# Verify report structure
assert report.process is not None
assert report.process.rss_bytes >= 0
assert len(report.components) == 2
assert "compression_store" in report.components
assert "graph_store" in report.components
assert report.total_tracked_bytes > 0
assert report.target_budget_bytes == 100 * 1024 * 1024
# Verify serialization
d = report.to_dict()
assert "process" in d
assert "components" in d
assert "total_tracked_mb" in d
class TestMemoryBudgetEnforcement:
"""Tests for memory budget checking."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
def test_under_budget(self):
"""Test that under-budget is correctly detected."""
from headroom.cache.compression_store import CompressionStore
tracker = MemoryTracker.get(target_budget_mb=100.0) # 100 MB budget
store = CompressionStore(max_entries=10) # Small store
store.store("original", "compressed")
tracker.register("compression_store", store.get_memory_stats)
report = tracker.get_report()
# Small store should be under budget
assert report.is_over_budget is False
def test_over_budget_detection(self):
"""Test that over-budget is correctly detected."""
tracker = MemoryTracker.get(target_budget_mb=0.001) # Very small budget (1 KB)
# Create a component that reports large size
from headroom.memory.tracker import ComponentStats
def large_component_stats() -> ComponentStats:
return ComponentStats(
name="large_component",
entry_count=1000,
size_bytes=10 * 1024 * 1024, # 10 MB
)
tracker.register("large_component", large_component_stats)
report = tracker.get_report()
# Should be over budget
assert report.is_over_budget is True
class TestProcessStatsCollection:
"""Tests for process-level memory stats."""
@pytest.fixture(autouse=True)
def reset_tracker(self):
"""Reset the tracker singleton before each test."""
MemoryTracker.reset()
yield
MemoryTracker.reset()
def test_process_stats_collected(self):
"""Test that process stats are collected from the real process."""
tracker = MemoryTracker.get()
stats = tracker.get_process_stats()
# psutil is optional — without it, stats are all zeros (graceful degradation)
try:
import psutil # noqa: F401
assert stats.rss_bytes > 0 # Process must use some memory
assert stats.vms_bytes > 0
except ImportError:
assert stats.rss_bytes == 0 # No psutil → zeros expected
assert stats.percent >= 0 # Could be 0 on some systems
def test_process_stats_in_report(self):
"""Test that process stats are included in report."""
tracker = MemoryTracker.get()
report = tracker.get_report()
try:
import psutil # noqa: F401
assert report.process.rss_bytes > 0
assert report.process.rss_mb > 0
except ImportError:
# Without psutil, process stats are zeros — that's expected
assert report.process.rss_bytes == 0