# pyaesonegtckglay-dotcom
# feat: GOD MODE+ v3.0 — Full Autonomous AI Operating System
# a20767f
"""
MemoryAgent — Persistent long-term memory system (Phase 5)
SQLite-backed with semantic search simulation
"""
import json
import time
from typing import Dict, List, Optional
import structlog
from .base_agent import BaseAgent
from memory.db import save_memory, search_memory, get_history, get_project_memory
log = structlog.get_logger()
class MemoryAgent(BaseAgent):
    """Agent that saves and retrieves memories through the ``memory.db`` helpers.

    Every method delegates persistence to ``save_memory``, ``search_memory``,
    ``get_history`` or ``get_project_memory``; this class only decides which
    helper to call and how to shape the arguments and results.
    """

    # Substrings that, when found in the lower-cased task, route ``run`` to
    # the save path instead of the retrieve path.
    _SAVE_KEYWORDS = ("remember", "save", "store", "record")

    # Maps the ``key`` values written by ``save_interaction`` to chat roles.
    _ROLE_BY_KEY = {"user_message": "user", "assistant_response": "assistant"}

    def __init__(self, ws_manager=None, ai_router=None):
        """Register the agent with the base class under the name "MemoryAgent"."""
        super().__init__("MemoryAgent", ws_manager, ai_router)

    async def run(self, task: str, context: Optional[Dict] = None, **kwargs) -> str:
        """Save or retrieve memory depending on the wording of ``task``.

        Args:
            task: Free-text instruction. If it contains any of the save
                keywords (substring match, case-insensitive) the content is
                saved; otherwise ``task`` is used as a search query.
            context: Optional dict; its ``"content"`` entry, when present, is
                what gets saved instead of ``task``.
            **kwargs: ``session_id`` is read from here (defaults to ``""``).

        Returns:
            A human-readable status or result string.
        """
        # A fresh dict per call: a ``{}`` default would be shared between calls.
        context = {} if context is None else context
        session_id = kwargs.get("session_id", "")

        task_lower = task.lower()
        if any(keyword in task_lower for keyword in self._SAVE_KEYWORDS):
            content = context.get("content", task)
            await self.save(content, session_id=session_id, memory_type="user_directive")
            return f"✅ Saved to memory: {content[:100]}"

        results = await self.retrieve(task, session_id=session_id)
        if results:
            # Show at most five hits, each truncated to 200 characters.
            return "📚 **Memory Retrieved:**\n\n" + "\n".join(
                f"- {r['content'][:200]}" for r in results[:5]
            )
        return "No relevant memories found."

    async def save(
        self,
        content: str,
        session_id: str = "",
        project_id: str = "",
        memory_type: str = "general",
        key: str = "",
        metadata: Optional[Dict] = None,
    ):
        """Save content to persistent memory via ``save_memory``.

        Args:
            content: Text to store.
            session_id: Session the memory belongs to.
            project_id: Project the memory belongs to.
            memory_type: Category label passed through to ``save_memory``.
            key: Optional key passed through to ``save_memory``.
            metadata: Optional extra data; an empty dict is sent when omitted.
        """
        await save_memory(
            content=content,
            memory_type=memory_type,
            session_id=session_id,
            project_id=project_id,
            key=key,
            # Never hand the helper a dict shared between calls.
            metadata={} if metadata is None else metadata,
        )

    async def retrieve(
        self,
        query: str,
        session_id: str = "",
        project_id: str = "",
        limit: int = 10,
    ) -> List[Dict]:
        """Retrieve relevant memories via ``search_memory``.

        Only the first 100 characters of ``query`` are sent to the search.
        """
        return await search_memory(
            query[:100],
            session_id=session_id,
            project_id=project_id,
            limit=limit,
        )

    async def get_conversation_history(self, session_id: str, limit: int = 20) -> List[Dict]:
        """Get conversation history for a session via ``get_history``."""
        return await get_history(session_id, limit=limit)

    async def save_interaction(
        self,
        user_message: str,
        assistant_response: str,
        session_id: str = "",
        intent: Optional[Dict] = None,
    ):
        """Save a full interaction (user message, then assistant reply).

        Two "conversation" memories are written, keyed ``"user_message"`` and
        ``"assistant_response"``. ``intent["intent"]`` is recorded with the
        user message and ``intent["primary_agent"]`` (default ``"chat"``) with
        the assistant response.
        """
        intent = {} if intent is None else intent
        await save_memory(
            content=user_message,
            memory_type="conversation",
            session_id=session_id,
            key="user_message",
            metadata={"intent": intent.get("intent", ""), "timestamp": time.time()},
        )
        await save_memory(
            content=assistant_response,
            memory_type="conversation",
            session_id=session_id,
            key="assistant_response",
            metadata={"agent": intent.get("primary_agent", "chat"), "timestamp": time.time()},
        )

    async def save_coding_style(self, style_notes: str, session_id: str = ""):
        """Remember the user's coding preferences as a "user_preference" memory."""
        await save_memory(
            content=style_notes,
            memory_type="user_preference",
            session_id=session_id,
            key="coding_style",
            metadata={"category": "coding_style"},
        )

    async def save_project_context(self, project_id: str, context: Dict):
        """Save project-specific context, serialised to JSON."""
        await save_memory(
            content=json.dumps(context),
            memory_type="project_context",
            project_id=project_id,
            key="project_context",
            metadata={"timestamp": time.time()},
        )

    async def get_project_context(self, project_id: str) -> Optional[Dict]:
        """Get the stored project context.

        Returns:
            The decoded JSON content of the first "project_context" memory,
            ``{"raw": <content>}`` if that content is not valid JSON, or
            ``None`` when no such memory exists.
        """
        results = await get_project_memory(project_id, memory_type="project_context", limit=1)
        if not results:
            return None

        raw = results[0]["content"]
        try:
            return json.loads(raw)
        except (TypeError, ValueError):
            # json.JSONDecodeError is a ValueError; TypeError covers non-text
            # content. Keep the best-effort fallback but leave a trace of it.
            log.warning("project_context_not_json", project_id=project_id)
            return {"raw": raw}

    @classmethod
    def _role_for(cls, entry, position: int) -> str:
        """Pick the chat role for one history entry.

        Prefers the ``key`` written by ``save_interaction``; when the entry
        carries no recognised key, falls back to alternating roles by
        position (even = user, odd = assistant).
        """
        # NOTE(review): assumes get_history rows expose the stored ``key``
        # field — confirm against memory.db; the fallback covers its absence.
        key = entry.get("key") if isinstance(entry, dict) else None
        role = cls._ROLE_BY_KEY.get(key)
        if role is not None:
            return role
        return "user" if position % 2 == 0 else "assistant"

    async def build_context_for_agent(self, session_id: str, query: str) -> Dict:
        """Build a context dict for agents from memory.

        Returns:
            A dict with ``"history"`` (up to 10 entries as role/content
            pairs), ``"relevant_memories"`` (up to 5 search hits, each
            truncated to 200 characters) and ``"session_id"``.
        """
        history = await self.get_conversation_history(session_id, limit=10)
        relevant = await self.retrieve(query, session_id=session_id, limit=5)
        return {
            # History is reversed before use — presumably get_history returns
            # newest-first; verify against memory.db.
            "history": [
                {"role": self._role_for(entry, position), "content": entry["content"]}
                for position, entry in enumerate(reversed(history))
            ],
            "relevant_memories": [r["content"][:200] for r in relevant],
            "session_id": session_id,
        }