Spaces:
Running
Running
eldarski
🔥 Memvid MCP Server - Hackathon Submission - Complete MCP server with 24 tools for video-based AI memory storage - Dual storage with Modal GPU acceleration - Ready for Agents-MCP-Hackathon Track 1
168b0da
| """ | |
| Modal Memvid Service - GPU-accelerated video memory processing | |
| This service provides: | |
| - GPU-accelerated video processing using memvid library | |
| - QR code generation and decoding optimization | |
| - Modal object storage for MP4 files | |
| - Auto-scaling based on video processing workload | |
| """ | |
| import os | |
| import time | |
| import json | |
| import modal | |
| from typing import List, Dict, Any, Optional | |
# Modal App Configuration
app = modal.App("memvid-video-service")

# Python packages baked into the worker image for video/QR processing.
_PIP_PACKAGES = [
    "memvid>=0.1.0",
    "opencv-python-headless>=4.8.0",
    "pillow>=9.5.0",
    "qrcode>=7.4.2",
    "pyzbar>=0.1.9",  # QR code decoding
    "numpy>=1.24.0",
    "torch>=2.0.0",  # PyTorch for GPU acceleration
]

# System libraries required by the Python packages above.
_APT_PACKAGES = [
    "libzbar0",  # For QR code decoding
    "ffmpeg",  # For video processing
    "libgl1-mesa-glx",  # OpenCV dependencies
    "libglib2.0-0",
]

# Docker image with all video processing dependencies
memvid_image = (
    modal.Image.debian_slim()
    .pip_install(_PIP_PACKAGES)
    .apt_install(_APT_PACKAGES)
)

