Spaces:
Build error
Build error
#!/usr/bin/env python3
"""Godlike Memory Engine - holographic-style associative memory.

Features:
- Real embeddings (sentence-transformers)
- FAISS exact inner-product similarity search (numpy fallback when FAISS is missing)
- Multiple memory fields (episodic, semantic, procedural)
- Capacity management with normalized embeddings & least-used forgetting
- Statistics reporting; persistence hook (S3/local storage not implemented yet)
- MCP server support
"""
| import os | |
| import json | |
| import logging | |
| import hashlib | |
| from pathlib import Path | |
| from datetime import datetime | |
| from typing import List, Dict, Any, Optional | |
| import numpy as np | |
| import gradio as gr | |
| from sentence_transformers import SentenceTransformer | |
# Configure logging BEFORE probing optional dependencies. Emitting a record
# through the root logger (logging.warning) while it has no handlers makes the
# logging module install a default handler, after which basicConfig() is a
# no-op and the custom format below would be silently ignored.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s [%(name)s] %(message)s",
)
logger = logging.getLogger(__name__)

# Try FAISS - fall back to numpy if it is not available
try:
    import faiss
    FAISS_AVAILABLE = True
except ImportError:
    FAISS_AVAILABLE = False
    logger.warning("FAISS not available; using numpy fallback (slower)")

# MCP support (optional; only needed for --mcp-server mode)
try:
    from mcp import Tool as MCPTool
    from mcp.server import Server
    from mcp.server.stdio import stdio_server
    MCP_AVAILABLE = True
except ImportError:
    MCP_AVAILABLE = False
| # ============ Configuration ============ | |
# Name of the sentence-transformers model to load (overridable via environment).
EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "all-MiniLM-L6-v2")

# Output dimensions of the models this file knows about.
_KNOWN_EMBEDDING_DIMS = {
    "all-MiniLM-L6-v2": 384,
    "all-mpnet-base-v2": 768,
    "paraphrase-MiniLM-L6-v2": 384,
}
# None for a model that is not in the table above. A direct [] lookup would
# raise KeyError at import time for any other EMBEDDING_MODEL value, even
# though the engine reads the real dimension from the loaded model.
EMBEDDING_DIM = _KNOWN_EMBEDDING_DIMS.get(EMBEDDING_MODEL)

MEMORY_CAPACITY = int(os.getenv("MEMORY_CAPACITY", "10000"))  # Max items per field
FORGETTING_RATE = float(os.getenv("FORGETTING_RATE", "0.01"))  # Decay per access
PERSIST_DIR = os.getenv("PERSIST_DIR", "./memory_data")
ENABLE_PERSISTENCE = os.getenv("ENABLE_PERSISTENCE", "false").lower() == "true"

# Create persistence directory (parents=True so a nested PERSIST_DIR works too)
Path(PERSIST_DIR).mkdir(parents=True, exist_ok=True)
| # ============ Embedding Engine ============ | |
class EmbeddingEngine:
    """Wrapper around a SentenceTransformer model that yields float32 vectors.

    Both encode methods ask the model for normalized embeddings, so the
    returned vectors can be compared with a plain inner product.
    """

    def __init__(self, model_name=EMBEDDING_MODEL):
        """Load ``model_name`` and record its embedding dimension in ``self.dim``."""
        logger.info("Loading embedding model: %s", model_name)
        self.model = SentenceTransformer(model_name)
        self.dim = self.model.get_sentence_embedding_dimension()
        logger.info("Embedding model loaded (dim=%d)", self.dim)

    def encode(self, texts: List[str], batch_size=32) -> np.ndarray:
        """Encode a list of texts; returns the model output cast to float32."""
        encode_options = {
            "batch_size": batch_size,
            "convert_to_numpy": True,
            "normalize_embeddings": True,
        }
        vectors = self.model.encode(texts, **encode_options)
        return vectors.astype(np.float32)

    def encode_single(self, text: str) -> np.ndarray:
        """Encode one text and return its (normalized) embedding as float32."""
        batch = self.model.encode(
            [text],
            convert_to_numpy=True,
            normalize_embeddings=True,
        )
        vector = batch[0]
        return vector.astype(np.float32)
