| """
|
| Tests for Dream Loop (Subconscious Daemon)
|
|
|
| Tests configurability, graceful shutdown, non-blocking behavior, and metrics.
|
| """
|
|
|
| import asyncio
|
| import time
|
| import sys
|
| import importlib
|
| import pytest
|
| from unittest.mock import MagicMock, AsyncMock, patch
|
| from dataclasses import dataclass
|
|
|
| pytest_plugins = ['pytest_asyncio']
|
|
|
|
|
| @dataclass(frozen=True)
|
| class MockDreamLoopConfig:
|
| """Mock dream loop configuration."""
|
| enabled: bool = True
|
| frequency_seconds: int = 1
|
| batch_size: int = 10
|
| max_iterations: int = 0
|
| ollama_url: str = "http://localhost:11434/api/generate"
|
| model: str = "gemma3:1b"
|
|
|
|
|
| @dataclass(frozen=True)
|
| class MockRedisConfig:
|
| """Mock Redis configuration."""
|
| url: str = "redis://localhost:6379/0"
|
| stream_key: str = "haim:subconscious"
|
| max_connections: int = 10
|
| socket_timeout: int = 5
|
| password: str = None
|
|
|
|
|
| @dataclass(frozen=True)
|
| class MockConfig:
|
| """Mock configuration for testing."""
|
| dream_loop: MockDreamLoopConfig = None
|
| redis: MockRedisConfig = None
|
|
|
| def __post_init__(self):
|
| if self.dream_loop is None:
|
| object.__setattr__(self, 'dream_loop', MockDreamLoopConfig())
|
| if self.redis is None:
|
| object.__setattr__(self, 'redis', MockRedisConfig())
|
|
|
|
|
| @pytest.fixture
|
| def mock_config():
|
| """Create a mock configuration with short intervals for testing."""
|
| return MockConfig(
|
| dream_loop=MockDreamLoopConfig(
|
| enabled=True,
|
| frequency_seconds=1,
|
| batch_size=10,
|
| max_iterations=0,
|
| ),
|
| redis=MockRedisConfig()
|
| )
|
|
|
|
|
| @pytest.fixture
|
| def mock_config_disabled():
|
| """Create a mock configuration with dream loop disabled."""
|
| return MockConfig(
|
| dream_loop=MockDreamLoopConfig(
|
| enabled=False,
|
| frequency_seconds=1,
|
| ),
|
| redis=MockRedisConfig()
|
| )
|
|
|
|
|
| @pytest.fixture
|
| def mock_config_limited_iterations():
|
| """Create a mock configuration with limited iterations."""
|
| return MockConfig(
|
| dream_loop=MockDreamLoopConfig(
|
| enabled=True,
|
| frequency_seconds=1,
|
| max_iterations=2,
|
| ),
|
| redis=MockRedisConfig()
|
| )
|
|
|
|
|
| @pytest.fixture
|
| def mock_storage():
|
| """Create a mock AsyncRedisStorage."""
|
| storage = MagicMock()
|
| storage.redis_client = MagicMock()
|
| storage.check_health = AsyncMock(return_value=True)
|
| storage.publish_event = AsyncMock(return_value=None)
|
| storage.retrieve_memory = AsyncMock(return_value=None)
|
| storage.close = AsyncMock(return_value=None)
|
| return storage
|
|
|
|
|
| @pytest.fixture
|
| def daemon_module():
|
| """Fixture to import the daemon module with all mocks in place."""
|
|
|
| mock_aiohttp = MagicMock()
|
|
|
|
|
| mock_dream_loop_total = MagicMock()
|
| mock_dream_loop_total.labels = MagicMock(return_value=MagicMock())
|
| mock_dream_loop_iteration_seconds = MagicMock()
|
| mock_dream_loop_iteration_seconds.observe = MagicMock()
|
| mock_dream_loop_insights = MagicMock()
|
| mock_dream_loop_insights.labels = MagicMock(return_value=MagicMock())
|
| mock_dream_loop_active = MagicMock()
|
| mock_dream_loop_active.set = MagicMock()
|
|
|
|
|
| patches = {
|
| 'aiohttp': mock_aiohttp,
|
| 'mnemocore.subconscious.daemon.aiohttp': mock_aiohttp,
|
| 'mnemocore.subconscious.daemon.DREAM_LOOP_TOTAL': mock_dream_loop_total,
|
| 'mnemocore.subconscious.daemon.DREAM_LOOP_ITERATION_SECONDS': mock_dream_loop_iteration_seconds,
|
| 'mnemocore.subconscious.daemon.DREAM_LOOP_INSIGHTS_GENERATED': mock_dream_loop_insights,
|
| 'mnemocore.subconscious.daemon.DREAM_LOOP_ACTIVE': mock_dream_loop_active,
|
| }
|
|
|
|
|
| original_values = {}
|
| for key, value in patches.items():
|
| if key in sys.modules:
|
| original_values[key] = sys.modules[key]
|
| sys.modules[key] = value
|
|
|
|
|
| if 'mnemocore.subconscious.daemon' in sys.modules:
|
| del sys.modules['mnemocore.subconscious.daemon']
|
|
|
| try:
|
| import mnemocore.subconscious.daemon as dm
|
| yield dm
|
| finally:
|
|
|
| for key in patches:
|
| if key in original_values:
|
| sys.modules[key] = original_values[key]
|
| elif key in sys.modules:
|
| del sys.modules[key]
|
|
|
| if 'mnemocore.subconscious.daemon' in sys.modules:
|
| del sys.modules['mnemocore.subconscious.daemon']
|
|
|
|
|
| class TestDreamLoopStartsAndStops:
|
| """Test that dream loop can start and stop properly."""
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_loop_starts_and_stops(self, mock_config, mock_storage, daemon_module):
|
| """Test that the dream loop starts and stops correctly."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)
|
|
|
|
|
| assert daemon.running is False
|
| assert daemon._should_stop() is False
|
|
|
|
|
| run_task = asyncio.create_task(daemon.run())
|
|
|
|
|
| await asyncio.sleep(0.2)
|
|
|
|
|
| assert daemon.running is True
|
|
|
|
|
| await daemon.request_stop()
|
|
|
|
|
| await asyncio.wait_for(run_task, timeout=2.0)
|
|
|
|
|
| assert daemon.running is False
|
| assert daemon._should_stop() is True
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_loop_respects_disabled_config(self, mock_config_disabled, mock_storage, daemon_module):
|
| """Test that the dream loop exits immediately when disabled."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config_disabled)
|
|
|
|
|
| await daemon.run()
|
|
|
|
|
| assert daemon.running is False
|
|
|
|
|
| class TestDreamLoopFrequency:
|
| """Test that dream loop respects frequency configuration."""
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_respects_frequency(self, mock_config, mock_storage, daemon_module):
|
| """Test that dream cycles respect the configured frequency."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)
|
|
|
|
|
| cycle_times = []
|
|
|
| original_run_cycle = daemon.run_cycle
|
|
|
| async def tracked_run_cycle():
|
| cycle_times.append(time.time())
|
| await original_run_cycle()
|
|
|
| daemon.run_cycle = tracked_run_cycle
|
|
|
|
|
| run_task = asyncio.create_task(daemon.run())
|
|
|
|
|
| await asyncio.sleep(0.3)
|
| await daemon.request_stop()
|
| await asyncio.wait_for(run_task, timeout=2.0)
|
|
|
|
|
| assert len(cycle_times) >= 1
|
|
|
|
|
| class TestDreamLoopNonBlocking:
|
| """Test that dream loop does not block other operations."""
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_does_not_block_queries(self, mock_config, mock_storage, daemon_module):
|
| """Test that dream loop iterations don't block other async operations."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)
|
|
|
|
|
| query_executed = asyncio.Event()
|
|
|
| async def mock_query():
|
| query_executed.set()
|
| return {"result": "ok"}
|
|
|
|
|
| run_task = asyncio.create_task(daemon.run())
|
|
|
|
|
| await asyncio.sleep(0.1)
|
| query_task = asyncio.create_task(mock_query())
|
|
|
|
|
| try:
|
| await asyncio.wait_for(query_executed.wait(), timeout=0.5)
|
| assert query_executed.is_set()
|
| finally:
|
| await daemon.request_stop()
|
| await asyncio.wait_for(run_task, timeout=2.0)
|
|
|
|
|
| class TestDreamLoopIdempotentRestart:
|
| """Test that dream loop can be restarted idempotently."""
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_loop_idempotent_restart(self, mock_config, mock_storage, daemon_module):
|
| """Test that the dream loop can be stopped and restarted multiple times."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)
|
|
|
|
|
| run_task1 = asyncio.create_task(daemon.run())
|
| await asyncio.sleep(0.1)
|
| assert daemon.running is True
|
|
|
| await daemon.request_stop()
|
| await asyncio.wait_for(run_task1, timeout=2.0)
|
| assert daemon.running is False
|
| assert daemon._should_stop() is True
|
|
|
|
|
| run_task2 = asyncio.create_task(daemon.run())
|
| await asyncio.sleep(0.1)
|
| assert daemon.running is True
|
|
|
| await daemon.request_stop()
|
| await asyncio.wait_for(run_task2, timeout=2.0)
|
| assert daemon.running is False
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_loop_multiple_stop_calls(self, mock_config, mock_storage, daemon_module):
|
| """Test that multiple stop calls don't cause issues."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)
|
|
|
|
|
| daemon.stop()
|
| daemon.stop()
|
| await daemon.request_stop()
|
| daemon.stop()
|
|
|
| assert daemon._should_stop() is True
|
|
|
|
|
| class TestDreamLoopMetrics:
|
| """Test that dream loop emits proper metrics."""
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_loop_metrics_recorded(self, mock_config, mock_storage, daemon_module):
|
| """Test that metrics are recorded during dream loop execution."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config)
|
|
|
|
|
| run_task = asyncio.create_task(daemon.run())
|
| await asyncio.sleep(0.3)
|
|
|
| await daemon.request_stop()
|
| await asyncio.wait_for(run_task, timeout=2.0)
|
|
|
|
|
| assert daemon.running is False
|
|
|
|
|
| class TestDreamLoopMaxIterations:
|
| """Test that dream loop respects max_iterations configuration."""
|
|
|
| @pytest.mark.asyncio
|
| async def test_dream_loop_respects_max_iterations(self, mock_config_limited_iterations, mock_storage, daemon_module):
|
| """Test that the dream loop stops after max_iterations."""
|
| SubconsciousDaemon = daemon_module.SubconsciousDaemon
|
|
|
| daemon = SubconsciousDaemon(storage=mock_storage, config=mock_config_limited_iterations)
|
|
|
|
|
| start_time = time.time()
|
| run_task = asyncio.create_task(daemon.run())
|
|
|
|
|
| await asyncio.wait_for(run_task, timeout=5.0)
|
| elapsed = time.time() - start_time
|
|
|
|
|
| assert daemon.running is False
|
|
|
| assert elapsed < 5.0
|
|
|
|
|
| class TestDreamLoopConfiguration:
|
| """Test dream loop configuration loading."""
|
|
|
| def test_dream_loop_config_from_yaml(self):
|
| """Test that dream loop configuration is loaded from config.yaml."""
|
| from mnemocore.core.config import load_config, DreamLoopConfig
|
|
|
| config = load_config()
|
|
|
|
|
| assert hasattr(config, 'dream_loop')
|
| assert isinstance(config.dream_loop, DreamLoopConfig)
|
| assert hasattr(config.dream_loop, 'enabled')
|
| assert hasattr(config.dream_loop, 'frequency_seconds')
|
| assert hasattr(config.dream_loop, 'batch_size')
|
| assert hasattr(config.dream_loop, 'max_iterations')
|
| assert hasattr(config.dream_loop, 'ollama_url')
|
| assert hasattr(config.dream_loop, 'model')
|
|
|
| def test_dream_loop_config_defaults(self):
|
| """Test that dream loop config has sensible defaults."""
|
| from mnemocore.core.config import DreamLoopConfig
|
|
|
| config = DreamLoopConfig()
|
|
|
| assert config.enabled is True
|
| assert config.frequency_seconds == 60
|
| assert config.batch_size == 10
|
| assert config.max_iterations == 0
|
| assert config.ollama_url == "http://localhost:11434/api/generate"
|
| assert config.model == "gemma3:1b"
|
|
|