memvid-mcp / modal_vector_service.py
eldarski
πŸŽ₯ Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
"""
Modal Vector Service - GPU-accelerated vector memory processing
This service provides:
- GPU-accelerated embedding generation using sentence-transformers
- FAISS with Modal Volume storage for scalable vector search
- FAISS for fast similarity search optimization
- Auto-scaling based on workload
"""
import os
import time
import json
import modal
import asyncio
from typing import List, Dict, Any, Optional
# Modal App Configuration
app = modal.App("memvid-vector-service")
# Docker image with all vector processing dependencies
vector_image = modal.Image.debian_slim().pip_install(
[
"sentence-transformers>=2.0.0",
"faiss-cpu>=1.8.0",
"numpy>=1.24.0",
"scikit-learn>=1.3.0", # For additional vector operations
]
)
# Volume for persistent model storage
models_volume = modal.Volume.from_name("vector-models", create_if_missing=True)
@app.function(
image=vector_image,
gpu="A100", # High-performance GPU for embedding generation
volumes={"/models": models_volume},
timeout=600, # 10 minutes timeout for large operations
)
def process_vector_memory(
text: str, client_id: str, metadata: Dict[str, Any]
) -> Dict[str, Any]:
"""
GPU-accelerated vector memory processing on Modal
Args:
text: Text content to store as vector embeddings
client_id: Unique identifier for the client/user
metadata: Additional metadata for the memory
Returns:
Dict with processing results and metrics
"""
import numpy as np
from sentence_transformers import SentenceTransformer
import json
start_time = time.time()
try:
# Load or download sentence transformer model (cached in volume)
model_path = "/models/sentence-transformer"
if not os.path.exists(model_path):
print("πŸ“₯ Downloading sentence transformer model...")
model = SentenceTransformer("all-MiniLM-L6-v2", device="cuda")
model.save(model_path)
else:
print("πŸ“‚ Loading cached sentence transformer model...")
model = SentenceTransformer(model_path, device="cuda")
# Generate embeddings on GPU
print(f"πŸš€ Generating embeddings for text: {text[:100]}...")
embeddings = model.encode([text], device="cuda")
embedding_vector = embeddings[0].tolist() # Convert to list for JSON storage
# Calculate processing metrics
embedding_time = time.time() - start_time
# Store vector in Modal Volume with FAISS index
import faiss
import pickle
storage_path = f"/models/vectors/{client_id}"
os.makedirs(storage_path, exist_ok=True)
# Load or create FAISS index
index_path = f"{storage_path}/faiss_index.bin"
metadata_path = f"{storage_path}/metadata.json"
if os.path.exists(index_path):
print("πŸ“‚ Loading existing FAISS index...")
index = faiss.read_index(index_path)
with open(metadata_path, "r") as f:
all_metadata = json.load(f)
else:
print("πŸ†• Creating new FAISS index...")
# Create FAISS index for 384-dimensional vectors
index = faiss.IndexFlatIP(384) # Inner product for cosine similarity
all_metadata = []
# Add vector to index
vector_array = np.array([embedding_vector], dtype=np.float32)
# Normalize for cosine similarity
faiss.normalize_L2(vector_array)
index.add(vector_array)
# Store metadata
memory_id = f"vector_{len(all_metadata)}"
memory_metadata = {
"id": memory_id,
"client_id": client_id,
"text": text,
"metadata": metadata,
"created_at": time.time(),
}
all_metadata.append(memory_metadata)
# Save updated index and metadata
faiss.write_index(index, index_path)
with open(metadata_path, "w") as f:
json.dump(all_metadata, f)
print(
f"βœ… Vector memory stored with ID: {memory_id} (FAISS index size: {index.ntotal})"
)
total_time = time.time() - start_time
return {
"success": True,
"memory_id": memory_id,
"client_id": client_id,
"embedding_dim": len(embedding_vector),
"embedding_preview": embedding_vector[:5], # First 5 dimensions for preview
"processing_metrics": {
"embedding_time": embedding_time,
"total_time": total_time,
"storage_size": len(embedding_vector) * 4, # 4 bytes per float32
"gpu_used": "A100",
"model_used": "all-MiniLM-L6-v2",
},
"metadata": metadata,
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
}
except Exception as e:
print(f"❌ Error in vector processing: {str(e)}")
return {
"success": False,
"error": str(e),
"processing_time": time.time() - start_time,
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
}
@app.function(
image=vector_image,
gpu="A100",
volumes={"/models": models_volume},
timeout=300, # 5 minutes timeout for search operations
)
def search_vector_memory(
query: str, client_id: str, memory_name: Optional[str] = None, top_k: int = 5
) -> Dict[str, Any]:
"""
Ultra-fast vector similarity search on Modal
Args:
query: Search query text
client_id: Client identifier to search within
memory_name: Optional specific memory name filter
top_k: Number of top results to return
Returns:
Dict with search results and metrics
"""
import numpy as np
from sentence_transformers import SentenceTransformer
import json
start_time = time.time()
try:
# Load model for query embedding
model_path = "/models/sentence-transformer"
model = SentenceTransformer(model_path, device="cuda")
# Generate query embedding
query_embedding = model.encode([query], device="cuda")[0].tolist()
embedding_time = time.time() - start_time
# Search in Modal Volume with FAISS
storage_path = f"/models/vectors/{client_id}"
index_path = f"{storage_path}/faiss_index.bin"
metadata_path = f"{storage_path}/metadata.json"
if os.path.exists(index_path) and os.path.exists(metadata_path):
print("πŸ” Searching in FAISS index...")
import faiss
# Load FAISS index and metadata
index = faiss.read_index(index_path)
with open(metadata_path, "r") as f:
all_metadata = json.load(f)
# Prepare query vector
query_vector = np.array([query_embedding], dtype=np.float32)
faiss.normalize_L2(query_vector)
# Perform similarity search
scores, indices = index.search(query_vector, min(top_k, index.ntotal))
# Format results
formatted_results = []
for i, (score, idx) in enumerate(zip(scores[0], indices[0])):
if idx < len(all_metadata): # Valid index
metadata_item = all_metadata[idx]
formatted_results.append(
{
"memory_id": metadata_item["id"],
"text": metadata_item["text"],
"metadata": metadata_item.get("metadata", {}),
"similarity_score": float(score),
"distance": 1 - float(score),
}
)
else:
# No stored vectors yet
formatted_results = []
search_time = time.time() - start_time
return {
"success": True,
"query": query,
"client_id": client_id,
"results": formatted_results,
"total_results": len(formatted_results),
"processing_metrics": {
"embedding_time": embedding_time,
"search_time": search_time - embedding_time,
"total_time": search_time,
"gpu_used": "A100",
"model_used": "all-MiniLM-L6-v2",
},
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
}
except Exception as e:
print(f"❌ Error in vector search: {str(e)}")
return {
"success": False,
"error": str(e),
"processing_time": time.time() - start_time,
"results": [],
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
}
@app.function(
image=vector_image,
volumes={"/models": models_volume},
timeout=60,
)
def get_vector_stats(client_id: str) -> Dict[str, Any]:
"""
Get statistics for vector storage
Args:
client_id: Client identifier
Returns:
Dict with storage statistics
"""
import json
import os
try:
storage_path = f"/models/vectors/{client_id}"
index_path = f"{storage_path}/faiss_index.bin"
metadata_path = f"{storage_path}/metadata.json"
if os.path.exists(index_path) and os.path.exists(metadata_path):
import faiss
# Load FAISS index and metadata
index = faiss.read_index(index_path)
with open(metadata_path, "r") as f:
all_metadata = json.load(f)
# Calculate stats
memory_count = len(all_metadata)
first_memory = (
min(item["created_at"] for item in all_metadata)
if all_metadata
else None
)
last_memory = (
max(item["created_at"] for item in all_metadata)
if all_metadata
else None
)
return {
"client_id": client_id,
"storage_type": "modal_vector_faiss",
"memory_count": memory_count,
"avg_embedding_dim": 384, # all-MiniLM-L6-v2 dimension
"index_size": index.ntotal,
"first_memory": (
time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(first_memory))
if first_memory
else None
),
"last_memory": (
time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(last_memory))
if last_memory
else None
),
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
}
else:
return {
"client_id": client_id,
"storage_type": "modal_vector_faiss",
"memory_count": 0,
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
"note": "No vectors stored yet",
}
except Exception as e:
return {
"client_id": client_id,
"storage_type": "modal_vector_faiss",
"error": str(e),
"infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
}
# Client class for easy integration with DualStorageManager
class ModalVectorClient:
"""Client for interacting with Modal Vector Service"""
def __init__(self, modal_token: Optional[str] = None):
"""
Initialize Modal Vector Client
Args:
modal_token: Optional Modal token (uses environment if not provided)
"""
if modal_token:
os.environ["MODAL_TOKEN"] = modal_token
# Test Modal connection
try:
import modal
print("βœ… Modal Vector Client initialized successfully")
except Exception as e:
print(f"⚠️ Modal Vector Client initialization warning: {e}")
def store_memory(
self, text: str, client_id: str, metadata: Dict[str, Any]
) -> Dict[str, Any]:
"""Store memory using Modal vector service"""
try:
# Use the deployed app's function with correct Modal calling pattern
import modal
func = modal.Function.from_name(
"memvid-vector-service", "process_vector_memory"
)
return func.remote(text, client_id, metadata)
except Exception as e:
return {"success": False, "error": f"Modal vector storage failed: {e}"}
def search_memory(
self,
query: str,
client_id: str,
memory_name: Optional[str] = None,
top_k: int = 5,
) -> Dict[str, Any]:
"""Search memory using Modal vector service"""
try:
# Use the deployed app's function with correct Modal calling pattern
import modal
func = modal.Function.from_name(
"memvid-vector-service", "search_vector_memory"
)
return func.remote(query, client_id, memory_name, top_k)
except Exception as e:
return {
"success": False,
"error": f"Modal vector search failed: {e}",
"results": [],
}
def get_stats(self, client_id: str) -> Dict[str, Any]:
"""Get statistics using Modal vector service"""
try:
# Use the deployed app's function with correct Modal calling pattern
import modal
func = modal.Function.from_name("memvid-vector-service", "get_vector_stats")
return func.remote(client_id)
except Exception as e:
return {"success": False, "error": f"Modal vector stats failed: {e}"}
def list_memories(self, client_id: str) -> str:
"""List memories for client (Modal vector implementation)"""
try:
stats = self.get_stats(client_id)
if stats.get(
"success", True
): # Modal stats don't have success field currently
memory_list = {
"client_id": client_id,
"storage_type": "modal_vector",
"memory_count": stats.get("memory_count", 0),
"memories": [], # Modal doesn't currently track individual memory names
"avg_embedding_dim": stats.get("avg_embedding_dim", 0),
"infrastructure": "Modal + A100 GPU + PostgreSQL + pgvector",
}
return json.dumps(memory_list, indent=2)
else:
return json.dumps(
{
"error": f"Failed to list memories: {stats.get('error', 'Unknown error')}"
}
)
except Exception as e:
return json.dumps({"error": f"Modal vector list_memories failed: {e}"})
def build_memory_video(self, client_id: str, memory_name: str) -> str:
"""Build memory video (not applicable for vector storage)"""
return f"Memory videos are not applicable for vector storage. Client: {client_id}, Memory: {memory_name}"
def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
"""Chat with memory using Modal vector service"""
try:
# Use search as basis for chat
search_results = self.search_memory(query, client_id, memory_name, top_k=3)
if search_results.get("success", False):
results = search_results.get("results", [])
if results:
# Simple chat response based on search results
context = "\n".join(
[result.get("text", "") for result in results[:2]]
)
response = f"Based on your vector memories: {context}\n\nYour query '{query}' relates to the stored information above."
return response
else:
return f"I couldn't find any relevant memories for '{query}' in your vector storage."
else:
return f"Error accessing memories: {search_results.get('error', 'Unknown error')}"
except Exception as e:
return f"Modal vector chat failed: {e}"
def delete_memory(self, client_id: str, memory_name: str) -> str:
"""Delete memory (Modal vector implementation)"""
# Modal currently doesn't support selective deletion
return f"Memory deletion not yet implemented in Modal vector storage for client {client_id}, memory {memory_name}"
def get_memory_stats(self, client_id: str) -> str:
"""Get memory statistics as JSON string"""
try:
stats = self.get_stats(client_id)
return json.dumps(stats, indent=2)
except Exception as e:
return json.dumps({"error": f"Modal vector get_memory_stats failed: {e}"})
# For compatibility with the dual storage manager method calls
def store_embedding(
self, text: str, client_id: str, metadata: Dict[str, Any]
) -> str:
"""Alias for store_memory for backward compatibility"""
result = self.store_memory(text, client_id, metadata)
return json.dumps(result) if isinstance(result, dict) else str(result)
def search_embeddings(self, query: str, client_id: str, top_k: int = 5) -> str:
"""Alias for search_memory for backward compatibility"""
result = self.search_memory(query, client_id, top_k=top_k)
return json.dumps(result) if isinstance(result, dict) else str(result)
if __name__ == "__main__":
# Test the Modal functions locally
print("πŸ§ͺ Testing Modal Vector Service...")
# Test client
client = ModalVectorClient()
# Test storage
result = client.store_memory(
"This is a test memory for Modal vector storage",
"test_client",
{"test": True, "timestamp": time.time()},
)
print(f"πŸ“₯ Storage result: {result}")
# Test search
search_result = client.search_memory("test memory", "test_client", top_k=3)
print(f"πŸ” Search result: {search_result}")
# Test stats
stats = client.get_stats("test_client")
print(f" Stats: {stats}")