"""
Dual Storage Manager - Orchestrates memvid and vector storage with performance
comparison. Provides a unified interface for the dual storage modes with
optional performance-metrics collection.
"""

import os
import sys
import json
import time
import logging
from typing import Dict, Any, Optional
from pathlib import Path

from .memvid_manager import MemvidManager
from .vector_storage_manager import VectorStorageManager

# Modal services imports (with fallback for local development)
try:
    # The Modal client modules live one directory above this package, so make
    # that directory importable before trying to import them.
    parent_dir = Path(__file__).parent.parent
    if str(parent_dir) not in sys.path:
        sys.path.insert(0, str(parent_dir))

    from modal_vector_service import ModalVectorClient
    from modal_memvid_service import ModalMemvidClient

    MODAL_AVAILABLE = True
    print("✅ Modal services imported successfully")
except ImportError as e:
    print(f"⚠️ Modal services not available, using local implementations: {e}")
    MODAL_AVAILABLE = False

from .metrics_collector import MetricsCollector


class DualStorageManager:
    """
    Orchestrates dual storage between memvid (video-based) and vector storage.

    Provides a unified interface with configurable storage modes and
    performance tracking. The data operations (store/search/stats/delete/
    list/build/chat) catch exceptions, log them with traceback, and return an
    error string (or a JSON error object) instead of raising.
    """

    # Storage modes understood by every public operation.
    _VALID_MODES = ("memvid_only", "vector_only", "dual")

    def __init__(self, data_dir: str = "data"):
        """
        Initialize dual storage manager with Modal-first architecture.

        Modal-backed clients are used when the Modal client modules imported
        successfully and MODAL_TOKEN is set. Local managers are used
        otherwise, or when constructing the Modal clients raises.

        Environment variables read:
            STORAGE_MODE: "memvid_only", "vector_only" or "dual" (default
                "dual"). Unrecognized values fall back to "dual" with a
                warning.
            ENABLE_PERFORMANCE_TRACKING: "true" (default) enables metrics.
            MODAL_TOKEN: Modal credential; enables Modal backends when set.

        Args:
            data_dir (str): Base directory for storing data
        """
        self.logger = logging.getLogger(__name__)

        # Get storage mode from environment
        storage_mode = os.getenv("STORAGE_MODE", "dual").strip().lower()
        if storage_mode not in self._VALID_MODES:
            # store/search already treat unknown modes as "dual", but
            # delete_memory would touch neither backend. Normalize here so
            # every operation agrees on which backends are in use.
            self.logger.warning(
                "Unknown STORAGE_MODE %r; falling back to 'dual'", storage_mode
            )
            storage_mode = "dual"
        self.storage_mode = storage_mode

        self.enable_metrics = (
            os.getenv("ENABLE_PERFORMANCE_TRACKING", "true").lower() == "true"
        )

        # Check for Modal configuration
        modal_token = os.getenv("MODAL_TOKEN")
        use_modal = MODAL_AVAILABLE and bool(modal_token)

        # Initialize storage backends (Modal-first with local fallback)
        self.using_modal = False
        if use_modal:
            print("🚀 Initializing Modal-powered storage backends...")
            try:
                self.memvid_manager = ModalMemvidClient(modal_token=modal_token)
                self.vector_manager = ModalVectorClient(modal_token=modal_token)
                self.using_modal = True
                print("✅ Modal services initialized successfully")
            except Exception as e:
                print(f"⚠️ Modal initialization failed, falling back to local: {e}")
                self._init_local_backends(data_dir)
        else:
            print("🏠 Using local storage backends...")
            self._init_local_backends(data_dir)

        # Initialize metrics collector
        self.metrics = MetricsCollector() if self.enable_metrics else None

        infrastructure = "Modal" if self.using_modal else "Local"
        self.logger.info(
            "DualStorageManager initialized with mode: %s", self.storage_mode
        )
        print(f"🏗️ Infrastructure: {infrastructure}")
        print(
            f"📊 Performance tracking: {'enabled' if self.enable_metrics else 'disabled'}"
        )

    def _init_local_backends(self, data_dir: str) -> None:
        """Create local memvid and vector managers that share one storage handler."""
        self.memvid_manager = MemvidManager(data_dir)
        # Shared HF storage: the vector manager reuses memvid's storage handler.
        self.vector_manager = VectorStorageManager(
            data_dir, storage_handler=self.memvid_manager.storage_handler
        )
        self.using_modal = False

    def set_storage_mode(self, mode: str, client_id: str = "") -> str:
        """
        Set storage mode at runtime.

        The mode is always applied globally; client_id only affects the
        wording of the returned message.

        Args:
            mode (str): Storage mode (memvid_only, vector_only, dual)
            client_id (str): Optional client-specific setting

        Returns:
            str: Success message, or an error message for an invalid mode
        """
        valid_modes = list(self._VALID_MODES)
        if mode not in valid_modes:
            return f"Error: Invalid mode '{mode}'. Valid modes: {valid_modes}"

        self.storage_mode = mode
        return f"Storage mode set to: {mode}" + (
            f" for client {client_id}" if client_id else " (global)"
        )

    def get_storage_mode(self, client_id: str = "") -> str:
        """
        Get current storage mode.

        Args:
            client_id (str): Client identifier (currently unused; reserved for
                future client-specific modes)

        Returns:
            str: JSON with the storage mode, metrics flag and backend availability
        """
        return json.dumps(
            {
                "storage_mode": self.storage_mode,
                "metrics_enabled": self.enable_metrics,
                "backends_available": {
                    "memvid": True,
                    "vector": self.vector_manager is not None,
                },
            },
            indent=2,
        )

    def store_memory(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]] = None
    ) -> str:
        """
        Universal memory storage interface.

        Dispatches to memvid, vector, or both backends according to
        self.storage_mode.

        Args:
            text (str): Text content to store
            client_id (str): Client identifier
            metadata (dict, optional): Additional metadata

        Returns:
            str: Storage result message (an error message if storing raised)
        """
        try:
            if self.storage_mode == "memvid_only":
                return self._store_memvid_only(text, client_id, metadata)
            elif self.storage_mode == "vector_only":
                return self._store_vector_only(text, client_id, metadata)
            else:  # dual mode
                return self._store_dual_mode(text, client_id, metadata)
        except Exception as e:
            error_msg = f"Error in store_memory: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    def search_memory(
        self, query: str, client_id: str, memory_name: str, top_k: int = 5
    ) -> str:
        """
        Universal memory search interface.

        Args:
            query (str): Search query
            client_id (str): Client identifier
            memory_name (str): Memory name to search
            top_k (int): Number of results

        Returns:
            str: Search results (a JSON error object if searching raised)
        """
        try:
            if self.storage_mode == "memvid_only":
                return self._search_memvid_only(query, client_id, memory_name, top_k)
            elif self.storage_mode == "vector_only":
                return self._search_vector_only(query, client_id, memory_name, top_k)
            else:  # dual mode
                return self._search_dual_mode(query, client_id, memory_name, top_k)
        except Exception as e:
            error_msg = f"Error in search_memory: {str(e)}"
            self.logger.exception(error_msg)
            return json.dumps({"error": error_msg})

    def get_memory_stats(self, client_id: str) -> str:
        """
        Get aggregated memory statistics based on storage mode.

        In dual mode with metrics enabled, returns the metrics comparison
        report. In memvid_only mode, returns memvid stats. In vector_only
        mode, returns vector stats. In every other case, falls back to
        memvid stats.

        Args:
            client_id (str): Client identifier

        Returns:
            str: JSON string with statistics
        """
        try:
            if self.storage_mode == "dual" and self.metrics:
                return self.metrics.get_comparison_report(client_id)
            elif self.storage_mode == "memvid_only":
                return self.memvid_manager.get_memory_stats(client_id)
            elif self.storage_mode == "vector_only" and self.vector_manager:
                return self.vector_manager.get_stats(client_id)
            else:
                # Fallback to memvid stats
                return self.memvid_manager.get_memory_stats(client_id)
        except Exception as e:
            error_msg = f"Error getting memory stats: {str(e)}"
            self.logger.exception(error_msg)
            return json.dumps({"error": error_msg})

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """
        Universal memory deletion interface.

        Deletes from every backend that the current storage mode writes to.

        Args:
            client_id (str): Client identifier
            memory_name (str): Memory name to delete

        Returns:
            str: Per-backend deletion results joined with " | "
        """
        try:
            results = []

            if self.storage_mode in ("memvid_only", "dual"):
                result = self.memvid_manager.delete_memory(client_id, memory_name)
                results.append(f"Memvid: {result}")

            if self.storage_mode in ("vector_only", "dual") and self.vector_manager:
                result = self.vector_manager.delete_memory(client_id, memory_name)
                results.append(f"Vector: {result}")

            return " | ".join(results) if results else "No storage backends available"
        except Exception as e:
            error_msg = f"Error deleting memory: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    def list_memories(self, client_id: str) -> str:
        """
        Universal memory listing interface.

        Memvid is used as the source of truth for listing, regardless of mode.

        Args:
            client_id (str): Client identifier

        Returns:
            str: JSON string with memory list
        """
        try:
            return self.memvid_manager.list_memories(client_id)
        except Exception as e:
            error_msg = f"Error listing memories: {str(e)}"
            self.logger.exception(error_msg)
            return json.dumps({"error": error_msg})

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """
        Build memory video from stored chunks (memvid-specific).

        Args:
            client_id (str): Client identifier
            memory_name (str): Name for the memory video

        Returns:
            str: Build result message
        """
        try:
            return self.memvid_manager.build_memory_video(client_id, memory_name)
        except Exception as e:
            error_msg = f"Error in build_memory_video: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """
        Universal chat interface.

        Chat always goes through the memvid backend, regardless of mode.

        Args:
            query (str): User query
            client_id (str): Client identifier
            memory_name (str): Memory name to chat with

        Returns:
            str: Chat response
        """
        try:
            return self.memvid_manager.chat_with_memory(query, client_id, memory_name)
        except Exception as e:
            error_msg = f"Error in chat_with_memory: {str(e)}"
            self.logger.exception(error_msg)
            return error_msg

    # Private helpers

    @staticmethod
    def _to_json_string(result: Any) -> Any:
        """Serialize dict results to indented JSON for the MCP interface; pass others through."""
        if isinstance(result, dict):
            return json.dumps(result, indent=2)
        return result

    @staticmethod
    def _as_result_dict(result: Any, backend: str) -> Dict[str, Any]:
        """Return a dict result unchanged; wrap anything else in an error dict."""
        # Modal clients are expected to return dicts.
        if isinstance(result, dict):
            return result
        return {
            "error": f"Unexpected {backend} type: {type(result)}",
            "content": str(result)[:200],
        }

    # Private methods for storage mode implementations

    def _store_memvid_only(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]]
    ) -> str:
        """Store using memvid only."""
        start_time = time.perf_counter()
        result = self.memvid_manager.store_memory(text, client_id, metadata)

        if self.metrics:
            self.metrics.track_storage_operation(
                "memvid", time.perf_counter() - start_time, len(text)
            )

        return result

    def _store_vector_only(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]]
    ) -> str:
        """Store using vector storage only."""
        if not self.vector_manager:
            return "Error: Vector storage not available (Modal credentials needed)"

        start_time = time.perf_counter()
        result = self.vector_manager.store_memory(text, client_id, metadata)

        if self.metrics:
            self.metrics.track_storage_operation(
                "vector", time.perf_counter() - start_time, len(text)
            )

        return result

    def _store_dual_mode(
        self, text: str, client_id: str, metadata: Optional[Dict[str, Any]]
    ) -> str:
        """Store using both storage backends with performance comparison."""
        results = []

        # Store in memvid
        start_time = time.perf_counter()
        memvid_result = self.memvid_manager.store_memory(text, client_id, metadata)
        memvid_time = time.perf_counter() - start_time
        results.append(f"Memvid({memvid_time:.3f}s): {memvid_result}")

        # Store in vector (if available)
        if self.vector_manager:
            start_time = time.perf_counter()
            vector_result = self.vector_manager.store_memory(text, client_id, metadata)
            vector_time = time.perf_counter() - start_time
            results.append(f"Vector({vector_time:.3f}s): {vector_result}")

            if self.metrics:
                self.metrics.track_dual_storage_comparison(
                    memvid_time, vector_time, len(text), client_id
                )
        else:
            results.append("Vector: Not available (Modal credentials needed)")
            # Nothing to compare against; still record the memvid timing,
            # as the memvid-only path does.
            if self.metrics:
                self.metrics.track_storage_operation("memvid", memvid_time, len(text))

        return " | ".join(results)

    def _search_memvid_only(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using memvid only."""
        start_time = time.perf_counter()
        result = self.memvid_manager.search_memory(query, client_id, memory_name, top_k)

        if self.metrics:
            self.metrics.track_search_operation(
                "memvid", time.perf_counter() - start_time, top_k
            )

        return self._to_json_string(result)

    def _search_vector_only(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using vector storage only, scoped to memory_name like dual mode."""
        if not self.vector_manager:
            return json.dumps(
                {"error": "Vector storage not available (Modal credentials needed)"}
            )

        start_time = time.perf_counter()
        result = self.vector_manager.search_memory(
            query, client_id, memory_name=memory_name, top_k=top_k
        )

        if self.metrics:
            self.metrics.track_search_operation(
                "vector", time.perf_counter() - start_time, top_k
            )

        return self._to_json_string(result)

    def _search_dual_mode(
        self, query: str, client_id: str, memory_name: str, top_k: int
    ) -> str:
        """Search using both backends with performance comparison."""
        # Search memvid first
        start_time = time.perf_counter()
        memvid_result = self.memvid_manager.search_memory(
            query, client_id, memory_name, top_k
        )
        memvid_time = time.perf_counter() - start_time
        memvid_data = self._as_result_dict(memvid_result, "memvid")

        # Search vector second
        vector_available = bool(self.vector_manager)
        vector_time = 0
        if vector_available:
            start_time = time.perf_counter()
            vector_result = self.vector_manager.search_memory(
                query, client_id, memory_name=memory_name, top_k=top_k
            )
            vector_time = time.perf_counter() - start_time
            vector_data = self._as_result_dict(vector_result, "vector")
        else:
            vector_data = {"error": "Vector storage not available"}

        if self.metrics:
            if vector_available:
                self.metrics.track_dual_search_comparison(
                    memvid_time, vector_time, query, client_id
                )
            else:
                # Avoid recording a fake 0s vector timing in the comparison.
                self.metrics.track_search_operation("memvid", memvid_time, top_k)

        if vector_available:
            winner = "memvid" if memvid_time < vector_time else "vector"
        else:
            # vector_time stays 0 without a vector backend and would otherwise
            # "win" a comparison that never happened.
            winner = "memvid"

        # Return comparison results
        return json.dumps(
            {
                "query": query,
                "client_id": client_id,
                "memory_name": memory_name,
                "dual_search_results": {
                    "memvid": {
                        "time_ms": round(memvid_time * 1000, 2),
                        "results": memvid_data,
                    },
                    "vector": {
                        "time_ms": round(vector_time * 1000, 2),
                        "results": vector_data,
                    },
                },
                "performance_winner": winner,
            },
            indent=2,
        )