"""
LLM Integration for HAIM
Multi-provider LLM support: OpenAI, OpenRouter, Anthropic, Google Gemini, and Local AI models
"""
import json
import os
from datetime import datetime
from typing import List, Dict, Optional, Tuple, Any, Callable
from dataclasses import dataclass, field
from enum import Enum
from loguru import logger
from mnemocore.core.engine import HAIMEngine
from mnemocore.core.node import MemoryNode
from mnemocore.core.exceptions import (
UnsupportedProviderError,
AgentNotFoundError,
)
class LLMProvider(Enum):
    """Identifiers of the LLM backends this module knows how to drive."""

    # Hosted APIs
    OPENAI = "openai"
    OPENROUTER = "openrouter"
    ANTHROPIC = "anthropic"
    GOOGLE_GEMINI = "google_gemini"

    # Local servers and user-supplied endpoints
    OLLAMA = "ollama"
    LM_STUDIO = "lm_studio"
    CUSTOM = "custom"

    # No backend at all
    MOCK = "mock"
@dataclass
class LLMConfig:
    """Configuration for an LLM provider.

    Bundles the provider choice with the request settings used when a
    client is created and called. The classmethods are convenience
    constructors that pre-fill the provider and its usual defaults; any
    extra keyword arguments are forwarded to the dataclass constructor.
    """

    provider: LLMProvider = LLMProvider.MOCK  # backend to use
    model: str = "gpt-4"  # model identifier sent with each request
    api_key: Optional[str] = None  # explicit API key, if any
    base_url: Optional[str] = None  # endpoint override
    max_tokens: int = 1024  # default completion length limit
    temperature: float = 0.7  # sampling temperature
    extra_headers: Dict[str, str] = field(default_factory=dict)  # extra HTTP headers
    extra_params: Dict[str, Any] = field(default_factory=dict)  # extra request kwargs

    # Provider-specific defaults
    @classmethod
    def openai(cls, model: str = "gpt-4", api_key: Optional[str] = None, **kwargs) -> 'LLMConfig':
        """Return a config targeting the OpenAI API."""
        return cls(provider=LLMProvider.OPENAI, model=model, api_key=api_key, **kwargs)

    @classmethod
    def openrouter(cls, model: str = "anthropic/claude-3.5-sonnet", api_key: Optional[str] = None, **kwargs) -> 'LLMConfig':
        """Return a config targeting OpenRouter.

        ``base_url`` and ``extra_headers`` may be supplied by the caller:
        a given ``base_url`` replaces the default endpoint, and given
        headers are merged over the default attribution headers. Passing
        either used to raise ``TypeError`` (duplicate keyword argument).
        """
        kwargs.setdefault("base_url", "https://openrouter.ai/api/v1")
        headers = {"HTTP-Referer": "https://mnemocore.ai", "X-Title": "MnemoCore"}
        headers.update(kwargs.pop("extra_headers", None) or {})
        return cls(
            provider=LLMProvider.OPENROUTER,
            model=model,
            api_key=api_key,
            extra_headers=headers,
            **kwargs
        )

    @classmethod
    def anthropic(cls, model: str = "claude-3-5-sonnet-20241022", api_key: Optional[str] = None, **kwargs) -> 'LLMConfig':
        """Return a config targeting the Anthropic API."""
        return cls(provider=LLMProvider.ANTHROPIC, model=model, api_key=api_key, **kwargs)

    @classmethod
    def google_gemini(cls, model: str = "gemini-1.5-pro", api_key: Optional[str] = None, **kwargs) -> 'LLMConfig':
        """Return a config targeting Google Gemini."""
        return cls(provider=LLMProvider.GOOGLE_GEMINI, model=model, api_key=api_key, **kwargs)

    @classmethod
    def ollama(cls, model: str = "llama3.1", base_url: str = "http://localhost:11434", **kwargs) -> 'LLMConfig':
        """Return a config targeting an Ollama server."""
        return cls(provider=LLMProvider.OLLAMA, model=model, base_url=base_url, **kwargs)

    @classmethod
    def lm_studio(cls, model: str = "local-model", base_url: str = "http://localhost:1234/v1", **kwargs) -> 'LLMConfig':
        """Return a config targeting an LM Studio server."""
        return cls(provider=LLMProvider.LM_STUDIO, model=model, base_url=base_url, **kwargs)

    @classmethod
    def custom(cls, model: str, base_url: str, api_key: Optional[str] = None, **kwargs) -> 'LLMConfig':
        """Return a config for a caller-specified endpoint."""
        return cls(provider=LLMProvider.CUSTOM, model=model, base_url=base_url, api_key=api_key, **kwargs)

    @classmethod
    def mock(cls, **kwargs) -> 'LLMConfig':
        """Return a config that uses no real backend."""
        return cls(provider=LLMProvider.MOCK, **kwargs)
