| """
|
| Holographic Active Inference Memory Engine (HAIM) - Phase 4.3+
|
| Uses Binary HDV for efficient storage and computation.
|
| """
|
|
|
| from typing import List, Tuple, Dict, Optional, Any, TYPE_CHECKING, Deque
|
|
|
| if TYPE_CHECKING:
|
| from .container import Container
|
| from .qdrant_store import QdrantStore
|
| from collections import deque
|
| import numpy as np
|
| import hashlib
|
| import os
|
| import json
|
| import asyncio
|
| import functools
|
| import uuid
|
| import re
|
| from datetime import datetime, timezone
|
| from loguru import logger
|
|
|
| from .config import get_config, HAIMConfig, SubconsciousAIConfig
|
| from .binary_hdv import BinaryHDV, TextEncoder, majority_bundle
|
| from .node import MemoryNode
|
| from .synapse import SynapticConnection
|
| from .holographic import ConceptualMemory
|
| from .tier_manager import TierManager
|
|
|
|
|
| from .attention import XORAttentionMasker, AttentionConfig, XORIsolationMask, IsolationConfig
|
| from .bayesian_ltp import get_bayesian_updater
|
| from .semantic_consolidation import SemanticConsolidationWorker, SemanticConsolidationConfig
|
| from .immunology import ImmunologyLoop, ImmunologyConfig
|
| from .gap_detector import GapDetector, GapDetectorConfig
|
| from .gap_filler import GapFiller, GapFillerConfig
|
| from .synapse_index import SynapseIndex
|
| from .subconscious_ai import SubconsciousAIWorker
|
|
|
|
|
| from .working_memory import WorkingMemoryService
|
| from .episodic_store import EpisodicStoreService
|
| from .semantic_store import SemanticStoreService
|
|
|
|
|
| from .recursive_synthesizer import RecursiveSynthesizer, SynthesizerConfig
|
|
|
|
|
| from .metrics import (
|
| timer, traced, get_trace_id, set_trace_id,
|
| STORE_DURATION_SECONDS, QUERY_DURATION_SECONDS,
|
| MEMORY_COUNT_TOTAL, QUEUE_LENGTH, ERROR_TOTAL,
|
| update_memory_count, update_queue_length, record_error
|
| )
|
|
|
|
|
| class HAIMEngine:
|
| """
|
| Holographic Active Inference Memory Engine (Phase 4.3+)
|
| Uses Binary HDV and Tiered Storage for efficient cognitive memory.
|
| """
|
|
|
| @staticmethod
|
| @functools.lru_cache(maxsize=10000)
|
| def _get_token_vector(token: str, dimension: int) -> np.ndarray:
|
| """Cached generation of deterministic token vectors (legacy compatibility)."""
|
| seed_bytes = hashlib.shake_256(token.encode()).digest(4)
|
| seed = int.from_bytes(seed_bytes, 'little')
|
| return np.random.RandomState(seed).choice([-1, 1], size=dimension)
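|
| # Determinism note (illustrative): because the seed is derived from SHAKE-256
|
| # of the token, the same token always yields the same bipolar vector, e.g.
|
| #   v1 = HAIMEngine._get_token_vector("cat", 1024)
|
| #   v2 = HAIMEngine._get_token_vector("cat", 1024)
|
| #   assert np.array_equal(v1, v2)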
|
|
|
| def __init__(
|
| self,
|
| dimension: int = 16384,
|
| persist_path: Optional[str] = None,
|
| config: Optional[HAIMConfig] = None,
|
| tier_manager: Optional[TierManager] = None,
|
| working_memory: Optional[WorkingMemoryService] = None,
|
| episodic_store: Optional[EpisodicStoreService] = None,
|
| semantic_store: Optional[SemanticStoreService] = None,
|
| ):
|
| """
|
| Initialize HAIMEngine with optional dependency injection.
|
|
|
| Args:
|
| dimension: Vector dimensionality (default 16384). Note: the effective dimension is taken from config.dimensionality, which overrides this argument.
|
| persist_path: Path to memory persistence file.
|
| config: Configuration object. If None, uses global get_config().
|
| tier_manager: TierManager instance. If None, creates a new one.
|
| working_memory: Optional Phase 5 WM service.
|
| episodic_store: Optional Phase 5 EM service.
|
| semantic_store: Optional Phase 5 Semantic service.
|
| """
|
| self.config = config or get_config()
|
| self.dimension = self.config.dimensionality
|
|
|
|
|
| self._initialized: bool = False
|
|
|
|
|
| self.tier_manager = tier_manager or TierManager(config=self.config)
|
|
|
|
|
| self.working_memory = working_memory
|
| self.episodic_store = episodic_store
|
| self.semantic_store = semantic_store
|
| self.binary_encoder = TextEncoder(self.dimension)
|
|
|
|
|
| self.synapses: Dict[Tuple[str, str], SynapticConnection] = {}
|
| self.synapse_adjacency: Dict[str, List[SynapticConnection]] = {}
|
|
|
| self.synapse_lock: asyncio.Lock = asyncio.Lock()
|
|
|
| self._write_lock: asyncio.Lock = asyncio.Lock()
|
|
|
| self._store_lock: asyncio.Lock = asyncio.Lock()
|
|
|
| self._dream_sem: asyncio.Semaphore = asyncio.Semaphore(1)
|
|
|
|
|
| self._synapse_index = SynapseIndex()
|
|
|
|
|
| self.attention_masker = XORAttentionMasker(AttentionConfig())
|
|
|
|
|
| attention_cfg = getattr(self.config, 'attention_masking', None)
|
| isolation_enabled = attention_cfg.enabled if attention_cfg else True
|
| self.isolation_masker = XORIsolationMask(IsolationConfig(
|
| enabled=isolation_enabled,
|
| dimension=self.dimension,
|
| ))
|
|
|
|
|
| self.gap_detector = GapDetector(GapDetectorConfig())
|
| self._gap_filler: Optional[GapFiller] = None
|
|
|
|
|
| self._semantic_worker: Optional[SemanticConsolidationWorker] = None
|
|
|
|
|
| self._immunology: Optional[ImmunologyLoop] = None
|
|
|
|
|
| self._subconscious_ai: Optional[SubconsciousAIWorker] = None
|
|
|
|
|
| self._recursive_synthesizer: Optional[RecursiveSynthesizer] = None
|
|
|
|
|
| from .topic_tracker import TopicTracker
|
| self.topic_tracker = TopicTracker(self.config.context, self.dimension)
|
|
|
|
|
| from .preference_store import PreferenceStore
|
| self.preference_store = PreferenceStore(self.config.preference, self.dimension)
|
|
|
|
|
| from .anticipatory import AnticipatoryEngine
|
| self.anticipatory_engine = AnticipatoryEngine(
|
| self.config.anticipatory,
|
| self._synapse_index,
|
| self.tier_manager,
|
| self.topic_tracker
|
| )
|
|
|
|
|
| data_dir = self.config.paths.data_dir
|
| self.soul = ConceptualMemory(dimension=self.dimension, storage_dir=data_dir)
|
|
|
|
|
| self.persist_path = persist_path or self.config.paths.memory_file
|
| self.synapse_path = self.config.paths.synapses_file
|
|
|
|
|
| queue_maxlen = self.config.dream_loop.subconscious_queue_maxlen
|
| self.subconscious_queue: Deque[str] = deque(maxlen=queue_maxlen)
|
| self._last_stored_id: Optional[str] = None
|
|
|
|
|
| self.epistemic_drive_active = True
|
| self.surprise_threshold = 0.7
|
|
|
| async def initialize(self):
|
| """Async initialization."""
|
| if self._initialized:
|
| return
|
|
|
| await self.tier_manager.initialize()
|
| await self._load_legacy_if_needed()
|
| await self._load_synapses()
|
| self._initialized = True
|
|
|
|
|
| self._semantic_worker = SemanticConsolidationWorker(self)
|
| await self._semantic_worker.start()
|
|
|
| self._immunology = ImmunologyLoop(self)
|
| await self._immunology.start()
|
|
|
|
|
| if self.config.subconscious_ai.enabled:
|
| self._subconscious_ai = SubconsciousAIWorker(self, self.config.subconscious_ai)
|
| await self._subconscious_ai.start()
|
| logger.info("Phase 4.4 SubconsciousAI worker started (BETA).")
|
|
|
| logger.info("Phase 4.0 background workers started (consolidation + immunology).")
|
|
|
| async def _run_in_thread(self, func, *args, **kwargs):
|
| """Run blocking function in thread pool."""
|
| loop = asyncio.get_running_loop()
|
| return await loop.run_in_executor(None, functools.partial(func, *args, **kwargs))
|
|
|
| def calculate_eig(self, candidate: BinaryHDV, context: BinaryHDV) -> float:
|
| """
|
| Calculate Expected Information Gain (EIG).
|
| Proportional to novelty (distance) against the context.
|
|
|
| Returns value in [0.0, 1.0] where:
|
| - 0.0 = candidate is identical to context (no new information)
|
| - 1.0 = candidate is maximally different from context (max information)
|
| """
|
| return candidate.normalized_distance(context)
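|
| # Worked example (illustrative): unrelated texts encode to near-random binary
|
| # HDVs that disagree in about half of their bits, so calculate_eig() lands
|
| # near 0.5 for them; an exact duplicate of the context scores ~0.0.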
|
|
|
| async def _current_context_vector(self, sample_n: int = 50) -> BinaryHDV:
|
| """Superpose a slice of working memory (HOT tier) into a single context vector."""
|
| recent_nodes = await self.tier_manager.get_hot_recent(sample_n)
|
|
|
| if not recent_nodes:
|
| return BinaryHDV.zeros(self.dimension)
|
|
|
| return majority_bundle([n.hdv for n in recent_nodes])
|
|
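| # Note: majority_bundle() takes a bitwise majority vote across the sampled
|
| # HOT-tier vectors (the standard superposition operator for binary HDVs), so
|
| # the resulting context vector stays close in Hamming distance to each input.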
|
|
|
|
|
|
|
|
|
| async def _encode_input(
|
| self,
|
| content: str,
|
| metadata: Optional[Dict[str, Any]] = None,
|
| goal_id: Optional[str] = None,
|
| ) -> Tuple[BinaryHDV, Dict[str, Any]]:
|
| """
|
| Encode input content to BinaryHDV and bind goal context if present.
|
|
|
| Args:
|
| content: The text content to encode.
|
| metadata: Optional metadata dictionary (will be mutated if goal_id present).
|
| goal_id: Optional goal identifier to bind as context.
|
|
|
| Returns:
|
| Tuple of (encoded BinaryHDV, updated metadata dict).
|
| """
|
|
|
| content_vec = await self._run_in_thread(self.binary_encoder.encode, content)
|
|
|
|
|
| if metadata is None:
|
| metadata = {}
|
|
|
| final_vec = content_vec
|
|
|
|
|
| if goal_id:
|
| goal_vec = await self._run_in_thread(
|
| self.binary_encoder.encode, f"GOAL_CONTEXT_{goal_id}"
|
| )
|
| final_vec = content_vec.xor_bind(goal_vec)
|
| metadata['goal_context'] = goal_id
|
|
|
| return final_vec, metadata
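|
| # XOR binding is self-inverse, so the goal context can be stripped exactly
|
| # (illustrative sketch; uses only BinaryHDV methods already in this module):
|
| #   bound = content_vec.xor_bind(goal_vec)
|
| #   bound.xor_bind(goal_vec).normalized_distance(content_vec)  # -> 0.0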
|
|
|
| async def _evaluate_tier(
|
| self,
|
| encoded_vec: BinaryHDV,
|
| metadata: Dict[str, Any],
|
| ) -> Dict[str, Any]:
|
| """
|
| Calculate epistemic valuation (EIG) and update metadata accordingly.
|
|
|
| Args:
|
| encoded_vec: The encoded BinaryHDV to evaluate.
|
| metadata: Metadata dictionary to update with EIG values.
|
|
|
| Returns:
|
| Updated metadata dictionary with EIG information.
|
| """
|
| if self.epistemic_drive_active:
|
| ctx_vec = await self._current_context_vector(sample_n=50)
|
| eig = self.calculate_eig(encoded_vec, ctx_vec)
|
| metadata["eig"] = float(eig)
|
|
|
| if eig >= self.surprise_threshold:
|
| metadata.setdefault("tags", [])
|
| if isinstance(metadata["tags"], list):
|
| metadata["tags"].append("epistemic_high")
|
| else:
|
| metadata.setdefault("eig", 0.0)
|
|
|
| return metadata
|
|
|
| async def _persist_memory(
|
| self,
|
| content: str,
|
| encoded_vec: BinaryHDV,
|
| metadata: Dict[str, Any],
|
| ) -> MemoryNode:
|
| """
|
| Create MemoryNode and persist to tier manager and disk.
|
|
|
| Phase 4.3: Automatically sets previous_id for episodic chaining.
|
|
|
| Args:
|
| content: Original text content.
|
| encoded_vec: Encoded BinaryHDV for the content.
|
| metadata: Metadata dictionary for the node.
|
|
|
| Returns:
|
| The created and persisted MemoryNode.
|
| """
|
| async with self._store_lock:
|
| previous_id = self._last_stored_id
|
|
|
|
|
| node_id = str(uuid.uuid4())
|
| node = MemoryNode(
|
| id=node_id,
|
| hdv=encoded_vec,
|
| content=content,
|
| metadata=metadata,
|
| previous_id=previous_id,
|
| )
|
|
|
|
|
| node.epistemic_value = float(metadata.get("eig", 0.0))
|
| node.calculate_ltp()
|
|
|
|
|
| await self.tier_manager.add_memory(node)
|
|
|
|
|
| await self._append_persisted(node)
|
|
|
|
|
| self._last_stored_id = node.id
|
|
|
| return node
|
|
|
| async def _trigger_post_store(
|
| self,
|
| node: MemoryNode,
|
| metadata: Dict[str, Any],
|
| ) -> None:
|
| """
|
| Execute post-store triggers: subconscious queue and background dream.
|
|
|
| Gap-filled memories must NOT re-enter the dream/gap loop to prevent
|
| an indefinite store -> dream -> detect -> fill -> store cycle.
|
|
|
| Args:
|
| node: The MemoryNode that was stored.
|
| metadata: Metadata dictionary (checked for gap fill source).
|
| """
|
| _is_gap_fill = metadata.get("source") == "llm_gap_fill"
|
|
|
|
|
|
|
| if hasattr(self.config, 'synapse') and self.config.synapse.auto_bind_on_store:
|
| similar_nodes = await self.query(
|
| node.content,
|
| top_k=3,
|
| associative_jump=False,
|
| track_gaps=False,
|
| )
|
| bind_pairs = [
|
| (node.id, neighbor_id)
|
| for neighbor_id, similarity in similar_nodes
|
| if neighbor_id != node.id
|
| and similarity >= self.config.synapse.similarity_threshold
|
| ]
|
| if bind_pairs:
|
| await self._auto_bind_batch(bind_pairs)
|
|
|
| self.subconscious_queue.append(node.id)
|
|
|
| if not _is_gap_fill:
|
| await self._background_dream(depth=1)
|
|
|
|
|
|
|
|
|
|
|
|
|
| _MAX_CONTENT_LENGTH: int = 100_000
|
|
|
| @timer(STORE_DURATION_SECONDS, labels={"tier": "hot"})
|
| @traced("store_memory")
|
| async def store(
|
| self,
|
| content: str,
|
| metadata: Optional[Dict[str, Any]] = None,
|
| goal_id: Optional[str] = None,
|
| project_id: Optional[str] = None,
|
| ) -> str:
|
| """
|
| Store new memory with holographic encoding.
|
|
|
| This method orchestrates the memory storage pipeline:
|
| 1. Validate input
|
| 2. Encode input content
|
| 3. Evaluate tier placement via EIG
|
| 4. Persist to storage
|
| 5. Trigger post-store processing
|
|
|
| Args:
|
| content: The text content to store. Must be non-empty and at most 100,000 characters.
|
| metadata: Optional metadata dictionary.
|
| goal_id: Optional goal identifier for context binding.
|
| project_id: Optional project identifier for isolation masking (Phase 4.1).
|
|
|
| Returns:
|
| The unique identifier of the stored memory node.
|
|
|
| Raises:
|
| ValueError: If content is empty or exceeds the maximum allowed length.
|
| RuntimeError: If the engine has not been initialized via initialize().
|
| """
|
|
|
| if not content or not content.strip():
|
| raise ValueError("Memory content cannot be empty or whitespace-only.")
|
| if len(content) > self._MAX_CONTENT_LENGTH:
|
| raise ValueError(
|
| f"Memory content is too long ({len(content):,} chars). "
|
| f"Maximum: {self._MAX_CONTENT_LENGTH:,}."
|
| )
|
| if not self._initialized:
|
| raise RuntimeError(
|
| "HAIMEngine.initialize() must be awaited before calling store()."
|
| )
|
|
|
|
|
| encoded_vec, updated_metadata = await self._encode_input(content, metadata, goal_id)
|
|
|
|
|
| if project_id:
|
| encoded_vec = self.isolation_masker.apply_mask(encoded_vec, project_id)
|
| updated_metadata['project_id'] = project_id
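|
| # Note: XOR masking preserves pairwise Hamming distances, i.e.
|
| # d(x ^ m, y ^ m) == d(x, y), so ranking within a project is unchanged while
|
| # unmasked (cross-project) vectors become effectively unrelated to the query.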
|
|
|
|
|
| updated_metadata = await self._evaluate_tier(encoded_vec, updated_metadata)
|
|
|
|
|
| node = await self._persist_memory(content, encoded_vec, updated_metadata)
|
|
|
|
|
| agent_id = updated_metadata.get("agent_id")
|
| if agent_id:
|
| if self.working_memory:
|
| from .memory_model import WorkingMemoryItem
|
| self.working_memory.push_item(
|
| agent_id,
|
| WorkingMemoryItem(
|
| id=f"wm_{node.id[:8]}",
|
| agent_id=agent_id,
|
| created_at=datetime.now(timezone.utc),
|
| ttl_seconds=3600,
|
| content=content,
|
| kind="observation",
|
| importance=node.epistemic_value or 0.5,
|
| tags=updated_metadata.get("tags", []),
|
| hdv=encoded_vec
|
| )
|
| )
|
|
|
| episode_id = updated_metadata.get("episode_id")
|
| if episode_id and self.episodic_store:
|
| self.episodic_store.append_event(
|
| episode_id=episode_id,
|
| kind="observation",
|
| content=content,
|
| metadata=updated_metadata
|
| )
|
|
|
|
|
| await self._trigger_post_store(node, updated_metadata)
|
|
|
|
|
| update_queue_length(len(self.subconscious_queue))
|
|
|
| logger.info(f"Stored memory {node.id} (EIG: {updated_metadata.get('eig', 0.0):.4f})")
|
| return node.id
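|
| # Usage sketch (illustrative, inside an async context; the argument values
|
| # are assumptions, not a fixed API):
|
| #   engine = HAIMEngine()
|
| #   await engine.initialize()
|
| #   node_id = await engine.store(
|
| #       "User prefers dark mode",
|
| #       metadata={"tags": ["preference"]},
|
| #       goal_id="onboarding",
|
| #   )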
|
|
|
| async def delete_memory(self, node_id: str) -> bool:
|
| """
|
| Delete a memory from all internal states and storage tiers.
|
| Returns True if something was deleted.
|
|
|
| Phase 4.0: uses SynapseIndex.remove_node() for O(k) removal.
|
| """
|
| logger.info(f"Deleting memory {node_id}")
|
|
|
|
|
| deleted = await self.tier_manager.delete_memory(node_id)
|
|
|
|
|
| if node_id in self.subconscious_queue:
|
| self.subconscious_queue.remove(node_id)
|
|
|
|
|
|
|
| async with self.synapse_lock:
|
| removed_count = self._synapse_index.remove_node(node_id)
|
|
|
| if removed_count:
|
| await self._save_synapses()
|
|
|
| return deleted
|
|
|
| async def log_decision(self, context_text: str, outcome: float) -> None:
|
| """
|
| Phase 12.3: Logs a user decision or feedback context to update the preference vector.
|
| Outcome should be positive (e.g. 1.0) or negative (e.g. -1.0).
|
| """
|
| vec = await self._run_in_thread(self.binary_encoder.encode, context_text)
|
| self.preference_store.log_decision(vec, outcome)
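|
| # Example (illustrative): record that the user accepted a suggestion made in
|
| # this context, nudging the preference vector toward it:
|
| #   await engine.log_decision("suggested dark mode at night", outcome=1.0)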
|
|
|
| async def close(self):
|
| """Perform graceful shutdown of engine components."""
|
| logger.info("Shutting down HAIMEngine...")
|
|
|
|
|
| if self._semantic_worker:
|
| await self._semantic_worker.stop()
|
| if self._immunology:
|
| await self._immunology.stop()
|
| if self._gap_filler:
|
| await self._gap_filler.stop()
|
| if self._subconscious_ai:
|
| await self._subconscious_ai.stop()
|
|
|
| await self._save_synapses()
|
| if self.tier_manager.use_qdrant and self.tier_manager.qdrant:
|
| await self.tier_manager.qdrant.close()
|
|
|
| @timer(QUERY_DURATION_SECONDS)
|
| @traced("query_memory")
|
| async def query(
|
| self,
|
| query_text: str,
|
| top_k: int = 5,
|
| associative_jump: bool = True,
|
| track_gaps: bool = True,
|
| project_id: Optional[str] = None,
|
| time_range: Optional[Tuple[datetime, datetime]] = None,
|
| chrono_weight: bool = True,
|
| chrono_lambda: float = 0.0001,
|
| include_neighbors: bool = False,
|
| metadata_filter: Optional[Dict[str, Any]] = None,
|
| include_cold: bool = False,
|
| ) -> List[Tuple[str, float]]:
|
| """
|
| Query memories using Hamming distance.
|
| Searches HOT tier and limited WARM tier.
|
|
|
| Phase 4.0 additions:
|
| - XOR attention masking re-ranks results for novelty.
|
| - Gap detection runs on low-confidence results (disabled when
|
| track_gaps=False to prevent dream-loop feedback).
|
|
|
| Phase 4.1 additions:
|
| - project_id applies isolation mask to query for project-scoped search.
|
|
|
| Phase 4.3 additions (Temporal Recall):
|
| - time_range: Filter to memories within (start, end) datetime range.
|
| - chrono_weight: Apply temporal decay to boost newer memories.
|
| Formula: Final_Score = Semantic_Similarity * (1 / (1 + lambda * Time_Delta))
|
| - chrono_lambda: Decay rate in seconds^-1 (default: 0.0001 ~ 2.7h half-life).
|
| - include_neighbors: Also fetch temporal neighbors (previous/next) for top results.
|
| - include_cold: Include COLD tier in the search (bounded linear scan, default False).
|
|
|
| Fix 3: Schedules anticipatory preloading (Phase 13.2) as a fire-and-forget task just before returning.
|
| """
|
|
|
| query_vec = await self._run_in_thread(self.binary_encoder.encode, query_text)
|
|
|
|
|
| is_shift, sim = self.topic_tracker.add_query(query_vec)
|
| if is_shift:
|
| logger.info(f"Context shifted during query. (sim {sim:.3f})")
|
|
|
|
|
| if project_id:
|
| query_vec = self.isolation_masker.apply_mask(query_vec, project_id)
|
|
|
|
|
|
|
| search_results = await self.tier_manager.search(
|
| query_vec,
|
| top_k=top_k * 2,
|
| time_range=time_range,
|
| metadata_filter=metadata_filter,
|
| include_cold=include_cold,
|
| )
|
|
|
| scores: Dict[str, float] = {}
|
| now_ts = datetime.now(timezone.utc).timestamp()
|
| mem_map: Dict[str, MemoryNode] = {}
|
|
|
| if chrono_weight and search_results:
|
| mems = await self.tier_manager.get_memories_batch(
|
| [nid for nid, _ in search_results]
|
| )
|
| mem_map = {m.id: m for m in mems if m}
|
|
|
| for nid, base_sim in search_results:
|
|
|
| boost = self._synapse_index.boost(nid)
|
| score = base_sim * boost
|
|
|
|
|
| if chrono_weight and score > 0:
|
| mem = mem_map.get(nid)
|
| if mem:
|
| time_delta = max(0.0, now_ts - mem.created_at.timestamp())
|
|
|
| decay_factor = 1.0 / (1.0 + chrono_lambda * time_delta)
|
| score = score * decay_factor
|
|
|
|
|
| if self.preference_store.config.enabled and self.preference_store.preference_vector is not None:
|
| mem = mem_map.get(nid)
|
| if not mem:
|
| mem = await self.tier_manager.get_memory(nid)
|
| if mem and mem.id not in mem_map:
|
| mem_map[mem.id] = mem
|
| if mem:
|
| score = self.preference_store.bias_score(mem.hdv, score)
|
|
|
| scores[nid] = score
|
|
|
|
|
| agent_id = metadata_filter.get("agent_id") if metadata_filter else None
|
| if agent_id and self.working_memory:
|
| wm_state = self.working_memory.get_state(agent_id)
|
| if wm_state:
|
| wm_texts = [item.content for item in wm_state.items]
|
| if wm_texts:
|
|
|
| for nid in scores:
|
| mem = mem_map.get(nid)
|
| if mem and mem.content:
|
| if any(w_text.lower() in mem.content.lower() for w_text in wm_texts):
|
| scores[nid] *= 1.15
|
|
|
|
|
| if associative_jump and self._synapse_index:
|
| top_seeds = sorted(scores.items(), key=lambda x: x[1], reverse=True)[:3]
|
| augmented_scores = scores.copy()
|
|
|
| for seed_id, seed_score in top_seeds:
|
| if seed_score <= 0:
|
| continue
|
|
|
| neighbour_synapses = self._synapse_index.neighbours(seed_id)
|
|
|
| for syn in neighbour_synapses:
|
| neighbor = (
|
| syn.neuron_b_id if syn.neuron_a_id == seed_id else syn.neuron_a_id
|
| )
|
| if neighbor not in augmented_scores:
|
| mem = await self.tier_manager.get_memory(neighbor)
|
| if mem:
|
| if metadata_filter:
|
| match = True
|
| node_meta = mem.metadata or {}
|
| for k, v in metadata_filter.items():
|
| if node_meta.get(k) != v:
|
| match = False
|
| break
|
| if not match:
|
| continue
|
| augmented_scores[neighbor] = query_vec.similarity(mem.hdv)
|
|
|
| if neighbor in augmented_scores:
|
| spread = seed_score * syn.get_current_strength() * 0.3
|
| augmented_scores[neighbor] += spread
|
|
|
| scores = augmented_scores
|
|
|
|
|
| attention_mask = None
|
| top_results: List[Tuple[str, float]] = sorted(
|
| scores.items(), key=lambda x: x[1], reverse=True
|
| )[:top_k]
|
|
|
| if scores:
|
|
|
| recent_nodes = await self.tier_manager.get_hot_recent(
|
| self.attention_masker.config.context_sample_n
|
| )
|
| if recent_nodes:
|
| ctx_vecs = [n.hdv for n in recent_nodes]
|
| ctx_key = self.attention_masker.build_context_key(ctx_vecs)
|
| attention_mask = self.attention_masker.build_attention_mask(query_vec, ctx_key)
|
|
|
|
|
| mem_vecs: Dict[str, BinaryHDV] = {}
|
| async with self.tier_manager.lock:
|
| for nid in list(scores.keys()):
|
| node = self.tier_manager.hot.get(nid)
|
| if node:
|
| mem_vecs[nid] = node.hdv
|
|
|
| ranked = self.attention_masker.rerank(scores, mem_vecs, attention_mask)
|
| top_results = self.attention_masker.extract_scores(ranked)[:top_k]
|
|
|
|
|
|
|
| if track_gaps:
|
| asyncio.ensure_future(
|
| self.gap_detector.assess_query(query_text, top_results, attention_mask)
|
| )
|
|
|
|
|
|
|
| if include_neighbors and top_results:
|
| neighbor_ids: set = set()
|
| for result_id, _ in top_results[:3]:
|
| mem = await self.tier_manager.get_memory(result_id)
|
| if not mem:
|
| continue
|
|
|
|
|
| if mem.previous_id:
|
| prev_mem = await self.tier_manager.get_memory(mem.previous_id)
|
| if prev_mem and prev_mem.id not in scores:
|
| if metadata_filter:
|
| match = True
|
| p_meta = prev_mem.metadata or {}
|
| for k, v in metadata_filter.items():
|
| if p_meta.get(k) != v:
|
| match = False
|
| break
|
| if not match:
|
| continue
|
| neighbor_ids.add(prev_mem.id)
|
|
|
|
|
|
|
|
|
| next_mem = await self.tier_manager.get_next_in_chain(result_id)
|
| if next_mem and next_mem.id not in scores:
|
| neighbor_ids.add(next_mem.id)
|
|
|
|
|
| for neighbor_id in neighbor_ids:
|
| mem = await self.tier_manager.get_memory(neighbor_id)
|
| if mem:
|
| neighbor_score = query_vec.similarity(mem.hdv)
|
| top_results.append((neighbor_id, neighbor_score * 0.8))
|
|
|
|
|
| top_results = sorted(top_results, key=lambda x: x[1], reverse=True)[:top_k]
|
|
|
|
|
|
|
| if top_results and self._initialized and self.config.anticipatory.enabled:
|
| asyncio.ensure_future(
|
| self.anticipatory_engine.predict_and_preload(top_results[0][0])
|
| )
|
|
|
| return top_results
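|
| # Query usage sketch (illustrative; the datetimes are hypothetical):
|
| #   hits = await engine.query(
|
| #       "dark mode preferences",
|
| #       top_k=5,
|
| #       time_range=(start_dt, end_dt),
|
| #       project_id="proj-42",
|
| #   )
|
| #   # -> [(node_id, score), ...] sorted by score, highest first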
|
|
|
| async def get_context_nodes(self, top_k: int = 3) -> List[Tuple[str, float]]:
|
| """
|
| Phase 12.2: Contextual Awareness
|
| Retrieves the top_k nodes most relevant to the current topic context vector.
|
| Intended for prompt builders to call before injecting context into the LLM.
|
| """
|
| if not self.topic_tracker.config.enabled:
|
| return []
|
|
|
| ctx = self.topic_tracker.get_context()
|
| if ctx is None:
|
| return []
|
|
|
| results = await self.tier_manager.search(
|
| ctx,
|
| top_k=top_k,
|
| time_range=None,
|
| metadata_filter=None,
|
| )
|
| return results
|
|
|
| async def _background_dream(self, depth: int = 2):
|
| """
|
| Passive Subconscious – strengthen synapses in idle cycles.
|
|
|
| Uses a semaphore so at most one dream task runs concurrently,
|
| and passes track_gaps=False so dream queries cannot feed the
|
| gap detector (breaking the store→dream→gap→fill→store loop).
|
| """
|
| if not self.subconscious_queue:
|
| return
|
|
|
|
|
| if self._dream_sem.locked():
|
| return
|
|
|
| async with self._dream_sem:
|
| stim_id = self.subconscious_queue.popleft()
|
| stim_node = await self.tier_manager.get_memory(stim_id)
|
| if not stim_node:
|
| return
|
|
|
| potential_connections = await self.query(
|
| stim_node.content,
|
| top_k=depth + 1,
|
| associative_jump=False,
|
| track_gaps=False,
|
| )
|
|
|
| for neighbor_id, similarity in potential_connections:
|
| if neighbor_id != stim_id and similarity > 0.15:
|
| await self.bind_memories(stim_id, neighbor_id, success=True)
|
|
|
| async def orchestrate_orch_or(self, max_collapse: int = 3) -> List[MemoryNode]:
|
| """
|
| Collapse active HOT-tier superposition by a simple free-energy proxy.
|
|
|
| The score combines LTP (long-term stability), epistemic value (novelty),
|
| and access_count (usage evidence).
|
| """
|
| async with self.tier_manager.lock:
|
| active_nodes = list(self.tier_manager.hot.values())
|
| if not active_nodes or max_collapse <= 0:
|
| return []
|
|
|
| def score(node: MemoryNode) -> float:
|
| ltp = float(getattr(node, "ltp_strength", 0.0))
|
| epistemic = float(getattr(node, "epistemic_value", 0.0))
|
| access = float(getattr(node, "access_count", 0))
|
| return (0.6 * ltp) + (0.3 * epistemic) + (0.1 * np.log1p(access))
|
|
|
| return sorted(active_nodes, key=score, reverse=True)[:max_collapse]
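|
| # Worked example: a node with ltp_strength=0.8, epistemic_value=0.5, and
|
| # access_count=20 scores 0.6*0.8 + 0.3*0.5 + 0.1*ln(21) ~= 0.48 + 0.15 + 0.30 = 0.93.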
|
|
|
| async def _auto_bind_batch(
|
| self,
|
| pairs: List[Tuple[str, str]],
|
| success: bool = True,
|
| weight: float = 1.0,
|
| ) -> None:
|
| """
|
| Fix 4: Bind multiple (id_a, id_b) pairs in one pass, saving synapses once.
|
|
|
| Used by auto-bind in _trigger_post_store() to avoid N disk writes per store.
|
| """
|
| async with self.synapse_lock:
|
| for id_a, id_b in pairs:
|
| mem_a = await self.tier_manager.get_memory(id_a)
|
| mem_b = await self.tier_manager.get_memory(id_b)
|
| if mem_a and mem_b:
|
| self._synapse_index.add_or_fire(id_a, id_b, success=success, weight=weight)
|
| await self._save_synapses()
|
|
|
| async def bind_memories(self, id_a: str, id_b: str, success: bool = True, weight: float = 1.0):
|
| """
|
| Bind two memories by ID.
|
|
|
| Fix 2: delegates exclusively to SynapseIndex — legacy dict sync removed.
|
| The legacy self.synapses / self.synapse_adjacency attributes remain for
|
| backward compatibility but are only populated at startup from disk.
|
| """
|
| mem_a = await self.tier_manager.get_memory(id_a)
|
| mem_b = await self.tier_manager.get_memory(id_b)
|
|
|
| if not mem_a or not mem_b:
|
| return
|
|
|
| async with self.synapse_lock:
|
| self._synapse_index.add_or_fire(id_a, id_b, success=success, weight=weight)
|
|
|
| await self._save_synapses()
|
|
|
| async def get_node_boost(self, node_id: str) -> float:
|
| """
|
| Compute synaptic boost for scoring.
|
|
|
| Phase 4.0: O(k) neighbour lookup via SynapseIndex, without the legacy path's lock overhead.
|
| """
|
| return self._synapse_index.boost(node_id)
|
|
|
| async def cleanup_decay(self, threshold: float = 0.1):
|
| """
|
| Remove synapses that have decayed below the threshold.
|
|
|
| Phase 4.0: O(E) via SynapseIndex.compact(), no lock required for the index itself.
|
| Also syncs any legacy dict entries into the index before compacting.
|
| """
|
| async with self.synapse_lock:
|
|
|
|
|
| for syn in list(self.synapses.values()):
|
| if self._synapse_index.get(syn.neuron_a_id, syn.neuron_b_id) is None:
|
| self._synapse_index.register(syn)
|
|
|
| removed = self._synapse_index.compact(threshold)
|
|
|
| if removed:
|
| logger.info(f"cleanup_decay: pruned {removed} synapses below {threshold}")
|
| await self._save_synapses()
|
|
|
| async def get_stats(self) -> Dict[str, Any]:
|
| """Aggregate statistics from engine components."""
|
| tier_stats = await self.tier_manager.get_stats()
|
|
|
| async with self.synapse_lock:
|
| syn_count = len(self._synapse_index)
|
|
|
| stats = {
|
| "engine_version": "4.5.0",
|
| "dimension": self.dimension,
|
| "encoding": "binary_hdv",
|
| "tiers": tier_stats,
|
| "concepts_count": len(self.soul.concepts),
|
| "symbols_count": len(self.soul.symbols),
|
| "synapses_count": syn_count,
|
| "synapse_index": self._synapse_index.stats,
|
| "subconscious_backlog": len(self.subconscious_queue),
|
|
|
| "gap_detector": self.gap_detector.stats,
|
| "immunology": self._immunology.stats if self._immunology else {},
|
| "semantic_consolidation": (
|
| self._semantic_worker.stats if self._semantic_worker else {}
|
| ),
|
|
|
| "subconscious_ai": (
|
| self._subconscious_ai.stats if self._subconscious_ai else {}
|
| ),
|
|
|
| "recursive_synthesizer": (
|
| self._recursive_synthesizer.stats if self._recursive_synthesizer else {}
|
| ),
|
| "timestamp": datetime.now(timezone.utc).isoformat(),
|
| }
|
| return stats
|
|
|
| def encode_content(self, content: str) -> BinaryHDV:
|
| """Encode text to Binary HDV."""
|
| return self.binary_encoder.encode(content)
|
|
|
|
|
|
|
| async def enable_gap_filling(
|
| self,
|
| llm_integrator,
|
| config: Optional["GapFillerConfig"] = None,
|
| ) -> None:
|
| """
|
| Attach an LLM integrator to autonomously fill knowledge gaps.
|
|
|
| Args:
|
| llm_integrator: HAIMLLMIntegrator instance.
|
| config: Optional GapFillerConfig overrides.
|
| """
|
| if self._gap_filler:
|
| await self._gap_filler.stop()
|
|
|
| self._gap_filler = GapFiller(
|
| engine=self,
|
| llm_integrator=llm_integrator,
|
| gap_detector=self.gap_detector,
|
| config=config or GapFillerConfig(),
|
| )
|
| await self._gap_filler.start()
|
| logger.info("Phase 4.0 GapFiller started.")
|
|
|
| async def enable_recursive_synthesis(
|
| self,
|
| llm_call: Optional[Any] = None,
|
| config: Optional["SynthesizerConfig"] = None,
|
| ) -> None:
|
| """
|
| Enable Phase 4.5 Recursive Synthesis Engine.
|
|
|
| Args:
|
| llm_call: Optional callable for LLM-powered decomposition and synthesis.
|
| Signature: (prompt: str) -> str.
|
| config: Optional SynthesizerConfig overrides.
|
| """
|
| self._recursive_synthesizer = RecursiveSynthesizer(
|
| engine=self,
|
| config=config or SynthesizerConfig(),
|
| llm_call=llm_call,
|
| )
|
| logger.info("Phase 4.5 RecursiveSynthesizer enabled.")
|
|
|
| async def record_retrieval_feedback(
|
| self,
|
| node_id: str,
|
| helpful: bool,
|
| eig_signal: float = 1.0,
|
| ) -> None:
|
| """
|
| Record whether a retrieved memory was useful.
|
|
|
| Phase 4.0: feeds the Bayesian LTP updater for the node.
|
|
|
| Args:
|
| node_id: The memory node that was retrieved.
|
| helpful: Was the retrieval actually useful?
|
| eig_signal: Strength of evidence (0–1).
|
| """
|
| node = await self.tier_manager.get_memory(node_id)
|
| if node:
|
| updater = get_bayesian_updater()
|
| updater.observe_node_retrieval(node, helpful=helpful, eig_signal=eig_signal)
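|
| # Feedback sketch (illustrative): after surfacing node_id to the user,
|
| #   await engine.record_retrieval_feedback(node_id, helpful=True, eig_signal=0.8)
|
| # adds positive evidence to the node's Bayesian LTP posterior; helpful=False
|
| # adds negative evidence.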
|
|
|
| async def register_negative_feedback(self, query_text: str) -> None:
|
| """
|
| Signal that a recent query was not adequately answered.
|
| Creates a high-priority gap record for LLM gap-filling.
|
| """
|
| await self.gap_detector.register_negative_feedback(query_text)
|
|
|
| async def get_memory(self, node_id: str) -> Optional[MemoryNode]:
|
| """Retrieve memory via TierManager."""
|
| return await self.tier_manager.get_memory(node_id)
|
|
|
|
|
|
|
| def _legacy_encode_content_numpy(self, content: str) -> np.ndarray:
|
| """
|
| Original localized encoding logic for backward compatibility.
|
| Used only for migrating legacy data.
|
| """
|
| tokens = re.findall(r'\w+', content.lower())
|
| if not tokens:
|
| seed_bytes = hashlib.shake_256(content.encode()).digest(4)
|
| seed = int.from_bytes(seed_bytes, 'little')
|
| return np.random.RandomState(seed).choice([-1, 1], size=self.dimension)
|
|
|
| combined = np.zeros(self.dimension)
|
| for t in tokens:
|
| t_vec = self._get_token_vector(t, self.dimension)
|
| combined += t_vec
|
|
|
| v = np.sign(combined)
|
| v[v == 0] = np.random.RandomState(42).choice([-1, 1], size=np.sum(v == 0))
|
| return v.astype(int)
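|
| # The legacy encoder bundles by summing bipolar (+1/-1) token vectors and
|
| # thresholding with sign(); exact-zero ties are broken with a fixed-seed coin
|
| # flip so re-encoding the same text stays deterministic.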
|
|
|
| async def _load_legacy_if_needed(self):
|
| """Load from memory.jsonl into TierManager, converting to BinaryHDV."""
|
| if not os.path.exists(self.persist_path):
|
| return
|
|
|
| logger.info(f"Loading legacy memory from {self.persist_path}")
|
|
|
| def _load():
|
| try:
|
| with open(self.persist_path, 'r', encoding='utf-8') as f:
|
| return f.readlines()
|
| except Exception:
|
| return []
|
|
|
| lines = await self._run_in_thread(_load)
|
|
|
| for line in lines:
|
| line = line.strip()
|
| if not line:
|
| continue
|
| try:
|
| rec = json.loads(line)
|
| content = rec.get('content', '')
|
| if not content:
|
| continue
|
|
|
| node_id = rec.get('id')
|
|
|
|
|
| hdv = await self._run_in_thread(self.binary_encoder.encode, content)
|
|
|
| node = MemoryNode(
|
| id=node_id,
|
| hdv=hdv,
|
| content=content,
|
| metadata=rec.get('metadata') or {}
|
| )
|
|
|
|
|
| if 'created_at' in rec:
|
| node.created_at = datetime.fromisoformat(rec['created_at'])
|
|
|
|
|
| if 'previous_id' in rec:
|
| node.previous_id = rec['previous_id']
|
|
|
|
|
| await self.tier_manager.add_memory(node)
|
|
|
| except Exception as e:
|
| logger.warning(f"Failed to load record: {e}")
|
|
|
| async def _load_synapses(self):
|
| """
|
| Load synapses from disk.
|
|
|
| Phase 4.0: uses SynapseIndex.load_from_file() which restores Bayesian state.
|
| """
|
| if not os.path.exists(self.synapse_path):
|
| return
|
|
|
| def _load():
|
| self._synapse_index.load_from_file(self.synapse_path)
|
|
|
|
|
| await self._run_in_thread(_load)
|
|
|
| async def _save_synapses(self):
|
| """
|
| Save synapses to disk in JSONL format.
|
|
|
| Phase 4.0: uses SynapseIndex.save_to_file() which includes Bayesian state.
|
| A dedicated _write_lock serialises concurrent callers so the file is never
|
| written by two coroutines at the same time. Does NOT acquire synapse_lock.
|
| """
|
| path_snapshot = self.synapse_path
|
|
|
| def _save():
|
| self._synapse_index.save_to_file(path_snapshot)
|
|
|
| async with self._write_lock:
|
| await self._run_in_thread(_save)
|
|
|
| async def _append_persisted(self, node: MemoryNode):
|
| """Append-only log with Phase 4.3 temporal metadata."""
|
|
|
| def _append():
|
| try:
|
| with open(self.persist_path, 'a', encoding='utf-8') as f:
|
| rec = {
|
| 'id': node.id,
|
| 'content': node.content,
|
| 'metadata': node.metadata,
|
| 'created_at': node.created_at.isoformat(),
|
|
|
| 'unix_timestamp': node.unix_timestamp,
|
| 'iso_date': node.iso_date,
|
| 'previous_id': node.previous_id,
|
| }
|
| f.write(json.dumps(rec) + "\n")
|
| except Exception as e:
|
| logger.error(f"Failed to persist memory: {e}")
|
|
|
| await self._run_in_thread(_append)
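|
| # Each appended line is a self-contained JSON record, e.g. (values illustrative):
|
| #   {"id": "...", "content": "...", "metadata": {}, "previous_id": null,
|
| #    "created_at": "2025-01-01T00:00:00+00:00",
|
| #    "unix_timestamp": 1735689600.0, "iso_date": "2025-01-01"}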
|
|
|
| async def persist_memory_snapshot(self, node: MemoryNode) -> None:
|
| """Persist a current snapshot of a memory node to the append-only log."""
|
| await self._append_persisted(node)
|
|
|
|
|
|
|
| async def define_concept(self, name: str, attributes: Dict[str, str]):
|
| """Store a named concept with attribute bindings in conceptual memory."""
|
| await self._run_in_thread(self.soul.store_concept, name, attributes)
|
|
|
| async def reason_by_analogy(self, src: str, val: str, tgt: str):
|
| """Solve 'src is to val as tgt is to ?' via holographic analogy."""
|
| return await self._run_in_thread(self.soul.solve_analogy, src, val, tgt)
|
|
|
| async def cross_domain_inference(self, src: str, tgt: str, pat: str):
|
| """Transfer the pattern pat from the src domain to the tgt domain via analogy."""
|
| return await self._run_in_thread(self.soul.solve_analogy, src, pat, tgt)
|
|
|
| async def inspect_concept(self, name: str, attr: str):
|
| """Extract the value bound to attr on the named concept."""
|
| return await self._run_in_thread(self.soul.extract_attribute, name, attr)
|
|
|