| """
|
| Tests for Consolidation Worker (Phase 3.5.3)
|
| ===========================================
|
| Verify event consumption and consolidation logic using unittest.mock.
|
| """
|
|
|
| import unittest
|
| from unittest.mock import AsyncMock, MagicMock, patch
|
| import asyncio
|
|
|
| from mnemocore.core.consolidation_worker import ConsolidationWorker
|
|
|
| class TestConsolidationWorker(unittest.IsolatedAsyncioTestCase):
|
|
|
| async def asyncSetUp(self):
|
|
|
| self.storage_patcher = patch('mnemocore.core.consolidation_worker.AsyncRedisStorage')
|
| self.tier_manager_patcher = patch('mnemocore.core.consolidation_worker.TierManager')
|
| self.config_patcher = patch('mnemocore.core.consolidation_worker.get_config')
|
|
|
| self.MockStorage = self.storage_patcher.start()
|
| self.MockTierManager = self.tier_manager_patcher.start()
|
| self.mock_config = self.config_patcher.start()
|
|
|
|
|
| self.mock_storage_instance = MagicMock()
|
| self.mock_storage_instance.redis_client = AsyncMock()
|
| self.MockStorage.return_value = self.mock_storage_instance
|
|
|
| self.worker = ConsolidationWorker(storage=self.mock_storage_instance)
|
|
|
| async def asyncTearDown(self):
|
| self.storage_patcher.stop()
|
| self.tier_manager_patcher.stop()
|
| self.config_patcher.stop()
|
|
|
| async def test_setup_stream(self):
|
| await self.worker.setup_stream()
|
| self.mock_storage_instance.redis_client.xgroup_create.assert_called_once()
|
|
|
| async def test_process_event_created(self):
|
| event_data = {"type": "memory.created", "id": "mem_1"}
|
| await self.worker.process_event("evt_1", event_data)
|
|
|
|
|
| async def test_run_consolidation_cycle(self):
|
| await self.worker.run_consolidation_cycle()
|
|
|
|
|
| self.worker.tier_manager.consolidate_warm_to_cold.assert_called_once()
|
|
|
| async def test_consume_loop_logic(self):
|
|
|
| call_count = 0
|
| async def mock_xreadgroup(*args, **kwargs):
|
| nonlocal call_count
|
| call_count += 1
|
| if call_count == 1:
|
| return [("stream_key", [("evt_1", {"type": "memory.created", "id": "mem_1"})])]
|
|
|
| self.worker.running = False
|
| return []
|
|
|
| self.mock_storage_instance.redis_client.xreadgroup = mock_xreadgroup
|
| self.mock_storage_instance.redis_client.xack = AsyncMock()
|
|
|
| self.worker.running = True
|
| try:
|
| await asyncio.wait_for(self.worker.consume_loop(), timeout=2.0)
|
| except asyncio.TimeoutError:
|
| self.worker.running = False
|
|
|
| self.assertGreaterEqual(call_count, 1)
|
|
|
|
|
| if __name__ == '__main__':
|
| unittest.main()
|
|
|