class LLMClientFactory:
    """Factory that builds the client object matching an ``LLMConfig``."""

    # Root URL used when an Ollama config carries no base_url.
    _DEFAULT_OLLAMA_URL = "http://localhost:11434"

    @staticmethod
    def create_client(config: "LLMConfig") -> Any:
        """Create an LLM client based on configuration.

        Returns:
            The provider's client object, or None for the MOCK provider or
            when the provider's SDK package is not installed.

        Raises:
            UnsupportedProviderError: if ``config.provider`` is not one of
                the known providers.
        """
        provider = config.provider
        if provider == LLMProvider.MOCK:
            return None

        builders = {
            LLMProvider.OPENAI: LLMClientFactory._create_openai_client,
            LLMProvider.OPENROUTER: LLMClientFactory._create_openrouter_client,
            LLMProvider.ANTHROPIC: LLMClientFactory._create_anthropic_client,
            LLMProvider.GOOGLE_GEMINI: LLMClientFactory._create_gemini_client,
            LLMProvider.OLLAMA: LLMClientFactory._create_ollama_client,
            LLMProvider.LM_STUDIO: LLMClientFactory._create_lm_studio_client,
            LLMProvider.CUSTOM: LLMClientFactory._create_custom_client,
        }
        builder = builders.get(provider)
        if builder is None:
            supported = [p.value for p in LLMProvider]
            # getattr: a non-enum provider has no .value to report.
            raise UnsupportedProviderError(
                str(getattr(provider, "value", provider)),
                supported_providers=supported,
            )
        return builder(config)

    @staticmethod
    def _create_openai_compatible(config: "LLMConfig", api_key: Optional[str], base_url: Optional[str]) -> Any:
        """Build an ``openai.OpenAI`` client; None (with a warning) if the SDK is missing."""
        try:
            from openai import OpenAI
        except ImportError:
            logger.warning("openai package not installed. Install with: pip install openai")
            return None
        return OpenAI(
            api_key=api_key,
            base_url=base_url,
            # An empty header dict is passed as None so the SDK defaults apply.
            default_headers=config.extra_headers or None,
        )

    @staticmethod
    def _create_openai_client(config: "LLMConfig") -> Any:
        """Create OpenAI client (key from config or OPENAI_API_KEY)."""
        api_key = config.api_key or os.environ.get("OPENAI_API_KEY")
        # base_url is honoured when set; None keeps the SDK's default endpoint.
        return LLMClientFactory._create_openai_compatible(config, api_key, config.base_url)

    @staticmethod
    def _create_openrouter_client(config: "LLMConfig") -> Any:
        """Create OpenRouter client (OpenAI-compatible; key from config or OPENROUTER_API_KEY)."""
        api_key = config.api_key or os.environ.get("OPENROUTER_API_KEY")
        return LLMClientFactory._create_openai_compatible(config, api_key, config.base_url)

    @staticmethod
    def _create_anthropic_client(config: "LLMConfig") -> Any:
        """Create Anthropic client (key from config or ANTHROPIC_API_KEY)."""
        try:
            import anthropic
        except ImportError:
            logger.warning("anthropic package not installed. Install with: pip install anthropic")
            return None
        api_key = config.api_key or os.environ.get("ANTHROPIC_API_KEY")
        return anthropic.Anthropic(api_key=api_key)

    @staticmethod
    def _create_gemini_client(config: "LLMConfig") -> Any:
        """Create Google Gemini client (key from config or GOOGLE_API_KEY)."""
        try:
            import google.generativeai as genai
        except ImportError:
            logger.warning("google-generativeai package not installed. Install with: pip install google-generativeai")
            return None
        api_key = config.api_key or os.environ.get("GOOGLE_API_KEY")
        genai.configure(api_key=api_key)
        return genai.GenerativeModel(config.model)

    @staticmethod
    def _ollama_root_url(base_url: Optional[str]) -> str:
        """Return the Ollama server root: no trailing slash, no ``/v1`` suffix."""
        root = (base_url or LLMClientFactory._DEFAULT_OLLAMA_URL).rstrip("/")
        if root.endswith("/v1"):
            root = root[: -len("/v1")]
        return root

    @staticmethod
    def _create_ollama_client(config: "LLMConfig") -> Any:
        """Create Ollama client for local models.

        Uses the OpenAI SDK when available, otherwise the urllib-based
        ``OllamaClient``. The two need different URLs: the OpenAI-compatible
        API is served under ``/v1`` while the native API lives at the server
        root, so the configured URL is normalised for each.
        """
        root = LLMClientFactory._ollama_root_url(config.base_url)
        try:
            from openai import OpenAI
        except ImportError:
            # Fallback to direct HTTP calls
            return OllamaClient(base_url=root, model=config.model)
        return OpenAI(base_url=f"{root}/v1", api_key="ollama")

    @staticmethod
    def _create_lm_studio_client(config: "LLMConfig") -> Any:
        """Create LM Studio client (OpenAI-compatible, placeholder API key)."""
        return LLMClientFactory._create_openai_compatible(config, "lm-studio", config.base_url)

    @staticmethod
    def _create_custom_client(config: "LLMConfig") -> Any:
        """Create custom OpenAI-compatible client."""
        return LLMClientFactory._create_openai_compatible(
            config, config.api_key or "custom", config.base_url
        )
