File size: 3,073 Bytes
dbb04e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c3a3710
 
 
dbb04e4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
"""

Tests for Consolidation Worker (Phase 3.5.3)

===========================================

Verify event consumption and consolidation logic using unittest.mock.

"""

import unittest
from unittest.mock import AsyncMock, MagicMock, patch
import asyncio

from mnemocore.core.consolidation_worker import ConsolidationWorker

class TestConsolidationWorker(unittest.IsolatedAsyncioTestCase):
    
    async def asyncSetUp(self):
        # Patch dependencies
        self.storage_patcher = patch('mnemocore.core.consolidation_worker.AsyncRedisStorage')
        self.tier_manager_patcher = patch('mnemocore.core.consolidation_worker.TierManager')
        self.config_patcher = patch('mnemocore.core.consolidation_worker.get_config')
        
        self.MockStorage = self.storage_patcher.start()
        self.MockTierManager = self.tier_manager_patcher.start()
        self.mock_config = self.config_patcher.start()
        
        # Setup mock storage instance
        self.mock_storage_instance = MagicMock()
        self.mock_storage_instance.redis_client = AsyncMock()
        self.MockStorage.return_value = self.mock_storage_instance

        self.worker = ConsolidationWorker(storage=self.mock_storage_instance)

    async def asyncTearDown(self):
        self.storage_patcher.stop()
        self.tier_manager_patcher.stop()
        self.config_patcher.stop()

    async def test_setup_stream(self):
        await self.worker.setup_stream()
        self.mock_storage_instance.redis_client.xgroup_create.assert_called_once()

    async def test_process_event_created(self):
        event_data = {"type": "memory.created", "id": "mem_1"}
        await self.worker.process_event("evt_1", event_data)
        # Currently just logs, verify no exceptions
        
    async def test_run_consolidation_cycle(self):
        await self.worker.run_consolidation_cycle()
        # Should call tier_manager.consolidate_warm_to_cold in a thread
        # Verify TierManager instance called
        self.worker.tier_manager.consolidate_warm_to_cold.assert_called_once()

    async def test_consume_loop_logic(self):
        # Make xreadgroup return one event then block indefinitely (return empty)
        call_count = 0
        async def mock_xreadgroup(*args, **kwargs):
            nonlocal call_count
            call_count += 1
            if call_count == 1:
                return [("stream_key", [("evt_1", {"type": "memory.created", "id": "mem_1"})])]
            # Second call: signal stop
            self.worker.running = False
            return []

        self.mock_storage_instance.redis_client.xreadgroup = mock_xreadgroup
        self.mock_storage_instance.redis_client.xack = AsyncMock()

        self.worker.running = True
        try:
            await asyncio.wait_for(self.worker.consume_loop(), timeout=2.0)
        except asyncio.TimeoutError:
            self.worker.running = False

        self.assertGreaterEqual(call_count, 1)


if __name__ == '__main__':
    unittest.main()