"""Memory Agent: ?????? ?????? ???????.""" import logging from typing import Any, Dict from memory.short_term import ShortTermMemory from memory.episodic_db import EpisodicDB from memory.knowledge_vector import KnowledgeVectorStore from api.deps import get_logger, is_remote_brain_only from agents.supervisor_agent import AgentOS logger = get_logger("kapo.agent.memory") class MemoryAgent: def __init__(self): self.remote_only = is_remote_brain_only() self.short = ShortTermMemory() self.episodic = None if self.remote_only else EpisodicDB() self.knowledge = None if self.remote_only else KnowledgeVectorStore() self.agent_os = None if self.remote_only else AgentOS() def write_short_term(self, key: str, value: Dict[str, Any]) -> None: if self.remote_only: return self.short.set(key, value) def read_short_term(self, key: str) -> Dict[str, Any] | None: if self.remote_only: return None return self.short.get(key) def store_experience(self, payload: Dict[str, Any]) -> None: if self.remote_only or self.episodic is None: return self.episodic.insert_experience( task=payload.get("task", ""), plan=payload.get("plan", {}), tools_used=payload.get("tools_used", {}), result=payload.get("result", {}), success=1 if payload.get("success") else 0, ) def query_knowledge(self, text: str, top_k: int = 3): if self.remote_only or self.knowledge is None: return [] return self.knowledge.query(text, top_k=top_k) def run_agent_os(self): """????? ???? ??????? ?????? ??????.""" if self.remote_only or self.agent_os is None: return {"skipped": True, "reason": "REMOTE_BRAIN_ONLY"} return self.agent_os.run()