| """ |
| MemoryAgent — Persistent long-term memory system (Phase 5) |
| SQLite-backed with semantic search simulation |
| """ |
| import json |
| import time |
| from typing import Dict, List, Optional |
| import structlog |
| from .base_agent import BaseAgent |
| from memory.db import save_memory, search_memory, get_history, get_project_memory |
|
|
| log = structlog.get_logger() |
|
|
|
|
| class MemoryAgent(BaseAgent): |
| def __init__(self, ws_manager=None, ai_router=None): |
| super().__init__("MemoryAgent", ws_manager, ai_router) |
|
|
| async def run(self, task: str, context: Dict = {}, **kwargs) -> str: |
| session_id = kwargs.get("session_id", "") |
| task_id = kwargs.get("task_id", "") |
|
|
| |
| task_lower = task.lower() |
| if any(k in task_lower for k in ["remember", "save", "store", "record"]): |
| content = context.get("content", task) |
| await self.save(content, session_id=session_id, memory_type="user_directive") |
| return f"✅ Saved to memory: {content[:100]}" |
| else: |
| results = await self.retrieve(task, session_id=session_id) |
| if results: |
| return "📚 **Memory Retrieved:**\n\n" + "\n".join(f"- {r['content'][:200]}" for r in results[:5]) |
| return "No relevant memories found." |
|
|
| async def save( |
| self, |
| content: str, |
| session_id: str = "", |
| project_id: str = "", |
| memory_type: str = "general", |
| key: str = "", |
| metadata: Dict = {}, |
| ): |
| """Save content to persistent memory.""" |
| await save_memory( |
| content=content, |
| memory_type=memory_type, |
| session_id=session_id, |
| project_id=project_id, |
| key=key, |
| metadata=metadata, |
| ) |
|
|
| async def retrieve( |
| self, |
| query: str, |
| session_id: str = "", |
| project_id: str = "", |
| limit: int = 10, |
| ) -> List[Dict]: |
| """Retrieve relevant memories.""" |
| return await search_memory(query[:100], session_id=session_id, project_id=project_id, limit=limit) |
|
|
| async def get_conversation_history(self, session_id: str, limit: int = 20) -> List[Dict]: |
| """Get conversation history for a session.""" |
| return await get_history(session_id, limit=limit) |
|
|
| async def save_interaction( |
| self, |
| user_message: str, |
| assistant_response: str, |
| session_id: str = "", |
| intent: Dict = {}, |
| ): |
| """Save a full interaction to memory.""" |
| await save_memory( |
| content=user_message, |
| memory_type="conversation", |
| session_id=session_id, |
| key="user_message", |
| metadata={"intent": intent.get("intent", ""), "timestamp": time.time()}, |
| ) |
| await save_memory( |
| content=assistant_response, |
| memory_type="conversation", |
| session_id=session_id, |
| key="assistant_response", |
| metadata={"agent": intent.get("primary_agent", "chat"), "timestamp": time.time()}, |
| ) |
|
|
| async def save_coding_style(self, style_notes: str, session_id: str = ""): |
| """Remember user's coding preferences.""" |
| await save_memory( |
| content=style_notes, |
| memory_type="user_preference", |
| session_id=session_id, |
| key="coding_style", |
| metadata={"category": "coding_style"}, |
| ) |
|
|
| async def save_project_context(self, project_id: str, context: Dict): |
| """Save project-specific context.""" |
| await save_memory( |
| content=json.dumps(context), |
| memory_type="project_context", |
| project_id=project_id, |
| key="project_context", |
| metadata={"timestamp": time.time()}, |
| ) |
|
|
| async def get_project_context(self, project_id: str) -> Optional[Dict]: |
| """Get project context.""" |
| results = await get_project_memory(project_id, memory_type="project_context", limit=1) |
| if results: |
| try: |
| return json.loads(results[0]["content"]) |
| except Exception: |
| return {"raw": results[0]["content"]} |
| return None |
|
|
| async def build_context_for_agent(self, session_id: str, query: str) -> Dict: |
| """Build rich context dict for agents from memory.""" |
| history = await self.get_conversation_history(session_id, limit=10) |
| relevant = await self.retrieve(query, session_id=session_id, limit=5) |
|
|
| return { |
| "history": [{"role": "user" if i % 2 == 0 else "assistant", "content": h["content"]} for i, h in enumerate(reversed(history))], |
| "relevant_memories": [r["content"][:200] for r in relevant], |
| "session_id": session_id, |
| } |
|
|