| # ============ Memory Field (single hippocampus-like region) ============ | |
class MemoryField:
    """
    Associative memory field: texts stored next to their embeddings.

    Item records live in ``index_buffer``; the vector for record ``i`` is row
    ``i`` of the FAISS index (or ``self.embeddings[i]`` in the numpy fallback).
    Embeddings come from the module-level ``embedding_engine``, which requests
    normalized vectors, so inner product is used as the similarity score.
    When the field is full, the least-accessed item is evicted before a new
    one is stored.
    """

    def __init__(self, name: str, dim: int, capacity: int = MEMORY_CAPACITY):
        """Create an empty field.

        Args:
            name: Label reported in results, stats and log messages.
            dim: Embedding dimension used to size the FAISS index.
            capacity: Item count at which ``store`` starts evicting.
        """
        self.name = name
        self.dim = dim
        self.capacity = capacity
        # Item records are needed by BOTH backends (``__len__``, ``store`` and
        # ``retrieve`` all read this list), so create it unconditionally.
        self.index_buffer = []
        if FAISS_AVAILABLE:
            # Inner product == cosine similarity on normalized vectors
            self.index = faiss.IndexFlatIP(dim)
        else:
            self.index = None
            self.embeddings = []
            self.texts = []
        self.access_counts = {}  # item id -> number of retrievals (drives forgetting)
        self.metadata = []  # Store timestamps, sources, etc.
        # Monotonic id source. Deriving ids from len(self) hands out an id that
        # is still in use as soon as one item has been evicted.
        self._next_id = 0
        logger.info("Memory field '%s' initialized (dim=%d, capacity=%d)", name, dim, capacity)

    def store(self, text: str, metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """Embed ``text`` and append it to the field.

        Args:
            text: Content to remember.
            metadata: Optional extra keys merged into the stored record. The
                ``id`` and ``text`` keys are reserved and cannot be overridden.

        Returns:
            Dict with ``stored``, ``item_id``, ``field`` and ``total_items``.
        """
        # Encode first so that a failing encode does not cost us an eviction.
        embedding = embedding_engine.encode_single(text)
        if len(self) >= self.capacity:
            self._forget_least_used()
        if FAISS_AVAILABLE:
            self.index.add(np.expand_dims(embedding, 0))
        else:
            self.embeddings.append(embedding)
        item_id = str(self._next_id)
        self._next_id += 1
        record = {
            "id": item_id,
            "text": text,
            # NOTE(review): utcnow() yields a naive timestamp and is deprecated
            # since Python 3.12; kept so the stored format does not change.
            "timestamp": datetime.utcnow().isoformat(),
            "access_count": 0,
            **(metadata or {}),
        }
        # Bookkeeping keys must survive whatever the caller put in metadata.
        record["id"] = item_id
        record["text"] = text
        self.index_buffer.append(record)
        self.access_counts[item_id] = 0
        # Persist if enabled
        if ENABLE_PERSISTENCE:
            self._persist_async()
        return {
            "stored": True,
            "item_id": item_id,
            "field": self.name,
            "total_items": len(self),
        }

    def retrieve(self, query: str, k: int = 5) -> List[Dict[str, Any]]:
        """Return up to ``k`` stored items most similar to ``query``.

        Each hit is a copy of the stored record with ``similarity`` and
        ``field`` added. Every hit also bumps that item's access counter.
        ``k`` is coerced to ``int``; a non-positive ``k`` or an empty field
        yields an empty list.
        """
        k = min(int(k), len(self))
        if k <= 0:
            return []
        query_emb = embedding_engine.encode_single(query)
        if FAISS_AVAILABLE:
            scores, indices = self.index.search(np.expand_dims(query_emb, 0), k)
            scores = scores[0]
            indices = indices[0]
        else:
            # Numpy fallback: brute-force inner product against every vector
            embeddings = np.array(self.embeddings)
            scores = np.dot(embeddings, query_emb)
            indices = np.argsort(-scores)[:k]
            scores = scores[indices]
        results = []
        for score, idx in zip(scores, indices):
            idx = int(idx)
            # Skip out-of-range slots (FAISS reports -1 for missing neighbours)
            if not 0 <= idx < len(self.index_buffer):
                continue
            record = self.index_buffer[idx]
            item_id = record["id"]
            # Track access for forgetting, and keep the record's own counter in sync
            hits = self.access_counts.get(item_id, 0) + 1
            self.access_counts[item_id] = hits
            record["access_count"] = hits
            item = record.copy()
            item["similarity"] = float(score)
            item["field"] = self.name
            results.append(item)
        return results

    def _forget_least_used(self):
        """Evict the item with the lowest access count (first one wins on ties)."""
        if not self.access_counts:
            return
        least_used = min(self.access_counts, key=self.access_counts.get)
        idx_to_remove = next(
            (i for i, item in enumerate(self.index_buffer) if item["id"] == least_used),
            None,
        )
        if idx_to_remove is None:
            # Counter without a matching record: drop it so it cannot block
            # every later eviction by always being the minimum.
            del self.access_counts[least_used]
            return
        # Remove the vector first; the FAISS rebuild relies on current positions
        if FAISS_AVAILABLE:
            self._rebuild_index_without(idx_to_remove)
        else:
            del self.embeddings[idx_to_remove]
        del self.index_buffer[idx_to_remove]
        del self.access_counts[least_used]
        logger.debug("Forgot item %s from field '%s'", least_used, self.name)

    def _rebuild_index_without(self, remove_idx: int):
        """Rebuild the FAISS index without row ``remove_idx`` (FAISS backend only).

        This is expensive (every remaining vector is reconstructed and re-added);
        in production use a more sophisticated approach.
        """
        kept = [
            self.index.reconstruct(i)
            for i in range(self.index.ntotal)
            if i != remove_idx
        ]
        self.index = faiss.IndexFlatIP(self.dim)
        if kept:
            self.index.add(np.array(kept, dtype=np.float32))

    def stats(self) -> Dict[str, Any]:
        """Return statistics about this memory field."""
        return {
            "name": self.name,
            "items": len(self),
            "capacity": self.capacity,
            "utilization": len(self) / self.capacity if self.capacity > 0 else 0,
            "dim": self.dim,
            "faiss_enabled": FAISS_AVAILABLE,
        }

    def reset(self) -> str:
        """Clear all memories and return a confirmation message."""
        if FAISS_AVAILABLE:
            self.index.reset()
        else:
            self.embeddings = []
        self.index_buffer = []
        self.access_counts = {}
        # Nothing is stored any more, so ids may safely restart from zero.
        self._next_id = 0
        logger.info("Memory field '%s' reset", self.name)
        return f"Field '{self.name}' cleared."

    def _persist_async(self):
        """Save to disk (placeholder; currently does nothing)."""
        pass  # TODO: implement S3/local persistence

    def __len__(self):
        """Number of items currently stored in the field."""
        return len(self.index_buffer)
| # ============ Godlike Memory Engine (multi-field) ============ | |
class GodlikeMemory:
    """
    Multi-field memory system inspired by different hippocampal/neocortical systems:
    - episodic: temporal sequences, personal experiences
    - semantic: conceptual knowledge, facts
    - procedural: skills, patterns

    Each field is a ``MemoryField`` whose capacity is ``MEMORY_CAPACITY`` scaled
    by a per-field multiplier.
    """

    def __init__(self, embedding_engine):
        """Create the three memory fields sized from ``embedding_engine.dim``."""
        self.embedding_engine = embedding_engine
        self.fields = {}
        # Initialize different memory fields
        for field_name, capacity_multiplier in [
            ("episodic", 1.0),
            ("semantic", 2.0),
            ("procedural", 1.5),
        ]:
            self.fields[field_name] = MemoryField(
                name=field_name,
                dim=embedding_engine.dim,
                capacity=int(MEMORY_CAPACITY * capacity_multiplier),
            )
        logger.info("Godlike Memory initialized with %d fields", len(self.fields))

    def memorize(self, text: str, field: str = "semantic", metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """Store ``text`` in ``field``.

        Returns the field's store result, or ``{"error": ...}`` when ``field``
        is not one of the known field names.
        """
        if field not in self.fields:
            return {"error": f"Unknown field: {field}. Available: {list(self.fields.keys())}"}
        return self.fields[field].store(text, metadata)

    def recall(self, query: str, field: str = "all", k: int = 5) -> Dict[str, Any]:
        """Recall the ``k`` best matches for ``query``.

        Args:
            query: Text to search for.
            field: A field name, or ``"all"`` to search every field. Unknown
                names are skipped, which yields an empty result.
            k: Maximum number of results; coerced to ``int`` because UI widgets
                may deliver floats. Non-positive values return no results.

        Returns:
            Dict with ``query``, ``total_results`` (hits collected across the
            searched fields before truncation) and ``results`` (top ``k``).
        """
        k = int(k)
        if k <= 0:
            return {"query": query, "total_results": 0, "results": []}
        field_names = list(self.fields) if field == "all" else [field]
        results = []
        for field_name in field_names:
            memory_field = self.fields.get(field_name)
            if memory_field is None:
                continue
            results.extend(memory_field.retrieve(query, k=k))
        # Best match first across all searched fields
        results.sort(key=lambda hit: hit["similarity"], reverse=True)
        return {
            "query": query,
            "total_results": len(results),
            "results": results[:k],
        }

    def think(self, query: str, k: int = 10) -> str:
        """Generate a plain-text summary of memories related to ``query``.

        Retrieves up to ``k`` memories from all fields and lists the top five
        (simple retrieval + synthesis; no language model involved).
        """
        recall_result = self.recall(query, field="all", k=k)
        if not recall_result["results"]:
            return "Memory field empty. No relevant memories found."
        # Extract top memories
        memories = [r["text"] for r in recall_result["results"][:5]]
        # Simple synthesis (in real version, use LLM)
        parts = [f"Thinking about: {query}\n\nRelated memories:\n"]
        parts.extend(f"{i}. {mem}\n" for i, mem in enumerate(memories, 1))
        parts.append(f"\nTotal relevant memories: {recall_result['total_results']}")
        return "".join(parts)

    def stats(self) -> Dict[str, Any]:
        """Global statistics: item totals, per-field stats and embedding info."""
        total_items = sum(len(f) for f in self.fields.values())
        return {
            "total_items": total_items,
            "fields": {name: f.stats() for name, f in self.fields.items()},
            "embedding_model": EMBEDDING_MODEL,
            # Report the engine this instance was built with, not the module global
            "embedding_dim": self.embedding_engine.dim,
        }

    def reset_all(self):
        """Clear all memory fields and return a confirmation message."""
        for field in self.fields.values():
            field.reset()
        return "All memory fields cleared."
| # ============ Initialize ============ | |
# Module-level singletons: the memory fields, the Gradio handlers and the MCP
# tool handlers in this file all go through these two objects. Constructing
# the engine loads the embedding model, so this runs once at import time.
embedding_engine = EmbeddingEngine()
god = GodlikeMemory(embedding_engine)
| # ============ MCP Server ============ | |
def create_mcp_server():
    """Build the MCP server that exposes the memory engine as tools.

    The tool listing and tool dispatch coroutines are registered on the server
    through its ``list_tools`` / ``call_tool`` decorators; without that
    registration the server would advertise and execute nothing.

    Returns:
        The configured ``Server``, or ``None`` when the optional ``mcp``
        package is not installed.
    """
    if not MCP_AVAILABLE:
        return None

    # Local import: only reachable when the optional ``mcp`` package is present.
    from mcp.types import TextContent

    server = Server("godlike-memory-engine")

    def _text(message: str):
        """Wrap a plain string as MCP text content."""
        return [TextContent(type="text", text=message)]

    def _json(payload):
        """Serialize a JSON-compatible payload into MCP text content."""
        return _text(json.dumps(payload, indent=2))

    @server.list_tools()
    async def list_tools() -> List[MCPTool]:
        return [
            MCPTool(
                name="memorize",
                description="Store information in godlike memory. Choose field: episodic, semantic, or procedural.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "text": {"type": "string", "description": "Information to store"},
                        "field": {"type": "string", "description": "Memory field (episodic/semantic/procedural)", "default": "semantic"},
                        "metadata": {"type": "object", "description": "Optional metadata", "default": {}},
                    },
                    "required": ["text"],
                },
            ),
            MCPTool(
                name="recall",
                description="Recall memories using pattern completion. Searches across fields.",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Search query"},
                        "field": {"type": "string", "description": "Field to search (episodic/semantic/procedural/all)", "default": "all"},
                        "k": {"type": "integer", "description": "Number of results", "default": 5},
                    },
                    "required": ["query"],
                },
            ),
            MCPTool(
                name="think",
                description="Generate a thought from memories (retrieval + synthesis).",
                inputSchema={
                    "type": "object",
                    "properties": {
                        "query": {"type": "string", "description": "Topic to think about"},
                        "k": {"type": "integer", "description": "Number of memories to retrieve", "default": 10},
                    },
                    "required": ["query"],
                },
            ),
            MCPTool(
                name="memory_stats",
                description="Get statistics about all memory fields.",
                inputSchema={"type": "object", "properties": {}},
            ),
            MCPTool(
                name="reset_memory",
                description="Clear all memory fields.",
                inputSchema={"type": "object", "properties": {}},
            ),
        ]

    @server.call_tool()
    async def call_tool(name: str, arguments: Dict[str, Any]):
        # Tools without parameters may be invoked with no arguments at all
        arguments = arguments or {}
        try:
            if name == "memorize":
                text = arguments.get("text", "")
                field = arguments.get("field", "semantic")
                metadata = arguments.get("metadata", {})
                if not text:
                    return _text("Error: text required")
                return _json(god.memorize(text, field=field, metadata=metadata))
            if name == "recall":
                query = arguments.get("query", "")
                field = arguments.get("field", "all")
                k = int(arguments.get("k", 5))
                if not query:
                    return _json({"error": "query required"})
                return _json(god.recall(query, field=field, k=k))
            if name == "think":
                query = arguments.get("query", "")
                k = int(arguments.get("k", 10))
                if not query:
                    return _text("Error: query required")
                return _text(god.think(query, k=k))
            if name == "memory_stats":
                return _json(god.stats())
            if name == "reset_memory":
                return _text(god.reset_all())
            return _text(f"Unknown tool: {name}")
        except Exception as e:
            # Tool boundary: report the failure to the client instead of
            # letting one bad call take the server down.
            logger.exception("MCP tool error: %s", name)
            return _text(f"Error: {e}")

    return server
