Spaces:
Running
Running
eldarski
Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
| """ | |
| Dual Storage Manager - Orchestrates memvid and vector storage with performance comparison. | |
| Provides unified interface for dual storage modes with background metrics collection. | |
| """ | |
| import os | |
| import json | |
| import time | |
| import logging | |
| from typing import Dict, Any, Optional | |
| from pathlib import Path | |
| from .memvid_manager import MemvidManager | |
| from .vector_storage_manager import VectorStorageManager | |
# Modal services imports (with fallback for local development).
#
# MODAL_AVAILABLE records whether the Modal client modules could be imported.
# When they cannot, both client names are bound to None so that the names
# always exist at module level; callers must still check MODAL_AVAILABLE
# before using them.
import sys
from pathlib import Path

# Add the parent directory to sys.path so the Modal service modules, which are
# imported by their top-level names below, can be resolved from there.
parent_dir = Path(__file__).parent.parent
if str(parent_dir) not in sys.path:
    sys.path.insert(0, str(parent_dir))

try:
    from modal_vector_service import ModalVectorClient
    from modal_memvid_service import ModalMemvidClient
except ImportError as e:
    print(f"Modal services not available, using local implementations: {e}")
    ModalVectorClient = None
    ModalMemvidClient = None
    MODAL_AVAILABLE = False
else:
    MODAL_AVAILABLE = True
    print("Modal services imported successfully")
