""" Modal Vector Service - GPU-accelerated vector memory processing This service provides: - GPU-accelerated embedding generation using sentence-transformers - FAISS with Modal Volume storage for scalable vector search - FAISS for fast similarity search optimization - Auto-scaling based on workload """ import os import time import json import modal import asyncio from typing import List, Dict, Any, Optional # Modal App Configuration app = modal.App("memvid-vector-service") # Docker image with all vector processing dependencies vector_image = modal.Image.debian_slim().pip_install( [ "sentence-transformers>=2.0.0", "faiss-cpu>=1.8.0", "numpy>=1.24.0", "scikit-learn>=1.3.0", # For additional vector operations ] ) # Volume for persistent model storage models_volume = modal.Volume.from_name("vector-models", create_if_missing=True) @app.function( image=vector_image, gpu="A100", # High-performance GPU for embedding generation volumes={"/models": models_volume}, timeout=600, # 10 minutes timeout for large operations ) def process_vector_memory( text: str, client_id: str, metadata: Dict[str, Any] ) -> Dict[str, Any]: """ GPU-accelerated vector memory processing on Modal Args: text: Text content to store as vector embeddings client_id: Unique identifier for the client/user metadata: Additional metadata for the memory Returns: Dict with processing results and metrics """ import numpy as np from sentence_transformers import SentenceTransformer import json start_time = time.time() try: # Load or download sentence transformer model (cached in volume) model_path = "/models/sentence-transformer" if not os.path.exists(model_path): print("๐Ÿ“ฅ Downloading sentence transformer model...") model = SentenceTransformer("all-MiniLM-L6-v2", device="cuda") model.save(model_path) else: print("๐Ÿ“‚ Loading cached sentence transformer model...") model = SentenceTransformer(model_path, device="cuda") # Generate embeddings on GPU print(f"๐Ÿš€ Generating embeddings for text: {text[:100]}...") embeddings = model.encode([text], device="cuda") embedding_vector = embeddings[0].tolist() # Convert to list for JSON storage # Calculate processing metrics embedding_time = time.time() - start_time # Store vector in Modal Volume with FAISS index import faiss import pickle storage_path = f"/models/vectors/{client_id}" os.makedirs(storage_path, exist_ok=True) # Load or create FAISS index index_path = f"{storage_path}/faiss_index.bin" metadata_path = f"{storage_path}/metadata.json" if os.path.exists(index_path): print("๐Ÿ“‚ Loading existing FAISS index...") index = faiss.read_index(index_path) with open(metadata_path, "r") as f: all_metadata = json.load(f) else: print("๐Ÿ†• Creating new FAISS index...") # Create FAISS index for 384-dimensional vectors index = faiss.IndexFlatIP(384) # Inner product for cosine similarity all_metadata = [] # Add vector to index vector_array = np.array([embedding_vector], dtype=np.float32) # Normalize for cosine similarity faiss.normalize_L2(vector_array) index.add(vector_array) # Store metadata memory_id = f"vector_{len(all_metadata)}" memory_metadata = { "id": memory_id, "client_id": client_id, "text": text, "metadata": metadata, "created_at": time.time(), } all_metadata.append(memory_metadata) # Save updated index and metadata faiss.write_index(index, index_path) with open(metadata_path, "w") as f: json.dump(all_metadata, f) print( f"โœ… Vector memory stored with ID: {memory_id} (FAISS index size: {index.ntotal})" ) total_time = time.time() - start_time return { "success": True, "memory_id": memory_id, 
"client_id": client_id, "embedding_dim": len(embedding_vector), "embedding_preview": embedding_vector[:5], # First 5 dimensions for preview "processing_metrics": { "embedding_time": embedding_time, "total_time": total_time, "storage_size": len(embedding_vector) * 4, # 4 bytes per float32 "gpu_used": "A100", "model_used": "all-MiniLM-L6-v2", }, "metadata": metadata, "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage", } except Exception as e: print(f"โŒ Error in vector processing: {str(e)}") return { "success": False, "error": str(e), "processing_time": time.time() - start_time, "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage", } @app.function( image=vector_image, gpu="A100", volumes={"/models": models_volume}, timeout=300, # 5 minutes timeout for search operations ) def search_vector_memory( query: str, client_id: str, memory_name: Optional[str] = None, top_k: int = 5 ) -> Dict[str, Any]: """ Ultra-fast vector similarity search on Modal Args: query: Search query text client_id: Client identifier to search within memory_name: Optional specific memory name filter top_k: Number of top results to return Returns: Dict with search results and metrics """ import numpy as np from sentence_transformers import SentenceTransformer import json start_time = time.time() try: # Load model for query embedding model_path = "/models/sentence-transformer" model = SentenceTransformer(model_path, device="cuda") # Generate query embedding query_embedding = model.encode([query], device="cuda")[0].tolist() embedding_time = time.time() - start_time # Search in Modal Volume with FAISS storage_path = f"/models/vectors/{client_id}" index_path = f"{storage_path}/faiss_index.bin" metadata_path = f"{storage_path}/metadata.json" if os.path.exists(index_path) and os.path.exists(metadata_path): print("๐Ÿ” Searching in FAISS index...") import faiss # Load FAISS index and metadata index = faiss.read_index(index_path) with open(metadata_path, "r") as f: all_metadata = json.load(f) # Prepare query vector query_vector = np.array([query_embedding], dtype=np.float32) faiss.normalize_L2(query_vector) # Perform similarity search scores, indices = index.search(query_vector, min(top_k, index.ntotal)) # Format results formatted_results = [] for i, (score, idx) in enumerate(zip(scores[0], indices[0])): if idx < len(all_metadata): # Valid index metadata_item = all_metadata[idx] formatted_results.append( { "memory_id": metadata_item["id"], "text": metadata_item["text"], "metadata": metadata_item.get("metadata", {}), "similarity_score": float(score), "distance": 1 - float(score), } ) else: # No stored vectors yet formatted_results = [] search_time = time.time() - start_time return { "success": True, "query": query, "client_id": client_id, "results": formatted_results, "total_results": len(formatted_results), "processing_metrics": { "embedding_time": embedding_time, "search_time": search_time - embedding_time, "total_time": search_time, "gpu_used": "A100", "model_used": "all-MiniLM-L6-v2", }, "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage", } except Exception as e: print(f"โŒ Error in vector search: {str(e)}") return { "success": False, "error": str(e), "processing_time": time.time() - start_time, "results": [], "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage", } @app.function( image=vector_image, volumes={"/models": models_volume}, timeout=60, ) def get_vector_stats(client_id: str) -> Dict[str, Any]: """ Get statistics for vector storage Args: client_id: Client identifier Returns: Dict 
@app.function(
    image=vector_image,
    volumes={"/models": models_volume},
    timeout=60,
)
def get_vector_stats(client_id: str) -> Dict[str, Any]:
    """
    Get statistics for vector storage

    Args:
        client_id: Client identifier

    Returns:
        Dict with storage statistics
    """
    import json
    import os

    try:
        # Pick up the latest volume state written by other containers
        models_volume.reload()

        storage_path = f"/models/vectors/{client_id}"
        index_path = f"{storage_path}/faiss_index.bin"
        metadata_path = f"{storage_path}/metadata.json"

        if os.path.exists(index_path) and os.path.exists(metadata_path):
            import faiss

            # Load FAISS index and metadata
            index = faiss.read_index(index_path)
            with open(metadata_path, "r") as f:
                all_metadata = json.load(f)

            # Calculate stats
            memory_count = len(all_metadata)
            first_memory = (
                min(item["created_at"] for item in all_metadata)
                if all_metadata
                else None
            )
            last_memory = (
                max(item["created_at"] for item in all_metadata)
                if all_metadata
                else None
            )

            return {
                "client_id": client_id,
                "storage_type": "modal_vector_faiss",
                "memory_count": memory_count,
                "avg_embedding_dim": 384,  # all-MiniLM-L6-v2 dimension
                "index_size": index.ntotal,
                "first_memory": (
                    time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(first_memory))
                    if first_memory
                    else None
                ),
                "last_memory": (
                    time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(last_memory))
                    if last_memory
                    else None
                ),
                "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
            }
        else:
            return {
                "client_id": client_id,
                "storage_type": "modal_vector_faiss",
                "memory_count": 0,
                "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
                "note": "No vectors stored yet",
            }

    except Exception as e:
        return {
            "client_id": client_id,
            "storage_type": "modal_vector_faiss",
            "error": str(e),
            "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
        }
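
# For reference, the per-client layout that the three functions above share
# on the "vector-models" volume (paths exactly as used in this file):
#
#   /models/sentence-transformer          # cached all-MiniLM-L6-v2 weights
#   /models/vectors/<client_id>/
#       faiss_index.bin                   # FAISS IndexFlatIP, 384-dim
#       metadata.json                     # list of per-memory metadata dicts
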
# Client class for easy integration with DualStorageManager
class ModalVectorClient:
    """Client for interacting with Modal Vector Service"""

    def __init__(self, modal_token: Optional[str] = None):
        """
        Initialize Modal Vector Client

        Args:
            modal_token: Optional Modal token (uses environment if not provided)
        """
        if modal_token:
            os.environ["MODAL_TOKEN"] = modal_token

        # Test Modal connection
        try:
            import modal  # noqa: F401  (verifies the package is importable)

            print("✅ Modal Vector Client initialized successfully")
        except Exception as e:
            print(f"⚠️ Modal Vector Client initialization warning: {e}")

    def store_memory(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Store memory using Modal vector service"""
        try:
            # Look up the deployed app's function and call it remotely
            import modal

            func = modal.Function.from_name(
                "memvid-vector-service", "process_vector_memory"
            )
            return func.remote(text, client_id, metadata)
        except Exception as e:
            return {"success": False, "error": f"Modal vector storage failed: {e}"}

    def search_memory(
        self,
        query: str,
        client_id: str,
        memory_name: Optional[str] = None,
        top_k: int = 5,
    ) -> Dict[str, Any]:
        """Search memory using Modal vector service"""
        try:
            # Look up the deployed app's function and call it remotely
            import modal

            func = modal.Function.from_name(
                "memvid-vector-service", "search_vector_memory"
            )
            return func.remote(query, client_id, memory_name, top_k)
        except Exception as e:
            return {
                "success": False,
                "error": f"Modal vector search failed: {e}",
                "results": [],
            }

    def get_stats(self, client_id: str) -> Dict[str, Any]:
        """Get statistics using Modal vector service"""
        try:
            # Look up the deployed app's function and call it remotely
            import modal

            func = modal.Function.from_name(
                "memvid-vector-service", "get_vector_stats"
            )
            return func.remote(client_id)
        except Exception as e:
            return {"success": False, "error": f"Modal vector stats failed: {e}"}

    def list_memories(self, client_id: str) -> str:
        """List memories for client (Modal vector implementation)"""
        try:
            stats = self.get_stats(client_id)
            # get_vector_stats reports failure via an "error" key rather than
            # a "success" field, so check both
            if stats.get("success", True) and "error" not in stats:
                memory_list = {
                    "client_id": client_id,
                    "storage_type": "modal_vector",
                    "memory_count": stats.get("memory_count", 0),
                    "memories": [],  # Individual memory names are not tracked yet
                    "avg_embedding_dim": stats.get("avg_embedding_dim", 0),
                    "infrastructure": "Modal + A100 GPU + FAISS + Volume Storage",
                }
                return json.dumps(memory_list, indent=2)
            else:
                return json.dumps(
                    {
                        "error": (
                            "Failed to list memories: "
                            f"{stats.get('error', 'Unknown error')}"
                        )
                    }
                )
        except Exception as e:
            return json.dumps({"error": f"Modal vector list_memories failed: {e}"})

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """Build memory video (not applicable for vector storage)"""
        return (
            "Memory videos are not applicable for vector storage. "
            f"Client: {client_id}, Memory: {memory_name}"
        )

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """Chat with memory using Modal vector service"""
        try:
            # Use search results as the basis for the chat response
            search_results = self.search_memory(query, client_id, memory_name, top_k=3)

            if search_results.get("success", False):
                results = search_results.get("results", [])
                if results:
                    # Simple chat response based on search results
                    context = "\n".join(
                        [result.get("text", "") for result in results[:2]]
                    )
                    return (
                        f"Based on your vector memories: {context}\n\n"
                        f"Your query '{query}' relates to the stored "
                        "information above."
                    )
                else:
                    return (
                        f"I couldn't find any relevant memories for '{query}' "
                        "in your vector storage."
                    )
            else:
                return (
                    "Error accessing memories: "
                    f"{search_results.get('error', 'Unknown error')}"
                )
        except Exception as e:
            return f"Modal vector chat failed: {e}"

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """Delete memory (Modal vector implementation)"""
        # Selective deletion is not supported yet
        return (
            "Memory deletion not yet implemented in Modal vector storage "
            f"for client {client_id}, memory {memory_name}"
        )

    def get_memory_stats(self, client_id: str) -> str:
        """Get memory statistics as JSON string"""
        try:
            stats = self.get_stats(client_id)
            return json.dumps(stats, indent=2)
        except Exception as e:
            return json.dumps({"error": f"Modal vector get_memory_stats failed: {e}"})

    # For compatibility with the dual storage manager method calls
    def store_embedding(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> str:
        """Alias for store_memory for backward compatibility"""
        result = self.store_memory(text, client_id, metadata)
        return json.dumps(result) if isinstance(result, dict) else str(result)

    def search_embeddings(self, query: str, client_id: str, top_k: int = 5) -> str:
        """Alias for search_memory for backward compatibility"""
        result = self.search_memory(query, client_id, top_k=top_k)
        return json.dumps(result) if isinstance(result, dict) else str(result)


if __name__ == "__main__":
    # Test the Modal functions locally
    print("🧪 Testing Modal Vector Service...")

    # Test client
    client = ModalVectorClient()

    # Test storage
    result = client.store_memory(
        "This is a test memory for Modal vector storage",
        "test_client",
        {"test": True, "timestamp": time.time()},
    )
    print(f"📥 Storage result: {result}")

    # Test search
    search_result = client.search_memory("test memory", "test_client", top_k=3)
    print(f"🔍 Search result: {search_result}")

    # Test stats
    stats = client.get_stats("test_client")
    print(f"📊 Stats: {stats}")
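
# A hypothetical caller-side sketch of the string-returning compatibility
# aliases above ("demo_client" is illustrative; DualStorageManager's real
# call sites may differ):
#
#     client = ModalVectorClient()
#     raw = client.search_embeddings("test memory", "demo_client", top_k=2)
#     payload = json.loads(raw)  # aliases serialize the result dict to JSON
#     for hit in payload.get("results", []):
#         print(hit["memory_id"], hit["similarity_score"])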