| # ============ Gradio UI ============ | |
def ui_memorize(text: str, field: str) -> str:
    """Gradio handler: store ``text`` in ``field`` and describe the outcome.

    Returns a status line. Blank (or missing) text is rejected, and an error
    reported by the engine (e.g. an unknown field) is surfaced instead of
    being shown as a successful store with ``ID: None``.
    """
    if not (text or "").strip():
        return "β Text cannot be empty"
    result = god.memorize(text, field=field)
    if "error" in result:
        return f"β {result['error']}"
    return f"β Stored in {field} (ID: {result.get('item_id')}, Total: {result.get('total_items')})"
def ui_recall(query: str, field: str, k: int) -> str:
    """Gradio handler: search memory and format the hits as text.

    Args:
        query: Search text; blank input is rejected.
        field: Field name or ``"all"``.
        k: Maximum number of results. Coerced to ``int`` because a slider may
            deliver a float, which cannot be used as a result count.

    Returns:
        A header followed by one entry per hit (field, similarity and the
        first 150 characters of the text, with ``...`` only when truncated).
    """
    if not (query or "").strip():
        return "β Query cannot be empty"
    result = god.recall(query, field=field, k=int(k))
    hits = result["results"]
    if not hits:
        return "π No memories found"
    parts = [f"**Query:** {query}\n**Field:** {field}\n**Results:** {len(hits)}\n\n"]
    for i, r in enumerate(hits, 1):
        text = r["text"]
        # Mark truncation only when something was actually cut off
        ellipsis = "..." if len(text) > 150 else ""
        parts.append(
            f"{i}. [{r['field']}] Similarity: {r['similarity']:.4f}\n {text[:150]}{ellipsis}\n\n"
        )
    return "".join(parts)