| from .metrics_collector import MetricsCollector | |
class DualStorageManager:
    """
    Orchestrates dual storage between memvid (video-based) and vector storage.

    Provides a unified interface with configurable storage modes
    (``memvid_only``, ``vector_only``, ``dual``) and optional performance
    tracking through a ``MetricsCollector``.
    """

    # Modes accepted by set_storage_mode() and the STORAGE_MODE variable.
    VALID_MODES = ("memvid_only", "vector_only", "dual")
    # Mode used when STORAGE_MODE is unset or is not one of VALID_MODES.
    DEFAULT_MODE = "dual"

    def __init__(self, data_dir: str = "data"):
        """
        Initialize dual storage manager with Modal-first architecture.

        Reads ``STORAGE_MODE``, ``ENABLE_PERFORMANCE_TRACKING`` and
        ``MODAL_TOKEN`` from the environment. Modal-backed clients are used
        when the Modal modules were importable and a token is set; otherwise,
        or when creating the Modal clients raises, local backends are used.

        Args:
            data_dir (str): Base directory for storing data
        """
        self.logger = logging.getLogger(__name__)

        # Get storage mode from environment. An unrecognised value used to be
        # stored verbatim and silently treated as dual mode by most methods;
        # make that fallback explicit so every method agrees on the mode.
        requested_mode = os.getenv("STORAGE_MODE", self.DEFAULT_MODE).strip().lower()
        if requested_mode not in self.VALID_MODES:
            self.logger.warning(
                "Invalid STORAGE_MODE %r; falling back to %r",
                requested_mode,
                self.DEFAULT_MODE,
            )
            requested_mode = self.DEFAULT_MODE
        self.storage_mode = requested_mode

        self.enable_metrics = (
            os.getenv("ENABLE_PERFORMANCE_TRACKING", "true").lower() == "true"
        )

        # Check for Modal configuration
        modal_token = os.getenv("MODAL_TOKEN")
        use_modal = bool(MODAL_AVAILABLE and modal_token)

        # Initialize storage backends (Modal-first with local fallback)
        self.using_modal = False
        if use_modal:
            print("Initializing Modal-powered storage backends...")
            try:
                self.memvid_manager = ModalMemvidClient(modal_token=modal_token)
                self.vector_manager = ModalVectorClient(modal_token=modal_token)
                self.using_modal = True
                print("Modal services initialized successfully")
            except Exception as e:
                # Any failure while creating the Modal clients degrades to the
                # local implementations instead of aborting start-up.
                print(f"Modal initialization failed, falling back to local: {e}")
                self._init_local_backends(data_dir)
        else:
            print("Using local storage backends...")
            self._init_local_backends(data_dir)

        # Initialize metrics collector
        self.metrics = MetricsCollector() if self.enable_metrics else None

        infrastructure = "Modal" if self.using_modal else "Local"
        self.logger.info(
            f"DualStorageManager initialized with mode: {self.storage_mode}"
        )
        print(f"Infrastructure: {infrastructure}")
        print(
            f"Performance tracking: {'enabled' if self.enable_metrics else 'disabled'}"
        )

    def _init_local_backends(self, data_dir: str) -> None:
        """Create the local memvid and vector backends under ``data_dir``."""
        self.memvid_manager = MemvidManager(data_dir)
        # The vector backend is handed the memvid manager's storage handler,
        # so both local backends share one handler.
        self.vector_manager = VectorStorageManager(
            data_dir, storage_handler=self.memvid_manager.storage_handler
        )
        self.using_modal = False

    def set_storage_mode(self, mode: str, client_id: str = "") -> str:
        """
        Set storage mode at runtime.

        The mode is matched case-insensitively and ignoring surrounding
        whitespace. The mode is always changed for the whole manager;
        ``client_id`` only affects the wording of the returned message.

        Args:
            mode (str): Storage mode (memvid_only, vector_only, dual)
            client_id (str): Optional client-specific setting

        Returns:
            str: Success message, or an error message for an invalid mode
        """
        normalized = mode.strip().lower() if isinstance(mode, str) else mode
        if normalized not in self.VALID_MODES:
            return (
                f"Error: Invalid mode '{mode}'. "
                f"Valid modes: {list(self.VALID_MODES)}"
            )
        self.storage_mode = normalized
        scope = f" for client {client_id}" if client_id else " (global)"
        return f"Storage mode set to: {normalized}{scope}"

    def get_storage_mode(self, client_id: str = "") -> str:
        """
        Get current storage mode.

        Args:
            client_id (str): Client identifier (for future client-specific modes)

        Returns:
            str: JSON string with the current mode, whether metrics are
                enabled, and which backends are available
        """
        return json.dumps(
            {
                "storage_mode": self.storage_mode,
                "metrics_enabled": self.enable_metrics,
                "backends_available": {
                    "memvid": True,
                    "vector": self.vector_manager is not None,
                },
            },
            indent=2,
        )

    def store_memory(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Universal memory storage interface.

        Dispatches to the backend(s) selected by the current storage mode.
        Exceptions are logged and returned as an error message string.

        Args:
            text (str): Text content to store
            client_id (str): Client identifier
            metadata (dict): Additional metadata

        Returns:
            str: Storage result message
        """
        try:
            if self.storage_mode == "memvid_only":
                return self._store_memvid_only(text, client_id, metadata)
            if self.storage_mode == "vector_only":
                return self._store_vector_only(text, client_id, metadata)
            return self._store_dual_mode(text, client_id, metadata)
        except Exception as e:
            error_msg = f"Error in store_memory: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    def search_memory(
        self, query: str, client_id: str, memory_name: str, top_k: int = 5
    ) -> str:
        """
        Universal memory search interface.

        Dispatches to the backend(s) selected by the current storage mode.
        Exceptions are logged and returned as a JSON ``{"error": ...}`` string.

        Args:
            query (str): Search query
            client_id (str): Client identifier
            memory_name (str): Memory name to search
            top_k (int): Number of results

        Returns:
            str: Search results
        """
        try:
            if self.storage_mode == "memvid_only":
                return self._search_memvid_only(query, client_id, memory_name, top_k)
            if self.storage_mode == "vector_only":
                return self._search_vector_only(query, client_id, memory_name, top_k)
            return self._search_dual_mode(query, client_id, memory_name, top_k)
        except Exception as e:
            error_msg = f"Error in search_memory: {str(e)}"
            self.logger.exception(error_msg)
            return json.dumps({"error": error_msg})

    def get_memory_stats(self, client_id: str) -> str:
        """
        Get aggregated memory statistics based on storage mode.

        Dual mode with metrics enabled returns the metrics comparison report;
        vector-only mode returns the vector backend's stats; every other case
        falls back to the memvid backend's stats.

        Args:
            client_id (str): Client identifier

        Returns:
            str: JSON string with statistics
        """
        try:
            if self.storage_mode == "dual" and self.metrics:
                return self.metrics.get_comparison_report(client_id)
            if self.storage_mode == "vector_only" and self.vector_manager:
                return self.vector_manager.get_stats(client_id)
            # memvid_only, or fallback when the preferred source is missing
            return self.memvid_manager.get_memory_stats(client_id)
        except Exception as e:
            error_msg = f"Error getting memory stats: {str(e)}"
            self.logger.exception(error_msg)
            return json.dumps({"error": error_msg})

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """
        Universal memory deletion interface.

        Deletes from every backend that the current storage mode uses and
        joins the per-backend results with " | ".

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory name to delete

        Returns:
            str: Deletion result
        """
        try:
            results = []
            if self.storage_mode in ("memvid_only", "dual"):
                result = self.memvid_manager.delete_memory(client_id, memory_name)
                results.append(f"Memvid: {result}")
            if self.storage_mode in ("vector_only", "dual") and self.vector_manager:
                result = self.vector_manager.delete_memory(client_id, memory_name)
                results.append(f"Vector: {result}")
            return " | ".join(results) if results else "No storage backends available"
        except Exception as e:
            error_msg = f"Error deleting memory: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    def list_memories(self, client_id: str) -> str:
        """
        Universal memory listing interface.

        Args:
            client_id (str): Client identifier

        Returns:
            str: JSON string with memory list
        """
        try:
            # Use memvid as primary source for listing
            return self.memvid_manager.list_memories(client_id)
        except Exception as e:
            error_msg = f"Error listing memories: {str(e)}"
            self.logger.exception(error_msg)
            return json.dumps({"error": error_msg})

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """
        Build memory video from stored chunks (memvid-specific).

        Args:
            client_id (str): Client identifier
            memory_name (str): Name for the memory video

        Returns:
            str: Build result message
        """
        try:
            return self.memvid_manager.build_memory_video(client_id, memory_name)
        except Exception as e:
            error_msg = f"Error in build_memory_video: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """
        Universal chat interface.

        Args:
            query (str): User query
            client_id (str): Client identifier
            memory_name (str): Memory name to chat with

        Returns:
            str: Chat response
        """
        try:
            # Chat is always served by the memvid backend, whatever the mode.
            return self.memvid_manager.chat_with_memory(query, client_id, memory_name)
        except Exception as e:
            error_msg = f"Error in chat_with_memory: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    # Private helpers

    @staticmethod
    def _to_json_str(result: Any) -> Any:
        """Serialize dict results to indented JSON; pass anything else through."""
        if isinstance(result, dict):
            return json.dumps(result, indent=2)
        return result

    @staticmethod
    def _as_result_dict(result: Any, backend: str) -> Dict[str, Any]:
        """Return dict results unchanged; wrap any other type in an error dict."""
        if isinstance(result, dict):
            return result
        return {
            "error": f"Unexpected {backend} type: {type(result)}",
            # Only a short preview is kept so the report stays readable.
            "content": str(result)[:200],
        }

    # Private methods for storage mode implementations

    def _store_memvid_only(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]]
    ) -> str:
        """Store using memvid only."""
        start_time = time.perf_counter()
        result = self.memvid_manager.store_memory(text, client_id, metadata)
        if self.metrics:
            self.metrics.track_storage_operation(
                "memvid", time.perf_counter() - start_time, len(text)
            )
        return result

    def _store_vector_only(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]]
    ) -> str:
        """Store using vector storage only."""
        if not self.vector_manager:
            return "Error: Vector storage not available (Modal credentials needed)"
        start_time = time.perf_counter()
        result = self.vector_manager.store_memory(text, client_id, metadata)
        if self.metrics:
            self.metrics.track_storage_operation(
                "vector", time.perf_counter() - start_time, len(text)
            )
        return result

    def _store_dual_mode(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]]
    ) -> str:
        """Store using both storage backends with performance comparison."""
        results = []

        # Store in memvid
        start_time = time.perf_counter()
        memvid_result = self.memvid_manager.store_memory(text, client_id, metadata)
        memvid_time = time.perf_counter() - start_time
        results.append(f"Memvid({memvid_time:.3f}s): {memvid_result}")

        # Store in vector (if available)
        if self.vector_manager:
            start_time = time.perf_counter()
            vector_result = self.vector_manager.store_memory(text, client_id, metadata)
            vector_time = time.perf_counter() - start_time
            results.append(f"Vector({vector_time:.3f}s): {vector_result}")
            # A comparison is only recorded when both backends actually ran.
            if self.metrics:
                self.metrics.track_dual_storage_comparison(
                    memvid_time, vector_time, len(text), client_id
                )
        else:
            results.append("Vector: Not available (Modal credentials needed)")

        return " | ".join(results)

    def _search_memvid_only(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using memvid only."""
        start_time = time.perf_counter()
        result = self.memvid_manager.search_memory(query, client_id, memory_name, top_k)
        if self.metrics:
            self.metrics.track_search_operation(
                "memvid", time.perf_counter() - start_time, top_k
            )
        # Convert dict to JSON string for MCP interface
        return self._to_json_str(result)

    def _search_vector_only(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using vector storage only."""
        if not self.vector_manager:
            return json.dumps(
                {"error": "Vector storage not available (Modal credentials needed)"}
            )
        start_time = time.perf_counter()
        # memory_name is forwarded exactly as the dual-mode search does, so a
        # vector-only search targets the requested memory as well.
        result = self.vector_manager.search_memory(
            query, client_id, memory_name=memory_name, top_k=top_k
        )
        if self.metrics:
            self.metrics.track_search_operation(
                "vector", time.perf_counter() - start_time, top_k
            )
        # Convert dict to JSON string for MCP interface
        return self._to_json_str(result)

    def _search_dual_mode(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """
        Search using both backends with performance comparison.

        Returns a JSON report with each backend's results and timing. When the
        vector backend is unavailable its timing is reported as 0, memvid is
        named the winner, and no comparison metric is recorded.
        """
        # Search memvid first
        start_time = time.perf_counter()
        memvid_result = self.memvid_manager.search_memory(
            query, client_id, memory_name, top_k
        )
        memvid_time = time.perf_counter() - start_time
        memvid_data = self._as_result_dict(memvid_result, "memvid")

        # Search vector second; None means the vector search never ran.
        vector_time: Optional[float] = None
        if self.vector_manager:
            start_time = time.perf_counter()
            vector_result = self.vector_manager.search_memory(
                query, client_id, memory_name=memory_name, top_k=top_k
            )
            vector_time = time.perf_counter() - start_time
            vector_data = self._as_result_dict(vector_result, "vector")
        else:
            vector_data = {"error": "Vector storage not available"}

        # Track comparison metrics only when both backends ran; a placeholder
        # zero for the vector side would skew the comparison.
        if self.metrics and vector_time is not None:
            self.metrics.track_dual_search_comparison(
                memvid_time, vector_time, query, client_id
            )

        # A backend that did not run cannot win; ties go to vector.
        if vector_time is None or memvid_time < vector_time:
            winner = "memvid"
        else:
            winner = "vector"

        # Return comparison results
        return json.dumps(
            {
                "query": query,
                "client_id": client_id,
                "memory_name": memory_name,
                "dual_search_results": {
                    "memvid": {
                        "time_ms": round(memvid_time * 1000, 2),
                        "results": memvid_data,
                    },
                    "vector": {
                        "time_ms": (
                            round(vector_time * 1000, 2)
                            if vector_time is not None
                            else 0
                        ),
                        "results": vector_data,
                    },
                },
                "performance_winner": winner,
            },
            indent=2,
        )