class OllamaClient:
    """Fallback Ollama client using direct HTTP calls (urllib only)."""

    def __init__(
        self,
        base_url: str = "http://localhost:11434",
        model: str = "llama3.1",
        timeout: float = 120,
    ):
        """
        Args:
            base_url: Server root URL; a trailing slash is stripped.
            model: Model name sent with every request.
            timeout: Seconds to wait for the HTTP request.
        """
        self.base_url = base_url.rstrip("/")
        self.model = model
        self.timeout = timeout

    def generate(self, prompt: str, max_tokens: int = 1024) -> str:
        """Generate response using Ollama API.

        POSTs to ``/api/generate`` with streaming disabled and returns the
        ``response`` field of the JSON reply ("" if absent). Failures are
        reported as an ``"[Ollama Error: ...]"`` string rather than raised.
        """
        import urllib.error
        import urllib.request

        url = f"{self.base_url}/api/generate"
        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False,
            "options": {"num_predict": max_tokens},
        }
        try:
            request = urllib.request.Request(
                url,
                data=json.dumps(payload).encode("utf-8"),
                headers={"Content-Type": "application/json"},
            )
            with urllib.request.urlopen(request, timeout=self.timeout) as response:
                result = json.loads(response.read().decode("utf-8"))
        except (OSError, ValueError) as e:
            # OSError covers URLError/HTTPError and socket timeouts during
            # the read; ValueError covers malformed URLs and undecodable or
            # non-JSON bodies. All are reported through the same string.
            return f"[Ollama Error: {str(e)}]"
        return result.get("response", "")