def ui_think(query: str) -> str:
    """Gradio handler: return the engine's synthesized thought for ``query``.

    A query that is empty after stripping whitespace yields an error message
    instead of calling the engine.
    """
    if query.strip():
        return god.think(query)
    return "β Query cannot be empty"
def ui_stats() -> str:
    """Gradio handler: return the engine statistics as indented JSON text."""
    snapshot = god.stats()
    return json.dumps(snapshot, indent=2)
def ui_reset() -> str:
    """Gradio handler: clear every memory field and report the outcome.

    The result is shown in the ``stats_out`` JSON component, which parses
    string values as JSON, so the confirmation message is wrapped in a small
    JSON document rather than returned as a bare sentence.
    """
    message = god.reset_all()
    return json.dumps({"status": message})
| # ============ Build Interface ============ | |
# Gradio layout: two rows of two columns (memorize / recall, think / system),
# followed by the event wiring and a short description of the MCP tools.
with gr.Blocks(title="Godlike Memory Engine") as demo:
    gr.Markdown("# π§ Godlike Memory Engine")
    gr.Markdown(f"**Embedding model:** `{EMBEDDING_MODEL}` | **Dimension:** `{embedding_engine.dim}` | **FAISS:** `{FAISS_AVAILABLE}`")
    with gr.Row():
        with gr.Column():
            gr.Markdown("## π Memorize")
            mem_text = gr.Textbox(label="Text", lines=3, placeholder="Store information...")
            mem_field = gr.Radio(["episodic", "semantic", "procedural"], label="Field", value="semantic")
            mem_btn = gr.Button("Store", variant="primary")
            mem_out = gr.Textbox(label="Result", lines=2)
        with gr.Column():
            gr.Markdown("## π Recall")
            recall_query = gr.Textbox(label="Query", placeholder="Search memory...")
            recall_field = gr.Radio(["all", "episodic", "semantic", "procedural"], label="Field", value="all")
            # step=1 keeps K a whole number; it is used as a result count
            recall_k = gr.Slider(1, 20, value=5, step=1, label="K")
            recall_btn = gr.Button("Search")
            recall_out = gr.Textbox(label="Results", lines=10)
    with gr.Row():
        with gr.Column():
            gr.Markdown("## π Think")
            think_query = gr.Textbox(label="Topic", lines=2, placeholder="What do you remember about...")
            think_btn = gr.Button("Generate Thought")
            think_out = gr.Textbox(label="Thought", lines=8)
        with gr.Column():
            gr.Markdown("## π System")
            stats_btn = gr.Button("Show Stats")
            reset_btn = gr.Button("Reset All Memory", variant="stop")
            stats_out = gr.JSON(label="Statistics")
    # Connect events
    mem_btn.click(ui_memorize, [mem_text, mem_field], mem_out)
    recall_btn.click(ui_recall, [recall_query, recall_field, recall_k], recall_out)
    think_btn.click(ui_think, think_query, think_out)
    stats_btn.click(ui_stats, None, stats_out)
    reset_btn.click(ui_reset, None, stats_out)
    gr.Markdown("---")
    gr.Markdown("### π MCP Tools Available")
    gr.Markdown("""
Tools for AI assistants:
- `memorize` - Store in episodic/semantic/procedural fields
- `recall` - Pattern-completion search
- `think` - Synthesize memories into coherent thought
- `memory_stats` - Global statistics
- `reset_memory` - Clear all fields
""")
| # ============ Main ============ | |
if __name__ == "__main__":
    import sys

    # Check for MCP mode
    if "--mcp-server" in sys.argv:
        if not MCP_AVAILABLE:
            logger.error("MCP package required. Install: pip install mcp")
            sys.exit(1)
        server = create_mcp_server()
        if server:
            logger.info("Starting MCP server...")
            import anyio

            async def _serve_stdio():
                """Run the MCP server over stdin/stdout until the client disconnects."""
                # stdio_server is an async context manager that yields the
                # read/write streams; the Server object does the serving.
                async with stdio_server() as (read_stream, write_stream):
                    await server.run(
                        read_stream,
                        write_stream,
                        server.create_initialization_options(),
                    )

            try:
                anyio.run(_serve_stdio)
            except KeyboardInterrupt:
                logger.info("MCP server stopped")
    else:
        # Gradio mode
        logger.info("Starting Godlike Memory Engine...")
        demo.launch(server_name="0.0.0.0", server_port=7860, show_error=True)