import asyncio
import json
import os
import sys
import time
from unittest.mock import MagicMock, patch
import pytest
# --- Mocking Infrastructure ---
import types
def mock_module(name):
    """Create an empty module called *name*, register it in ``sys.modules``, and return it.

    Any entry already stored in ``sys.modules`` under *name* is replaced.
    """
    sys.modules[name] = placeholder = types.ModuleType(name)
    return placeholder
# --- Dependency resolution ---
# Make the working directory importable *before* probing for the real project
# modules, so a source checkout is visible to the probes below as well as to
# the daemon import at the end of this section.
sys.path.insert(0, os.path.abspath("."))

import importlib

# (module path, attribute the module must expose). A MagicMock stands in for
# the attribute whenever the module itself cannot be imported.
_OPTIONAL_DEPENDENCIES = (
    ("mnemocore.core.engine", "HAIMEngine"),
    ("mnemocore.core.node", "MemoryNode"),
    ("mnemocore.core.qdrant_store", "QdrantStore"),
    ("mnemocore.core.async_storage", "AsyncRedisStorage"),
    ("mnemocore.meta.learning_journal", "LearningJournal"),
)

# Try to import real modules first. Each module is probed on its own so that
# one failing import does not prevent the remaining ones from being tried.
for _module_name, _ in _OPTIONAL_DEPENDENCIES:
    try:
        importlib.import_module(_module_name)
    except ImportError:
        # Deliberate best effort: anything missing is mocked just below.
        pass

# Mock dependencies if they are not importable. The decision is made per
# module, so an importable sibling is never replaced by a mock.
for _module_name, _attr_name in _OPTIONAL_DEPENDENCIES:
    if _module_name in sys.modules:
        continue
    _parent_name = _module_name.rpartition(".")[0]
    # Only create a placeholder parent when none exists: overwriting a real,
    # already-imported package with a path-less module would break imports of
    # its other (un-mocked) submodules.
    if _parent_name not in sys.modules:
        mock_module(_parent_name)
    setattr(mock_module(_module_name), _attr_name, MagicMock())

# NOTE(review): aiohttp is replaced whenever it has not been imported yet,
# even if it is installed. This looks intentional (keeps the test off the
# network) - confirm before switching it to an importability check.
if "aiohttp" not in sys.modules:
    mock_module("aiohttp").ClientSession = MagicMock()

# Now we can safely import daemon
from mnemocore.subconscious.daemon import SubconsciousDaemon
async def _async_test_save_evolution_state_non_blocking():
    """
    Async test logic that verifies _save_evolution_state does not block the event loop.

    Slow I/O is simulated by patching ``json.dump`` with a wrapper that sleeps
    for 200 ms before delegating to the real function. While the method under
    test runs, a background "ticker" task repeatedly awaits a 10 ms sleep and
    records the longest time any single sleep actually took. A stalled event
    loop shows up as a sleep that took far longer than requested.

    The evolution state is written into a throw-away temporary directory, so
    the test is portable and leaves no file behind.

    Raises:
        AssertionError: if the event loop was observed to be blocked for
            100 ms or more.
    """
    import inspect
    import tempfile

    # 1. Setup Daemon
    daemon = SubconsciousDaemon()

    # 2. Slow replacement for json.dump (simulates blocking I/O).
    real_dump = json.dump

    def slow_dump(*args, **kwargs):
        time.sleep(0.2)  # Block the calling thread for 200ms
        return real_dump(*args, **kwargs)

    # 3. Background ticker that measures loop blocking.
    loop_blocked_duration = 0
    ticker_running = True

    async def ticker():
        nonlocal loop_blocked_duration
        while ticker_running:
            # perf_counter is monotonic, so wall-clock adjustments cannot
            # distort the measured interval.
            start = time.perf_counter()
            await asyncio.sleep(0.01)  # Yield control
            diff = time.perf_counter() - start
            # If sleep(0.01) took significantly longer, the loop was blocked
            if diff > 0.05:
                loop_blocked_duration = max(loop_blocked_duration, diff)

    with tempfile.TemporaryDirectory() as tmp_dir:
        state_path = os.path.join(tmp_dir, "test_evolution_perf.json")
        with patch("mnemocore.subconscious.daemon.EVOLUTION_STATE_PATH", state_path):
            with patch("json.dump", side_effect=slow_dump):
                ticker_task = asyncio.create_task(ticker())
                try:
                    # Allow ticker to start
                    await asyncio.sleep(0.05)

                    # 4. Run the method under test.
                    # A synchronous implementation blocks the loop right here;
                    # any awaitable it returns (coroutine, future, ...) is
                    # awaited so the ticker can run in between if the
                    # implementation is truly non-blocking.
                    start_time = time.perf_counter()
                    outcome = daemon._save_evolution_state()
                    if inspect.isawaitable(outcome):
                        await outcome
                    end_time = time.perf_counter()
                finally:
                    # Always stop the ticker, even when the save raised. It
                    # exits cooperatively after its current 10 ms sleep.
                    ticker_running = False
                    await ticker_task

    # 5. Assertions
    print(f"Operation took: {end_time - start_time:.4f}s")
    print(f"Max loop block: {loop_blocked_duration:.4f}s")

    # A non-blocking save keeps the ticker running, so the longest observed
    # sleep stays near 0.01s. A blocking save delays the ticker by ~0.2s.
    # We fail if loop was blocked for more than 100ms
    if loop_blocked_duration >= 0.1:
        raise AssertionError(f"Event loop was blocked for {loop_blocked_duration:.4f}s")
def test_save_evolution_state_non_blocking():
    """Pytest entry point: run the async non-blocking check on a fresh event loop."""
    scenario = _async_test_save_evolution_state_non_blocking()
    asyncio.run(scenario)
# Allow running this check directly as a script, without pytest.
if __name__ == "__main__":
    test_save_evolution_state_non_blocking()