| """
|
| Cognitive Memory Client
|
| =======================
|
| The high-level facade for autonomous agents to interact with the MnemoCore AGI Memory Substrate.
|
| Provides easy methods for observation, episodic sequence tracking, and working memory recall.
|
| """
|
|
|
| from typing import List, Optional, Any, Tuple
|
| import logging
|
|
|
| from .core.engine import HAIMEngine
|
| from .core.working_memory import WorkingMemoryService, WorkingMemoryItem
|
| from .core.episodic_store import EpisodicStoreService
|
| from .core.semantic_store import SemanticStoreService
|
| from .core.procedural_store import ProceduralStoreService
|
| from .core.meta_memory import MetaMemoryService, SelfImprovementProposal
|
| from .core.memory_model import Procedure
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| class CognitiveMemoryClient:
|
| """
|
| Plug-and-play cognitive memory facade for agent frameworks (LangGraph, AutoGen, OpenClaw, etc.).
|
| """
|
| def __init__(
|
| self,
|
| engine: HAIMEngine,
|
| wm: WorkingMemoryService,
|
| episodic: EpisodicStoreService,
|
| semantic: SemanticStoreService,
|
| procedural: ProceduralStoreService,
|
| meta: MetaMemoryService,
|
| ):
|
| self.engine = engine
|
| self.wm = wm
|
| self.episodic = episodic
|
| self.semantic = semantic
|
| self.procedural = procedural
|
| self.meta = meta
|
|
|
|
|
|
|
| def observe(self, agent_id: str, content: str, kind: str = "observation", importance: float = 0.5, tags: Optional[List[str]] = None, **meta) -> str:
|
| """
|
| Push a new observation or thought directly into the agent's short-term Working Memory.
|
| """
|
| import uuid
|
| from datetime import datetime
|
| item_id = f"wm_{uuid.uuid4().hex[:8]}"
|
|
|
| item = WorkingMemoryItem(
|
| id=item_id,
|
| agent_id=agent_id,
|
| created_at=datetime.utcnow(),
|
| ttl_seconds=3600,
|
| content=content,
|
| kind=kind,
|
| importance=importance,
|
| tags=tags or [],
|
| hdv=None
|
| )
|
| self.wm.push_item(agent_id, item)
|
| logger.debug(f"Agent {agent_id} observed: {content[:30]}...")
|
| return item_id
|
|
|
| def get_working_context(self, agent_id: str, limit: int = 16) -> List[WorkingMemoryItem]:
|
| """
|
| Read the active, un-pruned context out of the agent's working memory buffer.
|
| """
|
| state = self.wm.get_state(agent_id)
|
| if not state:
|
| return []
|
|
|
| return state.items[-limit:]
|
|
|
|
|
|
|
| def start_episode(self, agent_id: str, goal: str, context: Optional[str] = None) -> str:
|
| """Begin a new temporally-linked event sequence."""
|
| return self.episodic.start_episode(agent_id, goal=goal, context=context)
|
|
|
| def append_event(self, episode_id: str, kind: str, content: str, **meta) -> None:
|
| """Log an action or outcome to an ongoing episode."""
|
| self.episodic.append_event(episode_id, kind, content, meta)
|
|
|
| def end_episode(self, episode_id: str, outcome: str, reward: Optional[float] = None) -> None:
|
| """Seal an episode, logging its final success or failure state."""
|
| self.episodic.end_episode(episode_id, outcome, reward)
|
|
|
|
|
|
|
| async def recall(
|
| self,
|
| agent_id: str,
|
| query: str,
|
| context: Optional[str] = None,
|
| top_k: int = 8,
|
| modes: Tuple[str, ...] = ("episodic", "semantic")
|
| ) -> List[dict]:
|
| """
|
| A unified query interface that checks Working Memory, Episodic History, and the Semantic Vector Store.
|
| Currently delegates heavily to the backing HAIMEngine, but can be augmented to return semantic concepts.
|
| """
|
| results = []
|
|
|
|
|
| if "semantic" in modes:
|
| engine_results = await self.engine.query(query, top_k=top_k)
|
| for mem_id, score in engine_results:
|
| node = await self.engine.tier_manager.get_memory(mem_id)
|
| if node:
|
| results.append({"source": "semantic/engine", "content": node.content, "score": score})
|
|
|
|
|
| if "episodic" in modes:
|
| recent_eps = self.episodic.get_recent(agent_id, limit=top_k, context=context)
|
| for ep in recent_eps:
|
| summary = f"Episode(goal={ep.goal}, outcome={ep.outcome}, events={len(ep.events)})"
|
| results.append({"source": "episodic", "content": summary, "score": ep.reliability})
|
|
|
|
|
| results.sort(key=lambda x: x.get("score", 0.0), reverse=True)
|
| return results[:top_k]
|
|
|
|
|
|
|
| def suggest_procedures(self, agent_id: str, query: str, top_k: int = 5) -> List[Procedure]:
|
| """Fetch executable tool-patterns based on the agent's intent."""
|
| return self.procedural.find_applicable_procedures(query, agent_id=agent_id, top_k=top_k)
|
|
|
| def record_procedure_outcome(self, proc_id: str, success: bool) -> None:
|
| """Report on the utility of a chosen procedure."""
|
| self.procedural.record_procedure_outcome(proc_id, success)
|
|
|
|
|
|
|
| def get_knowledge_gaps(self, agent_id: str, lookback_hours: int = 24) -> List[dict]:
|
| """Return currently open knowledge gaps identified by the Pulse loop."""
|
|
|
| return []
|
|
|
| def get_self_improvement_proposals(self) -> List[SelfImprovementProposal]:
|
| """Retrieve system-generated proposals to improve operation or prompt alignment."""
|
| return self.meta.list_proposals()
|
|
|
|
|