MnemoCore / tests /test_consolidation_worker.py
Granis87's picture
Upload folder using huggingface_hub
c3a3710 verified
"""
Tests for Consolidation Worker (Phase 3.5.3)
===========================================
Verify event consumption and consolidation logic using unittest.mock.
"""
import unittest
from unittest.mock import AsyncMock, MagicMock, patch
import asyncio
from mnemocore.core.consolidation_worker import ConsolidationWorker
class TestConsolidationWorker(unittest.IsolatedAsyncioTestCase):
async def asyncSetUp(self):
# Patch dependencies
self.storage_patcher = patch('mnemocore.core.consolidation_worker.AsyncRedisStorage')
self.tier_manager_patcher = patch('mnemocore.core.consolidation_worker.TierManager')
self.config_patcher = patch('mnemocore.core.consolidation_worker.get_config')
self.MockStorage = self.storage_patcher.start()
self.MockTierManager = self.tier_manager_patcher.start()
self.mock_config = self.config_patcher.start()
# Setup mock storage instance
self.mock_storage_instance = MagicMock()
self.mock_storage_instance.redis_client = AsyncMock()
self.MockStorage.return_value = self.mock_storage_instance
self.worker = ConsolidationWorker(storage=self.mock_storage_instance)
async def asyncTearDown(self):
self.storage_patcher.stop()
self.tier_manager_patcher.stop()
self.config_patcher.stop()
async def test_setup_stream(self):
await self.worker.setup_stream()
self.mock_storage_instance.redis_client.xgroup_create.assert_called_once()
async def test_process_event_created(self):
event_data = {"type": "memory.created", "id": "mem_1"}
await self.worker.process_event("evt_1", event_data)
# Currently just logs, verify no exceptions
async def test_run_consolidation_cycle(self):
await self.worker.run_consolidation_cycle()
# Should call tier_manager.consolidate_warm_to_cold in a thread
# Verify TierManager instance called
self.worker.tier_manager.consolidate_warm_to_cold.assert_called_once()
async def test_consume_loop_logic(self):
# Make xreadgroup return one event then block indefinitely (return empty)
call_count = 0
async def mock_xreadgroup(*args, **kwargs):
nonlocal call_count
call_count += 1
if call_count == 1:
return [("stream_key", [("evt_1", {"type": "memory.created", "id": "mem_1"})])]
# Second call: signal stop
self.worker.running = False
return []
self.mock_storage_instance.redis_client.xreadgroup = mock_xreadgroup
self.mock_storage_instance.redis_client.xack = AsyncMock()
self.worker.running = True
try:
await asyncio.wait_for(self.worker.consume_loop(), timeout=2.0)
except asyncio.TimeoutError:
self.worker.running = False
self.assertGreaterEqual(call_count, 1)
if __name__ == '__main__':
unittest.main()