# memvid-mcp/utils/dual_storage_manager.py
# Author: eldarski (commit 168b0da)
# Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools
# for video-based AI memory storage - Dual storage with Modal GPU acceleration
# - Ready for Agents-MCP-Hackathon Track 1
"""
Dual Storage Manager - Orchestrates memvid and vector storage with performance comparison.
Provides unified interface for dual storage modes with background metrics collection.
"""
import os
import json
import time
import logging
from typing import Dict, Any, Optional
from pathlib import Path
from .memvid_manager import MemvidManager
from .vector_storage_manager import VectorStorageManager
# Modal services imports (with fallback for local development)
try:
import sys
from pathlib import Path
# Add parent directory to path for Modal service imports
parent_dir = Path(__file__).parent.parent
if str(parent_dir) not in sys.path:
sys.path.insert(0, str(parent_dir))
from modal_vector_service import ModalVectorClient
from modal_memvid_service import ModalMemvidClient
MODAL_AVAILABLE = True
print("βœ… Modal services imported successfully")
except ImportError as e:
print(f"⚠️ Modal services not available, using local implementations: {e}")
MODAL_AVAILABLE = False
from .metrics_collector import MetricsCollector
class DualStorageManager:
"""
Orchestrates dual storage between memvid (video-based) and vector storage.
Provides unified interface with configurable storage modes and performance tracking.
"""
def __init__(self, data_dir: str = "data"):
"""
Initialize dual storage manager with Modal-first architecture.
Args:
data_dir (str): Base directory for storing data
"""
self.logger = logging.getLogger(__name__)
# Get storage mode from environment
self.storage_mode = os.getenv("STORAGE_MODE", "dual").lower()
self.enable_metrics = (
os.getenv("ENABLE_PERFORMANCE_TRACKING", "true").lower() == "true"
)
# Check for Modal configuration
modal_token = os.getenv("MODAL_TOKEN")
use_modal = MODAL_AVAILABLE and modal_token
# Initialize storage backends (Modal-first with local fallback)
if use_modal:
print("πŸš€ Initializing Modal-powered storage backends...")
try:
self.memvid_manager = ModalMemvidClient(modal_token=modal_token)
self.vector_manager = ModalVectorClient(modal_token=modal_token)
self.using_modal = True
print("βœ… Modal services initialized successfully")
except Exception as e:
print(f"⚠️ Modal initialization failed, falling back to local: {e}")
self.memvid_manager = MemvidManager(data_dir)
self.vector_manager = VectorStorageManager(
data_dir, storage_handler=self.memvid_manager.storage_handler
) # Shared HF storage
self.using_modal = False
else:
print("🏠 Using local storage backends...")
self.memvid_manager = MemvidManager(data_dir)
self.vector_manager = VectorStorageManager(
data_dir, storage_handler=self.memvid_manager.storage_handler
) # Shared HF storage
self.using_modal = False
# Initialize metrics collector
self.metrics = MetricsCollector() if self.enable_metrics else None
infrastructure = "Modal" if self.using_modal else "Local"
self.logger.info(
f"DualStorageManager initialized with mode: {self.storage_mode}"
)
print(f"πŸ—οΈ Infrastructure: {infrastructure}")
print(
f"πŸ“Š Performance tracking: {'enabled' if self.enable_metrics else 'disabled'}"
)
def set_storage_mode(self, mode: str, client_id: str = "") -> str:
"""
Set storage mode at runtime.
Args:
mode (str): Storage mode (memvid_only, vector_only, dual)
client_id (str): Optional client-specific setting
Returns:
str: Success message
"""
valid_modes = ["memvid_only", "vector_only", "dual"]
if mode not in valid_modes:
return f"Error: Invalid mode '{mode}'. Valid modes: {valid_modes}"
self.storage_mode = mode
return f"Storage mode set to: {mode}" + (
f" for client {client_id}" if client_id else " (global)"
)
def get_storage_mode(self, client_id: str = "") -> str:
"""
Get current storage mode.
Args:
client_id (str): Client identifier (for future client-specific modes)
Returns:
str: Current storage mode information
"""
return json.dumps(
{
"storage_mode": self.storage_mode,
"metrics_enabled": self.enable_metrics,
"backends_available": {
"memvid": True,
"vector": self.vector_manager is not None,
},
},
indent=2,
)
def store_memory(
self, text: str, client_id: str, metadata: Dict[str, Any] = None
) -> str:
"""
Universal memory storage interface.
Args:
text (str): Text content to store
client_id (str): Client identifier
metadata (dict): Additional metadata
Returns:
str: Storage result message
"""
try:
if self.storage_mode == "memvid_only":
return self._store_memvid_only(text, client_id, metadata)
elif self.storage_mode == "vector_only":
return self._store_vector_only(text, client_id, metadata)
else: # dual mode
return self._store_dual_mode(text, client_id, metadata)
except Exception as e:
error_msg = f"Error in store_memory: {str(e)}"
self.logger.error(error_msg)
return error_msg
def search_memory(
self, query: str, client_id: str, memory_name: str, top_k: int = 5
) -> str:
"""
Universal memory search interface.
Args:
query (str): Search query
client_id (str): Client identifier
memory_name (str): Memory name to search
top_k (int): Number of results
Returns:
str: Search results
"""
try:
if self.storage_mode == "memvid_only":
return self._search_memvid_only(query, client_id, memory_name, top_k)
elif self.storage_mode == "vector_only":
return self._search_vector_only(query, client_id, memory_name, top_k)
else: # dual mode
return self._search_dual_mode(query, client_id, memory_name, top_k)
except Exception as e:
error_msg = f"Error in search_memory: {str(e)}"
self.logger.error(error_msg)
return json.dumps({"error": error_msg})
def get_memory_stats(self, client_id: str) -> str:
"""
Get aggregated memory statistics based on storage mode.
Args:
client_id (str): Client identifier
Returns:
str: JSON string with statistics
"""
try:
if self.storage_mode == "dual" and self.metrics:
return self.metrics.get_comparison_report(client_id)
elif self.storage_mode == "memvid_only":
return self.memvid_manager.get_memory_stats(client_id)
elif self.storage_mode == "vector_only" and self.vector_manager:
return self.vector_manager.get_stats(client_id)
else:
# Fallback to memvid stats
return self.memvid_manager.get_memory_stats(client_id)
except Exception as e:
error_msg = f"Error getting memory stats: {str(e)}"
self.logger.error(error_msg)
return json.dumps({"error": error_msg})
def delete_memory(self, client_id: str, memory_name: str) -> str:
"""
Universal memory deletion interface.
Args:
client_id (str): Client identifier
memory_name (str): Memory name to delete
Returns:
str: Deletion result
"""
try:
results = []
if self.storage_mode in ["memvid_only", "dual"]:
result = self.memvid_manager.delete_memory(client_id, memory_name)
results.append(f"Memvid: {result}")
if self.storage_mode in ["vector_only", "dual"] and self.vector_manager:
result = self.vector_manager.delete_memory(client_id, memory_name)
results.append(f"Vector: {result}")
return " | ".join(results) if results else "No storage backends available"
except Exception as e:
error_msg = f"Error deleting memory: {str(e)}"
self.logger.error(error_msg)
return error_msg
def list_memories(self, client_id: str) -> str:
"""
Universal memory listing interface.
Args:
client_id (str): Client identifier
Returns:
str: JSON string with memory list
"""
try:
# Use memvid as primary source for listing
return self.memvid_manager.list_memories(client_id)
except Exception as e:
error_msg = f"Error listing memories: {str(e)}"
self.logger.error(error_msg)
return json.dumps({"error": error_msg})
def build_memory_video(self, client_id: str, memory_name: str) -> str:
"""
Build memory video from stored chunks (memvid-specific).
Args:
client_id (str): Client identifier
memory_name (str): Name for the memory video
Returns:
str: Build result message
"""
try:
return self.memvid_manager.build_memory_video(client_id, memory_name)
except Exception as e:
error_msg = f"Error in build_memory_video: {str(e)}"
self.logger.error(error_msg)
return error_msg
def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
"""
Universal chat interface.
Args:
query (str): User query
client_id (str): Client identifier
memory_name (str): Memory name to chat with
Returns:
str: Chat response
"""
try:
# Use memvid for chat (better for conversational AI)
return self.memvid_manager.chat_with_memory(query, client_id, memory_name)
except Exception as e:
error_msg = f"Error in chat_with_memory: {str(e)}"
self.logger.error(error_msg)
return error_msg
# Private methods for storage mode implementations
def _store_memvid_only(
self, text: str, client_id: str, metadata: Dict[str, Any]
) -> str:
"""Store using memvid only."""
start_time = time.time()
result = self.memvid_manager.store_memory(text, client_id, metadata)
if self.metrics:
self.metrics.track_storage_operation(
"memvid", time.time() - start_time, len(text)
)
return result
def _store_vector_only(
self, text: str, client_id: str, metadata: Dict[str, Any]
) -> str:
"""Store using vector storage only."""
if not self.vector_manager:
return "Error: Vector storage not available (Modal credentials needed)"
start_time = time.time()
result = self.vector_manager.store_memory(text, client_id, metadata)
if self.metrics:
self.metrics.track_storage_operation(
"vector", time.time() - start_time, len(text)
)
return result
def _store_dual_mode(
self, text: str, client_id: str, metadata: Dict[str, Any]
) -> str:
"""Store using both storage backends with performance comparison."""
results = []
# Store in memvid
start_time = time.time()
memvid_result = self.memvid_manager.store_memory(text, client_id, metadata)
memvid_time = time.time() - start_time
results.append(f"Memvid({memvid_time:.3f}s): {memvid_result}")
# Store in vector (if available)
if self.vector_manager:
start_time = time.time()
vector_result = self.vector_manager.store_memory(text, client_id, metadata)
vector_time = time.time() - start_time
results.append(f"Vector({vector_time:.3f}s): {vector_result}")
# Track comparison metrics
if self.metrics:
self.metrics.track_dual_storage_comparison(
memvid_time, vector_time, len(text), client_id
)
else:
results.append("Vector: Not available (Modal credentials needed)")
return " | ".join(results)
def _search_memvid_only(
self, query: str, client_id: str, memory_name: str, top_k: int
) -> str:
"""Search using memvid only."""
start_time = time.time()
result = self.memvid_manager.search_memory(query, client_id, memory_name, top_k)
if self.metrics:
self.metrics.track_search_operation(
"memvid", time.time() - start_time, top_k
)
# Convert dict to JSON string for MCP interface
if isinstance(result, dict):
return json.dumps(result, indent=2)
return result
def _search_vector_only(
self, query: str, client_id: str, memory_name: str, top_k: int
) -> str:
"""Search using vector storage only."""
if not self.vector_manager:
return json.dumps(
{"error": "Vector storage not available (Modal credentials needed)"}
)
start_time = time.time()
result = self.vector_manager.search_memory(query, client_id, top_k=top_k)
if self.metrics:
self.metrics.track_search_operation(
"vector", time.time() - start_time, top_k
)
# Convert dict to JSON string for MCP interface
if isinstance(result, dict):
return json.dumps(result, indent=2)
return result
def _search_dual_mode(
self, query: str, client_id: str, memory_name: str, top_k: int
) -> str:
"""Search using both backends with performance comparison."""
# Search memvid first
memvid_data = {"error": "Memvid search not attempted"}
memvid_time = 0
start_time = time.time()
memvid_result = self.memvid_manager.search_memory(
query, client_id, memory_name, top_k
)
memvid_time = time.time() - start_time
# Handle memvid result - Modal clients should return dicts
memvid_data = (
memvid_result
if isinstance(memvid_result, dict)
else {
"error": f"Unexpected memvid type: {type(memvid_result)}",
"content": str(memvid_result)[:200],
}
)
# Search vector second
vector_data = {"error": "Vector search not attempted"}
vector_time = 0
if self.vector_manager:
start_time = time.time()
vector_result = self.vector_manager.search_memory(
query, client_id, memory_name=memory_name, top_k=top_k
)
vector_time = time.time() - start_time
# Handle vector result - Modal clients should return dicts
vector_data = (
vector_result
if isinstance(vector_result, dict)
else {
"error": f"Unexpected vector type: {type(vector_result)}",
"content": str(vector_result)[:200],
}
)
else:
vector_data = {"error": "Vector storage not available"}
# Track comparison metrics
if self.metrics:
self.metrics.track_dual_search_comparison(
memvid_time, vector_time, query, client_id
)
# Return comparison results
return json.dumps(
{
"query": query,
"client_id": client_id,
"memory_name": memory_name,
"dual_search_results": {
"memvid": {
"time_ms": round(memvid_time * 1000, 2),
"results": memvid_data,
},
"vector": {
"time_ms": round(vector_time * 1000, 2),
"results": vector_data,
},
},
"performance_winner": (
"memvid" if memvid_time < vector_time else "vector"
),
},
indent=2,
)