class HAIMLLMIntegrator:
    """Bridge between HAIM holographic memory and LLM reasoning."""

    def __init__(
        self,
        haim_engine: "HAIMEngine",
        llm_client=None,
        llm_config: Optional["LLMConfig"] = None
    ):
        """
        Args:
            haim_engine: Memory engine used for queries and node lookup.
            llm_client: Optional pre-built client (legacy approach).
            llm_config: Optional provider configuration. When given without
                a client, the client is created through the factory.
        """
        self.haim = haim_engine
        # Support both legacy client and new config-based approach
        if llm_config:
            self.config = llm_config
            self.llm_client = llm_client or LLMClientFactory.create_client(llm_config)
        else:
            # Legacy client (or nothing): the config is a MOCK placeholder.
            self.config = LLMConfig.mock()
            self.llm_client = llm_client or None

    @classmethod
    def from_config(cls, haim_engine: "HAIMEngine", config: "LLMConfig") -> 'HAIMLLMIntegrator':
        """Create integrator from LLM configuration"""
        client = LLMClientFactory.create_client(config)
        return cls(haim_engine=haim_engine, llm_client=client, llm_config=config)

    def _call_llm(self, prompt: str, max_tokens: Optional[int] = None) -> str:
        """
        Call the LLM with the given prompt.
        Supports multiple providers: OpenAI, OpenRouter, Anthropic, Gemini, Ollama, LM Studio

        Returns the model's text. With no client a canned mock response is
        returned; any exception from the provider call is logged and
        reported as an "[LLM Error: ...]" string instead of being raised.
        """
        max_tokens = max_tokens or self.config.max_tokens
        if self.llm_client is None:
            return self._mock_llm_response(prompt)
        try:
            provider = self.config.provider
            # The factory never builds a client for MOCK, so a client paired
            # with a MOCK config was supplied by the caller (legacy path).
            # Drive it by duck typing instead of ignoring it.
            if provider == LLMProvider.MOCK:
                return self._call_generic(prompt, max_tokens)
            # OpenAI / OpenRouter / LM Studio / custom (all use OpenAI SDK)
            if provider in (LLMProvider.OPENAI, LLMProvider.OPENROUTER, LLMProvider.LM_STUDIO, LLMProvider.CUSTOM):
                return self._call_openai_compatible(prompt, max_tokens)
            # Anthropic
            if provider == LLMProvider.ANTHROPIC:
                return self._call_anthropic(prompt, max_tokens)
            # Google Gemini
            if provider == LLMProvider.GOOGLE_GEMINI:
                return self._call_gemini(prompt, max_tokens)
            # Ollama
            if provider == LLMProvider.OLLAMA:
                return self._call_ollama(prompt, max_tokens)
            # Fallback: try to detect client type
            return self._call_generic(prompt, max_tokens)
        except Exception as e:
            logger.error(f"LLM call failed: {e}")
            return f"[LLM Error: {str(e)}]"

    def _call_openai_compatible(self, prompt: str, max_tokens: int) -> str:
        """Call OpenAI-compatible API (OpenAI, OpenRouter, LM Studio)"""
        response = self.llm_client.chat.completions.create(
            model=self.config.model,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=max_tokens,
            temperature=self.config.temperature,
            **self.config.extra_params
        )
        # content may be None; honour the declared str return type.
        return response.choices[0].message.content or ""

    def _call_anthropic(self, prompt: str, max_tokens: int) -> str:
        """Call Anthropic Claude API"""
        response = self.llm_client.messages.create(
            model=self.config.model,
            max_tokens=max_tokens,
            temperature=self.config.temperature,
            messages=[{"role": "user", "content": prompt}],
            **self.config.extra_params
        )
        return response.content[0].text

    def _call_gemini(self, prompt: str, max_tokens: int) -> str:
        """Call Google Gemini API"""
        generation_config = {
            "max_output_tokens": max_tokens,
            "temperature": self.config.temperature,
            **self.config.extra_params
        }
        response = self.llm_client.generate_content(
            prompt,
            generation_config=generation_config
        )
        return response.text

    def _call_ollama(self, prompt: str, max_tokens: int) -> str:
        """Call Ollama local model"""
        if hasattr(self.llm_client, 'generate'):
            # Using our fallback OllamaClient
            return self.llm_client.generate(prompt, max_tokens)
        # Using OpenAI SDK with Ollama
        return self._call_openai_compatible(prompt, max_tokens)

    def _call_generic(self, prompt: str, max_tokens: int) -> str:
        """Generic fallback that tries to detect and use the client"""
        client = self.llm_client
        # OpenAI-style
        if hasattr(client, 'chat') and hasattr(client.chat, 'completions'):
            return self._call_openai_compatible(prompt, max_tokens)
        # Anthropic-style
        if hasattr(client, 'messages') and hasattr(client.messages, 'create'):
            return self._call_anthropic(prompt, max_tokens)
        # Simple callable
        if callable(client):
            return client(prompt)
        # Generate method
        if hasattr(client, 'generate'):
            return client.generate(prompt, max_tokens=max_tokens)
        return self._mock_llm_response(prompt)

    def _mock_llm_response(self, prompt: str) -> str:
        """Generate a mock LLM response when no client is available."""
        if "Reconstruct" in prompt or "reconstruct" in prompt:
            return "[MOCK RECONSTRUCTION] Based on the retrieved memory fragments, I can synthesize the following: The information appears to be related to the query context. However, please configure an LLM client for actual reconstructive reasoning."
        elif "Evaluate" in prompt or "hypothesis" in prompt.lower():
            return "[MOCK EVALUATION] Based on memory analysis: Hypothesis 1 appears most supported (confidence: 70%). Please configure an LLM client for actual hypothesis evaluation."
        return "[MOCK RESPONSE] Please configure an LLM client for actual responses."

    def _resolve_nodes(self, results):
        """Yield (node, similarity) for every query hit whose node can be loaded."""
        for node_id, similarity in results:
            node = self.haim.tier_manager.get_memory(node_id)
            if node:
                yield node, similarity

    def reconstructive_recall(
        self,
        cue: str,
        top_memories: int = 5,
        enable_reasoning: bool = True
    ) -> Dict:
        """
        Reconstruct memory from partial cue
        Similar to human recall - you remember fragments, brain reconstructs whole

        Returns a dict with "cue", "fragments" (content, metadata and
        similarity of each retrieved memory) and "reconstruction" (the LLM
        output, or a fixed notice when enable_reasoning is False).
        """
        # Query HAIM for related memories
        results = self.haim.query(cue, top_k=top_memories)
        memory_fragments = [
            {
                "content": node.content,
                "metadata": node.metadata,
                "similarity": similarity
            }
            for node, similarity in self._resolve_nodes(results)
        ]
        if not enable_reasoning:
            return {
                "cue": cue,
                "fragments": memory_fragments,
                "reconstruction": "LLM reasoning disabled"
            }
        # Use LLM to reconstruct from fragments
        reconstruction_prompt = self._build_reconstruction_prompt(
            cue=cue,
            fragments=memory_fragments
        )
        reconstruction = self._call_llm(reconstruction_prompt)
        return {
            "cue": cue,
            "fragments": memory_fragments,
            "reconstruction": reconstruction
        }

    def _build_reconstruction_prompt(
        self,
        cue: str,
        fragments: List[Dict]
    ) -> str:
        """Build prompt for LLM reconstructive recall"""
        header = f"""You are an AI with holographic memory. A user asks a question, and you have retrieved partial memory fragments from your holographic memory.
User's Question: "{cue}"
Memory Fragments (retrieved by holographic similarity):
"""
        listing = "".join(
            f"\nFragment {i} (similarity: {frag['similarity']:.3f}):\n{frag['content']}\n"
            for i, frag in enumerate(fragments, 1)
        )
        footer = """
Task: Reconstruct a complete, coherent answer from these fragments.
- Combine fragments intelligently
- Fill in gaps using reasoning
- If fragments conflict, use highest-similarity fragment as primary
- Maintain factual accuracy
- Don't hallucinate information not supported by fragments
Reconstruction:"""
        return header + listing + footer

    def multi_hypothesis_query(
        self,
        query: str,
        hypotheses: List[str]
    ) -> Dict:
        """
        Query with multiple active hypotheses (superposition)
        Returns LLM evaluation of which hypothesis is most likely

        The result dict holds "query", "hypotheses", "relevant_memories"
        (content and similarity of each hit) and "evaluation".
        """
        results = self._superposition_query(query, hypotheses, top_k=10)
        relevant_memories = [
            {
                "content": node.content,
                "similarity": similarity
            }
            for node, similarity in self._resolve_nodes(results)
        ]
        evaluation_prompt = self._build_hypothesis_evaluation_prompt(
            query=query,
            hypotheses=hypotheses,
            relevant_memories=relevant_memories
        )
        evaluation = self._call_llm(evaluation_prompt)
        return {
            "query": query,
            "hypotheses": hypotheses,
            "relevant_memories": relevant_memories,
            "evaluation": evaluation
        }

    def _superposition_query(
        self,
        query: str,
        hypotheses: List[str],
        top_k: int = 10
    ) -> List[Tuple[str, float]]:
        """
        Retrieve memories for the query and for each hypothesis, then merge.

        The main query contributes its similarities unchanged. Every
        hypothesis is queried separately with half the budget (at least one
        result): a hit already present keeps the larger of its current score
        and 0.8 * similarity, a new hit enters at 0.6 * similarity.
        Returns the top_k (node_id, score) pairs, best first.

        NOTE: an earlier version also bundled the encoded query and
        hypothesis vectors into a superposition vector, but that vector was
        never used for retrieval, so the dead computation was removed.
        """
        all_results: Dict[str, float] = {}
        # Primary query
        for node_id, sim in self.haim.query(query, top_k=top_k):
            all_results[node_id] = sim
        # top_k // 2 would be 0 for top_k == 1; always ask for one hit.
        per_hypothesis_k = max(1, top_k // 2)
        for hypothesis in hypotheses:
            for node_id, sim in self.haim.query(hypothesis, top_k=per_hypothesis_k):
                if node_id in all_results:
                    all_results[node_id] = max(all_results[node_id], sim * 0.8)
                else:
                    all_results[node_id] = sim * 0.6
        # Sort by score and return top_k
        sorted_results = sorted(all_results.items(), key=lambda item: item[1], reverse=True)
        return sorted_results[:top_k]

    def _build_hypothesis_evaluation_prompt(
        self,
        query: str,
        hypotheses: List[str],
        relevant_memories: List[Dict]
    ) -> str:
        """Build prompt for multi-hypothesis evaluation"""
        header = f"""You are an AI with holographic memory. You have multiple hypotheses about a question, and you've retrieved relevant memories to evaluate them.
Query: "{query}"
Hypotheses:
"""
        hypothesis_lines = "".join(
            f"\nHypothesis {i}: {hyp}" for i, hyp in enumerate(hypotheses, 1)
        )
        memory_lines = "".join(
            f"\nMemory {i} (similarity: {mem['similarity']:.3f}):\n{mem['content']}\n"
            for i, mem in enumerate(relevant_memories, 1)
        )
        footer = """
Task: Evaluate which hypothesis is most supported by the retrieved memories.
- Consider all memories
- Rank hypotheses by support from memory
- Explain your reasoning
- Provide confidence score (0-100%) for each hypothesis
Evaluation:"""
        return header + hypothesis_lines + "\n\nRelevant Memories:\n" + memory_lines + footer

    def consolidate_memory(
        self,
        node_id: str,
        new_context: str,
        success: bool = True
    ):
        """
        Reconsolidate memory with new context
        Similar to how human memories are rewritten when recalled

        Does nothing when the node cannot be found. Otherwise the node is
        marked as accessed and new_context is appended to its content.
        ``success`` is accepted but currently has no effect.
        """
        node = self.haim.tier_manager.get_memory(node_id)
        if not node:
            return
        # Access triggers reconsolidation
        node.access()
        # Update content with new context (simplified)
        # In production: use LLM to intelligently merge
        node.content = f"{node.content}\n\n[RECONSOLIDATED]: {new_context}"
        # TODO: when success is True, strengthen connections to related
        # concepts (requires concept extraction - not implemented yet).
class MultiAgentHAIM:
    """
    Multi-agent system with shared HAIM memory
    Demonstrates "collective consciousness"
    """

    def __init__(self, num_agents: int = 3):
        """Create ``num_agents`` agents that all point at one shared engine."""
        self.agents = {}  # agent_id -> {"haim": shared engine, "role": role name}
        self.shared_memory = HAIMEngine(dimension=10000)
        # Strong references to in-flight background tasks (see
        # _schedule_async_task); the event loop only keeps weak ones.
        self._background_tasks = set()
        # Initialize agents with shared memory
        for i in range(num_agents):
            agent_id = f"agent_{i}"
            self.agents[agent_id] = {
                "haim": self.shared_memory,  # All share same memory
                "role": self._get_agent_role(agent_id)
            }

    def _get_agent_role(self, agent_id: str) -> str:
        """Return the role name for an agent id ("General Agent" if unlisted)."""
        roles = {
            "agent_0": "Research Agent",
            "agent_1": "Coding Agent",
            "agent_2": "Writing Agent"
        }
        return roles.get(agent_id, "General Agent")

    def agent_learn(
        self,
        agent_id: str,
        content: str,
        metadata: Optional[dict] = None
    ) -> str:
        """
        Agent stores memory in shared HAIM
        All agents can access this memory

        The stored node's metadata is tagged with the learning agent, its
        role and an ISO timestamp. Returns the new node id.

        Raises:
            AgentNotFoundError: if agent_id is not a registered agent.
        """
        if agent_id not in self.agents:
            raise AgentNotFoundError(agent_id)
        # Store in shared memory
        node_id = self.shared_memory.store(content, metadata)
        # Update metadata with agent info
        node = self.shared_memory.tier_manager.get_memory(node_id)
        if node:
            node.metadata = node.metadata or {}
            node.metadata["learned_by"] = agent_id
            node.metadata["agent_role"] = self.agents[agent_id]["role"]
            node.metadata["timestamp"] = datetime.now().isoformat()
        return node_id

    def agent_recall(
        self,
        agent_id: str,
        query: str,
        top_k: int = 5
    ) -> List[Dict]:
        """
        Agent recalls memory from shared HAIM
        Can access memories learned by ANY agent

        Returns one dict per hit whose node can be loaded, carrying the
        node id, content, similarity, raw metadata and the learning
        agent's id and role ("unknown" when not recorded).

        Raises:
            AgentNotFoundError: if agent_id is not a registered agent.
        """
        if agent_id not in self.agents:
            raise AgentNotFoundError(agent_id)
        # Query shared memory
        results = self.shared_memory.query(query, top_k=top_k)
        # Enrich with agent context
        enriched = []
        for node_id, similarity in results:
            node = self.shared_memory.tier_manager.get_memory(node_id)
            if not node:
                continue
            # metadata may be None (agent_learn guards against that too).
            info = node.metadata or {}
            enriched.append({
                "node_id": node_id,
                "content": node.content,
                "similarity": similarity,
                "metadata": node.metadata,
                "learned_by": info.get("learned_by", "unknown"),
                "agent_role": info.get("agent_role", "unknown")
            })
        return enriched

    def cross_agent_learning(
        self,
        concept_a: str,
        concept_b: str,
        agent_id: str,
        success: bool = True
    ):
        """
        Strengthen connection between concepts across agents
        When ANY agent fires this connection, ALL agents benefit

        Nothing is scheduled unless both concepts map to a stored memory.

        Raises:
            AgentNotFoundError: if agent_id is not a registered agent.
        """
        if agent_id not in self.agents:
            raise AgentNotFoundError(agent_id)
        # Map concepts to memory IDs using holographic similarity
        mem_id_a = self._concept_to_memory_id(concept_a)
        mem_id_b = self._concept_to_memory_id(concept_b)
        if mem_id_a and mem_id_b:
            # Schedule binding in the background
            self._schedule_async_task(
                self.shared_memory.bind_memories(mem_id_a, mem_id_b, success=success)
            )

    def _concept_to_memory_id(self, concept: str, min_similarity: float = 0.3) -> Optional[str]:
        """
        Map a concept string to the best matching memory ID.
        Uses holographic similarity to find the most relevant stored memory.
        Returns the memory ID if found with sufficient similarity, else None.

        NOTE: only the HOT tier is scanned; memories held elsewhere are
        never matched.
        """
        query_vec = self.shared_memory.encode_content(concept)
        best_match_id = None
        best_similarity = 0.0
        # Linear scan of the HOT tier, keeping the first best match.
        for node_id, node in self.shared_memory.tier_manager.hot.items():
            sim = query_vec.similarity(node.hdv)
            if sim > best_similarity:
                best_similarity = sim
                best_match_id = node_id
        if best_similarity >= min_similarity:
            return best_match_id
        return None

    def _schedule_async_task(self, coro):
        """Schedule an async coroutine to run, handling the event loop appropriately.

        Inside a running loop the coroutine becomes a background task;
        otherwise it is run to completion synchronously. Failures are
        logged, never raised (best-effort).
        """
        import asyncio

        def _finished(task):
            # Drop our reference and surface errors that would otherwise
            # only show up as "exception was never retrieved".
            self._background_tasks.discard(task)
            if not task.cancelled() and task.exception() is not None:
                logger.warning(f"Background task failed: {task.exception()}")

        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            # No running loop, run synchronously (for demo/testing purposes)
            try:
                asyncio.run(coro)
            except Exception as e:
                logger.warning(f"Background task failed: {e}")
            return
        # We're in an async context: create a task and keep a strong
        # reference so it cannot be garbage-collected mid-execution.
        task = loop.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(_finished)

    async def collective_orch_or(
        self,
        agent_id: str,
        query: str,
        max_collapse: int = 3
    ) -> List[Dict]:
        """
        Agent performs Orch OR on shared memories
        Collapses superposition based on collective free energy

        ``query`` is accepted but not used by the current implementation.

        Raises:
            AgentNotFoundError: if agent_id is not a registered agent.
        """
        if agent_id not in self.agents:
            raise AgentNotFoundError(agent_id)
        collapsed = await self.shared_memory.orchestrate_orch_or(max_collapse=max_collapse)
        role = self.agents[agent_id]["role"]
        # Enrich with agent context
        return [
            {
                "content": node.content,
                "free_energy_score": getattr(node, 'epistemic_value', 0.0),
                "metadata": node.metadata,
                "collapsed_by": agent_id,
                "agent_role": role
            }
            for node in collapsed
        ]

    def demonstrate_collective_consciousness(self) -> Dict:
        """
        Demonstrate cross-agent learning
        Shows that when Agent A learns, Agent B knows
        """
        # Agent 0 (Research) learns something
        mem_0 = self.agent_learn(
            agent_id="agent_0",
            content="MnemoCore Market Integrity Engine uses three signal groups: SURGE, FLOW, PATTERN",
            metadata={"category": "research", "importance": "high"}
        )
        # Agent 1 (Coding) learns something
        mem_1 = self.agent_learn(
            agent_id="agent_1",
            content="HAIM uses hyperdimensional vectors with 10,000 dimensions",
            metadata={"category": "coding", "importance": "high"}
        )
        # Agent 2 (Writing) recalls BOTH memories
        recall_0 = self.agent_recall(
            agent_id="agent_2",
            query="MnemoCore Engine",
            top_k=1
        )
        recall_1 = self.agent_recall(
            agent_id="agent_2",
            query="HAIM dimensions",
            top_k=1
        )
        # Cross-agent learning: strengthen connection
        self.cross_agent_learning(
            concept_a="MnemoCore Engine",
            concept_b="HAIM dimensions",
            agent_id="agent_2",
            success=True
        )
        return {
            "demonstration": "Collective Consciousness Demo",
            "agent_0_learned": mem_0,
            "agent_1_learned": mem_1,
            "agent_2_recalled_omega": recall_0,
            "agent_2_recalled_haim": recall_1,
            "cross_agent_connection": "Strengthened between Omega Engine and HAIM dimensions"
        }
class RLMIntegrator:
    """
    Phase 4.5 integrator for RLM (Recursive Language Models).

    Couples a HAIMLLMIntegrator with a RecursiveSynthesizer so that
    recursive memory queries can be driven by an LLM.

    Usage::

        integrator = RLMIntegrator(llm_integrator)
        result = await integrator.rlm_query(
            "What do we know about X and how does it relate to Y?"
        )
        print(result["synthesis"])

    When the wrapped integrator has no LLM client, no LLM callable is
    handed to the synthesizer, which then falls back to heuristic
    decomposition and score-based synthesis.
    """

    def __init__(self, llm_integrator, config=None):
        from mnemocore.core.recursive_synthesizer import RecursiveSynthesizer, SynthesizerConfig

        self.llm_integrator = llm_integrator
        self.haim = llm_integrator.haim
        # Hand the synthesizer an LLM callable only when a client exists.
        has_client = llm_integrator.llm_client is not None
        self.synthesizer = RecursiveSynthesizer(
            engine=self.haim,
            config=config or SynthesizerConfig(),
            llm_call=llm_integrator._call_llm if has_client else None,
        )

    @classmethod
    def from_config(cls, haim_engine, llm_config, synth_config=None):
        """Build an RLMIntegrator straight from an LLMConfig."""
        inner = HAIMLLMIntegrator.from_config(haim_engine, llm_config)
        return cls(llm_integrator=inner, config=synth_config)

    async def rlm_query(self, query, context_text=None, project_id=None):
        """
        Run a Phase 4.5 recursive memory query.

        Args:
            query: The user question (can be complex/multi-topic).
            context_text: Optional large external text (Ripple environment);
                ignored when empty or whitespace-only.
            project_id: Optional project scope for isolation masking.

        Returns:
            Dict with the keys query, sub_queries, results, synthesis,
            max_depth_hit, elapsed_ms, ripple_snippets, stats.
        """
        from mnemocore.core.ripple_context import RippleContext

        if context_text and context_text.strip():
            ripple = RippleContext(text=context_text, source_label="api_context")
        else:
            ripple = None
        outcome = await self.synthesizer.synthesize(
            query=query,
            ripple_context=ripple,
            project_id=project_id,
        )
        return dict(
            query=outcome.query,
            sub_queries=outcome.sub_queries,
            results=outcome.results,
            synthesis=outcome.synthesis,
            max_depth_hit=outcome.max_depth_hit,
            elapsed_ms=outcome.total_elapsed_ms,
            ripple_snippets=outcome.ripple_snippets,
            stats=outcome.stats,
        )
def create_demo():
    """Run the multi-agent HAIM demonstration, print its result as JSON and return it."""
    print("Creating HAIM Multi-Agent Demo...")
    # Three agents sharing one memory engine.
    demo_system = MultiAgentHAIM(num_agents=3)
    outcome = demo_system.demonstrate_collective_consciousness()
    print("\n=== DEMO RESULT ===")
    rendered = json.dumps(outcome, indent=2)
    print(rendered)
    return outcome
# Run the demo when the module is executed as a script.
if __name__ == "__main__":
    create_demo()