""" Memory Manager - Graphiti Knowledge Graph Interface. Provides unified memory operations using the same Graphiti instance configured for Claude Code's Graphiti MCP server. Configuration (must match Graphiti MCP): - FALKORDB_URI: redis://localhost:6379 (default) - FALKORDB_DATABASE: graphiti (default) - MISTRAL_API_KEY: Required for entity extraction - GRAPHITI_GROUP_ID: main (default) """ from __future__ import annotations import os from datetime import datetime, timezone from typing import Annotated, Literal, Optional import gradio as gr from ._docstrings import autodoc # Graphiti configuration - matches Graphiti MCP server FALKORDB_URI = os.getenv("FALKORDB_URI", "redis://localhost:6379") FALKORDB_DATABASE = os.getenv("FALKORDB_DATABASE", "graphiti") MISTRAL_API_KEY = os.getenv("MISTRAL_API_KEY", "") GRAPHITI_GROUP_ID = os.getenv("GRAPHITI_GROUP_ID", "main") # Check if Graphiti is available GRAPHITI_AVAILABLE = bool(MISTRAL_API_KEY) # Lazy-loaded Graphiti client _graphiti_client = None def _get_graphiti_client(): """Get or create the Graphiti client (lazy load to avoid import errors).""" global _graphiti_client if _graphiti_client is None and GRAPHITI_AVAILABLE: try: from graphiti_core import Graphiti from graphiti_core.llm_client import OpenAIClient from graphiti_core.driver.falkordb_driver import FalkorDriver # Create FalkorDB driver driver = FalkorDriver( uri=FALKORDB_URI, database=FALKORDB_DATABASE, ) # Create Mistral LLM client (OpenAI-compatible API) llm_client = OpenAIClient( api_key=MISTRAL_API_KEY, base_url="https://api.mistral.ai/v1", model="mistral-large-2411", ) # Create Graphiti client _graphiti_client = Graphiti( uri=FALKORDB_URI, driver=driver, llm_client=llm_client, ) except ImportError as e: print(f"[Memory_Manager] Graphiti not available: {e}") return None except Exception as e: print(f"[Memory_Manager] Failed to initialize Graphiti: {e}") return None return _graphiti_client def _format_timestamp() -> str: """Return current UTC timestamp in ISO format.""" return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC") # ============================================================================ # Graphiti Memory Operations # ============================================================================ def _graphiti_save(text: str, tags: str) -> str: """Save memory to Graphiti knowledge graph.""" if not GRAPHITI_AVAILABLE: return "Error: MISTRAL_API_KEY not set. Cannot save to Graphiti." client = _get_graphiti_client() if not client: return "Error: Failed to initialize Graphiti client." try: # Build episode body with tags episode_body = text.strip() if tags and tags.strip(): episode_body = f"{text.strip()}\n\nTags: {tags.strip()}" # Add episode to Graphiti import asyncio async def _save(): return await client.add_episode( name=f"Memory {_format_timestamp()}", episode_body=episode_body, source_description="Memory_Manager tool", group_id=GRAPHITI_GROUP_ID, ) result = asyncio.run(_save()) return f"Memory saved to Graphiti knowledge graph (group: {GRAPHITI_GROUP_ID})" except Exception as e: return f"Error saving to Graphiti: {e}" def _graphiti_list(limit: int, include_tags: bool) -> str: """List recent memories from Graphiti.""" if not GRAPHITI_AVAILABLE: return "Error: MISTRAL_API_KEY not set. Cannot access Graphiti." client = _get_graphiti_client() if not client: return "Error: Failed to initialize Graphiti client." try: import asyncio async def _list(): # Get episodes from Graphiti return await client.get_episodes( group_ids=[GRAPHITI_GROUP_ID], limit=limit, ) episodes = asyncio.run(_list()) if not episodes: return f"No memories found in Graphiti (group: {GRAPHITI_GROUP_ID})" lines = [f"Graphiti Memories (group: {GRAPHITI_GROUP_ID})", "-" * 50] for ep in episodes: name = ep.name if hasattr(ep, "name") else "?" created = ep.created_at if hasattr(ep, "created_at") else "?" content = ep.content if hasattr(ep, "content") else str(ep) # Extract tags from content if present tags_str = "" if include_tags and "Tags:" in content: parts = content.split("Tags:") if len(parts) > 1: tags_str = f" | tags: {parts[1].strip()}" content = parts[0].strip() lines.append(f"[{created}] {content[:100]}{'...' if len(content) > 100 else ''}{tags_str}") return "\n".join(lines) except Exception as e: return f"Error listing from Graphiti: {e}" def _graphiti_search(query: str, limit: int) -> str: """Search memories in Graphiti knowledge graph.""" if not GRAPHITI_AVAILABLE: return "Error: MISTRAL_API_KEY not set. Cannot search Graphiti." client = _get_graphiti_client() if not client: return "Error: Failed to initialize Graphiti client." try: import asyncio async def _search(): # Use Graphiti's hybrid search return await client.search( query=query, group_ids=[GRAPHITI_GROUP_ID], num_results=limit, ) results = asyncio.run(_search()) if not results: return f"No matches found for: {query}" lines = [f"Graphiti Search Results for: {query}", "-" * 50] for i, result in enumerate(results, 1): if hasattr(result, "fact"): # Edge/fact result source = getattr(result, "source_node", "?") target = getattr(result, "target_node", "?") fact = result.fact lines.append(f"{i}. {source} -> {target}: {fact}") elif hasattr(result, "name"): # Node result name = result.name summary = getattr(result, "summary", "") lines.append(f"{i}. [{name}] {summary[:150]}{'...' if len(summary) > 150 else ''}") else: lines.append(f"{i}. {str(result)[:150]}") return "\n".join(lines) except Exception as e: return f"Error searching Graphiti: {e}" def _graphiti_delete(memory_id: str) -> str: """Delete memory from Graphiti (requires episode UUID).""" if not GRAPHITI_AVAILABLE: return "Error: MISTRAL_API_KEY not set. Cannot access Graphiti." # Note: Graphiti deletion requires the full episode UUID # This is a simplified implementation return f"Note: To delete from Graphiti, use the Graphiti MCP directly with the episode UUID. Memory deletion is not fully implemented in this interface." # ============================================================================ # Status Check # ============================================================================ def _get_status() -> str: """Get Graphiti connection status.""" if not GRAPHITI_AVAILABLE: return "Status: MISTRAL_API_KEY not configured" client = _get_graphiti_client() if client: return f"Status: Connected to Graphiti\nDatabase: {FALKORDB_DATABASE}\nGroup: {GRAPHITI_GROUP_ID}" return "Status: Failed to initialize Graphiti client" # ============================================================================ # Main Tool Function # ============================================================================ TOOL_SUMMARY = ( "Manage memories in Graphiti knowledge graph (save, list, search, status). " "Connects to the same Graphiti instance as the Graphiti MCP server. " "Requires MISTRAL_API_KEY for entity extraction and knowledge graph operations." ) @autodoc(summary=TOOL_SUMMARY) def Memory_Manager( action: Annotated[Literal["save", "list", "search", "status"], "Action: save | list | search | status"] = "list", text: Annotated[Optional[str], "Memory text (Save only)"] = None, tags: Annotated[Optional[str], "Comma-separated tags (Save only)"] = None, query: Annotated[Optional[str], "Search query (Search only)"] = None, limit: Annotated[int, "Max results (List/Search only)"] = 20, include_tags: Annotated[bool, "Include tags in output"] = True, ) -> str: """ Memory Manager - Graphiti Knowledge Graph Interface. Connects to the same Graphiti instance used by Claude Code's Graphiti MCP. All memories are stored in the knowledge graph with automatic entity extraction and relationship detection. """ act = (action or "list").lower().strip() if act == "status": return _get_status() if act == "save": text = (text or "").strip() if not text: return "Error: 'text' is required when action=save." return _graphiti_save(text=text, tags=tags or "") if act == "list": return _graphiti_list(limit=max(1, min(200, limit)), include_tags=include_tags) if act == "search": query = (query or "").strip() if not query: return "Error: 'query' is required when action=search." return _graphiti_search(query=query, limit=max(1, min(200, limit))) return "Error: invalid action (use save|list|search|status)." def build_interface() -> gr.Interface: """Build Gradio interface for Memory Manager.""" status_info = _get_status() return gr.Interface( fn=Memory_Manager, inputs=[ gr.Radio( label="Action", choices=["save", "list", "search", "status"], value="status", info="Action to perform", ), gr.Textbox(label="Text", lines=3, info="Memory text (Save only)"), gr.Textbox(label="Tags", placeholder="tag1, tag2", max_lines=1, info="Comma-separated tags (Save only)"), gr.Textbox(label="Query", placeholder="search terms...", max_lines=1, info="Search query (Search only)"), gr.Slider(1, 200, value=20, step=1, label="Limit", info="Max results (List/Search only)"), gr.Checkbox(value=True, label="Include Tags", info="Include tags in output"), ], outputs=gr.Textbox(label="Result", lines=14), title="Memory Manager - Graphiti", description=f"