# Web-viewer residue (not part of the module): Spaces: Paused | File size: 11,999 Bytes | fb867c3
"""
Test suite for memory system integration with Central Post.
This module tests the integration of the knowledge store, task memory,
and context compression systems with the Central Post communication hub.
"""
import pytest
import tempfile
import os
from unittest.mock import AsyncMock, MagicMock
from src.communication.central_post import CentralPost, MessageType
from src.memory.knowledge_store import KnowledgeStore, KnowledgeType, ConfidenceLevel
from src.memory.task_memory import TaskMemory, TaskOutcome
from src.memory.context_compression import ContextCompressor, CompressionStrategy
class TestMemoryIntegration:
    """Integration tests for the memory features exposed by CentralPost.

    The tests exercise knowledge storage/retrieval, task strategy
    recommendations, context compression and the memory summary, both with
    memory enabled (using a temporary database path) and with memory disabled.
    """

    @pytest.fixture
    def temp_db_path(self):
        """Yield the path of a temporary ``.db`` file and remove it afterwards."""
        # mkstemp hands back an open descriptor plus the path; only the path
        # is passed on, so the descriptor is closed immediately.
        handle, db_file = tempfile.mkstemp(suffix=".db")
        os.close(handle)
        yield db_file
        # Teardown: delete the file if it is still present.
        if os.path.exists(db_file):
            os.unlink(db_file)

    @pytest.fixture
    def central_post(self, temp_db_path):
        """Return a memory-enabled CentralPost that uses the temporary database path."""
        post = CentralPost(
            max_agents=10,
            enable_metrics=True,
            enable_memory=True,
            memory_db_path=temp_db_path,
        )
        return post

    def test_memory_system_initialization(self, central_post):
        """With memory enabled, all three memory components exist with the expected types."""
        store = central_post.knowledge_store
        memory = central_post.task_memory
        compressor = central_post.context_compressor

        # Presence checks first, then type checks.
        assert store is not None
        assert memory is not None
        assert compressor is not None

        assert isinstance(store, KnowledgeStore)
        assert isinstance(memory, TaskMemory)
        assert isinstance(compressor, ContextCompressor)

    def test_memory_disabled_initialization(self, temp_db_path):
        """With memory disabled, all three memory components are None."""
        post = CentralPost(
            max_agents=10,
            enable_metrics=True,
            enable_memory=False,
        )

        assert post.knowledge_store is None
        assert post.task_memory is None
        assert post.context_compressor is None

    def test_store_agent_result_as_knowledge(self, central_post):
        """Storing an agent result returns True and the entry can be queried by domain."""
        stored = central_post.store_agent_result_as_knowledge(
            agent_id="test_agent",
            content="Test result content",
            confidence=0.8,
            domain="test_domain",
            tags=["test", "result"],
        )
        assert stored is True

        # Read the entry back straight from the knowledge store.
        from src.memory.knowledge_store import KnowledgeQuery

        domain_query = KnowledgeQuery(domains=["test_domain"])
        entries = central_post.knowledge_store.retrieve_knowledge(domain_query)
        assert len(entries) > 0

        first_entry = entries[0]
        assert first_entry.content["result"] == "Test result content"
        # The test expects the 0.8 confidence to be recorded as ConfidenceLevel.HIGH.
        assert first_entry.confidence_level == ConfidenceLevel.HIGH

    def test_store_agent_result_memory_disabled(self, temp_db_path):
        """Storing an agent result returns False when memory is disabled."""
        post = CentralPost(enable_memory=False)

        stored = post.store_agent_result_as_knowledge(
            agent_id="test_agent",
            content="Test content",
            confidence=0.8,
        )

        assert stored is False

    def test_retrieve_relevant_knowledge(self, central_post):
        """Knowledge stored under a domain is returned by a domain + keyword lookup."""
        # Seed one entry to look up.
        central_post.store_agent_result_as_knowledge(
            agent_id="agent1",
            content="Machine learning best practices",
            confidence=0.9,
            domain="AI",
            tags=["ML", "best_practices"],
        )

        matches = central_post.retrieve_relevant_knowledge(
            domain="AI",
            keywords=["machine", "learning"],
        )

        assert len(matches) > 0
        assert "Machine learning best practices" in matches[0].content["result"]

    def test_retrieve_knowledge_memory_disabled(self, temp_db_path):
        """Retrieval returns an empty list when memory is disabled."""
        post = CentralPost(enable_memory=False)

        matches = post.retrieve_relevant_knowledge(domain="AI")

        assert matches == []

    def test_get_task_strategy_recommendations(self, central_post):
        """Recommendations come back as a dict after a successful task was recorded."""
        task_memory = central_post.task_memory
        # The execution is only recorded when a task memory is available.
        if task_memory:
            from src.memory.task_memory import TaskComplexity

            task_memory.record_task_execution(
                task_description="Test AI task",
                task_type="AI",
                complexity=TaskComplexity.MODERATE,
                outcome=TaskOutcome.SUCCESS,
                duration=1.5,
                agents_used=["research_agent", "analysis_agent"],
                strategies_used=["helix_spiral"],
                context_size=1000,
                success_metrics={"confidence": 0.85, "quality_score": 0.9},
            )

        recommendations = central_post.get_task_strategy_recommendations(
            task_description="Another AI test task",
            task_type="AI",
            complexity="MODERATE",
        )

        assert isinstance(recommendations, dict)
        # The strategy check only applies when a non-empty "strategies"
        # entry is present in the recommendations.
        strategies = recommendations.get("strategies")
        if strategies:
            assert "helix_spiral" in strategies

    def test_get_strategy_recommendations_memory_disabled(self, temp_db_path):
        """Recommendations are an empty dict when memory is disabled."""
        post = CentralPost(enable_memory=False)

        recommendations = post.get_task_strategy_recommendations(
            task_description="Test task",
        )

        assert recommendations == {}

    def test_compress_large_context(self, central_post):
        """Compressing a long context yields shorter content and sane size metadata."""
        sentence = "This is a very long context that needs compression. "
        large_context = sentence * 100

        compressed = central_post.compress_large_context(
            context=large_context,
            strategy=CompressionStrategy.EXTRACTIVE_SUMMARY,
            target_size=200,
        )

        assert compressed is not None
        assert len(compressed.content) < len(large_context)
        assert compressed.compression_ratio > 0
        # The reported original size may deviate from the input length by up
        # to 50 to tolerate small measurement discrepancies.
        size_gap = abs(compressed.original_size - len(large_context))
        assert size_gap <= 50

    def test_compress_context_memory_disabled(self, temp_db_path):
        """Compression returns None when memory is disabled."""
        post = CentralPost(enable_memory=False)

        compressed = post.compress_large_context(
            context="Test context",
            strategy=CompressionStrategy.EXTRACTIVE_SUMMARY,
            target_size=100,
        )

        assert compressed is None

    def test_get_memory_summary(self, central_post):
        """The summary exposes both counters and counts the stored knowledge entry."""
        # Seed one entry so the knowledge counter is non-zero.
        central_post.store_agent_result_as_knowledge(
            agent_id="agent1",
            content="Test knowledge",
            confidence=0.8,
            domain="test",
        )

        summary = central_post.get_memory_summary()

        assert "knowledge_entries" in summary
        assert "task_patterns" in summary
        assert summary["knowledge_entries"] > 0

    def test_get_memory_summary_disabled(self, temp_db_path):
        """The summary reports zero counts and memory_enabled=False when disabled."""
        post = CentralPost(enable_memory=False)
        expected_summary = {
            "knowledge_entries": 0,
            "task_patterns": 0,
            "memory_enabled": False,
        }

        summary = post.get_memory_summary()

        assert summary == expected_summary

    def test_task_completion_message_handling(self, central_post):
        """A task-completion result stored manually can be retrieved by its domain."""
        stored = central_post.store_agent_result_as_knowledge(
            agent_id="test_agent",
            content="Task completed successfully",
            confidence=0.85,
            domain="test",
            tags=["completion"],
        )
        assert stored is True

        # Read it back through the CentralPost retrieval API.
        matches = central_post.retrieve_relevant_knowledge(domain="test")
        assert len(matches) > 0
        assert "Task completed successfully" in matches[0].content["result"]

    def test_error_report_message_handling(self, central_post):
        """A low-confidence error report stored manually can be retrieved by its domain."""
        stored = central_post.store_agent_result_as_knowledge(
            agent_id="error_agent",
            content="Failed to process input - need better validation",
            confidence=0.3,
            domain="error_analysis",
            tags=["error", "processing", "validation"],
        )
        assert stored is True

        # The error information should be retrievable as knowledge.
        matches = central_post.retrieve_relevant_knowledge(domain="error_analysis")
        assert len(matches) > 0
        assert "Failed to process input" in matches[0].content["result"]

    def test_cross_run_persistence(self, temp_db_path):
        """Knowledge stored by one CentralPost is visible to another using the same database path."""
        # First instance writes the entry.
        writer = CentralPost(
            enable_memory=True,
            memory_db_path=temp_db_path,
        )
        writer.store_agent_result_as_knowledge(
            agent_id="agent1",
            content="Persistent knowledge test",
            confidence=0.9,
            domain="persistence_test",
        )

        # Second instance is created on the same database path and reads it.
        reader = CentralPost(
            enable_memory=True,
            memory_db_path=temp_db_path,
        )
        matches = reader.retrieve_relevant_knowledge(
            domain="persistence_test",
        )

        assert len(matches) > 0
        assert "Persistent knowledge test" in matches[0].content["result"]

    def test_knowledge_filtering_and_retrieval(self, central_post):
        """Retrieval can be narrowed by minimum confidence, and entries keep their tags."""
        # Two entries in the same domain with different confidence and tags.
        central_post.store_agent_result_as_knowledge(
            agent_id="agent1",
            content="High confidence result",
            confidence=0.95,
            domain="test",
            tags=["high_quality"],
        )
        central_post.store_agent_result_as_knowledge(
            agent_id="agent2",
            content="Medium confidence result",
            confidence=0.6,
            domain="test",
            tags=["medium_quality"],
        )

        # Confidence filter: only the 0.95 entry is expected to pass HIGH.
        confident_entries = central_post.retrieve_relevant_knowledge(
            domain="test",
            min_confidence=ConfidenceLevel.HIGH,
        )
        assert len(confident_entries) == 1
        assert "High confidence result" in confident_entries[0].content["result"]

        # Tag filter: fetch everything in the domain and select by tag here,
        # treating missing tags as an empty list.
        domain_entries = central_post.retrieve_relevant_knowledge(domain="test")
        tagged_entries = [
            entry
            for entry in domain_entries
            if "high_quality" in (entry.tags or [])
        ]
        assert len(tagged_entries) == 1
        assert "High confidence result" in tagged_entries[0].content["result"]
|