# Volume for persistent video storage
videos_volume = modal.Volume.from_name("memvid-videos", create_if_missing=True)
def process_video_memory(
    text: str, client_id: str, metadata: Dict[str, Any]
) -> Dict[str, Any]:
    """
    GPU-accelerated video memory processing on Modal.

    Encodes `text` into a QR-code MP4 via memvid and writes the video plus
    its JSON index under the per-client directory on the Modal volume.

    Args:
        text: Text content to store as video memory
        client_id: Unique identifier for the client/user
        metadata: Additional metadata for the memory

    Returns:
        Dict with processing results and metrics. On failure the dict has
        "success": False and an "error" message instead of file listings.
    """
    import sys

    sys.path.append("/storage")
    from memvid import MemvidEncoder
    import uuid

    def _scan_files(directory: str, suffix: str) -> List[Dict[str, Any]]:
        # List files in `directory` ending with `suffix` as name/size/path dicts.
        entries: List[Dict[str, Any]] = []
        if os.path.exists(directory):
            for file in os.listdir(directory):
                if file.endswith(suffix):
                    file_path = os.path.join(directory, file)
                    entries.append(
                        {
                            "filename": file,
                            "size_bytes": os.path.getsize(file_path),
                            "path": file_path,
                        }
                    )
        return entries

    start_time = time.time()
    # NOTE(review): these are hard-coded descriptors of the expected Modal
    # worker, not measured values — confirm they match the deployment config.
    processing_metrics = {"gpu_used": "T4", "cpu_count": 4, "memory_gb": 8}

    try:
        # Setup storage paths in Modal volume
        client_storage_path = f"/storage/{client_id}"
        os.makedirs(client_storage_path, exist_ok=True)

        print(f"π¬ Processing video memory for client: {client_id}")
        print(f"π Text content: {text[:100]}...")

        # Initialize memvid encoder with Modal storage
        encoder = MemvidEncoder()

        # Process video memory with GPU acceleration
        video_start_time = time.time()

        # Add text to encoder and build video
        encoder.add_text(text)

        # BUGFIX: compute the timestamp once so the video and its index share
        # the same basename. Previously int(time.time()) was called twice and
        # could cross a second boundary, yielding a mismatched index filename.
        timestamp = int(time.time())
        video_file = f"{client_storage_path}/videos/memory_{timestamp}.mp4"
        index_file = f"{client_storage_path}/videos/memory_{timestamp}_index.json"

        # Ensure directories exist
        os.makedirs(os.path.dirname(video_file), exist_ok=True)

        # Build video with QR codes
        encoder.build_video(video_file, index_file)

        video_processing_time = time.time() - video_start_time
        processing_metrics["video_processing_time"] = video_processing_time

        # Inventory everything stored for this client so far.
        video_files = _scan_files(os.path.join(client_storage_path, "videos"), ".mp4")
        chunk_files = _scan_files(os.path.join(client_storage_path, "chunks"), ".txt")

        # Calculate storage metrics
        total_video_size = sum(f["size_bytes"] for f in video_files)
        total_chunks_size = sum(f["size_bytes"] for f in chunk_files)
        processing_metrics.update(
            {
                "video_files_count": len(video_files),
                "chunk_files_count": len(chunk_files),
                "total_video_size": total_video_size,
                "total_chunks_size": total_chunks_size,
                "total_storage_size": total_video_size + total_chunks_size,
            }
        )

        # Generate unique memory ID
        memory_id = f"modal_video_{client_id}_{int(time.time())}_{uuid.uuid4().hex[:8]}"

        total_time = time.time() - start_time
        processing_metrics["total_time"] = total_time

        print("β Video memory processed successfully")
        print(f"π Created {len(video_files)} videos, {len(chunk_files)} chunks")
        print(f"πΎ Total storage: {total_video_size + total_chunks_size} bytes")
        print(f"β±οΈ Processing time: {total_time:.2f}s")

        return {
            "success": True,
            "memory_id": memory_id,
            "client_id": client_id,
            "video_files": video_files,
            "chunk_files": chunk_files,
            "processing_metrics": processing_metrics,
            "metadata": metadata,
            "storage_path": client_storage_path,
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }

    except Exception as e:
        print(f"β Error in video processing: {str(e)}")
        processing_metrics["error_time"] = time.time() - start_time
        return {
            "success": False,
            "error": str(e),
            "processing_metrics": processing_metrics,
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
def search_video_memory(
    query: str, client_id: str, memory_name: Optional[str] = None, top_k: int = 5
) -> Dict[str, Any]:
    """
    GPU-accelerated video memory search on Modal.

    Scans every MP4 stored for `client_id` on the Modal volume, opens each one
    with its index via MemvidRetriever, merges per-video hits, and formats up
    to `top_k` results into a uniform schema.

    Args:
        query: Search query text
        client_id: Client identifier to search within
        memory_name: Optional specific memory name filter
            (currently unused — all of the client's videos are searched)
        top_k: Number of top results to return

    Returns:
        Dict with search results and metrics; "success" is False on error.
    """
    import sys

    sys.path.append("/storage")
    from memvid import MemvidRetriever

    start_time = time.time()

    try:
        print(f"π Searching video memory for query: {query}")
        print(f"π€ Client: {client_id}")

        # Locate this client's stored videos on the Modal volume.
        client_storage_path = f"/storage/{client_id}"
        videos_dir = os.path.join(client_storage_path, "videos")
        video_files = []
        if os.path.exists(videos_dir):
            for file in os.listdir(videos_dir):
                if file.endswith(".mp4"):
                    video_files.append(os.path.join(videos_dir, file))

        if not video_files:
            return {
                "success": True,
                "query": query,
                "client_id": client_id,
                "results": [],
                "total_results": 0,
                "message": "No video memories found for this client",
                "processing_metrics": {
                    "search_time": 0,
                    "total_time": time.time() - start_time,
                    "gpu_used": "T4",
                    "infrastructure": "Modal + Video Processing",
                },
            }

        # Perform video-based search
        search_start_time = time.time()

        # GENERALIZED: search every stored video (previously only the first
        # one was scanned, marked "for now"). A failure on one video is
        # logged and skipped so the remaining videos are still searched.
        results = []
        for video_file in video_files:
            try:
                # Find corresponding index file
                index_file = video_file.replace(".mp4", "_index.json")
                if not os.path.exists(index_file):
                    # Try alternative index file naming
                    index_file = video_file.replace(".mp4", ".json")
                    if not os.path.exists(index_file):
                        print(f"No index file found for {video_file}")
                        continue

                # Initialize retriever with video and index files
                retriever = MemvidRetriever(video_file, index_file)
                video_results = retriever.search(query, top_k=top_k)
                if video_results:
                    results.extend(video_results)

            except Exception as e:
                print(f"Error searching video {video_file}: {e}")
                continue

        search_time = time.time() - search_start_time

        # Normalize the heterogeneous memvid return shapes (list of dicts,
        # list of strings, or a bare string) into one result schema.
        formatted_results = []
        if isinstance(results, list):
            for i, result in enumerate(results[:top_k]):
                if isinstance(result, dict):
                    formatted_results.append(
                        {
                            "memory_id": result.get("id", f"video_result_{i}"),
                            "text": result.get("text", result.get("content", "")),
                            "metadata": result.get("metadata", {}),
                            "similarity_score": result.get(
                                "score", 0.8
                            ),  # Default score
                            "video_file": result.get("video_file", ""),
                            "chunk_file": result.get("chunk_file", ""),
                        }
                    )
                elif isinstance(result, str):
                    formatted_results.append(
                        {
                            "memory_id": f"video_result_{i}",
                            "text": result,
                            "metadata": {},
                            "similarity_score": 0.75,
                            "video_file": "",
                            "chunk_file": "",
                        }
                    )
        elif isinstance(results, str):
            # Single result
            formatted_results.append(
                {
                    "memory_id": "video_result_0",
                    "text": results,
                    "metadata": {},
                    "similarity_score": 0.8,
                    "video_file": "",
                    "chunk_file": "",
                }
            )

        total_time = time.time() - start_time

        print("β Video search completed")
        print(f"π Found {len(formatted_results)} results")
        print(f"β±οΈ Search time: {search_time:.2f}s, Total time: {total_time:.2f}s")

        return {
            "success": True,
            "query": query,
            "client_id": client_id,
            "results": formatted_results,
            "total_results": len(formatted_results),
            "processing_metrics": {
                "search_time": search_time,
                "total_time": total_time,
                "gpu_used": "T4",
                "infrastructure": "Modal + Video Processing",
            },
        }

    except Exception as e:
        print(f"β Error in video search: {str(e)}")
        return {
            "success": False,
            "error": str(e),
            "processing_time": time.time() - start_time,
            "results": [],
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
def get_video_stats(client_id: str) -> Dict[str, Any]:
    """
    Get statistics for video storage.

    Walks the client's directory on the Modal volume, counting MP4 videos and
    text chunk files and summing their sizes; optionally reads first/last
    memory timestamps from a `metadata.json` bookkeeping file.

    Args:
        client_id: Client identifier

    Returns:
        Dict with storage statistics. A zeroed summary is returned when the
        client has no storage directory yet; on unexpected failure the dict
        carries an "error" message instead of counts.
    """
    import os
    import json

    def _count_suffix(directory: str, suffix: str):
        # Return (count, total_bytes) of files in `directory` ending in `suffix`.
        count = 0
        total_size = 0
        if os.path.exists(directory):
            for file in os.listdir(directory):
                if file.endswith(suffix):
                    count += 1
                    total_size += os.path.getsize(os.path.join(directory, file))
        return count, total_size

    try:
        client_storage_path = f"/storage/{client_id}"

        if not os.path.exists(client_storage_path):
            # No storage yet for this client: zeroed summary, not an error.
            return {
                "client_id": client_id,
                "storage_type": "modal_video",
                "memory_count": 0,
                "total_video_size": 0,
                "total_chunks": 0,
                "infrastructure": "Modal + T4 GPU + Volume Storage",
            }

        # Count video files
        video_count, total_video_size = _count_suffix(
            os.path.join(client_storage_path, "videos"), ".mp4"
        )

        # Count chunk files
        chunk_count, total_chunks_size = _count_suffix(
            os.path.join(client_storage_path, "chunks"), ".txt"
        )

        # Get metadata if available (best-effort; missing/corrupt file is fine)
        metadata_file = os.path.join(client_storage_path, "metadata.json")
        first_memory = None
        last_memory = None
        if os.path.exists(metadata_file):
            try:
                with open(metadata_file, "r") as f:
                    metadata = json.load(f)
                # Extract creation times if available
                first_memory = metadata.get("first_memory")
                last_memory = metadata.get("last_memory")
            except (OSError, json.JSONDecodeError):
                # BUGFIX: was a bare `except:`, which also swallowed
                # KeyboardInterrupt/SystemExit. Metadata stays best-effort.
                pass

        return {
            "client_id": client_id,
            "storage_type": "modal_video",
            "memory_count": video_count,
            "total_video_size": total_video_size,
            "total_chunks": chunk_count,
            "total_chunks_size": total_chunks_size,
            "total_storage_size": total_video_size + total_chunks_size,
            "first_memory": first_memory,
            "last_memory": last_memory,
            "infrastructure": "Modal + T4 GPU + Volume Storage",
            "storage_path": client_storage_path,
        }

    except Exception as e:
        return {
            "client_id": client_id,
            "storage_type": "modal_video",
            "error": str(e),
            "infrastructure": "Modal + T4 GPU + Volume Storage",
        }
# Client class for easy integration with DualStorageManager
class ModalMemvidClient:
    """Client for interacting with Modal Memvid Service.

    Thin wrapper around the deployed "memvid-video-service" Modal app: each
    public method looks up the remote function by name and invokes it,
    translating any failure into an error payload instead of raising.
    """

    # Name of the deployed Modal app hosting the remote functions.
    _APP_NAME = "memvid-video-service"

    def __init__(self, modal_token: Optional[str] = None):
        """
        Initialize Modal Memvid Client

        Args:
            modal_token: Optional Modal token (uses environment if not provided)
        """
        if modal_token:
            os.environ["MODAL_TOKEN"] = modal_token

        # Test Modal connection (non-fatal: a missing SDK only prints a warning)
        try:
            import modal  # noqa: F401

            print("β Modal Memvid Client initialized successfully")
        except Exception as e:
            print(f"β οΈ Modal Memvid Client initialization warning: {e}")

    def _remote(self, function_name: str, *args) -> Any:
        """Invoke `function_name` on the deployed Modal app with `args`.

        Centralizes the lookup/call pattern (and removes the redundant local
        `import modal` previously duplicated in every method — the module
        already imports modal at the top). Exceptions propagate to callers,
        which build their own failure payloads.
        """
        func = modal.Function.from_name(self._APP_NAME, function_name)
        return func.remote(*args)

    def store_memory(
        self, text: str, client_id: str, metadata: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Store memory using Modal memvid service"""
        try:
            return self._remote("process_video_memory", text, client_id, metadata)
        except Exception as e:
            return {"success": False, "error": f"Modal memvid storage failed: {e}"}

    def search_memory(
        self,
        query: str,
        client_id: str,
        memory_name: Optional[str] = None,
        top_k: int = 5,
    ) -> Dict[str, Any]:
        """Search memory using Modal memvid service"""
        try:
            return self._remote(
                "search_video_memory", query, client_id, memory_name, top_k
            )
        except Exception as e:
            return {
                "success": False,
                "error": f"Modal memvid search failed: {e}",
                "results": [],
            }

    def get_stats(self, client_id: str) -> Dict[str, Any]:
        """Get statistics using Modal memvid service"""
        try:
            return self._remote("get_video_stats", client_id)
        except Exception as e:
            return {"success": False, "error": f"Modal memvid stats failed: {e}"}

    def list_memories(self, client_id: str) -> str:
        """List memories for client (Modal implementation)"""
        try:
            stats = self.get_stats(client_id)
            if stats.get(
                "success", True
            ):  # Modal stats don't have success field currently
                memory_list = {
                    "client_id": client_id,
                    "storage_type": "modal_video",
                    "memory_count": stats.get("memory_count", 0),
                    "memories": [],  # Modal doesn't currently track individual memory names
                    "total_size": stats.get("total_storage_size", 0),
                    "infrastructure": "Modal + T4 GPU + Volume Storage",
                }
                return json.dumps(memory_list, indent=2)
            else:
                return json.dumps(
                    {
                        "error": f"Failed to list memories: {stats.get('error', 'Unknown error')}"
                    }
                )
        except Exception as e:
            return json.dumps({"error": f"Modal memvid list_memories failed: {e}"})

    def build_memory_video(self, client_id: str, memory_name: str) -> str:
        """Build memory video (Modal implementation)"""
        # For Modal, videos are built automatically during storage
        return f"Memory videos are automatically built during storage in Modal for client {client_id}. Memory name: {memory_name}"

    def chat_with_memory(self, query: str, client_id: str, memory_name: str) -> str:
        """Chat with memory using Modal memvid service"""
        try:
            # Use search as basis for chat
            search_results = self.search_memory(query, client_id, memory_name, top_k=3)

            if search_results.get("success", False):
                results = search_results.get("results", [])
                if results:
                    # Simple chat response based on search results
                    context = "\n".join(
                        [result.get("text", "") for result in results[:2]]
                    )
                    response = f"Based on your memories: {context}\n\nYour query '{query}' relates to the stored information above."
                    return response
                else:
                    return f"I couldn't find any relevant memories for '{query}' in your video storage."
            else:
                return f"Error accessing memories: {search_results.get('error', 'Unknown error')}"
        except Exception as e:
            return f"Modal memvid chat failed: {e}"

    def delete_memory(self, client_id: str, memory_name: str) -> str:
        """Delete memory (Modal implementation)"""
        # Modal currently doesn't support selective deletion
        return f"Memory deletion not yet implemented in Modal for client {client_id}, memory {memory_name}"

    def get_memory_stats(self, client_id: str) -> str:
        """Get memory statistics as JSON string"""
        try:
            stats = self.get_stats(client_id)
            return json.dumps(stats, indent=2)
        except Exception as e:
            return json.dumps({"error": f"Modal memvid get_memory_stats failed: {e}"})
| if __name__ == "__main__": | |
| # Test the Modal functions locally | |
| print("π§ͺ Testing Modal Memvid Service...") | |
| # Test client | |
| client = ModalMemvidClient() | |
| # Test storage | |
| result = client.store_memory( | |
| "This is a test memory for Modal video storage with GPU acceleration", | |
| "test_client", | |
| {"test": True, "timestamp": time.time()}, | |
| ) | |
| print(f"π¬ Storage result: {result}") | |
| # Test search | |
| search_result = client.search_memory("test memory GPU", "test_client", top_k=3) | |
| print(f"π Search result: {search_result}") | |
| # Test stats | |
| stats = client.get_stats("test_client") | |
| print(f"οΏ½οΏ½ Stats: {stats}") | |